Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/enqueuer/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"flag"
"log"

Expand All @@ -24,7 +25,7 @@ func main() {

job := work.NewJob()
job.MarshalPayload(flag.Args())
err = queue.Enqueue(job, &work.EnqueueOptions{
err = queue.Enqueue(context.Background(), job, &work.EnqueueOptions{
Namespace: *namespace,
QueueID: "cmd_queue",
})
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
Expand All @@ -44,6 +45,7 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
Expand All @@ -58,8 +60,10 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand All @@ -69,12 +73,15 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.15.0 h1:1V1NfVQR87RtWAgp1lv9JZJ5Jap+XFGKPi00andXGi4=
github.com/onsi/ginkgo v1.15.0/go.mod h1:hF8qUzuuC8DJGygJH3726JnCZX4MYbRB8yFfISqnKUg=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.10.5 h1:7n6FEkpFmfCoo2t+YYqXH0evK+a9ICQz0xcAy9dYcaQ=
github.com/onsi/gomega v1.10.5/go.mod h1:gza4q3jKQJijlu05nKWRCW/GavJumGt8aNRxWg7mt48=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -121,6 +128,7 @@ go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel/metric v0.20.0 h1:4kzhXFP+btKm4jwxpjIqjs41A7MakRFUS86bqLHTIw8=
go.opentelemetry.io/otel/metric v0.20.0/go.mod h1:598I5tYlH1vzBjn+BTuhzTCSb/9debfNp6R3s7Pr1eU=
go.opentelemetry.io/otel/oteltest v0.20.0 h1:HiITxCawalo5vQzdHfKeZurV8x7ljcqAgiWzF6Vaeaw=
go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa9FqQ2IQ8LoxiGnw=
go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
Expand All @@ -139,6 +147,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -169,13 +178,15 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40 h1:JWgyZ1qgdTaF3N3oxC+MdTV7q
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand All @@ -188,13 +199,16 @@ google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
19 changes: 10 additions & 9 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package work

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -137,13 +138,13 @@ func (opt *EnqueueOptions) Validate() error {

// Enqueuer enqueues a job.
type Enqueuer interface {
Enqueue(*Job, *EnqueueOptions) error
Enqueue(context.Context, *Job, *EnqueueOptions) error
}

// ExternalEnqueuer enqueues a job with other queue protocol.
// Queue adaptor that implements this can publish jobs directly to other types of queue systems.
type ExternalEnqueuer interface {
ExternalEnqueue(*Job, *EnqueueOptions) error
ExternalEnqueue(context.Context, *Job, *EnqueueOptions) error
}

// DequeueOptions specifies how a job is dequeued.
Expand Down Expand Up @@ -202,8 +203,8 @@ var (
// Dequeuer dequeues a job.
// If a job is processed successfully, call Ack() to delete the job.
type Dequeuer interface {
Dequeue(*DequeueOptions) (*Job, error)
Ack(*Job, *AckOptions) error
Dequeue(context.Context, *DequeueOptions) (*Job, error)
Ack(context.Context, *Job, *AckOptions) error
}

// Queue can enqueue and dequeue jobs.
Expand All @@ -214,19 +215,19 @@ type Queue interface {

// BulkEnqueuer enqueues jobs in a batch.
type BulkEnqueuer interface {
BulkEnqueue([]*Job, *EnqueueOptions) error
BulkEnqueue(context.Context, []*Job, *EnqueueOptions) error
}

// ExternalBulkEnqueuer enqueues jobs in a batch with other queue protocol.
// Queue adaptor that implements this can publish jobs directly to other types of queue systems.
type ExternalBulkEnqueuer interface {
ExternalBulkEnqueue([]*Job, *EnqueueOptions) error
ExternalBulkEnqueue(context.Context, []*Job, *EnqueueOptions) error
}

// BulkDequeuer dequeues jobs in a batch.
type BulkDequeuer interface {
BulkDequeue(int64, *DequeueOptions) ([]*Job, error)
BulkAck([]*Job, *AckOptions) error
BulkDequeue(context.Context, int64, *DequeueOptions) ([]*Job, error)
BulkAck(context.Context, []*Job, *AckOptions) error
}

// FindOptions specifies how a job is searched from a queue.
Expand All @@ -247,5 +248,5 @@ func (opt *FindOptions) Validate() error {
// It returns nil if the job is no longer in the queue.
// The length of the returned job list will be equal to the length of jobIDs.
type BulkJobFinder interface {
BulkFind(jobIDs []string, opts *FindOptions) ([]*Job, error)
BulkFind(ctx context.Context, jobIDs []string, opts *FindOptions) ([]*Job, error)
}
7 changes: 5 additions & 2 deletions metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package work

import "time"
import (
"context"
"time"
)

// QueueMetrics contains metrics from a queue.
type QueueMetrics struct {
Expand All @@ -14,7 +17,7 @@ type QueueMetrics struct {

// MetricsExporter can be implemented by Queue to report metrics.
type MetricsExporter interface {
GetQueueMetrics(*QueueMetricsOptions) (*QueueMetrics, error)
GetQueueMetrics(context.Context, *QueueMetricsOptions) (*QueueMetrics, error)
}

// Metrics wraps metrics reported by MetricsExporter.
Expand Down
9 changes: 5 additions & 4 deletions middleware/concurrent/dequeuer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package concurrent

import (
"context"
"fmt"

"github.com/go-redis/redis/v8"
Expand All @@ -25,7 +26,7 @@ func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware {
if workerID == "" {
workerID = uuid.NewString()
}
return func(opt *work.DequeueOptions) (*work.Job, error) {
return func(ctx context.Context, opt *work.DequeueOptions) (*work.Job, error) {
lock := &redislock.Lock{
Client: copt.Client,
Key: fmt.Sprintf("%s:lock:%s", opt.Namespace, opt.QueueID),
Expand All @@ -34,17 +35,17 @@ func Dequeuer(copt *DequeuerOptions) work.DequeueMiddleware {
ExpireInSec: opt.InvisibleSec,
MaxAcquirers: copt.Max,
}
acquired, err := lock.Acquire()
acquired, err := lock.Acquire(ctx)
if err != nil {
return nil, err
}
if !acquired {
return nil, work.ErrEmptyQueue
}
if !copt.disableUnlock {
defer lock.Release()
defer lock.Release(ctx)
}
return f(opt)
return f(ctx, opt)
}
}
}
16 changes: 9 additions & 7 deletions middleware/concurrent/dequeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
)

func TestDequeuer(t *testing.T) {
ctx := context.Background()
client := redistest.NewClient()
defer client.Close()
require.NoError(t, redistest.Reset(client))
Expand All @@ -24,7 +25,7 @@ func TestDequeuer(t *testing.T) {
InvisibleSec: 60,
}
var called int
h := func(*work.DequeueOptions) (*work.Job, error) {
h := func(context.Context, *work.DequeueOptions) (*work.Job, error) {
called++
return work.NewJob(), nil
}
Expand All @@ -35,7 +36,7 @@ func TestDequeuer(t *testing.T) {
Max: 2,
workerID: fmt.Sprintf("w%d", i),
})
_, err := deq(h)(opt)
_, err := deq(h)(ctx, opt)

require.NoError(t, err)
}
Expand All @@ -50,7 +51,7 @@ func TestDequeuer(t *testing.T) {
workerID: fmt.Sprintf("w%d", i),
disableUnlock: true,
})
_, err := deq(h)(opt)
_, err := deq(h)(ctx, opt)

if i <= 1 {
require.NoError(t, err)
Expand Down Expand Up @@ -85,7 +86,7 @@ func TestDequeuer(t *testing.T) {
workerID: "w0",
disableUnlock: true,
})
_, err := deq(h)(&optLater)
_, err := deq(h)(ctx, &optLater)
require.Equal(t, work.ErrEmptyQueue, err)
}
require.Equal(t, 5, called)
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestDequeuer(t *testing.T) {
workerID: fmt.Sprintf("w%d", i),
disableUnlock: true,
})
_, err := deq(h)(&optExpired)
_, err := deq(h)(ctx, &optExpired)
if i < 5 {
require.NoError(t, err)
} else {
Expand All @@ -138,6 +139,7 @@ func TestDequeuer(t *testing.T) {
}

func BenchmarkConcurrency(b *testing.B) {
ctx := context.Background()
b.StopTimer()

client := redistest.NewClient()
Expand All @@ -155,14 +157,14 @@ func BenchmarkConcurrency(b *testing.B) {
Max: 1,
})
var called int
h := deq(func(*work.DequeueOptions) (*work.Job, error) {
h := deq(func(context.Context, *work.DequeueOptions) (*work.Job, error) {
called++
return work.NewJob(), nil
})

b.StartTimer()
for n := 0; n < b.N; n++ {
h(opt)
h(ctx, opt)
}
b.StopTimer()
require.Equal(b, b.N, called)
Expand Down
7 changes: 4 additions & 3 deletions middleware/discard/after.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
package discard

import (
"context"
"time"

"github.com/taylorchu/work"
)

// After discards a job if it is already stale.
func After(d time.Duration) work.HandleMiddleware {
return func(f work.HandleFunc) work.HandleFunc {
return func(job *work.Job, opt *work.DequeueOptions) error {
return func(f work.ContextHandleFunc) work.ContextHandleFunc {
return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error {
if time.Since(job.CreatedAt) > d {
return work.ErrUnrecoverable
}
err := f(job, opt)
err := f(ctx, job, opt)
if time.Since(job.CreatedAt) > d {
return work.ErrUnrecoverable
}
Expand Down
7 changes: 4 additions & 3 deletions middleware/discard/after_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package discard

import (
"context"
"errors"
"testing"
"time"
Expand All @@ -16,15 +17,15 @@ func TestAfter(t *testing.T) {
QueueID: "q1",
}
d := After(time.Minute)
h := d(func(*work.Job, *work.DequeueOptions) error {
h := d(func(context.Context, *work.Job, *work.DequeueOptions) error {
return errors.New("no reason")
})

err := h(job, opt)
err := h(context.Background(), job, opt)
require.Error(t, err)
require.NotEqual(t, work.ErrUnrecoverable, err)

job.CreatedAt = job.CreatedAt.Add(-time.Hour)
err = h(job, opt)
err = h(context.Background(), job, opt)
require.Equal(t, work.ErrUnrecoverable, err)
}
7 changes: 4 additions & 3 deletions middleware/discard/invalid_payload.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package discard

import (
"context"
"errors"

"github.com/taylorchu/work"
)

// InvalidPayload discards a job if it has decode error.
func InvalidPayload(f work.HandleFunc) work.HandleFunc {
return func(job *work.Job, opt *work.DequeueOptions) error {
err := f(job, opt)
func InvalidPayload(f work.ContextHandleFunc) work.ContextHandleFunc {
return func(ctx context.Context, job *work.Job, opt *work.DequeueOptions) error {
err := f(ctx, job, opt)
if err != nil {
var perr *work.InvalidJobPayloadError
if errors.As(err, &perr) {
Expand Down
Loading