Skip to content

Commit 6eb055c

Browse files
committed
Added queue polling delay constructor parameter and QueuePollingDelayMilliseconds property
1 parent 63714fa commit 6eb055c

3 files changed

Lines changed: 78 additions & 19 deletions

File tree

TaskExecutor.Tests/TaskExecutorTests.cs

Lines changed: 33 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,10 @@ public class TaskExecutorTests
55
[Fact]
66
public async Task EnqueueTask_ShouldExecuteTasksConcurrently()
77
{
8-
// Arrange
98
var cts = new CancellationTokenSource();
109
var executor = new TaskExecutor(3, cts.Token);
1110
int completedTasks = 0;
1211

13-
// Act
1412
for (int i = 0; i < 10; i++)
1513
{
1614
executor.EnqueueTask(i, async () =>
@@ -20,27 +18,24 @@ public async Task EnqueueTask_ShouldExecuteTasksConcurrently()
2018
});
2119
}
2220

23-
await Task.Delay(500); // Allow time for tasks to start
21+
await Task.Delay(500);
2422

2523
while (executor.HasRunningTasks)
2624
{
2725
await Task.Delay(100);
2826
}
2927
await executor.StopAsync();
3028

31-
// Assert
3229
Assert.Equal(10, completedTasks);
3330
}
3431

3532
[Fact]
3633
public async Task ChangeConcurrency_ShouldAdjustConcurrencyLevel()
3734
{
38-
// Arrange
3935
var cts = new CancellationTokenSource();
4036
var executor = new TaskExecutor(1, cts.Token);
4137
int runningTasks = 0;
4238

43-
// Act
4439
for (int i = 0; i < 5; i++)
4540
{
4641
executor.EnqueueTask(i, async () =>
@@ -51,13 +46,12 @@ public async Task ChangeConcurrency_ShouldAdjustConcurrencyLevel()
5146
});
5247
}
5348

54-
await Task.Delay(100); // Allow initial tasks to start
49+
await Task.Delay(100);
5550
Assert.Equal(1, runningTasks);
5651

5752
executor.ChangeConcurrency(3);
58-
await Task.Delay(100); // Allow new concurrency level to take effect
53+
await Task.Delay(100);
5954

60-
// Assert
6155
Assert.True(runningTasks >= 2);
6256

6357
await executor.StopAsync();
@@ -66,27 +60,23 @@ public async Task ChangeConcurrency_ShouldAdjustConcurrencyLevel()
6660
[Fact]
6761
public async Task OnTaskError_ShouldCaptureTaskExceptions()
6862
{
69-
// Arrange
7063
var cts = new CancellationTokenSource();
7164
var executor = new TaskExecutor(3, cts.Token);
7265
Exception? capturedException = null;
7366

7467
executor.OnTaskError += (id, ex) => capturedException = ex;
7568

76-
// Act
7769
executor.EnqueueTask(1, () => throw new InvalidOperationException("Test exception"));
78-
await Task.Delay(200); // Allow time for the task to fail
70+
await Task.Delay(200);
7971
await executor.StopAsync();
8072

81-
// Assert
8273
Assert.NotNull(capturedException);
8374
Assert.IsType<InvalidOperationException>(capturedException);
8475
}
8576

8677
[Fact]
8778
public async Task StopAsync_ShouldWaitForRunningTasksToComplete()
8879
{
89-
// Arrange
9080
var cts = new CancellationTokenSource();
9181
var executor = new TaskExecutor(3, cts.Token);
9282
bool taskCompleted = false;
@@ -97,11 +87,37 @@ public async Task StopAsync_ShouldWaitForRunningTasksToComplete()
9787
taskCompleted = true;
9888
});
9989

100-
// Act
101-
await Task.Delay(100); // Allow time for the task to start
90+
await Task.Delay(100);
10291
await executor.StopAsync();
10392

104-
// Assert
10593
Assert.True(taskCompleted);
10694
}
95+
96+
[Fact]
97+
public void QueuePollingDelayMilliseconds_Setter_ShouldThrowOnNegative()
98+
{
99+
var executor = new TaskExecutor(1, CancellationToken.None);
100+
Assert.Throws<ArgumentException>(() => executor.QueuePollingDelayMilliseconds = -1);
101+
}
102+
103+
[Fact]
104+
public void Constructor_ShouldThrowOnNegativeQueuePollingDelay()
105+
{
106+
Assert.Throws<ArgumentException>(() => new TaskExecutor(1, -10));
107+
}
108+
109+
[Fact]
110+
public void Constructor_ShouldThrowOnZeroOrNegativeConcurrency()
111+
{
112+
Assert.Throws<ArgumentException>(() => new TaskExecutor(0, CancellationToken.None));
113+
Assert.Throws<ArgumentException>(() => new TaskExecutor(-1, CancellationToken.None));
114+
}
115+
116+
[Fact]
117+
public void ChangeConcurrency_ShouldThrowOnZeroOrNegative()
118+
{
119+
var executor = new TaskExecutor(1, CancellationToken.None);
120+
Assert.Throws<ArgumentException>(() => executor.ChangeConcurrency(0));
121+
Assert.Throws<ArgumentException>(() => executor.ChangeConcurrency(-5));
122+
}
107123
}

