Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ If you are still using the legacy [Access scopes][access-scopes], the `https://w
| `monitoring.aggregate-deltas` | No | | If enabled will treat all DELTA metrics as an in-memory counter instead of a gauge. Be sure to read [what to know about aggregating DELTA metrics](#what-to-know-about-aggregating-delta-metrics) |
| `monitoring.aggregate-deltas-ttl` | No | `30m` | How long should a delta metric continue to be exported and stored after GCP stops producing it. Read [slow moving metrics](#slow-moving-metrics) to understand the problem this attempts to solve |
| `monitoring.descriptor-cache-ttl` | No | `0s` | How long should the metric descriptors for a prefixed be cached for |
| `monitoring.ignore-duplicates` | No | `false` | Ignore duplicate time series returned by Cloud Monitoring |
| `stackdriver.max-retries` | No | `0` | Max number of retries that should be attempted on 503 errors from stackdriver. |
| `stackdriver.http-timeout` | No | `10s` | How long should stackdriver_exporter wait for a result from the Stackdriver API. |
| `stackdriver.max-backoff=` | No | | Max time between each request in an exp backoff scenario. |
Expand Down
77 changes: 77 additions & 0 deletions collectors/gatherer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collectors

import (
"errors"
"fmt"
"strings"

"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

const duplicateSeriesErrorMessage = "was collected before with the same name and label values"

// GathererFor builds a gatherer from the given collectors. when IgnoreDuplicates
// is set it also drops duplicate series that Cloud Monitoring returns.
func (r *Runtime) GathererFor(cs []*MonitoringCollector) (prometheus.Gatherer, error) {
registry := prometheus.NewRegistry()
for _, c := range cs {
if err := registry.Register(c); err != nil {
return nil, fmt.Errorf("register collector: %w", err)
}
}
if r.cfg.IgnoreDuplicates {
return IgnoreDuplicatesGatherer(registry), nil
}
return registry, nil
}

// IgnoreDuplicatesGatherer drops the error that shows up when Cloud Monitoring
// returns the same series twice. the series that are left still get gathered,
// and anything that differs by timestamp stays separate.
func IgnoreDuplicatesGatherer(gatherer prometheus.Gatherer) prometheus.Gatherer {
return prometheus.GathererFunc(func() ([]*dto.MetricFamily, error) {
families, err := gatherer.Gather()
return families, suppressDuplicateSeriesErrors(err)
})
}

func suppressDuplicateSeriesErrors(err error) error {
if err == nil {
return nil
}

var multiErr prometheus.MultiError
if !errors.As(err, &multiErr) {
if isDuplicateSeriesError(err) {
return nil
}
return err
}

remaining := prometheus.MultiError{}
for _, e := range multiErr {
if isDuplicateSeriesError(e) {
continue
}
remaining = append(remaining, e)
}
return remaining.MaybeUnwrap()
}

func isDuplicateSeriesError(err error) bool {
return strings.Contains(err.Error(), duplicateSeriesErrorMessage)
}
108 changes: 108 additions & 0 deletions collectors/gatherer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collectors

import (
"errors"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus"
)

type dupCollector struct {
desc *prometheus.Desc
}

func newDupCollector() dupCollector {
return dupCollector{
desc: prometheus.NewDesc("test_metric", "help", nil, prometheus.Labels{"project_id": "p"}),
}
}

func (c dupCollector) Describe(chan<- *prometheus.Desc) {}

func (c dupCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, 1)
}

func registryWithDuplicateSeries(t *testing.T) *prometheus.Registry {
t.Helper()

registry := prometheus.NewRegistry()
if err := registry.Register(newDupCollector()); err != nil {
t.Fatalf("register first collector: %v", err)
}
if err := registry.Register(newDupCollector()); err != nil {
t.Fatalf("register second collector: %v", err)
}
return registry
}

func duplicateSeriesError() error {
return errors.New(`collected metric "test_metric" { label:{name:"project_id" value:"p"} gauge:{value:1}} ` + duplicateSeriesErrorMessage)
}

func TestIgnoreDuplicatesGatherer(t *testing.T) {
t.Parallel()

families, err := IgnoreDuplicatesGatherer(registryWithDuplicateSeries(t)).Gather()
if err != nil {
t.Fatalf("Gather() error = %v, want nil", err)
}
if len(families) != 1 {
t.Fatalf("Gather() families = %d, want 1", len(families))
}
if got := len(families[0].Metric); got != 1 {
t.Fatalf("Gather() metrics = %d, want duplicate collapsed to 1", got)
}
}

func TestSuppressDuplicateSeriesErrors(t *testing.T) {
t.Parallel()

tests := []struct {
name string
err error
wantPresent string
wantAbsent string
}{
{"nil", nil, "", ""},
{"single duplicate", duplicateSeriesError(), "", ""},
{"all duplicates", prometheus.MultiError{duplicateSeriesError(), duplicateSeriesError()}, "", ""},
{"real error", errors.New("boom"), "boom", ""},
{"help mismatch", errors.New(`collected metric "x" has help "a" but should have "b"`), "has help", ""},
{"mixed", prometheus.MultiError{duplicateSeriesError(), errors.New("boom")}, "boom", duplicateSeriesErrorMessage},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

got := suppressDuplicateSeriesErrors(tt.err)
if tt.wantPresent == "" {
if got != nil {
t.Fatalf("suppressDuplicateSeriesErrors() = %v, want nil", got)
}
return
}
if got == nil || !strings.Contains(got.Error(), tt.wantPresent) {
t.Fatalf("suppressDuplicateSeriesErrors() = %v, want %q kept", got, tt.wantPresent)
}
if tt.wantAbsent != "" && strings.Contains(got.Error(), tt.wantAbsent) {
t.Fatalf("suppressDuplicateSeriesErrors() = %v, want %q dropped", got, tt.wantAbsent)
}
})
}
}
3 changes: 3 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
DefaultDeltasTTL = 30 * time.Minute
DefaultDescriptorTTL = 0 * time.Second
DefaultDescriptorGoogleOnly = true
DefaultIgnoreDuplicates = false
)

// DefaultRetryStatuses must be treated as immutable after declaration.
Expand All @@ -62,6 +63,7 @@ type Config struct {
AggregateDeltasTTL time.Duration
DescriptorCacheTTL time.Duration
DescriptorCacheOnlyGoogle bool
IgnoreDuplicates bool

// validated is set by Validate on success.
validated bool
Expand All @@ -87,6 +89,7 @@ func NewConfigWithDefaults() *Config {
AggregateDeltasTTL: DefaultDeltasTTL,
DescriptorCacheTTL: DefaultDescriptorTTL,
DescriptorCacheOnlyGoogle: DefaultDescriptorGoogleOnly,
IgnoreDuplicates: DefaultIgnoreDuplicates,
}
}

Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ func TestNewConfigWithDefaults(t *testing.T) {
if len(c.RetryStatuses) != len(DefaultRetryStatuses) || c.RetryStatuses[0] != DefaultRetryStatuses[0] {
t.Errorf("RetryStatuses = %v, want %v", c.RetryStatuses, DefaultRetryStatuses)
}
if c.IgnoreDuplicates != DefaultIgnoreDuplicates {
t.Errorf("IgnoreDuplicates = %v, want %v", c.IgnoreDuplicates, DefaultIgnoreDuplicates)
}
c.RetryStatuses[0] = 999
if DefaultRetryStatuses[0] == 999 {
t.Fatal("NewConfigWithDefaults did not copy RetryStatuses; default mutated")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.41.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/client_model v0.6.2
github.com/prometheus/common v0.68.1
github.com/prometheus/exporter-toolkit v0.16.0
golang.org/x/oauth2 v0.36.0
Expand Down Expand Up @@ -39,7 +40,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
Expand Down
28 changes: 15 additions & 13 deletions stackdriver_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ var (
monitoringDescriptorCacheOnlyGoogle = kingpin.Flag(
"monitoring.descriptor-cache-only-google", "Only cache descriptors for *.googleapis.com metrics",
).Default(strconv.FormatBool(config.DefaultDescriptorGoogleOnly)).Bool()

monitoringIgnoreDuplicates = kingpin.Flag(
"monitoring.ignore-duplicates", "Ignore duplicate time series returned by Cloud Monitoring",
).Default(strconv.FormatBool(config.DefaultIgnoreDuplicates)).Bool()
)

func init() {
Expand Down Expand Up @@ -181,13 +185,11 @@ func newHandler(runtime *collectors.Runtime, logger *slog.Logger, additionalGath
if err != nil {
return nil, fmt.Errorf("build collectors: %w", err)
}
registry := prometheus.NewRegistry()
for _, c := range cs {
if err := registry.Register(c); err != nil {
return nil, fmt.Errorf("register collector: %w", err)
}
gatherer, err := runtime.GathererFor(cs)
if err != nil {
return nil, err
}
h.handler = h.handlerFor(registry)
h.handler = h.handlerFor(gatherer)
return h, nil
}

Expand All @@ -202,19 +204,18 @@ func (h *handler) filteredHandler(filters map[string]bool) (http.Handler, error)
if err != nil {
return nil, err
}
registry := prometheus.NewRegistry()
for _, c := range cs {
registry.MustRegister(c)
gatherer, err := h.runtime.GathererFor(cs)
if err != nil {
return nil, err
}
return h.handlerFor(registry), nil
return h.handlerFor(gatherer), nil
}

func (h *handler) handlerFor(registry *prometheus.Registry) http.Handler {
var gatherers prometheus.Gatherer = registry
func (h *handler) handlerFor(gatherers prometheus.Gatherer) http.Handler {
if h.additionalGatherer != nil {
gatherers = prometheus.Gatherers{
h.additionalGatherer,
registry,
gatherers,
}
}
opts := promhttp.HandlerOpts{ErrorLog: slog.NewLogLogger(h.logger.Handler(), slog.LevelError)}
Expand Down Expand Up @@ -343,6 +344,7 @@ func collectorConfigFromFlags() *config.Config {
AggregateDeltasTTL: *monitoringMetricsDeltasTTL,
DescriptorCacheTTL: *monitoringDescriptorCacheTTL,
DescriptorCacheOnlyGoogle: *monitoringDescriptorCacheOnlyGoogle,
IgnoreDuplicates: *monitoringIgnoreDuplicates,
}
}

Expand Down