From 691cd3df5b6445d0f79aa359c655966ae5005079 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 12 Jan 2026 15:02:37 -0800 Subject: [PATCH 1/4] first commit --- src/Client/Core/DurableTaskClient.cs | 13 ++++++++++--- src/Client/Grpc/GrpcDurableTaskClient.cs | 16 +++++++++++++++- .../ShimDurableTaskClient.cs | 15 +++++++++++++++ 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 5279e725..c4658bbb 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -369,7 +369,14 @@ public virtual Task PurgeInstanceAsync(string instanceId, Cancellat /// This method returns a object after the operation has completed with a /// indicating the number of orchestration instances that were purged, /// including the count of sub-orchestrations purged if any. - /// + /// + /// Thrown if is null. + /// Thrown if is empty or starts with the null character. + /// Thrown if the orchestration is not in a + /// , , + /// or state. + /// Thrown if the backend storage provider does not support purging instances. + /// Thrown if the operation is canceled via the token. public virtual Task PurgeInstanceAsync( string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { @@ -447,7 +454,7 @@ public virtual Task RestartAsync( /// The orchestration's history will be replaced with a new history that excludes the failed Activities and suborchestrations, /// and a new execution ID will be generated for the rewound orchestration instance. As the failed Activities and suborchestrations /// re-execute, the history will be appended with new TaskScheduled, TaskCompleted, and SubOrchestrationInstanceCompleted events. - /// Note that only orchestrations in a "Failed" state can be rewound. + /// Note that only orchestrations in a state can be rewound. /// /// The instance ID of the orchestration to rewind. /// The reason for the rewind. @@ -460,7 +467,7 @@ public virtual Task RestartAsync( /// Thrown if an orchestration with the specified does not exist, /// or if is the instance ID of an entity. /// Thrown if a precondition of the operation fails, for example if the specified - /// orchestration is not in a "Failed" state. + /// orchestration is not in a state. /// Thrown if the operation is canceled via the token. public virtual Task RewindInstanceAsync( string instanceId, diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index b6a6c72d..20cc348b 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -394,10 +394,16 @@ public override async Task WaitForInstanceCompletionAsync public override Task PurgeInstanceAsync( string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { + Check.NotNullOrEmpty(instanceId); bool recursive = options?.Recursive ?? false; this.logger.PurgingInstanceMetadata(instanceId); - P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive }; + P.PurgeInstancesRequest request = new() + { + InstanceId = instanceId, + Recursive = recursive, + IsOrchestration = !this.options.EnableEntitySupport || instanceId[0] != '@', + }; return this.PurgeInstancesCoreAsync(request, cancellation); } @@ -598,6 +604,14 @@ async Task PurgeInstancesCoreAsync( throw new OperationCanceledException( $"The {nameof(this.PurgeAllInstancesAsync)} operation was canceled.", e, cancellation); } + catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition) + { + throw new InvalidOperationException(e.Status.Detail); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Unimplemented) + { + throw new NotImplementedException(e.Status.Detail); + } } OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs) diff --git a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs index f6b6140f..f4911098 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs @@ -122,6 +122,21 @@ public override async Task PurgeInstanceAsync( string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default) { Check.NotNullOrEmpty(instanceId); + OrchestrationMetadata? orchestrationState = await this.GetInstanceAsync(instanceId, cancellation); + + // The orchestration doesn't exist, nothing to purge + if (orchestrationState == null) + { + return new PurgeResult(0); + } + + bool isEntity = this.options.EnableEntitySupport && instanceId[0] == '@'; + if (!isEntity && !orchestrationState.IsCompleted) + { + throw new InvalidOperationException($"Only orchestrations in a terminal state can be purged, " + + $"but the orchestration with instance ID {instanceId} has status {orchestrationState.RuntimeStatus}"); + } + cancellation.ThrowIfCancellationRequested(); // TODO: Support recursive purge of sub-orchestrations From 9260f078941385bb482bb95c8e2a26e7b6402b11 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 12 Jan 2026 15:05:05 -0800 Subject: [PATCH 2/4] adding the proto so it can build --- src/Grpc/orchestrator_service.proto | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index e36c4f01..8ef46a4a 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -115,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent { google.protobuf.StringValue version = 3; google.protobuf.StringValue input = 4; TraceContext parentTraceContext = 5; + map tags = 6; } message SubOrchestrationInstanceCompletedEvent { @@ -225,6 +226,11 @@ message ExecutionRewoundEvent { google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise + google.protobuf.StringValue name = 5; // used by DTS backend only + google.protobuf.StringValue version = 6; // used by DTS backend only + google.protobuf.StringValue input = 7; // used by DTS backend only + ParentInstanceInfo parentInstance = 8; // used by DTS backend only + map tags = 9; // used by DTS backend only } message HistoryEvent { @@ -483,6 +489,19 @@ message QueryInstancesResponse { google.protobuf.StringValue continuationToken = 2; } +message ListInstanceIdsRequest { + repeated OrchestrationStatus runtimeStatus = 1; + google.protobuf.Timestamp completedTimeFrom = 2; + google.protobuf.Timestamp completedTimeTo = 3; + int32 pageSize = 4; + google.protobuf.StringValue lastInstanceKey = 5; +} + +message ListInstanceIdsResponse { + repeated string instanceIds = 1; + google.protobuf.StringValue lastInstanceKey = 2; +} + message PurgeInstancesRequest { oneof request { string instanceId = 1; @@ -490,6 +509,8 @@ message PurgeInstancesRequest { InstanceBatch instanceBatch = 4; } bool recursive = 3; + // used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity) + bool isOrchestration = 5; } message PurgeInstanceFilter { @@ -750,6 +771,9 @@ service TaskHubSidecarService { // rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse); rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse); + + rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse); + rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse); rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem); @@ -854,4 +878,4 @@ message HistoryChunk { message InstanceBatch { // A maximum of 500 instance IDs can be provided in this list. repeated string instanceIds = 1; -} \ No newline at end of file +} From 69bad1d975feb7cfd628f1e179ddf82094b77613 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 12 Jan 2026 15:20:19 -0800 Subject: [PATCH 3/4] fixing the failing test --- .../ShimDurableTaskClientTests.cs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs index 9509d1e0..d25e9e81 100644 --- a/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs +++ b/test/Client/OrchestrationServiceClientShim.Tests/ShimDurableTaskClientTests.cs @@ -165,7 +165,9 @@ public async Task GetInstances_Results(bool getInputs) public async Task PurgeInstanceMetadata() { // arrange - string instanceId = Guid.NewGuid().ToString(); + List states = [CreateState("input", "output")]; + string instanceId = states.First().OrchestrationInstance.InstanceId; + this.orchestrationClient.Setup(m => m.GetOrchestrationStateAsync(instanceId, false)).ReturnsAsync(states); this.purgeClient.Setup(m => m.PurgeInstanceStateAsync(instanceId)).ReturnsAsync(new Core.PurgeResult(1)); // act From 573d31169d0ad15f59603609287e3436f984f701 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 12 Jan 2026 16:01:56 -0800 Subject: [PATCH 4/4] pulled from the main proto branch --- src/Grpc/versions.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index ea0ccc96..b13392fe 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-12-29 22:13:55 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/b7e260ad7b84740a2ed5cb4600ce73bef702a979/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2026-01-13 00:01:21 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/026329c53fe6363985655857b9ca848ec7238bd2/protos/orchestrator_service.proto