Skip to content

Commit dc6ad77

Browse files
author
Michal Tichák
committed
[core] OCTRL-1003
- aggregating metrics so we don't flood InfluxDB - changed JSON to InfluxDB line format to properly handle tags
1 parent 0e7e9c4 commit dc6ad77

File tree

12 files changed

+1024
-118
lines changed

12 files changed

+1024
-118
lines changed

common/ecsmetrics/metric.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
@@ -7,8 +31,7 @@ import (
731
)
832

933
func NewMetric(name string) monitoring.Metric {
10-
timestamp := time.Now()
11-
metric := monitoring.Metric{Name: name, Timestamp: timestamp.UnixMilli()}
34+
metric := monitoring.NewMetric(name, time.Now())
1235
metric.AddTag("subsystem", "ECS")
1336
return metric
1437
}
@@ -18,13 +41,13 @@ func NewMetric(name string) monitoring.Metric {
1841
func TimerMS(metric *monitoring.Metric) func() {
1942
start := time.Now()
2043
return func() {
21-
metric.AddValue("execution_time_ms", time.Since(start).Milliseconds())
44+
metric.SetFieldInt64("execution_time_ms", time.Since(start).Milliseconds())
2245
}
2346
}
2447

2548
func TimerNS(metric *monitoring.Metric) func() {
2649
start := time.Now()
2750
return func() {
28-
metric.AddValue("execution_time_ns", time.Since(start).Nanoseconds())
51+
metric.SetFieldInt64("execution_time_ns", time.Since(start).Nanoseconds())
2952
}
3053
}

common/ecsmetrics/metrics.go

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,27 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
@@ -38,9 +62,9 @@ func gather() monitoring.Metric {
3862
for _, sample := range samples {
3963
switch sample.Value.Kind() {
4064
case internalmetrics.KindUint64:
41-
metric.AddValue(sample.Name, sample.Value.Uint64())
65+
metric.SetFieldUInt64(sample.Name, sample.Value.Uint64())
4266
case internalmetrics.KindFloat64:
43-
metric.AddValue(sample.Name, sample.Value.Float64())
67+
metric.SetFieldFloat64(sample.Name, sample.Value.Float64())
4468
case internalmetrics.KindFloat64Histogram:
4569
log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
4670
continue
@@ -64,7 +88,8 @@ func StartGolangMetrics(period time.Duration) {
6488
return
6589
default:
6690
log.Debug("sending golang metrics")
67-
monitoring.Send(gather())
91+
metric := gather()
92+
monitoring.Send(&metric)
6893
time.Sleep(period)
6994
}
7095
}

common/ecsmetrics/metrics_test.go

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,30 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package ecsmetrics
226

327
import (
4-
"fmt"
528
"testing"
629
"time"
730

@@ -17,12 +40,11 @@ func measureFunc(metric *monitoring.Metric) {
1740
func TestSimpleStartStop(t *testing.T) {
1841
metric := NewMetric("test")
1942
measureFunc(&metric)
20-
fmt.Println(metric.Values["execution_time_ms"])
21-
fmt.Println(metric.Values["execution_time_ns"])
22-
if metric.Values["execution_time_ms"].(int64) < 100 {
43+
fields := metric.GetFields()
44+
if fields["execution_time_ms"].(int64) < 100 {
2345
t.Error("wrong milliseconds")
2446
}
25-
if metric.Values["execution_time_ns"].(int64) < 100000000 {
47+
if fields["execution_time_ns"].(int64) < 100000000 {
2648
t.Error("wrong nanoseconds")
2749
}
2850
}

common/event/writer.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func NewWriterWithTopic(topic topic.Topic) *KafkaWriter {
105105
writer.writeFunction = func(messages []kafka.Message, metric *monitoring.Metric) {
106106
defer ecsmetrics.TimerNS(metric)()
107107
if err := writer.WriteMessages(context.Background(), messages...); err != nil {
108-
metric.AddValue("messages_failed", len(messages))
108+
metric.SetFieldUInt64("messages_failed", uint64(len(messages)))
109109
log.Errorf("failed to write %d messages to kafka with error: %v", len(messages), err)
110110
}
111111
}
@@ -145,12 +145,12 @@ func (w *KafkaWriter) writingLoop() {
145145
}
146146

147147
metric := w.newMetric(KAFKAWRITER)
148-
metric.AddValue("messages_sent", len(messagesToSend))
149-
metric.AddValue("messages_failed", 0)
148+
metric.SetFieldUInt64("messages_sent", uint64(len(messagesToSend)))
149+
metric.SetFieldUInt64("messages_failed", 0)
150150

151151
w.writeFunction(messagesToSend, &metric)
152152

153-
monitoring.Send(metric)
153+
monitoring.Send(&metric)
154154
}
155155
}
156156
}
@@ -262,5 +262,5 @@ func (w *KafkaWriter) WriteEventWithTimestamp(e interface{}, timestamp time.Time
262262
w.toBatchMessagesChan <- message
263263
}()
264264

265-
monitoring.Send(metric)
265+
monitoring.Send(&metric)
266266
}

common/monitoring/metric.go

Lines changed: 114 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,127 @@
1+
/*
2+
* === This file is part of ALICE O² ===
3+
*
4+
* Copyright 2025 CERN and copyright holders of ALICE O².
5+
* Author: Michal Tichak <michal.tichak@cern.ch>
6+
*
7+
* This program is free software: you can redistribute it and/or modify
8+
* it under the terms of the GNU General Public License as published by
9+
* the Free Software Foundation, either version 3 of the License, or
10+
* (at your option) any later version.
11+
*
12+
* This program is distributed in the hope that it will be useful,
13+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
* GNU General Public License for more details.
16+
*
17+
* You should have received a copy of the GNU General Public License
18+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
*
20+
* In applying this license CERN does not waive the privileges and
21+
* immunities granted to it by virtue of its status as an
22+
* Intergovernmental Organization or submit itself to any jurisdiction.
23+
*/
24+
125
package monitoring
226

27+
import (
28+
"fmt"
29+
"io"
30+
"time"
31+
32+
lp "github.com/influxdata/line-protocol/v2/lineprotocol"
33+
)
34+
335
type (
4-
TagsType map[string]any
5-
ValuesType map[string]any
36+
Tag struct {
37+
name string
38+
value string
39+
}
40+
41+
TagsType []Tag
42+
FieldsType map[string]any
643
)
744

845
type Metric struct {
9-
Name string `json:"name"`
10-
Values ValuesType `json:"values"`
11-
Tags TagsType `json:"tags,omitempty"`
12-
Timestamp int64 `json:"timestamp"`
46+
name string
47+
fields FieldsType
48+
tags TagsType
49+
timestamp time.Time
50+
}
51+
52+
func NewMetric(name string, timestamp time.Time) Metric {
53+
return Metric{
54+
name: name, timestamp: timestamp,
55+
}
56+
}
57+
58+
func (metric *Metric) GetFields() (fields FieldsType) {
59+
for fieldName, field := range metric.fields {
60+
fields[fieldName] = field
61+
}
62+
return
63+
}
64+
65+
func (metric *Metric) AddTag(tagName string, value string) {
66+
metric.tags = append(metric.tags, Tag{name: tagName, value: value})
1367
}
1468

15-
func (metric *Metric) AddTag(tagName string, value any) {
16-
if metric.Tags == nil {
17-
metric.Tags = make(TagsType)
69+
func (metric *Metric) setField(fieldName string, field any) {
70+
if metric.fields == nil {
71+
metric.fields = make(FieldsType)
1872
}
19-
metric.Tags[tagName] = value
73+
metric.fields[fieldName] = field
2074
}
2175

22-
func (metric *Metric) AddValue(valueName string, value any) {
23-
if metric.Values == nil {
24-
metric.Values = make(ValuesType)
76+
func (metric *Metric) SetFieldInt64(fieldName string, field int64) {
77+
metric.setField(fieldName, field)
78+
}
79+
80+
func (metric *Metric) SetFieldUInt64(fieldName string, field uint64) {
81+
metric.setField(fieldName, field)
82+
}
83+
84+
func (metric *Metric) SetFieldFloat64(fieldName string, field float64) {
85+
metric.setField(fieldName, field)
86+
}
87+
88+
func (metric *Metric) MergeFields(other *Metric) {
89+
for fieldName, field := range other.fields {
90+
if storedField, ok := metric.fields[fieldName]; ok {
91+
switch v := field.(type) {
92+
case int64:
93+
metric.fields[fieldName] = v + storedField.(int64)
94+
case uint64:
95+
metric.fields[fieldName] = v + storedField.(uint64)
96+
case float64:
97+
metric.fields[fieldName] = v + storedField.(float64)
98+
}
99+
} else {
100+
metric.fields[fieldName] = field
101+
}
102+
}
103+
}
104+
105+
func Format(writer io.Writer, metrics []Metric) error {
106+
var enc lp.Encoder
107+
108+
for _, metric := range metrics {
109+
enc.StartLine(metric.name)
110+
for _, tag := range metric.tags {
111+
enc.AddTag(tag.name, tag.value)
112+
}
113+
114+
for fieldName, field := range metric.fields {
115+
// MustNewValue cannot panic here: fields can only be set through the typed SetField* accessors, which accept only supported types
116+
enc.AddField(fieldName, lp.MustNewValue(field))
117+
}
118+
enc.EndLine(metric.timestamp)
25119
}
26-
metric.Values[valueName] = value
120+
121+
if err := enc.Err(); err != nil {
122+
return err
123+
}
124+
125+
_, err := fmt.Fprintf(writer, "%s", enc.Bytes())
126+
return err
27127
}

0 commit comments

Comments
 (0)