From b6d87ed49f951752ffc84a68e1fdd3481d242284 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 3 Dec 2025 22:41:49 -0800 Subject: [PATCH 1/9] first commit --- .../Tracking/AzureTableTrackingStore.cs | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 2f2dca83b..50efa5158 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1075,16 +1075,44 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete } string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); + ExecutionStartedEvent executionStartedEvent = runtimeState.ExecutionStartedEvent; + + // We need to set all of the fields of the instance entity in the case that it was never created for the orchestration. + // This can be the case for a suborchestration that completed in one execution, for example. var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) { // TODO: Translating null to "null" is a temporary workaround. We should prioritize // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. + ["Name"] = runtimeState.Name, + ["Version"] = runtimeState.Version, + ["CreatedTime"] = executionStartedEvent.Timestamp, ["CustomStatus"] = runtimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp, ["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(), - ["CompletedTime"] = runtimeState.CompletedTime + ["CompletedTime"] = runtimeState.CompletedTime, + ["Generation"] = executionStartedEvent.Generation, + ["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags), + ["TaskHubName"] = this.settings.TaskHubName, }; + if (runtimeState.ExecutionStartedEvent.ScheduledStartTime.HasValue) + { + instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; + } + + // Set the input and output + this.SetInstancesTablePropertyFromHistoryProperty( + TableEntityConverter.Serialize(executionStartedEvent), + instanceEntity, + historyPropertyName: nameof(executionStartedEvent.Input), + instancePropertyName: InputProperty, + data: executionStartedEvent.Input); + this.SetInstancesTablePropertyFromHistoryProperty( + TableEntityConverter.Serialize(runtimeState.ExecutionCompletedEvent), + instanceEntity, + historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result), + instancePropertyName: OutputProperty, + data: runtimeState.Output); Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); From a11585b249dd6bd2e2a891950c9d89b4a25e5e63 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 4 Dec 2025 00:32:54 -0800 Subject: [PATCH 2/9] added missing properties to other instance entities --- .../AzureStorageOrchestrationService.cs | 3 +-- .../Tracking/AzureTableTrackingStore.cs | 20 +++++++++++++------ .../Tracking/ITrackingStore.cs | 7 +++---- .../InstanceStoreBackedTrackingStore.cs | 7 +++---- .../Tracking/TrackingStoreBase.cs | 2 +- 5 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index c6ca54d4c..38302222a 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1078,8 +1078,7 @@ async Task IsExecutableInstanceAsync( var executionTerminatedEvent = (ExecutionTerminatedEvent)executionTerminatedEventMessage.Event; await this.trackingStore.UpdateStatusForTerminationAsync( instanceId, - executionTerminatedEvent.Input, - executionTerminatedEvent.Timestamp); + executionTerminatedEvent; return $"Instance is {OrchestrationStatus.Terminated}"; } diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 50efa5158..aaf4c2a2d 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -798,21 +798,25 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell /// public override async Task UpdateStatusForTerminationAsync( string instanceId, - string output, - DateTime lastUpdatedTime, + ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default) { string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); - TableEntity entity = new TableEntity(sanitizedInstanceId, "") + TableEntity instanceEntity = new TableEntity(sanitizedInstanceId, "") { ["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"), - ["LastUpdatedTime"] = lastUpdatedTime, + ["LastUpdatedTime"] = executionTerminatedEvent.Timestamp, ["CompletedTime"] = DateTime.UtcNow, - [OutputProperty] = output }; + this.SetInstancesTablePropertyFromHistoryProperty( + TableEntityConverter.Serialize(executionTerminatedEvent), + instanceEntity, + historyPropertyName: nameof(executionTerminatedEvent.Input), + instancePropertyName: OutputProperty, + data: executionTerminatedEvent.Input); Stopwatch stopwatch = Stopwatch.StartNew(); - await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken); + await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken); this.settings.Logger.InstanceStatusUpdate( this.storageAccountName, @@ -864,6 +868,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default) ["CustomStatus"] = newRuntimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = newEvents.Last().Timestamp, + ["TaskHubName"] = this.settings.TaskHubName, }; // check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario @@ -910,6 +915,9 @@ public override Task StartAsync(CancellationToken cancellationToken = default) instanceEntity["Version"] = executionStartedEvent.Version; instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp; instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); + instanceEntity["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags); + instanceEntity["Generation"] = executionStartedEvent.Generation; + if (executionStartedEvent.ScheduledStartTime.HasValue) { instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index c0ce27831..5a9646096 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -166,13 +166,12 @@ interface ITrackingStore Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default); /// - /// Used to update the instance status to "Terminated" whend a pending orchestration is terminated. + /// Used to update the instance status to "Terminated" when a pending orchestration is terminated. /// /// The instance being terminated - /// The output of the orchestration - /// The last updated time of the orchestration (the time the termination request was created) + /// The termination history event. /// The token to monitor for cancellation requests. The default value is . - Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default); + Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default); /// /// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 1e70a98ee..ec3841d99 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -179,16 +179,15 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] public override async Task UpdateStatusForTerminationAsync( string instanceId, - string output, - DateTime lastUpdatedTime, + ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default) { // Get the most recent execution and update its status to terminated IEnumerable instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false); instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated; - instanceEntity.Single().State.LastUpdatedTime = lastUpdatedTime; + instanceEntity.Single().State.LastUpdatedTime = executionTerminatedEvent.Timestamp; instanceEntity.Single().State.CompletedTime = DateTime.UtcNow; - instanceEntity.Single().State.Output = output; + instanceEntity.Single().State.Output = executionTerminatedEvent.Input; await this.instanceStore.WriteEntitiesAsync(instanceEntity); } diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index e6f55fc59..7c56b2061 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -101,7 +101,7 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo } /// - public abstract Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default); + public abstract Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default); /// public abstract Task StartAsync(CancellationToken cancellationToken = default); From 1e9ffc7d7d9c1c13532fd9b9f13d46012a2ac59b Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 4 Dec 2025 00:35:35 -0800 Subject: [PATCH 3/9] missed a parentheses --- .../AzureStorageOrchestrationService.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 38302222a..26a8ac622 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1078,7 +1078,7 @@ async Task IsExecutableInstanceAsync( var executionTerminatedEvent = (ExecutionTerminatedEvent)executionTerminatedEventMessage.Event; await this.trackingStore.UpdateStatusForTerminationAsync( instanceId, - executionTerminatedEvent; + executionTerminatedEvent); return $"Instance is {OrchestrationStatus.Terminated}"; } From 5d4d216938e6388a38e1694f87312d3e3750b667 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 4 Dec 2025 12:51:53 -0800 Subject: [PATCH 4/9] fixed other bugs, added way more tests --- .../AzureStorageOrchestrationService.cs | 5 +- .../Tracking/AzureTableTrackingStore.cs | 39 +- .../Tracking/ITrackingStore.cs | 4 +- .../InstanceStoreBackedTrackingStore.cs | 4 +- .../Tracking/TrackingStoreBase.cs | 2 +- .../AzureStorageScenarioTests.cs | 402 +++++++++++++++++- 6 files changed, 411 insertions(+), 45 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index 26a8ac622..a97efb71b 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -797,7 +797,6 @@ async Task LockNextTaskOrchestrationWorkItemAsync(boo session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, - session.TrackingStoreContext, cancellationToken); if (!string.IsNullOrEmpty(warningMessage)) { @@ -1059,7 +1058,6 @@ async Task IsExecutableInstanceAsync( OrchestrationRuntimeState runtimeState, IList newMessages, bool allowReplayingTerminalInstances, - object trackingStoreContext, CancellationToken cancellationToken) { if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent)) @@ -1102,11 +1100,10 @@ await this.trackingStore.UpdateStatusForTerminationAsync( if (instanceStatus == null || (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId && instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus)) { - await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + await this.trackingStore.UpdateInstanceStatusForCompletedOrchestrationAsync( runtimeState.OrchestrationInstance.InstanceId, runtimeState.OrchestrationInstance.ExecutionId, runtimeState, - trackingStoreContext, cancellationToken); } if (!allowReplayingTerminalInstances) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index aaf4c2a2d..f331e73e6 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -807,13 +807,10 @@ public override async Task UpdateStatusForTerminationAsync( ["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"), ["LastUpdatedTime"] = executionTerminatedEvent.Timestamp, ["CompletedTime"] = DateTime.UtcNow, + [OutputProperty] = executionTerminatedEvent.Input, }; - this.SetInstancesTablePropertyFromHistoryProperty( - TableEntityConverter.Serialize(executionTerminatedEvent), - instanceEntity, - historyPropertyName: nameof(executionTerminatedEvent.Input), - instancePropertyName: OutputProperty, - data: executionTerminatedEvent.Input); + + await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken); Stopwatch stopwatch = Stopwatch.StartNew(); await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken); @@ -1056,11 +1053,10 @@ public override Task StartAsync(CancellationToken cancellationToken = default) return eTagValue; } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, - object trackingStoreContext, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && @@ -1071,17 +1067,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete return; } - TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext; - if (context.Blobs.Count > 0) - { - var tasks = new List(context.Blobs.Count); - foreach (string blobName in context.Blobs) - { - tasks.Add(this.messageManager.DeleteBlobAsync(blobName)); - } - await Task.WhenAll(tasks); - } - string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId); ExecutionStartedEvent executionStartedEvent = runtimeState.ExecutionStartedEvent; @@ -1109,14 +1094,20 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete } // Set the input and output + // In the case that the input or output are too large and are stored in blob storage, we must extract the blob names from the history entities. + TableQueryResults results = await this + .GetHistoryEntitiesResponseInfoAsync(instanceId, executionId, null, cancellationToken) + .GetResultsAsync(cancellationToken: cancellationToken); + TableEntity executionCompletedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionCompleted.ToString()); + TableEntity executionStartedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionStarted.ToString()); this.SetInstancesTablePropertyFromHistoryProperty( - TableEntityConverter.Serialize(executionStartedEvent), + executionStartedEntity, instanceEntity, historyPropertyName: nameof(executionStartedEvent.Input), instancePropertyName: InputProperty, data: executionStartedEvent.Input); this.SetInstancesTablePropertyFromHistoryProperty( - TableEntityConverter.Serialize(runtimeState.ExecutionCompletedEvent), + executionCompletedEntity, instanceEntity, historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result), instancePropertyName: OutputProperty, @@ -1262,6 +1253,12 @@ static string GetBlobName(TableEntity entity, string property) // EventType. Use a hardcoded value to record the orchestration input. eventType = "Input"; } + else if (property == "Output") + { + // This message is used to terminate an orchestration with no history, so it does not have a + // corresponding EventType. Use a hardcoded value to record the orchestration output. + eventType = "Output"; + } else if (property == "Tags") { eventType = "Tags"; diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 5a9646096..146fc28b9 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -105,16 +105,14 @@ interface ITrackingStore /// /// Updates the instance status of the specified orchestration instance to match that of for a completed orchestration. - /// Also deletes any orphaned blobs of . /// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to /// for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing. /// /// The ID of the orchestration. /// The execution ID of the orchestration. /// The runtime state of the orchestration. - /// Additional context for the execution that is maintained by the tracking store. /// The token to monitor for cancellation requests. The default value is . - Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for querying all orchestration instances diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index ec3841d99..2a8a68f45 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -191,11 +191,10 @@ public override async Task UpdateStatusForTerminationAsync( await this.instanceStore.WriteEntitiesAsync(instanceEntity); } - public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync( + public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, - object trackingStoreContext, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && @@ -206,7 +205,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete return; } - // No blobs to delete for this tracking store implementation await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { new OrchestrationStateInstanceEntity() diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 7c56b2061..91bf0f643 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -110,6 +110,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default); /// - public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default); + public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, CancellationToken cancellationToken = default); } } diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index aa4aa7026..399728070 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -884,7 +884,7 @@ public async Task TerminateOrchestration(bool enableExtendedSessions) var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); // Need to wait for the instance to start before we can terminate it. - // TODO: This requirement may not be ideal and should be revisited. + // TerminatePendingOrchestration tests terminating a pending orchestration. await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); await client.TerminateAsync("sayōnara"); @@ -966,12 +966,15 @@ public async Task TerminateSuspendedOrchestration(bool enableExtendedSessions) } /// - /// Test that a pending orchestration can be terminated. + /// Test that a pending orchestration can be terminated (including tests with a large termination reason that will need to be + /// stored in blob storage). /// [DataTestMethod] - [DataRow(true)] - [DataRow(false)] - public async Task TerminatePendingOrchestration(bool enableExtendedSessions) + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TerminatePendingOrchestration(bool enableExtendedSessions, bool largeTerminationReason) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) { @@ -980,20 +983,27 @@ public async Task TerminatePendingOrchestration(bool enableExtendedSessions) var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0, startAt: DateTime.UtcNow.AddMinutes(1)); await client.WaitForStatusChange(TimeSpan.FromSeconds(5), OrchestrationStatus.Pending); - await client.TerminateAsync("terminate"); + string message = largeTerminationReason ? this.GenerateMediumRandomStringPayload().ToString() : "terminate"; + await client.TerminateAsync(message); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + if (largeTerminationReason) + { + int blobCount = await this.GetBlobCount("test-largemessages", client.InstanceId); + Assert.IsTrue(blobCount > 0); + } + // Confirm the pending orchestration was terminated. Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); - Assert.AreEqual("terminate", status?.Output); + Assert.AreEqual(message, status?.Output); // Now sleep for a minute and confirm that the orchestration does not start after its scheduled time. Thread.Sleep(TimeSpan.FromMinutes(1)); status = await client.GetStatusAsync(); Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); - Assert.AreEqual("terminate", status?.Output); + Assert.AreEqual(message, status?.Output); await host.StopAsync(); } @@ -2495,14 +2505,15 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession [DataRow(false, true)] [DataRow(true, false)] [DataRow(false, false)] - public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate) + public async Task TestWorkerFailingDuringCompleteWorkItemCallCompletedOrchestration(bool enableExtendedSessions, bool terminate) { using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) { await host.StartAsync(); // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator - var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!"); + string input = "hello!"; + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), input); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); @@ -2516,7 +2527,371 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtende Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); TableEntity entity = new TableEntity(instanceId, "") { - ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G") + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(input, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(input, JToken.Parse(status.Input).ToString()); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a failed orchestration. + /// + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallFailedOrchestration(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string failureReason = "Failure!"; + var client = await host.StartOrchestrationAsync( + typeof(Orchestrations.ThrowException), + input: failureReason); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(failureReason, status.Output); + Assert.AreEqual(failureReason, JToken.Parse(status.Input).ToString()); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a terminated orchestration. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallTerminatedOrchestration(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + // Using the counter orchestration because it will wait indefinitely for input. + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); + await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); + // Terminate the orchestration + string reason = "terminate"; + await client.TerminateAsync(reason); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + var instanceId = client.InstanceId; + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(reason, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + + await host.StopAsync(); + } + } + + /// + /// Same as but for an orchestration with large input + /// and output, which will need to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeInputOutput(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(message, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a large termination reason that + /// will need to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeTerminationReason(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + // Using the counter orchestration because it will wait indefinitely for input. + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0); + await client.WaitForStartupAsync(TimeSpan.FromSeconds(10)); + // Terminate the orchestration + await client.TerminateAsync(message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", + }; + await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); + + // Assert that the status in the Instance table reads "Running" + IList state = await client.GetStateAsync(instanceId); + OrchestrationStatus forcedStatus = state.First().OrchestrationStatus; + Assert.AreEqual(OrchestrationStatus.Running, forcedStatus); + + // The type of event sent should not matter - the event itself should be discarded, and the instance table updated + // to reflect the status in the history table. + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + + await host.StopAsync(); + } + } + + /// + /// Same as but for a large exception message that will need + /// to be stored in blob storage. + /// + [DataTestMethod] + [DataRow(true, true)] + [DataRow(false, true)] + [DataRow(true, false)] + [DataRow(false, false)] + public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeFailureReason(bool enableExtendedSessions, bool terminate) + { + using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions)) + { + await host.StartAsync(); + + string message = this.GenerateMediumRandomStringPayload().ToString(); + var client = await host.StartOrchestrationAsync( + typeof(Orchestrations.ThrowException), + input: message); + var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); + Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + + var instanceId = client.InstanceId; + int blobCount = await this.GetBlobCount("test-largemessages", instanceId); + Assert.IsTrue(blobCount > 0); + + // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". + // This simulates the scenario where the History table was updated, but not the Instance table. + AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings( + enableExtendedSessions); + AzureStorageClient azureStorageClient = new AzureStorageClient(settings); + + Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") + { + ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), + ["Output"] = "null", + // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a + // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) + ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2542,8 +2917,9 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtende Assert.AreEqual(1, state.Count); status = state.First(); - OrchestrationStatus expectedStatus = OrchestrationStatus.Completed; - Assert.AreEqual(expectedStatus, status.OrchestrationStatus); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); await host.StopAsync(); } From d5b4ef29f940d06d70d341561b560c3abcce6c22 Mon Sep 17 00:00:00 2001 From: sophiatev <38052607+sophiatev@users.noreply.github.com> Date: Thu, 4 Dec 2025 12:53:11 -0800 Subject: [PATCH 5/9] Update src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index f331e73e6..8c20d4c2b 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -914,7 +914,6 @@ public override Task StartAsync(CancellationToken cancellationToken = default) instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString(); instanceEntity["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags); instanceEntity["Generation"] = executionStartedEvent.Generation; - if (executionStartedEvent.ScheduledStartTime.HasValue) { instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; From 2e4e885a718140179ccaf2cb517cf13055da3ad2 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 4 Dec 2025 14:19:23 -0800 Subject: [PATCH 6/9] addressing PR comments --- .../AzureStorageOrchestrationService.cs | 1 + .../Tracking/AzureTableTrackingStore.cs | 69 +++++++++++++------ .../Tracking/ITrackingStore.cs | 3 +- .../InstanceStoreBackedTrackingStore.cs | 1 + .../Tracking/TrackingStoreBase.cs | 2 +- 5 files changed, 53 insertions(+), 23 deletions(-) diff --git a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs index a97efb71b..22e0e80da 100644 --- a/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs +++ b/src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs @@ -1104,6 +1104,7 @@ await this.trackingStore.UpdateInstanceStatusForCompletedOrchestrationAsync( runtimeState.OrchestrationInstance.InstanceId, runtimeState.OrchestrationInstance.ExecutionId, runtimeState, + instanceStatus is not null, cancellationToken); } if (!allowReplayingTerminalInstances) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 8c20d4c2b..981290a07 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -807,6 +807,7 @@ public override async Task UpdateStatusForTerminationAsync( ["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"), ["LastUpdatedTime"] = executionTerminatedEvent.Timestamp, ["CompletedTime"] = DateTime.UtcNow, + // In the case of terminating an orchestration, the termination reason becomes the orchestration's output. [OutputProperty] = executionTerminatedEvent.Input, }; @@ -1056,6 +1057,7 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, + bool instanceEntityExists, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && @@ -1073,17 +1075,16 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( // This can be the case for a suborchestration that completed in one execution, for example. var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty) { - // TODO: Translating null to "null" is a temporary workaround. We should prioritize - // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. ["Name"] = runtimeState.Name, ["Version"] = runtimeState.Version, ["CreatedTime"] = executionStartedEvent.Timestamp, + // TODO: Translating null to "null" is a temporary workaround. We should prioritize + // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary. ["CustomStatus"] = runtimeState.Status ?? "null", ["ExecutionId"] = executionId, ["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp, ["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(), ["CompletedTime"] = runtimeState.CompletedTime, - ["Generation"] = executionStartedEvent.Generation, ["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags), ["TaskHubName"] = this.settings.TaskHubName, }; @@ -1093,24 +1094,50 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( } // Set the input and output - // In the case that the input or output are too large and are stored in blob storage, we must extract the blob names from the history entities. - TableQueryResults results = await this - .GetHistoryEntitiesResponseInfoAsync(instanceId, executionId, null, cancellationToken) - .GetResultsAsync(cancellationToken: cancellationToken); - TableEntity executionCompletedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionCompleted.ToString()); - TableEntity executionStartedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionStarted.ToString()); - this.SetInstancesTablePropertyFromHistoryProperty( - executionStartedEntity, - instanceEntity, - historyPropertyName: nameof(executionStartedEvent.Input), - instancePropertyName: InputProperty, - data: executionStartedEvent.Input); - this.SetInstancesTablePropertyFromHistoryProperty( - executionCompletedEntity, - instanceEntity, - historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result), - instancePropertyName: OutputProperty, - data: runtimeState.Output); + // In the case that the output is too large and is stored in blob storage, or the input has not been set by a previous execution and is stored in + // blob storage, we must extract the blob names from the history entities. + bool outputTooLarge = this.ExceedsMaxTablePropertySize(runtimeState.Output); + bool inputTooLarge = this.ExceedsMaxTablePropertySize(runtimeState.Input); + TableQueryResults results = null; + if (outputTooLarge || (!instanceEntityExists && inputTooLarge)) + { + results = await this + .GetHistoryEntitiesResponseInfoAsync(instanceId, executionId, null, cancellationToken) + .GetResultsAsync(cancellationToken: cancellationToken); + } + + if (outputTooLarge) + { + TableEntity executionCompletedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionCompleted.ToString()); + this.SetInstancesTablePropertyFromHistoryProperty( + executionCompletedEntity, + instanceEntity, + historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result), + instancePropertyName: OutputProperty, + data: runtimeState.Output); + } + else + { + instanceEntity[OutputProperty] = runtimeState.Output; + } + + if (!instanceEntityExists) + { + if (inputTooLarge) + { + TableEntity executionStartedEntity = results.Entities.FirstOrDefault(e => (string)e["EventType"] == EventType.ExecutionStarted.ToString()); + this.SetInstancesTablePropertyFromHistoryProperty( + executionStartedEntity, + instanceEntity, + historyPropertyName: nameof(executionStartedEvent.Input), + instancePropertyName: InputProperty, + data: executionStartedEvent.Input); + } + else + { + instanceEntity[InputProperty] = runtimeState.Input; + } + } Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew(); await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity); diff --git a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs index 146fc28b9..4a1b91fae 100644 --- a/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs @@ -111,8 +111,9 @@ interface ITrackingStore /// The ID of the orchestration. /// The execution ID of the orchestration. /// The runtime state of the orchestration. + /// Whether the instance entity already exists in the instance store. /// The token to monitor for cancellation requests. The default value is . - Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, CancellationToken cancellationToken = default); + Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default); /// /// Get The Orchestration State for querying all orchestration instances diff --git a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs index 2a8a68f45..ca3da07bd 100644 --- a/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/InstanceStoreBackedTrackingStore.cs @@ -195,6 +195,7 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string instanceId, string executionId, OrchestrationRuntimeState runtimeState, + bool instanceEntityExists, CancellationToken cancellationToken = default) { if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed && diff --git a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs index 91bf0f643..7879537e4 100644 --- a/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs +++ b/src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs @@ -110,6 +110,6 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo public abstract Task UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default); /// - public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, CancellationToken cancellationToken = default); + public abstract Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default); } } From df7b37298ba109acdf7d055b3197ce4f3ff50644 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Thu, 4 Dec 2025 16:40:49 -0800 Subject: [PATCH 7/9] fixing the tests, adding tests for a nonexistent instance entity --- .../Tracking/AzureTableTrackingStore.cs | 19 +- .../AzureStorageScenarioTests.cs | 172 ++++++++++++++++-- 2 files changed, 167 insertions(+), 24 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 981290a07..a933a29d2 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -811,7 +811,9 @@ public override async Task UpdateStatusForTerminationAsync( [OutputProperty] = executionTerminatedEvent.Input, }; - await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken); + // Setting addBlobPropertyName to false ensures that the blob URL is saved as the "Output" of the instance entity, which is the expected behavior + // for large orchestration outputs. + await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken, addBlobPropertyName: false); Stopwatch stopwatch = Stopwatch.StartNew(); await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken); @@ -1214,7 +1216,7 @@ void SetInstancesTablePropertyFromHistoryProperty( } } - async Task CompressLargeMessageAsync(TableEntity entity, List listOfBlobs, CancellationToken cancellationToken) + async Task CompressLargeMessageAsync(TableEntity entity, List listOfBlobs, CancellationToken cancellationToken, bool addBlobPropertyName = true) { foreach (string propertyName in VariableSizeEntityProperties) { @@ -1229,9 +1231,16 @@ property is string stringProperty && // Clear out the original property value and create a new "*BlobName"-suffixed property. // The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob. - string blobPropertyName = GetBlobPropertyName(propertyName); - entity.Add(blobPropertyName, blobName); - entity[propertyName] = string.Empty; + if (addBlobPropertyName) + { + string blobPropertyName = GetBlobPropertyName(propertyName); + entity.Add(blobPropertyName, blobName); + entity[propertyName] = string.Empty; + } + else + { + entity[propertyName] = this.messageManager.GetBlobUrl(blobName); + } // if necessary, keep track of all the blobs associated with this execution listOfBlobs?.Add(blobName); diff --git a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs index 45504124a..502e5fab6 100644 --- a/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs +++ b/test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs @@ -2513,9 +2513,10 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallCompletedOrchestrat // Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator string input = "hello!"; - var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), input); + var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), input, tags: new Dictionary { { "key", "value" } }); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". // This simulates the scenario where the History table was updated, but not the Instance table. @@ -2529,9 +2530,6 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallCompletedOrchestrat { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2561,6 +2559,31 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallCompletedOrchestrat Assert.AreEqual(input, JToken.Parse(status.Output).ToString()); Assert.AreEqual(input, JToken.Parse(status.Input).ToString()); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(input, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(input, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Echo))); + Assert.IsTrue(status.Tags.Contains(new KeyValuePair("key", "value"))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } @@ -2586,6 +2609,7 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallFailedOrchestration input: failureReason); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". // This simulates the scenario where the History table was updated, but not the Instance table. @@ -2595,13 +2619,11 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallFailedOrchestration AzureStorageClient azureStorageClient = new AzureStorageClient(settings); Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName); + TableEntity entity = new TableEntity(instanceId, "") { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2631,6 +2653,30 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallFailedOrchestration Assert.AreEqual(failureReason, status.Output); Assert.AreEqual(failureReason, JToken.Parse(status.Input).ToString()); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(failureReason, status.Output); + Assert.AreEqual(failureReason, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.ThrowException))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } @@ -2657,6 +2703,7 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallTerminatedOrchestra await client.TerminateAsync(reason); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; // Simulate having an "out of date" Instance table, by setting it's runtime status to "Running". // This simulates the scenario where the History table was updated, but not the Instance table. @@ -2670,9 +2717,6 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallTerminatedOrchestra { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2702,6 +2746,30 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallTerminatedOrchestra Assert.AreEqual(reason, status.Output); Assert.AreEqual(0, int.Parse(status.Input)); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(reason, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Counter))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } @@ -2725,6 +2793,7 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeInputOutput(bo var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; var instanceId = client.InstanceId; int blobCount = await this.GetBlobCount("test-largemessages", instanceId); @@ -2741,9 +2810,6 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeInputOutput(bo { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2773,6 +2839,30 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeInputOutput(bo Assert.AreEqual(message, JToken.Parse(status.Output).ToString()); Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Completed, status.OrchestrationStatus); + Assert.AreEqual(message, JToken.Parse(status.Output).ToString()); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Echo))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } @@ -2800,6 +2890,7 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeTerminationRea await client.TerminateAsync(message); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; var instanceId = client.InstanceId; int blobCount = await this.GetBlobCount("test-largemessages", instanceId); @@ -2816,9 +2907,6 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeTerminationRea { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2848,6 +2936,30 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeTerminationRea Assert.AreEqual(message, status.Output); Assert.AreEqual(0, int.Parse(status.Input)); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Terminated, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(0, int.Parse(status.Input)); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.Counter))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } @@ -2873,6 +2985,7 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeFailureReason( input: message); var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10)); Assert.AreEqual(OrchestrationStatus.Failed, status?.OrchestrationStatus); + string executionId = status.OrchestrationInstance.ExecutionId; var instanceId = client.InstanceId; int blobCount = await this.GetBlobCount("test-largemessages", instanceId); @@ -2889,9 +3002,6 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeFailureReason( { ["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G"), ["Output"] = "null", - // We set the input to "null" here to simulate the situation where this was not previously set (for example, for a - // suborchestration that completed in one execution and so had no previous call to UpdateStateAsync or SetNewExecutionAsync) - ["Input"] = "null", }; await instanceTable.MergeEntityAsync(entity, Azure.ETag.All); @@ -2921,6 +3031,30 @@ public async Task TestWorkerFailingDuringCompleteWorkItemCallLargeFailureReason( Assert.AreEqual(message, status.Output); Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + // Now simulate there being no instance entity (which can be the case for suborchestrations that complete in one execution), and try again + await instanceTable.DeleteEntityAsync(entity, Azure.ETag.All); + + if (terminate) + { + await client.TerminateAsync("testing"); + } + else + { + await client.RaiseEventAsync("Foo", "Bar"); + } + await Task.Delay(TimeSpan.FromSeconds(30)); + + // A replay should have occurred, forcing the instance table to be updated with a terminal status + state = await client.GetStateAsync(instanceId); + Assert.AreEqual(1, state.Count); + + status = state.First(); + Assert.AreEqual(OrchestrationStatus.Failed, status.OrchestrationStatus); + Assert.AreEqual(message, status.Output); + Assert.AreEqual(message, JToken.Parse(status.Input).ToString()); + Assert.IsTrue(status.Name.Contains(nameof(Orchestrations.ThrowException))); + Assert.AreEqual(executionId, status.OrchestrationInstance.ExecutionId); + await host.StopAsync(); } } From 7c67b45c660a8ff49bdc811173f576e81a6bf5f4 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 8 Dec 2025 11:59:20 -0800 Subject: [PATCH 8/9] updated to only extract the executioncompleted and executionstarted events for large input/outputs to extract the blob names --- .../Tracking/AzureTableTrackingStore.cs | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index a933a29d2..30e80a4e5 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1095,22 +1095,14 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; } - // Set the input and output - // In the case that the output is too large and is stored in blob storage, or the input has not been set by a previous execution and is stored in - // blob storage, we must extract the blob names from the history entities. - bool outputTooLarge = this.ExceedsMaxTablePropertySize(runtimeState.Output); - bool inputTooLarge = this.ExceedsMaxTablePropertySize(runtimeState.Input); - TableQueryResults results = null; - if (outputTooLarge || (!instanceEntityExists && inputTooLarge)) + // Set the output. + // In the case that the output is too large and is stored in blob storage, extract the blob name from the ExecutionCompleted history entity. + if (this.ExceedsMaxTablePropertySize(runtimeState.Output)) { - results = await this - .GetHistoryEntitiesResponseInfoAsync(instanceId, executionId, null, cancellationToken) - .GetResultsAsync(cancellationToken: cancellationToken); - } - - if (outputTooLarge) - { - TableEntity executionCompletedEntity = results.Entities.LastOrDefault(e => (string)e["EventType"] == EventType.ExecutionCompleted.ToString()); + string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionCompleted)}'"; + TableEntity executionCompletedEntity = (await this.QueryHistoryAsync(filter, instanceId, cancellationToken)).Single(); this.SetInstancesTablePropertyFromHistoryProperty( executionCompletedEntity, instanceEntity, @@ -1122,12 +1114,17 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( { instanceEntity[OutputProperty] = runtimeState.Output; } - + + // If the input has not been set by a previous execution, set the input. if (!instanceEntityExists) { - if (inputTooLarge) + // In the case that the input is too large and is stored in blob storage, extract the blob name from the ExecutionStarted history entity. + if (this.ExceedsMaxTablePropertySize(runtimeState.Input)) { - TableEntity executionStartedEntity = results.Entities.FirstOrDefault(e => (string)e["EventType"] == EventType.ExecutionStarted.ToString()); + string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionStarted)}'"; + TableEntity executionStartedEntity = (await this.QueryHistoryAsync(filter, instanceId, cancellationToken)).Single(); this.SetInstancesTablePropertyFromHistoryProperty( executionStartedEntity, instanceEntity, From 298d3372b1948de9dc9f3c89a5c1605d0c114386 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 8 Dec 2025 13:59:08 -0800 Subject: [PATCH 9/9] updated to add better exception handling --- .../Tracking/AzureTableTrackingStore.cs | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs index 30e80a4e5..c6f63838a 100644 --- a/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs +++ b/src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs @@ -1095,6 +1095,23 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime; } + static TableEntity GetSingleEntityFromHistoryTableResults(IReadOnlyList entities, string dataType) + { + try + { + TableEntity singleEntity = entities.SingleOrDefault(); + + return singleEntity ?? throw new DurableTaskStorageException($"The history table query to determine the blob storage URL " + + $"for the large orchestration {dataType} returned no rows. Unable to extract the URL from these results."); + } + catch (InvalidOperationException) + { + throw new DurableTaskStorageException($"The history table query to determine the blob storage URL for the large orchestration " + + $"{dataType} returned more than one row, when exactly one row is expected. " + + $"Unable to extract the URL from these results."); + } + } + // Set the output. // In the case that the output is too large and is stored in blob storage, extract the blob name from the ExecutionCompleted history entity. if (this.ExceedsMaxTablePropertySize(runtimeState.Output)) @@ -1102,7 +1119,7 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionCompleted)}'"; - TableEntity executionCompletedEntity = (await this.QueryHistoryAsync(filter, instanceId, cancellationToken)).Single(); + TableEntity executionCompletedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "output"); this.SetInstancesTablePropertyFromHistoryProperty( executionCompletedEntity, instanceEntity, @@ -1124,7 +1141,7 @@ public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync( string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" + $" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" + $" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionStarted)}'"; - TableEntity executionStartedEntity = (await this.QueryHistoryAsync(filter, instanceId, cancellationToken)).Single(); + TableEntity executionStartedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "input"); this.SetInstancesTablePropertyFromHistoryProperty( executionStartedEntity, instanceEntity,