From 079e577b49a63c51cc6575a57250d853d5137213 Mon Sep 17 00:00:00 2001 From: Nathan Yergler Date: Mon, 4 Jan 2021 15:23:19 -0800 Subject: [PATCH 1/2] Pass at least one key to script executions This allows Redis to route our execution to the correct node when running in a Redis Cluster. --- redis_queue.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/redis_queue.go b/redis_queue.go index 6023e47..e2a0e8a 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -16,6 +16,13 @@ type redisQueue struct { findScript *redis.Script } +// scriptKey returns a slice of strings containing at least one of the keys to +// be used by a script. This allows Redis route our script execution to the +// correct node in the event we're using a namespace. +func scriptKey(ns, queueID string) []string { + return []string{strings.Join([]string{ns, "queue", queueID}, ":")} +} + // NewRedisQueue creates a new queue stored in redis. func NewRedisQueue(client redis.UniversalClient) Queue { enqueueScript := redis.NewScript(` @@ -153,7 +160,7 @@ func (q *redisQueue) BulkEnqueue(jobs []*Job, opt *EnqueueOptions) error { args[2+3*i+1] = job.ID args[2+3*i+2] = jobm } - return q.enqueueScript.Run(q.client, nil, args...).Err() + return q.enqueueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) Dequeue(opt *DequeueOptions) (*Job, error) { @@ -169,7 +176,7 @@ func (q *redisQueue) BulkDequeue(count int64, opt *DequeueOptions) ([]*Job, erro if err != nil { return nil, err } - res, err := q.dequeueScript.Run(q.client, nil, + res, err := q.dequeueScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), opt.Namespace, opt.QueueID, opt.At.Unix(), @@ -213,7 +220,7 @@ func (q *redisQueue) BulkAck(jobs []*Job, opt *AckOptions) error { for i, job := range jobs { args[2+i] = job.ID } - return q.ackScript.Run(q.client, nil, args...).Err() + return q.ackScript.Run(q.client, scriptKey(opt.Namespace, opt.QueueID), args...).Err() } func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) { @@ -229,7 +236,7 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) for i, jobID := range jobIDs { args[1+i] = jobID } - res, err := q.findScript.Run(q.client, nil, args...).Result() + res, err := q.findScript.Run(q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { return nil, err } From 8d89f34cef15bc697bcecb5c7d22c8eaf58f7da1 Mon Sep 17 00:00:00 2001 From: James Clarke Date: Wed, 3 Nov 2021 11:43:28 -0400 Subject: [PATCH 2/2] Include queue id in job key --- job.go | 4 ++ redis_queue.go | 14 ++++--- redis_queue_test.go | 96 +++++++++++++++++++++++---------------------- 3 files changed, 61 insertions(+), 53 deletions(-) diff --git a/job.go b/job.go index 4fd6c32..d86a6b4 100644 --- a/job.go +++ b/job.go @@ -232,6 +232,7 @@ type BulkDequeuer interface { // FindOptions specifies how a job is searched from a queue. type FindOptions struct { Namespace string + QueueID string } // Validate validates FindOptions. @@ -239,6 +240,9 @@ func (opt *FindOptions) Validate() error { if opt.Namespace == "" { return ErrEmptyNamespace } + if opt.QueueID == "" { + return ErrEmptyQueueID + } return nil } diff --git a/redis_queue.go b/redis_queue.go index 5f45791..53057a4 100644 --- a/redis_queue.go +++ b/redis_queue.go @@ -46,7 +46,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { local at = tonumber(ARGV[i]) local job_id = ARGV[i+1] local jobm = ARGV[i+2] - local job_key = table.concat({ns, "job", job_id}, ":") + local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":") -- update job fields redis.call("hset", job_key, "msgpack", jobm) @@ -112,7 +112,7 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { for i = 3,table.getn(ARGV) do local job_id = ARGV[i] - local job_key = table.concat({ns, "job", job_id}, ":") + local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":") -- delete job fields table.insert(del_args, job_key) @@ -126,10 +126,11 @@ func NewRedisQueue(client redis.UniversalClient) RedisQueue { findScript := redis.NewScript(` local ns = ARGV[1] + local queue_id = ARGV[2] local ret = {} - for i = 2,table.getn(ARGV) do + for i = 3,table.getn(ARGV) do local job_id = ARGV[i] - local job_key = table.concat({ns, "job", job_id}, ":") + local job_key = table.concat({ns, "queue", queue_id, "job", job_id}, ":") local jobm = redis.call("hget", job_key, "msgpack") table.insert(ret, jobm) @@ -241,10 +242,11 @@ func (q *redisQueue) BulkFind(jobIDs []string, opt *FindOptions) ([]*Job, error) if len(jobIDs) == 0 { return nil, nil } - args := make([]interface{}, 1+len(jobIDs)) + args := make([]interface{}, 2+len(jobIDs)) args[0] = opt.Namespace + args[1] = opt.QueueID for i, jobID := range jobIDs { - args[1+i] = jobID + args[2+i] = jobID } res, err := q.findScript.Run(context.Background(), q.client, scriptKey(opt.Namespace, jobIDs[0]), args...).Result() if err != nil { diff --git a/redis_queue_test.go b/redis_queue_test.go index b3d8cda..67c300c 100644 --- a/redis_queue_test.go +++ b/redis_queue_test.go @@ -26,12 +26,12 @@ func TestRedisQueueEnqueue(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) @@ -42,7 +42,8 @@ func TestRedisQueueEnqueue(t *testing.T) { }, h) jobs, err := q.BulkFind([]string{job.ID, "not-exist-id"}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) require.Len(t, jobs, 2) @@ -52,12 +53,13 @@ func TestRedisQueueEnqueue(t *testing.T) { jobs[0].LastError = "hello world" err = q.Enqueue(jobs[0], &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) jobs, err = q.BulkFind([]string{job.ID}, &FindOptions{ - Namespace: "{ns1}", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) require.Len(t, jobs, 1) @@ -66,7 +68,7 @@ func TestRedisQueueEnqueue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -77,14 +79,14 @@ func TestRedisQueueEnqueue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) err = q.Enqueue(job.Delay(time.Minute), &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -108,19 +110,19 @@ func TestRedisQueueDequeue(t *testing.T) { job := NewJob() err := job.MarshalPayload(message{Text: "hello"}) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) now := job.EnqueuedAt.Add(123 * time.Second) jobDequeued, err := q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: now, InvisibleSec: 0, }) @@ -129,7 +131,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -140,8 +142,8 @@ func TestRedisQueueDequeue(t *testing.T) { require.EqualValues(t, job.EnqueuedAt.Unix(), z[0].Score) jobDequeued, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: now, InvisibleSec: 60, }) @@ -158,7 +160,7 @@ func TestRedisQueueDequeue(t *testing.T) { z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -170,8 +172,8 @@ func TestRedisQueueDequeue(t *testing.T) { // empty _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: now, InvisibleSec: 60, }) @@ -194,12 +196,12 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, err) err = q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID) h, err := client.HGetAll(context.Background(), jobKey).Result() require.NoError(t, err) @@ -212,8 +214,8 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { require.NoError(t, client.Del(context.Background(), jobKey).Err()) _, err = q.Dequeue(&DequeueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: job.EnqueuedAt, InvisibleSec: 60, }) @@ -221,7 +223,7 @@ func TestRedisQueueDequeueDeletedJob(t *testing.T) { z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -239,16 +241,16 @@ func TestRedisQueueAck(t *testing.T) { job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) - jobKey := fmt.Sprintf("{ns1}:job:%s", job.ID) + jobKey := fmt.Sprintf("ns1:queue:{q1}:job:%s", job.ID) z, err := client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -263,14 +265,14 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 1, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) z, err = client.ZRangeByScoreWithScores( context.Background(), - "{ns1}:queue:q1", + "ns1:queue:{q1}", &redis.ZRangeBy{ Min: "-inf", Max: "+inf", @@ -283,8 +285,8 @@ func TestRedisQueueAck(t *testing.T) { require.EqualValues(t, 0, e) err = q.Ack(job, &AckOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) } @@ -298,30 +300,30 @@ func TestRedisQueueGetQueueMetrics(t *testing.T) { job := NewJob() err := q.Enqueue(job, &EnqueueOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", }) require.NoError(t, err) m, err := q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: job.EnqueuedAt, }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) - require.Equal(t, "q1", m.QueueID) + require.Equal(t, "ns1", m.Namespace) + require.Equal(t, "{q1}", m.QueueID) require.EqualValues(t, 1, m.ReadyTotal) require.EqualValues(t, 0, m.ScheduledTotal) m, err = q.GetQueueMetrics(&QueueMetricsOptions{ - Namespace: "{ns1}", - QueueID: "q1", + Namespace: "ns1", + QueueID: "{q1}", At: job.EnqueuedAt.Add(-time.Second), }) require.NoError(t, err) - require.Equal(t, "{ns1}", m.Namespace) - require.Equal(t, "q1", m.QueueID) + require.Equal(t, "ns1", m.Namespace) + require.Equal(t, "{q1}", m.QueueID) require.EqualValues(t, 0, m.ReadyTotal) require.EqualValues(t, 1, m.ScheduledTotal) }