Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 84 additions & 72 deletions src/ServiceControl.Infrastructure/AsyncTimer.cs
Original file line number Diff line number Diff line change
@@ -1,96 +1,108 @@
namespace ServiceControl.Infrastructure.BackgroundTasks
namespace ServiceControl.Infrastructure.BackgroundTasks;

using System;
using System.Threading;
using System.Threading.Tasks;

public enum TimerJobExecutionResult
{
using System;
using System.Threading;
using System.Threading.Tasks;
ScheduleNextExecution,
ExecuteImmediately,
DoNotContinueExecuting
}

public enum TimerJobExecutionResult
public class TimerJob
{
public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback)
{
ScheduleNextExecution,
ExecuteImmediately,
DoNotContinueExecuting
}
tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

public class TimerJob
{
public TimerJob(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback)
task = Task.Run(async () =>
{
tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

task = Task.Run(async () =>
try
{
try
{
await Task.Delay(due, token).ConfigureAwait(false);
await Task.Delay(due, token).ConfigureAwait(false);

var consecutiveFailures = 0;

while (!token.IsCancellationRequested)
while (!token.IsCancellationRequested)
{
try
{
try
{
var result = await callback(token).ConfigureAwait(false);
if (result == TimerJobExecutionResult.DoNotContinueExecuting)
{
tokenSource.Cancel();
}
else if (result == TimerJobExecutionResult.ScheduleNextExecution)
{
await Task.Delay(interval, token).ConfigureAwait(false);
}

//Otherwise execute immediately
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
var result = await callback(token).ConfigureAwait(false);

consecutiveFailures = 0;
if (result == TimerJobExecutionResult.DoNotContinueExecuting)
{
break;
tokenSource.Cancel();
}
catch (Exception ex)
else if (result == TimerJobExecutionResult.ScheduleNextExecution)
{
errorCallback(ex);
await Task.Delay(interval, token).ConfigureAwait(false);
}

//Otherwise execute immediately
}
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
// no-op
}
}, CancellationToken.None);
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
break;
}
catch (Exception ex)
{
consecutiveFailures++;
const int MaxDelayDurationInSeconds = 60;
var delayInSeconds = consecutiveFailures * 10;
var backoffDelay = TimeSpan.FromSeconds(int.Min(MaxDelayDurationInSeconds, delayInSeconds));

public async Task Stop()
{
if (tokenSource == null)
await Task.Delay(backoffDelay, token).ConfigureAwait(false);

errorCallback(ex);
}
}
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
return;
// no-op
}
}, CancellationToken.None);
}

await tokenSource.CancelAsync().ConfigureAwait(false);
tokenSource.Dispose();
public async Task Stop(CancellationToken cancellationToken)
{
if (tokenSource == null)
{
return;
}

if (task != null)
{
try
{
await task.ConfigureAwait(false);
}
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//NOOP
}
}
await tokenSource.CancelAsync().ConfigureAwait(false);
tokenSource.Dispose();

if (task == null)
{
return;
}

Task task;
CancellationTokenSource tokenSource;
try
{
await task.WaitAsync(cancellationToken).ConfigureAwait(false);
}
catch (OperationCanceledException) when (tokenSource.IsCancellationRequested)
{
//NOOP
}
}

public interface IAsyncTimer
{
TimerJob Schedule(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback);
}
readonly Task task;
readonly CancellationTokenSource tokenSource;
}

public class AsyncTimer : IAsyncTimer
{
public TimerJob Schedule(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback) => new TimerJob(callback, due, interval, errorCallback);
}
public interface IAsyncTimer
{
TimerJob Schedule(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback);
}

public class AsyncTimer : IAsyncTimer
{
public TimerJob Schedule(Func<CancellationToken, Task<TimerJobExecutionResult>> callback, TimeSpan due, TimeSpan interval, Action<Exception> errorCallback) => new(callback, due, interval, errorCallback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) => timer.Stop();
public Task StopAsync(CancellationToken cancellationToken) => timer.Stop(cancellationToken);

TimerJob timer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async Task<TimerJobExecutionResult> Run(CancellationToken cancellationToken)
: TimerJobExecutionResult.DoNotContinueExecuting;
}

public Task Stop() => timer?.Stop() ?? Task.CompletedTask;
public Task Stop() => timer?.Stop(CancellationToken.None) ?? Task.CompletedTask;

TimerJob timer;
readonly ICustomCheck check;
Expand Down
2 changes: 1 addition & 1 deletion src/ServiceControl/Licensing/LicenseCheckHostedService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken) => timer.Stop();
public Task StopAsync(CancellationToken cancellationToken) => timer.Stop(cancellationToken);

TimerJob timer;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
timer = scheduler.Schedule(_ => CheckEndpoints(), TimeSpan.Zero, TimeSpan.FromSeconds(5), e => { log.Error("Exception occurred when monitoring endpoint instances", e); });
}

public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await timer.Stop();
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
//NOOP, invoked Stop does not
}
}
public Task StopAsync(CancellationToken cancellationToken) => timer.Stop(cancellationToken);

async Task<TimerJobExecutionResult> CheckEndpoints()
{
Expand Down
15 changes: 3 additions & 12 deletions src/ServiceControl/Recoverability/RecoverabilityComponent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,10 +174,7 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return timer?.Stop() ?? Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken) => timer?.Stop(cancellationToken) ?? Task.CompletedTask;

async Task<TimerJobExecutionResult> ProcessRequestedBulkRetryOperations()
{
Expand Down Expand Up @@ -237,10 +234,7 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return timer.Stop();
}
public Task StopAsync(CancellationToken cancellationToken) => timer.Stop(cancellationToken);

TimerJob timer;
readonly IAsyncTimer scheduler;
Expand All @@ -266,10 +260,7 @@ public Task StartAsync(CancellationToken cancellationToken)
return Task.CompletedTask;
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await timer.Stop();
}
public Task StopAsync(CancellationToken cancellationToken) => timer.Stop(cancellationToken);

async Task<TimerJobExecutionResult> Process(CancellationToken cancellationToken)
{
Expand Down