diff --git a/README.md b/README.md index 7623a0b1..ff1d0c55 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,7 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w | `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) | | `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve | | `monitoring.descriptor-cache-ttl` | No | `0s` | How long should the metric descriptors for a prefixed be cached for | +| `monitoring.ignore-duplicates` | No | `false` | Ignore duplicate time series returned by Cloud Monitoring | | `stackdriver.max-retries` | No | `0` | Max number of retries that should be attempted on 503 errors from stackdriver. | | `stackdriver.http-timeout` | No | `10s` | How long should stackdriver_exporter wait for a result from the Stackdriver API. | | `stackdriver.max-backoff=` | No | | Max time between each request in an exp backoff scenario. | diff --git a/collectors/gatherer.go b/collectors/gatherer.go new file mode 100644 index 00000000..34e24f8c --- /dev/null +++ b/collectors/gatherer.go @@ -0,0 +1,77 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collectors + +import ( + "errors" + "fmt" + "strings" + + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +const duplicateSeriesErrorMessage = "was collected before with the same name and label values" + +// GathererFor builds a gatherer from the given collectors. when IgnoreDuplicates +// is set it also drops duplicate series that Cloud Monitoring returns. +func (r *Runtime) GathererFor(cs []*MonitoringCollector) (prometheus.Gatherer, error) { + registry := prometheus.NewRegistry() + for _, c := range cs { + if err := registry.Register(c); err != nil { + return nil, fmt.Errorf("register collector: %w", err) + } + } + if r.cfg.IgnoreDuplicates { + return IgnoreDuplicatesGatherer(registry), nil + } + return registry, nil +} + +// IgnoreDuplicatesGatherer drops the error that shows up when Cloud Monitoring +// returns the same series twice. the series that are left still get gathered, +// and anything that differs by timestamp stays separate. +func IgnoreDuplicatesGatherer(gatherer prometheus.Gatherer) prometheus.Gatherer { + return prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) { + families, err := gatherer.Gather() + return families, suppressDuplicateSeriesErrors(err) + }) +} + +func suppressDuplicateSeriesErrors(err error) error { + if err == nil { + return nil + } + + var multiErr prometheus.MultiError + if !errors.As(err, &multiErr) { + if isDuplicateSeriesError(err) { + return nil + } + return err + } + + remaining := prometheus.MultiError{} + for _, e := range multiErr { + if isDuplicateSeriesError(e) { + continue + } + remaining = append(remaining, e) + } + return remaining.MaybeUnwrap() +} + +func isDuplicateSeriesError(err error) bool { + return strings.Contains(err.Error(), duplicateSeriesErrorMessage) +} diff --git a/collectors/gatherer_test.go b/collectors/gatherer_test.go new file mode 100644 index 00000000..aa4039dc --- /dev/null +++ b/collectors/gatherer_test.go @@ -0,0 +1,108 @@ +// Copyright The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collectors + +import ( + "errors" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +type dupCollector struct { + desc *prometheus.Desc +} + +func newDupCollector() dupCollector { + return dupCollector{ + desc: prometheus.NewDesc("test_metric", "help", nil, prometheus.Labels{"project_id": "p"}), + } +} + +func (c dupCollector) Describe(chan<- *prometheus.Desc) {} + +func (c dupCollector) Collect(ch chan<- prometheus.Metric) { + ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 1) +} + +func registryWithDuplicateSeries(t *testing.T) *prometheus.Registry { + t.Helper() + + registry := prometheus.NewRegistry() + if err := registry.Register(newDupCollector()); err != nil { + t.Fatalf("register first collector: %v", err) + } + if err := registry.Register(newDupCollector()); err != nil { + t.Fatalf("register second collector: %v", err) + } + return registry +} + +func duplicateSeriesError() error { + return errors.New(`collected metric "test_metric" { label:{name:"project_id" value:"p"} gauge:{value:1}} ` + duplicateSeriesErrorMessage) +} + +func TestIgnoreDuplicatesGatherer(t *testing.T) { + t.Parallel() + + families, err := IgnoreDuplicatesGatherer(registryWithDuplicateSeries(t)).Gather() + if err != nil { + t.Fatalf("Gather() error = %v, want nil", err) + } + if len(families) != 1 { + t.Fatalf("Gather() families = %d, want 1", len(families)) + } + if got := len(families[0].Metric); got != 1 { + t.Fatalf("Gather() metrics = %d, want duplicate collapsed to 1", got) + } +} + +func TestSuppressDuplicateSeriesErrors(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + err error + wantPresent string + wantAbsent string + }{ + {"nil", nil, "", ""}, + {"single duplicate", duplicateSeriesError(), "", ""}, + {"all duplicates", prometheus.MultiError{duplicateSeriesError(), duplicateSeriesError()}, "", ""}, + {"real error", errors.New("boom"), "boom", ""}, + {"help mismatch", errors.New(`collected metric "x" has help "a" but should have "b"`), "has help", ""}, + {"mixed", prometheus.MultiError{duplicateSeriesError(), errors.New("boom")}, "boom", duplicateSeriesErrorMessage}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + got := suppressDuplicateSeriesErrors(tt.err) + if tt.wantPresent == "" { + if got != nil { + t.Fatalf("suppressDuplicateSeriesErrors() = %v, want nil", got) + } + return + } + if got == nil || !strings.Contains(got.Error(), tt.wantPresent) { + t.Fatalf("suppressDuplicateSeriesErrors() = %v, want %q kept", got, tt.wantPresent) + } + if tt.wantAbsent != "" && strings.Contains(got.Error(), tt.wantAbsent) { + t.Fatalf("suppressDuplicateSeriesErrors() = %v, want %q dropped", got, tt.wantAbsent) + } + }) + } +} diff --git a/config/config.go b/config/config.go index e6301474..41d2a278 100644 --- a/config/config.go +++ b/config/config.go @@ -37,6 +37,7 @@ const ( DefaultDeltasTTL = 30 * time.Minute DefaultDescriptorTTL = 0 * time.Second DefaultDescriptorGoogleOnly = true + DefaultIgnoreDuplicates = false ) // DefaultRetryStatuses must be treated as immutable after declaration. @@ -62,6 +63,7 @@ type Config struct { AggregateDeltasTTL time.Duration DescriptorCacheTTL time.Duration DescriptorCacheOnlyGoogle bool + IgnoreDuplicates bool // validated is set by Validate on success. validated bool @@ -87,6 +89,7 @@ func NewConfigWithDefaults() *Config { AggregateDeltasTTL: DefaultDeltasTTL, DescriptorCacheTTL: DefaultDescriptorTTL, DescriptorCacheOnlyGoogle: DefaultDescriptorGoogleOnly, + IgnoreDuplicates: DefaultIgnoreDuplicates, } } diff --git a/config/config_test.go b/config/config_test.go index 07b44e34..d70fa206 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -72,6 +72,9 @@ func TestNewConfigWithDefaults(t *testing.T) { if len(c.RetryStatuses) != len(DefaultRetryStatuses) || c.RetryStatuses[0] != DefaultRetryStatuses[0] { t.Errorf("RetryStatuses = %v, want %v", c.RetryStatuses, DefaultRetryStatuses) } + if c.IgnoreDuplicates != DefaultIgnoreDuplicates { + t.Errorf("IgnoreDuplicates = %v, want %v", c.IgnoreDuplicates, DefaultIgnoreDuplicates) + } c.RetryStatuses[0] = 999 if DefaultRetryStatuses[0] == 999 { t.Fatal("NewConfigWithDefaults did not copy RetryStatuses; default mutated") diff --git a/go.mod b/go.mod index 781b43c2..ebe142f2 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.41.0 github.com/prometheus/client_golang v1.23.2 + github.com/prometheus/client_model v0.6.2 github.com/prometheus/common v0.68.1 github.com/prometheus/exporter-toolkit v0.16.0 golang.org/x/oauth2 v0.36.0 @@ -39,7 +40,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/nxadm/tail v1.4.8 // indirect - github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect diff --git a/stackdriver_exporter.go b/stackdriver_exporter.go index fa7cda26..95fc919f 100644 --- a/stackdriver_exporter.go +++ b/stackdriver_exporter.go @@ -136,6 +136,10 @@ var ( monitoringDescriptorCacheOnlyGoogle = kingpin.Flag( "monitoring.descriptor-cache-only-google", "Only cache descriptors for *.googleapis.com metrics", ).Default(strconv.FormatBool(config.DefaultDescriptorGoogleOnly)).Bool() + + monitoringIgnoreDuplicates = kingpin.Flag( + "monitoring.ignore-duplicates", "Ignore duplicate time series returned by Cloud Monitoring", + ).Default(strconv.FormatBool(config.DefaultIgnoreDuplicates)).Bool() ) func init() { @@ -181,13 +185,11 @@ func newHandler(runtime *collectors.Runtime, logger *slog.Logger, additionalGath if err != nil { return nil, fmt.Errorf("build collectors: %w", err) } - registry := prometheus.NewRegistry() - for _, c := range cs { - if err := registry.Register(c); err != nil { - return nil, fmt.Errorf("register collector: %w", err) - } + gatherer, err := runtime.GathererFor(cs) + if err != nil { + return nil, err } - h.handler = h.handlerFor(registry) + h.handler = h.handlerFor(gatherer) return h, nil } @@ -202,19 +204,18 @@ func (h *handler) filteredHandler(filters map[string]bool) (http.Handler, error) if err != nil { return nil, err } - registry := prometheus.NewRegistry() - for _, c := range cs { - registry.MustRegister(c) + gatherer, err := h.runtime.GathererFor(cs) + if err != nil { + return nil, err } - return h.handlerFor(registry), nil + return h.handlerFor(gatherer), nil } -func (h *handler) handlerFor(registry *prometheus.Registry) http.Handler { - var gatherers prometheus.Gatherer = registry +func (h *handler) handlerFor(gatherers prometheus.Gatherer) http.Handler { if h.additionalGatherer != nil { gatherers = prometheus.Gatherers{ h.additionalGatherer, - registry, + gatherers, } } opts := promhttp.HandlerOpts{ErrorLog: slog.NewLogLogger(h.logger.Handler(), slog.LevelError)} @@ -343,6 +344,7 @@ func collectorConfigFromFlags() *config.Config { AggregateDeltasTTL: *monitoringMetricsDeltasTTL, DescriptorCacheTTL: *monitoringDescriptorCacheTTL, DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle, + IgnoreDuplicates: *monitoringIgnoreDuplicates, } }