TaskExecutor/TaskExecutor.cs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using System.Collections.Concurrent;
2-
using System.Threading.Tasks;
32

43
namespace TaskExecutor;
54

@@ -8,11 +7,13 @@ namespace TaskExecutor;
87
/// </summary>
98
public class TaskExecutor : IDisposable
109
{
10+
private const int DefaultQueuePollingDelayMilliseconds = 50;
1111
private readonly ConcurrentQueue<TaskForExecute> _taskQueue = new();
1212
private readonly CancellationTokenSource _internalCts = new();
1313
private readonly ActiveTaskRegistry _taskRegistry = new();
1414
private readonly CancellationToken _externalCancellationToken;
1515
private readonly SemaphoreSlim _semaphore;
16+
private int _queuePollingDelayMilliseconds;
1617
private int _maxConcurrency;
1718
private bool _disposed;
1819

@@ -33,6 +34,20 @@ public bool HasRunningTasks
3334
}
3435
}
3536

37+
/// <summary>
38+
/// Gets or sets the delay, in milliseconds, between polling attempts for the queue.
39+
/// </summary>
40+
public int QueuePollingDelayMilliseconds
41+
{
42+
get => _queuePollingDelayMilliseconds;
43+
set
44+
{
45+
if (value < 0)
46+
throw new ArgumentException("Queue polling delay must be non-negative");
47+
_queuePollingDelayMilliseconds = value;
48+
}
49+
}
50+
3651
/// <summary>
3752
/// Constructor for the TaskExecutor class.
3853
/// </summary>
@@ -60,6 +75,33 @@ public TaskExecutor(int initialConcurrency, CancellationToken cancellationToken
6075
StartProcessing();
6176
}
6277

78+
/// <summary>
79+
/// Constructor for the TaskExecutor class.
80+
/// </summary>
81+
/// <param name="initialConcurrency">
82+
/// Specifies the initial level of concurrency for the TaskExecutor.
83+
/// This determines how many tasks can run concurrently.
84+
/// Must be greater than 0.
85+
/// </param>
86+
/// <param name="queuePollingDelayMilliseconds">
87+
/// Specifies the delay, in milliseconds, between polling attempts when the task queue is empty.
88+
/// Must be non-negative. A higher value reduces CPU usage but may increase task start latency.
89+
/// </param>
90+
/// </param>
91+
/// <param name="cancellationToken">
92+
/// An optional CancellationToken that can be used to signal cancellation
93+
/// of task processing from an external source.
94+
/// </param>
95+
/// <exception cref="ArgumentException">
96+
/// Thrown when the initialConcurrency is less than or equal to 0.
97+
/// </exception>
98+
public TaskExecutor(int initialConcurrency, int queuePollingDelayMilliseconds = DefaultQueuePollingDelayMilliseconds, CancellationToken cancellationToken = default) : this(initialConcurrency, cancellationToken)
99+
{
100+
if (queuePollingDelayMilliseconds < 0)
101+
throw new ArgumentException("Queue polling delay must be non-negative");
102+
_queuePollingDelayMilliseconds = queuePollingDelayMilliseconds;
103+
}
104+
63105
/// <summary>
64106
/// Enqueues a task for execution.
65107
/// </summary>
@@ -129,7 +171,7 @@ private void StartProcessing()
129171
}
130172
else
131173
{
132-
await Task.Delay(50, _internalCts.Token).ConfigureAwait(false);
174+
await Task.Delay(_queuePollingDelayMilliseconds, _internalCts.Token).ConfigureAwait(false);
133175
}
134176
}
135177
}

release-notes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
2025-05-30 Added queue polling delay constructor parameter and QueuePollingDelayMilliseconds property
12
2025-05-29 Refactored code. Improved stability.
23
2025-04-29 Added .ConfigureAwait(false) to all asynchronous calls in TaskExecutor to improve performance and avoid deadlocks.
34
2025-04-27 Initial version

0 commit comments

Comments
 (0)