From e14364845001b07ab7629c70baefc6d36fedcb1a Mon Sep 17 00:00:00 2001 From: "Harper, Jason M" Date: Thu, 5 Mar 2026 16:01:23 -0800 Subject: [PATCH 1/4] feature: regex-based field matching for package turbostat telemetry Add TurbostatPackageRowsByRegexMatch to dynamically discover package-level fields via regex, mirroring TurbostatPlatformRowsByRegexMatch. Update powerTelemetryTableValues and temperatureTelemetryTableValues to use regex-match variants for dynamic field discovery instead of hardcoded names. Co-Authored-By: Claude Opus 4.6 --- cmd/telemetry/telemetry_tables.go | 76 ++++++++++----------- internal/extract/turbostat.go | 105 ++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 40 deletions(-) diff --git a/cmd/telemetry/telemetry_tables.go b/cmd/telemetry/telemetry_tables.go index 5a68093a..dc62fa20 100644 --- a/cmd/telemetry/telemetry_tables.go +++ b/cmd/telemetry/telemetry_tables.go @@ -375,29 +375,36 @@ func memoryTelemetryTableValues(outputs map[string]script.ScriptOutput) []table. } func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field { - fields := []table.Field{ - {Name: "Time"}, - } - packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgWatt", "RAMWatt"}) + reWatt := regexp.MustCompile(`^\w+Watt$`) // matches PkgWatt, RAMWatt, etc. + packageRows, err := extract.TurbostatPackageRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reWatt}) if err != nil { slog.Warn(err.Error()) return []table.Field{} } - for i := range packageRows { - fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)}) - fields = append(fields, table.Field{Name: fmt.Sprintf("DRAM %d", i)}) + if len(packageRows) == 0 { + return []table.Field{} } - // for each package - numPackages := len(packageRows) + // dynamically build fields from the header row of the first package + // header format: ["timestamp", "PkgWatt", "RAMWatt", ...] + // fields will be: "Time", then for each package, one field per matched watt metric + // e.g., "Package 0 PkgWatt", "Package 0 RAMWatt", "Package 1 PkgWatt", ... + header := packageRows[0][0] + fields := []table.Field{{Name: "Time"}} for i := range packageRows { - // traverse the rows - for _, row := range packageRows[i] { + for _, fieldName := range header[1:] { + fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d %s", i, fieldName)}) + } + } + numMetrics := len(header) - 1 // number of matched watt fields per package + for i, pkgRows := range packageRows { + // skip the header row (first row), iterate data rows + for _, row := range pkgRows[1:] { if i == 0 { fields[0].Values = append(fields[0].Values, row[0]) // Timestamp } - // append the package power and DRAM power to the fields - fields[i*numPackages+1].Values = append(fields[i*numPackages+1].Values, row[1]) // Package power - fields[i*numPackages+2].Values = append(fields[i*numPackages+2].Values, row[2]) // DRAM power + for j := range numMetrics { + fields[i*numMetrics+1+j].Values = append(fields[i*numMetrics+1+j].Values, row[j+1]) + } } } if len(fields[0].Values) == 0 { @@ -407,37 +414,26 @@ func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.F } func temperatureTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field { - fields := []table.Field{ - {Name: "Time"}, - {Name: "Core (Avg.)"}, - } - platformRows, err := extract.TurbostatPlatformRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"CoreTmp"}) + fields := []table.Field{} + reTemp := regexp.MustCompile(`^\w+Tmp$`) // matches CoreTmp, PkgTmp, etc. + platformRows, err := extract.TurbostatPlatformRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reTemp}) if err != nil { - slog.Warn(err.Error()) // not all systems report core temperature, e.g., cloud VMs + slog.Warn(err.Error()) // not all systems report temperature, e.g., cloud VMs return []table.Field{} } - packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgTmp"}) - if err != nil { - // not an error, just means no package rows (package temperature) - slog.Warn(err.Error()) - } - // add the package rows to the fields - for i := range packageRows { - fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)}) + if len(platformRows) == 0 { + return []table.Field{} } - // for each platform row + // dynamically build fields from the header row for i := range platformRows { - // append the timestamp to the fields - fields[0].Values = append(fields[0].Values, platformRows[i][0]) // Timestamp - // append the core temperature values to the fields - fields[1].Values = append(fields[1].Values, platformRows[i][1]) // Core temperature - } - // for each package - for i := range packageRows { - // traverse the rows - for _, row := range packageRows[i] { - // append the package temperature to the fields - fields[i+2].Values = append(fields[i+2].Values, row[1]) // Package temperature + if i == 0 { + for _, fieldName := range platformRows[i] { + fields = append(fields, table.Field{Name: fieldName}) + } + } else { + for j := range platformRows[i] { + fields[j].Values = append(fields[j].Values, platformRows[i][j]) + } } } if len(fields[0].Values) == 0 { diff --git a/internal/extract/turbostat.go b/internal/extract/turbostat.go index da14d137..c05df4e3 100644 --- a/internal/extract/turbostat.go +++ b/internal/extract/turbostat.go @@ -197,8 +197,113 @@ func isPlatformRow(row map[string]string) bool { return true } +// TurbostatPackageRowsByRegexMatch parses the output of the turbostat script and returns the rows +// for each package, matching fields by regex. +// Multiple fields may match the regex, all matching fields, and their values, will be returned in +// alphabetical order. +// Returns: +// - [][][]string: first dimension is indexed by package number, second dimension is the row +// for each package reading, third dimension has "timestamp" followed by matched field values. +// The first row of each package's data is the header. +func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs []*regexp.Regexp) ([][][]string, error) { + if turboStatScriptOutput == "" { + return nil, fmt.Errorf("turbostat output is empty") + } + if len(fieldRegexs) == 0 { + return nil, fmt.Errorf("no field regexes provided") + } + rows, err := parseTurbostatOutput(turboStatScriptOutput) + if err != nil { + return nil, fmt.Errorf("unable to parse turbostat output: %w", err) + } + if len(rows) == 0 { + return nil, fmt.Errorf("no package rows found in turbostat output") + } + // Build our list of matched field names from the first package row + var matchedFields []string + foundPackageRow := false + for _, row := range rows { + if _, ok := row["Package"]; !ok { + if row["CPU"] == "0" { + row["Package"] = "0" + } else { + continue + } + } + if !isPackageRow(row) { + continue + } + foundPackageRow = true + for field := range row { + for _, re := range fieldRegexs { + if re.MatchString(field) { + if !slices.Contains(matchedFields, field) { + matchedFields = append(matchedFields, field) + } + break + } + } + } + break // only need the first package row to discover fields + } + if !foundPackageRow { + return nil, fmt.Errorf("no package rows found in turbostat output") + } + if len(matchedFields) == 0 { + return nil, fmt.Errorf("no fields matched the provided regexes in turbostat output") + } + // Sort alphabetically for deterministic output since map iteration is unordered. + slices.Sort(matchedFields) + // Build the header row + header := make([]string, len(matchedFields)+1) + header[0] = "timestamp" + copy(header[1:], matchedFields) + // Collect rows per package + var packageRows [][][]string + for _, row := range rows { + if _, ok := row["Package"]; !ok { + if row["CPU"] == "0" { + row["Package"] = "0" + } else { + continue + } + } + if !isPackageRow(row) { + continue + } + rowValues := make([]string, len(matchedFields)+1) + rowValues[0] = row["timestamp"] + for i, field := range matchedFields { + rowValues[i+1] = row[field] + } + packageNum, err := strconv.Atoi(row["Package"]) + if err != nil { + return nil, fmt.Errorf("unable to parse package number: %s", row["Package"]) + } + if len(packageRows) < packageNum+1 { + // Initialize with header row followed by the data row + pkg := [][]string{header, rowValues} + // Extend slice to accommodate the package index + for len(packageRows) < packageNum { + packageRows = append(packageRows, nil) + } + packageRows = append(packageRows, pkg) + } else { + packageRows[packageNum] = append(packageRows[packageNum], rowValues) + } + } + if len(packageRows) == 0 { + return nil, fmt.Errorf("no data found in turbostat output for the provided regexes") + } + return packageRows, nil +} + // TurbostatPackageRows parses the output of the turbostat script and returns the rows // for each package. +// Returns: +// - [][][]string: first dimension is indexed by package number, second dimension is the row +// for each package reading, third dimension is the fields for each row with the first field +// being the timestamp followed by the requested field names in order. func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([][][]string, error) { if turboStatScriptOutput == "" { return nil, fmt.Errorf("turbostat output is empty") From 783caed2c345fbe03e453db74c0dc9966586c628 Mon Sep 17 00:00:00 2001 From: "Harper, Jason M" Date: Thu, 5 Mar 2026 16:32:07 -0800 Subject: [PATCH 2/4] fix: address PR review feedback for regex telemetry - Fix nil placeholder bug: initialize gap entries with header row instead of nil, and guard the else branch for late-initialized packages - Fix getPkgAveragePower: match fields by "PkgWatt" suffix instead of "Package" prefix to avoid including RAMWatt in the average - Fix getPkgAverageTemperature: match "PkgTmp" field name instead of "Package" prefix since fields are now named from turbostat headers - Add nil/empty guards in powerTelemetryTableValues to skip packages with no data rows - Add unit tests for TurbostatPackageRowsByRegexMatch covering happy paths, field ordering, error cases, and edge cases Co-Authored-By: Claude Opus 4.6 --- cmd/telemetry/telemetry.go | 4 +- cmd/telemetry/telemetry_tables.go | 8 +- internal/extract/turbostat.go | 8 +- internal/extract/turbostat_test.go | 128 +++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 5 deletions(-) diff --git a/cmd/telemetry/telemetry.go b/cmd/telemetry/telemetry.go index cdb801dc..02781535 100644 --- a/cmd/telemetry/telemetry.go +++ b/cmd/telemetry/telemetry.go @@ -505,7 +505,7 @@ func getPkgAverageTemperature(allTableValues []table.TableValues) string { } pkgTempFieldIndices := make([]int, 0) for i, field := range tableValues.Fields { - if strings.HasPrefix(field.Name, "Package") { + if field.Name == "PkgTmp" { pkgTempFieldIndices = append(pkgTempFieldIndices, i) } } @@ -538,7 +538,7 @@ func getPkgAveragePower(allTableValues []table.TableValues) string { } pkgPowerFieldIndices := make([]int, 0) for i, field := range tableValues.Fields { - if strings.HasPrefix(field.Name, "Package") { + if strings.HasSuffix(field.Name, "PkgWatt") { pkgPowerFieldIndices = append(pkgPowerFieldIndices, i) } } diff --git a/cmd/telemetry/telemetry_tables.go b/cmd/telemetry/telemetry_tables.go index dc62fa20..1a673620 100644 --- a/cmd/telemetry/telemetry_tables.go +++ b/cmd/telemetry/telemetry_tables.go @@ -390,13 +390,19 @@ func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.F // e.g., "Package 0 PkgWatt", "Package 0 RAMWatt", "Package 1 PkgWatt", ... header := packageRows[0][0] fields := []table.Field{{Name: "Time"}} - for i := range packageRows { + for i, pkgRows := range packageRows { + if len(pkgRows) == 0 { + continue + } for _, fieldName := range header[1:] { fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d %s", i, fieldName)}) } } numMetrics := len(header) - 1 // number of matched watt fields per package for i, pkgRows := range packageRows { + if len(pkgRows) < 2 { + continue // skip packages with only a header or empty + } // skip the header row (first row), iterate data rows for _, row := range pkgRows[1:] { if i == 0 { diff --git a/internal/extract/turbostat.go b/internal/extract/turbostat.go index c05df4e3..72eeb529 100644 --- a/internal/extract/turbostat.go +++ b/internal/extract/turbostat.go @@ -283,12 +283,16 @@ func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs if len(packageRows) < packageNum+1 { // Initialize with header row followed by the data row pkg := [][]string{header, rowValues} - // Extend slice to accommodate the package index + // Extend slice to accommodate the package index, initializing placeholders with header for len(packageRows) < packageNum { - packageRows = append(packageRows, nil) + packageRows = append(packageRows, [][]string{header}) } packageRows = append(packageRows, pkg) } else { + // Ensure the package slice is initialized with the header before appending data rows + if packageRows[packageNum] == nil { + packageRows[packageNum] = [][]string{header} + } packageRows[packageNum] = append(packageRows[packageNum], rowValues) } } diff --git a/internal/extract/turbostat_test.go b/internal/extract/turbostat_test.go index 83f479ba..228f504f 100644 --- a/internal/extract/turbostat_test.go +++ b/internal/extract/turbostat_test.go @@ -5,6 +5,7 @@ package extract import ( "reflect" + "regexp" "strings" "testing" ) @@ -575,3 +576,130 @@ Package Core CPU Avg_MHz Busy% }) } } + +func TestTurbostatPackageRowsByRegexMatch(t *testing.T) { + tests := []struct { + name string + turbostatOutput string + fieldRegexs []*regexp.Regexp + want [][][]string + wantErr bool + }{ + { + name: "Match Watt fields across two packages", + turbostatOutput: turbostatOutput, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^\w+Watt$`)}, + want: [][][]string{ + {{"timestamp", "PkgWatt", "RAMWatt"}, {"15:04:05", "223.53", "7.38"}, {"15:04:07", "229.53", "7.38"}, {"15:04:09", "223.53", "7.38"}}, + {{"timestamp", "PkgWatt", "RAMWatt"}, {"15:04:05", "208.40", "16.83"}, {"15:04:07", "218.40", "16.83"}, {"15:04:09", "208.40", "16.83"}}, + }, + wantErr: false, + }, + { + name: "Two packages, single field match", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz Busy% FooBar +- - - 1000 10 999 +0 0 0 1000 10 999 +0 1 1 1100 11 +1 0 2 2000 20 999 +1 1 3 2100 21 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: [][][]string{ + {{"timestamp", "Avg_MHz"}, {"12:00:00", "1000"}}, + {{"timestamp", "Avg_MHz"}, {"12:00:00", "2000"}}, + }, + wantErr: false, + }, + { + name: "Fields sorted alphabetically", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Zebra Alpha +- - - 1 2 +0 0 0 10 20 +1 0 2 30 40 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^(Zebra|Alpha)$`)}, + want: [][][]string{ + {{"timestamp", "Alpha", "Zebra"}, {"12:00:00", "20", "10"}}, + {{"timestamp", "Alpha", "Zebra"}, {"12:00:00", "40", "30"}}, + }, + wantErr: false, + }, + { + name: "No regex matches", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +- - - 1000 +0 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^NoMatch$`)}, + want: nil, + wantErr: true, + }, + { + name: "Empty output", + turbostatOutput: "", + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^\w+$`)}, + want: nil, + wantErr: true, + }, + { + name: "Empty regexes", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +0 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{}, + want: nil, + wantErr: true, + }, + { + name: "No package rows", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +- - - 999 +- - - 888 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: nil, + wantErr: true, + }, + { + name: "Malformed package number", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +X 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := TurbostatPackageRowsByRegexMatch(tt.turbostatOutput, tt.fieldRegexs) + if (err != nil) != tt.wantErr { + t.Errorf("TurbostatPackageRowsByRegexMatch() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("TurbostatPackageRowsByRegexMatch() = %v, want %v", got, tt.want) + } + }) + } +} From e2dcdd6e9d0ae4a8560fe3740727d4dcc40c0edb Mon Sep 17 00:00:00 2001 From: "Harper, Jason M" Date: Thu, 5 Mar 2026 16:47:47 -0800 Subject: [PATCH 3/4] fix: address second round of review feedback - Clarify error message when no rows found (vs no package rows) - Wrap Atoi error with %w in package number parsing - Update getPkgAverageTemperature comment to reflect platform-level PkgTmp Co-Authored-By: Claude Opus 4.6 --- cmd/telemetry/telemetry.go | 2 +- internal/extract/turbostat.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/telemetry/telemetry.go b/cmd/telemetry/telemetry.go index 02781535..4971f536 100644 --- a/cmd/telemetry/telemetry.go +++ b/cmd/telemetry/telemetry.go @@ -499,7 +499,7 @@ func getCPUAveragePercentage(tableValues table.TableValues, fieldName string, in func getPkgAverageTemperature(allTableValues []table.TableValues) string { tableValues := getTableValues(allTableValues, TemperatureTelemetryTableName) - // number of packages can vary, so we need to find the average temperature across all packages + // compute the average PkgTmp value across all samples if len(tableValues.Fields) == 0 { return "" } diff --git a/internal/extract/turbostat.go b/internal/extract/turbostat.go index 72eeb529..9aa98e3d 100644 --- a/internal/extract/turbostat.go +++ b/internal/extract/turbostat.go @@ -217,7 +217,7 @@ func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs return nil, fmt.Errorf("unable to parse turbostat output: %w", err) } if len(rows) == 0 { - return nil, fmt.Errorf("no package rows found in turbostat output") + return nil, fmt.Errorf("no rows found in turbostat output") } // Build our list of matched field names from the first package row var matchedFields []string @@ -278,7 +278,7 @@ func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs } packageNum, err := strconv.Atoi(row["Package"]) if err != nil { - return nil, fmt.Errorf("unable to parse package number: %s", row["Package"]) + return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err) } if len(packageRows) < packageNum+1 { // Initialize with header row followed by the data row @@ -345,7 +345,7 @@ func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([] } packageNum, err := strconv.Atoi(row["Package"]) if err != nil { - return nil, fmt.Errorf("unable to parse package number: %s", row["Package"]) + return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err) } if len(packageRows) < packageNum+1 { packageRows = append(packageRows, [][]string{rowValues}) From d9657202a496c5f6502ab264d6919c44d4762be2 Mon Sep 17 00:00:00 2001 From: "Harper, Jason M" Date: Fri, 6 Mar 2026 07:49:26 -0800 Subject: [PATCH 4/4] warn on error form ProcessEvents, error if it happens more than 2 times in a row Signed-off-by: Harper, Jason M --- cmd/metrics/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/metrics/metrics.go b/cmd/metrics/metrics.go index 4b8c62ba..a81257fc 100644 --- a/cmd/metrics/metrics.go +++ b/cmd/metrics/metrics.go @@ -1577,7 +1577,7 @@ func processPerfOutput( var err error metricFrames, frameTimestamp, err = ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata) if err != nil { - slog.Error(err.Error()) + slog.Warn(err.Error()) numConsecutiveProcessEventErrors++ if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors { slog.Error("too many consecutive errors processing events, triggering shutdown", slog.Int("max errors", maxConsecutiveProcessEventErrors))