Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func processPerfOutput(
var err error
metricFrames, frameTimestamp, err = ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata)
if err != nil {
slog.Error(err.Error())
slog.Warn(err.Error())
numConsecutiveProcessEventErrors++
if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors {
slog.Error("too many consecutive errors processing events, triggering shutdown", slog.Int("max errors", maxConsecutiveProcessEventErrors))
Expand Down
6 changes: 3 additions & 3 deletions cmd/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,13 +499,13 @@ func getCPUAveragePercentage(tableValues table.TableValues, fieldName string, in

func getPkgAverageTemperature(allTableValues []table.TableValues) string {
tableValues := getTableValues(allTableValues, TemperatureTelemetryTableName)
// number of packages can vary, so we need to find the average temperature across all packages
// compute the average PkgTmp value across all samples
if len(tableValues.Fields) == 0 {
return ""
}
pkgTempFieldIndices := make([]int, 0)
for i, field := range tableValues.Fields {
if strings.HasPrefix(field.Name, "Package") {
if field.Name == "PkgTmp" {
pkgTempFieldIndices = append(pkgTempFieldIndices, i)
}
}
Expand Down Expand Up @@ -538,7 +538,7 @@ func getPkgAveragePower(allTableValues []table.TableValues) string {
}
pkgPowerFieldIndices := make([]int, 0)
for i, field := range tableValues.Fields {
if strings.HasPrefix(field.Name, "Package") {
if strings.HasSuffix(field.Name, "PkgWatt") {
pkgPowerFieldIndices = append(pkgPowerFieldIndices, i)
}
}
Expand Down
84 changes: 43 additions & 41 deletions cmd/telemetry/telemetry_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,29 +375,42 @@ func memoryTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.
}

func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field {
fields := []table.Field{
{Name: "Time"},
}
packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgWatt", "RAMWatt"})
reWatt := regexp.MustCompile(`^\w+Watt$`) // matches PkgWatt, RAMWatt, etc.
packageRows, err := extract.TurbostatPackageRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reWatt})
if err != nil {
slog.Warn(err.Error())
return []table.Field{}
}
for i := range packageRows {
fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)})
fields = append(fields, table.Field{Name: fmt.Sprintf("DRAM %d", i)})
if len(packageRows) == 0 {
return []table.Field{}
}
// for each package
numPackages := len(packageRows)
for i := range packageRows {
// traverse the rows
for _, row := range packageRows[i] {
// dynamically build fields from the header row of the first package
// header format: ["timestamp", "PkgWatt", "RAMWatt", ...]
// fields will be: "Time", then for each package, one field per matched watt metric
// e.g., "Package 0 PkgWatt", "Package 0 RAMWatt", "Package 1 PkgWatt", ...
header := packageRows[0][0]
fields := []table.Field{{Name: "Time"}}
for i, pkgRows := range packageRows {
if len(pkgRows) == 0 {
continue
}
for _, fieldName := range header[1:] {
fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d %s", i, fieldName)})
}
}
numMetrics := len(header) - 1 // number of matched watt fields per package
for i, pkgRows := range packageRows {
if len(pkgRows) < 2 {
continue // skip packages with only a header or empty
}
// skip the header row (first row), iterate data rows
for _, row := range pkgRows[1:] {
if i == 0 {
fields[0].Values = append(fields[0].Values, row[0]) // Timestamp
}
// append the package power and DRAM power to the fields
fields[i*numPackages+1].Values = append(fields[i*numPackages+1].Values, row[1]) // Package power
fields[i*numPackages+2].Values = append(fields[i*numPackages+2].Values, row[2]) // DRAM power
for j := range numMetrics {
fields[i*numMetrics+1+j].Values = append(fields[i*numMetrics+1+j].Values, row[j+1])
}
}
}
if len(fields[0].Values) == 0 {
Expand All @@ -407,37 +420,26 @@ func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.F
}

func temperatureTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field {
fields := []table.Field{
{Name: "Time"},
{Name: "Core (Avg.)"},
}
platformRows, err := extract.TurbostatPlatformRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"CoreTmp"})
fields := []table.Field{}
reTemp := regexp.MustCompile(`^\w+Tmp$`) // matches CoreTmp, PkgTmp, etc.
platformRows, err := extract.TurbostatPlatformRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reTemp})
if err != nil {
slog.Warn(err.Error()) // not all systems report core temperature, e.g., cloud VMs
slog.Warn(err.Error()) // not all systems report temperature, e.g., cloud VMs
return []table.Field{}
}
packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgTmp"})
if err != nil {
// not an error, just means no package rows (package temperature)
slog.Warn(err.Error())
}
// add the package rows to the fields
for i := range packageRows {
fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)})
if len(platformRows) == 0 {
return []table.Field{}
}
// for each platform row
// dynamically build fields from the header row
for i := range platformRows {
// append the timestamp to the fields
fields[0].Values = append(fields[0].Values, platformRows[i][0]) // Timestamp
// append the core temperature values to the fields
fields[1].Values = append(fields[1].Values, platformRows[i][1]) // Core temperature
}
// for each package
for i := range packageRows {
// traverse the rows
for _, row := range packageRows[i] {
// append the package temperature to the fields
fields[i+2].Values = append(fields[i+2].Values, row[1]) // Package temperature
if i == 0 {
for _, fieldName := range platformRows[i] {
fields = append(fields, table.Field{Name: fieldName})
}
} else {
for j := range platformRows[i] {
fields[j].Values = append(fields[j].Values, platformRows[i][j])
}
}
}
if len(fields[0].Values) == 0 {
Expand Down
111 changes: 110 additions & 1 deletion internal/extract/turbostat.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,117 @@ func isPlatformRow(row map[string]string) bool {
return true
}

// TurbostatPackageRowsByRegexMatch parses the output of the turbostat script and returns the rows
// for each package, matching fields by regex.
// Multiple fields may match the regex, all matching fields, and their values, will be returned in
// alphabetical order.
// Returns:
// - [][][]string: first dimension is indexed by package number, second dimension is the row
// for each package reading, third dimension has "timestamp" followed by matched field values.
// The first row of each package's data is the header.
func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs []*regexp.Regexp) ([][][]string, error) {
if turboStatScriptOutput == "" {
return nil, fmt.Errorf("turbostat output is empty")
}
if len(fieldRegexs) == 0 {
return nil, fmt.Errorf("no field regexes provided")
}
rows, err := parseTurbostatOutput(turboStatScriptOutput)
if err != nil {
return nil, fmt.Errorf("unable to parse turbostat output: %w", err)
}
if len(rows) == 0 {
return nil, fmt.Errorf("no rows found in turbostat output")
}
// Build our list of matched field names from the first package row
var matchedFields []string
foundPackageRow := false
for _, row := range rows {
if _, ok := row["Package"]; !ok {
if row["CPU"] == "0" {
row["Package"] = "0"
} else {
continue
}
}
if !isPackageRow(row) {
continue
}
foundPackageRow = true
for field := range row {
for _, re := range fieldRegexs {
if re.MatchString(field) {
if !slices.Contains(matchedFields, field) {
matchedFields = append(matchedFields, field)
}
break
}
}
}
break // only need the first package row to discover fields
}
if !foundPackageRow {
return nil, fmt.Errorf("no package rows found in turbostat output")
}
if len(matchedFields) == 0 {
return nil, fmt.Errorf("no fields matched the provided regexes in turbostat output")
}
// Sort alphabetically for deterministic output since map iteration is unordered.
slices.Sort(matchedFields)
// Build the header row
header := make([]string, len(matchedFields)+1)
header[0] = "timestamp"
copy(header[1:], matchedFields)
// Collect rows per package
var packageRows [][][]string
for _, row := range rows {
if _, ok := row["Package"]; !ok {
if row["CPU"] == "0" {
row["Package"] = "0"
} else {
continue
}
}
if !isPackageRow(row) {
continue
}
rowValues := make([]string, len(matchedFields)+1)
rowValues[0] = row["timestamp"]
for i, field := range matchedFields {
rowValues[i+1] = row[field]
}
packageNum, err := strconv.Atoi(row["Package"])
if err != nil {
return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err)
}
if len(packageRows) < packageNum+1 {
// Initialize with header row followed by the data row
pkg := [][]string{header, rowValues}
// Extend slice to accommodate the package index, initializing placeholders with header
for len(packageRows) < packageNum {
packageRows = append(packageRows, [][]string{header})
}
packageRows = append(packageRows, pkg)
} else {
// Ensure the package slice is initialized with the header before appending data rows
if packageRows[packageNum] == nil {
packageRows[packageNum] = [][]string{header}
}
packageRows[packageNum] = append(packageRows[packageNum], rowValues)
}
}
if len(packageRows) == 0 {
return nil, fmt.Errorf("no data found in turbostat output for the provided regexes")
}
return packageRows, nil
}

// TurbostatPackageRows parses the output of the turbostat script and returns the rows
// for each package.
// Returns:
// - [][][]string: first dimension is indexed by package number, second dimension is the row
// for each package reading, third dimension is the fields for each row with the first field
// being the timestamp followed by the requested field names in order.
func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([][][]string, error) {
if turboStatScriptOutput == "" {
return nil, fmt.Errorf("turbostat output is empty")
Expand Down Expand Up @@ -236,7 +345,7 @@ func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([]
}
packageNum, err := strconv.Atoi(row["Package"])
if err != nil {
return nil, fmt.Errorf("unable to parse package number: %s", row["Package"])
return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err)
}
if len(packageRows) < packageNum+1 {
packageRows = append(packageRows, [][]string{rowValues})
Expand Down
Loading