Skip to content

Commit c8c73e8

Browse files
committed
fair queues prototype
1 parent 1a94e83 commit c8c73e8

File tree

3 files changed

+127
-27
lines changed

3 files changed

+127
-27
lines changed

Core.TaskProcessor.Tests/ProcessorTests.cs

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,19 @@ public ProcessorTests(ITestOutputHelper output)
1515
_processor = new TaskProcessor(new TaskProcessorOptions
1616
{
1717
Prefix = "{dev}",
18-
MaxWorkers = 4,
19-
Queues = new[] { "q1", "q2", "q3" },
18+
MaxWorkers = 1,
19+
Queues = new[] { "q1", "fair_q2", "q3" },
2020
Redis = "localhost:6379,abortConnect=false",
2121
Retries = 3,
2222
Deadletter = true,
2323
OnTaskStart = info =>
2424
{
25-
_output.WriteLine($"Start: {info.Queue} {info.Topic}");
25+
//_output.WriteLine($"Start: {info.Queue} {info.Topic}");
2626
return Task.CompletedTask;
2727
},
2828
OnTaskEnd = info =>
2929
{
30-
_output.WriteLine($"End: {info.Queue} {info.Topic}");
30+
//_output.WriteLine($"End: {info.Queue} {info.Topic}");
3131
return Task.CompletedTask;
3232
}
3333
})
@@ -37,11 +37,55 @@ public ProcessorTests(ITestOutputHelper output)
3737
//await info.ExtendLockAsync(TimeSpan.FromMinutes(5));
3838
_output.WriteLine($"Process: {info.Queue} {info.Topic}");
3939
//throw new Exception("error");
40-
await Task.Delay(500, info.CancelToken);
40+
await Task.Delay(10, info.CancelToken);
4141
}
4242
};
4343
}
4444

45+
[Fact]
46+
public async Task EnqueueFairness()
47+
{
48+
for (int i = 0; i < 10; i++)
49+
{
50+
await _processor.EnqueueTaskAsync("fair_q2", "1001", new TaskData
51+
{
52+
Topic = "A",
53+
});
54+
}
55+
56+
await Task.Delay(500);
57+
58+
for (int i = 0; i < 10; i++)
59+
{
60+
await _processor.EnqueueTaskAsync("fair_q2", "1002", new TaskData
61+
{
62+
Topic = "B",
63+
});
64+
}
65+
}
66+
67+
[Fact]
68+
public async Task EnqueueNoFairness()
69+
{
70+
for (int i = 0; i < 10; i++)
71+
{
72+
await _processor.EnqueueTaskAsync("q1", "1001", new TaskData
73+
{
74+
Topic = "A",
75+
});
76+
}
77+
78+
await Task.Delay(500);
79+
80+
for (int i = 0; i < 10; i++)
81+
{
82+
await _processor.EnqueueTaskAsync("q1", "1002", new TaskData
83+
{
84+
Topic = "B",
85+
});
86+
}
87+
}
88+
4589
[Fact]
4690
public async Task Enqueue()
4791
{
@@ -97,8 +141,8 @@ public async Task Run()
97141

98142
await _processor.StopAsync();
99143

100-
var batches = await _processor.GetBatchesAsync("1001");
101-
_output.WriteLine(JsonConvert.SerializeObject(batches, Formatting.Indented));
144+
//var batches = await _processor.GetBatchesAsync("1001");
145+
//_output.WriteLine(JsonConvert.SerializeObject(batches, Formatting.Indented));
102146
}
103147

104148
[Fact]

Core.TaskProcessor/TaskProcessor.cs

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
using Cronos;
2+
using StackExchange.Redis;
13
using System.Collections.Concurrent;
24
using System.Linq.Expressions;
5+
using System.Threading.Tasks;
36
using System.Threading.Tasks.Dataflow;
4-
using Cronos;
5-
using StackExchange.Redis;
67

78
namespace Core.TaskProcessor;
89

@@ -128,9 +129,17 @@ public async Task<string> EnqueueBatchAsync(string queue, string tenant, List<Ta
128129
foreach (var q in push)
129130
{
130131
tra.SortedSetAddAsync(Prefix("queues"), q.Key, DateTimeOffset.UtcNow.ToUnixTimeSeconds());
131-
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}"), q.Value.ToArray());
132-
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{q.Key}:event")), "fetch");
133132

133+
if (q.Key.StartsWith("fair_"))
134+
{
135+
tra.HashIncrementAsync(Prefix($"queue:{q.Key}:fairness"), tenant, q.Value.Count);
136+
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}:{tenant}"), q.Value.ToArray());
137+
}
138+
else
139+
tra.ListLeftPushAsync(Prefix($"queue:{q.Key}"), q.Value.ToArray());
140+
141+
142+
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{q.Key}:event")), "fetch");
134143
}
135144

