Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 7 additions & 16 deletions src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,9 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
}

// Set orchestration ID reuse policy for deduplication support
// Note: This requires the protobuf to support OrchestrationIdReusePolicy field
// If the protobuf doesn't support it yet, this will need to be updated when the protobuf is updated
if (options?.DedupeStatuses != null && options.DedupeStatuses.Count > 0)
{
// Parse and validate all status strings to enum first
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options.DedupeStatuses
ImmutableHashSet<OrchestrationRuntimeStatus> dedupeStatuses = options?.DedupeStatuses is null
? []
: [.. options.DedupeStatuses
.Select(s =>
{
if (!System.Enum.TryParse<OrchestrationRuntimeStatus>(s, ignoreCase: true, out OrchestrationRuntimeStatus status))
Expand All @@ -139,17 +136,11 @@ public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
}

return status;
}).ToImmutableHashSet();

// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
P.OrchestrationIdReusePolicy? policy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);
})];

if (policy != null)
{
request.OrchestrationIdReusePolicy = policy;
}
}
// Convert dedupe statuses to protobuf statuses and create reuse policy
IEnumerable<P.OrchestrationStatus> dedupeStatusesProto = dedupeStatuses.Select(s => s.ToGrpcStatus());
request.OrchestrationIdReusePolicy = ProtoUtils.ConvertDedupeStatusesToReusePolicy(dedupeStatusesProto);

using Activity? newActivity = TraceHelper.StartActivityForNewOrchestration(request);

Expand Down
52 changes: 25 additions & 27 deletions src/Client/Grpc/ProtoUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,22 @@ namespace Microsoft.DurableTask.Client.Grpc;
public static class ProtoUtils
{
/// <summary>
/// Gets the terminal orchestration statuses that are commonly used for deduplication.
/// Gets an array of all orchestration statuses.
/// These are the statuses that can be used in OrchestrationIdReusePolicy.
/// </summary>
/// <returns>An immutable array of terminal orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetTerminalStatuses()
/// <returns>An immutable array of all orchestration statuses.</returns>
public static ImmutableArray<P.OrchestrationStatus> GetAllStatuses()
{
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
#pragma warning disable CS0618 // Type or member is obsolete - Canceled is intentionally included for compatibility
// compatibility with what?
return ImmutableArray.Create(
P.OrchestrationStatus.Completed,
P.OrchestrationStatus.Failed,
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled);
P.OrchestrationStatus.Terminated,
P.OrchestrationStatus.Canceled,
P.OrchestrationStatus.Pending,
P.OrchestrationStatus.Running,
P.OrchestrationStatus.Suspended);
#pragma warning restore CS0618
}

Expand All @@ -33,55 +37,49 @@ public static class ProtoUtils
/// with replaceable statuses (statuses that CAN be replaced).
/// </summary>
/// <param name="dedupeStatuses">The orchestration statuses that should NOT be replaced. These are statuses for which an exception should be thrown if an orchestration already exists.</param>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set, or null if all terminal statuses are dedupe statuses.</returns>
/// <returns>An OrchestrationIdReusePolicy with replaceable statuses set.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced.
/// So replaceableStatus = all terminal statuses MINUS dedupeStatuses.
/// So replaceableStatus = all statuses MINUS dedupeStatuses.
/// </remarks>
public static P.OrchestrationIdReusePolicy? ConvertDedupeStatusesToReusePolicy(
public static P.OrchestrationIdReusePolicy ConvertDedupeStatusesToReusePolicy(
IEnumerable<P.OrchestrationStatus>? dedupeStatuses)
{
ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableArray<P.OrchestrationStatus> statuses = GetAllStatuses();
ImmutableHashSet<P.OrchestrationStatus> dedupeStatusSet = dedupeStatuses?.ToImmutableHashSet() ?? ImmutableHashSet<P.OrchestrationStatus>.Empty;

P.OrchestrationIdReusePolicy policy = new();

// Add terminal statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus terminalStatus in terminalStatuses.Where(status => !dedupeStatusSet.Contains(status)))
// Add statuses that are NOT in dedupeStatuses as replaceable
foreach (P.OrchestrationStatus status in statuses.Where(status => !dedupeStatusSet.Contains(status)))
{
policy.ReplaceableStatus.Add(terminalStatus);
policy.ReplaceableStatus.Add(status);
}

// Only return policy if we have replaceable statuses
return policy.ReplaceableStatus.Count > 0 ? policy : null;
return policy;
}

/// <summary>
/// Converts an OrchestrationIdReusePolicy with replaceable statuses to dedupe statuses
/// (statuses that should NOT be replaced).
/// </summary>
/// <param name="policy">The OrchestrationIdReusePolicy containing replaceable statuses.</param>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all terminal statuses are replaceable.</returns>
/// <returns>An array of orchestration statuses that should NOT be replaced, or null if all statuses are replaceable.</returns>
/// <remarks>
/// The policy uses "replaceableStatus" - these are statuses that CAN be replaced.
/// dedupeStatuses are statuses that should NOT be replaced (should throw exception).
/// So dedupeStatuses = all terminal statuses MINUS replaceableStatus.
/// So dedupeStatuses = all statuses MINUS replaceableStatus.
/// </remarks>
public static P.OrchestrationStatus[]? ConvertReusePolicyToDedupeStatuses(
P.OrchestrationIdReusePolicy? policy)
P.OrchestrationIdReusePolicy policy)
{
if (policy == null || policy.ReplaceableStatus.Count == 0)
{
return null;
}

ImmutableArray<P.OrchestrationStatus> terminalStatuses = GetTerminalStatuses();
ImmutableArray<P.OrchestrationStatus> allStatuses = GetAllStatuses();
ImmutableHashSet<P.OrchestrationStatus> replaceableStatusSet = policy.ReplaceableStatus.ToImmutableHashSet();

// Calculate dedupe statuses = terminal statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = terminalStatuses
.Where(terminalStatus => !replaceableStatusSet.Contains(terminalStatus))
// Calculate dedupe statuses = all statuses - replaceable statuses
P.OrchestrationStatus[] dedupeStatuses = allStatuses
.Where(status => !replaceableStatusSet.Contains(status))
.ToArray();

// Only return if there are dedupe statuses
Expand Down
2 changes: 1 addition & 1 deletion src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ async Task WaitForWorkItemClientConnection()
// Convert OrchestrationIdReusePolicy to dedupeStatuses
// The policy uses "replaceableStatus" - these are statuses that CAN be replaced
// dedupeStatuses are statuses that should NOT be replaced (should throw exception)
// So dedupeStatuses = all terminal statuses MINUS replaceableStatus
// So dedupeStatuses = all statuses MINUS replaceableStatus
OrchestrationStatus[]? dedupeStatuses = null;
P.OrchestrationStatus[]? dedupeStatusesProto = ProtoUtils.ConvertReusePolicyToDedupeStatuses(request.OrchestrationIdReusePolicy);
if (dedupeStatusesProto != null)
Expand Down
Loading
Loading