Skip to content

Commit 3fe4476

Browse files
committed
Guard completed Orleans commands from retries
1 parent ee75f89 commit 3fe4476

File tree

8 files changed

+516
-120
lines changed

8 files changed

+516
-120
lines changed

ManagedCode.Communication.AspNetCore/Commands/Extensions/CommandIdempotencyExtensions.cs

Lines changed: 38 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -34,57 +34,50 @@ public static async Task<T> ExecuteIdempotentAsync<T>(
3434
CommandMetadata? metadata,
3535
CancellationToken cancellationToken = default)
3636
{
37-
// Fast path: check for existing completed result
38-
var existingResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
39-
if (existingResult != null)
37+
while (true)
4038
{
41-
return existingResult;
42-
}
39+
cancellationToken.ThrowIfCancellationRequested();
4340

44-
// Atomically try to claim the command for execution
45-
var (currentStatus, wasSet) = await store.GetAndSetStatusAsync(
46-
commandId,
47-
CommandExecutionStatus.InProgress,
48-
cancellationToken);
41+
var currentStatus = await store.GetCommandStatusAsync(commandId, cancellationToken);
4942

50-
switch (currentStatus)
51-
{
52-
case CommandExecutionStatus.Completed:
53-
// Result exists but might have been evicted, get it again
54-
existingResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
55-
return existingResult ?? throw new InvalidOperationException($"Command {commandId} marked as completed but result not found");
56-
57-
case CommandExecutionStatus.InProgress:
58-
case CommandExecutionStatus.Processing:
59-
// Another thread is executing, wait for completion
60-
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);
61-
62-
case CommandExecutionStatus.Failed:
63-
// Previous execution failed, we can retry (wasSet should be true)
64-
if (!wasSet)
43+
switch (currentStatus)
44+
{
45+
case CommandExecutionStatus.Completed:
6546
{
66-
// Race condition - another thread claimed it
67-
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);
47+
var cachedResult = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
48+
return cachedResult ?? default!;
6849
}
69-
break;
70-
71-
case CommandExecutionStatus.NotFound:
72-
case CommandExecutionStatus.NotStarted:
73-
default:
74-
// First execution (wasSet should be true)
75-
if (!wasSet)
76-
{
77-
// Race condition - another thread claimed it
50+
51+
case CommandExecutionStatus.InProgress:
52+
case CommandExecutionStatus.Processing:
7853
return await WaitForCompletionAsync<T>(store, commandId, cancellationToken);
54+
55+
case CommandExecutionStatus.NotFound:
56+
case CommandExecutionStatus.NotStarted:
57+
case CommandExecutionStatus.Failed:
58+
default:
59+
{
60+
var claimed = await store.TrySetCommandStatusAsync(
61+
commandId,
62+
currentStatus,
63+
CommandExecutionStatus.InProgress,
64+
cancellationToken);
65+
66+
if (claimed)
67+
{
68+
goto ExecuteOperation;
69+
}
70+
71+
break;
7972
}
80-
break;
73+
}
8174
}
8275