136145
#pragma warning restore CS4014
@@ -164,7 +173,15 @@ public async Task<string> EnqueueTaskAsync(string queue, string tenant, TaskData
164173
else
165174
{
166175
tra.SortedSetAddAsync(Prefix("queues"), q, DateTimeOffset.UtcNow.ToUnixTimeSeconds());
167-
tra.ListLeftPushAsync(Prefix($"queue:{q}"), taskId);
176+
177+
if (queue.StartsWith("fair_"))
178+
{
179+
tra.HashIncrementAsync(Prefix($"queue:{q}:fairness"), tenant);
180+
tra.ListLeftPushAsync(Prefix($"queue:{q}:{tenant}"), taskId);
181+
}
182+
else
183+
tra.ListLeftPushAsync(Prefix($"queue:{q}"), taskId);
184+
168185
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{q}:event")), "fetch");
169186
}
170187

@@ -255,20 +272,50 @@ public async Task<bool> FetchAsync()
255272

256273
var res = await db.ScriptEvaluateAsync($@"
257274
for i, queue in ipairs(KEYS) do
258-
local taskId = redis.call('rpoplpush', queue, queue.."":checkout"");
259-
if taskId then
260-
local invis = redis.call('time')[1] + ARGV[1];
261-
redis.call('zadd', queue.."":pushback"", invis, taskId);
262-
local taskData = redis.call('hgetall', ""{Prefix("task:")}""..taskId);
263-
local batchId = redis.call('hget', ""{Prefix("task:")}""..taskId, ""batch"");
264275
265-
if batchId then
266-
local batchData = redis.call('hgetall', ""{Prefix("batch:")}""..batchId);
267-
return {{queue, taskId, taskData, batchData}};
268-
end;
276+
if string.find(queue, 'fair_') then
277+
local tenant = redis.call('hrandfield', queue.."":fairness"");
278+
279+
if tenant then
280+
local taskId = redis.call('rpoplpush', queue.."":""..tenant, queue.."":checkout"");
281+
282+
if taskId then
283+
local ctr = redis.call('hincrby', queue.."":fairness"", tenant, -1);
284+
285+
if ctr <= 0 then
286+
redis.call('hdel', queue.."":fairness"", tenant);
287+
end;
269288
270-
return {{queue, taskId, taskData}};
289+
local invis = redis.call('time')[1] + ARGV[1];
290+
redis.call('zadd', queue.."":pushback"", invis, taskId);
291+
local taskData = redis.call('hgetall', ""{Prefix("task:")}""..taskId);
292+
local batchId = redis.call('hget', ""{Prefix("task:")}""..taskId, ""batch"");
293+
294+
if batchId then
295+
local batchData = redis.call('hgetall', ""{Prefix("batch:")}""..batchId);
296+
return {{queue, taskId, taskData, batchData}};
297+
end;
298+
299+
return {{queue, taskId, taskData}};
300+
end;
301+
end;
302+
else
303+
local taskId = redis.call('rpoplpush', queue, queue.."":checkout"");
304+
if taskId then
305+
local invis = redis.call('time')[1] + ARGV[1];
306+
redis.call('zadd', queue.."":pushback"", invis, taskId);
307+
local taskData = redis.call('hgetall', ""{Prefix("task:")}""..taskId);
308+
local batchId = redis.call('hget', ""{Prefix("task:")}""..taskId, ""batch"");
309+
310+
if batchId then
311+
local batchData = redis.call('hgetall', ""{Prefix("batch:")}""..batchId);
312+
return {{queue, taskId, taskData, batchData}};
313+
end;
314+
315+
return {{queue, taskId, taskData}};
316+
end;
271317
end;
318+
272319
end
273320
", _queueKeys, new RedisValue[] { (long)_options.Retention.TotalSeconds });
274321

@@ -951,7 +998,15 @@ public async Task<bool> TriggerScheduleAsync(string id)
951998
}, CommandFlags.FireAndForget);
952999

9531000
// enqueue
954-
tra.ListLeftPushAsync(Prefix($"queue:{queue}"), newTaskId, flags: CommandFlags.FireAndForget);
1001+
1002+
if (queue.StartsWith("fair_"))
1003+
{
1004+
tra.HashIncrementAsync(Prefix($"queue:{queue}:fairness"), tenant);
1005+
tra.ListLeftPushAsync(Prefix($"queue:{queue}:{tenant}"), newTaskId, flags: CommandFlags.FireAndForget);
1006+
}
1007+
else
1008+
tra.ListLeftPushAsync(Prefix($"queue:{queue}"), newTaskId, flags: CommandFlags.FireAndForget);
1009+
9551010
tra.SortedSetAddAsync(Prefix("queues"), queue, DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
9561011
CommandFlags.FireAndForget);
9571012
tra.PublishAsync(RedisChannel.Literal(Prefix($"queue:{queue}:event")), "fetch", CommandFlags.FireAndForget);
@@ -1163,7 +1218,8 @@ public async Task<QueueInfo> GetQueueAsync(string name)
11631218
return new QueueInfo
11641219
{
11651220
Name = name,
1166-
Length = await db.ListLengthAsync(Prefix($"queue:{name}")),
1221+
// TODO: global tracking of fair queue length
1222+
Length = name.StartsWith("fair_") ? -1 : await db.ListLengthAsync(Prefix($"queue:{name}")),
11671223
Checkout = await db.ListLengthAsync(Prefix($"queue:{name}:checkout")),
11681224
Pushback = await db.SortedSetLengthAsync(Prefix($"queue:{name}:pushback")),
11691225
Deadletter = await db.SortedSetLengthAsync(Prefix($"queue:{name}:deadletter"))

redis/docker-compose.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ services:
22

33
redis:
44
container_name: redis
5-
image: redis:7-alpine
5+
image: redis:8-alpine
66
restart: unless-stopped
77
ports:
88
- 6379:6379
@@ -12,7 +12,7 @@ services:
1212

1313
valkey:
1414
container_name: valkey
15-
image: valkey/valkey:8.1-alpine
15+
image: valkey/valkey:9-alpine
1616
restart: unless-stopped
1717
ports:
1818
- 6380:6379

0 commit comments

Comments
 (0)