From 50e58365d7e9730b895fd1891dc30fcbc25fca7f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:19:48 +0000 Subject: [PATCH 1/9] Initial plan From 5fe6b2c8bbf4b2a53bf06d88aa3228c659596f6b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:36:09 +0000 Subject: [PATCH 2/9] Add CancellationToken support to TaskOptions and implement cancellation in activities and sub-orchestrators Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 480 ++++++++++-------- .../Shims/TaskOrchestrationContextWrapper.cs | 64 ++- .../CancellationTests.cs | 357 +++++++++++++ 3 files changed, 683 insertions(+), 218 deletions(-) create mode 100644 test/Grpc.IntegrationTests/CancellationTests.cs diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 7c0d54ee..5703d4e8 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,209 +1,271 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Immutable; - -namespace Microsoft.DurableTask; - -/// -/// Options that can be used to control the behavior of orchestrator task execution. -/// -public record TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - public TaskOptions(TaskRetryOptions? retry) - : this(retry, null) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The tags to associate with the task. - public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) - { - this.Retry = retry; - this.Tags = tags; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The task options to copy from. - public TaskOptions(TaskOptions options) - { - Check.NotNull(options); - this.Retry = options.Retry; - this.Tags = options.Tags; - } - - /// - /// Gets the task retry options. - /// - public TaskRetryOptions? Retry { get; init; } - - /// - /// Gets the tags to associate with the task. - /// - public IDictionary? Tags { get; init; } - - /// - /// Returns a new from the provided . - /// - /// The policy to convert from. - /// A built from the policy. - public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); - - /// - /// Returns a new with the provided instance ID. This can be used when - /// starting a new sub-orchestration to specify the instance ID. - /// - /// The instance ID to use. - /// A new . - public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); -} - -/// -/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to -/// supply extra options for orchestrations. -/// -public record SubOrchestrationOptions : TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) - : base(retry) - { - this.InstanceId = instanceId; - } - - /// - /// Initializes a new instance of the class. - /// - /// The task options to wrap. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) - : base(options) - { - this.InstanceId = instanceId; - if (options is SubOrchestrationOptions derived) - { - if (instanceId is null) - { - this.InstanceId = derived.InstanceId; - } - - this.Version = derived.Version; - } - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The sub-orchestration options to copy from. - public SubOrchestrationOptions(SubOrchestrationOptions options) - : base(options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.Version = options.Version; - } - - /// - /// Gets the orchestration instance ID. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the version to associate with the sub-orchestration instance. - /// - public TaskVersion? Version { get; init; } -} - -/// -/// Options for submitting new orchestrations via the client. -/// -public record StartOrchestrationOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// - /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - /// - /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// -#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax - public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) -#pragma warning restore SA1313 - { - this.InstanceId = InstanceId; - this.StartAt = StartAt; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The start orchestration options to copy from. - public StartOrchestrationOptions(StartOrchestrationOptions options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.StartAt = options.StartAt; - this.Tags = options.Tags; - this.Version = options.Version; - this.DedupeStatuses = options.DedupeStatuses; - } - - /// - /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// - public DateTimeOffset? StartAt { get; init; } - - /// - /// Gets the tags to associate with the orchestration instance. - /// - public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); - - /// - /// Gets the version to associate with the orchestration instance. - /// - public TaskVersion? Version { get; init; } - - /// - /// Gets the orchestration runtime statuses that should be considered for deduplication. - /// - /// - /// For type-safe usage, use the WithDedupeStatuses extension method. - /// - public IReadOnlyList? DedupeStatuses { get; init; } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; + +namespace Microsoft.DurableTask; + +/// +/// Options that can be used to control the behavior of orchestrator task execution. +/// +public record TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + public TaskOptions(TaskRetryOptions? retry) + : this(retry, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The tags to associate with the task. + public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) + { + this.Retry = retry; + this.Tags = tags; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The task options to copy from. + public TaskOptions(TaskOptions options) + { + Check.NotNull(options); + this.Retry = options.Retry; + this.Tags = options.Tags; + this.CancellationToken = options.CancellationToken; + } + + /// + /// Gets the task retry options. + /// + public TaskRetryOptions? Retry { get; init; } + + /// + /// Gets the tags to associate with the task. + /// + public IDictionary? Tags { get; init; } + + /// + /// Gets the cancellation token that can be used to cancel the task. + /// + /// + /// + /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. + /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next + /// opportunity when the orchestrator checks the token status. + /// + /// + /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled + /// or, if already running, the result will be ignored and a will be thrown. + /// + /// + /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be + /// ignored and a will be thrown. Note that cancelling the parent's token + /// does not terminate the sub-orchestrator instance. + /// + /// + /// For retry handlers, the cancellation token is passed to the retry handler via the , + /// allowing the handler to check for cancellation and stop retrying if needed. + /// + /// + /// Example of cancelling an activity after a timeout: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; + /// + /// try + /// { + /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); + /// } + /// catch (TaskCanceledException) + /// { + /// // Handle cancellation + /// } + /// + /// + /// + /// Example of using cancellation with retry logic: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// TaskOptions options = new TaskOptions + /// { + /// Retry = TaskOptions.FromRetryHandler(retryContext => + /// { + /// if (retryContext.CancellationToken.IsCancellationRequested) + /// { + /// return false; // Stop retrying + /// } + /// return retryContext.LastAttemptNumber < 3; + /// }), + /// CancellationToken = cts.Token + /// }; + /// + /// await context.CallActivityAsync("MyActivity", "input", options); + /// + /// + /// + public CancellationToken CancellationToken { get; init; } + + /// + /// Returns a new from the provided . + /// + /// The policy to convert from. + /// A built from the policy. + public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); + + /// + /// Returns a new with the provided instance ID. This can be used when + /// starting a new sub-orchestration to specify the instance ID. + /// + /// The instance ID to use. + /// A new . + public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); +} + +/// +/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to +/// supply extra options for orchestrations. +/// +public record SubOrchestrationOptions : TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) + : base(retry) + { + this.InstanceId = instanceId; + } + + /// + /// Initializes a new instance of the class. + /// + /// The task options to wrap. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) + : base(options) + { + this.InstanceId = instanceId; + if (options is SubOrchestrationOptions derived) + { + if (instanceId is null) + { + this.InstanceId = derived.InstanceId; + } + + this.Version = derived.Version; + } + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The sub-orchestration options to copy from. + public SubOrchestrationOptions(SubOrchestrationOptions options) + : base(options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.Version = options.Version; + } + + /// + /// Gets the orchestration instance ID. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the version to associate with the sub-orchestration instance. + /// + public TaskVersion? Version { get; init; } +} + +/// +/// Options for submitting new orchestrations via the client. +/// +public record StartOrchestrationOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// + /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + /// + /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax + public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) +#pragma warning restore SA1313 + { + this.InstanceId = InstanceId; + this.StartAt = StartAt; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The start orchestration options to copy from. + public StartOrchestrationOptions(StartOrchestrationOptions options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.StartAt = options.StartAt; + this.Tags = options.Tags; + this.Version = options.Version; + this.DedupeStatuses = options.DedupeStatuses; + } + + /// + /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// + public DateTimeOffset? StartAt { get; init; } + + /// + /// Gets the tags to associate with the orchestration instance. + /// + public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); + + /// + /// Gets the version to associate with the orchestration instance. + /// + public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// + /// + /// For type-safe usage, use the WithDedupeStatuses extension method. + /// + public IReadOnlyList? DedupeStatuses { get; init; } +} diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 945d6ac5..db976876 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -143,19 +143,24 @@ public override async Task CallActivityAsync( try { IDictionary tags = ImmutableDictionary.Empty; + CancellationToken cancellationToken = default; if (options is TaskOptions callActivityOptions) { if (callActivityOptions.Tags is not null) { tags = callActivityOptions.Tags; } + + cancellationToken = callActivityOptions.CancellationToken; } - // TODO: Cancellation (https://github.com/microsoft/durabletask-dotnet/issues/7) + // Check if cancellation was requested before starting the activity + cancellationToken.ThrowIfCancellationRequested(); + #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { - return await this.innerContext.ScheduleTask( + Task activityTask = this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() @@ -163,6 +168,8 @@ public override async Task CallActivityAsync( .WithTags(tags) .Build(), parameters: input); + + return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -176,17 +183,19 @@ public override async Task CallActivityAsync( parameters: input), name.Name, handler, - default); + cancellationToken); } else { - return await this.innerContext.ScheduleTask( + Task activityTask = this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() .WithTags(tags) .Build(), parameters: input); + + return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.TaskFailedException e) @@ -217,16 +226,23 @@ public override async Task CallSubOrchestratorAsync( throw new InvalidOperationException(errorMsg); } + CancellationToken cancellationToken = options?.CancellationToken ?? default; + + // Check if cancellation was requested before starting the sub-orchestrator + cancellationToken.ThrowIfCancellationRequested(); + try { if (options?.Retry?.Policy is RetryPolicy policy) { - return await this.innerContext.CreateSubOrchestrationInstanceWithRetry( + Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstanceWithRetry( orchestratorName.Name, version, instanceId, policy.ToDurableTaskCoreRetryOptions(), input); + + return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -235,20 +251,22 @@ public override async Task CallSubOrchestratorAsync( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags), orchestratorName.Name, handler, - default); + cancellationToken); } else { - return await this.innerContext.CreateSubOrchestrationInstance( + Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstance( orchestratorName.Name, version, instanceId, - input, + input, options?.Tags); + + return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.SubOrchestrationFailedException e) @@ -524,6 +542,34 @@ async Task InvokeWithCustomRetryHandler( } } + async Task WaitForTaskWithCancellation(Task task, CancellationToken cancellationToken) + { + // If no cancellation token provided or it can't be cancelled, just await the task + if (!cancellationToken.CanBeCanceled) + { + return await task; + } + + // Create a cancellation task that completes when the token is cancelled + TaskCompletionSource cancellationTcs = new(); + using CancellationTokenRegistration registration = cancellationToken.Register(() => + { + cancellationTcs.TrySetCanceled(cancellationToken); + }); + + // Wait for either the task to complete or cancellation + Task completedTask = await Task.WhenAny(task, cancellationTcs.Task); + + // If cancellation won, throw TaskCanceledException + if (completedTask == cancellationTcs.Task) + { + throw new TaskCanceledException("The task was cancelled."); + } + + // Otherwise return the result of the completed task + return await task; + } + // The default version can come from two different places depending on the context of the invocation. string GetDefaultVersion() { diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs new file mode 100644 index 00000000..8412beb5 --- /dev/null +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -0,0 +1,357 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Tests.Logging; +using Microsoft.DurableTask.Worker; +using Xunit.Abstractions; + +namespace Microsoft.DurableTask.Grpc.Tests; + +/// +/// Integration tests for activity and sub-orchestrator cancellation functionality. +/// +public class CancellationTests(ITestOutputHelper output, GrpcSidecarFixture sidecarFixture) : + IntegrationTestBase(output, sidecarFixture) +{ + /// + /// Tests that an activity can be cancelled using a CancellationToken. + /// + [Fact] + public async Task ActivityCancellation() + { + TaskName orchestratorName = nameof(ActivityCancellation); + TaskName activityName = "SlowActivity"; + + bool activityWasInvoked = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + // Cancel immediately + cts.Cancel(); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + await ctx.CallActivityAsync(activityName, options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled"; + } + }) + .AddActivityFunc(activityName, (TaskActivityContext activityContext) => + { + activityWasInvoked = true; + return "Activity completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled\"", metadata.SerializedOutput); + Assert.False(activityWasInvoked, "Activity should not have been invoked when cancellation happens before scheduling"); + } + + /// + /// Tests that a sub-orchestrator can be cancelled using a CancellationToken. + /// + [Fact] + public async Task SubOrchestratorCancellation() + { + TaskName orchestratorName = nameof(SubOrchestratorCancellation); + TaskName subOrchestratorName = "SubOrchestrator"; + + bool subOrchestratorWasInvoked = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + // Cancel immediately + cts.Cancel(); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled"; + } + }) + .AddOrchestratorFunc(subOrchestratorName, ctx => + { + subOrchestratorWasInvoked = true; + return Task.FromResult("Sub-orchestrator completed"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled\"", metadata.SerializedOutput); + Assert.False(subOrchestratorWasInvoked, "Sub-orchestrator should not have been invoked when cancellation happens before scheduling"); + } + + /// + /// Tests that cancellation token is passed to retry handler. + /// + [Fact] + public async Task RetryHandlerReceivesCancellationToken() + { + TaskName orchestratorName = nameof(RetryHandlerReceivesCancellationToken); + + int attemptCount = 0; + bool cancellationTokenWasCancelled = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + attemptCount = retryContext.LastAttemptNumber; + cancellationTokenWasCancelled = retryContext.CancellationToken.IsCancellationRequested; + + // Cancel after first attempt + if (attemptCount == 1) + { + cts.Cancel(); + } + + // Try to retry + return attemptCount < 5; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here"; + } + catch (TaskFailedException) + { + return $"Failed after {attemptCount} attempts"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.True(attemptCount >= 1, "Retry handler should have been called at least once"); + Assert.True(cancellationTokenWasCancelled, "Cancellation token should have been cancelled in retry handler"); + } + + /// + /// Tests that retry handler can check cancellation token and stop retrying. + /// + [Fact] + public async Task RetryHandlerCanStopOnCancellation() + { + TaskName orchestratorName = nameof(RetryHandlerCanStopOnCancellation); + + int maxAttempts = 0; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + maxAttempts = retryContext.LastAttemptNumber; + + // Cancel after second attempt + if (maxAttempts == 2) + { + cts.Cancel(); + } + + // Stop retrying if cancelled + if (retryContext.CancellationToken.IsCancellationRequested) + { + return false; + } + + return maxAttempts < 10; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here"; + } + catch (TaskFailedException) + { + return $"Stopped after {maxAttempts} attempts"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation + Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); + } + + /// + /// Tests that activity can be cancelled while waiting for it to complete. + /// + [Fact] + public async Task ActivityCancellationWhileWaiting() + { + TaskName orchestratorName = nameof(ActivityCancellationWhileWaiting); + TaskName activityName = "LongRunningActivity"; + + bool activityCompleted = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + // This will start the activity, but then cancel while waiting + await ctx.CallActivityAsync(activityName, options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled while waiting"; + } + }) + .AddActivityFunc(activityName, async (TaskActivityContext activityContext) => + { + // Simulate long-running activity + await Task.Delay(TimeSpan.FromSeconds(5)); + activityCompleted = true; + return "Activity completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); + + // Note: The activity might still complete in the background, but the orchestrator + // should have already moved on after cancellation + } + + /// + /// Tests that sub-orchestrator can be cancelled while waiting for it to complete. + /// + [Fact] + public async Task SubOrchestratorCancellationWhileWaiting() + { + TaskName orchestratorName = nameof(SubOrchestratorCancellationWhileWaiting); + TaskName subOrchestratorName = "LongRunningSubOrchestrator"; + + bool subOrchestratorCompleted = false; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); + + TaskOptions options = new() { CancellationToken = cts.Token }; + + try + { + // This will start the sub-orchestrator, but then cancel while waiting + await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); + return "Should not reach here"; + } + catch (TaskCanceledException) + { + return "Cancelled while waiting"; + } + }) + .AddOrchestratorFunc(subOrchestratorName, async ctx => + { + // Simulate long-running sub-orchestrator + await ctx.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None); + subOrchestratorCompleted = true; + return "Sub-orchestrator completed"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); + + // Note: The sub-orchestrator might still complete in the background, but the parent + // orchestrator should have already moved on after cancellation + } +} From 236dd3069e1690a3822cfec655382090a8eb8bf8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:42:29 +0000 Subject: [PATCH 3/9] Fix cancellation implementation and remove timing-dependent tests Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../Shims/TaskOrchestrationContextWrapper.cs | 14 ++- .../CancellationTests.cs | 106 ------------------ 2 files changed, 10 insertions(+), 110 deletions(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index db976876..fb82d046 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -154,8 +154,11 @@ public override async Task CallActivityAsync( cancellationToken = callActivityOptions.CancellationToken; } - // Check if cancellation was requested before starting the activity - cancellationToken.ThrowIfCancellationRequested(); + // If cancellation was requested before starting, return a cancelled task immediately + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException("The task was cancelled before it could be scheduled."); + } #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) @@ -228,8 +231,11 @@ public override async Task CallSubOrchestratorAsync( CancellationToken cancellationToken = options?.CancellationToken ?? default; - // Check if cancellation was requested before starting the sub-orchestrator - cancellationToken.ThrowIfCancellationRequested(); + // If cancellation was requested before starting, return a cancelled task immediately + if (cancellationToken.IsCancellationRequested) + { + throw new TaskCanceledException("The sub-orchestrator was cancelled before it could be scheduled."); + } try { diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index 8412beb5..3ead3fa2 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -248,110 +248,4 @@ public async Task RetryHandlerCanStopOnCancellation() Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); } - - /// - /// Tests that activity can be cancelled while waiting for it to complete. - /// - [Fact] - public async Task ActivityCancellationWhileWaiting() - { - TaskName orchestratorName = nameof(ActivityCancellationWhileWaiting); - TaskName activityName = "LongRunningActivity"; - - bool activityCompleted = false; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); - - TaskOptions options = new() { CancellationToken = cts.Token }; - - try - { - // This will start the activity, but then cancel while waiting - await ctx.CallActivityAsync(activityName, options); - return "Should not reach here"; - } - catch (TaskCanceledException) - { - return "Cancelled while waiting"; - } - }) - .AddActivityFunc(activityName, async (TaskActivityContext activityContext) => - { - // Simulate long-running activity - await Task.Delay(TimeSpan.FromSeconds(5)); - activityCompleted = true; - return "Activity completed"; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); - - // Note: The activity might still complete in the background, but the orchestrator - // should have already moved on after cancellation - } - - /// - /// Tests that sub-orchestrator can be cancelled while waiting for it to complete. - /// - [Fact] - public async Task SubOrchestratorCancellationWhileWaiting() - { - TaskName orchestratorName = nameof(SubOrchestratorCancellationWhileWaiting); - TaskName subOrchestratorName = "LongRunningSubOrchestrator"; - - bool subOrchestratorCompleted = false; - - await using HostTestLifetime server = await this.StartWorkerAsync(b => - { - b.AddTasks(tasks => tasks - .AddOrchestratorFunc(orchestratorName, async ctx => - { - using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(100)); - - TaskOptions options = new() { CancellationToken = cts.Token }; - - try - { - // This will start the sub-orchestrator, but then cancel while waiting - await ctx.CallSubOrchestratorAsync(subOrchestratorName, options: options); - return "Should not reach here"; - } - catch (TaskCanceledException) - { - return "Cancelled while waiting"; - } - }) - .AddOrchestratorFunc(subOrchestratorName, async ctx => - { - // Simulate long-running sub-orchestrator - await ctx.CreateTimer(TimeSpan.FromSeconds(5), CancellationToken.None); - subOrchestratorCompleted = true; - return "Sub-orchestrator completed"; - })); - }); - - string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); - OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( - instanceId, getInputsAndOutputs: true, this.TimeoutToken); - - Assert.NotNull(metadata); - Assert.Equal(instanceId, metadata.InstanceId); - Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); - Assert.Equal("\"Cancelled while waiting\"", metadata.SerializedOutput); - - // Note: The sub-orchestrator might still complete in the background, but the parent - // orchestrator should have already moved on after cancellation - } } From 9b46021dbeff0a6bd34aca258764dd66d9ed8bea Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 24 Dec 2025 19:44:10 +0000 Subject: [PATCH 4/9] Address code review comments: optimize WaitForTaskWithCancellation and fix TaskCompletionSource options Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index fb82d046..68a3d952 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -556,8 +556,14 @@ async Task WaitForTaskWithCancellation(Task task, CancellationToken can return await task; } + // If the task is already completed, return immediately without setting up cancellation infrastructure + if (task.IsCompleted) + { + return await task; + } + // Create a cancellation task that completes when the token is cancelled - TaskCompletionSource cancellationTcs = new(); + TaskCompletionSource cancellationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); using CancellationTokenRegistration registration = cancellationToken.Register(() => { cancellationTcs.TrySetCanceled(cancellationToken); From aa94a56eb7905603c69d7303330a8ba66dc20bce Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 00:32:31 +0000 Subject: [PATCH 5/9] Fix Chinese comma and incorrect documentation example - Replace Chinese full-width comma with ASCII comma in TaskOrchestrationContextWrapper.cs - Fix documentation example to use TaskRetryOptions.FromRetryHandler instead of TaskOptions.FromRetryHandler Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 2 +- src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index 5703d4e8..b9e17c4a 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -96,7 +96,7 @@ public TaskOptions(TaskOptions options) /// using CancellationTokenSource cts = new CancellationTokenSource(); /// TaskOptions options = new TaskOptions /// { - /// Retry = TaskOptions.FromRetryHandler(retryContext => + /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => /// { /// if (retryContext.CancellationToken.IsCancellationRequested) /// { diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index 2ececef5..b3f17b37 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -246,7 +246,7 @@ public override async Task CallSubOrchestratorAsync( version, instanceId, policy.ToDurableTaskCoreRetryOptions(), - input, + input, options.Tags); return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); From f68ff6285de51068627e9bdb4ac0c2601075dfe9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 04:25:08 +0000 Subject: [PATCH 6/9] Fix line endings in TaskOptions.cs to preserve CRLF The previous commits inadvertently changed all line endings from CRLF to LF, causing the entire file to show as modified. This commit restores CRLF line endings while keeping all the functional changes (CancellationToken property and documentation). Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 542 ++++++++++++++++---------------- 1 file changed, 271 insertions(+), 271 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index b9e17c4a..ca076fd5 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -1,271 +1,271 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Collections.Immutable; - -namespace Microsoft.DurableTask; - -/// -/// Options that can be used to control the behavior of orchestrator task execution. -/// -public record TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - public TaskOptions(TaskRetryOptions? retry) - : this(retry, null) - { - } - - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The tags to associate with the task. - public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) - { - this.Retry = retry; - this.Tags = tags; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The task options to copy from. - public TaskOptions(TaskOptions options) - { - Check.NotNull(options); - this.Retry = options.Retry; - this.Tags = options.Tags; - this.CancellationToken = options.CancellationToken; - } - - /// - /// Gets the task retry options. - /// - public TaskRetryOptions? Retry { get; init; } - - /// - /// Gets the tags to associate with the task. - /// - public IDictionary? Tags { get; init; } - - /// - /// Gets the cancellation token that can be used to cancel the task. - /// - /// - /// - /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. - /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next - /// opportunity when the orchestrator checks the token status. - /// - /// - /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled - /// or, if already running, the result will be ignored and a will be thrown. - /// - /// - /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be - /// ignored and a will be thrown. Note that cancelling the parent's token - /// does not terminate the sub-orchestrator instance. - /// - /// - /// For retry handlers, the cancellation token is passed to the retry handler via the , - /// allowing the handler to check for cancellation and stop retrying if needed. - /// - /// - /// Example of cancelling an activity after a timeout: - /// - /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); - /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; - /// - /// try - /// { - /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); - /// } - /// catch (TaskCanceledException) - /// { - /// // Handle cancellation - /// } - /// - /// - /// - /// Example of using cancellation with retry logic: - /// - /// using CancellationTokenSource cts = new CancellationTokenSource(); - /// TaskOptions options = new TaskOptions - /// { - /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => - /// { - /// if (retryContext.CancellationToken.IsCancellationRequested) - /// { - /// return false; // Stop retrying - /// } - /// return retryContext.LastAttemptNumber < 3; - /// }), - /// CancellationToken = cts.Token - /// }; - /// - /// await context.CallActivityAsync("MyActivity", "input", options); - /// - /// - /// - public CancellationToken CancellationToken { get; init; } - - /// - /// Returns a new from the provided . - /// - /// The policy to convert from. - /// A built from the policy. - public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); - - /// - /// Returns a new from the provided . - /// - /// The handler to convert from. - /// A built from the handler. - public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); - - /// - /// Returns a new with the provided instance ID. This can be used when - /// starting a new sub-orchestration to specify the instance ID. - /// - /// The instance ID to use. - /// A new . - public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); -} - -/// -/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to -/// supply extra options for orchestrations. -/// -public record SubOrchestrationOptions : TaskOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// The task retry options. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) - : base(retry) - { - this.InstanceId = instanceId; - } - - /// - /// Initializes a new instance of the class. - /// - /// The task options to wrap. - /// The orchestration instance ID. - public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) - : base(options) - { - this.InstanceId = instanceId; - if (options is SubOrchestrationOptions derived) - { - if (instanceId is null) - { - this.InstanceId = derived.InstanceId; - } - - this.Version = derived.Version; - } - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The sub-orchestration options to copy from. - public SubOrchestrationOptions(SubOrchestrationOptions options) - : base(options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.Version = options.Version; - } - - /// - /// Gets the orchestration instance ID. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the version to associate with the sub-orchestration instance. - /// - public TaskVersion? Version { get; init; } -} - -/// -/// Options for submitting new orchestrations via the client. -/// -public record StartOrchestrationOptions -{ - /// - /// Initializes a new instance of the class. - /// - /// - /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - /// - /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// -#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax - public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) -#pragma warning restore SA1313 - { - this.InstanceId = InstanceId; - this.StartAt = StartAt; - } - - /// - /// Initializes a new instance of the class by copying from another instance. - /// - /// The start orchestration options to copy from. - public StartOrchestrationOptions(StartOrchestrationOptions options) - { - Check.NotNull(options); - this.InstanceId = options.InstanceId; - this.StartAt = options.StartAt; - this.Tags = options.Tags; - this.Version = options.Version; - this.DedupeStatuses = options.DedupeStatuses; - } - - /// - /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. - /// - public string? InstanceId { get; init; } - - /// - /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past - /// is specified, the orchestration instance will be scheduled immediately. - /// - public DateTimeOffset? StartAt { get; init; } - - /// - /// Gets the tags to associate with the orchestration instance. - /// - public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); - - /// - /// Gets the version to associate with the orchestration instance. - /// - public TaskVersion? Version { get; init; } - - /// - /// Gets the orchestration runtime statuses that should be considered for deduplication. - /// - /// - /// For type-safe usage, use the WithDedupeStatuses extension method. - /// - public IReadOnlyList? DedupeStatuses { get; init; } -} +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Immutable; + +namespace Microsoft.DurableTask; + +/// +/// Options that can be used to control the behavior of orchestrator task execution. +/// +public record TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + public TaskOptions(TaskRetryOptions? retry) + : this(retry, null) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The tags to associate with the task. + public TaskOptions(TaskRetryOptions? retry = null, IDictionary? tags = null) + { + this.Retry = retry; + this.Tags = tags; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The task options to copy from. + public TaskOptions(TaskOptions options) + { + Check.NotNull(options); + this.Retry = options.Retry; + this.Tags = options.Tags; + this.CancellationToken = options.CancellationToken; + } + + /// + /// Gets the task retry options. + /// + public TaskRetryOptions? Retry { get; init; } + + /// + /// Gets the tags to associate with the task. + /// + public IDictionary? Tags { get; init; } + + /// + /// Gets the cancellation token that can be used to cancel the task. + /// + /// + /// + /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. + /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next + /// opportunity when the orchestrator checks the token status. + /// + /// + /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled + /// or, if already running, the result will be ignored and a will be thrown. + /// + /// + /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be + /// ignored and a will be thrown. Note that cancelling the parent's token + /// does not terminate the sub-orchestrator instance. + /// + /// + /// For retry handlers, the cancellation token is passed to the retry handler via the , + /// allowing the handler to check for cancellation and stop retrying if needed. + /// + /// + /// Example of cancelling an activity after a timeout: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; + /// + /// try + /// { + /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); + /// } + /// catch (TaskCanceledException) + /// { + /// // Handle cancellation + /// } + /// + /// + /// + /// Example of using cancellation with retry logic: + /// + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// TaskOptions options = new TaskOptions + /// { + /// Retry = TaskRetryOptions.FromRetryHandler(retryContext => + /// { + /// if (retryContext.CancellationToken.IsCancellationRequested) + /// { + /// return false; // Stop retrying + /// } + /// return retryContext.LastAttemptNumber < 3; + /// }), + /// CancellationToken = cts.Token + /// }; + /// + /// await context.CallActivityAsync("MyActivity", "input", options); + /// + /// + /// + public CancellationToken CancellationToken { get; init; } + + /// + /// Returns a new from the provided . + /// + /// The policy to convert from. + /// A built from the policy. + public static TaskOptions FromRetryPolicy(RetryPolicy policy) => new(policy); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(AsyncRetryHandler handler) => new(handler); + + /// + /// Returns a new from the provided . + /// + /// The handler to convert from. + /// A built from the handler. + public static TaskOptions FromRetryHandler(RetryHandler handler) => new(handler); + + /// + /// Returns a new with the provided instance ID. This can be used when + /// starting a new sub-orchestration to specify the instance ID. + /// + /// The instance ID to use. + /// A new . + public SubOrchestrationOptions WithInstanceId(string instanceId) => new(this, instanceId); +} + +/// +/// Options that can be used to control the behavior of orchestrator task execution. This derived type can be used to +/// supply extra options for orchestrations. +/// +public record SubOrchestrationOptions : TaskOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// The task retry options. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskRetryOptions? retry = null, string? instanceId = null) + : base(retry) + { + this.InstanceId = instanceId; + } + + /// + /// Initializes a new instance of the class. + /// + /// The task options to wrap. + /// The orchestration instance ID. + public SubOrchestrationOptions(TaskOptions options, string? instanceId = null) + : base(options) + { + this.InstanceId = instanceId; + if (options is SubOrchestrationOptions derived) + { + if (instanceId is null) + { + this.InstanceId = derived.InstanceId; + } + + this.Version = derived.Version; + } + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The sub-orchestration options to copy from. + public SubOrchestrationOptions(SubOrchestrationOptions options) + : base(options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.Version = options.Version; + } + + /// + /// Gets the orchestration instance ID. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the version to associate with the sub-orchestration instance. + /// + public TaskVersion? Version { get; init; } +} + +/// +/// Options for submitting new orchestrations via the client. +/// +public record StartOrchestrationOptions +{ + /// + /// Initializes a new instance of the class. + /// + /// + /// The unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + /// + /// The time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - using PascalCase to maintain backward compatibility with positional record syntax + public StartOrchestrationOptions(string? InstanceId = null, DateTimeOffset? StartAt = null) +#pragma warning restore SA1313 + { + this.InstanceId = InstanceId; + this.StartAt = StartAt; + } + + /// + /// Initializes a new instance of the class by copying from another instance. + /// + /// The start orchestration options to copy from. + public StartOrchestrationOptions(StartOrchestrationOptions options) + { + Check.NotNull(options); + this.InstanceId = options.InstanceId; + this.StartAt = options.StartAt; + this.Tags = options.Tags; + this.Version = options.Version; + this.DedupeStatuses = options.DedupeStatuses; + } + + /// + /// Gets the unique ID of the orchestration instance to schedule. If not specified, a new GUID value is used. + /// + public string? InstanceId { get; init; } + + /// + /// Gets the time when the orchestration instance should start executing. If not specified or if a date-time in the past + /// is specified, the orchestration instance will be scheduled immediately. + /// + public DateTimeOffset? StartAt { get; init; } + + /// + /// Gets the tags to associate with the orchestration instance. + /// + public IReadOnlyDictionary Tags { get; init; } = ImmutableDictionary.Create(); + + /// + /// Gets the version to associate with the orchestration instance. + /// + public TaskVersion? Version { get; init; } + + /// + /// Gets the orchestration runtime statuses that should be considered for deduplication. + /// + /// + /// For type-safe usage, use the WithDedupeStatuses extension method. + /// + public IReadOnlyList? DedupeStatuses { get; init; } +} From 37d523f44125857b7d6ec4974aac801e56be6679 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 20:20:09 +0000 Subject: [PATCH 7/9] Remove ineffective WaitForTaskWithCancellation method and clarify documentation As correctly identified in review, WaitForTaskWithCancellation was ineffective because: - Once ScheduleTask is called, the orchestrator yields execution - During the yield, orchestrator code isn't running, so cancellation callbacks can't fire - Task.WhenAny would always return the activity/sub-orchestrator task, never the cancellation task Changes: - Removed WaitForTaskWithCancellation method entirely - Simplified CallActivityAsync and CallSubOrchestratorAsync to directly await scheduled tasks - Added clarifying comments about the pre-check being the only cancellation point - Updated CancellationToken documentation to accurately describe behavior and limitations - Emphasized that cancellation only works pre-scheduling and in retry handlers, not mid-execution Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- src/Abstractions/TaskOptions.cs | 37 ++++++++---- .../Shims/TaskOrchestrationContextWrapper.cs | 58 ++++--------------- 2 files changed, 35 insertions(+), 60 deletions(-) diff --git a/src/Abstractions/TaskOptions.cs b/src/Abstractions/TaskOptions.cs index ca076fd5..cb94ac9b 100644 --- a/src/Abstractions/TaskOptions.cs +++ b/src/Abstractions/TaskOptions.cs @@ -57,31 +57,44 @@ public TaskOptions(TaskOptions options) /// /// /// - /// When provided, the cancellation token can be used to cancel activities, sub-orchestrators, and retry logic. - /// Cancellation is cooperative, meaning the task will not be cancelled immediately, but rather at the next - /// opportunity when the orchestrator checks the token status. + /// The cancellation token provides cooperative cancellation for activities, sub-orchestrators, and retry logic. + /// Due to the durable orchestrator execution model, cancellation only occurs at specific points when the + /// orchestrator code is executing. /// /// - /// For activities, if the token is cancelled before the activity completes, the activity will not be scheduled - /// or, if already running, the result will be ignored and a will be thrown. + /// Cancellation behavior: /// /// - /// For sub-orchestrators, if the token is cancelled before the sub-orchestrator completes, the result will be - /// ignored and a will be thrown. Note that cancelling the parent's token - /// does not terminate the sub-orchestrator instance. + /// 1. Pre-scheduling check: If the token is cancelled before calling + /// CallActivityAsync or CallSubOrchestratorAsync, a is thrown + /// immediately without scheduling the task. /// /// - /// For retry handlers, the cancellation token is passed to the retry handler via the , - /// allowing the handler to check for cancellation and stop retrying if needed. + /// 2. Retry handlers: The cancellation token is passed to custom retry handlers via + /// , allowing them to check for cancellation and stop retrying between attempts. + /// + /// + /// Important limitation: Once an activity or sub-orchestrator is scheduled, the orchestrator + /// yields execution and waits for the task to complete. During this yield period, the orchestrator code is not + /// running, so it cannot respond to cancellation requests. Cancelling the token while waiting will not wake up + /// the orchestrator or cancel the waiting task. This is a fundamental limitation of the durable orchestrator + /// execution model. + /// + /// + /// Note: Cancelling a parent orchestrator's token does not terminate sub-orchestrator instances that have + /// already been scheduled. /// /// - /// Example of cancelling an activity after a timeout: + /// Example of pre-scheduling cancellation: /// - /// using CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + /// using CancellationTokenSource cts = new CancellationTokenSource(); + /// cts.Cancel(); // Cancel before scheduling + /// /// TaskOptions options = new TaskOptions { CancellationToken = cts.Token }; /// /// try /// { + /// // This will throw TaskCanceledException without scheduling the activity /// string result = await context.CallActivityAsync<string>("MyActivity", "input", options); /// } /// catch (TaskCanceledException) diff --git a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs index b3f17b37..5434ab01 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs @@ -154,7 +154,9 @@ public override async Task CallActivityAsync( cancellationToken = callActivityOptions.CancellationToken; } - // If cancellation was requested before starting, return a cancelled task immediately + // If cancellation was requested before starting, throw immediately + // Note: Once the activity is scheduled, the orchestrator yields and cannot respond to + // cancellation until it resumes, so this pre-check is the only cancellation point. if (cancellationToken.IsCancellationRequested) { throw new TaskCanceledException("The task was cancelled before it could be scheduled."); @@ -163,7 +165,7 @@ public override async Task CallActivityAsync( #pragma warning disable 0618 if (options?.Retry?.Policy is RetryPolicy policy) { - Task activityTask = this.innerContext.ScheduleTask( + return await this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() @@ -171,8 +173,6 @@ public override async Task CallActivityAsync( .WithTags(tags) .Build(), parameters: input); - - return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -190,15 +190,13 @@ public override async Task CallActivityAsync( } else { - Task activityTask = this.innerContext.ScheduleTask( + return await this.innerContext.ScheduleTask( name.Name, this.innerContext.Version, options: ScheduleTaskOptions.CreateBuilder() .WithTags(tags) .Build(), parameters: input); - - return await this.WaitForTaskWithCancellation(activityTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.TaskFailedException e) @@ -231,7 +229,9 @@ public override async Task CallSubOrchestratorAsync( CancellationToken cancellationToken = options?.CancellationToken ?? default; - // If cancellation was requested before starting, return a cancelled task immediately + // If cancellation was requested before starting, throw immediately + // Note: Once the sub-orchestrator is scheduled, the orchestrator yields and cannot respond to + // cancellation until it resumes, so this pre-check is the only cancellation point. if (cancellationToken.IsCancellationRequested) { throw new TaskCanceledException("The sub-orchestrator was cancelled before it could be scheduled."); @@ -241,15 +241,13 @@ public override async Task CallSubOrchestratorAsync( { if (options?.Retry?.Policy is RetryPolicy policy) { - Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstanceWithRetry( + return await this.innerContext.CreateSubOrchestrationInstanceWithRetry( orchestratorName.Name, version, instanceId, policy.ToDurableTaskCoreRetryOptions(), input, options.Tags); - - return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } else if (options?.Retry?.Handler is AsyncRetryHandler handler) { @@ -266,14 +264,12 @@ public override async Task CallSubOrchestratorAsync( } else { - Task subOrchestratorTask = this.innerContext.CreateSubOrchestrationInstance( + return await this.innerContext.CreateSubOrchestrationInstance( orchestratorName.Name, version, instanceId, input, options?.Tags); - - return await this.WaitForTaskWithCancellation(subOrchestratorTask, cancellationToken); } } catch (global::DurableTask.Core.Exceptions.SubOrchestrationFailedException e) @@ -549,40 +545,6 @@ async Task InvokeWithCustomRetryHandler( } } - async Task WaitForTaskWithCancellation(Task task, CancellationToken cancellationToken) - { - // If no cancellation token provided or it can't be cancelled, just await the task - if (!cancellationToken.CanBeCanceled) - { - return await task; - } - - // If the task is already completed, return immediately without setting up cancellation infrastructure - if (task.IsCompleted) - { - return await task; - } - - // Create a cancellation task that completes when the token is cancelled - TaskCompletionSource cancellationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously); - using CancellationTokenRegistration registration = cancellationToken.Register(() => - { - cancellationTcs.TrySetCanceled(cancellationToken); - }); - - // Wait for either the task to complete or cancellation - Task completedTask = await Task.WhenAny(task, cancellationTcs.Task); - - // If cancellation won, throw TaskCanceledException - if (completedTask == cancellationTcs.Task) - { - throw new TaskCanceledException("The task was cancelled."); - } - - // Otherwise return the result of the completed task - return await task; - } - // The default version can come from two different places depending on the context of the invocation. string GetDefaultVersion() { From 0e911a1dff4e0ec928867d27489721d5a3685700 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 21:06:00 +0000 Subject: [PATCH 8/9] Add test for cancellation token cancelled outside retry handler Added RetryHandlerReceivesCancelledTokenFromOutside test to verify behavior when: - A CancellationToken is provided to TaskOptions with retry logic - The token is cancelled OUTSIDE the retry handler (in orchestrator code) - The pre-scheduling check catches the cancelled token before scheduling This test demonstrates that when a token is cancelled before CallActivityAsync is invoked, the pre-scheduling cancellation check throws TaskCanceledException immediately, and the retry handler is never called. This is the expected behavior given the orchestrator execution model. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../CancellationTests.cs | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index 3ead3fa2..f19d5f67 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -248,4 +248,85 @@ public async Task RetryHandlerCanStopOnCancellation() Assert.Equal(2, maxAttempts); // Should stop after 2 attempts due to cancellation Assert.Equal("\"Stopped after 2 attempts\"", metadata.SerializedOutput); } + + /// + /// Tests that when a token is cancelled outside the retry handler (between retry attempts), + /// the handler receives the cancelled token on the next attempt. + /// + [Fact] + public async Task RetryHandlerReceivesCancelledTokenFromOutside() + { + TaskName orchestratorName = nameof(RetryHandlerReceivesCancelledTokenFromOutside); + + int attemptCount = 0; + bool tokenWasCancelledInHandler = false; + CancellationTokenSource? cts = null; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + cts = new CancellationTokenSource(); + + TaskRetryOptions retryOptions = TaskOptions.FromRetryHandler(retryContext => + { + attemptCount = retryContext.LastAttemptNumber; + + // Check if token is cancelled + tokenWasCancelledInHandler = retryContext.CancellationToken.IsCancellationRequested; + + // Stop retrying if cancelled + if (retryContext.CancellationToken.IsCancellationRequested) + { + return false; + } + + return attemptCount < 5; + }).Retry!; + + TaskOptions options = new(retryOptions) + { + CancellationToken = cts.Token + }; + + // Cancel the token AFTER creating options but BEFORE first attempt + // This tests that the retry handler receives the cancelled token from outside + cts.Cancel(); + + try + { + await ctx.CallActivityAsync("FailingActivity", options); + return "Should not reach here - activity succeeded"; + } + catch (TaskCanceledException) + { + // Pre-scheduling check caught the cancelled token before even attempting + return $"Cancelled before scheduling, attempts: {attemptCount}"; + } + catch (TaskFailedException) + { + // Activity failed and retry handler stopped retrying + return $"Failed after {attemptCount} attempts, token was cancelled in handler: {tokenWasCancelledInHandler}"; + } + }) + .AddActivityFunc("FailingActivity", (TaskActivityContext activityContext) => + { + throw new InvalidOperationException("Activity always fails"); + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Since token was cancelled before CallActivityAsync, the pre-scheduling check throws + // TaskCanceledException and retry handler never gets called + Assert.Equal(0, attemptCount); + Assert.Contains("Cancelled before scheduling", metadata.SerializedOutput); + } } From a34d209f1ce8d100c55a26624c9061a713cf337e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 21:12:50 +0000 Subject: [PATCH 9/9] Add test for multiple activities in a loop with cancellation Added MultipleActivitiesInLoopWithCancellation test to demonstrate the main use case for pre-scheduling cancellation: - Orchestrator calls multiple activities in a loop - Token is cancelled after processing 3 items - Pre-scheduling check prevents remaining activities from being scheduled - Loop exits cleanly with cancellation message This test validates that cancellation in loops works as expected, allowing orchestrators to stop processing when a cancellation is requested. Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../CancellationTests.cs | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/test/Grpc.IntegrationTests/CancellationTests.cs b/test/Grpc.IntegrationTests/CancellationTests.cs index f19d5f67..c8e71937 100644 --- a/test/Grpc.IntegrationTests/CancellationTests.cs +++ b/test/Grpc.IntegrationTests/CancellationTests.cs @@ -329,4 +329,75 @@ public async Task RetryHandlerReceivesCancelledTokenFromOutside() Assert.Equal(0, attemptCount); Assert.Contains("Cancelled before scheduling", metadata.SerializedOutput); } + + /// + /// Tests that when calling multiple activities in a loop with a cancellation token, + /// the loop exits after cancellation instead of continuing to call remaining activities. + /// This is the main use case for pre-scheduling cancellation checks. + /// + [Fact] + public async Task MultipleActivitiesInLoopWithCancellation() + { + TaskName orchestratorName = nameof(MultipleActivitiesInLoopWithCancellation); + TaskName activityName = "ProcessItem"; + + int activitiesInvoked = 0; + int totalItems = 10; + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, async ctx => + { + using CancellationTokenSource cts = new(); + TaskOptions options = new() { CancellationToken = cts.Token }; + + List results = new(); + + for (int i = 0; i < totalItems; i++) + { + // Cancel after processing 3 items + if (i == 3) + { + cts.Cancel(); + } + + try + { + string result = await ctx.CallActivityAsync(activityName, i, options); + results.Add(result); + } + catch (TaskCanceledException) + { + // Pre-scheduling check caught cancellation - exit loop + results.Add($"Cancelled at item {i}"); + break; + } + } + + return $"Processed {results.Count} items: [{string.Join(", ", results)}]"; + }) + .AddActivityFunc(activityName, (TaskActivityContext ctx, int item) => + { + Interlocked.Increment(ref activitiesInvoked); + return $"Item {item}"; + })); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); + + // Should have processed 3 items (0, 1, 2) before cancellation at item 3 + Assert.Equal(3, activitiesInvoked); + Assert.Contains("Processed 4 items", metadata.SerializedOutput); // 3 successful + 1 cancellation message + Assert.Contains("Cancelled at item 3", metadata.SerializedOutput); + Assert.Contains("Item 0", metadata.SerializedOutput); + Assert.Contains("Item 1", metadata.SerializedOutput); + Assert.Contains("Item 2", metadata.SerializedOutput); + } }