From b1f485f97802cce735f5d876c269c2ea68bdb200 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Tue, 9 Sep 2025 14:34:12 -0700 Subject: [PATCH 01/14] committing what i have for now --- .../Command/OrchestratorActionType.cs | 5 ++ .../Command/RewindOrchestrationAction.cs | 38 +++++++++ src/DurableTask.Core/History/EventType.cs | 5 ++ .../History/ExecutionRewoundEvent.cs | 50 ++++++++++++ .../TaskOrchestrationContext.cs | 15 ++++ .../TaskOrchestrationDispatcher.cs | 80 +++++++++++++++++++ .../TaskOrchestrationExecutor.cs | 5 +- 7 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 src/DurableTask.Core/Command/RewindOrchestrationAction.cs create mode 100644 src/DurableTask.Core/History/ExecutionRewoundEvent.cs diff --git a/src/DurableTask.Core/Command/OrchestratorActionType.cs b/src/DurableTask.Core/Command/OrchestratorActionType.cs index 7e9cbdc69..34d256aaa 100644 --- a/src/DurableTask.Core/Command/OrchestratorActionType.cs +++ b/src/DurableTask.Core/Command/OrchestratorActionType.cs @@ -42,5 +42,10 @@ public enum OrchestratorActionType /// The orchestrator completed. /// OrchestrationComplete, + + /// + /// The orchestration was rewound. + /// + RewindOrchestration, } } \ No newline at end of file diff --git a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs new file mode 100644 index 000000000..29ecf249a --- /dev/null +++ b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs @@ -0,0 +1,38 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- +#nullable enable +namespace DurableTask.Core.Command +{ + + /// + /// Orchestrator action for rewinding orchestrations. + /// + public class RewindOrchestrationAction : OrchestratorAction + { + // NOTE: Actions must be serializable by a variety of different serializer types to support out-of-process execution. + // To ensure maximum compatibility, all properties should be public and settable by default. + + /// + public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.RewindOrchestration; + + /// + /// The reason for the rewind action. + /// + public string? Reason { get; set; } + + /// + /// The new execution ID of the rewound orchestration. If none is provided, a new ID will be generated by the orchestration itself. + /// + public string? NewExecutionId { get; set; } + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/History/EventType.cs b/src/DurableTask.Core/History/EventType.cs index f9412d0f2..52c44ad11 100644 --- a/src/DurableTask.Core/History/EventType.cs +++ b/src/DurableTask.Core/History/EventType.cs @@ -125,5 +125,10 @@ public enum EventType /// Orchestration was resumed event /// ExecutionResumed, + + /// + /// Orchestration was rewound event. 
+ /// + ExecutionRewound, } } \ No newline at end of file diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs new file mode 100644 index 000000000..a4ce978da --- /dev/null +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -0,0 +1,50 @@ +// ---------------------------------------------------------------------------------- +// Copyright Microsoft Corporation +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ---------------------------------------------------------------------------------- + +namespace DurableTask.Core.History +{ + using System.Runtime.Serialization; + + /// + /// Generic History event + /// + [DataContract] + public class ExecutionRewoundEvent : HistoryEvent + { + /// + /// The reason for the rewind event. + /// + [DataMember] public string Reason; + + /// + /// The new execution ID of the rewound orchestration. If none is provided, a new ID will be generated by the orchestration itself. + /// + [DataMember] public string NewExecutionId; + + /// + /// Creates a new ExecutionRewoundEvent with the supplied event id and reason. 
+ /// + /// The integer event id + /// The reason for the rewind event + public ExecutionRewoundEvent(int eventId, string reason) + : base(eventId) + { + this.Reason = reason; + } + + /// + /// Gets the event type + /// + public override EventType EventType => EventType.ExecutionRewound; + } +} \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index b12cb1b08..5a63861e7 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -725,6 +725,21 @@ public void CompleteOrchestration(string result, string details, OrchestrationSt this.orchestratorActionsMap.Add(id, completedOrchestratorAction); } + public void HandleExecutionRewoundEvent(ExecutionRewoundEvent rewoundEvent) + { + if (!this.executionCompletedOrTerminated) + { + return; + } + int id = this.idCounter++; + var rewindOrchestrationAction = new RewindOrchestrationAction() { Id = id }; + if (rewoundEvent.NewExecutionId != null) + { + rewindOrchestrationAction.NewExecutionId = rewoundEvent.NewExecutionId; + } + this.orchestratorActionsMap.Add(id, rewindOrchestrationAction); + } + class OpenTaskInfo { public string Name { get; set; } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 42564cddf..b4cb7b266 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -513,6 +513,15 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work isCompleted = !continuedAsNew; break; + case OrchestratorActionType.RewindOrchestration: + var rewindDecision = (RewindOrchestrationAction)decision; + this.ProcessRewindOrchestrationDecision(rewindDecision, runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState); + foreach (var rewindMessage in subOrchestrationRewindMessages) + { + 
orchestratorMessages.Add(rewindMessage); + } + workItem.OrchestrationRuntimeState = newRuntimeState; + break; default: throw TraceHelper.TraceExceptionInstance( TraceEventType.Error, @@ -600,6 +609,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work // Copy the distributed trace context, if any continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent); + // runtimeState = new OrchestrationRuntimeState(); runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(continueAsNewExecutionStarted!); @@ -1239,6 +1249,76 @@ TaskMessage ProcessSendEventDecision( }; } + void ProcessRewindOrchestrationDecision(RewindOrchestrationAction rewindAction, OrchestrationRuntimeState runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState) + { + HashSet failedTaskIds = new(); + subOrchestrationRewindMessages = new(); + newRuntimeState = new() + { + Status = runtimeState.Status + }; + + // Determine the task IDs of the failed tasks and suborchestrations + foreach (var evt in runtimeState.Events) + { + if (evt is TaskFailedEvent taskFailedEvent) + { + failedTaskIds.Add(taskFailedEvent.TaskScheduledId); + } + else if (evt is SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailedEvent) + { + failedTaskIds.Add(subOrchestrationInstanceFailedEvent.TaskScheduledId); + } + } + + newRuntimeState.AddEvent(new OrchestratorStartedEvent(-1)); + newRuntimeState.AddEvent(runtimeState.Events.OfType().LastOrDefault()); + newRuntimeState.AddEvent(new ExecutionStartedEvent(-1, runtimeState.Input) + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = runtimeState.OrchestrationInstance!.InstanceId, + ExecutionId = rewindAction.NewExecutionId ?? 
Guid.NewGuid().ToString("N") + }, + Tags = runtimeState.Tags, + ParentInstance = runtimeState.ParentInstance, + Name = runtimeState.Name, + Version = runtimeState.Version + }); + + foreach (var evt in runtimeState.Events) + { + // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of the failed task/suborchestration events to the new history + if (!(evt is TaskScheduledEvent taskScheduledEvent && failedTaskIds.Contains(taskScheduledEvent.EventId)) + && evt is not TaskFailedEvent && evt is not SubOrchestrationInstanceFailedEvent) + { + newRuntimeState.AddEvent(evt); + + // For each of the failed suborchestrations, generate a rewind event + if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) + { + var executionId = Guid.NewGuid().ToString("N"); + rewindEventsForFailedSubOrchestrations.Add + ( + new TaskMessage + { + Event = new ExecutionRewoundEvent(-1, rewindAction.Reason) + { + NewExecutionId = executionId + }, + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId, + ExecutionId = executionId + }, + } + ); + } + } + } + newRuntimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + } + internal class NonBlockingCountdownLock { int available; diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index c5e100a2f..b14554c4e 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -151,7 +151,7 @@ void ProcessEvents(IEnumerable events) // TODO: Create a setting that allows orchestrations to complete when the orchestrator // function completes, even if there are open tasks. 
if (!this.context.HasOpenTasks) - { + { fdf if (this.result!.IsCompleted) { if (this.result.IsFaulted) @@ -251,6 +251,9 @@ void ProcessEvent(HistoryEvent historyEvent) case EventType.ExecutionResumed: this.context.HandleExecutionResumedEvent((ExecutionResumedEvent)historyEvent, ProcessEvent); break; + case EventType.ExecutionRewound: + this.context.HandleExecutionRewoundEvent((ExecutionRewoundEvent)historyEvent); + break; } } } From 86954ac7828e4d442767a5f15f2b8ed6c3a2835a Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Sat, 4 Oct 2025 11:21:30 -0700 Subject: [PATCH 02/14] finishing up the initial implementation --- .../Command/RewindOrchestrationAction.cs | 5 - .../History/ExecutionRewoundEvent.cs | 54 +++++++-- .../SubOrchestrationInstanceCreatedEvent.cs | 21 ++++ .../TaskOrchestrationContext.cs | 6 +- .../TaskOrchestrationDispatcher.cs | 111 ++++++++++++------ .../TaskOrchestrationExecutor.cs | 6 +- 6 files changed, 143 insertions(+), 60 deletions(-) diff --git a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs index 29ecf249a..f58325873 100644 --- a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs +++ b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs @@ -29,10 +29,5 @@ public class RewindOrchestrationAction : OrchestratorAction /// The reason for the rewind action. /// public string? Reason { get; set; } - - /// - /// The new execution ID of the rewound orchestration. If none is provided, a new ID will be generated by the orchestration itself. - /// - public string? 
NewExecutionId { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index a4ce978da..abba18d25 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -13,6 +13,7 @@ namespace DurableTask.Core.History { + using System.Collections.Generic; using System.Runtime.Serialization; /// @@ -21,16 +22,6 @@ namespace DurableTask.Core.History [DataContract] public class ExecutionRewoundEvent : HistoryEvent { - /// - /// The reason for the rewind event. - /// - [DataMember] public string Reason; - - /// - /// The new execution ID of the rewound orchestration. If none is provided, a new ID will be generated by the orchestration itself. - /// - [DataMember] public string NewExecutionId; - /// /// Creates a new ExecutionRewoundEvent with the supplied event id and reason. /// @@ -46,5 +37,48 @@ public ExecutionRewoundEvent(int eventId, string reason) /// Gets the event type /// public override EventType EventType => EventType.ExecutionRewound; + + /// + /// Gets or sets the reason for the rewind event. 
+ /// + [DataMember] + public string Reason { get; set; } + + + /// + /// Gets or sets the input of the orchestration being rewound + /// + [DataMember] + public string Input { get; set; } + + /// + /// Gets or sets the dictionary of tags of the orchestration being rewound + /// + [DataMember] + public IDictionary Tags { get; set; } + + /// + /// Gets or sets the parent instance of the suborchestration being rewound + /// + [DataMember] + public ParentInstance ParentInstance { get; set; } + + /// + /// Gets or sets the name of the orchestration being rewound + /// + [DataMember] + public string Name { get; set; } + + /// + /// Gets or sets the version of the orchestration being rewound + /// + [DataMember] + public string Version { get; set; } + + /// + /// Gets or sets the instance ID of the orchestration being rewound + /// + [DataMember] + public string InstanceId { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs index e611eb99e..f2209fa2c 100644 --- a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs +++ b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs @@ -13,6 +13,9 @@ namespace DurableTask.Core.History { + using System.Collections; + using System.Collections.Generic; + using System.Collections.Specialized; using System.Runtime.Serialization; /// @@ -64,5 +67,23 @@ public SubOrchestrationInstanceCreatedEvent(int eventId) /// [DataMember] public string ClientSpanId { get; set; } + + /// + /// Gets or sets the execution Id + /// + [DataMember] + public string ExecutionId { get; set; } + + /// + /// Gets or sets the dictionary of tags of the suborchestration + /// + [DataMember] + public IDictionary Tags { get; set; } + + /// + /// Gets or sets the parent instance of the suborchestration + /// + [DataMember] + public ParentInstance ParentInstance { get; set; } } } \ No newline 
at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 5a63861e7..e6eb3682c 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -41,6 +41,7 @@ internal class TaskOrchestrationContext : OrchestrationContext public bool IsSuspended { get; private set; } public bool HasContinueAsNew => continueAsNew != null; + public bool IsRewinding { get; private set; } public void AddEventToNextIteration(HistoryEvent he) { @@ -731,12 +732,9 @@ public void HandleExecutionRewoundEvent(ExecutionRewoundEvent rewoundEvent) { return; } + this.IsRewinding = true; int id = this.idCounter++; var rewindOrchestrationAction = new RewindOrchestrationAction() { Id = id }; - if (rewoundEvent.NewExecutionId != null) - { - rewindOrchestrationAction.NewExecutionId = rewoundEvent.NewExecutionId; - } this.orchestratorActionsMap.Add(id, rewindOrchestrationAction); } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index b4cb7b266..18fbf9116 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -314,6 +314,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work var isCompleted = false; var continuedAsNew = false; var isInterrupted = false; + var isRewinding = false; // correlation CorrelationTraceClient.Propagate(() => CorrelationTraceContext.Current = workItem.TraceContext); @@ -515,12 +516,17 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work break; case OrchestratorActionType.RewindOrchestration: var rewindDecision = (RewindOrchestrationAction)decision; - this.ProcessRewindOrchestrationDecision(rewindDecision, runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState); - foreach (var rewindMessage in 
subOrchestrationRewindMessages) - { - orchestratorMessages.Add(rewindMessage); - } + this.ProcessRewindOrchestrationDecision( + rewindDecision, + runtimeState, + out List subOrchestrationRewindMessages, + out OrchestrationRuntimeState newRuntimeState); + orchestratorMessages.AddRange(subOrchestrationRewindMessages); workItem.OrchestrationRuntimeState = newRuntimeState; + // Setting this to true here will end an extended session if it is in progress. + // We don't want to save the state across executions, since we essentially manually modify + // the orchestration history here and so that stored by the extended session is incorrect. + isRewinding = true; break; default: throw TraceHelper.TraceExceptionInstance( @@ -609,7 +615,6 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work // Copy the distributed trace context, if any continueAsNewExecutionStarted!.SetParentTraceContext(runtimeState.ExecutionStartedEvent); - // runtimeState = new OrchestrationRuntimeState(); runtimeState.AddEvent(new OrchestratorStartedEvent(-1)); runtimeState.AddEvent(continueAsNewExecutionStarted!); @@ -683,7 +688,7 @@ await this.orchestrationService.CompleteTaskOrchestrationWorkItemAsync( workItem.OrchestrationRuntimeState = runtimeState; } - return isCompleted || continuedAsNew || isInterrupted; + return isCompleted || continuedAsNew || isInterrupted || isRewinding; } static OrchestrationExecutionContext GetOrchestrationExecutionContext(OrchestrationRuntimeState runtimeState) @@ -927,7 +932,12 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt TraceHelper.EmitTraceActivityForTaskFailed(workItem.OrchestrationRuntimeState.OrchestrationInstance, taskScheduledEvent, taskFailedEvent, errorPropagationMode); } - workItem.OrchestrationRuntimeState.AddEvent(message.Event); + // In this case, the ExecutionRewoundEvent has already been added to the history and is just sent as a way to trigger the failed deepest suborchestrations to rerun. 
+ // We do not redundantly add it to the history in this situation. + if (!(message.Event is ExecutionRewoundEvent executionRewoundEvent && workItem.OrchestrationRuntimeState.OrchestrationStatus == OrchestrationStatus.Running)) + { + workItem.OrchestrationRuntimeState.AddEvent(message.Event); + } } return true; @@ -1153,7 +1163,16 @@ TaskMessage ProcessCreateSubOrchestrationInstanceDecision( { Name = createSubOrchestrationAction.Name, Version = createSubOrchestrationAction.Version, - InstanceId = createSubOrchestrationAction.InstanceId + InstanceId = createSubOrchestrationAction.InstanceId, + ExecutionId = Guid.NewGuid().ToString("N"), + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = createSubOrchestrationAction.Id + }, + Tags = OrchestrationTags.MergeTags(createSubOrchestrationAction.Tags, runtimeState.Tags), }; if (includeParameters) { @@ -1166,19 +1185,13 @@ TaskMessage ProcessCreateSubOrchestrationInstanceDecision( var startedEvent = new ExecutionStartedEvent(-1, createSubOrchestrationAction.Input) { - Tags = OrchestrationTags.MergeTags(createSubOrchestrationAction.Tags, runtimeState.Tags), + Tags = historyEvent.Tags, OrchestrationInstance = new OrchestrationInstance { InstanceId = createSubOrchestrationAction.InstanceId, - ExecutionId = Guid.NewGuid().ToString("N") - }, - ParentInstance = new ParentInstance - { - OrchestrationInstance = runtimeState.OrchestrationInstance, - Name = runtimeState.Name, - Version = runtimeState.Version, - TaskScheduleId = createSubOrchestrationAction.Id + ExecutionId = historyEvent.ExecutionId }, + ParentInstance = historyEvent.ParentInstance, Name = createSubOrchestrationAction.Name, Version = createSubOrchestrationAction.Version }; @@ -1249,10 +1262,15 @@ TaskMessage ProcessSendEventDecision( }; } - void ProcessRewindOrchestrationDecision(RewindOrchestrationAction rewindAction, 
OrchestrationRuntimeState runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState) + void ProcessRewindOrchestrationDecision( + RewindOrchestrationAction rewindAction, + OrchestrationRuntimeState runtimeState, + out List subOrchestrationRewindMessages, + out OrchestrationRuntimeState newRuntimeState) { HashSet failedTaskIds = new(); subOrchestrationRewindMessages = new(); + newRuntimeState = new() { Status = runtimeState.Status @@ -1273,50 +1291,67 @@ void ProcessRewindOrchestrationDecision(RewindOrchestrationAction rewindAction, newRuntimeState.AddEvent(new OrchestratorStartedEvent(-1)); newRuntimeState.AddEvent(runtimeState.Events.OfType().LastOrDefault()); - newRuntimeState.AddEvent(new ExecutionStartedEvent(-1, runtimeState.Input) - { - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = runtimeState.OrchestrationInstance!.InstanceId, - ExecutionId = rewindAction.NewExecutionId ?? Guid.NewGuid().ToString("N") - }, - Tags = runtimeState.Tags, - ParentInstance = runtimeState.ParentInstance, - Name = runtimeState.Name, - Version = runtimeState.Version - }); - foreach (var evt in runtimeState.Events) + // We start at 1 since we want to skip the first OrchestratorStartedEvent since we already this event manually above + for (int i = 1; i < runtimeState.Events.Count; i++) { - // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of the failed task/suborchestration events to the new history + var evt = runtimeState.Events[i]; + + // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of + // the failed task/suborchestration events to the new history if (!(evt is TaskScheduledEvent taskScheduledEvent && failedTaskIds.Contains(taskScheduledEvent.EventId)) && evt is not TaskFailedEvent && evt is not SubOrchestrationInstanceFailedEvent) { newRuntimeState.AddEvent(evt); + if (evt is ExecutionStartedEvent 
executionStartedEvent) + { + // Copy all information from the old ExecutionStartedEvent except for the ExecutionId, since we create a new one + executionStartedEvent.OrchestrationInstance.ExecutionId = Guid.NewGuid().ToString("N"); + } // For each of the failed suborchestrations, generate a rewind event if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) { - var executionId = Guid.NewGuid().ToString("N"); - rewindEventsForFailedSubOrchestrations.Add + subOrchestrationRewindMessages.Add ( new TaskMessage { + // We include all this information in the rewind event to handle the edge case scenario of a suborchestration's history being purged. + // The ExecutionRewoundEvent will then contain enough information to rerun the entire suborchestration from scratch. Event = new ExecutionRewoundEvent(-1, rewindAction.Reason) { - NewExecutionId = executionId + Tags = subOrchestrationInstanceCreatedEvent.Tags, + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId, + ParentInstance = subOrchestrationInstanceCreatedEvent.ParentInstance, + Name = subOrchestrationInstanceCreatedEvent.Name, + Version = subOrchestrationInstanceCreatedEvent.Version, + Input = subOrchestrationInstanceCreatedEvent.Input }, OrchestrationInstance = new OrchestrationInstance { InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId, - ExecutionId = executionId + ExecutionId = subOrchestrationInstanceCreatedEvent.ExecutionId }, } ); } } } - newRuntimeState.AddEvent(new OrchestratorCompletedEvent(-1)); + + // If this is a "terminal leaf" with no suborchestrations, we need to add an outbound message to it to force it to rerun. + // This will trigger the orchestration to rerun with the altered history, so it will only rerun the failed tasks. + // Once it finishes, it will send a completion message to its parent orchestration, which will trigger the parents to rerun as well. 
+ if (subOrchestrationRewindMessages.Count == 0) + { + subOrchestrationRewindMessages.Add( + new TaskMessage + { + // This is a "dummy event" that will not be added to the history and is used just to trigger the rerun. + Event = new ExecutionRewoundEvent(-1, string.Empty), + OrchestrationInstance = runtimeState.OrchestrationInstance, + } + ); + } } internal class NonBlockingCountdownLock diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index b14554c4e..87ede3b9a 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -151,8 +151,8 @@ void ProcessEvents(IEnumerable events) // TODO: Create a setting that allows orchestrations to complete when the orchestrator // function completes, even if there are open tasks. if (!this.context.HasOpenTasks) - { fdf - if (this.result!.IsCompleted) + { + if (this.result!.IsCompleted && !this.context.IsRewinding) { if (this.result.IsFaulted) { @@ -173,7 +173,7 @@ void ProcessEvents(IEnumerable events) } } - // TODO: It is an error if result is not completed when all OpenTasks are done. + // TODO: It is an error if result is not completed when all OpenTasks are done and the orchestration is not rewinding. // Throw an exception in that case. 
} } From f0b18e7b56146f8eb80a5d9b07416692c4a9ca70 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 15 Oct 2025 11:22:16 -0700 Subject: [PATCH 03/14] everything is working --- .../Command/RewindOrchestrationAction.cs | 8 -- .../History/ExecutionRewoundEvent.cs | 37 ++----- .../SubOrchestrationInstanceCreatedEvent.cs | 18 --- src/DurableTask.Core/OrchestrationStatus.cs | 5 + .../TaskOrchestrationContext.cs | 13 --- .../TaskOrchestrationDispatcher.cs | 103 +++++++++++------- .../TaskOrchestrationExecutor.cs | 7 +- 7 files changed, 77 insertions(+), 114 deletions(-) diff --git a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs index f58325873..6913726c7 100644 --- a/src/DurableTask.Core/Command/RewindOrchestrationAction.cs +++ b/src/DurableTask.Core/Command/RewindOrchestrationAction.cs @@ -19,15 +19,7 @@ namespace DurableTask.Core.Command /// public class RewindOrchestrationAction : OrchestratorAction { - // NOTE: Actions must be serializable by a variety of different serializer types to support out-of-process execution. - // To ensure maximum compatibility, all properties should be public and settable by default. - /// public override OrchestratorActionType OrchestratorActionType => OrchestratorActionType.RewindOrchestration; - - /// - /// The reason for the rewind action. - /// - public string? Reason { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index abba18d25..18b524639 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -22,6 +22,12 @@ namespace DurableTask.Core.History [DataContract] public class ExecutionRewoundEvent : HistoryEvent { + /// + /// Creates a new ExecutionRewoundEvent with the supplied event id and empty reason. 
+ /// + /// The integer event id + public ExecutionRewoundEvent(int eventId) : base(eventId) { } + /// /// Creates a new ExecutionRewoundEvent with the supplied event id and reason. /// @@ -44,39 +50,14 @@ public ExecutionRewoundEvent(int eventId, string reason) [DataMember] public string Reason { get; set; } - - /// - /// Gets or sets the input of the orchestration being rewound - /// - [DataMember] - public string Input { get; set; } - - /// - /// Gets or sets the dictionary of tags of the orchestration being rewound - /// - [DataMember] - public IDictionary Tags { get; set; } - - /// - /// Gets or sets the parent instance of the suborchestration being rewound - /// - [DataMember] - public ParentInstance ParentInstance { get; set; } - - /// - /// Gets or sets the name of the orchestration being rewound - /// - [DataMember] - public string Name { get; set; } - /// - /// Gets or sets the version of the orchestration being rewound + /// Gets or sets the parent execution id of the rewound suborchestration. /// [DataMember] - public string Version { get; set; } + public string ParentExecutionId { get; set; } /// - /// Gets or sets the instance ID of the orchestration being rewound + /// Gets or sets the instance ID of the rewound orchestration. 
/// [DataMember] public string InstanceId { get; set; } diff --git a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs index f2209fa2c..7c4ac2019 100644 --- a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs +++ b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs @@ -67,23 +67,5 @@ public SubOrchestrationInstanceCreatedEvent(int eventId) /// [DataMember] public string ClientSpanId { get; set; } - - /// - /// Gets or sets the execution Id - /// - [DataMember] - public string ExecutionId { get; set; } - - /// - /// Gets or sets the dictionary of tags of the suborchestration - /// - [DataMember] - public IDictionary Tags { get; set; } - - /// - /// Gets or sets the parent instance of the suborchestration - /// - [DataMember] - public ParentInstance ParentInstance { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/OrchestrationStatus.cs b/src/DurableTask.Core/OrchestrationStatus.cs index 098aecec1..1b581a08a 100644 --- a/src/DurableTask.Core/OrchestrationStatus.cs +++ b/src/DurableTask.Core/OrchestrationStatus.cs @@ -57,5 +57,10 @@ public enum OrchestrationStatus /// Orchestration state of suspended /// Suspended, + + /// + /// Orchestration status of rewinding + /// + Rewinding } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationContext.cs b/src/DurableTask.Core/TaskOrchestrationContext.cs index 26c9c302f..87fc435b3 100644 --- a/src/DurableTask.Core/TaskOrchestrationContext.cs +++ b/src/DurableTask.Core/TaskOrchestrationContext.cs @@ -41,7 +41,6 @@ internal class TaskOrchestrationContext : OrchestrationContext public bool IsSuspended { get; private set; } public bool HasContinueAsNew => continueAsNew != null; - public bool IsRewinding { get; private set; } public void AddEventToNextIteration(HistoryEvent he) { @@ -728,18 +727,6 @@ public void CompleteOrchestration(string result, 
string details, OrchestrationSt this.orchestratorActionsMap.Add(id, completedOrchestratorAction); } - public void HandleExecutionRewoundEvent(ExecutionRewoundEvent rewoundEvent) - { - if (!this.executionCompletedOrTerminated) - { - return; - } - this.IsRewinding = true; - int id = this.idCounter++; - var rewindOrchestrationAction = new RewindOrchestrationAction() { Id = id }; - this.orchestratorActionsMap.Add(id, rewindOrchestrationAction); - } - class OpenTaskInfo { public string Name { get; set; } diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 7d7a113f9..4e279b763 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -428,15 +428,22 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (!versioningFailed) { - if (workItem.Cursor == null) + if (runtimeState.NewEvents.OfType().LastOrDefault() is not null) { - workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); + decisions = new List { new RewindOrchestrationAction() }; } else { - await this.ResumeOrchestrationAsync(workItem); + if (workItem.Cursor == null) + { + workItem.Cursor = await this.ExecuteOrchestrationAsync(runtimeState, workItem); + } + else + { + await this.ResumeOrchestrationAsync(workItem); + } + decisions = workItem.Cursor.LatestDecisions.ToList(); } - decisions = workItem.Cursor.LatestDecisions.ToList(); } this.logHelper.OrchestrationExecuted( @@ -521,14 +528,13 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work isCompleted = !continuedAsNew; break; case OrchestratorActionType.RewindOrchestration: - var rewindDecision = (RewindOrchestrationAction)decision; this.ProcessRewindOrchestrationDecision( - rewindDecision, runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState); 
orchestratorMessages.AddRange(subOrchestrationRewindMessages); workItem.OrchestrationRuntimeState = newRuntimeState; + runtimeState = newRuntimeState; // Setting this to true here will end an extended session if it is in progress. // We don't want to save the state across executions, since we essentially manually modify // the orchestration history here and so that stored by the extended session is incorrect. @@ -870,6 +876,19 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt return false; } + if (message.Event.EventType == EventType.ExecutionRewound + && workItem.OrchestrationRuntimeState.OrchestrationStatus != OrchestrationStatus.Running + && workItem.NewMessages.Count > 1) + { + foreach (TaskMessage droppedMessage in workItem.NewMessages) + { + logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " + + "that is attempting to rewind from a terminal state. The only message that can be sent in " + + "this case is the rewind request."); + } + return false; + } + logHelper.ProcessingOrchestrationMessage(workItem, message); TraceHelper.TraceInstance( TraceEventType.Information, @@ -1182,15 +1201,6 @@ TaskMessage ProcessCreateSubOrchestrationInstanceDecision( Name = createSubOrchestrationAction.Name, Version = createSubOrchestrationAction.Version, InstanceId = createSubOrchestrationAction.InstanceId, - ExecutionId = Guid.NewGuid().ToString("N"), - ParentInstance = new ParentInstance - { - OrchestrationInstance = runtimeState.OrchestrationInstance, - Name = runtimeState.Name, - Version = runtimeState.Version, - TaskScheduleId = createSubOrchestrationAction.Id - }, - Tags = OrchestrationTags.MergeTags(createSubOrchestrationAction.Tags, runtimeState.Tags), }; if (includeParameters) { @@ -1203,13 +1213,19 @@ TaskMessage ProcessCreateSubOrchestrationInstanceDecision( var startedEvent = new ExecutionStartedEvent(-1, createSubOrchestrationAction.Input) { - Tags = historyEvent.Tags, + Tags = 
OrchestrationTags.MergeTags(createSubOrchestrationAction.Tags, runtimeState.Tags), OrchestrationInstance = new OrchestrationInstance { InstanceId = createSubOrchestrationAction.InstanceId, - ExecutionId = historyEvent.ExecutionId + ExecutionId = Guid.NewGuid().ToString("N") + }, + ParentInstance = new ParentInstance + { + OrchestrationInstance = runtimeState.OrchestrationInstance, + Name = runtimeState.Name, + Version = runtimeState.Version, + TaskScheduleId = createSubOrchestrationAction.Id }, - ParentInstance = historyEvent.ParentInstance, Name = createSubOrchestrationAction.Name, Version = createSubOrchestrationAction.Version }; @@ -1281,7 +1297,6 @@ TaskMessage ProcessSendEventDecision( } void ProcessRewindOrchestrationDecision( - RewindOrchestrationAction rewindAction, OrchestrationRuntimeState runtimeState, out List subOrchestrationRewindMessages, out OrchestrationRuntimeState newRuntimeState) @@ -1307,48 +1322,52 @@ void ProcessRewindOrchestrationDecision( } } - newRuntimeState.AddEvent(new OrchestratorStartedEvent(-1)); - newRuntimeState.AddEvent(runtimeState.Events.OfType().LastOrDefault()); + ExecutionRewoundEvent executionRewoundEvent = (runtimeState.NewEvents.Last(e => e is ExecutionRewoundEvent) as ExecutionRewoundEvent)!; + string newExecutionId = Guid.NewGuid().ToString("N"); - // We start at 1 since we want to skip the first OrchestratorStartedEvent since we already this event manually above - for (int i = 1; i < runtimeState.Events.Count; i++) + // In so copying the history, we do minimal alterations which provides less room for error and less work for the backends. + // That being said, the new history will have the ExecutionRewoundEvent at near end, rather than the beginning (which might be more intuitive). + // And we can also end up with sections with an OrchestratorStartedEvent immediately followed by an OrchestratorCompletedEvent if all the events in between got deleted. + // Are we okay with this? 
+ foreach (var evt in runtimeState.Events) { - var evt = runtimeState.Events[i]; - // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of - // the failed task/suborchestration events to the new history + // the failed task/suborchestration/execution events to the new history. if (!(evt is TaskScheduledEvent taskScheduledEvent && failedTaskIds.Contains(taskScheduledEvent.EventId)) - && evt is not TaskFailedEvent && evt is not SubOrchestrationInstanceFailedEvent) + && evt is not TaskFailedEvent + && evt is not SubOrchestrationInstanceFailedEvent + && evt is not ExecutionCompletedEvent) { newRuntimeState.AddEvent(evt); if (evt is ExecutionStartedEvent executionStartedEvent) { // Copy all information from the old ExecutionStartedEvent except for the ExecutionId, since we create a new one - executionStartedEvent.OrchestrationInstance.ExecutionId = Guid.NewGuid().ToString("N"); + executionStartedEvent.OrchestrationInstance.ExecutionId = newExecutionId; + // If this is a suborchestration, we also need to update the ParentInstance's ExecutionId to match the new ExecutionId of the rewinding parent orchestration + if (!string.IsNullOrEmpty(executionRewoundEvent.ParentExecutionId)) + { + executionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId = executionRewoundEvent.ParentExecutionId; + } } // For each of the failed suborchestrations, generate a rewind event - if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) + if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent + && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) { + var childExecutionRewoundEvent = new ExecutionRewoundEvent(-1, executionRewoundEvent!.Reason) + { + ParentExecutionId = newExecutionId, + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId + }; + 
subOrchestrationRewindMessages.Add ( new TaskMessage { - // We include all this information in the rewind event to handle the edge case scenario of a suborchestration's history being purged. - // The ExecutionRewoundEvent will then contain enough information to rerun the entire suborchestration from scratch. - Event = new ExecutionRewoundEvent(-1, rewindAction.Reason) - { - Tags = subOrchestrationInstanceCreatedEvent.Tags, - InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId, - ParentInstance = subOrchestrationInstanceCreatedEvent.ParentInstance, - Name = subOrchestrationInstanceCreatedEvent.Name, - Version = subOrchestrationInstanceCreatedEvent.Version, - Input = subOrchestrationInstanceCreatedEvent.Input - }, + Event = childExecutionRewoundEvent, OrchestrationInstance = new OrchestrationInstance { - InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId, - ExecutionId = subOrchestrationInstanceCreatedEvent.ExecutionId + InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId }, } ); diff --git a/src/DurableTask.Core/TaskOrchestrationExecutor.cs b/src/DurableTask.Core/TaskOrchestrationExecutor.cs index 572a81754..540851e50 100644 --- a/src/DurableTask.Core/TaskOrchestrationExecutor.cs +++ b/src/DurableTask.Core/TaskOrchestrationExecutor.cs @@ -178,7 +178,7 @@ void ProcessEvents(IEnumerable events) // function completes, even if there are open tasks. if (!this.context.HasOpenTasks) { - if (this.result!.IsCompleted && !this.context.IsRewinding) + if (this.result!.IsCompleted) { if (this.result.IsFaulted) { @@ -199,7 +199,7 @@ void ProcessEvents(IEnumerable events) } } - // TODO: It is an error if result is not completed when all OpenTasks are done and the orchestration is not rewinding. + // TODO: It is an error if result is not completed when all OpenTasks are done. // Throw an exception in that case. 
} } @@ -277,9 +277,6 @@ void ProcessEvent(HistoryEvent historyEvent) case EventType.ExecutionResumed: this.context.HandleExecutionResumedEvent((ExecutionResumedEvent)historyEvent, ProcessEvent); break; - case EventType.ExecutionRewound: - this.context.HandleExecutionRewoundEvent((ExecutionRewoundEvent)historyEvent); - break; } } } From 0de4fe65c5582d6fe66429c3060c8b0de1b4fd02 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Wed, 15 Oct 2025 12:11:23 -0700 Subject: [PATCH 04/14] removed unused usings --- .../History/SubOrchestrationInstanceCreatedEvent.cs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs index 7c4ac2019..e611eb99e 100644 --- a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs +++ b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs @@ -13,9 +13,6 @@ namespace DurableTask.Core.History { - using System.Collections; - using System.Collections.Generic; - using System.Collections.Specialized; using System.Runtime.Serialization; /// From 3ee0a73b21d2b49dbfd828d1f14a0f3cdc4a76b8 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 17 Oct 2025 11:34:50 -0700 Subject: [PATCH 05/14] fixed distributed tracing --- .../History/ExecutionRewoundEvent.cs | 10 +++++-- .../TaskOrchestrationDispatcher.cs | 30 +++++++++++++++++-- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index 18b524639..d88c36e04 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -13,14 +13,14 @@ namespace DurableTask.Core.History { - using System.Collections.Generic; + using DurableTask.Core.Tracing; using System.Runtime.Serialization; /// /// Generic History event /// [DataContract] - public class 
ExecutionRewoundEvent : HistoryEvent + public class ExecutionRewoundEvent : HistoryEvent, ISupportsDurableTraceContext { /// /// Creates a new ExecutionRewoundEvent with the supplied event id and empty reason. @@ -61,5 +61,11 @@ public ExecutionRewoundEvent(int eventId, string reason) /// [DataMember] public string InstanceId { get; set; } + + /// + /// Gets or sets the parent trace context of the rewound suborchestration. + /// + [DataMember] + public DistributedTraceContext ParentTraceContext { get; set; } } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 4e279b763..ad5207d9e 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -343,6 +343,20 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work ExecutionStartedEvent startEvent = runtimeState.ExecutionStartedEvent ?? workItem.NewMessages.Select(msg => msg.Event).OfType().FirstOrDefault(); + ExecutionRewoundEvent rewindEvent = + workItem.NewMessages.Select(msg => msg.Event).OfType().LastOrDefault(); + + if (rewindEvent is not null && runtimeState.OrchestrationStatus != OrchestrationStatus.Running) + { + isRewinding = true; + if (rewindEvent.ParentTraceContext != null) + { + startEvent.ParentTraceContext = rewindEvent.ParentTraceContext; + } + startEvent.ParentTraceContext.SpanId = null; + startEvent.ParentTraceContext.Id = null; + startEvent.ParentTraceContext.ActivityStartTime = null; + } Activity? traceActivity = TraceHelper.StartTraceActivityForOrchestrationExecution(startEvent); OrchestrationState? 
instanceState = null; @@ -428,7 +442,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (!versioningFailed) { - if (runtimeState.NewEvents.OfType().LastOrDefault() is not null) + if (isRewinding) { decisions = new List { new RewindOrchestrationAction() }; } @@ -1353,13 +1367,25 @@ void ProcessRewindOrchestrationDecision( // For each of the failed suborchestrations, generate a rewind event if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) - { + { var childExecutionRewoundEvent = new ExecutionRewoundEvent(-1, executionRewoundEvent!.Reason) { ParentExecutionId = newExecutionId, InstanceId = subOrchestrationInstanceCreatedEvent.InstanceId }; + if (runtimeState.ExecutionStartedEvent.TryGetParentTraceContext(out ActivityContext parentTraceContext)) + { + var newClientSpanId = ActivitySpanId.CreateRandom(); + subOrchestrationInstanceCreatedEvent.ClientSpanId = newClientSpanId.ToString(); + ActivityContext childActivityContext = new( + parentTraceContext.TraceId, + newClientSpanId, + parentTraceContext.TraceFlags, + parentTraceContext.TraceState); + childExecutionRewoundEvent.SetParentTraceContext(childActivityContext); + } + subOrchestrationRewindMessages.Add ( new TaskMessage From 43e26c72142c14d629bce954619ade5f61e1e896 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 17 Oct 2025 11:37:34 -0700 Subject: [PATCH 06/14] added some comments --- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index ad5207d9e..d23af1424 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -353,6 +353,7 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work { startEvent.ParentTraceContext = 
rewindEvent.ParentTraceContext; } + // We set these to null here so that a new Activity is created to represent the execution of the rewound orchestration. startEvent.ParentTraceContext.SpanId = null; startEvent.ParentTraceContext.Id = null; startEvent.ParentTraceContext.ActivityStartTime = null; @@ -1377,6 +1378,8 @@ void ProcessRewindOrchestrationDecision( if (runtimeState.ExecutionStartedEvent.TryGetParentTraceContext(out ActivityContext parentTraceContext)) { var newClientSpanId = ActivitySpanId.CreateRandom(); + // We set a new client span ID here so that the execution of the rewound suborchestration is not tied to the + // old parent. subOrchestrationInstanceCreatedEvent.ClientSpanId = newClientSpanId.ToString(); ActivityContext childActivityContext = new( parentTraceContext.TraceId, From 5ab8370bf36898d600372af8a033b0b0c30da28c Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 20 Oct 2025 12:41:56 -0700 Subject: [PATCH 07/14] added changes to avoid altering the original runtime state --- .../History/ExecutionStartedEvent.cs | 25 ++++++++++++++++++ .../SubOrchestrationInstanceCreatedEvent.cs | 19 ++++++++++++++ .../TaskOrchestrationDispatcher.cs | 26 ++++++++++++++----- .../Tracing/DistributedTraceContext.cs | 10 +++++++ 4 files changed, 73 insertions(+), 7 deletions(-) diff --git a/src/DurableTask.Core/History/ExecutionStartedEvent.cs b/src/DurableTask.Core/History/ExecutionStartedEvent.cs index 6ae593359..59c6b8202 100644 --- a/src/DurableTask.Core/History/ExecutionStartedEvent.cs +++ b/src/DurableTask.Core/History/ExecutionStartedEvent.cs @@ -48,6 +48,31 @@ internal ExecutionStartedEvent() { } + /// + /// Creates a new ExecutionStartedEvent with the same fields as . + /// A deep copy is performed on all non-base class fields. 
+ /// + internal ExecutionStartedEvent(ExecutionStartedEvent other) + { + // Copy base class fields + EventId = other.EventId; + Timestamp = other.Timestamp; + ExtensionData = other.ExtensionData; + IsPlayed = other.IsPlayed; + + // Deep copy all other fields + OrchestrationInstance = other.OrchestrationInstance?.Clone(); + ParentInstance = other.ParentInstance?.Clone(); + ParentTraceContext = other.ParentTraceContext?.Clone(); + Input = other.Input; + Name = other.Name; + Version = other.Version; + Tags = other.Tags != null ? new Dictionary(other.Tags) : null; + Correlation = other.Correlation; + ScheduledStartTime = other.ScheduledStartTime; + Generation = other.Generation; + } + /// /// Gets the event type /// diff --git a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs index e611eb99e..646070933 100644 --- a/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs +++ b/src/DurableTask.Core/History/SubOrchestrationInstanceCreatedEvent.cs @@ -30,6 +30,25 @@ public SubOrchestrationInstanceCreatedEvent(int eventId) { } + /// + /// Creates a new SubOrchestrationInstanceCreatedEvent with the same fields as <paramref name="other"/>.
+ /// + internal SubOrchestrationInstanceCreatedEvent(SubOrchestrationInstanceCreatedEvent other) + { + // Copy base class fields + EventId = other.EventId; + Timestamp = other.Timestamp; + ExtensionData = other.ExtensionData; + IsPlayed = other.IsPlayed; + + // Copy all other fields + Name = other.Name; + Version = other.Version; + InstanceId = other.InstanceId; + Input = other.Input; + ClientSpanId = other.ClientSpanId; + } + /// /// Gets the event type /// diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index d23af1424..20123d9e4 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -1353,20 +1353,24 @@ void ProcessRewindOrchestrationDecision( && evt is not SubOrchestrationInstanceFailedEvent && evt is not ExecutionCompletedEvent) { - newRuntimeState.AddEvent(evt); + HistoryEvent eventToAdd = evt; + if (evt is ExecutionStartedEvent executionStartedEvent) { // Copy all information from the old ExecutionStartedEvent except for the ExecutionId, since we create a new one - executionStartedEvent.OrchestrationInstance.ExecutionId = newExecutionId; + var newExecutionStartedEvent = new ExecutionStartedEvent(executionStartedEvent); + newExecutionStartedEvent.OrchestrationInstance.ExecutionId = newExecutionId; + // If this is a suborchestration, we also need to update the ParentInstance's ExecutionId to match the new ExecutionId of the rewinding parent orchestration if (!string.IsNullOrEmpty(executionRewoundEvent.ParentExecutionId)) { - executionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId = executionRewoundEvent.ParentExecutionId; + newExecutionStartedEvent.ParentInstance.OrchestrationInstance.ExecutionId = executionRewoundEvent.ParentExecutionId; } + eventToAdd = newExecutionStartedEvent; } // For each of the failed suborchestrations, generate a rewind event - if (evt is SubOrchestrationInstanceCreatedEvent 
subOrchestrationInstanceCreatedEvent + else if (evt is SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreatedEvent && failedTaskIds.Contains(subOrchestrationInstanceCreatedEvent.EventId)) { var childExecutionRewoundEvent = new ExecutionRewoundEvent(-1, executionRewoundEvent!.Reason) @@ -1377,10 +1381,15 @@ void ProcessRewindOrchestrationDecision( if (runtimeState.ExecutionStartedEvent.TryGetParentTraceContext(out ActivityContext parentTraceContext)) { - var newClientSpanId = ActivitySpanId.CreateRandom(); // We set a new client span ID here so that the execution of the rewound suborchestration is not tied to the // old parent. - subOrchestrationInstanceCreatedEvent.ClientSpanId = newClientSpanId.ToString(); + var newClientSpanId = ActivitySpanId.CreateRandom(); + var newSubOrchestrationInstanceCreatedEvent = new SubOrchestrationInstanceCreatedEvent(subOrchestrationInstanceCreatedEvent) + { + ClientSpanId = newClientSpanId.ToString() + }; + eventToAdd = newSubOrchestrationInstanceCreatedEvent; + ActivityContext childActivityContext = new( parentTraceContext.TraceId, newClientSpanId, @@ -1401,6 +1410,9 @@ void ProcessRewindOrchestrationDecision( } ); } + + // Finally, add the event to the new history + newRuntimeState.AddEvent(eventToAdd); } } @@ -1414,7 +1426,7 @@ void ProcessRewindOrchestrationDecision( { // This is a "dummy event" that will not be added to the history and is used just to trigger the rerun. Event = new ExecutionRewoundEvent(-1, string.Empty), - OrchestrationInstance = runtimeState.OrchestrationInstance, + OrchestrationInstance = newRuntimeState.OrchestrationInstance, } ); } diff --git a/src/DurableTask.Core/Tracing/DistributedTraceContext.cs b/src/DurableTask.Core/Tracing/DistributedTraceContext.cs index b69b1f5f6..612caa386 100644 --- a/src/DurableTask.Core/Tracing/DistributedTraceContext.cs +++ b/src/DurableTask.Core/Tracing/DistributedTraceContext.cs @@ -80,5 +80,15 @@ public string? 
TraceState /// [DataMember] public DateTimeOffset? ActivityStartTime { get; set; } + + internal DistributedTraceContext Clone() + { + return new DistributedTraceContext(this.TraceParent, this.TraceState) + { + Id = Id, + SpanId = SpanId, + ActivityStartTime = ActivityStartTime + }; + } } } From 14fe651128453ddbe55d60742e7876b9c0917d6f Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 20 Oct 2025 12:59:48 -0700 Subject: [PATCH 08/14] changed a comment --- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 20123d9e4..83476b6e4 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -1341,8 +1341,7 @@ void ProcessRewindOrchestrationDecision( string newExecutionId = Guid.NewGuid().ToString("N"); // In so copying the history, we do minimal alterations which provides less room for error and less work for the backends. - // That being said, the new history will have the ExecutionRewoundEvent at near end, rather than the beginning (which might be more intuitive). - // And we can also end up with sections with an OrchestratorStartedEvent immediately followed by an OrchestratorCompletedEvent if all the events in between got deleted. + // That being said, we end up with sections with an OrchestratorStartedEvent immediately followed by an OrchestratorCompletedEvent if all the events in between got deleted. // Are we okay with this? 
foreach (var evt in runtimeState.Events) { From 8a6f0755d4f9b7c39592ea34b1e1e79bdd115e2f Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 24 Oct 2025 13:59:25 -0700 Subject: [PATCH 09/14] addressing PR comments --- .../History/ExecutionRewoundEvent.cs | 1 + src/DurableTask.Core/OrchestrationStatus.cs | 7 +------ src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 12 +++++++++--- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index d88c36e04..07b053c87 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -11,6 +11,7 @@ // limitations under the License. // ---------------------------------------------------------------------------------- +#nullable enable namespace DurableTask.Core.History { using DurableTask.Core.Tracing; diff --git a/src/DurableTask.Core/OrchestrationStatus.cs b/src/DurableTask.Core/OrchestrationStatus.cs index 1b581a08a..1e26747d8 100644 --- a/src/DurableTask.Core/OrchestrationStatus.cs +++ b/src/DurableTask.Core/OrchestrationStatus.cs @@ -56,11 +56,6 @@ public enum OrchestrationStatus /// /// Orchestration state of suspended /// - Suspended, - - /// - /// Orchestration status of rewinding - /// - Rewinding + Suspended } } \ No newline at end of file diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 83476b6e4..4cf5e5d5d 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -443,6 +443,9 @@ protected async Task OnProcessWorkItemAsync(TaskOrchestrationWorkItem work if (!versioningFailed) { + // In this case we skip the orchestration's execution since all tasks have been completed and it is in a terminal state. 
+ // Instead we "rewind" its execution by removing all failed tasks (see ProcessRewindOrchestrationDecision). + // Upon receiving the next work item for the rewound orchestration, the failed tasks will be re-executed. if (isRewinding) { decisions = new List { new RewindOrchestrationAction() }; @@ -897,9 +900,12 @@ internal static bool ReconcileMessagesWithState(TaskOrchestrationWorkItem workIt { foreach (TaskMessage droppedMessage in workItem.NewMessages) { - logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " + - "that is attempting to rewind from a terminal state. The only message that can be sent in " + - "this case is the rewind request."); + if (droppedMessage.Event.EventType != EventType.ExecutionRewound) + { + logHelper.DroppingOrchestrationMessage(workItem, droppedMessage, "Multiple messages sent to an instance " + + "that is attempting to rewind from a terminal state. The only message that can be sent in " + + "this case is the rewind request."); + } } return false; } From feb1ac7cc0de977b8f4ade93666400b1e90bd30b Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 24 Oct 2025 14:05:14 -0700 Subject: [PATCH 10/14] fixing the build errors --- src/DurableTask.Core/History/ExecutionRewoundEvent.cs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index 07b053c87..18f456ff0 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -10,7 +10,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
// ---------------------------------------------------------------------------------- - #nullable enable namespace DurableTask.Core.History { @@ -49,24 +48,24 @@ public ExecutionRewoundEvent(int eventId, string reason) /// Gets or sets the reason for the rewind event. /// [DataMember] - public string Reason { get; set; } + public string? Reason { get; set; } /// /// Gets or sets the parent execution id of the rewound suborchestration. /// [DataMember] - public string ParentExecutionId { get; set; } + public string? ParentExecutionId { get; set; } /// /// Gets or sets the instance ID of the rewound orchestration. /// [DataMember] - public string InstanceId { get; set; } + public string? InstanceId { get; set; } /// /// Gets or sets the parent trace context of the rewound suborchestration. /// [DataMember] - public DistributedTraceContext ParentTraceContext { get; set; } + public DistributedTraceContext? ParentTraceContext { get; set; } } } \ No newline at end of file From b462b9d9a484ca8dec23383f226fada2b0e1a309 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 24 Oct 2025 14:12:11 -0700 Subject: [PATCH 11/14] fixing another build error --- src/DurableTask.Core/History/ExecutionRewoundEvent.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs index 18f456ff0..c838e9eb2 100644 --- a/src/DurableTask.Core/History/ExecutionRewoundEvent.cs +++ b/src/DurableTask.Core/History/ExecutionRewoundEvent.cs @@ -33,7 +33,7 @@ public ExecutionRewoundEvent(int eventId) : base(eventId) { } /// /// The integer event id /// The reason for the rewind event - public ExecutionRewoundEvent(int eventId, string reason) + public ExecutionRewoundEvent(int eventId, string? 
reason) : base(eventId) { this.Reason = reason; From 4b3660fc5dfa1f8db82f80a8621892aa5f28755e Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Fri, 31 Oct 2025 13:53:08 -0700 Subject: [PATCH 12/14] updating a comment --- src/DurableTask.Core/TaskOrchestrationDispatcher.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs index 4cf5e5d5d..33fa406cd 100644 --- a/src/DurableTask.Core/TaskOrchestrationDispatcher.cs +++ b/src/DurableTask.Core/TaskOrchestrationDispatcher.cs @@ -1346,9 +1346,7 @@ void ProcessRewindOrchestrationDecision( ExecutionRewoundEvent executionRewoundEvent = (runtimeState.NewEvents.Last(e => e is ExecutionRewoundEvent) as ExecutionRewoundEvent)!; string newExecutionId = Guid.NewGuid().ToString("N"); - // In so copying the history, we do minimal alterations which provides less room for error and less work for the backends. - // That being said, we end up with sections with an OrchestratorStartedEvent immediately followed by an OrchestratorCompletedEvent if all the events in between got deleted. - // Are we okay with this? + // Copy the existing history, removing the failed task/suborchestration events and generating rewind events for each of the failed suborchestrations. 
foreach (var evt in runtimeState.Events) { // Do not add the TaskScheduledEvents for the failed tasks so that they get rescheduled, and do not add any of From 6a14145f02bfdf4b32876509c3389922abcc2129 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 3 Nov 2025 10:20:46 -0800 Subject: [PATCH 13/14] updating package versions --- .../DurableTask.ApplicationInsights.csproj | 2 +- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 2 +- src/DurableTask.Core/DurableTask.Core.csproj | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj index 1c168a38c..20a02176e 100644 --- a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj +++ b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj @@ -12,7 +12,7 @@ 0 7 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 09d6cc84f..901aefa26 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -22,7 +22,7 @@ 2 6 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index b7acf7eec..ac937da56 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -18,7 +18,7 @@ 3 5 - 1 + 2 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 From 9eaa09a033298f2836e9007315c52ae206f77b93 Mon Sep 17 00:00:00 2001 From: Sophia Tevosyan Date: Mon, 3 Nov 2025 10:21:42 -0800 Subject: [PATCH 14/14] updated patch version not minor version --- .../DurableTask.ApplicationInsights.csproj | 4 
++-- src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj | 4 ++-- src/DurableTask.Core/DurableTask.Core.csproj | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj index 20a02176e..02f79492c 100644 --- a/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj +++ b/src/DurableTask.ApplicationInsights/DurableTask.ApplicationInsights.csproj @@ -11,8 +11,8 @@ 0 - 7 - 2 + 8 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj index 901aefa26..8ff135fbd 100644 --- a/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj +++ b/src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj @@ -21,8 +21,8 @@ 2 - 6 - 2 + 7 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0 diff --git a/src/DurableTask.Core/DurableTask.Core.csproj b/src/DurableTask.Core/DurableTask.Core.csproj index ac937da56..1327f7492 100644 --- a/src/DurableTask.Core/DurableTask.Core.csproj +++ b/src/DurableTask.Core/DurableTask.Core.csproj @@ -17,8 +17,8 @@ 3 - 5 - 2 + 6 + 0 $(MajorVersion).$(MinorVersion).$(PatchVersion) $(VersionPrefix).0