From 63d595e5045a5079adbe20f5f02582cc0e24ad74 Mon Sep 17 00:00:00 2001 From: Elizabeth Chatman Date: Fri, 5 Jun 2026 15:15:25 -0700 Subject: [PATCH] otelriver: Add `kinds` span attribute to `insert_many` spans Emit the distinct, sorted job kinds present in each batch as a `kinds` string-slice attribute on `river.insert_many` spans. This surfaces which kinds were inserted in each batch for trace queries without affecting metric cardinality. Co-authored-by: Cursor --- CHANGELOG.md | 1 + otelriver/middleware.go | 13 ++++++++++++- otelriver/middleware_test.go | 31 +++++++++++++++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78d8abb..5319301 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Added `unique_skipped_as_duplicate` attributes to otel `insert_many` spans and `insert_count` metric. [PR #58](https://github.com/riverqueue/rivercontrib/pull/58). +- Add `kinds` span attribute to `otelriver` `insert_many` spans listing the distinct job kinds in each batch. [PR #62](https://github.com/riverqueue/rivercontrib/pull/62). ### Changed diff --git a/otelriver/middleware.go b/otelriver/middleware.go index 5878809..374a70d 100644 --- a/otelriver/middleware.go +++ b/otelriver/middleware.go @@ -4,6 +4,7 @@ import ( "cmp" "context" "errors" + "slices" "time" "go.opentelemetry.io/otel" @@ -164,8 +165,18 @@ func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.Job } } + kinds := make([]string, 0, len(manyParams)) + for _, p := range manyParams { + kinds = append(kinds, p.Kind) + } + slices.Sort(kinds) + kinds = slices.Compact(kinds) + span.SetAttributes(attrs...) // set after finalizing status - span.SetAttributes(attribute.Int64("unique_skipped_as_duplicate_count", skipped)) + span.SetAttributes( + attribute.Int64("unique_skipped_as_duplicate_count", skipped), + attribute.StringSlice("kinds", kinds), + ) // This allocates a new slice, so make sure to do it as few times as possible. measurementOpt := metric.WithAttributes(attrs...) diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go index 7097dfd..f75eb00 100644 --- a/otelriver/middleware_test.go +++ b/otelriver/middleware_test.go @@ -83,6 +83,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, "river.insert_many", span.Name) require.Equal(t, codes.Ok, span.Status.Code) require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) + require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice()) var ( expectedAttrs = []attribute.KeyValue{ @@ -130,6 +131,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "error from doInner", span.Status.Description) require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) + require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice()) var ( expectedAttrs = []attribute.KeyValue{ @@ -172,6 +174,7 @@ func TestMiddleware(t *testing.T) { require.Equal(t, codes.Error, span.Status.Code) require.Equal(t, "panic", span.Status.Description) require.EqualValues(t, 0, getAttribute(t, span.Attributes, "unique_skipped_as_duplicate_count").AsInt64()) + require.Equal(t, []string{"no_op"}, getAttribute(t, span.Attributes, "kinds").AsStringSlice()) var ( expectedAttrs = []attribute.KeyValue{ @@ -273,6 +276,34 @@ func TestMiddleware(t *testing.T) { requireSumByAttrs(t, metrics, "river.insert_count", 5) }) + t.Run("InsertManyMixedKinds", func(t *testing.T) { + t.Parallel() + + middleware, bundle := setup(t) + + doInner := func(ctx context.Context) ([]*rivertype.JobInsertResult, error) { + return []*rivertype.JobInsertResult{ + {Job: &rivertype.JobRow{ID: 1}}, + {Job: &rivertype.JobRow{ID: 2}}, + {Job: &rivertype.JobRow{ID: 3}}, + }, nil + } + + _, err := middleware.InsertMany(ctx, []*rivertype.JobInsertParams{ + {Kind: "email_send"}, + {Kind: "notification"}, + {Kind: "email_send"}, + }, doInner) + require.NoError(t, err) + + spans := bundle.traceExporter.GetSpans() + require.Len(t, spans, 1) + // kinds are deduplicated and sorted so identical batches produce + // identical span attributes. + require.Equal(t, []string{"email_send", "notification"}, + getAttribute(t, spans[0].Attributes, "kinds").AsStringSlice()) + }) + t.Run("InsertManyDurationUnitMS", func(t *testing.T) { t.Parallel()