diff --git a/Directory.Packages.props b/Directory.Packages.props
index 34509fe65..a0a48e71d 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -29,7 +29,7 @@
-
+
diff --git a/eng/targets/Release.props b/eng/targets/Release.props
index e04c66c75..953a0eff6 100644
--- a/eng/targets/Release.props
+++ b/eng/targets/Release.props
@@ -17,7 +17,7 @@
- 1.15.1
+ 1.16.0
diff --git a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs
index 8d213a9d8..775be4853 100644
--- a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs
+++ b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs
@@ -49,6 +49,7 @@ static class DurableTaskCoreExceptionsExtensions
failureDetails.ErrorType,
failureDetails.ErrorMessage,
failureDetails.StackTrace,
- failureDetails.InnerFailure?.ToTaskFailureDetails());
+ failureDetails.InnerFailure?.ToTaskFailureDetails(),
+ failureDetails.Properties);
}
}
diff --git a/src/Abstractions/TaskFailureDetails.cs b/src/Abstractions/TaskFailureDetails.cs
index 1387b19cd..271226f33 100644
--- a/src/Abstractions/TaskFailureDetails.cs
+++ b/src/Abstractions/TaskFailureDetails.cs
@@ -15,7 +15,8 @@ namespace Microsoft.DurableTask;
/// A summary description of the failure.
/// The stack trace of the failure.
/// The inner cause of the task failure.
-public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure)
+/// Additional properties associated with the exception.
+public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure, IDictionary? Properties)
{
Type? loadedExceptionType;
@@ -123,7 +124,8 @@ internal CoreFailureDetails ToCoreFailureDetails()
this.ErrorMessage,
this.StackTrace,
this.InnerFailure?.ToCoreFailureDetails(),
- isNonRetriable: false);
+ isNonRetriable: false,
+ this.Properties);
}
///
@@ -143,7 +145,8 @@ internal CoreFailureDetails ToCoreFailureDetails()
coreFailureDetails.ErrorType,
coreFailureDetails.ErrorMessage,
coreFailureDetails.StackTrace,
- FromCoreFailureDetails(coreFailureDetails.InnerFailure));
+ FromCoreFailureDetails(coreFailureDetails.InnerFailure),
+ coreFailureDetails.Properties);
}
[return: NotNullIfNotNull(nameof(exception))]
@@ -160,14 +163,17 @@ internal CoreFailureDetails ToCoreFailureDetails()
coreEx.FailureDetails?.ErrorType ?? "(unknown)",
coreEx.FailureDetails?.ErrorMessage ?? "(unknown)",
coreEx.FailureDetails?.StackTrace,
- FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException));
+ FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException),
+ coreEx.FailureDetails?.Properties);
}
+ // might need to udpate this later
return new TaskFailureDetails(
exception.GetType().ToString(),
exception.Message,
exception.StackTrace,
- FromExceptionRecursive(exception.InnerException));
+ FromExceptionRecursive(exception.InnerException),
+ null);
}
static TaskFailureDetails? FromCoreFailureDetailsRecursive(CoreFailureDetails? coreFailureDetails)
@@ -181,6 +187,7 @@ internal CoreFailureDetails ToCoreFailureDetails()
coreFailureDetails.ErrorType,
coreFailureDetails.ErrorMessage,
coreFailureDetails.StackTrace,
- FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure));
+ FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure),
+ coreFailureDetails.Properties);
}
}
diff --git a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs
index 50191e378..7647326c7 100644
--- a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs
+++ b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs
@@ -75,7 +75,7 @@ public static Core.OrchestrationStatus ConvertToCore(this OrchestrationRuntimeSt
}
TaskFailureDetails? inner = details.InnerFailure?.ConvertFromCore();
- return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner);
+ return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner, details.Properties);
}
///
diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto
index df5143bc9..b2def0878 100644
--- a/src/Grpc/orchestrator_service.proto
+++ b/src/Grpc/orchestrator_service.proto
@@ -41,6 +41,7 @@ message TaskFailureDetails {
google.protobuf.StringValue stackTrace = 3;
TaskFailureDetails innerFailure = 4;
bool isNonRetriable = 5;
+ map properties = 6;
}
enum OrchestrationStatus {
@@ -469,6 +470,7 @@ message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
+ InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
}
@@ -681,8 +683,7 @@ message AbandonEntityTaskResponse {
}
message SkipGracefulOrchestrationTerminationsRequest {
- // A maximum of 500 instance IDs can be provided in this list.
- repeated string instanceIds = 1;
+ InstanceBatch instanceBatch = 1;
google.protobuf.StringValue reason = 2;
}
@@ -818,4 +819,9 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
+}
+
+message InstanceBatch {
+ // A maximum of 500 instance IDs can be provided in this list.
+ repeated string instanceIds = 1;
}
\ No newline at end of file
diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt
index 3e4d1b210..709a80074 100644
--- a/src/Grpc/versions.txt
+++ b/src/Grpc/versions.txt
@@ -1,2 +1,2 @@
-# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC
-https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto
+# The following files were downloaded from branch main at 2025-10-13 18:06:55 UTC
+https://raw.githubusercontent.com/microsoft/durabletask-protobuf/97cf9cf6ac44107b883b0f4ab1dd62ee2332cfd9/protos/orchestrator_service.proto
diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs
index e3e331f77..13b64fe86 100644
--- a/src/Shared/Grpc/ProtoUtils.cs
+++ b/src/Shared/Grpc/ProtoUtils.cs
@@ -1,1172 +1,1268 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using System.Buffers;
-using System.Buffers.Text;
-using System.Diagnostics;
-using System.Diagnostics.CodeAnalysis;
-using System.Text;
-using DurableTask.Core;
-using DurableTask.Core.Command;
-using DurableTask.Core.Entities;
-using DurableTask.Core.Entities.OperationFormat;
-using DurableTask.Core.History;
-using DurableTask.Core.Tracing;
-using Google.Protobuf;
-using Google.Protobuf.WellKnownTypes;
-using DTCore = DurableTask.Core;
-using P = Microsoft.DurableTask.Protobuf;
-using TraceHelper = Microsoft.DurableTask.Tracing.TraceHelper;
-
-namespace Microsoft.DurableTask;
-
-///
-/// Protobuf utilities and helpers.
-///
-static class ProtoUtils
-{
- ///
- /// Converts a history event from to .
- ///
- /// The proto history event to converter.
- /// The converted history event.
- /// When the provided history event type is not supported.
- internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
- {
- return ConvertHistoryEvent(proto, conversionState: null);
- }
-
- ///
- /// Converts a history event from to , and performs
- /// stateful conversions of entity-related events.
- ///
- /// The proto history event to converter.
- /// State needed for converting entity-related history entries and actions.
- /// The converted history event.
- /// When the provided history event type is not supported.
- internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState)
- {
- Check.NotNull(proto);
- HistoryEvent historyEvent;
- switch (proto.EventTypeCase)
- {
- case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew:
- historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted:
- OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore();
- conversionState?.SetOrchestrationInstance(instance);
- historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input)
- {
- Name = proto.ExecutionStarted.Name,
- Version = proto.ExecutionStarted.Version,
- OrchestrationInstance = instance,
- Tags = proto.ExecutionStarted.Tags,
- ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance
- {
- Name = proto.ExecutionStarted.ParentInstance.Name,
- Version = proto.ExecutionStarted.ParentInstance.Version,
- OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(),
- TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId,
- },
- ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(),
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted:
- historyEvent = new ExecutionCompletedEvent(
- proto.EventId,
- proto.ExecutionCompleted.Result,
- proto.ExecutionCompleted.OrchestrationStatus.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
- historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
- historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
- historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
- historyEvent = new TaskScheduledEvent(
- proto.EventId,
- proto.TaskScheduled.Name,
- proto.TaskScheduled.Version,
- proto.TaskScheduled.Input)
- {
- Tags = proto.TaskScheduled.Tags,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
- historyEvent = new TaskCompletedEvent(
- proto.EventId,
- proto.TaskCompleted.TaskScheduledId,
- proto.TaskCompleted.Result);
- break;
- case P.HistoryEvent.EventTypeOneofCase.TaskFailed:
- historyEvent = new TaskFailedEvent(
- proto.EventId,
- proto.TaskFailed.TaskScheduledId,
- reason: null, /* not supported */
- details: null, /* not supported */
- proto.TaskFailed.FailureDetails.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated:
- historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId)
- {
- Input = proto.SubOrchestrationInstanceCreated.Input,
- InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId,
- Name = proto.SubOrchestrationInstanceCreated.Name,
- Version = proto.SubOrchestrationInstanceCreated.Version,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted:
- historyEvent = new SubOrchestrationInstanceCompletedEvent(
- proto.EventId,
- proto.SubOrchestrationInstanceCompleted.TaskScheduledId,
- proto.SubOrchestrationInstanceCompleted.Result);
- break;
- case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed:
- historyEvent = new SubOrchestrationInstanceFailedEvent(
- proto.EventId,
- proto.SubOrchestrationInstanceFailed.TaskScheduledId,
- reason: null /* not supported */,
- details: null /* not supported */,
- proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore());
- break;
- case P.HistoryEvent.EventTypeOneofCase.TimerCreated:
- historyEvent = new TimerCreatedEvent(
- proto.EventId,
- proto.TimerCreated.FireAt.ToDateTime());
- break;
- case P.HistoryEvent.EventTypeOneofCase.TimerFired:
- historyEvent = new TimerFiredEvent(
- eventId: -1,
- proto.TimerFired.FireAt.ToDateTime())
- {
- TimerId = proto.TimerFired.TimerId,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted:
- historyEvent = new OrchestratorStartedEvent(proto.EventId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted:
- historyEvent = new OrchestratorCompletedEvent(proto.EventId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EventSent:
- historyEvent = new EventSentEvent(proto.EventId)
- {
- InstanceId = proto.EventSent.InstanceId,
- Name = proto.EventSent.Name,
- Input = proto.EventSent.Input,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.EventRaised:
- historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input)
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using System.Buffers;
+using System.Buffers.Text;
+using System.Collections;
+using System.Diagnostics;
+using System.Diagnostics.CodeAnalysis;
+using System.Globalization;
+using System.Text;
+using System.Text.Json;
+using DurableTask.Core;
+using DurableTask.Core.Command;
+using DurableTask.Core.Entities;
+using DurableTask.Core.Entities.OperationFormat;
+using DurableTask.Core.History;
+using DurableTask.Core.Tracing;
+using Google.Protobuf;
+using Google.Protobuf.Collections;
+using Google.Protobuf.WellKnownTypes;
+using DTCore = DurableTask.Core;
+using P = Microsoft.DurableTask.Protobuf;
+using TraceHelper = Microsoft.DurableTask.Tracing.TraceHelper;
+
+namespace Microsoft.DurableTask;
+
+///
+/// Protobuf utilities and helpers.
+///
+static class ProtoUtils
+{
+ ///
+ /// Converts a history event from to .
+ ///
+ /// The proto history event to converter.
+ /// The converted history event.
+ /// When the provided history event type is not supported.
+ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto)
+ {
+ return ConvertHistoryEvent(proto, conversionState: null);
+ }
+
+ ///
+ /// Converts a history event from to , and performs
+ /// stateful conversions of entity-related events.
+ ///
+ /// The proto history event to converter.
+ /// State needed for converting entity-related history entries and actions.
+ /// The converted history event.
+ /// When the provided history event type is not supported.
+ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState)
+ {
+ Check.NotNull(proto);
+ HistoryEvent historyEvent;
+ switch (proto.EventTypeCase)
+ {
+ case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew:
+ historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted:
+ OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore();
+ conversionState?.SetOrchestrationInstance(instance);
+ historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input)
+ {
+ Name = proto.ExecutionStarted.Name,
+ Version = proto.ExecutionStarted.Version,
+ OrchestrationInstance = instance,
+ Tags = proto.ExecutionStarted.Tags,
+ ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance
+ {
+ Name = proto.ExecutionStarted.ParentInstance.Name,
+ Version = proto.ExecutionStarted.ParentInstance.Version,
+ OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(),
+ TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId,
+ },
+ ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(),
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted:
+ historyEvent = new ExecutionCompletedEvent(
+ proto.EventId,
+ proto.ExecutionCompleted.Result,
+ proto.ExecutionCompleted.OrchestrationStatus.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated:
+ historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended:
+ historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed:
+ historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskScheduled:
+ historyEvent = new TaskScheduledEvent(
+ proto.EventId,
+ proto.TaskScheduled.Name,
+ proto.TaskScheduled.Version,
+ proto.TaskScheduled.Input)
+ {
+ Tags = proto.TaskScheduled.Tags,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskCompleted:
+ historyEvent = new TaskCompletedEvent(
+ proto.EventId,
+ proto.TaskCompleted.TaskScheduledId,
+ proto.TaskCompleted.Result);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TaskFailed:
+ historyEvent = new TaskFailedEvent(
+ proto.EventId,
+ proto.TaskFailed.TaskScheduledId,
+ reason: null, /* not supported */
+ details: null, /* not supported */
+ proto.TaskFailed.FailureDetails.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated:
+ historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId)
+ {
+ Input = proto.SubOrchestrationInstanceCreated.Input,
+ InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId,
+ Name = proto.SubOrchestrationInstanceCreated.Name,
+ Version = proto.SubOrchestrationInstanceCreated.Version,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted:
+ historyEvent = new SubOrchestrationInstanceCompletedEvent(
+ proto.EventId,
+ proto.SubOrchestrationInstanceCompleted.TaskScheduledId,
+ proto.SubOrchestrationInstanceCompleted.Result);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed:
+ historyEvent = new SubOrchestrationInstanceFailedEvent(
+ proto.EventId,
+ proto.SubOrchestrationInstanceFailed.TaskScheduledId,
+ reason: null /* not supported */,
+ details: null /* not supported */,
+ proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TimerCreated:
+ historyEvent = new TimerCreatedEvent(
+ proto.EventId,
+ proto.TimerCreated.FireAt.ToDateTime());
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.TimerFired:
+ historyEvent = new TimerFiredEvent(
+ eventId: -1,
+ proto.TimerFired.FireAt.ToDateTime())
+ {
+ TimerId = proto.TimerFired.TimerId,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted:
+ historyEvent = new OrchestratorStartedEvent(proto.EventId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted:
+ historyEvent = new OrchestratorCompletedEvent(proto.EventId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EventSent:
+ historyEvent = new EventSentEvent(proto.EventId)
+ {
+ InstanceId = proto.EventSent.InstanceId,
+ Name = proto.EventSent.Name,
+ Input = proto.EventSent.Input,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EventRaised:
+ historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input)
+ {
+ Name = proto.EventRaised.Name,
+ };
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled:
+ historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance);
+ conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled:
+ historyEvent = EntityConversions.EncodeOperationSignaled(proto);
+ conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested:
+ historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance);
+ conversionState?.AddUnlockObligations(proto.EntityLockRequested);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent:
+ historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance);
+ conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted:
+ historyEvent = EntityConversions.EncodeLockGranted(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted:
+ historyEvent = EntityConversions.EncodeOperationCompleted(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed:
+ historyEvent = EntityConversions.EncodeOperationFailed(proto);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.GenericEvent:
+ historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data);
+ break;
+ case P.HistoryEvent.EventTypeOneofCase.HistoryState:
+ historyEvent = new HistoryStateEvent(
+ proto.EventId,
+ new OrchestrationState
+ {
+ OrchestrationInstance = new OrchestrationInstance
+ {
+ InstanceId = proto.HistoryState.OrchestrationState.InstanceId,
+ },
+ Name = proto.HistoryState.OrchestrationState.Name,
+ Version = proto.HistoryState.OrchestrationState.Version,
+ ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(),
+ CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(),
+ LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(),
+ Input = proto.HistoryState.OrchestrationState.Input,
+ Output = proto.HistoryState.OrchestrationState.Output,
+ Status = proto.HistoryState.OrchestrationState.CustomStatus,
+ Tags = proto.HistoryState.OrchestrationState.Tags,
+ });
+ break;
+ default:
+ throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported.");
+ }
+
+ historyEvent.Timestamp = proto.Timestamp.ToDateTime();
+ return historyEvent;
+ }
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp ToTimestamp(this DateTime dateTime)
+ {
+ // The protobuf libraries require timestamps to be in UTC
+ if (dateTime.Kind == DateTimeKind.Unspecified)
+ {
+ dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
+ }
+ else if (dateTime.Kind == DateTimeKind.Local)
+ {
+ dateTime = dateTime.ToUniversalTime();
+ }
+
+ return Timestamp.FromDateTime(dateTime);
+ }
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp? ToTimestamp(this DateTime? dateTime)
+ => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime);
+
+ ///
+ /// Converts a to a gRPC .
+ ///
+ /// The date-time to convert.
+ /// The gRPC timestamp.
+ internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime)
+ => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
+
+ ///
+ /// Constructs a .
+ ///
+ /// The orchestrator instance ID.
+ /// The orchestrator execution ID.
+ /// The orchestrator customer status or null if no custom status.
+ /// The orchestrator actions.
+ ///
+ /// The completion token for the work item. It must be the exact same
+ /// value that was provided by the corresponding that triggered the orchestrator execution.
+ ///
+ /// The entity conversion state, or null if no conversion is required.
+ /// The that represents orchestration execution.
+ /// Whether or not a history is required to complete the orchestration request and none was provided.
+ /// The orchestrator response.
+ /// When an orchestrator action is unknown.
+ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
+ string instanceId,
+ string executionId,
+ string? customStatus,
+ IEnumerable? actions,
+ string completionToken,
+ EntityConversionState? entityConversionState,
+ Activity? orchestrationActivity,
+ bool requiresHistory = false)
+ {
+ var response = new P.OrchestratorResponse
+ {
+ InstanceId = instanceId,
+ CustomStatus = customStatus,
+ CompletionToken = completionToken,
+ OrchestrationTraceContext =
+ new()
+ {
+ SpanID = orchestrationActivity?.SpanId.ToString(),
+ SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(),
+ },
+ RequiresHistory = requiresHistory,
+ };
+
+ // If a history is required and the orchestration request was not completed, then there is no list of actions.
+ if (requiresHistory)
+ {
+ return response;
+ }
+
+ Check.NotNull(actions);
+ foreach (OrchestratorAction action in actions)
+ {
+ var protoAction = new P.OrchestratorAction { Id = action.Id };
+
+ P.TraceContext? CreateTraceContext()
+ {
+ if (orchestrationActivity is null)
+ {
+ return null;
+ }
+
+ ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom();
+ ActivityContext clientActivityContext = new(orchestrationActivity.TraceId, clientSpanId, orchestrationActivity.ActivityTraceFlags, orchestrationActivity.TraceStateString);
+
+ return new P.TraceContext
+ {
+ TraceParent = $"00-{clientActivityContext.TraceId}-{clientActivityContext.SpanId}-0{clientActivityContext.TraceFlags:d}",
+ TraceState = clientActivityContext.TraceState,
+ };
+ }
+
+ switch (action.OrchestratorActionType)
+ {
+ case OrchestratorActionType.ScheduleOrchestrator:
+ var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action;
+
+ protoAction.ScheduleTask = new P.ScheduleTaskAction
+ {
+ Name = scheduleTaskAction.Name,
+ Version = scheduleTaskAction.Version,
+ Input = scheduleTaskAction.Input,
+ ParentTraceContext = CreateTraceContext(),
+ };
+
+ if (scheduleTaskAction.Tags != null)
+ {
+ foreach (KeyValuePair tag in scheduleTaskAction.Tags)
+ {
+ protoAction.ScheduleTask.Tags[tag.Key] = tag.Value;
+ }
+ }
+
+ break;
+ case OrchestratorActionType.CreateSubOrchestration:
+ var subOrchestrationAction = (CreateSubOrchestrationAction)action;
+ protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction
+ {
+ Input = subOrchestrationAction.Input,
+ InstanceId = subOrchestrationAction.InstanceId,
+ Name = subOrchestrationAction.Name,
+ Version = subOrchestrationAction.Version,
+ ParentTraceContext = CreateTraceContext(),
+ };
+ break;
+ case OrchestratorActionType.CreateTimer:
+ var createTimerAction = (CreateTimerOrchestratorAction)action;
+ protoAction.CreateTimer = new P.CreateTimerAction
+ {
+ FireAt = createTimerAction.FireAt.ToTimestamp(),
+ };
+ break;
+ case OrchestratorActionType.SendEvent:
+ var sendEventAction = (SendEventOrchestratorAction)action;
+ if (sendEventAction.Instance == null)
+ {
+ throw new ArgumentException(
+ $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!");
+ }
+
+ if (entityConversionState is not null
+ && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId)
+ && sendEventAction.EventName is not null
+ && sendEventAction.EventData is not null)
+ {
+ P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction();
+ protoAction.SendEntityMessage = sendAction;
+
+ EntityConversions.DecodeEntityMessageAction(
+ sendEventAction.EventName,
+ sendEventAction.EventData,
+ sendEventAction.Instance.InstanceId,
+ sendAction,
+ out string requestId);
+
+ entityConversionState.EntityRequestIds.Add(requestId);
+
+ switch (sendAction.EntityMessageTypeCase)
+ {
+ case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested:
+ entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested);
+ break;
+ case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent:
+ entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId);
+ break;
+ default:
+ break;
+ }
+ }
+ else
+ {
+ protoAction.SendEvent = new P.SendEventAction
+ {
+ Instance = sendEventAction.Instance.ToProtobuf(),
+ Name = sendEventAction.EventName,
+ Data = sendEventAction.EventData,
+ };
+
+ // Distributed Tracing: start a new trace activity derived from the orchestration
+ // for an EventRaisedEvent (external event)
+ using Activity? traceActivity = TraceHelper.StartTraceActivityForEventRaisedFromWorker(sendEventAction, instanceId, executionId);
+
+ traceActivity?.Stop();
+ }
+
+ break;
+ case OrchestratorActionType.OrchestrationComplete:
+
+ if (entityConversionState is not null)
+ {
+ // as a precaution, unlock any entities that were not unlocked for some reason, before
+ // completing the orchestration.
+ foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations())
+ {
+ response.Actions.Add(new P.OrchestratorAction
+ {
+ Id = action.Id,
+ SendEntityMessage = new P.SendEntityMessageAction
+ {
+ EntityUnlockSent = new P.EntityUnlockSentEvent
+ {
+ CriticalSectionId = criticalSectionId,
+ TargetInstanceId = target,
+ ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId,
+ },
+ },
+ });
+ }
+ }
+
+ var completeAction = (OrchestrationCompleteOrchestratorAction)action;
+ protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction
+ {
+ CarryoverEvents =
+ {
+ // TODO
+ },
+ Details = completeAction.Details,
+ NewVersion = completeAction.NewVersion,
+ OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(),
+ Result = completeAction.Result,
+ };
+
+ if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed)
+ {
+ protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf();
+ }
+
+ break;
+ default:
+ throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}");
+ }
+
+ response.Actions.Add(protoAction);
+ }
+
+ return response;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The status to convert.
+ /// The converted status.
+ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
+ {
+ return (OrchestrationStatus)status;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The status to convert.
+ /// The converted status.
+ [return: NotNullIfNotNull(nameof(status))]
+ internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status)
+ {
+ if (status == null)
+ {
+ return null;
+ }
+
+ return new OrchestrationInstance
+ {
+ InstanceId = status.InstanceId,
+ ExecutionId = status.ExecutionId,
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The failure details to convert.
+ /// The converted failure details.
+ [return: NotNullIfNotNull(nameof(failureDetails))]
+ internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails)
+ {
+ if (failureDetails == null)
+ {
+ return null;
+ }
+
+ return new TaskFailureDetails(
+ failureDetails.ErrorType,
+ failureDetails.ErrorMessage,
+ failureDetails.StackTrace,
+ failureDetails.InnerFailure.ToTaskFailureDetails(),
+ ConvertProperties(failureDetails.Properties));
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The exception to convert.
+ /// Optional exception properties provider.
+ /// The task failure details.
+ [return: NotNullIfNotNull(nameof(e))]
+ internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e, DTCore.IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
+ {
+ if (e == null)
+ {
+ return null;
+ }
+
+ IDictionary? properties = exceptionPropertiesProvider?.GetExceptionProperties(e);
+
+ var taskFailureDetails = new P.TaskFailureDetails
+ {
+ ErrorType = e.GetType().FullName,
+ ErrorMessage = e.Message,
+ StackTrace = e.StackTrace,
+ InnerFailure = e.InnerException.ToTaskFailureDetails(exceptionPropertiesProvider),
+ };
+
+ if (properties != null)
+ {
+ foreach (var kvp in properties)
+ {
+ taskFailureDetails.Properties[kvp.Key] = ConvertObjectToValue(kvp.Value);
+ }
+ }
+
+ return taskFailureDetails;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The entity batch request to convert.
+ /// The converted entity batch request.
+ [return: NotNullIfNotNull(nameof(entityBatchRequest))]
+ internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest)
+ {
+ if (entityBatchRequest == null)
+ {
+ return null;
+ }
+
+ return new EntityBatchRequest()
+ {
+ EntityState = entityBatchRequest.EntityState,
+ InstanceId = entityBatchRequest.InstanceId,
+ Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(),
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The entity request to convert.
+ /// The converted request.
+ /// Additional info about each operation, required by DTS.
+ internal static void ToEntityBatchRequest(
+ this P.EntityRequest entityRequest,
+ out EntityBatchRequest batchRequest,
+ out List operationInfos)
+ {
+ batchRequest = new EntityBatchRequest()
+ {
+ EntityState = entityRequest.EntityState,
+ InstanceId = entityRequest.InstanceId,
+ Operations = [], // operations are added to this collection below
+ };
+
+ operationInfos = new(entityRequest.OperationRequests.Count);
+
+ foreach (P.HistoryEvent? op in entityRequest.OperationRequests)
+ {
+ if (op.EntityOperationSignaled is not null)
+ {
+ batchRequest.Operations.Add(new OperationRequest
+ {
+ Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
+ Operation = op.EntityOperationSignaled.Operation,
+ Input = op.EntityOperationSignaled.Input,
+ });
+ operationInfos.Add(new P.OperationInfo
+ {
+ RequestId = op.EntityOperationSignaled.RequestId,
+ ResponseDestination = null, // means we don't send back a response to the caller
+ });
+ }
+ else if (op.EntityOperationCalled is not null)
+ {
+ batchRequest.Operations.Add(new OperationRequest
+ {
+ Id = Guid.Parse(op.EntityOperationCalled.RequestId),
+ Operation = op.EntityOperationCalled.Operation,
+ Input = op.EntityOperationCalled.Input,
+ });
+ operationInfos.Add(new P.OperationInfo
+ {
+ RequestId = op.EntityOperationCalled.RequestId,
+ ResponseDestination = new P.OrchestrationInstance
+ {
+ InstanceId = op.EntityOperationCalled.ParentInstanceId,
+ ExecutionId = op.EntityOperationCalled.ParentExecutionId,
+ },
+ });
+ }
+ }
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation request to convert.
+ /// The converted operation request.
+ [return: NotNullIfNotNull(nameof(operationRequest))]
+ internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest)
+ {
+ if (operationRequest == null)
+ {
+ return null;
+ }
+
+ return new OperationRequest()
+ {
+ Operation = operationRequest.Operation,
+ Input = operationRequest.Input,
+ Id = Guid.Parse(operationRequest.RequestId),
+ TraceContext = operationRequest.TraceContext != null ?
+ new DistributedTraceContext(
+ operationRequest.TraceContext.TraceParent,
+ operationRequest.TraceContext.TraceState) : null,
+ };
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(operationResult))]
+ internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult)
+ {
+ if (operationResult == null)
+ {
+ return null;
+ }
+
+ switch (operationResult.ResultTypeCase)
+ {
+ case P.OperationResult.ResultTypeOneofCase.Success:
+ return new OperationResult()
+ {
+ Result = operationResult.Success.Result,
+ StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(),
+ EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(),
+ };
+
+ case P.OperationResult.ResultTypeOneofCase.Failure:
+ return new OperationResult()
+ {
+ FailureDetails = operationResult.Failure.FailureDetails.ToCore(),
+ StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(),
+ EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(),
+ };
+
+ default:
+ throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported.");
+ }
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(operationResult))]
+ internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult)
+ {
+ if (operationResult == null)
+ {
+ return null;
+ }
+
+ if (operationResult.FailureDetails == null)
+ {
+ return new P.OperationResult()
+ {
+ Success = new P.OperationResultSuccess()
+ {
+ Result = operationResult.Result,
+ StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
+ EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
+ },
+ };
+ }
+ else
+ {
+ return new P.OperationResult()
+ {
+ Failure = new P.OperationResultFailure()
+ {
+ FailureDetails = ToProtobuf(operationResult.FailureDetails),
+ StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
+ EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
+ },
+ };
+ }
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation action to convert.
+ /// The converted operation action.
+ [return: NotNullIfNotNull(nameof(operationAction))]
+ internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction)
+ {
+ if (operationAction == null)
+ {
+ return null;
+ }
+
+ switch (operationAction.OperationActionTypeCase)
+ {
+ case P.OperationAction.OperationActionTypeOneofCase.SendSignal:
+
+ return new SendSignalOperationAction()
+ {
+ Name = operationAction.SendSignal.Name,
+ Input = operationAction.SendSignal.Input,
+ InstanceId = operationAction.SendSignal.InstanceId,
+ ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(),
+ RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(),
+ ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ?
+ new DistributedTraceContext(
+ operationAction.SendSignal.ParentTraceContext.TraceParent,
+ operationAction.SendSignal.ParentTraceContext.TraceState) : null,
+ };
+
+ case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration:
+
+ return new StartNewOrchestrationOperationAction()
+ {
+ Name = operationAction.StartNewOrchestration.Name,
+ Input = operationAction.StartNewOrchestration.Input,
+ InstanceId = operationAction.StartNewOrchestration.InstanceId,
+ Version = operationAction.StartNewOrchestration.Version,
+ ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
+ RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(),
+ ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ?
+ new DistributedTraceContext(
+ operationAction.StartNewOrchestration.ParentTraceContext.TraceParent,
+ operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null,
+ };
+ default:
+ throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
+ }
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation action to convert.
+ /// The converted operation action.
+ [return: NotNullIfNotNull(nameof(operationAction))]
+ internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction)
+ {
+ if (operationAction == null)
+ {
+ return null;
+ }
+
+ var action = new P.OperationAction();
+
+ switch (operationAction)
+ {
+ case SendSignalOperationAction sendSignalAction:
+
+ action.SendSignal = new P.SendSignalAction()
+ {
+ Name = sendSignalAction.Name,
+ Input = sendSignalAction.Input,
+ InstanceId = sendSignalAction.InstanceId,
+ ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(),
+ RequestTime = sendSignalAction.RequestTime?.ToTimestamp(),
+ ParentTraceContext = sendSignalAction.ParentTraceContext != null ?
+ new P.TraceContext
+ {
+ TraceParent = sendSignalAction.ParentTraceContext.TraceParent,
+ TraceState = sendSignalAction.ParentTraceContext.TraceState,
+ }
+ : null,
+ };
+ break;
+
+ case StartNewOrchestrationOperationAction startNewOrchestrationAction:
+
+ action.StartNewOrchestration = new P.StartNewOrchestrationAction()
+ {
+ Name = startNewOrchestrationAction.Name,
+ Input = startNewOrchestrationAction.Input,
+ Version = startNewOrchestrationAction.Version,
+ InstanceId = startNewOrchestrationAction.InstanceId,
+ ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
+ RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(),
+ ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ?
+ new P.TraceContext
+ {
+ TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent,
+ TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState,
+ }
+ : null,
+ };
+ break;
+ }
+
+ return action;
+ }
+
+ ///
+ /// Converts a to a .
+ ///
+ /// The operation result to convert.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(entityBatchResult))]
+ internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult)
+ {
+ if (entityBatchResult == null)
+ {
+ return null;
+ }
+
+ return new EntityBatchResult()
+ {
+ Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(),
+ EntityState = entityBatchResult.EntityState,
+ Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(),
+ FailureDetails = entityBatchResult.FailureDetails.ToCore(),
+ };
+ }
+
+ ///
+ /// Converts a to .
+ ///
+ /// The operation result to convert.
+ /// The completion token, or null for the older protocol.
+ /// Additional information about each operation, required by DTS.
+ /// The converted operation result.
+ [return: NotNullIfNotNull(nameof(entityBatchResult))]
+ internal static P.EntityBatchResult? ToEntityBatchResult(
+ this EntityBatchResult? entityBatchResult,
+ string? completionToken = null,
+ IEnumerable? operationInfos = null)
+ {
+ if (entityBatchResult == null)
+ {
+ return null;
+ }
+
+ return new P.EntityBatchResult()
+ {
+ EntityState = entityBatchResult.EntityState,
+ FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(),
+ Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] },
+ Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] },
+ CompletionToken = completionToken ?? string.Empty,
+ OperationInfos = { operationInfos ?? [] },
+ };
+ }
+
+ ///
+ /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation.
+ ///
+ /// The DT.Core representation.
+ /// The gRPC representation.
+ [return: NotNullIfNotNull(nameof(parameters))]
+ internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters)
+ {
+ if (parameters == null)
+ {
+ return null;
+ }
+
+ return new TaskOrchestrationEntityParameters()
+ {
+ EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(),
+ };
+ }
+
+ ///
+ /// Gets the approximate byte count for a .
+ ///
+ /// The failure details.
+ /// The approximate byte count.
+ internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails)
+ {
+ // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar
+ Encoding encoding = Encoding.UTF8;
+
+ int byteCount = 0;
+ if (failureDetails.ErrorType != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.ErrorType);
+ }
+
+ if (failureDetails.ErrorMessage != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.ErrorMessage);
+ }
+
+ if (failureDetails.StackTrace != null)
+ {
+ byteCount += encoding.GetByteCount(failureDetails.StackTrace);
+ }
+
+ if (failureDetails.InnerFailure != null)
+ {
+ byteCount += failureDetails.InnerFailure.GetApproximateByteCount();
+ }
+
+ return byteCount;
+ }
+
+ ///
+ /// Decode a protobuf message from a base64 string.
+ ///
+ /// The type to decode to.
+ /// The message parser.
+ /// The base64 encoded message.
+ /// The decoded message.
+ /// If decoding fails.
+ internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage
+ {
+ // Decode the base64 in a way that doesn't allocate a byte[] on each request
+ int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage);
+ byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount);
+ try
+ {
+ // The Base64 APIs require first converting the string into UTF-8 bytes. We then
+ // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that
+ // we can finally decode the protobuf request.
+ Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0);
+ OperationStatus status = Base64.DecodeFromUtf8InPlace(
+ buffer.AsSpan(0, encodedByteCount),
+ out int bytesWritten);
+ if (status != OperationStatus.Done)
+ {
+ throw new ArgumentException(
+ $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage));
+ }
+
+ return (T)parser.ParseFrom(buffer, 0, bytesWritten);
+ }
+ finally
+ {
+ ArrayPool.Shared.Return(buffer);
+ }
+ }
+
+ ///
+ /// Converts a grpc to a .
+ ///
+ /// The failure details to convert.
+ /// The converted failure details.
+ internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails)
+ {
+ if (failureDetails == null)
+ {
+ return null;
+ }
+
+ return new FailureDetails(
+ failureDetails.ErrorType,
+ failureDetails.ErrorMessage,
+ failureDetails.StackTrace,
+ failureDetails.InnerFailure.ToCore(),
+ failureDetails.IsNonRetriable,
+ ConvertProperties(failureDetails.Properties));
+ }
+
+ ///
+ /// Converts a instance to a corresponding C# object.
+ ///
+ /// The Protobuf Value to convert.
+ /// The corresponding C# object.
+ ///
+ /// Thrown when the Protobuf Value.KindCase is not one of the supported types.
+ ///
+ internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value)
+ {
+ switch (value.KindCase)
+ {
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue:
+ return null;
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue:
+ return value.NumberValue;
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue:
+ string stringValue = value.StringValue;
+
+ // If the value starts with the 'dt:' prefix, it may represent a DateTime value — attempt to parse it.
+ if (stringValue.StartsWith("dt:", StringComparison.Ordinal))
{
- Name = proto.EventRaised.Name,
- };
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled:
- historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance);
- conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled:
- historyEvent = EntityConversions.EncodeOperationSignaled(proto);
- conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested:
- historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance);
- conversionState?.AddUnlockObligations(proto.EntityLockRequested);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent:
- historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance);
- conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted:
- historyEvent = EntityConversions.EncodeLockGranted(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted:
- historyEvent = EntityConversions.EncodeOperationCompleted(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed:
- historyEvent = EntityConversions.EncodeOperationFailed(proto);
- break;
- case P.HistoryEvent.EventTypeOneofCase.GenericEvent:
- historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data);
- break;
- case P.HistoryEvent.EventTypeOneofCase.HistoryState:
- historyEvent = new HistoryStateEvent(
- proto.EventId,
- new OrchestrationState
+ if (DateTime.TryParse(stringValue[3..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTime date))
{
- OrchestrationInstance = new OrchestrationInstance
- {
- InstanceId = proto.HistoryState.OrchestrationState.InstanceId,
- },
- Name = proto.HistoryState.OrchestrationState.Name,
- Version = proto.HistoryState.OrchestrationState.Version,
- ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(),
- CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(),
- LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(),
- Input = proto.HistoryState.OrchestrationState.Input,
- Output = proto.HistoryState.OrchestrationState.Output,
- Status = proto.HistoryState.OrchestrationState.CustomStatus,
- Tags = proto.HistoryState.OrchestrationState.Tags,
- });
- break;
- default:
- throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported.");
- }
-
- historyEvent.Timestamp = proto.Timestamp.ToDateTime();
- return historyEvent;
- }
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp ToTimestamp(this DateTime dateTime)
- {
- // The protobuf libraries require timestamps to be in UTC
- if (dateTime.Kind == DateTimeKind.Unspecified)
- {
- dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc);
- }
- else if (dateTime.Kind == DateTimeKind.Local)
- {
- dateTime = dateTime.ToUniversalTime();
- }
-
- return Timestamp.FromDateTime(dateTime);
- }
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp? ToTimestamp(this DateTime? dateTime)
- => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime);
-
- ///
- /// Converts a to a gRPC .
- ///
- /// The date-time to convert.
- /// The gRPC timestamp.
- internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime)
- => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null;
-
- ///
- /// Constructs a .
- ///
- /// The orchestrator instance ID.
- /// The orchestrator execution ID.
- /// The orchestrator customer status or null if no custom status.
- /// The orchestrator actions.
- ///
- /// The completion token for the work item. It must be the exact same
- /// value that was provided by the corresponding that triggered the orchestrator execution.
- ///
- /// The entity conversion state, or null if no conversion is required.
- /// The that represents orchestration execution.
- /// Whether or not a history is required to complete the orchestration request and none was provided.
- /// The orchestrator response.
- /// When an orchestrator action is unknown.
- internal static P.OrchestratorResponse ConstructOrchestratorResponse(
- string instanceId,
- string executionId,
- string? customStatus,
- IEnumerable? actions,
- string completionToken,
- EntityConversionState? entityConversionState,
- Activity? orchestrationActivity,
- bool requiresHistory = false)
- {
- var response = new P.OrchestratorResponse
- {
- InstanceId = instanceId,
- CustomStatus = customStatus,
- CompletionToken = completionToken,
- OrchestrationTraceContext =
- new()
- {
- SpanID = orchestrationActivity?.SpanId.ToString(),
- SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(),
- },
- RequiresHistory = requiresHistory,
- };
-
- // If a history is required and the orchestration request was not completed, then there is no list of actions.
- if (requiresHistory)
- {
- return response;
- }
-
- Check.NotNull(actions);
- foreach (OrchestratorAction action in actions)
- {
- var protoAction = new P.OrchestratorAction { Id = action.Id };
-
- P.TraceContext? CreateTraceContext()
- {
- if (orchestrationActivity is null)
- {
- return null;
+ return date;
+ }
}
- ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom();
- ActivityContext clientActivityContext = new(orchestrationActivity.TraceId, clientSpanId, orchestrationActivity.ActivityTraceFlags, orchestrationActivity.TraceStateString);
-
- return new P.TraceContext
+ // If the value starts with the 'dto:' prefix, it may represent a DateTime value — attempt to parse it.
+ if (stringValue.StartsWith("dto:", StringComparison.Ordinal))
{
- TraceParent = $"00-{clientActivityContext.TraceId}-{clientActivityContext.SpanId}-0{clientActivityContext.TraceFlags:d}",
- TraceState = clientActivityContext.TraceState,
- };
- }
-
- switch (action.OrchestratorActionType)
- {
- case OrchestratorActionType.ScheduleOrchestrator:
- var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action;
-
- protoAction.ScheduleTask = new P.ScheduleTaskAction
- {
- Name = scheduleTaskAction.Name,
- Version = scheduleTaskAction.Version,
- Input = scheduleTaskAction.Input,
- ParentTraceContext = CreateTraceContext(),
- };
-
- if (scheduleTaskAction.Tags != null)
+ if (DateTimeOffset.TryParse(stringValue[4..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTimeOffset date))
{
- foreach (KeyValuePair tag in scheduleTaskAction.Tags)
- {
- protoAction.ScheduleTask.Tags[tag.Key] = tag.Value;
- }
+ return date;
}
+ }
- break;
- case OrchestratorActionType.CreateSubOrchestration:
- var subOrchestrationAction = (CreateSubOrchestrationAction)action;
- protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction
- {
- Input = subOrchestrationAction.Input,
- InstanceId = subOrchestrationAction.InstanceId,
- Name = subOrchestrationAction.Name,
- Version = subOrchestrationAction.Version,
- ParentTraceContext = CreateTraceContext(),
- };
- break;
- case OrchestratorActionType.CreateTimer:
- var createTimerAction = (CreateTimerOrchestratorAction)action;
- protoAction.CreateTimer = new P.CreateTimerAction
- {
- FireAt = createTimerAction.FireAt.ToTimestamp(),
- };
- break;
- case OrchestratorActionType.SendEvent:
- var sendEventAction = (SendEventOrchestratorAction)action;
- if (sendEventAction.Instance == null)
- {
- throw new ArgumentException(
- $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!");
- }
-
- if (entityConversionState is not null
- && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId)
- && sendEventAction.EventName is not null
- && sendEventAction.EventData is not null)
- {
- P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction();
- protoAction.SendEntityMessage = sendAction;
-
- EntityConversions.DecodeEntityMessageAction(
- sendEventAction.EventName,
- sendEventAction.EventData,
- sendEventAction.Instance.InstanceId,
- sendAction,
- out string requestId);
-
- entityConversionState.EntityRequestIds.Add(requestId);
-
- switch (sendAction.EntityMessageTypeCase)
- {
- case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested:
- entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested);
- break;
- case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent:
- entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId);
- break;
- default:
- break;
- }
- }
- else
- {
- protoAction.SendEvent = new P.SendEventAction
- {
- Instance = sendEventAction.Instance.ToProtobuf(),
- Name = sendEventAction.EventName,
- Data = sendEventAction.EventData,
- };
-
- // Distributed Tracing: start a new trace activity derived from the orchestration
- // for an EventRaisedEvent (external event)
- using Activity? traceActivity = TraceHelper.StartTraceActivityForEventRaisedFromWorker(sendEventAction, instanceId, executionId);
-
- traceActivity?.Stop();
- }
-
- break;
- case OrchestratorActionType.OrchestrationComplete:
-
- if (entityConversionState is not null)
- {
- // as a precaution, unlock any entities that were not unlocked for some reason, before
- // completing the orchestration.
- foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations())
- {
- response.Actions.Add(new P.OrchestratorAction
- {
- Id = action.Id,
- SendEntityMessage = new P.SendEntityMessageAction
- {
- EntityUnlockSent = new P.EntityUnlockSentEvent
- {
- CriticalSectionId = criticalSectionId,
- TargetInstanceId = target,
- ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId,
- },
- },
- });
- }
- }
-
- var completeAction = (OrchestrationCompleteOrchestratorAction)action;
- protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction
- {
- CarryoverEvents =
- {
- // TODO
- },
- Details = completeAction.Details,
- NewVersion = completeAction.NewVersion,
- OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(),
- Result = completeAction.Result,
- };
-
- if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed)
- {
- protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf();
- }
-
- break;
- default:
- throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}");
- }
-
- response.Actions.Add(protoAction);
- }
-
- return response;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The status to convert.
- /// The converted status.
- internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
- {
- return (OrchestrationStatus)status;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The status to convert.
- /// The converted status.
- [return: NotNullIfNotNull(nameof(status))]
- internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status)
- {
- if (status == null)
- {
- return null;
- }
-
- return new OrchestrationInstance
- {
- InstanceId = status.InstanceId,
- ExecutionId = status.ExecutionId,
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- [return: NotNullIfNotNull(nameof(failureDetails))]
- internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new TaskFailureDetails(
- failureDetails.ErrorType,
- failureDetails.ErrorMessage,
- failureDetails.StackTrace,
- failureDetails.InnerFailure.ToTaskFailureDetails());
- }
-
- ///
- /// Converts a to .
- ///
- /// The exception to convert.
- /// The task failure details.
- [return: NotNullIfNotNull(nameof(e))]
- internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e)
- {
- if (e == null)
- {
- return null;
- }
-
- return new P.TaskFailureDetails
- {
- ErrorType = e.GetType().FullName,
- ErrorMessage = e.Message,
- StackTrace = e.StackTrace,
- InnerFailure = e.InnerException.ToTaskFailureDetails(),
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The entity batch request to convert.
- /// The converted entity batch request.
- [return: NotNullIfNotNull(nameof(entityBatchRequest))]
- internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest)
- {
- if (entityBatchRequest == null)
- {
- return null;
- }
-
- return new EntityBatchRequest()
- {
- EntityState = entityBatchRequest.EntityState,
- InstanceId = entityBatchRequest.InstanceId,
- Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(),
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The entity request to convert.
- /// The converted request.
- /// Additional info about each operation, required by DTS.
- internal static void ToEntityBatchRequest(
- this P.EntityRequest entityRequest,
- out EntityBatchRequest batchRequest,
- out List operationInfos)
- {
- batchRequest = new EntityBatchRequest()
- {
- EntityState = entityRequest.EntityState,
- InstanceId = entityRequest.InstanceId,
- Operations = [], // operations are added to this collection below
- };
-
- operationInfos = new(entityRequest.OperationRequests.Count);
-
- foreach (P.HistoryEvent? op in entityRequest.OperationRequests)
- {
- if (op.EntityOperationSignaled is not null)
- {
- batchRequest.Operations.Add(new OperationRequest
- {
- Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
- Operation = op.EntityOperationSignaled.Operation,
- Input = op.EntityOperationSignaled.Input,
- });
- operationInfos.Add(new P.OperationInfo
- {
- RequestId = op.EntityOperationSignaled.RequestId,
- ResponseDestination = null, // means we don't send back a response to the caller
- });
- }
- else if (op.EntityOperationCalled is not null)
- {
- batchRequest.Operations.Add(new OperationRequest
- {
- Id = Guid.Parse(op.EntityOperationCalled.RequestId),
- Operation = op.EntityOperationCalled.Operation,
- Input = op.EntityOperationCalled.Input,
- });
- operationInfos.Add(new P.OperationInfo
- {
- RequestId = op.EntityOperationCalled.RequestId,
- ResponseDestination = new P.OrchestrationInstance
- {
- InstanceId = op.EntityOperationCalled.ParentInstanceId,
- ExecutionId = op.EntityOperationCalled.ParentExecutionId,
- },
- });
- }
- }
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation request to convert.
- /// The converted operation request.
- [return: NotNullIfNotNull(nameof(operationRequest))]
- internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest)
- {
- if (operationRequest == null)
- {
- return null;
- }
-
- return new OperationRequest()
- {
- Operation = operationRequest.Operation,
- Input = operationRequest.Input,
- Id = Guid.Parse(operationRequest.RequestId),
- TraceContext = operationRequest.TraceContext != null ?
- new DistributedTraceContext(
- operationRequest.TraceContext.TraceParent,
- operationRequest.TraceContext.TraceState) : null,
- };
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(operationResult))]
- internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult)
- {
- if (operationResult == null)
- {
- return null;
- }
-
- switch (operationResult.ResultTypeCase)
- {
- case P.OperationResult.ResultTypeOneofCase.Success:
- return new OperationResult()
- {
- Result = operationResult.Success.Result,
- StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(),
- EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(),
- };
-
- case P.OperationResult.ResultTypeOneofCase.Failure:
- return new OperationResult()
- {
- FailureDetails = operationResult.Failure.FailureDetails.ToCore(),
- StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(),
- EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(),
- };
-
- default:
- throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported.");
- }
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(operationResult))]
- internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult)
- {
- if (operationResult == null)
- {
- return null;
- }
-
- if (operationResult.FailureDetails == null)
- {
- return new P.OperationResult()
- {
- Success = new P.OperationResultSuccess()
- {
- Result = operationResult.Result,
- StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
- EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
- },
- };
- }
- else
- {
- return new P.OperationResult()
- {
- Failure = new P.OperationResultFailure()
- {
- FailureDetails = ToProtobuf(operationResult.FailureDetails),
- StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(),
- EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(),
- },
- };
- }
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation action to convert.
- /// The converted operation action.
- [return: NotNullIfNotNull(nameof(operationAction))]
- internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction)
- {
- if (operationAction == null)
- {
- return null;
- }
-
- switch (operationAction.OperationActionTypeCase)
- {
- case P.OperationAction.OperationActionTypeOneofCase.SendSignal:
-
- return new SendSignalOperationAction()
- {
- Name = operationAction.SendSignal.Name,
- Input = operationAction.SendSignal.Input,
- InstanceId = operationAction.SendSignal.InstanceId,
- ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(),
- RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(),
- ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ?
- new DistributedTraceContext(
- operationAction.SendSignal.ParentTraceContext.TraceParent,
- operationAction.SendSignal.ParentTraceContext.TraceState) : null,
- };
-
- case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration:
-
- return new StartNewOrchestrationOperationAction()
- {
- Name = operationAction.StartNewOrchestration.Name,
- Input = operationAction.StartNewOrchestration.Input,
- InstanceId = operationAction.StartNewOrchestration.InstanceId,
- Version = operationAction.StartNewOrchestration.Version,
- ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(),
- RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(),
- ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ?
- new DistributedTraceContext(
- operationAction.StartNewOrchestration.ParentTraceContext.TraceParent,
- operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null,
- };
- default:
- throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported.");
- }
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation action to convert.
- /// The converted operation action.
- [return: NotNullIfNotNull(nameof(operationAction))]
- internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction)
- {
- if (operationAction == null)
- {
- return null;
- }
-
- var action = new P.OperationAction();
-
- switch (operationAction)
- {
- case SendSignalOperationAction sendSignalAction:
-
- action.SendSignal = new P.SendSignalAction()
- {
- Name = sendSignalAction.Name,
- Input = sendSignalAction.Input,
- InstanceId = sendSignalAction.InstanceId,
- ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(),
- RequestTime = sendSignalAction.RequestTime?.ToTimestamp(),
- ParentTraceContext = sendSignalAction.ParentTraceContext != null ?
- new P.TraceContext
- {
- TraceParent = sendSignalAction.ParentTraceContext.TraceParent,
- TraceState = sendSignalAction.ParentTraceContext.TraceState,
- }
- : null,
- };
- break;
-
- case StartNewOrchestrationOperationAction startNewOrchestrationAction:
-
- action.StartNewOrchestration = new P.StartNewOrchestrationAction()
- {
- Name = startNewOrchestrationAction.Name,
- Input = startNewOrchestrationAction.Input,
- Version = startNewOrchestrationAction.Version,
- InstanceId = startNewOrchestrationAction.InstanceId,
- ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(),
- RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(),
- ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ?
- new P.TraceContext
- {
- TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent,
- TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState,
- }
- : null,
- };
- break;
- }
-
- return action;
- }
-
- ///
- /// Converts a to a .
- ///
- /// The operation result to convert.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(entityBatchResult))]
- internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult)
- {
- if (entityBatchResult == null)
- {
- return null;
- }
-
- return new EntityBatchResult()
- {
- Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(),
- EntityState = entityBatchResult.EntityState,
- Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(),
- FailureDetails = entityBatchResult.FailureDetails.ToCore(),
- };
- }
-
- ///
- /// Converts a to .
- ///
- /// The operation result to convert.
- /// The completion token, or null for the older protocol.
- /// Additional information about each operation, required by DTS.
- /// The converted operation result.
- [return: NotNullIfNotNull(nameof(entityBatchResult))]
- internal static P.EntityBatchResult? ToEntityBatchResult(
- this EntityBatchResult? entityBatchResult,
- string? completionToken = null,
- IEnumerable? operationInfos = null)
- {
- if (entityBatchResult == null)
- {
- return null;
- }
-
- return new P.EntityBatchResult()
- {
- EntityState = entityBatchResult.EntityState,
- FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(),
- Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] },
- Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] },
- CompletionToken = completionToken ?? string.Empty,
- OperationInfos = { operationInfos ?? [] },
- };
- }
-
- ///
- /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation.
- ///
- /// The DT.Core representation.
- /// The gRPC representation.
- [return: NotNullIfNotNull(nameof(parameters))]
- internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters)
- {
- if (parameters == null)
- {
- return null;
- }
-
- return new TaskOrchestrationEntityParameters()
- {
- EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(),
- };
- }
-
- ///
- /// Gets the approximate byte count for a .
- ///
- /// The failure details.
- /// The approximate byte count.
- internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails)
- {
- // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar
- Encoding encoding = Encoding.UTF8;
-
- int byteCount = 0;
- if (failureDetails.ErrorType != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.ErrorType);
- }
-
- if (failureDetails.ErrorMessage != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.ErrorMessage);
- }
-
- if (failureDetails.StackTrace != null)
- {
- byteCount += encoding.GetByteCount(failureDetails.StackTrace);
- }
-
- if (failureDetails.InnerFailure != null)
- {
- byteCount += failureDetails.InnerFailure.GetApproximateByteCount();
- }
-
- return byteCount;
- }
-
- ///
- /// Decode a protobuf message from a base64 string.
- ///
- /// The type to decode to.
- /// The message parser.
- /// The base64 encoded message.
- /// The decoded message.
- /// If decoding fails.
- internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage
- {
- // Decode the base64 in a way that doesn't allocate a byte[] on each request
- int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage);
- byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount);
- try
- {
- // The Base64 APIs require first converting the string into UTF-8 bytes. We then
- // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that
- // we can finally decode the protobuf request.
- Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0);
- OperationStatus status = Base64.DecodeFromUtf8InPlace(
- buffer.AsSpan(0, encodedByteCount),
- out int bytesWritten);
- if (status != OperationStatus.Done)
- {
- throw new ArgumentException(
- $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage));
- }
-
- return (T)parser.ParseFrom(buffer, 0, bytesWritten);
- }
- finally
- {
- ArrayPool.Shared.Return(buffer);
- }
- }
-
- ///
- /// Converts a grpc to a .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new FailureDetails(
- failureDetails.ErrorType,
- failureDetails.ErrorMessage,
- failureDetails.StackTrace,
- failureDetails.InnerFailure.ToCore(),
- failureDetails.IsNonRetriable);
- }
-
- ///
- /// Converts a instance to a corresponding C# object.
- ///
- /// The Protobuf Value to convert.
- /// The corresponding C# object.
- ///
- /// Thrown when the Protobuf Value.KindCase is not one of the supported types.
- ///
- internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value)
- {
- switch (value.KindCase)
- {
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue:
- return null;
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue:
- return value.NumberValue;
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue:
- return value.StringValue;
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue:
- return value.BoolValue;
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue:
- return value.StructValue.Fields.ToDictionary(
- pair => pair.Key,
- pair => ConvertValueToObject(pair.Value));
- case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue:
- return value.ListValue.Values.Select(ConvertValueToObject).ToList();
+ // Otherwise just return as string
+ return stringValue;
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue:
+ return value.BoolValue;
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue:
+ return value.StructValue.Fields.ToDictionary(
+ pair => pair.Key,
+ pair => ConvertValueToObject(pair.Value));
+ case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue:
+ return value.ListValue.Values.Select(ConvertValueToObject).ToList();
default:
- throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}");
- }
+ // Fallback: serialize the whole value to JSON string
+ return JsonSerializer.Serialize(value);
+ }
}
- ///
- /// Converts a to a grpc .
- ///
- /// The failure details to convert.
- /// The converted failure details.
- static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails)
- {
- if (failureDetails == null)
- {
- return null;
- }
-
- return new P.TaskFailureDetails
- {
- ErrorType = failureDetails.ErrorType ?? "(unknown)",
- ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)",
- StackTrace = failureDetails.StackTrace,
- IsNonRetriable = failureDetails.IsNonRetriable,
- InnerFailure = failureDetails.InnerFailure.ToProtobuf(),
- };
- }
-
- static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status)
- {
- return (P.OrchestrationStatus)status;
- }
-
- static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance)
- {
- return new P.OrchestrationInstance
- {
- InstanceId = instance.InstanceId,
- ExecutionId = instance.ExecutionId,
- };
- }
-
- ///
- /// Tracks state required for converting orchestration histories containing entity-related events.
- ///
- internal class EntityConversionState
- {
- readonly bool insertMissingEntityUnlocks;
-
- OrchestrationInstance? instance;
- HashSet? entityRequestIds;
- Dictionary? unlockObligations;
-
- ///
- /// Initializes a new instance of the class.
- ///
- /// Whether to insert missing unlock events in to the history
- /// when the orchestration completes.
- public EntityConversionState(bool insertMissingEntityUnlocks)
- {
- this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this);
- this.insertMissingEntityUnlocks = insertMissingEntityUnlocks;
- }
-
- ///
- /// Gets a function that converts a history event in protobuf format to a core history event.
- ///
- public Func ConvertFromProto { get; }
-
- ///
- /// Gets the orchestration instance of this history.
- ///
- public OrchestrationInstance? CurrentInstance => this.instance;
-
- ///
- /// Gets the set of guids that have been used as entity request ids in this history.
- ///
- public HashSet EntityRequestIds => this.entityRequestIds ??= new();
-
- ///
- /// Records the orchestration instance, which may be needed for some conversions.
- ///
- /// The orchestration instance.
- public void SetOrchestrationInstance(OrchestrationInstance instance)
- {
- this.instance = instance;
- }
-
- ///
- /// Adds unlock obligations for all entities that are being locked by this request.
- ///
- /// The lock request.
- public void AddUnlockObligations(P.EntityLockRequestedEvent request)
- {
- if (!this.insertMissingEntityUnlocks)
- {
- return;
- }
-
- this.unlockObligations ??= new();
-
- foreach (string target in request.LockSet)
- {
- this.unlockObligations[target] = request.CriticalSectionId;
- }
- }
-
- ///
- /// Removes an unlock obligation.
- ///
- /// The target entity.
- public void RemoveUnlockObligation(string target)
- {
- if (!this.insertMissingEntityUnlocks)
- {
- return;
- }
-
- this.unlockObligations?.Remove(target);
- }
-
- ///
- /// Returns the remaining unlock obligations, and clears the list.
- ///
- /// The unlock obligations.
- public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations()
- {
- if (!this.insertMissingEntityUnlocks)
- {
- yield break;
- }
-
- if (this.unlockObligations is not null)
- {
- foreach (var kvp in this.unlockObligations)
- {
- yield return (kvp.Key, kvp.Value);
- }
-
- this.unlockObligations = null;
- }
- }
- }
-}
+ ///
+ /// Converts a MapFieldinto a IDictionary.
+ ///
+ /// The map to convert.
+ /// Dictionary contains the converted obejct.
+ internal static IDictionary ConvertProperties(MapField properties)
+ {
+ return properties.ToDictionary(
+ kvp => kvp.Key,
+ kvp => ConvertValueToObject(kvp.Value));
+ }
+
+ ///
+ /// Converts a C# object to a protobuf Value.
+ ///
+ /// The object to convert.
+ /// The converted protobuf Value.
+ internal static Value ConvertObjectToValue(object? obj)
+ {
+ return obj switch
+ {
+ null => Value.ForNull(),
+ string str => Value.ForString(str),
+ bool b => Value.ForBool(b),
+ int i => Value.ForNumber(i),
+ long l => Value.ForNumber(l),
+ float f => Value.ForNumber(f),
+ double d => Value.ForNumber(d),
+ decimal dec => Value.ForNumber((double)dec),
+
+ // For DateTime and DateTimeOffset, add prefix to distinguish from normal string.
+ DateTime dt => Value.ForString($"dt:{dt.ToString("O")}"),
+ DateTimeOffset dto => Value.ForString($"dto:{dto.ToString("O")}"),
+ IDictionary dict => Value.ForStruct(new Struct
+ {
+ Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) },
+ }),
+ IEnumerable e => Value.ForList(e.Cast
protected virtual IDurableTaskFactory Factory { get; }
+
+ ///
+ /// Gets or sets the exception properties provider used to enrich failure details with custom exception properties.
+ ///
+ protected IExceptionPropertiesProvider? ExceptionPropertiesProvider { get; set; }
}
diff --git a/src/Worker/Core/IExceptionPropertiesProvider.cs b/src/Worker/Core/IExceptionPropertiesProvider.cs
new file mode 100644
index 000000000..e0205670e
--- /dev/null
+++ b/src/Worker/Core/IExceptionPropertiesProvider.cs
@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+namespace Microsoft.DurableTask.Worker;
+
+///
+/// Provides custom exception property inclusion rules for enriching FailureDetails.
+///
+public interface IExceptionPropertiesProvider
+{
+ ///
+ /// Extracts custom properties from an exception.
+ ///
+ /// The exception to extract properties from.
+ /// A dictionary of custom properties to include in the FailureDetails, or null if no properties should be added.
+ IDictionary? GetExceptionProperties(Exception exception);
+}
diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs
index 584b7eeb8..3ae1cbc7c 100644
--- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs
+++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs
@@ -36,7 +36,7 @@ public DurableTaskShimFactory(
///
/// Gets the default with default values.
///
- public static DurableTaskShimFactory Default { get; } = new();
+ public static DurableTaskShimFactory Default { get; } = new(null, null);
///
/// Creates a from a .
diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs
index 330fd1888..a5483073c 100644
--- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs
+++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs
@@ -168,7 +168,8 @@ static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails)
failureDetails.ErrorType,
failureDetails.ErrorMessage,
failureDetails.StackTrace,
- failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null);
+ failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null,
+ failureDetails.Properties);
async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input)
{
diff --git a/src/Worker/Core/Shims/TaskOrchestrationShim.cs b/src/Worker/Core/Shims/TaskOrchestrationShim.cs
index 127038a38..eb7a179be 100644
--- a/src/Worker/Core/Shims/TaskOrchestrationShim.cs
+++ b/src/Worker/Core/Shims/TaskOrchestrationShim.cs
@@ -95,7 +95,9 @@ public TaskOrchestrationShim(
// failure details are correctly propagated.
throw new CoreTaskFailedException(e.Message, e.InnerException)
{
- FailureDetails = new FailureDetails(e, e.FailureDetails.ToCoreFailureDetails()),
+ FailureDetails = new FailureDetails(e,
+ e.FailureDetails.ToCoreFailureDetails(),
+ properties: e.FailureDetails.Properties),
};
}
finally
diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
index 3e921252e..4dfa70f7f 100644
--- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
+++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
@@ -33,16 +33,20 @@ class Processor
readonly TaskHubSidecarServiceClient client;
readonly DurableTaskShimFactory shimFactory;
readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions;
+ readonly DTCore.IExceptionPropertiesProvider? exceptionPropertiesProvider;
[Obsolete("Experimental")]
readonly IOrchestrationFilter? orchestrationFilter;
- public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null)
+ public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null, IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
{
this.worker = worker;
this.client = client;
this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory);
this.internalOptions = this.worker.grpcOptions.Internal;
this.orchestrationFilter = orchestrationFilter;
+ this.exceptionPropertiesProvider = exceptionPropertiesProvider is not null
+ ? new ExceptionPropertiesProviderAdapter(exceptionPropertiesProvider)
+ : null;
}
ILogger Logger => this.worker.logger;
@@ -636,7 +640,8 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
shim,
BehaviorOnContinueAsNew.Carryover,
request.EntityParameters.ToCore(),
- ErrorPropagationMode.UseFailureDetails);
+ ErrorPropagationMode.UseFailureDetails,
+ this.exceptionPropertiesProvider);
result = executor.Execute();
}
else
@@ -654,7 +659,7 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync(
{
// This is not expected: Normally TaskOrchestrationExecutor handles exceptions in user code.
this.Logger.OrchestratorFailed(name, request.InstanceId, unexpected.ToString());
- failureDetails = unexpected.ToTaskFailureDetails();
+ failureDetails = unexpected.ToTaskFailureDetails(this.exceptionPropertiesProvider);
}
P.OrchestratorResponse response;
@@ -764,6 +769,8 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken,
P.TaskFailureDetails? failureDetails = null;
TaskContext innerContext = new(instance);
+ innerContext.ExceptionPropertiesProvider = this.exceptionPropertiesProvider;
+
TaskName name = new(request.Name);
string? output = null;
@@ -792,7 +799,7 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken,
}
catch (Exception applicationException)
{
- failureDetails = applicationException.ToTaskFailureDetails();
+ failureDetails = applicationException.ToTaskFailureDetails(this.exceptionPropertiesProvider);
}
}
else
diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs
index 463af441f..da61d8849 100644
--- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs
+++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs
@@ -29,6 +29,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker
/// The service provider.
/// The logger.
/// The optional used to filter orchestration execution.
+ /// The custom exception properties provider that help build failure details.
public GrpcDurableTaskWorker(
string name,
IDurableTaskFactory factory,
@@ -36,7 +37,8 @@ public GrpcDurableTaskWorker(
IOptionsMonitor workerOptions,
IServiceProvider services,
ILoggerFactory loggerFactory,
- IOrchestrationFilter? orchestrationFilter = null)
+ IOrchestrationFilter? orchestrationFilter = null,
+ IExceptionPropertiesProvider? exceptionPropertiesProvider = null)
: base(name, factory)
{
this.grpcOptions = Check.NotNull(grpcOptions).Get(name);
@@ -45,6 +47,7 @@ public GrpcDurableTaskWorker(
this.loggerFactory = Check.NotNull(loggerFactory);
this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name.
this.orchestrationFilter = orchestrationFilter;
+ this.ExceptionPropertiesProvider = exceptionPropertiesProvider;
}
///
@@ -52,7 +55,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address);
this.logger.StartingTaskHubWorker(address);
- await new Processor(this, new(callInvoker), this.orchestrationFilter).ExecuteAsync(stoppingToken);
+ await new Processor(this, new(callInvoker), this.orchestrationFilter, this.ExceptionPropertiesProvider).ExecuteAsync(stoppingToken);
}
#if NET6_0_OR_GREATER
diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs
index 74a92006a..d8d04deaa 100644
--- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs
+++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs
@@ -1,140 +1,140 @@
-// Copyright (c) Microsoft Corporation.
-// Licensed under the MIT License.
-
-using DurableTask.Core;
-using DurableTask.Core.History;
-using Google.Protobuf;
-using Microsoft.DurableTask.Worker.Shims;
-using Microsoft.Extensions.Caching.Memory;
-using Microsoft.Extensions.DependencyInjection;
-using P = Microsoft.DurableTask.Protobuf;
-
-namespace Microsoft.DurableTask.Worker.Grpc;
-
-///
-/// Helper class for invoking orchestrations directly, without building a worker instance.
-///
-///
-///
-/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the
-/// caller must provide orchestration state as serialized protobuf bytes.
-///
-/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state
-/// is provided by trigger bindings.
-///
-///
-public static class GrpcOrchestrationRunner
-{
- ///
- /// Loads orchestration history from and uses it to execute the
- /// orchestrator function code pointed to by .
- ///
- ///
- /// The type of the orchestrator function input. This type must be deserializable from JSON.
- ///
- ///
- /// The type of the orchestrator function output. This type must be serializable to JSON.
- ///
- ///
- /// The base64-encoded protobuf payload representing an orchestration execution request.
- ///
- /// A function that implements the orchestrator logic.
- ///
- /// Optional from which injected dependencies can be retrieved.
- ///
- ///
- /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine.
- ///
- ///
- /// Thrown if or is null.
- ///
- public static string LoadAndRun(
- string encodedOrchestratorRequest,
- Func> orchestratorFunc,
- IServiceProvider? services = null)
- {
- Check.NotNull(orchestratorFunc);
- return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services);
- }
-
- ///
- /// Deserializes orchestration history from and uses it to resume the
- /// orchestrator implemented by .
- ///
- ///
- /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
- ///
- ///
- /// An implementation that defines the orchestrator logic.
- ///
- ///
- /// Optional from which injected dependencies can be retrieved.
- ///
- ///
- /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
- ///
- ///
- /// Thrown if or is null.
- ///
- ///
- /// Thrown if contains invalid data.
- ///
- public static string LoadAndRun(
- string encodedOrchestratorRequest,
- ITaskOrchestrator implementation,
- IServiceProvider? services = null)
- {
- return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services);
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT License.
+
+using DurableTask.Core;
+using DurableTask.Core.History;
+using Google.Protobuf;
+using Microsoft.DurableTask.Worker.Shims;
+using Microsoft.Extensions.Caching.Memory;
+using Microsoft.Extensions.DependencyInjection;
+using P = Microsoft.DurableTask.Protobuf;
+
+namespace Microsoft.DurableTask.Worker.Grpc;
+
+///
+/// Helper class for invoking orchestrations directly, without building a worker instance.
+///
+///
+///
+/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the
+/// caller must provide orchestration state as serialized protobuf bytes.
+///
+/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state
+/// is provided by trigger bindings.
+///
+///
+public static class GrpcOrchestrationRunner
+{
+ ///
+ /// Loads orchestration history from and uses it to execute the
+ /// orchestrator function code pointed to by .
+ ///
+ ///
+ /// The type of the orchestrator function input. This type must be deserializable from JSON.
+ ///
+ ///
+ /// The type of the orchestrator function output. This type must be serializable to JSON.
+ ///
+ ///
+ /// The base64-encoded protobuf payload representing an orchestration execution request.
+ ///
+ /// A function that implements the orchestrator logic.
+ ///
+ /// Optional from which injected dependencies can be retrieved.
+ ///
+ ///
+ /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine.
+ ///
+ ///
+ /// Thrown if or is null.
+ ///
+ public static string LoadAndRun(
+ string encodedOrchestratorRequest,
+ Func> orchestratorFunc,
+ IServiceProvider? services = null)
+ {
+ Check.NotNull(orchestratorFunc);
+ return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services);
}
- ///
- /// Deserializes orchestration history from and uses it to resume the
- /// orchestrator implemented by .
- ///
- ///
- /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
- ///
- ///
+ ///
+ /// Deserializes orchestration history from and uses it to resume the
+ /// orchestrator implemented by .
+ ///
+ ///
+ /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
+ ///
+ ///
/// An implementation that defines the orchestrator logic.
///
- ///
- /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session.
- ///
- ///
- /// Optional from which injected dependencies can be retrieved.
- ///
- ///
- /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
- ///
- ///
- /// Thrown if or is null.
- ///
- ///
- /// Thrown if contains invalid data.
- ///
- public static string LoadAndRun(
- string encodedOrchestratorRequest,
+ ///
+ /// Optional from which injected dependencies can be retrieved.
+ ///
+ ///
+ /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
+ ///
+ ///
+ /// Thrown if or is null.
+ ///
+ ///
+ /// Thrown if contains invalid data.
+ ///
+ public static string LoadAndRun(
+ string encodedOrchestratorRequest,
ITaskOrchestrator implementation,
- ExtendedSessionsCache? extendedSessionsCache,
- IServiceProvider? services = null)
- {
- Check.NotNullOrEmpty(encodedOrchestratorRequest);
- Check.NotNull(implementation);
-
- P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode(
- encodedOrchestratorRequest);
-
- List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
- IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);
- Dictionary properties = request.Properties.ToDictionary(
- pair => pair.Key,
- pair => ProtoUtils.ConvertValueToObject(pair.Value));
-
+ IServiceProvider? services = null)
+ {
+ return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services);
+ }
+
+ ///
+ /// Deserializes orchestration history from and uses it to resume the
+ /// orchestrator implemented by .
+ ///
+ ///
+ /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string.
+ ///
+ ///
+ /// An implementation that defines the orchestrator logic.
+ ///
+ ///
+ /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session.
+ ///
+ ///
+ /// Optional from which injected dependencies can be retrieved.
+ ///
+ ///
+ /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger.
+ ///
+ ///
+ /// Thrown if or is null.
+ ///
+ ///
+ /// Thrown if contains invalid data.
+ ///
+ public static string LoadAndRun(
+ string encodedOrchestratorRequest,
+ ITaskOrchestrator implementation,
+ ExtendedSessionsCache? extendedSessionsCache,
+ IServiceProvider? services = null)
+ {
+ Check.NotNullOrEmpty(encodedOrchestratorRequest);
+ Check.NotNull(implementation);
+
+ P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode(
+ encodedOrchestratorRequest);
+
+ List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
+ IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);
+ Dictionary properties = request.Properties.ToDictionary(
+ pair => pair.Key,
+ pair => ProtoUtils.ConvertValueToObject(pair.Value));
+
OrchestratorExecutionResult? result = null;
MemoryCache? extendedSessions = null;
- // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
- bool addToExtendedSessions = false;
+ // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached
+ bool addToExtendedSessions = false;
bool requiresHistory = false;
bool pastEventsIncluded = true;
bool isExtendedSession = false;
@@ -157,88 +157,94 @@ public static string LoadAndRun(
{
pastEventsIncluded = includePastEvents;
}
-
+
if (isExtendedSession && extendedSessions != null)
{
// If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended
// session based on the provided history
- if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
- {
- OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState;
- runtimeState.NewEvents.Clear();
- foreach (HistoryEvent newEvent in newEvents)
- {
- runtimeState.AddEvent(newEvent);
- }
-
- result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents();
- if (extendedSessionState.OrchestrationExecutor.IsCompleted)
- {
- extendedSessions.Remove(request.InstanceId);
- }
- }
- else
+ if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null)
{
- extendedSessions.Remove(request.InstanceId);
- addToExtendedSessions = true;
+ OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState;
+ runtimeState.NewEvents.Clear();
+ foreach (HistoryEvent newEvent in newEvents)
+ {
+ runtimeState.AddEvent(newEvent);
+ }
+
+ result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents();
+ if (extendedSessionState.OrchestrationExecutor.IsCompleted)
+ {
+ extendedSessions.Remove(request.InstanceId);
+ }
+ }
+ else
+ {
+ extendedSessions.Remove(request.InstanceId);
+ addToExtendedSessions = true;
+ }
+ }
+
+ if (result == null)
+ {
+ // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the
+ // session and lost the orchestration history so we cannot replay the orchestration.
+ if (!pastEventsIncluded)
+ {
+ requiresHistory = true;
+ }
+ else
+ {
+ // Re-construct the orchestration state from the history.
+ // New events must be added using the AddEvent method.
+ OrchestrationRuntimeState runtimeState = new(pastEvents);
+
+ foreach (HistoryEvent newEvent in newEvents)
+ {
+ runtimeState.AddEvent(newEvent);
+ }
+
+ TaskName orchestratorName = new(runtimeState.Name);
+ ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
+ ? new(new(p.Name), p.OrchestrationInstance.InstanceId)
+ : null;
+
+ DurableTaskShimFactory factory = services is null
+ ? DurableTaskShimFactory.Default
+ : ActivatorUtilities.GetServiceOrCreateInstance(services);
+ TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
+
+ TaskOrchestrationExecutor executor = new(
+ runtimeState,
+ shim,
+ BehaviorOnContinueAsNew.Carryover,
+ request.EntityParameters.ToCore(),
+ ErrorPropagationMode.UseFailureDetails);
+ result = executor.Execute();
+
+ if (addToExtendedSessions && !executor.IsCompleted)
+ {
+ extendedSessions.Set(
+ request.InstanceId,
+ new(runtimeState, shim, executor),
+ new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
+ }
+ else
+ {
+ extendedSessions?.Remove(request.InstanceId);
+ }
}
- }
-
- if (result == null)
- {
- // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the
- // session and lost the orchestration history so we cannot replay the orchestration.
- if (!pastEventsIncluded)
- {
- requiresHistory = true;
- }
- else
- {
- // Re-construct the orchestration state from the history.
- // New events must be added using the AddEvent method.
- OrchestrationRuntimeState runtimeState = new(pastEvents);
-
- foreach (HistoryEvent newEvent in newEvents)
- {
- runtimeState.AddEvent(newEvent);
- }
-
- TaskName orchestratorName = new(runtimeState.Name);
- ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
- ? new(new(p.Name), p.OrchestrationInstance.InstanceId)
- : null;
-
- DurableTaskShimFactory factory = services is null
- ? DurableTaskShimFactory.Default
- : ActivatorUtilities.GetServiceOrCreateInstance(services);
- TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent);
- TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
- result = executor.Execute();
-
- if (addToExtendedSessions && !executor.IsCompleted)
- {
- extendedSessions.Set(
- request.InstanceId,
- new(runtimeState, shim, executor),
- new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) });
- }
- else
- {
- extendedSessions?.Remove(request.InstanceId);
- }
- }
- }
-
- P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
+ }
+
+ P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
request.InstanceId,
- request.ExecutionId,
- result?.CustomStatus,
- result?.Actions,
- completionToken: string.Empty, /* doesn't apply */
+ request.ExecutionId,
+ result?.CustomStatus,
+ result?.Actions,
+ completionToken: string.Empty, /* doesn't apply */
entityConversionState: null,
- orchestrationActivity: null,
- requiresHistory: requiresHistory);
- byte[] responseBytes = response.ToByteArray();
- return Convert.ToBase64String(responseBytes);
- }
-}
+ orchestrationActivity: null,
+ requiresHistory: requiresHistory);
+ byte[] responseBytes = response.ToByteArray();
+ return Convert.ToBase64String(responseBytes);
+ }
+}
diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs
index ae8c16a19..86b98030d 100644
--- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs
+++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs
@@ -2,12 +2,16 @@
// Licensed under the MIT License.
using System.Buffers;
+using System.Collections;
+using System.Globalization;
+using System.Text.Json;
using DurableTask.Core;
using DurableTask.Core.Command;
using DurableTask.Core.History;
using DurableTask.Core.Query;
using DurableTask.Core.Tracing;
using Google.Protobuf;
+using Google.Protobuf.Collections;
using Google.Protobuf.WellKnownTypes;
using Microsoft.DurableTask.Sidecar.Dispatcher;
using Proto = Microsoft.DurableTask.Protobuf;
@@ -357,7 +361,8 @@ public static string Base64Encode(IMessage message)
failureDetails.ErrorMessage,
failureDetails.StackTrace,
GetFailureDetails(failureDetails.InnerFailure),
- failureDetails.IsNonRetriable);
+ failureDetails.IsNonRetriable,
+ ConvertMapToDictionary(failureDetails.Properties));
}
internal static Proto.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails)
@@ -367,7 +372,7 @@ public static string Base64Encode(IMessage message)
return null;
}
- return new Proto.TaskFailureDetails
+ var taskFailureDetails = new Proto.TaskFailureDetails
{
ErrorType = failureDetails.ErrorType,
ErrorMessage = failureDetails.ErrorMessage,
@@ -375,6 +380,17 @@ public static string Base64Encode(IMessage message)
InnerFailure = GetFailureDetails(failureDetails.InnerFailure),
IsNonRetriable = failureDetails.IsNonRetriable,
};
+
+ // Add properties if they exist
+ if (failureDetails.Properties != null)
+ {
+ foreach (var kvp in failureDetails.Properties)
+ {
+ taskFailureDetails.Properties.Add(kvp.Key, ConvertObjectToValue(kvp.Value));
+ }
+ }
+
+ return taskFailureDetails;
}
internal static OrchestrationQuery ToOrchestrationQuery(Proto.QueryInstancesRequest request)
@@ -438,4 +454,94 @@ internal static Proto.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeR
};
return response;
}
+
+ ///
+ /// Converts a MapField into a IDictionary.
+ ///
+ ///
+ ///
+ public static IDictionary ConvertMapToDictionary(MapField properties)
+ {
+ return properties.ToDictionary(
+ kvp => kvp.Key,
+ kvp => ConvertValueToObject(kvp.Value)
+ );
+ }
+
+ ///
+ /// Converts a C# object to a protobuf Value.
+ ///
+ /// The object to convert.
+ /// The converted protobuf Value.
+ internal static Value ConvertObjectToValue(object? obj)
+ {
+ return obj switch
+ {
+ null => Value.ForNull(),
+ string str => Value.ForString(str),
+ bool b => Value.ForBool(b),
+ int i => Value.ForNumber(i),
+ long l => Value.ForNumber(l),
+ float f => Value.ForNumber(f),
+ double d => Value.ForNumber(d),
+ decimal dec => Value.ForNumber((double)dec),
+
+ // For DateTime and DateTimeOffset, add prefix to distinguish from normal string.
+ DateTime dt => Value.ForString($"dt:{dt.ToString("O")}"),
+ DateTimeOffset dto => Value.ForString($"dto:{dto.ToString("O")}"),
+ IDictionary dict => Value.ForStruct(new Struct
+ {
+ Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) },
+ }),
+ IEnumerable e => Value.ForList(e.Cast