add case in otelriver middleware to handle riverbatch batch results#54
Merged
brandur merged 3 commits intoMay 15, 2026
Merged
Conversation
Contributor
|
Hey Jack, thanks for opening the PR! I agree that it might be cleaner to have Want to try adding a couple test cases to verify the expected functionality? Here's a couple that my LLM came up with for example (this may be a tad overkill too, I could see having fewer than this): diff --git a/otelriver/middleware_test.go b/otelriver/middleware_test.go
index a0b84e3..53b649d 100644
--- a/otelriver/middleware_test.go
+++ b/otelriver/middleware_test.go
@@ -402,6 +402,117 @@ func TestMiddleware(t *testing.T) {
require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
})
+ t.Run("WorkBatchResultWithJobError", func(t *testing.T) {
+ t.Parallel()
+
+ middleware, bundle := setup(t)
+
+ jobErr := errors.New("job-specific error")
+ doInner := func(ctx context.Context) error {
+ return &fakeBatchError{
+ errorsByID: map[int64]error{
+ 123: jobErr,
+ 456: errors.New("other job error"),
+ },
+ }
+ }
+
+ err := middleware.Work(ctx, &rivertype.JobRow{
+ ID: 123,
+ Kind: "no_op",
+ }, doInner)
+ // The original batch error is returned (not unwrapped by the middleware).
+ require.Error(t, err)
+
+ spans := bundle.traceExporter.GetSpans()
+ require.Len(t, spans, 1)
+
+ span := spans[0]
+ require.Equal(t, "error", getAttribute(t, span.Attributes, "status").AsString())
+ require.Equal(t, codes.Error, span.Status.Code)
+ require.Equal(t, "job-specific error", span.Status.Description)
+ })
+
+ t.Run("WorkBatchResultWithNoJobError", func(t *testing.T) {
+ t.Parallel()
+
+ middleware, bundle := setup(t)
+
+ doInner := func(ctx context.Context) error {
+ return &fakeBatchError{
+ errorsByID: map[int64]error{
+ 456: errors.New("other job error"),
+ },
+ }
+ }
+
+ err := middleware.Work(ctx, &rivertype.JobRow{
+ ID: 123,
+ Kind: "no_op",
+ }, doInner)
+ require.Error(t, err)
+
+ spans := bundle.traceExporter.GetSpans()
+ require.Len(t, spans, 1)
+
+ span := spans[0]
+ // Job 123 has no error in the batch, so span status should be ok.
+ require.Equal(t, "ok", getAttribute(t, span.Attributes, "status").AsString())
+ require.Equal(t, codes.Ok, span.Status.Code)
+ })
+
+ t.Run("WorkBatchResultWithJobCancelError", func(t *testing.T) {
+ t.Parallel()
+
+ middleware, bundle := setup(t)
+
+ doInner := func(ctx context.Context) error {
+ return &fakeBatchError{
+ errorsByID: map[int64]error{
+ 123: rivertype.JobCancel(errors.New("cancelled")),
+ },
+ }
+ }
+
+ err := middleware.Work(ctx, &rivertype.JobRow{
+ ID: 123,
+ Kind: "no_op",
+ }, doInner)
+ require.Error(t, err)
+
+ spans := bundle.traceExporter.GetSpans()
+ require.Len(t, spans, 1)
+
+ span := spans[0]
+ require.True(t, getAttribute(t, span.Attributes, "cancel").AsBool())
+ })
+
+ t.Run("WorkBatchResultWithJobSnoozeError", func(t *testing.T) {
+ t.Parallel()
+
+ middleware, bundle := setup(t)
+
+ doInner := func(ctx context.Context) error {
+ return &fakeBatchError{
+ errorsByID: map[int64]error{
+ 123: &rivertype.JobSnoozeError{},
+ },
+ }
+ }
+
+ err := middleware.Work(ctx, &rivertype.JobRow{
+ ID: 123,
+ Kind: "no_op",
+ }, doInner)
+ require.Error(t, err)
+
+ spans := bundle.traceExporter.GetSpans()
+ require.Len(t, spans, 1)
+
+ span := spans[0]
+ require.True(t, getAttribute(t, span.Attributes, "snooze").AsBool())
+ })
+
t.Run("WorkPanic", func(t *testing.T) {
t.Parallel()
@@ -543,6 +654,16 @@ func TestMiddleware(t *testing.T) {
})
}
+// fakeBatchError simulates the error type returned by riverpro/riverbatch,
+// which wraps per-job errors in a single error that implements ErrorsByID().
+type fakeBatchError struct {
+ errorsByID map[int64]error
+}
+
+func (e *fakeBatchError) Error() string { return "batch error" }
+
+func (e *fakeBatchError) ErrorsByID() map[int64]error { return e.errorsByID }
+
func getAttribute(t *testing.T, attrs []attribute.KeyValue, key string) attribute.Value {
t.Helper() |
071cca4 to
8f1b545
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
As per
riverpro/riverbatch'sbatchWorker.Work:This

MultiErrorfulfils its purpose correctly, but results in an error span status when combined withotelrivereven when all jobs in thatMultiErrorcontain nil errors. This results in spans like the following:i.e. The only supposed error in this batch is
nil(we've confirmed the job resulted in the correct result in our datastore).This PR contains what I believe is the most minimal fix, although I do think if
MultiErrorwere moved intorivertype, it'd be a bit cleaner than this manually defined interface I've had to do instead.