Skip to content

Commit e5ba46d

Browse files
committed
fair queues
1 parent e7d33cb commit e5ba46d

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

Core.TaskProcessor.Tests/ProcessorTests.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public ProcessorTests(ITestOutputHelper output)
1515
_processor = new TaskProcessor(new TaskProcessorOptions
1616
{
1717
Prefix = "{dev}",
18-
MaxWorkers = 1,
18+
MaxWorkers = 2,
1919
Queues = new[] { "q1", "fair_q2", "q3" },
2020
Redis = "localhost:6379,abortConnect=false",
2121
Retries = 3,
@@ -45,7 +45,7 @@ public ProcessorTests(ITestOutputHelper output)
4545
[Fact]
4646
public async Task EnqueueFairness()
4747
{
48-
for (int i = 0; i < 10; i++)
48+
for (int i = 0; i < 100; i++)
4949
{
5050
await _processor.EnqueueTaskAsync("fair_q2", "1001", new TaskData
5151
{
@@ -55,7 +55,7 @@ public async Task EnqueueFairness()
5555

5656
await Task.Delay(500);
5757

58-
for (int i = 0; i < 10; i++)
58+
for (int i = 0; i < 100; i++)
5959
{
6060
await _processor.EnqueueTaskAsync("fair_q2", "1002", new TaskData
6161
{

Core.TaskProcessor/TaskProcessor.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,15 @@ public async Task<long> RetryDeadTasksAsync(string queue, int? retries = null, l
596596
redis.call('hset', '{Prefix("task:")}'..taskId, 'retries', ARGV[2]);
597597
redis.call('hincrby', '{Prefix("batch:")}'..batchId, 'failed', -1)
598598
redis.call('hset', '{Prefix("batch:")}'..batchId, 'state', 'go')
599-
redis.call('lpush', KEYS[1], taskId);
599+
600+
if string.find(KEYS[1], 'fair_') then
601+
local tenant = redis.call('hget', ""{Prefix("task:")}""..taskId, 'tenant');
602+
redis.call('hincrby', KEYS[1]..':fairness', tenant);
603+
redis.call('lpush', KEYS[1]..':'..tenant, taskId);
604+
else
605+
redis.call('lpush', KEYS[1], taskId);
606+
end
607+
600608
redis.call('zrem', KEYS[2], taskId);
601609
redis.call('publish', KEYS[1]..':event', 'fetch');
602610
end;

0 commit comments

Comments
 (0)