From 079e577b49a63c51cc6575a57250d853d5137213 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 4 Jan 2021 15:23:19 -0800 Subject: [PATCH 1/2] Pass at least one key to script executions This allows Redis to route our execution to the correct node when running in a Redis Cluster. --- redis_queue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index 6023e47..e2a0e8a 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -16,6 +16,13 @@ type redisQueue struct { findScript *redis.Script } +// scriptKey returns a slice of strings containing at least one of the keys to +// be used by a script. This allows Redis route our script execution to the +// correct node in the event we're using a namespace. +func scriptKey(ns, queueID string) []string { + return []string{strings.Join([]string{ns, "queue", queueID}, ":")} +} + // NewRedisQueue creates a new queue stored in redis. func NewRedisQueue(client redis.UniversalClient) Queue { enqueueScript := redis.NewScript(` @@ -153,7 +160,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } - return q.enqueueScript.Run(q.client, nil, args...).Err() + return q.enqueueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) { @@ -169,7 +176,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro if err != nil { return nil, err } - res, err := q.dequeueScript.Run(q.client, nil, + res, err := q.dequeueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), opt.Namespace, opt.QueueID, opt.At.Unix(), @@ -213,7 +220,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { for i, job := range jobs { args[2+i] = job.ID } - return q.ackScript.Run(q.client, nil, args...).Err() + return q.ackScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) { @@ -229,7 +236,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) for i, jobID := range jobIDs { args[1+i] = jobID } - res, err := q.findScript.Run(q.client, nil, args...).Result() + res, err := q.findScript.Run(q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { return nil, err } From 54a220546e33f58ca635acb257b8e05bcfc5c02c Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Thu, 8 Jul 2021 15:21:05 -0700 Subject: [PATCH 2/2] Expose context to middleware A recent upstream change bumped Work to go-redis/v8, which uses a context everywhere. Despite this, the context wasn't available to the middleware, which meant we couldn't pass execution-specific information to the actual handler. This commit effectively moves everything from HandlerFunc to ContextHandlerFunc, passing the context to the middleware and underlying operations. --- cmd/enqueuer/main.go | 3 +- go.sum | 14 +++++ job.go | 19 ++++--- metrics.go | 7 ++- middleware/concurrent/dequeuer.go | 9 +-- middleware/concurrent/dequeuer_test.go | 16 +++--- middleware/discard/after.go | 7 ++- middleware/discard/after_test.go | 7 ++- middleware/discard/invalid_payload.go | 7 ++- middleware/discard/invalid_payload_test.go | 5 +- middleware/discard/max_retry.go | 8 ++- middleware/discard/max_retry_test.go | 7 ++- middleware/logrus/logger.go | 11 ++-- middleware/logrus/logger_test.go | 17 +++--- middleware/prometheus/metrics.go | 15 ++--- middleware/prometheus/metrics_test.go | 17 +++--- middleware/unique/enqueuer.go | 8 +-- middleware/unique/enqueuer_test.go | 17 +++--- redis_queue.go | 34 +++++------ redis_queue_test.go | 39 +++++++------ redislock/lock.go | 8 +-- sidekiq/external_queue.go | 18 +++--- sidekiq/queue.go | 12 ++-- sidekiq/queue_test.go | 62 ++++++++++++--------- worker.go | 65 +++++++++++----------- worker_test.go | 58 ++++++++++--------- 26 files changed, 272 insertions(+), 218 deletions(-) diff --git a/cmd/enqueuer/main.go b/cmd/enqueuer/main.go index 5b30aee..b144d71 100644 --- a/cmd/enqueuer/main.go +++ b/cmd/enqueuer/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "log" @@ -24,7 +25,7 @@ func main() { job := work.NewJob() job.MarshalPayload(flag.Args()) - err = queue.Enqueue(job, &work.EnqueueOptions{ + err = queue.Enqueue(context.Background(), job, &work.EnqueueOptions{ Namespace: *namespace, QueueID: "cmd_queue", }) diff --git a/go.sum b/go.sum index 24c7fd4..1095814 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= @@ -44,6 +45,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= @@ -58,8 +60,10 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -69,12 +73,15 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= +github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4= github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ= github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -121,6 +128,7 @@ go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g= go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8= go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU= +go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw= go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw= go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= @@ -139,6 +147,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -169,6 +178,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7q golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -176,6 +186,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= @@ -188,13 +199,16 @@ google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/job.go b/job.go index 4fd6c32..751b67d 100644 --- a/job.go +++ b/job.go @@ -2,6 +2,7 @@ package work import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -137,13 +138,13 @@ func (opt *EnqueueOptions) Validate() error { // Enqueuer enqueues a job. type Enqueuer interface { - Enqueue(*Job, *EnqueueOptions) error + Enqueue(context.Context, *Job, *EnqueueOptions) error } // ExternalEnqueuer enqueues a job with other queue protocol. // Queue adaptor that implements this can publish jobs directly to other types of queue systems. type ExternalEnqueuer interface { - ExternalEnqueue(*Job, *EnqueueOptions) error + ExternalEnqueue(context.Context, *Job, *EnqueueOptions) error } // DequeueOptions specifies how a job is dequeued. @@ -202,8 +203,8 @@ var ( // Dequeuer dequeues a job. // If a job is processed successfully, call Ack() to delete the job. type Dequeuer interface { - Dequeue(*DequeueOptions) (*Job, error) - Ack(*Job, *AckOptions) error + Dequeue(context.Context, *DequeueOptions) (*Job, error) + Ack(context.Context, *Job, *AckOptions) error } // Queue can enqueue and dequeue jobs. @@ -214,19 +215,19 @@ type Queue interface { // BulkEnqueuer enqueues jobs in a batch. type BulkEnqueuer interface { - BulkEnqueue([]*Job, *EnqueueOptions) error + BulkEnqueue(context.Context, []*Job, *EnqueueOptions) error } // ExternalBulkEnqueuer enqueues jobs in a batch with other queue protocol. // Queue adaptor that implements this can publish jobs directly to other types of queue systems. type ExternalBulkEnqueuer interface { - ExternalBulkEnqueue([]*Job, *EnqueueOptions) error + ExternalBulkEnqueue(context.Context, []*Job, *EnqueueOptions) error } // BulkDequeuer dequeues jobs in a batch. type BulkDequeuer interface { - BulkDequeue(int64, *DequeueOptions) ([]*Job, error) - BulkAck([]*Job, *AckOptions) error + BulkDequeue(context.Context, int64, *DequeueOptions) ([]*Job, error) + BulkAck(context.Context, []*Job, *AckOptions) error } // FindOptions specifies how a job is searched from a queue. @@ -247,5 +248,5 @@ func (opt *FindOptions) Validate() error { // It returns nil if the job is no longer in the queue. // The length of the returned job list will be equal to the length of jobIDs. type BulkJobFinder interface { - BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error) + BulkFind(ctx context.Context, jobIDs []string, opts *FindOptions) ([]*Job, error) } diff --git a/metrics.go b/metrics.go index 557f673..3b662d1 100644 --- a/metrics.go +++ b/metrics.go @@ -1,6 +1,9 @@ package work -import "time" +import ( + "context" + "time" +) // QueueMetrics contains metrics from a queue. type QueueMetrics struct { @@ -14,7 +17,7 @@ type QueueMetrics struct { // MetricsExporter can be implemented by Queue to report metrics. type MetricsExporter interface { - GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error) + GetQueueMetrics(context.Context, *QueueMetricsOptions) (*QueueMetrics, error) } // Metrics wraps metrics reported by MetricsExporter. diff --git a/middleware/concurrent/dequeuer.go b/middleware/concurrent/dequeuer.go index 4b3d8f6..a47f431 100644 --- a/middleware/concurrent/dequeuer.go +++ b/middleware/concurrent/dequeuer.go @@ -1,6 +1,7 @@ package concurrent import ( + "context" "fmt" "github.com/go-redis/redis/v8" @@ -25,7 +26,7 @@ func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware { if workerID == "" { workerID = uuid.NewString() } - return func(opt *work.DequeueOptions) (*work.Job, error) { + return func(ctx context.Context, opt *work.DequeueOptions) (*work.Job, error) { lock := &redislock.Lock{ Client: copt.Client, Key: fmt.Sprintf("%s:lock:%s", opt.Namespace, opt.QueueID), @@ -34,7 +35,7 @@ func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware { ExpireInSec: opt.InvisibleSec, MaxAcquirers: copt.Max, } - acquired, err := lock.Acquire() + acquired, err := lock.Acquire(ctx) if err != nil { return nil, err } @@ -42,9 +43,9 @@ func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware { return nil, work.ErrEmptyQueue } if !copt.disableUnlock { - defer lock.Release() + defer lock.Release(ctx) } - return f(opt) + return f(ctx, opt) } } } diff --git a/middleware/concurrent/dequeuer_test.go b/middleware/concurrent/dequeuer_test.go index 29034b4..c7d1a2f 100644 --- a/middleware/concurrent/dequeuer_test.go +++ b/middleware/concurrent/dequeuer_test.go @@ -13,6 +13,7 @@ import ( ) func TestDequeuer(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -24,7 +25,7 @@ func TestDequeuer(t *testing.T) { InvisibleSec: 60, } var called int - h := func(*work.DequeueOptions) (*work.Job, error) { + h := func(context.Context, *work.DequeueOptions) (*work.Job, error) { called++ return work.NewJob(), nil } @@ -35,7 +36,7 @@ func TestDequeuer(t *testing.T) { Max: 2, workerID: fmt.Sprintf("w%d", i), }) - _, err := deq(h)(opt) + _, err := deq(h)(ctx, opt) require.NoError(t, err) } @@ -50,7 +51,7 @@ func TestDequeuer(t *testing.T) { workerID: fmt.Sprintf("w%d", i), disableUnlock: true, }) - _, err := deq(h)(opt) + _, err := deq(h)(ctx, opt) if i <= 1 { require.NoError(t, err) @@ -85,7 +86,7 @@ func TestDequeuer(t *testing.T) { workerID: "w0", disableUnlock: true, }) - _, err := deq(h)(&optLater) + _, err := deq(h)(ctx, &optLater) require.Equal(t, work.ErrEmptyQueue, err) } require.Equal(t, 5, called) @@ -113,7 +114,7 @@ func TestDequeuer(t *testing.T) { workerID: fmt.Sprintf("w%d", i), disableUnlock: true, }) - _, err := deq(h)(&optExpired) + _, err := deq(h)(ctx, &optExpired) if i < 5 { require.NoError(t, err) } else { @@ -138,6 +139,7 @@ func TestDequeuer(t *testing.T) { } func BenchmarkConcurrency(b *testing.B) { + ctx := context.Background() b.StopTimer() client := redistest.NewClient() @@ -155,14 +157,14 @@ func BenchmarkConcurrency(b *testing.B) { Max: 1, }) var called int - h := deq(func(*work.DequeueOptions) (*work.Job, error) { + h := deq(func(context.Context, *work.DequeueOptions) (*work.Job, error) { called++ return work.NewJob(), nil }) b.StartTimer() for n := 0; n < b.N; n++ { - h(opt) + h(ctx, opt) } b.StopTimer() require.Equal(b, b.N, called) diff --git a/middleware/discard/after.go b/middleware/discard/after.go index 3a37629..04143da 100644 --- a/middleware/discard/after.go +++ b/middleware/discard/after.go @@ -1,6 +1,7 @@ package discard import ( + "context" "time" "github.com/taylorchu/work" @@ -8,12 +9,12 @@ import ( // After discards a job if it is already stale. func After(d time.Duration) work.HandleMiddleware { - return func(f work.HandleFunc) work.HandleFunc { - return func(job *work.Job, opt *work.DequeueOptions) error { + return func(f work.ContextHandleFunc) work.ContextHandleFunc { + return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error { if time.Since(job.CreatedAt) > d { return work.ErrUnrecoverable } - err := f(job, opt) + err := f(ctx, job, opt) if time.Since(job.CreatedAt) > d { return work.ErrUnrecoverable } diff --git a/middleware/discard/after_test.go b/middleware/discard/after_test.go index e07bf83..4f0473e 100644 --- a/middleware/discard/after_test.go +++ b/middleware/discard/after_test.go @@ -1,6 +1,7 @@ package discard import ( + "context" "errors" "testing" "time" @@ -16,15 +17,15 @@ func TestAfter(t *testing.T) { QueueID: "q1", } d := After(time.Minute) - h := d(func(*work.Job, *work.DequeueOptions) error { + h := d(func(context.Context, *work.Job, *work.DequeueOptions) error { return errors.New("no reason") }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.Error(t, err) require.NotEqual(t, work.ErrUnrecoverable, err) job.CreatedAt = job.CreatedAt.Add(-time.Hour) - err = h(job, opt) + err = h(context.Background(), job, opt) require.Equal(t, work.ErrUnrecoverable, err) } diff --git a/middleware/discard/invalid_payload.go b/middleware/discard/invalid_payload.go index 036a873..8c7a7ff 100644 --- a/middleware/discard/invalid_payload.go +++ b/middleware/discard/invalid_payload.go @@ -1,15 +1,16 @@ package discard import ( + "context" "errors" "github.com/taylorchu/work" ) // InvalidPayload discards a job if it has decode error. -func InvalidPayload(f work.HandleFunc) work.HandleFunc { - return func(job *work.Job, opt *work.DequeueOptions) error { - err := f(job, opt) +func InvalidPayload(f work.ContextHandleFunc) work.ContextHandleFunc { + return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error { + err := f(ctx, job, opt) if err != nil { var perr *work.InvalidJobPayloadError if errors.As(err, &perr) { diff --git a/middleware/discard/invalid_payload_test.go b/middleware/discard/invalid_payload_test.go index 5ef2c8c..17356eb 100644 --- a/middleware/discard/invalid_payload_test.go +++ b/middleware/discard/invalid_payload_test.go @@ -1,6 +1,7 @@ package discard import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -13,12 +14,12 @@ func TestInvalidPayload(t *testing.T) { Namespace: "{ns1}", QueueID: "q1", } - h := InvalidPayload(func(*work.Job, *work.DequeueOptions) error { + h := InvalidPayload(func(context.Context, *work.Job, *work.DequeueOptions) error { var s string return job.UnmarshalPayload(&s) }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.Error(t, err) require.Equal(t, work.ErrUnrecoverable, err) } diff --git a/middleware/discard/max_retry.go b/middleware/discard/max_retry.go index 5c8192a..abe7129 100644 --- a/middleware/discard/max_retry.go +++ b/middleware/discard/max_retry.go @@ -1,14 +1,16 @@ package discard import ( + "context" + "github.com/taylorchu/work" ) // MaxRetry discards a job if its retry count is over limit. func MaxRetry(n int64) work.HandleMiddleware { - return func(f work.HandleFunc) work.HandleFunc { - return func(job *work.Job, opt *work.DequeueOptions) error { - err := f(job, opt) + return func(f work.ContextHandleFunc) work.ContextHandleFunc { + return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error { + err := f(ctx, job, opt) if job.Retries >= n { return work.ErrUnrecoverable } diff --git a/middleware/discard/max_retry_test.go b/middleware/discard/max_retry_test.go index c8b15a7..4daff75 100644 --- a/middleware/discard/max_retry_test.go +++ b/middleware/discard/max_retry_test.go @@ -1,6 +1,7 @@ package discard import ( + "context" "errors" "testing" @@ -15,15 +16,15 @@ func TestMaxRetry(t *testing.T) { QueueID: "q1", } d := MaxRetry(1) - h := d(func(*work.Job, *work.DequeueOptions) error { + h := d(func(context.Context, *work.Job, *work.DequeueOptions) error { return errors.New("no reason") }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.Error(t, err) require.NotEqual(t, work.ErrUnrecoverable, err) job.Retries = 1 - err = h(job, opt) + err = h(context.Background(), job, opt) require.Equal(t, work.ErrUnrecoverable, err) } diff --git a/middleware/logrus/logger.go b/middleware/logrus/logger.go index 2902108..8e3079c 100644 --- a/middleware/logrus/logger.go +++ b/middleware/logrus/logger.go @@ -1,6 +1,7 @@ package logrus import ( + "context" "time" "github.com/sirupsen/logrus" @@ -8,15 +9,15 @@ import ( ) // HandleFuncLogger logs job execution with logrus structured logger. -func HandleFuncLogger(f work.HandleFunc) work.HandleFunc { - return func(job *work.Job, opt *work.DequeueOptions) error { +func HandleFuncLogger(f work.ContextHandleFunc) work.ContextHandleFunc { + return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error { logger := logrus.WithFields(logrus.Fields{ "queue": opt.QueueID, "namespace": opt.Namespace, "job": job.ID, }) startTime := time.Now() - err := f(job, opt) + err := f(ctx, job, opt) if err != nil { logger.WithFields(logrus.Fields{ "last_error": job.LastError, @@ -34,13 +35,13 @@ func HandleFuncLogger(f work.HandleFunc) work.HandleFunc { // EnqueueFuncLogger logs job enqueuing with logrus structured logger. func EnqueueFuncLogger(f work.EnqueueFunc) work.EnqueueFunc { - return func(job *work.Job, opt *work.EnqueueOptions) error { + return func(ctx context.Context, job *work.Job, opt *work.EnqueueOptions) error { logger := logrus.WithFields(logrus.Fields{ "queue": opt.QueueID, "namespace": opt.Namespace, "job": job.ID, }) - err := f(job, opt) + err := f(ctx, job, opt) if err != nil { logger.WithError(err).Error("Job failed to enqueue.") return err diff --git a/middleware/logrus/logger_test.go b/middleware/logrus/logger_test.go index effb308..ee88869 100644 --- a/middleware/logrus/logger_test.go +++ b/middleware/logrus/logger_test.go @@ -1,6 +1,7 @@ package logrus import ( + "context" "errors" "testing" @@ -14,17 +15,17 @@ func TestHandleFuncLogger(t *testing.T) { Namespace: "{ns1}", QueueID: "q1", } - h := HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error { + h := HandleFuncLogger(func(context.Context, *work.Job, *work.DequeueOptions) error { return nil }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.NoError(t, err) - h = HandleFuncLogger(func(*work.Job, *work.DequeueOptions) error { + h = HandleFuncLogger(func(context.Context, *work.Job, *work.DequeueOptions) error { return errors.New("no reason") }) - err = h(job, opt) + err = h(context.Background(), job, opt) require.Error(t, err) } @@ -34,16 +35,16 @@ func TestEnqueueFuncLogger(t *testing.T) { Namespace: "{ns1}", QueueID: "q1", } - h := EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error { + h := EnqueueFuncLogger(func(context.Context, *work.Job, *work.EnqueueOptions) error { return nil }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.NoError(t, err) - h = EnqueueFuncLogger(func(*work.Job, *work.EnqueueOptions) error { + h = EnqueueFuncLogger(func(context.Context, *work.Job, *work.EnqueueOptions) error { return errors.New("no reason") }) - err = h(job, opt) + err = h(context.Background(), job, opt) require.Error(t, err) } diff --git a/middleware/prometheus/metrics.go b/middleware/prometheus/metrics.go index a5fea21..7d3aa52 100644 --- a/middleware/prometheus/metrics.go +++ b/middleware/prometheus/metrics.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "time" "github.com/prometheus/client_golang/prometheus" @@ -70,12 +71,12 @@ func init() { } // HandleFuncMetrics adds prometheus metrics like executed job count. -func HandleFuncMetrics(f work.HandleFunc) work.HandleFunc { - return func(job *work.Job, opt *work.DequeueOptions) error { +func HandleFuncMetrics(f work.ContextHandleFunc) work.ContextHandleFunc { + return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error { jobBusy.WithLabelValues(opt.Namespace, opt.QueueID).Inc() defer jobBusy.WithLabelValues(opt.Namespace, opt.QueueID).Dec() startTime := time.Now() - err := f(job, opt) + err := f(ctx, job, opt) if err != nil { jobExecutedTotal.WithLabelValues(opt.Namespace, opt.QueueID, "failure").Inc() return err @@ -88,8 +89,8 @@ func HandleFuncMetrics(f work.HandleFunc) work.HandleFunc { // EnqueueFuncMetrics adds prometheus metrics like enqueued job count. func EnqueueFuncMetrics(f work.EnqueueFunc) work.EnqueueFunc { - return func(job *work.Job, opt *work.EnqueueOptions) error { - err := f(job, opt) + return func(ctx context.Context, job *work.Job, opt *work.EnqueueOptions) error { + err := f(ctx, job, opt) if err != nil { jobEnqueuedTotal.WithLabelValues(opt.Namespace, opt.QueueID, "failure").Inc() return err @@ -100,8 +101,8 @@ func EnqueueFuncMetrics(f work.EnqueueFunc) work.EnqueueFunc { } // ExportWorkerMetrics adds prometheus metrics from work.Worker. -func ExportWorkerMetrics(w *work.Worker) error { - all, err := w.ExportMetrics() +func ExportWorkerMetrics(ctx context.Context, w *work.Worker) error { + all, err := w.ExportMetrics(ctx) if err != nil { return err } diff --git a/middleware/prometheus/metrics_test.go b/middleware/prometheus/metrics_test.go index fda8249..4345c83 100644 --- a/middleware/prometheus/metrics_test.go +++ b/middleware/prometheus/metrics_test.go @@ -1,6 +1,7 @@ package prometheus import ( + "context" "errors" "net/http/httptest" "testing" @@ -16,17 +17,17 @@ func TestHandleFuncMetrics(t *testing.T) { Namespace: "{ns1}", QueueID: "q1", } - h := HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error { + h := HandleFuncMetrics(func(context.Context, *work.Job, *work.DequeueOptions) error { return nil }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.NoError(t, err) - h = HandleFuncMetrics(func(*work.Job, *work.DequeueOptions) error { + h = HandleFuncMetrics(func(context.Context, *work.Job, *work.DequeueOptions) error { return errors.New("no reason") }) - err = h(job, opt) + err = h(context.Background(), job, opt) require.Error(t, err) r := httptest.NewRecorder() @@ -47,17 +48,17 @@ func TestEnqueueFuncMetrics(t *testing.T) { Namespace: "{ns1}", QueueID: "q1", } - h := EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error { + h := EnqueueFuncMetrics(func(context.Context, *work.Job, *work.EnqueueOptions) error { return nil }) - err := h(job, opt) + err := h(context.Background(), job, opt) require.NoError(t, err) - h = EnqueueFuncMetrics(func(*work.Job, *work.EnqueueOptions) error { + h = EnqueueFuncMetrics(func(context.Context, *work.Job, *work.EnqueueOptions) error { return errors.New("no reason") }) - err = h(job, opt) + err = h(context.Background(), job, opt) require.Error(t, err) r := httptest.NewRecorder() diff --git a/middleware/unique/enqueuer.go b/middleware/unique/enqueuer.go index 3ab8354..7f92b1e 100644 --- a/middleware/unique/enqueuer.go +++ b/middleware/unique/enqueuer.go @@ -31,13 +31,13 @@ var ( // Enqueuer uses UniqueFunc to ensure job uniqueness in a period. func Enqueuer(eopt *EnqueuerOptions) work.EnqueueMiddleware { return func(f work.EnqueueFunc) work.EnqueueFunc { - return func(job *work.Job, opt *work.EnqueueOptions) error { + return func(ctx context.Context, job *work.Job, opt *work.EnqueueOptions) error { b, expireIn, err := eopt.UniqueFunc(job, opt) if err != nil { return err } if b == nil { - return f(job, opt) + return f(ctx, job, opt) } if expireIn <= 0 { return ErrDedupDuration @@ -49,12 +49,12 @@ func Enqueuer(eopt *EnqueuerOptions) work.EnqueueMiddleware { return err } key := fmt.Sprintf("%s:unique:%s:%x", opt.Namespace, opt.QueueID, h.Sum(nil)) - notExist, err := eopt.Client.SetNX(context.Background(), key, 1, expireIn).Result() + notExist, err := eopt.Client.SetNX(ctx, key, 1, expireIn).Result() if err != nil { return err } if notExist { - return f(job, opt) + return f(ctx, job, opt) } return nil } diff --git a/middleware/unique/enqueuer_test.go b/middleware/unique/enqueuer_test.go index 3c217e7..456547c 100644 --- a/middleware/unique/enqueuer_test.go +++ b/middleware/unique/enqueuer_test.go @@ -23,13 +23,13 @@ func TestEnqueuerBypass(t *testing.T) { }) var called int - h := enq(func(*work.Job, *work.EnqueueOptions) error { + h := enq(func(context.Context, *work.Job, *work.EnqueueOptions) error { called++ return nil }) for i := 0; i < 3; i++ { job := work.NewJob() - err := h(job, &work.EnqueueOptions{ + err := h(context.Background(), job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -39,6 +39,7 @@ func TestEnqueuerBypass(t *testing.T) { } func TestEnqueuer(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -51,13 +52,13 @@ func TestEnqueuer(t *testing.T) { }) var called int - h := enq(func(*work.Job, *work.EnqueueOptions) error { + h := enq(func(context.Context, *work.Job, *work.EnqueueOptions) error { called++ return nil }) for i := 0; i < 3; i++ { job := work.NewJob() - err := h(job, &work.EnqueueOptions{ + err := h(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -66,9 +67,9 @@ func TestEnqueuer(t *testing.T) { require.Equal(t, 1, called) for i := 0; i < 3; i++ { - require.NoError(t, client.Del(context.Background(), "{ns1}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) + require.NoError(t, client.Del(ctx, "{ns1}:unique:q1:9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08").Err()) job := work.NewJob() - err := h(job, &work.EnqueueOptions{ + err := h(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -92,7 +93,7 @@ func BenchmarkEnqueuer(b *testing.B) { }) var called int - h := enq(func(*work.Job, *work.EnqueueOptions) error { + h := enq(func(context.Context, *work.Job, *work.EnqueueOptions) error { called++ return nil }) @@ -100,7 +101,7 @@ func BenchmarkEnqueuer(b *testing.B) { b.StartTimer() for n := 0; n < b.N; n++ { job := work.NewJob() - h(job, &work.EnqueueOptions{ + h(context.Background(), job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) diff --git a/redis_queue.go b/redis_queue.go index f36258b..b6df92e 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -137,11 +137,11 @@ func NewRedisQueue(client redis.UniversalClient) Queue { } } -func (q *redisQueue) Enqueue(job *Job, opt *EnqueueOptions) error { - return q.BulkEnqueue([]*Job{job}, opt) +func (q *redisQueue) Enqueue(ctx context.Context, job *Job, opt *EnqueueOptions) error { + return q.BulkEnqueue(ctx, []*Job{job}, opt) } -func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { +func (q *redisQueue) BulkEnqueue(ctx context.Context, jobs []*Job, opt *EnqueueOptions) error { err := opt.Validate() if err != nil { return err @@ -161,23 +161,23 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } - return q.enqueueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() + return q.enqueueScript.Run(ctx, q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } -func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) { - jobs, err := q.BulkDequeue(1, opt) +func (q *redisQueue) Dequeue(ctx context.Context, opt *DequeueOptions) (*Job, error) { + jobs, err := q.BulkDequeue(ctx, 1, opt) if err != nil { return nil, err } return jobs[0], nil } -func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, error) { +func (q *redisQueue) BulkDequeue(ctx context.Context, count int64, opt *DequeueOptions) ([]*Job, error) { err := opt.Validate() if err != nil { return nil, err } - res, err := q.dequeueScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), + res, err := q.dequeueScript.Run(ctx, q.client, scriptKey(opt.Namespace, opt.QueueID), opt.Namespace, opt.QueueID, opt.At.Unix(), @@ -203,11 +203,11 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro return jobs, nil } -func (q *redisQueue) Ack(job *Job, opt *AckOptions) error { - return q.BulkAck([]*Job{job}, opt) +func (q *redisQueue) Ack(ctx context.Context, job *Job, opt *AckOptions) error { + return q.BulkAck(ctx, []*Job{job}, opt) } -func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { +func (q *redisQueue) BulkAck(ctx context.Context, jobs []*Job, opt *AckOptions) error { err := opt.Validate() if err != nil { return err @@ -221,10 +221,10 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { for i, job := range jobs { args[2+i] = job.ID } - return q.ackScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() + return q.ackScript.Run(ctx, q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } -func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) { +func (q *redisQueue) BulkFind(ctx context.Context, jobIDs []string, opt *FindOptions) ([]*Job, error) { err := opt.Validate() if err != nil { return nil, err @@ -237,7 +237,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) for i, jobID := range jobIDs { args[1+i] = jobID } - res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() + res, err := q.findScript.Run(ctx, q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { return nil, err } @@ -264,18 +264,18 @@ var ( _ BulkJobFinder = (*redisQueue)(nil) ) -func (q *redisQueue) GetQueueMetrics(opt *QueueMetricsOptions) (*QueueMetrics, error) { +func (q *redisQueue) GetQueueMetrics(ctx context.Context, opt *QueueMetricsOptions) (*QueueMetrics, error) { err := opt.Validate() if err != nil { return nil, err } queueKey := fmt.Sprintf("%s:queue:%s", opt.Namespace, opt.QueueID) now := fmt.Sprint(opt.At.Unix()) - readyTotal, err := q.client.ZCount(context.Background(), queueKey, "-inf", now).Result() + readyTotal, err := q.client.ZCount(ctx, queueKey, "-inf", now).Result() if err != nil { return nil, err } - scheduledTotal, err := q.client.ZCount(context.Background(), queueKey, "("+now, "+inf").Result() + scheduledTotal, err := q.client.ZCount(ctx, queueKey, "("+now, "+inf").Result() if err != nil { return nil, err } diff --git a/redis_queue_test.go b/redis_queue_test.go index 453f77b..ba3ea6f 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -12,6 +12,7 @@ import ( ) func TestRedisQueueEnqueue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -25,7 +26,7 @@ func TestRedisQueueEnqueue(t *testing.T) { err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - err = q.Enqueue(job, &EnqueueOptions{ + err = q.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -41,7 +42,7 @@ func TestRedisQueueEnqueue(t *testing.T) { "msgpack": string(jobm), }, h) - jobs, err := q.(BulkJobFinder).BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ + jobs, err := q.(BulkJobFinder).BulkFind(ctx, []string{job.ID, "not-exist-id"}, &FindOptions{ Namespace: "{ns1}", }) require.NoError(t, err) @@ -51,12 +52,12 @@ func TestRedisQueueEnqueue(t *testing.T) { require.Nil(t, jobs[1]) jobs[0].LastError = "hello world" - err = q.Enqueue(jobs[0], &EnqueueOptions{ + err = q.Enqueue(ctx, jobs[0], &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) require.NoError(t, err) - jobs, err = q.(BulkJobFinder).BulkFind([]string{job.ID}, &FindOptions{ + jobs, err = q.(BulkJobFinder).BulkFind(ctx, []string{job.ID}, &FindOptions{ Namespace: "{ns1}", }) require.NoError(t, err) @@ -76,7 +77,7 @@ func TestRedisQueueEnqueue(t *testing.T) { require.Equal(t, jobKey, z[0].Member) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) - err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{ + err = q.Enqueue(ctx, job.Delay(time.Minute), &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -96,6 +97,7 @@ func TestRedisQueueEnqueue(t *testing.T) { } func TestRedisQueueDequeue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -110,7 +112,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.NoError(t, err) jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) - err = q.Enqueue(job, &EnqueueOptions{ + err = q.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -118,7 +120,7 @@ func TestRedisQueueDequeue(t *testing.T) { now := job.EnqueuedAt.Add(123 * time.Second) - jobDequeued, err := q.Dequeue(&DequeueOptions{ + jobDequeued, err := q.Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: now, @@ -139,7 +141,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.Equal(t, jobKey, z[0].Member) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) - jobDequeued, err = q.Dequeue(&DequeueOptions{ + jobDequeued, err = q.Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: now, @@ -169,7 +171,7 @@ func TestRedisQueueDequeue(t *testing.T) { require.EqualValues(t, now.Unix()+60, z[0].Score) // empty - _, err = q.Dequeue(&DequeueOptions{ + _, err = q.Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: now, @@ -180,6 +182,7 @@ func TestRedisQueueDequeue(t *testing.T) { } func TestRedisQueueDequeueDeletedJob(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -193,7 +196,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - err = q.Enqueue(job, &EnqueueOptions{ + err = q.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -211,7 +214,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, client.Del(context.Background(), jobKey).Err()) - _, err = q.Dequeue(&DequeueOptions{ + _, err = q.Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "q1", At: job.EnqueuedAt, @@ -231,6 +234,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { } func TestRedisQueueAck(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -238,7 +242,7 @@ func TestRedisQueueAck(t *testing.T) { job := NewJob() - err := q.Enqueue(job, &EnqueueOptions{ + err := q.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -262,7 +266,7 @@ func TestRedisQueueAck(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 1, e) - err = q.Ack(job, &AckOptions{ + err = q.Ack(ctx, job, &AckOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -282,7 +286,7 @@ func TestRedisQueueAck(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 0, e) - err = q.Ack(job, &AckOptions{ + err = q.Ack(ctx, job, &AckOptions{ Namespace: "{ns1}", QueueID: "q1", }) @@ -290,6 +294,7 @@ func TestRedisQueueAck(t *testing.T) { } func TestRedisQueueGetQueueMetrics(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -297,13 +302,13 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { job := NewJob() - err := q.Enqueue(job, &EnqueueOptions{ + err := q.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "q1", }) require.NoError(t, err) - m, err := q.(MetricsExporter).GetQueueMetrics(&QueueMetricsOptions{ + m, err := q.(MetricsExporter).GetQueueMetrics(ctx, &QueueMetricsOptions{ Namespace: "{ns1}", QueueID: "q1", At: job.EnqueuedAt, @@ -314,7 +319,7 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) - m, err = q.(MetricsExporter).GetQueueMetrics(&QueueMetricsOptions{ + m, err = q.(MetricsExporter).GetQueueMetrics(ctx, &QueueMetricsOptions{ Namespace: "{ns1}", QueueID: "q1", At: job.EnqueuedAt.Add(-time.Second), diff --git a/redislock/lock.go b/redislock/lock.go index 1458931..e120a5b 100644 --- a/redislock/lock.go +++ b/redislock/lock.go @@ -22,7 +22,7 @@ type Lock struct { // Acquire creates the lock if possible. // If it is acquired, true is returned. // Call Release to unlock. -func (l *Lock) Acquire() (bool, error) { +func (l *Lock) Acquire(ctx context.Context) (bool, error) { lockScript := redis.NewScript(` local lock_key = ARGV[1] local lock_id = ARGV[2] @@ -42,7 +42,7 @@ func (l *Lock) Acquire() (bool, error) { return 0 `) - acquired, err := lockScript.Run(context.Background(), l.Client, nil, + acquired, err := lockScript.Run(ctx, l.Client, nil, l.Key, l.ID, l.At.Unix(), @@ -56,14 +56,14 @@ func (l *Lock) Acquire() (bool, error) { } // Release clears the lock. -func (l *Lock) Release() error { +func (l *Lock) Release(ctx context.Context) error { unlockScript := redis.NewScript(` local lock_key = ARGV[1] local lock_id = ARGV[2] return redis.call("zrem", lock_key, lock_id) `) - err := unlockScript.Run(context.Background(), l.Client, nil, + err := unlockScript.Run(ctx, l.Client, nil, l.Key, l.ID, ).Err() diff --git a/sidekiq/external_queue.go b/sidekiq/external_queue.go index 0d26e18..1d89f40 100644 --- a/sidekiq/external_queue.go +++ b/sidekiq/external_queue.go @@ -8,11 +8,11 @@ import ( "github.com/taylorchu/work" ) -func (q *sidekiqQueue) ExternalEnqueue(job *work.Job, opt *work.EnqueueOptions) error { - return q.ExternalBulkEnqueue([]*work.Job{job}, opt) +func (q *sidekiqQueue) ExternalEnqueue(ctx context.Context, job *work.Job, opt *work.EnqueueOptions) error { + return q.ExternalBulkEnqueue(ctx, []*work.Job{job}, opt) } -func (q *sidekiqQueue) ExternalBulkEnqueue(jobs []*work.Job, opt *work.EnqueueOptions) error { +func (q *sidekiqQueue) ExternalBulkEnqueue(ctx context.Context, jobs []*work.Job, opt *work.EnqueueOptions) error { now := time.Now() readyJobs := make([]*work.Job, 0, len(jobs)) scheduledJobs := make([]*work.Job, 0, len(jobs)) @@ -24,18 +24,18 @@ func (q *sidekiqQueue) ExternalBulkEnqueue(jobs []*work.Job, opt *work.EnqueueOp } } - err := q.externalBulkEnqueue(readyJobs, opt) + err := q.externalBulkEnqueue(ctx, readyJobs, opt) if err != nil { return err } - err = q.externalBulkEnqueueIn(scheduledJobs, opt) + err = q.externalBulkEnqueueIn(ctx, scheduledJobs, opt) if err != nil { return err } return nil } -func (q *sidekiqQueue) externalBulkEnqueue(jobs []*work.Job, opt *work.EnqueueOptions) error { +func (q *sidekiqQueue) externalBulkEnqueue(ctx context.Context, jobs []*work.Job, opt *work.EnqueueOptions) error { if len(jobs) == 0 { return nil } @@ -61,10 +61,10 @@ func (q *sidekiqQueue) externalBulkEnqueue(jobs []*work.Job, opt *work.EnqueueOp } args[2+i] = jobm } - return q.enqueueScript.Run(context.Background(), q.client, nil, args...).Err() + return q.enqueueScript.Run(ctx, q.client, nil, args...).Err() } -func (q *sidekiqQueue) externalBulkEnqueueIn(jobs []*work.Job, opt *work.EnqueueOptions) error { +func (q *sidekiqQueue) externalBulkEnqueueIn(ctx context.Context, jobs []*work.Job, opt *work.EnqueueOptions) error { if len(jobs) == 0 { return nil } @@ -90,7 +90,7 @@ func (q *sidekiqQueue) externalBulkEnqueueIn(jobs []*work.Job, opt *work.Enqueue args[1+2*i] = job.EnqueuedAt.Unix() args[1+2*i+1] = jobm } - return q.enqueueInScript.Run(context.Background(), q.client, nil, args...).Err() + return q.enqueueInScript.Run(ctx, q.client, nil, args...).Err() } var ( diff --git a/sidekiq/queue.go b/sidekiq/queue.go index 204f2e6..bcdadd7 100644 --- a/sidekiq/queue.go +++ b/sidekiq/queue.go @@ -175,13 +175,13 @@ func FormatQueueID(queue, class string) string { return fmt.Sprintf("%s/%s", queue, class) } -func (q *sidekiqQueue) schedule(ns string, at time.Time) error { - return q.scheduleScript.Run(context.Background(), q.client, nil, ns, at.Unix()).Err() +func (q *sidekiqQueue) schedule(ctx context.Context, ns string, at time.Time) error { + return q.scheduleScript.Run(ctx, q.client, nil, ns, at.Unix()).Err() } // JobPuller pulls jobs from sidekiq-compatible queue. type JobPuller interface { - Pull(*PullOptions) error + Pull(context.Context, *PullOptions) error } // PullOptions specifies how a job is pulled from sidekiq-compatible queue. @@ -204,12 +204,12 @@ func (opt *PullOptions) Validate() error { } // Pull moves jobs from sidekiq-compatible queue into work-compatible queue. -func (q *sidekiqQueue) Pull(opt *PullOptions) error { +func (q *sidekiqQueue) Pull(ctx context.Context, opt *PullOptions) error { err := opt.Validate() if err != nil { return err } - res, err := q.dequeueScript.Run(context.Background(), q.client, nil, + res, err := q.dequeueScript.Run(ctx, q.client, nil, opt.SidekiqNamespace, opt.SidekiqQueue, ).Result() @@ -233,7 +233,7 @@ func (q *sidekiqQueue) Pull(opt *PullOptions) error { if err != nil { return err } - err = q.redisQueue.Enqueue(job, &work.EnqueueOptions{ + err = q.redisQueue.Enqueue(ctx, job, &work.EnqueueOptions{ Namespace: opt.Namespace, QueueID: FormatQueueID(sqJob.Queue, sqJob.Class), }) diff --git a/sidekiq/queue_test.go b/sidekiq/queue_test.go index b58b96f..8285697 100644 --- a/sidekiq/queue_test.go +++ b/sidekiq/queue_test.go @@ -44,7 +44,7 @@ func TestSidekiqQueueExternalEnqueue(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.ExternalEnqueue(job, &work.EnqueueOptions{ + err = q.ExternalEnqueue(context.Background(), job, &work.EnqueueOptions{ Namespace: "{sidekiq}", QueueID: "import/TestWorker", }) @@ -58,6 +58,7 @@ func TestSidekiqQueueExternalEnqueue(t *testing.T) { } func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -74,7 +75,7 @@ func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.ExternalEnqueue(job, &work.EnqueueOptions{ + err = q.ExternalEnqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{sidekiq}", QueueID: "import/TestWorker", }) @@ -94,6 +95,7 @@ func TestSidekiqQueueExternalEnqueueScheduled(t *testing.T) { } func TestSidekiqQueueExternalDequeue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -101,13 +103,13 @@ func TestSidekiqQueueExternalDequeue(t *testing.T) { require.NoError(t, err) q := NewQueue(client) - err = q.Pull(&PullOptions{ + err = q.Pull(ctx, &PullOptions{ Namespace: "{sidekiq}", SidekiqNamespace: "{sidekiq}", SidekiqQueue: "default", }) require.NoError(t, err) - job, err := q.Dequeue(&work.DequeueOptions{ + job, err := q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{sidekiq}", QueueID: "default/TestWorker", At: time.Now(), @@ -124,6 +126,7 @@ func TestSidekiqQueueExternalDequeue(t *testing.T) { } func TestSidekiqQueueEnqueue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -133,7 +136,7 @@ func TestSidekiqQueueEnqueue(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.Enqueue(job, &work.EnqueueOptions{ + err = q.Enqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -161,7 +164,7 @@ func TestSidekiqQueueEnqueue(t *testing.T) { require.Equal(t, jobKey, z[0].Member) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) - err = q.Enqueue(job.Delay(time.Minute), &work.EnqueueOptions{ + err = q.Enqueue(ctx, job.Delay(time.Minute), &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -181,6 +184,7 @@ func TestSidekiqQueueEnqueue(t *testing.T) { } func TestSidekiqQueueDequeue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -191,7 +195,7 @@ func TestSidekiqQueueDequeue(t *testing.T) { require.NoError(t, err) jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) - err = q.ExternalEnqueue(job, &work.EnqueueOptions{ + err = q.ExternalEnqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -199,15 +203,15 @@ func TestSidekiqQueueDequeue(t *testing.T) { now := job.EnqueuedAt.Add(123 * time.Second) - err = q.(*sidekiqQueue).schedule("{ns1}", now) + err = q.(*sidekiqQueue).schedule(ctx, "{ns1}", now) require.NoError(t, err) - err = q.Pull(&PullOptions{ + err = q.Pull(ctx, &PullOptions{ Namespace: "{ns1}", SidekiqNamespace: "{ns1}", SidekiqQueue: "low", }) require.NoError(t, err) - jobDequeued, err := q.Dequeue(&work.DequeueOptions{ + jobDequeued, err := q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: now, @@ -228,9 +232,9 @@ func TestSidekiqQueueDequeue(t *testing.T) { require.Equal(t, jobKey, z[0].Member) require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) - err = q.(*sidekiqQueue).schedule("{ns1}", now) + err = q.(*sidekiqQueue).schedule(ctx, "{ns1}", now) require.NoError(t, err) - jobDequeued, err = q.Dequeue(&work.DequeueOptions{ + jobDequeued, err = q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: now, @@ -260,9 +264,9 @@ func TestSidekiqQueueDequeue(t *testing.T) { require.EqualValues(t, now.Unix()+60, z[0].Score) // empty - err = q.(*sidekiqQueue).schedule("{ns1}", now) + err = q.(*sidekiqQueue).schedule(ctx, "{ns1}", now) require.NoError(t, err) - _, err = q.Dequeue(&work.DequeueOptions{ + _, err = q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: now, @@ -273,6 +277,7 @@ func TestSidekiqQueueDequeue(t *testing.T) { } func TestSidekiqQueueDequeueDeletedJob(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -282,7 +287,7 @@ func TestSidekiqQueueDequeueDeletedJob(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.Enqueue(job, &work.EnqueueOptions{ + err = q.Enqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -300,7 +305,7 @@ func TestSidekiqQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, client.Del(context.Background(), jobKey).Err()) - _, err = q.Dequeue(&work.DequeueOptions{ + _, err = q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: job.EnqueuedAt, @@ -320,6 +325,7 @@ func TestSidekiqQueueDequeueDeletedJob(t *testing.T) { } func TestSidekiqQueueAck(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -329,7 +335,7 @@ func TestSidekiqQueueAck(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.Enqueue(job, &work.EnqueueOptions{ + err = q.Enqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -353,7 +359,7 @@ func TestSidekiqQueueAck(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 1, e) - err = q.Ack(job, &work.AckOptions{ + err = q.Ack(ctx, job, &work.AckOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -373,7 +379,7 @@ func TestSidekiqQueueAck(t *testing.T) { require.NoError(t, err) require.EqualValues(t, 0, e) - err = q.Ack(job, &work.AckOptions{ + err = q.Ack(ctx, job, &work.AckOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -381,6 +387,7 @@ func TestSidekiqQueueAck(t *testing.T) { } func TestSidekiqQueueGetQueueMetrics(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -390,13 +397,13 @@ func TestSidekiqQueueGetQueueMetrics(t *testing.T) { err := job.MarshalJSONPayload([]int{1, 2, 3}) require.NoError(t, err) - err = q.Enqueue(job, &work.EnqueueOptions{ + err = q.Enqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) require.NoError(t, err) - m, err := q.GetQueueMetrics(&work.QueueMetricsOptions{ + m, err := q.GetQueueMetrics(ctx, &work.QueueMetricsOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: job.EnqueuedAt, @@ -407,7 +414,7 @@ func TestSidekiqQueueGetQueueMetrics(t *testing.T) { require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) - m, err = q.GetQueueMetrics(&work.QueueMetricsOptions{ + m, err = q.GetQueueMetrics(ctx, &work.QueueMetricsOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: job.EnqueuedAt.Add(-time.Second), @@ -420,6 +427,7 @@ func TestSidekiqQueueGetQueueMetrics(t *testing.T) { } func TestSidekiqQueueEnqueueDuplicated(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -430,7 +438,7 @@ func TestSidekiqQueueEnqueueDuplicated(t *testing.T) { require.NoError(t, err) jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) - err = q.ExternalEnqueue(job, &work.EnqueueOptions{ + err = q.ExternalEnqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) @@ -438,15 +446,15 @@ func TestSidekiqQueueEnqueueDuplicated(t *testing.T) { now := job.EnqueuedAt - err = q.(*sidekiqQueue).schedule("{ns1}", now) + err = q.(*sidekiqQueue).schedule(ctx, "{ns1}", now) require.NoError(t, err) - err = q.Pull(&PullOptions{ + err = q.Pull(ctx, &PullOptions{ Namespace: "{ns1}", SidekiqNamespace: "{ns1}", SidekiqQueue: "low", }) require.NoError(t, err) - jobDequeued, err := q.Dequeue(&work.DequeueOptions{ + jobDequeued, err := q.Dequeue(ctx, &work.DequeueOptions{ Namespace: "{ns1}", QueueID: "low/q1", At: now, @@ -467,7 +475,7 @@ func TestSidekiqQueueEnqueueDuplicated(t *testing.T) { require.Equal(t, jobKey, z[0].Member) require.EqualValues(t, job.EnqueuedAt.Unix()+60, z[0].Score) - err = q.ExternalEnqueue(job, &work.EnqueueOptions{ + err = q.ExternalEnqueue(ctx, job, &work.EnqueueOptions{ Namespace: "{ns1}", QueueID: "low/q1", }) diff --git a/worker.go b/worker.go index adfeaf2..d845936 100644 --- a/worker.go +++ b/worker.go @@ -12,13 +12,13 @@ import ( ) // DequeueFunc generates a job. -type DequeueFunc func(*DequeueOptions) (*Job, error) +type DequeueFunc func(context.Context, *DequeueOptions) (*Job, error) // DequeueMiddleware modifies DequeueFunc behavior. type DequeueMiddleware func(DequeueFunc) DequeueFunc // EnqueueFunc takes in a job for processing. -type EnqueueFunc func(*Job, *EnqueueOptions) error +type EnqueueFunc func(context.Context, *Job, *EnqueueOptions) error // EnqueueMiddleware modifies EnqueueFunc behavior. type EnqueueMiddleware func(EnqueueFunc) EnqueueFunc @@ -29,8 +29,8 @@ type HandleFunc func(*Job, *DequeueOptions) error // ContextHandleFunc runs a job. type ContextHandleFunc func(context.Context, *Job, *DequeueOptions) error -// HandleMiddleware modifies HandleFunc hehavior. -type HandleMiddleware func(HandleFunc) HandleFunc +// HandleMiddleware modifies HandleFunc behavior. +type HandleMiddleware func(ContextHandleFunc) ContextHandleFunc // BackoffFunc computes when to retry this job from now. type BackoffFunc func(*Job, *DequeueOptions) time.Duration @@ -184,7 +184,7 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun dequeue = mw(dequeue) } - handle := func(job *Job, o *DequeueOptions) error { + handle := func(ctx context.Context, job *Job, o *DequeueOptions) error { ctx, cancel := context.WithTimeout(ctx, opt.MaxExecutionTime) defer cancel() return h(ctx, job, o) @@ -206,15 +206,15 @@ func (w *Worker) RunOnce(ctx context.Context, queueID string, h ContextHandleFun At: time.Now(), InvisibleSec: int64(opt.MaxExecutionTime / time.Second), } - job, err := dequeue(dopt) + job, err := dequeue(ctx, dopt) if err != nil { return err } - err = handle(job, dopt) + err = handle(ctx, job, dopt) if err != nil { return err } - err = queue.Ack(job, &AckOptions{ + err = queue.Ack(ctx, job, &AckOptions{ Namespace: dopt.Namespace, QueueID: dopt.QueueID, }) @@ -254,9 +254,7 @@ func (w *Worker) start(ctx context.Context, h handler) { } dequeue = idleWait(ctx, h.JobOptions.IdleWait)(dequeue) - handle := func(job *Job, o *DequeueOptions) error { - return h.HandleFunc(ctx, job, o) - } + handle := h.HandleFunc for _, mw := range h.JobOptions.HandleMiddleware { handle = mw(handle) } @@ -270,14 +268,14 @@ func (w *Worker) start(ctx context.Context, h handler) { // prepare bulk ack flush var ackJobs []*Job - flush := func() error { + flush := func(ctx context.Context) error { opt := &AckOptions{ Namespace: ns, QueueID: h.QueueID, } bulkDeq, ok := queue.(BulkDequeuer) if ok { - err := bulkDeq.BulkAck(ackJobs, opt) + err := bulkDeq.BulkAck(ctx, ackJobs, opt) if err != nil { return err } @@ -285,7 +283,7 @@ func (w *Worker) start(ctx context.Context, h handler) { return nil } for _, job := range ackJobs { - err := queue.Ack(job, opt) + err := queue.Ack(ctx, job, opt) if err != nil { return err } @@ -294,7 +292,10 @@ func (w *Worker) start(ctx context.Context, h handler) { return nil } defer func() { - err := flush() + // We flush Jobs at shutdown, which is signaled by + // canceling the overarching context. Use the + // Background context so we actually attempt to flush. + err := flush(context.Background()) if err != nil { errFunc(err) } @@ -309,7 +310,7 @@ func (w *Worker) start(ctx context.Context, h handler) { case <-ctx.Done(): return case <-flushTicker.C: - err := flush() + err := flush(ctx) if err != nil { errFunc(err) } @@ -321,21 +322,21 @@ func (w *Worker) start(ctx context.Context, h handler) { At: time.Now(), InvisibleSec: int64((h.JobOptions.MaxExecutionTime + flushIntv) / time.Second), } - job, err := dequeue(opt) + job, err := dequeue(ctx, opt) if err != nil { if !errors.Is(err, ErrEmptyQueue) { errFunc(err) } return err } - err = handle(job, opt) + err = handle(ctx, job, opt) if err != nil { return err } ackJobs = append(ackJobs, job) if len(ackJobs) >= 1000 { // prevent un-acked job count to be too large - err := flush() + err := flush(ctx) if err != nil { errFunc(err) return err @@ -354,7 +355,7 @@ func getDequeueFunc(queue Queue) DequeueFunc { } var jobs []*Job - return func(opt *DequeueOptions) (*Job, error) { + return func(ctx context.Context, opt *DequeueOptions) (*Job, error) { if len(jobs) == 0 { // this is an optimization to reduce system calls. // @@ -368,7 +369,7 @@ func getDequeueFunc(queue Queue) DequeueFunc { bulkOpt.InvisibleSec *= count var err error - jobs, err = bulkDeq.BulkDequeue(count, &bulkOpt) + jobs, err = bulkDeq.BulkDequeue(ctx, count, &bulkOpt) if err != nil { return nil, err } @@ -380,7 +381,7 @@ func getDequeueFunc(queue Queue) DequeueFunc { } // ExportMetrics dumps queue stats if the queue implements MetricsExporter. -func (w *Worker) ExportMetrics() (*Metrics, error) { +func (w *Worker) ExportMetrics(ctx context.Context) (*Metrics, error) { var queueMetrics []*QueueMetrics for _, h := range w.handlerMap { queue := w.opt.Queue @@ -389,7 +390,7 @@ func (w *Worker) ExportMetrics() (*Metrics, error) { if !ok { continue } - m, err := exporter.GetQueueMetrics(&QueueMetricsOptions{ + m, err := exporter.GetQueueMetrics(ctx, &QueueMetricsOptions{ Namespace: ns, QueueID: h.QueueID, At: time.Now(), @@ -414,8 +415,8 @@ func (w *Worker) Stop() { func idleWait(ctx context.Context, d time.Duration) DequeueMiddleware { return func(f DequeueFunc) DequeueFunc { - return func(opt *DequeueOptions) (*Job, error) { - job, err := f(opt) + return func(ctx context.Context, opt *DequeueOptions) (*Job, error) { + job, err := f(ctx, opt) if err != nil { if errors.Is(err, ErrEmptyQueue) { select { @@ -430,14 +431,14 @@ func idleWait(ctx context.Context, d time.Duration) DequeueMiddleware { } } -func catchPanic(f HandleFunc) HandleFunc { - return func(job *Job, opt *DequeueOptions) (err error) { +func catchPanic(f ContextHandleFunc) ContextHandleFunc { + return func(ctx context.Context, job *Job, opt *DequeueOptions) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("panic: %v\n\n%s", r, debug.Stack()) } }() - return f(job, opt) + return f(ctx, job, opt) } } @@ -473,9 +474,9 @@ func defaultBackoff() BackoffFunc { } func retry(queue Queue, backoff BackoffFunc) HandleMiddleware { - return func(f HandleFunc) HandleFunc { - return func(job *Job, opt *DequeueOptions) error { - err := f(job, opt) + return func(f ContextHandleFunc) ContextHandleFunc { + return func(ctx context.Context, job *Job, opt *DequeueOptions) error { + err := f(ctx, job, opt) if err != nil { if errors.Is(err, ErrUnrecoverable) { return nil // ack @@ -490,7 +491,7 @@ func retry(queue Queue, backoff BackoffFunc) HandleMiddleware { job.UpdatedAt = now job.EnqueuedAt = now.Add(backoff(job, opt)) - queue.Enqueue(job, &EnqueueOptions{ + queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: opt.Namespace, QueueID: opt.QueueID, }) diff --git a/worker_test.go b/worker_test.go index 90d741d..2a7f5c2 100644 --- a/worker_test.go +++ b/worker_test.go @@ -57,7 +57,7 @@ func TestWorkerExportMetrics(t *testing.T) { ) require.NoError(t, err) - all, err := w.ExportMetrics() + all, err := w.ExportMetrics(context.Background()) require.NoError(t, err) require.Len(t, all.Queue, 1) require.Equal(t, all.Queue[0].Namespace, "{ns1}") @@ -96,6 +96,7 @@ func waitEmpty(client redis.UniversalClient, key string, timeout time.Duration) } func TestWorkerRunJobMultiQueue(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -146,7 +147,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { err := job.MarshalPayload(message{Text: "test1"}) require.NoError(t, err) - err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ + err = w.opt.Queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "test1", }) @@ -158,7 +159,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { err := job.MarshalPayload(message{Text: "test2"}) require.NoError(t, err) - err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ + err = w.opt.Queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "test2", }) @@ -190,6 +191,7 @@ func TestWorkerRunJobMultiQueue(t *testing.T) { } func TestWorkerRunJob(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -236,7 +238,7 @@ func TestWorkerRunJob(t *testing.T) { err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ + err = w.opt.Queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "success", }) @@ -261,7 +263,7 @@ func TestWorkerRunJob(t *testing.T) { err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ + err = w.opt.Queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "failure", }) @@ -282,7 +284,7 @@ func TestWorkerRunJob(t *testing.T) { require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { - job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ + job, err := NewRedisQueue(client).Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "failure", At: time.Now().Add(time.Hour), @@ -298,7 +300,7 @@ func TestWorkerRunJob(t *testing.T) { err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - err = w.opt.Queue.Enqueue(job, &EnqueueOptions{ + err = w.opt.Queue.Enqueue(ctx, job, &EnqueueOptions{ Namespace: "{ns1}", QueueID: "panic", }) @@ -319,7 +321,7 @@ func TestWorkerRunJob(t *testing.T) { require.EqualValues(t, 3, count) for i := 0; i < 3; i++ { - job, err := NewRedisQueue(client).Dequeue(&DequeueOptions{ + job, err := NewRedisQueue(client).Dequeue(ctx, &DequeueOptions{ Namespace: "{ns1}", QueueID: "panic", At: time.Now().Add(time.Hour), @@ -337,10 +339,11 @@ func TestWorkerRunOnce(t *testing.T) { require.NoError(t, redistest.Reset(client)) job := NewJob() - err := NewRedisQueue(client).Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "success", - }) + err := NewRedisQueue(client).Enqueue(context.Background(), + job, &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "success", + }) require.NoError(t, err) count, err := client.ZCard(context.Background(), "{ns1}:queue:success").Result() @@ -348,10 +351,11 @@ func TestWorkerRunOnce(t *testing.T) { require.EqualValues(t, 1, count) job2 := NewJob() - err = NewRedisQueue(client).Enqueue(job2, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "failure", - }) + err = NewRedisQueue(client).Enqueue(context.Background(), + job2, &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "failure", + }) require.NoError(t, err) count, err = client.ZCard(context.Background(), "{ns1}:queue:failure").Result() @@ -359,10 +363,11 @@ func TestWorkerRunOnce(t *testing.T) { require.EqualValues(t, 1, count) job3 := NewJob() - err = NewRedisQueue(client).Enqueue(job3, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "panic", - }) + err = NewRedisQueue(client).Enqueue(context.Background(), + job3, &EnqueueOptions{ + Namespace: "{ns1}", + QueueID: "panic", + }) require.NoError(t, err) count, err = client.ZCard(context.Background(), "{ns1}:queue:panic").Result() @@ -420,6 +425,7 @@ func TestWorkerRunOnce(t *testing.T) { } func TestRetry(t *testing.T) { + ctx := context.Background() client := redistest.NewClient() defer client.Close() require.NoError(t, redistest.Reset(client)) @@ -431,10 +437,10 @@ func TestRetry(t *testing.T) { InvisibleSec: 10, } retrier := retry(NewRedisQueue(client), defaultBackoff()) - h := retrier(func(*Job, *DequeueOptions) error { + h := retrier(func(context.Context, *Job, *DequeueOptions) error { return ErrUnrecoverable }) - err := h(job, opt) + err := h(ctx, job, opt) require.NoError(t, err) require.EqualValues(t, 0, job.Retries) @@ -450,10 +456,10 @@ func TestRetry(t *testing.T) { require.NoError(t, err) require.Len(t, z, 0) - h = retrier(func(*Job, *DequeueOptions) error { + h = retrier(func(context.Context, *Job, *DequeueOptions) error { return fmt.Errorf("recoverable, but not retried: %w", ErrDoNotRetry) }) - err = h(job, opt) + err = h(ctx, job, opt) require.Error(t, err) require.EqualValues(t, 0, job.Retries) @@ -462,10 +468,10 @@ func TestRetry(t *testing.T) { var delays []int64 for i := 1; i <= 10; i++ { retryErr := fmt.Errorf("error %d", i) - h = retrier(func(*Job, *DequeueOptions) error { + h = retrier(func(context.Context, *Job, *DequeueOptions) error { return retryErr }) - err = h(job, opt) + err = h(ctx, job, opt) require.Error(t, err) require.Equal(t, retryErr, err)