83-
// We successfully claimed the command for execution
76+
ExecuteOperation:
8477
try
8578
{
8679
var result = await operation();
87-
80+
8881
// Store result and mark as completed atomically
8982
await store.SetCommandResultAsync(commandId, result, cancellationToken);
9083
await store.SetCommandStatusAsync(commandId, CommandExecutionStatus.Completed, cancellationToken);
@@ -159,7 +152,7 @@ public static async Task<T> ExecuteIdempotentWithRetryAsync<T>(
159152
if (status == CommandExecutionStatus.Completed)
160153
{
161154
var result = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
162-
return (result != null, result);
155+
return (true, result);
163156
}
164157

165158
return (false, default);
@@ -192,17 +185,18 @@ public static async Task<Dictionary<string, T>> ExecuteBatchIdempotentAsync<T>(
192185
var operationsList = operations.ToList();
193186
var commandIds = operationsList.Select(op => op.commandId).ToList();
194187

195-
// Check for existing results in batch
188+
var existingStatuses = await store.GetMultipleStatusAsync(commandIds, cancellationToken);
196189
var existingResults = await store.GetMultipleResultsAsync<T>(commandIds, cancellationToken);
197190
var results = new Dictionary<string, T>();
198191
var pendingOperations = new List<(string commandId, Func<Task<T>> operation)>();
199192

200193
// Separate completed from pending
201194
foreach (var (commandId, operation) in operationsList)
202195
{
203-
if (existingResults.TryGetValue(commandId, out var existingResult) && existingResult != null)
196+
if (existingStatuses.TryGetValue(commandId, out var status) && status == CommandExecutionStatus.Completed)
204197
{
205-
results[commandId] = existingResult;
198+
existingResults.TryGetValue(commandId, out var existingResult);
199+
results[commandId] = existingResult ?? default!;
206200
}
207201
else
208202
{
@@ -255,7 +249,7 @@ private static async Task<T> WaitForCompletionAsync<T>(
255249
{
256250
case CommandExecutionStatus.Completed:
257251
var result = await store.GetCommandResultAsync<T>(commandId, cancellationToken);
258-
return result ?? throw new InvalidOperationException($"Command {commandId} completed but result not found");
252+
return result ?? default!;
259253

260254
case CommandExecutionStatus.Failed:
261255
throw new InvalidOperationException($"Command {commandId} failed during execution");

ManagedCode.Communication.Orleans/Grains/CommandIdempotencyGrain.cs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,26 @@ public Task<CommandExecutionStatus> GetStatusAsync()
2626

2727
public async Task<bool> TryStartProcessingAsync()
2828
{
29-
// Check if already processing or completed
30-
if (state.State.Status != CommandExecutionStatus.NotFound)
29+
// Reject concurrent executions
30+
if (state.State.Status is CommandExecutionStatus.InProgress or CommandExecutionStatus.Processing)
31+
{
32+
return false;
33+
}
34+
35+
if (state.State.Status is CommandExecutionStatus.Completed)
36+
{
37+
return false;
38+
}
39+
40+
// Allow retries after failures by clearing the previous outcome
41+
if (state.State.Status is CommandExecutionStatus.Failed)
42+
{
43+
state.State.Result = null;
44+
state.State.ErrorMessage = null;
45+
state.State.CompletedAt = null;
46+
state.State.FailedAt = null;
47+
}
48+
else if (state.State.Status is not CommandExecutionStatus.NotFound and not CommandExecutionStatus.NotStarted)
3149
{
3250
return false;
3351
}

ManagedCode.Communication.Orleans/Stores/OrleansCommandIdempotencyStore.cs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,15 @@ public async Task SetCommandStatusAsync(string commandId, CommandExecutionStatus
4141
await grain.MarkFailedAsync("Status set to failed");
4242
break;
4343
case CommandExecutionStatus.Completed:
44-
await grain.MarkCompletedAsync<object?>(null);
44+
var (hasResult, result) = await grain.TryGetResultAsync();
45+
if (hasResult)
46+
{
47+
await grain.MarkCompletedAsync(result);
48+
}
49+
else
50+
{
51+
await grain.MarkCompletedAsync<object?>(default);
52+
}
4553
break;
4654
case CommandExecutionStatus.NotStarted:
4755
case CommandExecutionStatus.NotFound:
@@ -175,7 +183,21 @@ public async Task MarkFailedAsync(Guid commandId, string errorMessage, Cancellat
175183

176184
public async Task<(bool success, TResult? result)> TryGetResultAsync<TResult>(Guid commandId, CancellationToken cancellationToken = default)
177185
{
178-
var result = await GetCommandResultAsync<TResult>(commandId.ToString(), cancellationToken);
179-
return (result != null, result);
186+
var grain = _grainFactory.GetGrain<ICommandIdempotencyGrain>(commandId.ToString());
187+
var status = await grain.GetStatusAsync();
188+
189+
if (status != CommandExecutionStatus.Completed)
190+
{
191+
return (false, default);
192+
}
193+
194+
var (_, result) = await grain.TryGetResultAsync();
195+
196+
if (result is TResult typedResult)
197+
{
198+
return (true, typedResult);
199+
}
200+
201+
return (true, default);
180202
}
181203
}

0 commit comments

Comments
 (0)