diff --git a/cmd/metrics/metrics.go b/cmd/metrics/metrics.go index 4b8c62ba..a81257fc 100644 --- a/cmd/metrics/metrics.go +++ b/cmd/metrics/metrics.go @@ -1577,7 +1577,7 @@ func processPerfOutput( var err error metricFrames, frameTimestamp, err = ProcessEvents(batchLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata) if err != nil { - slog.Error(err.Error()) + slog.Warn(err.Error()) numConsecutiveProcessEventErrors++ if numConsecutiveProcessEventErrors > maxConsecutiveProcessEventErrors { slog.Error("too many consecutive errors processing events, triggering shutdown", slog.Int("max errors", maxConsecutiveProcessEventErrors)) diff --git a/cmd/telemetry/telemetry.go b/cmd/telemetry/telemetry.go index cdb801dc..4971f536 100644 --- a/cmd/telemetry/telemetry.go +++ b/cmd/telemetry/telemetry.go @@ -499,13 +499,13 @@ func getCPUAveragePercentage(tableValues table.TableValues, fieldName string, in func getPkgAverageTemperature(allTableValues []table.TableValues) string { tableValues := getTableValues(allTableValues, TemperatureTelemetryTableName) - // number of packages can vary, so we need to find the average temperature across all packages + // compute the average PkgTmp value across all samples if len(tableValues.Fields) == 0 { return "" } pkgTempFieldIndices := make([]int, 0) for i, field := range tableValues.Fields { - if strings.HasPrefix(field.Name, "Package") { + if field.Name == "PkgTmp" { pkgTempFieldIndices = append(pkgTempFieldIndices, i) } } @@ -538,7 +538,7 @@ func getPkgAveragePower(allTableValues []table.TableValues) string { } pkgPowerFieldIndices := make([]int, 0) for i, field := range tableValues.Fields { - if strings.HasPrefix(field.Name, "Package") { + if strings.HasSuffix(field.Name, "PkgWatt") { pkgPowerFieldIndices = append(pkgPowerFieldIndices, i) } } diff --git a/cmd/telemetry/telemetry_tables.go b/cmd/telemetry/telemetry_tables.go index 5a68093a..1a673620 100644 --- a/cmd/telemetry/telemetry_tables.go +++ b/cmd/telemetry/telemetry_tables.go @@ -375,29 +375,42 @@ func memoryTelemetryTableValues(outputs map[string]script.ScriptOutput) []table. } func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field { - fields := []table.Field{ - {Name: "Time"}, - } - packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgWatt", "RAMWatt"}) + reWatt := regexp.MustCompile(`^\w+Watt$`) // matches PkgWatt, RAMWatt, etc. + packageRows, err := extract.TurbostatPackageRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reWatt}) if err != nil { slog.Warn(err.Error()) return []table.Field{} } - for i := range packageRows { - fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)}) - fields = append(fields, table.Field{Name: fmt.Sprintf("DRAM %d", i)}) + if len(packageRows) == 0 { + return []table.Field{} } - // for each package - numPackages := len(packageRows) - for i := range packageRows { - // traverse the rows - for _, row := range packageRows[i] { + // dynamically build fields from the header row of the first package + // header format: ["timestamp", "PkgWatt", "RAMWatt", ...] + // fields will be: "Time", then for each package, one field per matched watt metric + // e.g., "Package 0 PkgWatt", "Package 0 RAMWatt", "Package 1 PkgWatt", ... + header := packageRows[0][0] + fields := []table.Field{{Name: "Time"}} + for i, pkgRows := range packageRows { + if len(pkgRows) == 0 { + continue + } + for _, fieldName := range header[1:] { + fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d %s", i, fieldName)}) + } + } + numMetrics := len(header) - 1 // number of matched watt fields per package + for i, pkgRows := range packageRows { + if len(pkgRows) < 2 { + continue // skip packages with only a header or empty + } + // skip the header row (first row), iterate data rows + for _, row := range pkgRows[1:] { if i == 0 { fields[0].Values = append(fields[0].Values, row[0]) // Timestamp } - // append the package power and DRAM power to the fields - fields[i*numPackages+1].Values = append(fields[i*numPackages+1].Values, row[1]) // Package power - fields[i*numPackages+2].Values = append(fields[i*numPackages+2].Values, row[2]) // DRAM power + for j := range numMetrics { + fields[i*numMetrics+1+j].Values = append(fields[i*numMetrics+1+j].Values, row[j+1]) + } } } if len(fields[0].Values) == 0 { @@ -407,37 +420,26 @@ func powerTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.F } func temperatureTelemetryTableValues(outputs map[string]script.ScriptOutput) []table.Field { - fields := []table.Field{ - {Name: "Time"}, - {Name: "Core (Avg.)"}, - } - platformRows, err := extract.TurbostatPlatformRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"CoreTmp"}) + fields := []table.Field{} + reTemp := regexp.MustCompile(`^\w+Tmp$`) // matches CoreTmp, PkgTmp, etc. + platformRows, err := extract.TurbostatPlatformRowsByRegexMatch(outputs[script.TurbostatTelemetryScriptName].Stdout, []*regexp.Regexp{reTemp}) if err != nil { - slog.Warn(err.Error()) // not all systems report core temperature, e.g., cloud VMs + slog.Warn(err.Error()) // not all systems report temperature, e.g., cloud VMs return []table.Field{} } - packageRows, err := extract.TurbostatPackageRows(outputs[script.TurbostatTelemetryScriptName].Stdout, []string{"PkgTmp"}) - if err != nil { - // not an error, just means no package rows (package temperature) - slog.Warn(err.Error()) - } - // add the package rows to the fields - for i := range packageRows { - fields = append(fields, table.Field{Name: fmt.Sprintf("Package %d", i)}) + if len(platformRows) == 0 { + return []table.Field{} } - // for each platform row + // dynamically build fields from the header row for i := range platformRows { - // append the timestamp to the fields - fields[0].Values = append(fields[0].Values, platformRows[i][0]) // Timestamp - // append the core temperature values to the fields - fields[1].Values = append(fields[1].Values, platformRows[i][1]) // Core temperature - } - // for each package - for i := range packageRows { - // traverse the rows - for _, row := range packageRows[i] { - // append the package temperature to the fields - fields[i+2].Values = append(fields[i+2].Values, row[1]) // Package temperature + if i == 0 { + for _, fieldName := range platformRows[i] { + fields = append(fields, table.Field{Name: fieldName}) + } + } else { + for j := range platformRows[i] { + fields[j].Values = append(fields[j].Values, platformRows[i][j]) + } } } if len(fields[0].Values) == 0 { diff --git a/internal/extract/turbostat.go b/internal/extract/turbostat.go index da14d137..9aa98e3d 100644 --- a/internal/extract/turbostat.go +++ b/internal/extract/turbostat.go @@ -197,8 +197,117 @@ func isPlatformRow(row map[string]string) bool { return true } +// TurbostatPackageRowsByRegexMatch parses the output of the turbostat script and returns the rows +// for each package, matching fields by regex. +// Multiple fields may match the regex, all matching fields, and their values, will be returned in +// alphabetical order. +// Returns: +// - [][][]string: first dimension is indexed by package number, second dimension is the row +// for each package reading, third dimension has "timestamp" followed by matched field values. +// The first row of each package's data is the header. +func TurbostatPackageRowsByRegexMatch(turboStatScriptOutput string, fieldRegexs []*regexp.Regexp) ([][][]string, error) { + if turboStatScriptOutput == "" { + return nil, fmt.Errorf("turbostat output is empty") + } + if len(fieldRegexs) == 0 { + return nil, fmt.Errorf("no field regexes provided") + } + rows, err := parseTurbostatOutput(turboStatScriptOutput) + if err != nil { + return nil, fmt.Errorf("unable to parse turbostat output: %w", err) + } + if len(rows) == 0 { + return nil, fmt.Errorf("no rows found in turbostat output") + } + // Build our list of matched field names from the first package row + var matchedFields []string + foundPackageRow := false + for _, row := range rows { + if _, ok := row["Package"]; !ok { + if row["CPU"] == "0" { + row["Package"] = "0" + } else { + continue + } + } + if !isPackageRow(row) { + continue + } + foundPackageRow = true + for field := range row { + for _, re := range fieldRegexs { + if re.MatchString(field) { + if !slices.Contains(matchedFields, field) { + matchedFields = append(matchedFields, field) + } + break + } + } + } + break // only need the first package row to discover fields + } + if !foundPackageRow { + return nil, fmt.Errorf("no package rows found in turbostat output") + } + if len(matchedFields) == 0 { + return nil, fmt.Errorf("no fields matched the provided regexes in turbostat output") + } + // Sort alphabetically for deterministic output since map iteration is unordered. + slices.Sort(matchedFields) + // Build the header row + header := make([]string, len(matchedFields)+1) + header[0] = "timestamp" + copy(header[1:], matchedFields) + // Collect rows per package + var packageRows [][][]string + for _, row := range rows { + if _, ok := row["Package"]; !ok { + if row["CPU"] == "0" { + row["Package"] = "0" + } else { + continue + } + } + if !isPackageRow(row) { + continue + } + rowValues := make([]string, len(matchedFields)+1) + rowValues[0] = row["timestamp"] + for i, field := range matchedFields { + rowValues[i+1] = row[field] + } + packageNum, err := strconv.Atoi(row["Package"]) + if err != nil { + return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err) + } + if len(packageRows) < packageNum+1 { + // Initialize with header row followed by the data row + pkg := [][]string{header, rowValues} + // Extend slice to accommodate the package index, initializing placeholders with header + for len(packageRows) < packageNum { + packageRows = append(packageRows, [][]string{header}) + } + packageRows = append(packageRows, pkg) + } else { + // Ensure the package slice is initialized with the header before appending data rows + if packageRows[packageNum] == nil { + packageRows[packageNum] = [][]string{header} + } + packageRows[packageNum] = append(packageRows[packageNum], rowValues) + } + } + if len(packageRows) == 0 { + return nil, fmt.Errorf("no data found in turbostat output for the provided regexes") + } + return packageRows, nil +} + // TurbostatPackageRows parses the output of the turbostat script and returns the rows // for each package. +// Returns: +// - [][][]string: first dimension is indexed by package number, second dimension is the row +// for each package reading, third dimension is the fields for each row with the first field +// being the timestamp followed by the requested field names in order. func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([][][]string, error) { if turboStatScriptOutput == "" { return nil, fmt.Errorf("turbostat output is empty") @@ -236,7 +345,7 @@ func TurbostatPackageRows(turboStatScriptOutput string, fieldNames []string) ([] } packageNum, err := strconv.Atoi(row["Package"]) if err != nil { - return nil, fmt.Errorf("unable to parse package number: %s", row["Package"]) + return nil, fmt.Errorf("unable to parse package number %q: %w", row["Package"], err) } if len(packageRows) < packageNum+1 { packageRows = append(packageRows, [][]string{rowValues}) diff --git a/internal/extract/turbostat_test.go b/internal/extract/turbostat_test.go index 83f479ba..228f504f 100644 --- a/internal/extract/turbostat_test.go +++ b/internal/extract/turbostat_test.go @@ -5,6 +5,7 @@ package extract import ( "reflect" + "regexp" "strings" "testing" ) @@ -575,3 +576,130 @@ Package Core CPU Avg_MHz Busy% }) } } + +func TestTurbostatPackageRowsByRegexMatch(t *testing.T) { + tests := []struct { + name string + turbostatOutput string + fieldRegexs []*regexp.Regexp + want [][][]string + wantErr bool + }{ + { + name: "Match Watt fields across two packages", + turbostatOutput: turbostatOutput, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^\w+Watt$`)}, + want: [][][]string{ + {{"timestamp", "PkgWatt", "RAMWatt"}, {"15:04:05", "223.53", "7.38"}, {"15:04:07", "229.53", "7.38"}, {"15:04:09", "223.53", "7.38"}}, + {{"timestamp", "PkgWatt", "RAMWatt"}, {"15:04:05", "208.40", "16.83"}, {"15:04:07", "218.40", "16.83"}, {"15:04:09", "208.40", "16.83"}}, + }, + wantErr: false, + }, + { + name: "Two packages, single field match", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz Busy% FooBar +- - - 1000 10 999 +0 0 0 1000 10 999 +0 1 1 1100 11 +1 0 2 2000 20 999 +1 1 3 2100 21 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: [][][]string{ + {{"timestamp", "Avg_MHz"}, {"12:00:00", "1000"}}, + {{"timestamp", "Avg_MHz"}, {"12:00:00", "2000"}}, + }, + wantErr: false, + }, + { + name: "Fields sorted alphabetically", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Zebra Alpha +- - - 1 2 +0 0 0 10 20 +1 0 2 30 40 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^(Zebra|Alpha)$`)}, + want: [][][]string{ + {{"timestamp", "Alpha", "Zebra"}, {"12:00:00", "20", "10"}}, + {{"timestamp", "Alpha", "Zebra"}, {"12:00:00", "40", "30"}}, + }, + wantErr: false, + }, + { + name: "No regex matches", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +- - - 1000 +0 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^NoMatch$`)}, + want: nil, + wantErr: true, + }, + { + name: "Empty output", + turbostatOutput: "", + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^\w+$`)}, + want: nil, + wantErr: true, + }, + { + name: "Empty regexes", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +0 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{}, + want: nil, + wantErr: true, + }, + { + name: "No package rows", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +- - - 999 +- - - 888 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: nil, + wantErr: true, + }, + { + name: "Malformed package number", + turbostatOutput: ` +TIME: 12:00:00 +INTERVAL: 1 +Package Core CPU Avg_MHz +X 0 0 1000 +`, + fieldRegexs: []*regexp.Regexp{regexp.MustCompile(`^Avg_MHz$`)}, + want: nil, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := TurbostatPackageRowsByRegexMatch(tt.turbostatOutput, tt.fieldRegexs) + if (err != nil) != tt.wantErr { + t.Errorf("TurbostatPackageRowsByRegexMatch() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && !reflect.DeepEqual(got, tt.want) { + t.Errorf("TurbostatPackageRowsByRegexMatch() = %v, want %v", got, tt.want) + } + }) + } +}