diff --git a/Directory.Packages.props b/Directory.Packages.props index 34509fe65..a0a48e71d 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -29,7 +29,7 @@ - + diff --git a/eng/targets/Release.props b/eng/targets/Release.props index e04c66c75..953a0eff6 100644 --- a/eng/targets/Release.props +++ b/eng/targets/Release.props @@ -17,7 +17,7 @@ - 1.15.1 + 1.16.0 diff --git a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs index 8d213a9d8..775be4853 100644 --- a/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs +++ b/src/Abstractions/DurableTaskCoreExceptionsExtensions.cs @@ -49,6 +49,7 @@ static class DurableTaskCoreExceptionsExtensions failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, - failureDetails.InnerFailure?.ToTaskFailureDetails()); + failureDetails.InnerFailure?.ToTaskFailureDetails(), + failureDetails.Properties); } } diff --git a/src/Abstractions/TaskFailureDetails.cs b/src/Abstractions/TaskFailureDetails.cs index 1387b19cd..271226f33 100644 --- a/src/Abstractions/TaskFailureDetails.cs +++ b/src/Abstractions/TaskFailureDetails.cs @@ -15,7 +15,8 @@ namespace Microsoft.DurableTask; /// A summary description of the failure. /// The stack trace of the failure. /// The inner cause of the task failure. -public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure) +/// Additional properties associated with the exception. +public record TaskFailureDetails(string ErrorType, string ErrorMessage, string? StackTrace, TaskFailureDetails? InnerFailure, IDictionary? Properties) { Type? loadedExceptionType; @@ -123,7 +124,8 @@ internal CoreFailureDetails ToCoreFailureDetails() this.ErrorMessage, this.StackTrace, this.InnerFailure?.ToCoreFailureDetails(), - isNonRetriable: false); + isNonRetriable: false, + this.Properties); } /// @@ -143,7 +145,8 @@ internal CoreFailureDetails ToCoreFailureDetails() coreFailureDetails.ErrorType, coreFailureDetails.ErrorMessage, coreFailureDetails.StackTrace, - FromCoreFailureDetails(coreFailureDetails.InnerFailure)); + FromCoreFailureDetails(coreFailureDetails.InnerFailure), + coreFailureDetails.Properties); } [return: NotNullIfNotNull(nameof(exception))] @@ -160,14 +163,17 @@ internal CoreFailureDetails ToCoreFailureDetails() coreEx.FailureDetails?.ErrorType ?? "(unknown)", coreEx.FailureDetails?.ErrorMessage ?? "(unknown)", coreEx.FailureDetails?.StackTrace, - FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException)); + FromCoreFailureDetailsRecursive(coreEx.FailureDetails?.InnerFailure) ?? FromExceptionRecursive(coreEx.InnerException), + coreEx.FailureDetails?.Properties); } + // might need to udpate this later return new TaskFailureDetails( exception.GetType().ToString(), exception.Message, exception.StackTrace, - FromExceptionRecursive(exception.InnerException)); + FromExceptionRecursive(exception.InnerException), + null); } static TaskFailureDetails? FromCoreFailureDetailsRecursive(CoreFailureDetails? coreFailureDetails) @@ -181,6 +187,7 @@ internal CoreFailureDetails ToCoreFailureDetails() coreFailureDetails.ErrorType, coreFailureDetails.ErrorMessage, coreFailureDetails.StackTrace, - FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure)); + FromCoreFailureDetailsRecursive(coreFailureDetails.InnerFailure), + coreFailureDetails.Properties); } } diff --git a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs index 50191e378..7647326c7 100644 --- a/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs +++ b/src/Client/OrchestrationServiceClientShim/ShimExtensions.cs @@ -75,7 +75,7 @@ public static Core.OrchestrationStatus ConvertToCore(this OrchestrationRuntimeSt } TaskFailureDetails? inner = details.InnerFailure?.ConvertFromCore(); - return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner); + return new TaskFailureDetails(details.ErrorType, details.ErrorMessage, details.StackTrace, inner, details.Properties); } /// diff --git a/src/Grpc/orchestrator_service.proto b/src/Grpc/orchestrator_service.proto index df5143bc9..b2def0878 100644 --- a/src/Grpc/orchestrator_service.proto +++ b/src/Grpc/orchestrator_service.proto @@ -41,6 +41,7 @@ message TaskFailureDetails { google.protobuf.StringValue stackTrace = 3; TaskFailureDetails innerFailure = 4; bool isNonRetriable = 5; + map properties = 6; } enum OrchestrationStatus { @@ -469,6 +470,7 @@ message PurgeInstancesRequest { oneof request { string instanceId = 1; PurgeInstanceFilter purgeInstanceFilter = 2; + InstanceBatch instanceBatch = 4; } bool recursive = 3; } @@ -681,8 +683,7 @@ message AbandonEntityTaskResponse { } message SkipGracefulOrchestrationTerminationsRequest { - // A maximum of 500 instance IDs can be provided in this list. - repeated string instanceIds = 1; + InstanceBatch instanceBatch = 1; google.protobuf.StringValue reason = 2; } @@ -818,4 +819,9 @@ message StreamInstanceHistoryRequest { message HistoryChunk { repeated HistoryEvent events = 1; +} + +message InstanceBatch { + // A maximum of 500 instance IDs can be provided in this list. + repeated string instanceIds = 1; } \ No newline at end of file diff --git a/src/Grpc/versions.txt b/src/Grpc/versions.txt index 3e4d1b210..709a80074 100644 --- a/src/Grpc/versions.txt +++ b/src/Grpc/versions.txt @@ -1,2 +1,2 @@ -# The following files were downloaded from branch main at 2025-09-17 01:45:58 UTC -https://raw.githubusercontent.com/microsoft/durabletask-protobuf/f5745e0d83f608d77871c1894d9260ceaae08967/protos/orchestrator_service.proto +# The following files were downloaded from branch main at 2025-10-13 18:06:55 UTC +https://raw.githubusercontent.com/microsoft/durabletask-protobuf/97cf9cf6ac44107b883b0f4ab1dd62ee2332cfd9/protos/orchestrator_service.proto diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index e3e331f77..13b64fe86 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -1,1172 +1,1268 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using System.Buffers; -using System.Buffers.Text; -using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; -using System.Text; -using DurableTask.Core; -using DurableTask.Core.Command; -using DurableTask.Core.Entities; -using DurableTask.Core.Entities.OperationFormat; -using DurableTask.Core.History; -using DurableTask.Core.Tracing; -using Google.Protobuf; -using Google.Protobuf.WellKnownTypes; -using DTCore = DurableTask.Core; -using P = Microsoft.DurableTask.Protobuf; -using TraceHelper = Microsoft.DurableTask.Tracing.TraceHelper; - -namespace Microsoft.DurableTask; - -/// -/// Protobuf utilities and helpers. -/// -static class ProtoUtils -{ - /// - /// Converts a history event from to . - /// - /// The proto history event to converter. - /// The converted history event. - /// When the provided history event type is not supported. - internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) - { - return ConvertHistoryEvent(proto, conversionState: null); - } - - /// - /// Converts a history event from to , and performs - /// stateful conversions of entity-related events. - /// - /// The proto history event to converter. - /// State needed for converting entity-related history entries and actions. - /// The converted history event. - /// When the provided history event type is not supported. - internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState) - { - Check.NotNull(proto); - HistoryEvent historyEvent; - switch (proto.EventTypeCase) - { - case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew: - historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input); - break; - case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: - OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore(); - conversionState?.SetOrchestrationInstance(instance); - historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input) - { - Name = proto.ExecutionStarted.Name, - Version = proto.ExecutionStarted.Version, - OrchestrationInstance = instance, - Tags = proto.ExecutionStarted.Tags, - ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance - { - Name = proto.ExecutionStarted.ParentInstance.Name, - Version = proto.ExecutionStarted.ParentInstance.Version, - OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(), - TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId, - }, - ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(), - }; - break; - case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted: - historyEvent = new ExecutionCompletedEvent( - proto.EventId, - proto.ExecutionCompleted.Result, - proto.ExecutionCompleted.OrchestrationStatus.ToCore()); - break; - case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: - historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input); - break; - case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: - historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input); - break; - case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: - historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input); - break; - case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: - historyEvent = new TaskScheduledEvent( - proto.EventId, - proto.TaskScheduled.Name, - proto.TaskScheduled.Version, - proto.TaskScheduled.Input) - { - Tags = proto.TaskScheduled.Tags, - }; - break; - case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: - historyEvent = new TaskCompletedEvent( - proto.EventId, - proto.TaskCompleted.TaskScheduledId, - proto.TaskCompleted.Result); - break; - case P.HistoryEvent.EventTypeOneofCase.TaskFailed: - historyEvent = new TaskFailedEvent( - proto.EventId, - proto.TaskFailed.TaskScheduledId, - reason: null, /* not supported */ - details: null, /* not supported */ - proto.TaskFailed.FailureDetails.ToCore()); - break; - case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: - historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId) - { - Input = proto.SubOrchestrationInstanceCreated.Input, - InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId, - Name = proto.SubOrchestrationInstanceCreated.Name, - Version = proto.SubOrchestrationInstanceCreated.Version, - }; - break; - case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: - historyEvent = new SubOrchestrationInstanceCompletedEvent( - proto.EventId, - proto.SubOrchestrationInstanceCompleted.TaskScheduledId, - proto.SubOrchestrationInstanceCompleted.Result); - break; - case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed: - historyEvent = new SubOrchestrationInstanceFailedEvent( - proto.EventId, - proto.SubOrchestrationInstanceFailed.TaskScheduledId, - reason: null /* not supported */, - details: null /* not supported */, - proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore()); - break; - case P.HistoryEvent.EventTypeOneofCase.TimerCreated: - historyEvent = new TimerCreatedEvent( - proto.EventId, - proto.TimerCreated.FireAt.ToDateTime()); - break; - case P.HistoryEvent.EventTypeOneofCase.TimerFired: - historyEvent = new TimerFiredEvent( - eventId: -1, - proto.TimerFired.FireAt.ToDateTime()) - { - TimerId = proto.TimerFired.TimerId, - }; - break; - case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted: - historyEvent = new OrchestratorStartedEvent(proto.EventId); - break; - case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted: - historyEvent = new OrchestratorCompletedEvent(proto.EventId); - break; - case P.HistoryEvent.EventTypeOneofCase.EventSent: - historyEvent = new EventSentEvent(proto.EventId) - { - InstanceId = proto.EventSent.InstanceId, - Name = proto.EventSent.Name, - Input = proto.EventSent.Input, - }; - break; - case P.HistoryEvent.EventTypeOneofCase.EventRaised: - historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input) +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Buffers; +using System.Buffers.Text; +using System.Collections; +using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Globalization; +using System.Text; +using System.Text.Json; +using DurableTask.Core; +using DurableTask.Core.Command; +using DurableTask.Core.Entities; +using DurableTask.Core.Entities.OperationFormat; +using DurableTask.Core.History; +using DurableTask.Core.Tracing; +using Google.Protobuf; +using Google.Protobuf.Collections; +using Google.Protobuf.WellKnownTypes; +using DTCore = DurableTask.Core; +using P = Microsoft.DurableTask.Protobuf; +using TraceHelper = Microsoft.DurableTask.Tracing.TraceHelper; + +namespace Microsoft.DurableTask; + +/// +/// Protobuf utilities and helpers. +/// +static class ProtoUtils +{ + /// + /// Converts a history event from to . + /// + /// The proto history event to converter. + /// The converted history event. + /// When the provided history event type is not supported. + internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto) + { + return ConvertHistoryEvent(proto, conversionState: null); + } + + /// + /// Converts a history event from to , and performs + /// stateful conversions of entity-related events. + /// + /// The proto history event to converter. + /// State needed for converting entity-related history entries and actions. + /// The converted history event. + /// When the provided history event type is not supported. + internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityConversionState? conversionState) + { + Check.NotNull(proto); + HistoryEvent historyEvent; + switch (proto.EventTypeCase) + { + case P.HistoryEvent.EventTypeOneofCase.ContinueAsNew: + historyEvent = new ContinueAsNewEvent(proto.EventId, proto.ContinueAsNew.Input); + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionStarted: + OrchestrationInstance instance = proto.ExecutionStarted.OrchestrationInstance.ToCore(); + conversionState?.SetOrchestrationInstance(instance); + historyEvent = new ExecutionStartedEvent(proto.EventId, proto.ExecutionStarted.Input) + { + Name = proto.ExecutionStarted.Name, + Version = proto.ExecutionStarted.Version, + OrchestrationInstance = instance, + Tags = proto.ExecutionStarted.Tags, + ParentInstance = proto.ExecutionStarted.ParentInstance == null ? null : new ParentInstance + { + Name = proto.ExecutionStarted.ParentInstance.Name, + Version = proto.ExecutionStarted.ParentInstance.Version, + OrchestrationInstance = proto.ExecutionStarted.ParentInstance.OrchestrationInstance.ToCore(), + TaskScheduleId = proto.ExecutionStarted.ParentInstance.TaskScheduledId, + }, + ScheduledStartTime = proto.ExecutionStarted.ScheduledStartTimestamp?.ToDateTime(), + }; + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionCompleted: + historyEvent = new ExecutionCompletedEvent( + proto.EventId, + proto.ExecutionCompleted.Result, + proto.ExecutionCompleted.OrchestrationStatus.ToCore()); + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionTerminated: + historyEvent = new ExecutionTerminatedEvent(proto.EventId, proto.ExecutionTerminated.Input); + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionSuspended: + historyEvent = new ExecutionSuspendedEvent(proto.EventId, proto.ExecutionSuspended.Input); + break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionResumed: + historyEvent = new ExecutionResumedEvent(proto.EventId, proto.ExecutionResumed.Input); + break; + case P.HistoryEvent.EventTypeOneofCase.TaskScheduled: + historyEvent = new TaskScheduledEvent( + proto.EventId, + proto.TaskScheduled.Name, + proto.TaskScheduled.Version, + proto.TaskScheduled.Input) + { + Tags = proto.TaskScheduled.Tags, + }; + break; + case P.HistoryEvent.EventTypeOneofCase.TaskCompleted: + historyEvent = new TaskCompletedEvent( + proto.EventId, + proto.TaskCompleted.TaskScheduledId, + proto.TaskCompleted.Result); + break; + case P.HistoryEvent.EventTypeOneofCase.TaskFailed: + historyEvent = new TaskFailedEvent( + proto.EventId, + proto.TaskFailed.TaskScheduledId, + reason: null, /* not supported */ + details: null, /* not supported */ + proto.TaskFailed.FailureDetails.ToCore()); + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCreated: + historyEvent = new SubOrchestrationInstanceCreatedEvent(proto.EventId) + { + Input = proto.SubOrchestrationInstanceCreated.Input, + InstanceId = proto.SubOrchestrationInstanceCreated.InstanceId, + Name = proto.SubOrchestrationInstanceCreated.Name, + Version = proto.SubOrchestrationInstanceCreated.Version, + }; + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceCompleted: + historyEvent = new SubOrchestrationInstanceCompletedEvent( + proto.EventId, + proto.SubOrchestrationInstanceCompleted.TaskScheduledId, + proto.SubOrchestrationInstanceCompleted.Result); + break; + case P.HistoryEvent.EventTypeOneofCase.SubOrchestrationInstanceFailed: + historyEvent = new SubOrchestrationInstanceFailedEvent( + proto.EventId, + proto.SubOrchestrationInstanceFailed.TaskScheduledId, + reason: null /* not supported */, + details: null /* not supported */, + proto.SubOrchestrationInstanceFailed.FailureDetails.ToCore()); + break; + case P.HistoryEvent.EventTypeOneofCase.TimerCreated: + historyEvent = new TimerCreatedEvent( + proto.EventId, + proto.TimerCreated.FireAt.ToDateTime()); + break; + case P.HistoryEvent.EventTypeOneofCase.TimerFired: + historyEvent = new TimerFiredEvent( + eventId: -1, + proto.TimerFired.FireAt.ToDateTime()) + { + TimerId = proto.TimerFired.TimerId, + }; + break; + case P.HistoryEvent.EventTypeOneofCase.OrchestratorStarted: + historyEvent = new OrchestratorStartedEvent(proto.EventId); + break; + case P.HistoryEvent.EventTypeOneofCase.OrchestratorCompleted: + historyEvent = new OrchestratorCompletedEvent(proto.EventId); + break; + case P.HistoryEvent.EventTypeOneofCase.EventSent: + historyEvent = new EventSentEvent(proto.EventId) + { + InstanceId = proto.EventSent.InstanceId, + Name = proto.EventSent.Name, + Input = proto.EventSent.Input, + }; + break; + case P.HistoryEvent.EventTypeOneofCase.EventRaised: + historyEvent = new EventRaisedEvent(proto.EventId, proto.EventRaised.Input) + { + Name = proto.EventRaised.Name, + }; + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: + historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance); + conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: + historyEvent = EntityConversions.EncodeOperationSignaled(proto); + conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested: + historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance); + conversionState?.AddUnlockObligations(proto.EntityLockRequested); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent: + historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance); + conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted: + historyEvent = EntityConversions.EncodeLockGranted(proto); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: + historyEvent = EntityConversions.EncodeOperationCompleted(proto); + break; + case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed: + historyEvent = EntityConversions.EncodeOperationFailed(proto); + break; + case P.HistoryEvent.EventTypeOneofCase.GenericEvent: + historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data); + break; + case P.HistoryEvent.EventTypeOneofCase.HistoryState: + historyEvent = new HistoryStateEvent( + proto.EventId, + new OrchestrationState + { + OrchestrationInstance = new OrchestrationInstance + { + InstanceId = proto.HistoryState.OrchestrationState.InstanceId, + }, + Name = proto.HistoryState.OrchestrationState.Name, + Version = proto.HistoryState.OrchestrationState.Version, + ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(), + CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(), + LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(), + Input = proto.HistoryState.OrchestrationState.Input, + Output = proto.HistoryState.OrchestrationState.Output, + Status = proto.HistoryState.OrchestrationState.CustomStatus, + Tags = proto.HistoryState.OrchestrationState.Tags, + }); + break; + default: + throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported."); + } + + historyEvent.Timestamp = proto.Timestamp.ToDateTime(); + return historyEvent; + } + + /// + /// Converts a to a gRPC . + /// + /// The date-time to convert. + /// The gRPC timestamp. + internal static Timestamp ToTimestamp(this DateTime dateTime) + { + // The protobuf libraries require timestamps to be in UTC + if (dateTime.Kind == DateTimeKind.Unspecified) + { + dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc); + } + else if (dateTime.Kind == DateTimeKind.Local) + { + dateTime = dateTime.ToUniversalTime(); + } + + return Timestamp.FromDateTime(dateTime); + } + + /// + /// Converts a to a gRPC . + /// + /// The date-time to convert. + /// The gRPC timestamp. + internal static Timestamp? ToTimestamp(this DateTime? dateTime) + => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null; + + /// + /// Converts a to a gRPC . + /// + /// The date-time to convert. + /// The gRPC timestamp. + internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime); + + /// + /// Converts a to a gRPC . + /// + /// The date-time to convert. + /// The gRPC timestamp. + internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime) + => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null; + + /// + /// Constructs a . + /// + /// The orchestrator instance ID. + /// The orchestrator execution ID. + /// The orchestrator customer status or null if no custom status. + /// The orchestrator actions. + /// + /// The completion token for the work item. It must be the exact same + /// value that was provided by the corresponding that triggered the orchestrator execution. + /// + /// The entity conversion state, or null if no conversion is required. + /// The that represents orchestration execution. + /// Whether or not a history is required to complete the orchestration request and none was provided. + /// The orchestrator response. + /// When an orchestrator action is unknown. + internal static P.OrchestratorResponse ConstructOrchestratorResponse( + string instanceId, + string executionId, + string? customStatus, + IEnumerable? actions, + string completionToken, + EntityConversionState? entityConversionState, + Activity? orchestrationActivity, + bool requiresHistory = false) + { + var response = new P.OrchestratorResponse + { + InstanceId = instanceId, + CustomStatus = customStatus, + CompletionToken = completionToken, + OrchestrationTraceContext = + new() + { + SpanID = orchestrationActivity?.SpanId.ToString(), + SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(), + }, + RequiresHistory = requiresHistory, + }; + + // If a history is required and the orchestration request was not completed, then there is no list of actions. + if (requiresHistory) + { + return response; + } + + Check.NotNull(actions); + foreach (OrchestratorAction action in actions) + { + var protoAction = new P.OrchestratorAction { Id = action.Id }; + + P.TraceContext? CreateTraceContext() + { + if (orchestrationActivity is null) + { + return null; + } + + ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom(); + ActivityContext clientActivityContext = new(orchestrationActivity.TraceId, clientSpanId, orchestrationActivity.ActivityTraceFlags, orchestrationActivity.TraceStateString); + + return new P.TraceContext + { + TraceParent = $"00-{clientActivityContext.TraceId}-{clientActivityContext.SpanId}-0{clientActivityContext.TraceFlags:d}", + TraceState = clientActivityContext.TraceState, + }; + } + + switch (action.OrchestratorActionType) + { + case OrchestratorActionType.ScheduleOrchestrator: + var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action; + + protoAction.ScheduleTask = new P.ScheduleTaskAction + { + Name = scheduleTaskAction.Name, + Version = scheduleTaskAction.Version, + Input = scheduleTaskAction.Input, + ParentTraceContext = CreateTraceContext(), + }; + + if (scheduleTaskAction.Tags != null) + { + foreach (KeyValuePair tag in scheduleTaskAction.Tags) + { + protoAction.ScheduleTask.Tags[tag.Key] = tag.Value; + } + } + + break; + case OrchestratorActionType.CreateSubOrchestration: + var subOrchestrationAction = (CreateSubOrchestrationAction)action; + protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction + { + Input = subOrchestrationAction.Input, + InstanceId = subOrchestrationAction.InstanceId, + Name = subOrchestrationAction.Name, + Version = subOrchestrationAction.Version, + ParentTraceContext = CreateTraceContext(), + }; + break; + case OrchestratorActionType.CreateTimer: + var createTimerAction = (CreateTimerOrchestratorAction)action; + protoAction.CreateTimer = new P.CreateTimerAction + { + FireAt = createTimerAction.FireAt.ToTimestamp(), + }; + break; + case OrchestratorActionType.SendEvent: + var sendEventAction = (SendEventOrchestratorAction)action; + if (sendEventAction.Instance == null) + { + throw new ArgumentException( + $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!"); + } + + if (entityConversionState is not null + && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId) + && sendEventAction.EventName is not null + && sendEventAction.EventData is not null) + { + P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction(); + protoAction.SendEntityMessage = sendAction; + + EntityConversions.DecodeEntityMessageAction( + sendEventAction.EventName, + sendEventAction.EventData, + sendEventAction.Instance.InstanceId, + sendAction, + out string requestId); + + entityConversionState.EntityRequestIds.Add(requestId); + + switch (sendAction.EntityMessageTypeCase) + { + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested: + entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested); + break; + case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent: + entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId); + break; + default: + break; + } + } + else + { + protoAction.SendEvent = new P.SendEventAction + { + Instance = sendEventAction.Instance.ToProtobuf(), + Name = sendEventAction.EventName, + Data = sendEventAction.EventData, + }; + + // Distributed Tracing: start a new trace activity derived from the orchestration + // for an EventRaisedEvent (external event) + using Activity? traceActivity = TraceHelper.StartTraceActivityForEventRaisedFromWorker(sendEventAction, instanceId, executionId); + + traceActivity?.Stop(); + } + + break; + case OrchestratorActionType.OrchestrationComplete: + + if (entityConversionState is not null) + { + // as a precaution, unlock any entities that were not unlocked for some reason, before + // completing the orchestration. + foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations()) + { + response.Actions.Add(new P.OrchestratorAction + { + Id = action.Id, + SendEntityMessage = new P.SendEntityMessageAction + { + EntityUnlockSent = new P.EntityUnlockSentEvent + { + CriticalSectionId = criticalSectionId, + TargetInstanceId = target, + ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId, + }, + }, + }); + } + } + + var completeAction = (OrchestrationCompleteOrchestratorAction)action; + protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction + { + CarryoverEvents = + { + // TODO + }, + Details = completeAction.Details, + NewVersion = completeAction.NewVersion, + OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(), + Result = completeAction.Result, + }; + + if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed) + { + protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf(); + } + + break; + default: + throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}"); + } + + response.Actions.Add(protoAction); + } + + return response; + } + + /// + /// Converts a to a . + /// + /// The status to convert. + /// The converted status. + internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) + { + return (OrchestrationStatus)status; + } + + /// + /// Converts a to a . + /// + /// The status to convert. + /// The converted status. + [return: NotNullIfNotNull(nameof(status))] + internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status) + { + if (status == null) + { + return null; + } + + return new OrchestrationInstance + { + InstanceId = status.InstanceId, + ExecutionId = status.ExecutionId, + }; + } + + /// + /// Converts a to a . + /// + /// The failure details to convert. + /// The converted failure details. + [return: NotNullIfNotNull(nameof(failureDetails))] + internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new TaskFailureDetails( + failureDetails.ErrorType, + failureDetails.ErrorMessage, + failureDetails.StackTrace, + failureDetails.InnerFailure.ToTaskFailureDetails(), + ConvertProperties(failureDetails.Properties)); + } + + /// + /// Converts a to . + /// + /// The exception to convert. + /// Optional exception properties provider. + /// The task failure details. + [return: NotNullIfNotNull(nameof(e))] + internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e, DTCore.IExceptionPropertiesProvider? exceptionPropertiesProvider = null) + { + if (e == null) + { + return null; + } + + IDictionary? properties = exceptionPropertiesProvider?.GetExceptionProperties(e); + + var taskFailureDetails = new P.TaskFailureDetails + { + ErrorType = e.GetType().FullName, + ErrorMessage = e.Message, + StackTrace = e.StackTrace, + InnerFailure = e.InnerException.ToTaskFailureDetails(exceptionPropertiesProvider), + }; + + if (properties != null) + { + foreach (var kvp in properties) + { + taskFailureDetails.Properties[kvp.Key] = ConvertObjectToValue(kvp.Value); + } + } + + return taskFailureDetails; + } + + /// + /// Converts a to a . + /// + /// The entity batch request to convert. + /// The converted entity batch request. + [return: NotNullIfNotNull(nameof(entityBatchRequest))] + internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest) + { + if (entityBatchRequest == null) + { + return null; + } + + return new EntityBatchRequest() + { + EntityState = entityBatchRequest.EntityState, + InstanceId = entityBatchRequest.InstanceId, + Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(), + }; + } + + /// + /// Converts a to a . + /// + /// The entity request to convert. + /// The converted request. + /// Additional info about each operation, required by DTS. + internal static void ToEntityBatchRequest( + this P.EntityRequest entityRequest, + out EntityBatchRequest batchRequest, + out List operationInfos) + { + batchRequest = new EntityBatchRequest() + { + EntityState = entityRequest.EntityState, + InstanceId = entityRequest.InstanceId, + Operations = [], // operations are added to this collection below + }; + + operationInfos = new(entityRequest.OperationRequests.Count); + + foreach (P.HistoryEvent? op in entityRequest.OperationRequests) + { + if (op.EntityOperationSignaled is not null) + { + batchRequest.Operations.Add(new OperationRequest + { + Id = Guid.Parse(op.EntityOperationSignaled.RequestId), + Operation = op.EntityOperationSignaled.Operation, + Input = op.EntityOperationSignaled.Input, + }); + operationInfos.Add(new P.OperationInfo + { + RequestId = op.EntityOperationSignaled.RequestId, + ResponseDestination = null, // means we don't send back a response to the caller + }); + } + else if (op.EntityOperationCalled is not null) + { + batchRequest.Operations.Add(new OperationRequest + { + Id = Guid.Parse(op.EntityOperationCalled.RequestId), + Operation = op.EntityOperationCalled.Operation, + Input = op.EntityOperationCalled.Input, + }); + operationInfos.Add(new P.OperationInfo + { + RequestId = op.EntityOperationCalled.RequestId, + ResponseDestination = new P.OrchestrationInstance + { + InstanceId = op.EntityOperationCalled.ParentInstanceId, + ExecutionId = op.EntityOperationCalled.ParentExecutionId, + }, + }); + } + } + } + + /// + /// Converts a to a . + /// + /// The operation request to convert. + /// The converted operation request. + [return: NotNullIfNotNull(nameof(operationRequest))] + internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest) + { + if (operationRequest == null) + { + return null; + } + + return new OperationRequest() + { + Operation = operationRequest.Operation, + Input = operationRequest.Input, + Id = Guid.Parse(operationRequest.RequestId), + TraceContext = operationRequest.TraceContext != null ? + new DistributedTraceContext( + operationRequest.TraceContext.TraceParent, + operationRequest.TraceContext.TraceState) : null, + }; + } + + /// + /// Converts a to a . + /// + /// The operation result to convert. + /// The converted operation result. + [return: NotNullIfNotNull(nameof(operationResult))] + internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult) + { + if (operationResult == null) + { + return null; + } + + switch (operationResult.ResultTypeCase) + { + case P.OperationResult.ResultTypeOneofCase.Success: + return new OperationResult() + { + Result = operationResult.Success.Result, + StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(), + }; + + case P.OperationResult.ResultTypeOneofCase.Failure: + return new OperationResult() + { + FailureDetails = operationResult.Failure.FailureDetails.ToCore(), + StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(), + EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(), + }; + + default: + throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported."); + } + } + + /// + /// Converts a to . + /// + /// The operation result to convert. + /// The converted operation result. + [return: NotNullIfNotNull(nameof(operationResult))] + internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult) + { + if (operationResult == null) + { + return null; + } + + if (operationResult.FailureDetails == null) + { + return new P.OperationResult() + { + Success = new P.OperationResultSuccess() + { + Result = operationResult.Result, + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), + }, + }; + } + else + { + return new P.OperationResult() + { + Failure = new P.OperationResultFailure() + { + FailureDetails = ToProtobuf(operationResult.FailureDetails), + StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), + EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), + }, + }; + } + } + + /// + /// Converts a to a . + /// + /// The operation action to convert. + /// The converted operation action. + [return: NotNullIfNotNull(nameof(operationAction))] + internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction) + { + if (operationAction == null) + { + return null; + } + + switch (operationAction.OperationActionTypeCase) + { + case P.OperationAction.OperationActionTypeOneofCase.SendSignal: + + return new SendSignalOperationAction() + { + Name = operationAction.SendSignal.Name, + Input = operationAction.SendSignal.Input, + InstanceId = operationAction.SendSignal.InstanceId, + ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(), + ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.SendSignal.ParentTraceContext.TraceParent, + operationAction.SendSignal.ParentTraceContext.TraceState) : null, + }; + + case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: + + return new StartNewOrchestrationOperationAction() + { + Name = operationAction.StartNewOrchestration.Name, + Input = operationAction.StartNewOrchestration.Input, + InstanceId = operationAction.StartNewOrchestration.InstanceId, + Version = operationAction.StartNewOrchestration.Version, + ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), + RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(), + ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ? + new DistributedTraceContext( + operationAction.StartNewOrchestration.ParentTraceContext.TraceParent, + operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null, + }; + default: + throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); + } + } + + /// + /// Converts a to . + /// + /// The operation action to convert. + /// The converted operation action. + [return: NotNullIfNotNull(nameof(operationAction))] + internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction) + { + if (operationAction == null) + { + return null; + } + + var action = new P.OperationAction(); + + switch (operationAction) + { + case SendSignalOperationAction sendSignalAction: + + action.SendSignal = new P.SendSignalAction() + { + Name = sendSignalAction.Name, + Input = sendSignalAction.Input, + InstanceId = sendSignalAction.InstanceId, + ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(), + RequestTime = sendSignalAction.RequestTime?.ToTimestamp(), + ParentTraceContext = sendSignalAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = sendSignalAction.ParentTraceContext.TraceParent, + TraceState = sendSignalAction.ParentTraceContext.TraceState, + } + : null, + }; + break; + + case StartNewOrchestrationOperationAction startNewOrchestrationAction: + + action.StartNewOrchestration = new P.StartNewOrchestrationAction() + { + Name = startNewOrchestrationAction.Name, + Input = startNewOrchestrationAction.Input, + Version = startNewOrchestrationAction.Version, + InstanceId = startNewOrchestrationAction.InstanceId, + ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), + RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(), + ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ? + new P.TraceContext + { + TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent, + TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState, + } + : null, + }; + break; + } + + return action; + } + + /// + /// Converts a to a . + /// + /// The operation result to convert. + /// The converted operation result. + [return: NotNullIfNotNull(nameof(entityBatchResult))] + internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult) + { + if (entityBatchResult == null) + { + return null; + } + + return new EntityBatchResult() + { + Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(), + EntityState = entityBatchResult.EntityState, + Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(), + FailureDetails = entityBatchResult.FailureDetails.ToCore(), + }; + } + + /// + /// Converts a to . + /// + /// The operation result to convert. + /// The completion token, or null for the older protocol. + /// Additional information about each operation, required by DTS. + /// The converted operation result. + [return: NotNullIfNotNull(nameof(entityBatchResult))] + internal static P.EntityBatchResult? ToEntityBatchResult( + this EntityBatchResult? entityBatchResult, + string? completionToken = null, + IEnumerable? operationInfos = null) + { + if (entityBatchResult == null) + { + return null; + } + + return new P.EntityBatchResult() + { + EntityState = entityBatchResult.EntityState, + FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(), + Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] }, + Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] }, + CompletionToken = completionToken ?? string.Empty, + OperationInfos = { operationInfos ?? [] }, + }; + } + + /// + /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation. + /// + /// The DT.Core representation. + /// The gRPC representation. + [return: NotNullIfNotNull(nameof(parameters))] + internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters) + { + if (parameters == null) + { + return null; + } + + return new TaskOrchestrationEntityParameters() + { + EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(), + }; + } + + /// + /// Gets the approximate byte count for a . + /// + /// The failure details. + /// The approximate byte count. + internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails) + { + // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar + Encoding encoding = Encoding.UTF8; + + int byteCount = 0; + if (failureDetails.ErrorType != null) + { + byteCount += encoding.GetByteCount(failureDetails.ErrorType); + } + + if (failureDetails.ErrorMessage != null) + { + byteCount += encoding.GetByteCount(failureDetails.ErrorMessage); + } + + if (failureDetails.StackTrace != null) + { + byteCount += encoding.GetByteCount(failureDetails.StackTrace); + } + + if (failureDetails.InnerFailure != null) + { + byteCount += failureDetails.InnerFailure.GetApproximateByteCount(); + } + + return byteCount; + } + + /// + /// Decode a protobuf message from a base64 string. + /// + /// The type to decode to. + /// The message parser. + /// The base64 encoded message. + /// The decoded message. + /// If decoding fails. + internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage + { + // Decode the base64 in a way that doesn't allocate a byte[] on each request + int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage); + byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount); + try + { + // The Base64 APIs require first converting the string into UTF-8 bytes. We then + // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that + // we can finally decode the protobuf request. + Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0); + OperationStatus status = Base64.DecodeFromUtf8InPlace( + buffer.AsSpan(0, encodedByteCount), + out int bytesWritten); + if (status != OperationStatus.Done) + { + throw new ArgumentException( + $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage)); + } + + return (T)parser.ParseFrom(buffer, 0, bytesWritten); + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + + /// + /// Converts a grpc to a . + /// + /// The failure details to convert. + /// The converted failure details. + internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + return new FailureDetails( + failureDetails.ErrorType, + failureDetails.ErrorMessage, + failureDetails.StackTrace, + failureDetails.InnerFailure.ToCore(), + failureDetails.IsNonRetriable, + ConvertProperties(failureDetails.Properties)); + } + + /// + /// Converts a instance to a corresponding C# object. + /// + /// The Protobuf Value to convert. + /// The corresponding C# object. + /// + /// Thrown when the Protobuf Value.KindCase is not one of the supported types. + /// + internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value) + { + switch (value.KindCase) + { + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue: + return null; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue: + return value.NumberValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue: + string stringValue = value.StringValue; + + // If the value starts with the 'dt:' prefix, it may represent a DateTime value — attempt to parse it. + if (stringValue.StartsWith("dt:", StringComparison.Ordinal)) { - Name = proto.EventRaised.Name, - }; - break; - case P.HistoryEvent.EventTypeOneofCase.EntityOperationCalled: - historyEvent = EntityConversions.EncodeOperationCalled(proto, conversionState!.CurrentInstance); - conversionState?.EntityRequestIds.Add(proto.EntityOperationCalled.RequestId); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityOperationSignaled: - historyEvent = EntityConversions.EncodeOperationSignaled(proto); - conversionState?.EntityRequestIds.Add(proto.EntityOperationSignaled.RequestId); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityLockRequested: - historyEvent = EntityConversions.EncodeLockRequested(proto, conversionState!.CurrentInstance); - conversionState?.AddUnlockObligations(proto.EntityLockRequested); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityUnlockSent: - historyEvent = EntityConversions.EncodeUnlockSent(proto, conversionState!.CurrentInstance); - conversionState?.RemoveUnlockObligation(proto.EntityUnlockSent.TargetInstanceId); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityLockGranted: - historyEvent = EntityConversions.EncodeLockGranted(proto); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityOperationCompleted: - historyEvent = EntityConversions.EncodeOperationCompleted(proto); - break; - case P.HistoryEvent.EventTypeOneofCase.EntityOperationFailed: - historyEvent = EntityConversions.EncodeOperationFailed(proto); - break; - case P.HistoryEvent.EventTypeOneofCase.GenericEvent: - historyEvent = new GenericEvent(proto.EventId, proto.GenericEvent.Data); - break; - case P.HistoryEvent.EventTypeOneofCase.HistoryState: - historyEvent = new HistoryStateEvent( - proto.EventId, - new OrchestrationState + if (DateTime.TryParse(stringValue[3..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTime date)) { - OrchestrationInstance = new OrchestrationInstance - { - InstanceId = proto.HistoryState.OrchestrationState.InstanceId, - }, - Name = proto.HistoryState.OrchestrationState.Name, - Version = proto.HistoryState.OrchestrationState.Version, - ScheduledStartTime = proto.HistoryState.OrchestrationState.ScheduledStartTimestamp.ToDateTime(), - CreatedTime = proto.HistoryState.OrchestrationState.CreatedTimestamp.ToDateTime(), - LastUpdatedTime = proto.HistoryState.OrchestrationState.LastUpdatedTimestamp.ToDateTime(), - Input = proto.HistoryState.OrchestrationState.Input, - Output = proto.HistoryState.OrchestrationState.Output, - Status = proto.HistoryState.OrchestrationState.CustomStatus, - Tags = proto.HistoryState.OrchestrationState.Tags, - }); - break; - default: - throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported."); - } - - historyEvent.Timestamp = proto.Timestamp.ToDateTime(); - return historyEvent; - } - - /// - /// Converts a to a gRPC . - /// - /// The date-time to convert. - /// The gRPC timestamp. - internal static Timestamp ToTimestamp(this DateTime dateTime) - { - // The protobuf libraries require timestamps to be in UTC - if (dateTime.Kind == DateTimeKind.Unspecified) - { - dateTime = DateTime.SpecifyKind(dateTime, DateTimeKind.Utc); - } - else if (dateTime.Kind == DateTimeKind.Local) - { - dateTime = dateTime.ToUniversalTime(); - } - - return Timestamp.FromDateTime(dateTime); - } - - /// - /// Converts a to a gRPC . - /// - /// The date-time to convert. - /// The gRPC timestamp. - internal static Timestamp? ToTimestamp(this DateTime? dateTime) - => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null; - - /// - /// Converts a to a gRPC . - /// - /// The date-time to convert. - /// The gRPC timestamp. - internal static Timestamp ToTimestamp(this DateTimeOffset dateTime) => Timestamp.FromDateTimeOffset(dateTime); - - /// - /// Converts a to a gRPC . - /// - /// The date-time to convert. - /// The gRPC timestamp. - internal static Timestamp? ToTimestamp(this DateTimeOffset? dateTime) - => dateTime.HasValue ? dateTime.Value.ToTimestamp() : null; - - /// - /// Constructs a . - /// - /// The orchestrator instance ID. - /// The orchestrator execution ID. - /// The orchestrator customer status or null if no custom status. - /// The orchestrator actions. - /// - /// The completion token for the work item. It must be the exact same - /// value that was provided by the corresponding that triggered the orchestrator execution. - /// - /// The entity conversion state, or null if no conversion is required. - /// The that represents orchestration execution. - /// Whether or not a history is required to complete the orchestration request and none was provided. - /// The orchestrator response. - /// When an orchestrator action is unknown. - internal static P.OrchestratorResponse ConstructOrchestratorResponse( - string instanceId, - string executionId, - string? customStatus, - IEnumerable? actions, - string completionToken, - EntityConversionState? entityConversionState, - Activity? orchestrationActivity, - bool requiresHistory = false) - { - var response = new P.OrchestratorResponse - { - InstanceId = instanceId, - CustomStatus = customStatus, - CompletionToken = completionToken, - OrchestrationTraceContext = - new() - { - SpanID = orchestrationActivity?.SpanId.ToString(), - SpanStartTime = orchestrationActivity?.StartTimeUtc.ToTimestamp(), - }, - RequiresHistory = requiresHistory, - }; - - // If a history is required and the orchestration request was not completed, then there is no list of actions. - if (requiresHistory) - { - return response; - } - - Check.NotNull(actions); - foreach (OrchestratorAction action in actions) - { - var protoAction = new P.OrchestratorAction { Id = action.Id }; - - P.TraceContext? CreateTraceContext() - { - if (orchestrationActivity is null) - { - return null; + return date; + } } - ActivitySpanId clientSpanId = ActivitySpanId.CreateRandom(); - ActivityContext clientActivityContext = new(orchestrationActivity.TraceId, clientSpanId, orchestrationActivity.ActivityTraceFlags, orchestrationActivity.TraceStateString); - - return new P.TraceContext + // If the value starts with the 'dto:' prefix, it may represent a DateTime value — attempt to parse it. + if (stringValue.StartsWith("dto:", StringComparison.Ordinal)) { - TraceParent = $"00-{clientActivityContext.TraceId}-{clientActivityContext.SpanId}-0{clientActivityContext.TraceFlags:d}", - TraceState = clientActivityContext.TraceState, - }; - } - - switch (action.OrchestratorActionType) - { - case OrchestratorActionType.ScheduleOrchestrator: - var scheduleTaskAction = (ScheduleTaskOrchestratorAction)action; - - protoAction.ScheduleTask = new P.ScheduleTaskAction - { - Name = scheduleTaskAction.Name, - Version = scheduleTaskAction.Version, - Input = scheduleTaskAction.Input, - ParentTraceContext = CreateTraceContext(), - }; - - if (scheduleTaskAction.Tags != null) + if (DateTimeOffset.TryParse(stringValue[4..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTimeOffset date)) { - foreach (KeyValuePair tag in scheduleTaskAction.Tags) - { - protoAction.ScheduleTask.Tags[tag.Key] = tag.Value; - } + return date; } + } - break; - case OrchestratorActionType.CreateSubOrchestration: - var subOrchestrationAction = (CreateSubOrchestrationAction)action; - protoAction.CreateSubOrchestration = new P.CreateSubOrchestrationAction - { - Input = subOrchestrationAction.Input, - InstanceId = subOrchestrationAction.InstanceId, - Name = subOrchestrationAction.Name, - Version = subOrchestrationAction.Version, - ParentTraceContext = CreateTraceContext(), - }; - break; - case OrchestratorActionType.CreateTimer: - var createTimerAction = (CreateTimerOrchestratorAction)action; - protoAction.CreateTimer = new P.CreateTimerAction - { - FireAt = createTimerAction.FireAt.ToTimestamp(), - }; - break; - case OrchestratorActionType.SendEvent: - var sendEventAction = (SendEventOrchestratorAction)action; - if (sendEventAction.Instance == null) - { - throw new ArgumentException( - $"{nameof(SendEventOrchestratorAction)} cannot have a null Instance property!"); - } - - if (entityConversionState is not null - && DTCore.Common.Entities.IsEntityInstance(sendEventAction.Instance.InstanceId) - && sendEventAction.EventName is not null - && sendEventAction.EventData is not null) - { - P.SendEntityMessageAction sendAction = new P.SendEntityMessageAction(); - protoAction.SendEntityMessage = sendAction; - - EntityConversions.DecodeEntityMessageAction( - sendEventAction.EventName, - sendEventAction.EventData, - sendEventAction.Instance.InstanceId, - sendAction, - out string requestId); - - entityConversionState.EntityRequestIds.Add(requestId); - - switch (sendAction.EntityMessageTypeCase) - { - case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityLockRequested: - entityConversionState.AddUnlockObligations(sendAction.EntityLockRequested); - break; - case P.SendEntityMessageAction.EntityMessageTypeOneofCase.EntityUnlockSent: - entityConversionState.RemoveUnlockObligation(sendAction.EntityUnlockSent.TargetInstanceId); - break; - default: - break; - } - } - else - { - protoAction.SendEvent = new P.SendEventAction - { - Instance = sendEventAction.Instance.ToProtobuf(), - Name = sendEventAction.EventName, - Data = sendEventAction.EventData, - }; - - // Distributed Tracing: start a new trace activity derived from the orchestration - // for an EventRaisedEvent (external event) - using Activity? traceActivity = TraceHelper.StartTraceActivityForEventRaisedFromWorker(sendEventAction, instanceId, executionId); - - traceActivity?.Stop(); - } - - break; - case OrchestratorActionType.OrchestrationComplete: - - if (entityConversionState is not null) - { - // as a precaution, unlock any entities that were not unlocked for some reason, before - // completing the orchestration. - foreach ((string target, string criticalSectionId) in entityConversionState.ResetObligations()) - { - response.Actions.Add(new P.OrchestratorAction - { - Id = action.Id, - SendEntityMessage = new P.SendEntityMessageAction - { - EntityUnlockSent = new P.EntityUnlockSentEvent - { - CriticalSectionId = criticalSectionId, - TargetInstanceId = target, - ParentInstanceId = entityConversionState.CurrentInstance?.InstanceId, - }, - }, - }); - } - } - - var completeAction = (OrchestrationCompleteOrchestratorAction)action; - protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction - { - CarryoverEvents = - { - // TODO - }, - Details = completeAction.Details, - NewVersion = completeAction.NewVersion, - OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(), - Result = completeAction.Result, - }; - - if (completeAction.OrchestrationStatus == OrchestrationStatus.Failed) - { - protoAction.CompleteOrchestration.FailureDetails = completeAction.FailureDetails.ToProtobuf(); - } - - break; - default: - throw new NotSupportedException($"Unknown orchestrator action: {action.OrchestratorActionType}"); - } - - response.Actions.Add(protoAction); - } - - return response; - } - - /// - /// Converts a to a . - /// - /// The status to convert. - /// The converted status. - internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status) - { - return (OrchestrationStatus)status; - } - - /// - /// Converts a to a . - /// - /// The status to convert. - /// The converted status. - [return: NotNullIfNotNull(nameof(status))] - internal static OrchestrationInstance? ToCore(this P.OrchestrationInstance? status) - { - if (status == null) - { - return null; - } - - return new OrchestrationInstance - { - InstanceId = status.InstanceId, - ExecutionId = status.ExecutionId, - }; - } - - /// - /// Converts a to a . - /// - /// The failure details to convert. - /// The converted failure details. - [return: NotNullIfNotNull(nameof(failureDetails))] - internal static TaskFailureDetails? ToTaskFailureDetails(this P.TaskFailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new TaskFailureDetails( - failureDetails.ErrorType, - failureDetails.ErrorMessage, - failureDetails.StackTrace, - failureDetails.InnerFailure.ToTaskFailureDetails()); - } - - /// - /// Converts a to . - /// - /// The exception to convert. - /// The task failure details. - [return: NotNullIfNotNull(nameof(e))] - internal static P.TaskFailureDetails? ToTaskFailureDetails(this Exception? e) - { - if (e == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = e.GetType().FullName, - ErrorMessage = e.Message, - StackTrace = e.StackTrace, - InnerFailure = e.InnerException.ToTaskFailureDetails(), - }; - } - - /// - /// Converts a to a . - /// - /// The entity batch request to convert. - /// The converted entity batch request. - [return: NotNullIfNotNull(nameof(entityBatchRequest))] - internal static EntityBatchRequest? ToEntityBatchRequest(this P.EntityBatchRequest? entityBatchRequest) - { - if (entityBatchRequest == null) - { - return null; - } - - return new EntityBatchRequest() - { - EntityState = entityBatchRequest.EntityState, - InstanceId = entityBatchRequest.InstanceId, - Operations = entityBatchRequest.Operations.Select(r => r.ToOperationRequest()).ToList(), - }; - } - - /// - /// Converts a to a . - /// - /// The entity request to convert. - /// The converted request. - /// Additional info about each operation, required by DTS. - internal static void ToEntityBatchRequest( - this P.EntityRequest entityRequest, - out EntityBatchRequest batchRequest, - out List operationInfos) - { - batchRequest = new EntityBatchRequest() - { - EntityState = entityRequest.EntityState, - InstanceId = entityRequest.InstanceId, - Operations = [], // operations are added to this collection below - }; - - operationInfos = new(entityRequest.OperationRequests.Count); - - foreach (P.HistoryEvent? op in entityRequest.OperationRequests) - { - if (op.EntityOperationSignaled is not null) - { - batchRequest.Operations.Add(new OperationRequest - { - Id = Guid.Parse(op.EntityOperationSignaled.RequestId), - Operation = op.EntityOperationSignaled.Operation, - Input = op.EntityOperationSignaled.Input, - }); - operationInfos.Add(new P.OperationInfo - { - RequestId = op.EntityOperationSignaled.RequestId, - ResponseDestination = null, // means we don't send back a response to the caller - }); - } - else if (op.EntityOperationCalled is not null) - { - batchRequest.Operations.Add(new OperationRequest - { - Id = Guid.Parse(op.EntityOperationCalled.RequestId), - Operation = op.EntityOperationCalled.Operation, - Input = op.EntityOperationCalled.Input, - }); - operationInfos.Add(new P.OperationInfo - { - RequestId = op.EntityOperationCalled.RequestId, - ResponseDestination = new P.OrchestrationInstance - { - InstanceId = op.EntityOperationCalled.ParentInstanceId, - ExecutionId = op.EntityOperationCalled.ParentExecutionId, - }, - }); - } - } - } - - /// - /// Converts a to a . - /// - /// The operation request to convert. - /// The converted operation request. - [return: NotNullIfNotNull(nameof(operationRequest))] - internal static OperationRequest? ToOperationRequest(this P.OperationRequest? operationRequest) - { - if (operationRequest == null) - { - return null; - } - - return new OperationRequest() - { - Operation = operationRequest.Operation, - Input = operationRequest.Input, - Id = Guid.Parse(operationRequest.RequestId), - TraceContext = operationRequest.TraceContext != null ? - new DistributedTraceContext( - operationRequest.TraceContext.TraceParent, - operationRequest.TraceContext.TraceState) : null, - }; - } - - /// - /// Converts a to a . - /// - /// The operation result to convert. - /// The converted operation result. - [return: NotNullIfNotNull(nameof(operationResult))] - internal static OperationResult? ToOperationResult(this P.OperationResult? operationResult) - { - if (operationResult == null) - { - return null; - } - - switch (operationResult.ResultTypeCase) - { - case P.OperationResult.ResultTypeOneofCase.Success: - return new OperationResult() - { - Result = operationResult.Success.Result, - StartTimeUtc = operationResult.Success.StartTimeUtc?.ToDateTime(), - EndTimeUtc = operationResult.Success.EndTimeUtc?.ToDateTime(), - }; - - case P.OperationResult.ResultTypeOneofCase.Failure: - return new OperationResult() - { - FailureDetails = operationResult.Failure.FailureDetails.ToCore(), - StartTimeUtc = operationResult.Failure.StartTimeUtc?.ToDateTime(), - EndTimeUtc = operationResult.Failure.EndTimeUtc?.ToDateTime(), - }; - - default: - throw new NotSupportedException($"Deserialization of {operationResult.ResultTypeCase} is not supported."); - } - } - - /// - /// Converts a to . - /// - /// The operation result to convert. - /// The converted operation result. - [return: NotNullIfNotNull(nameof(operationResult))] - internal static P.OperationResult? ToOperationResult(this OperationResult? operationResult) - { - if (operationResult == null) - { - return null; - } - - if (operationResult.FailureDetails == null) - { - return new P.OperationResult() - { - Success = new P.OperationResultSuccess() - { - Result = operationResult.Result, - StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), - EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), - }, - }; - } - else - { - return new P.OperationResult() - { - Failure = new P.OperationResultFailure() - { - FailureDetails = ToProtobuf(operationResult.FailureDetails), - StartTimeUtc = operationResult.StartTimeUtc?.ToTimestamp(), - EndTimeUtc = operationResult.EndTimeUtc?.ToTimestamp(), - }, - }; - } - } - - /// - /// Converts a to a . - /// - /// The operation action to convert. - /// The converted operation action. - [return: NotNullIfNotNull(nameof(operationAction))] - internal static OperationAction? ToOperationAction(this P.OperationAction? operationAction) - { - if (operationAction == null) - { - return null; - } - - switch (operationAction.OperationActionTypeCase) - { - case P.OperationAction.OperationActionTypeOneofCase.SendSignal: - - return new SendSignalOperationAction() - { - Name = operationAction.SendSignal.Name, - Input = operationAction.SendSignal.Input, - InstanceId = operationAction.SendSignal.InstanceId, - ScheduledTime = operationAction.SendSignal.ScheduledTime?.ToDateTime(), - RequestTime = operationAction.SendSignal.RequestTime?.ToDateTimeOffset(), - ParentTraceContext = operationAction.SendSignal.ParentTraceContext != null ? - new DistributedTraceContext( - operationAction.SendSignal.ParentTraceContext.TraceParent, - operationAction.SendSignal.ParentTraceContext.TraceState) : null, - }; - - case P.OperationAction.OperationActionTypeOneofCase.StartNewOrchestration: - - return new StartNewOrchestrationOperationAction() - { - Name = operationAction.StartNewOrchestration.Name, - Input = operationAction.StartNewOrchestration.Input, - InstanceId = operationAction.StartNewOrchestration.InstanceId, - Version = operationAction.StartNewOrchestration.Version, - ScheduledStartTime = operationAction.StartNewOrchestration.ScheduledTime?.ToDateTime(), - RequestTime = operationAction.StartNewOrchestration.RequestTime?.ToDateTimeOffset(), - ParentTraceContext = operationAction.StartNewOrchestration.ParentTraceContext != null ? - new DistributedTraceContext( - operationAction.StartNewOrchestration.ParentTraceContext.TraceParent, - operationAction.StartNewOrchestration.ParentTraceContext.TraceState) : null, - }; - default: - throw new NotSupportedException($"Deserialization of {operationAction.OperationActionTypeCase} is not supported."); - } - } - - /// - /// Converts a to . - /// - /// The operation action to convert. - /// The converted operation action. - [return: NotNullIfNotNull(nameof(operationAction))] - internal static P.OperationAction? ToOperationAction(this OperationAction? operationAction) - { - if (operationAction == null) - { - return null; - } - - var action = new P.OperationAction(); - - switch (operationAction) - { - case SendSignalOperationAction sendSignalAction: - - action.SendSignal = new P.SendSignalAction() - { - Name = sendSignalAction.Name, - Input = sendSignalAction.Input, - InstanceId = sendSignalAction.InstanceId, - ScheduledTime = sendSignalAction.ScheduledTime?.ToTimestamp(), - RequestTime = sendSignalAction.RequestTime?.ToTimestamp(), - ParentTraceContext = sendSignalAction.ParentTraceContext != null ? - new P.TraceContext - { - TraceParent = sendSignalAction.ParentTraceContext.TraceParent, - TraceState = sendSignalAction.ParentTraceContext.TraceState, - } - : null, - }; - break; - - case StartNewOrchestrationOperationAction startNewOrchestrationAction: - - action.StartNewOrchestration = new P.StartNewOrchestrationAction() - { - Name = startNewOrchestrationAction.Name, - Input = startNewOrchestrationAction.Input, - Version = startNewOrchestrationAction.Version, - InstanceId = startNewOrchestrationAction.InstanceId, - ScheduledTime = startNewOrchestrationAction.ScheduledStartTime?.ToTimestamp(), - RequestTime = startNewOrchestrationAction.RequestTime?.ToTimestamp(), - ParentTraceContext = startNewOrchestrationAction.ParentTraceContext != null ? - new P.TraceContext - { - TraceParent = startNewOrchestrationAction.ParentTraceContext.TraceParent, - TraceState = startNewOrchestrationAction.ParentTraceContext.TraceState, - } - : null, - }; - break; - } - - return action; - } - - /// - /// Converts a to a . - /// - /// The operation result to convert. - /// The converted operation result. - [return: NotNullIfNotNull(nameof(entityBatchResult))] - internal static EntityBatchResult? ToEntityBatchResult(this P.EntityBatchResult? entityBatchResult) - { - if (entityBatchResult == null) - { - return null; - } - - return new EntityBatchResult() - { - Actions = entityBatchResult.Actions.Select(operationAction => operationAction!.ToOperationAction()).ToList(), - EntityState = entityBatchResult.EntityState, - Results = entityBatchResult.Results.Select(operationResult => operationResult!.ToOperationResult()).ToList(), - FailureDetails = entityBatchResult.FailureDetails.ToCore(), - }; - } - - /// - /// Converts a to . - /// - /// The operation result to convert. - /// The completion token, or null for the older protocol. - /// Additional information about each operation, required by DTS. - /// The converted operation result. - [return: NotNullIfNotNull(nameof(entityBatchResult))] - internal static P.EntityBatchResult? ToEntityBatchResult( - this EntityBatchResult? entityBatchResult, - string? completionToken = null, - IEnumerable? operationInfos = null) - { - if (entityBatchResult == null) - { - return null; - } - - return new P.EntityBatchResult() - { - EntityState = entityBatchResult.EntityState, - FailureDetails = entityBatchResult.FailureDetails.ToProtobuf(), - Actions = { entityBatchResult.Actions?.Select(a => a.ToOperationAction()) ?? [] }, - Results = { entityBatchResult.Results?.Select(a => a.ToOperationResult()) ?? [] }, - CompletionToken = completionToken ?? string.Empty, - OperationInfos = { operationInfos ?? [] }, - }; - } - - /// - /// Converts the gRPC representation of orchestrator entity parameters to the DT.Core representation. - /// - /// The DT.Core representation. - /// The gRPC representation. - [return: NotNullIfNotNull(nameof(parameters))] - internal static TaskOrchestrationEntityParameters? ToCore(this P.OrchestratorEntityParameters? parameters) - { - if (parameters == null) - { - return null; - } - - return new TaskOrchestrationEntityParameters() - { - EntityMessageReorderWindow = parameters.EntityMessageReorderWindow.ToTimeSpan(), - }; - } - - /// - /// Gets the approximate byte count for a . - /// - /// The failure details. - /// The approximate byte count. - internal static int GetApproximateByteCount(this P.TaskFailureDetails failureDetails) - { - // Protobuf strings are always UTF-8: https://developers.google.com/protocol-buffers/docs/proto3#scalar - Encoding encoding = Encoding.UTF8; - - int byteCount = 0; - if (failureDetails.ErrorType != null) - { - byteCount += encoding.GetByteCount(failureDetails.ErrorType); - } - - if (failureDetails.ErrorMessage != null) - { - byteCount += encoding.GetByteCount(failureDetails.ErrorMessage); - } - - if (failureDetails.StackTrace != null) - { - byteCount += encoding.GetByteCount(failureDetails.StackTrace); - } - - if (failureDetails.InnerFailure != null) - { - byteCount += failureDetails.InnerFailure.GetApproximateByteCount(); - } - - return byteCount; - } - - /// - /// Decode a protobuf message from a base64 string. - /// - /// The type to decode to. - /// The message parser. - /// The base64 encoded message. - /// The decoded message. - /// If decoding fails. - internal static T Base64Decode(this MessageParser parser, string encodedMessage) where T : IMessage - { - // Decode the base64 in a way that doesn't allocate a byte[] on each request - int encodedByteCount = Encoding.UTF8.GetByteCount(encodedMessage); - byte[] buffer = ArrayPool.Shared.Rent(encodedByteCount); - try - { - // The Base64 APIs require first converting the string into UTF-8 bytes. We then - // do an in-place conversion from base64 UTF-8 bytes to protobuf bytes so that - // we can finally decode the protobuf request. - Encoding.UTF8.GetBytes(encodedMessage, 0, encodedMessage.Length, buffer, 0); - OperationStatus status = Base64.DecodeFromUtf8InPlace( - buffer.AsSpan(0, encodedByteCount), - out int bytesWritten); - if (status != OperationStatus.Done) - { - throw new ArgumentException( - $"Failed to base64-decode the '{typeof(T).Name}' payload: {status}", nameof(encodedMessage)); - } - - return (T)parser.ParseFrom(buffer, 0, bytesWritten); - } - finally - { - ArrayPool.Shared.Return(buffer); - } - } - - /// - /// Converts a grpc to a . - /// - /// The failure details to convert. - /// The converted failure details. - internal static FailureDetails? ToCore(this P.TaskFailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new FailureDetails( - failureDetails.ErrorType, - failureDetails.ErrorMessage, - failureDetails.StackTrace, - failureDetails.InnerFailure.ToCore(), - failureDetails.IsNonRetriable); - } - - /// - /// Converts a instance to a corresponding C# object. - /// - /// The Protobuf Value to convert. - /// The corresponding C# object. - /// - /// Thrown when the Protobuf Value.KindCase is not one of the supported types. - /// - internal static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value) - { - switch (value.KindCase) - { - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue: - return null; - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue: - return value.NumberValue; - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue: - return value.StringValue; - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue: - return value.BoolValue; - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue: - return value.StructValue.Fields.ToDictionary( - pair => pair.Key, - pair => ConvertValueToObject(pair.Value)); - case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue: - return value.ListValue.Values.Select(ConvertValueToObject).ToList(); + // Otherwise just return as string + return stringValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue: + return value.BoolValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue: + return value.StructValue.Fields.ToDictionary( + pair => pair.Key, + pair => ConvertValueToObject(pair.Value)); + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue: + return value.ListValue.Values.Select(ConvertValueToObject).ToList(); default: - throw new NotSupportedException($"Unsupported Value kind: {value.KindCase}"); - } + // Fallback: serialize the whole value to JSON string + return JsonSerializer.Serialize(value); + } } - /// - /// Converts a to a grpc . - /// - /// The failure details to convert. - /// The converted failure details. - static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType ?? "(unknown)", - ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)", - StackTrace = failureDetails.StackTrace, - IsNonRetriable = failureDetails.IsNonRetriable, - InnerFailure = failureDetails.InnerFailure.ToProtobuf(), - }; - } - - static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status) - { - return (P.OrchestrationStatus)status; - } - - static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) - { - return new P.OrchestrationInstance - { - InstanceId = instance.InstanceId, - ExecutionId = instance.ExecutionId, - }; - } - - /// - /// Tracks state required for converting orchestration histories containing entity-related events. - /// - internal class EntityConversionState - { - readonly bool insertMissingEntityUnlocks; - - OrchestrationInstance? instance; - HashSet? entityRequestIds; - Dictionary? unlockObligations; - - /// - /// Initializes a new instance of the class. - /// - /// Whether to insert missing unlock events in to the history - /// when the orchestration completes. - public EntityConversionState(bool insertMissingEntityUnlocks) - { - this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this); - this.insertMissingEntityUnlocks = insertMissingEntityUnlocks; - } - - /// - /// Gets a function that converts a history event in protobuf format to a core history event. - /// - public Func ConvertFromProto { get; } - - /// - /// Gets the orchestration instance of this history. - /// - public OrchestrationInstance? CurrentInstance => this.instance; - - /// - /// Gets the set of guids that have been used as entity request ids in this history. - /// - public HashSet EntityRequestIds => this.entityRequestIds ??= new(); - - /// - /// Records the orchestration instance, which may be needed for some conversions. - /// - /// The orchestration instance. - public void SetOrchestrationInstance(OrchestrationInstance instance) - { - this.instance = instance; - } - - /// - /// Adds unlock obligations for all entities that are being locked by this request. - /// - /// The lock request. - public void AddUnlockObligations(P.EntityLockRequestedEvent request) - { - if (!this.insertMissingEntityUnlocks) - { - return; - } - - this.unlockObligations ??= new(); - - foreach (string target in request.LockSet) - { - this.unlockObligations[target] = request.CriticalSectionId; - } - } - - /// - /// Removes an unlock obligation. - /// - /// The target entity. - public void RemoveUnlockObligation(string target) - { - if (!this.insertMissingEntityUnlocks) - { - return; - } - - this.unlockObligations?.Remove(target); - } - - /// - /// Returns the remaining unlock obligations, and clears the list. - /// - /// The unlock obligations. - public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations() - { - if (!this.insertMissingEntityUnlocks) - { - yield break; - } - - if (this.unlockObligations is not null) - { - foreach (var kvp in this.unlockObligations) - { - yield return (kvp.Key, kvp.Value); - } - - this.unlockObligations = null; - } - } - } -} + /// + /// Converts a MapFieldinto a IDictionary. + /// + /// The map to convert. + /// Dictionary contains the converted obejct. + internal static IDictionary ConvertProperties(MapField properties) + { + return properties.ToDictionary( + kvp => kvp.Key, + kvp => ConvertValueToObject(kvp.Value)); + } + + /// + /// Converts a C# object to a protobuf Value. + /// + /// The object to convert. + /// The converted protobuf Value. + internal static Value ConvertObjectToValue(object? obj) + { + return obj switch + { + null => Value.ForNull(), + string str => Value.ForString(str), + bool b => Value.ForBool(b), + int i => Value.ForNumber(i), + long l => Value.ForNumber(l), + float f => Value.ForNumber(f), + double d => Value.ForNumber(d), + decimal dec => Value.ForNumber((double)dec), + + // For DateTime and DateTimeOffset, add prefix to distinguish from normal string. + DateTime dt => Value.ForString($"dt:{dt.ToString("O")}"), + DateTimeOffset dto => Value.ForString($"dto:{dto.ToString("O")}"), + IDictionary dict => Value.ForStruct(new Struct + { + Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) }, + }), + IEnumerable e => Value.ForList(e.Cast().Select(ConvertObjectToValue).ToArray()), + + // Fallback: convert unlisted type to string. + _ => Value.ForString(obj.ToString() ?? string.Empty), + }; + } + + /// + /// Converts a to a grpc . + /// + /// The failure details to convert. + /// The converted failure details. + static P.TaskFailureDetails? ToProtobuf(this FailureDetails? failureDetails) + { + if (failureDetails == null) + { + return null; + } + + var taskFailureDetails = new P.TaskFailureDetails + { + ErrorType = failureDetails.ErrorType ?? "(unknown)", + ErrorMessage = failureDetails.ErrorMessage ?? "(unknown)", + StackTrace = failureDetails.StackTrace, + IsNonRetriable = failureDetails.IsNonRetriable, + InnerFailure = failureDetails.InnerFailure.ToProtobuf(), + }; + + // Properly populate the MapField + if (failureDetails.Properties != null) + { + foreach (var kvp in failureDetails.Properties) + { + taskFailureDetails.Properties[kvp.Key] = ConvertObjectToValue(kvp.Value); + } + } + + return taskFailureDetails; + } + + static P.OrchestrationStatus ToProtobuf(this OrchestrationStatus status) + { + return (P.OrchestrationStatus)status; + } + + static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance) + { + return new P.OrchestrationInstance + { + InstanceId = instance.InstanceId, + ExecutionId = instance.ExecutionId, + }; + } + + /// + /// Tracks state required for converting orchestration histories containing entity-related events. + /// + internal class EntityConversionState + { + readonly bool insertMissingEntityUnlocks; + + OrchestrationInstance? instance; + HashSet? entityRequestIds; + Dictionary? unlockObligations; + + /// + /// Initializes a new instance of the class. + /// + /// Whether to insert missing unlock events in to the history + /// when the orchestration completes. + public EntityConversionState(bool insertMissingEntityUnlocks) + { + this.ConvertFromProto = (P.HistoryEvent e) => ProtoUtils.ConvertHistoryEvent(e, this); + this.insertMissingEntityUnlocks = insertMissingEntityUnlocks; + } + + /// + /// Gets a function that converts a history event in protobuf format to a core history event. + /// + public Func ConvertFromProto { get; } + + /// + /// Gets the orchestration instance of this history. + /// + public OrchestrationInstance? CurrentInstance => this.instance; + + /// + /// Gets the set of guids that have been used as entity request ids in this history. + /// + public HashSet EntityRequestIds => this.entityRequestIds ??= new(); + + /// + /// Records the orchestration instance, which may be needed for some conversions. + /// + /// The orchestration instance. + public void SetOrchestrationInstance(OrchestrationInstance instance) + { + this.instance = instance; + } + + /// + /// Adds unlock obligations for all entities that are being locked by this request. + /// + /// The lock request. + public void AddUnlockObligations(P.EntityLockRequestedEvent request) + { + if (!this.insertMissingEntityUnlocks) + { + return; + } + + this.unlockObligations ??= new(); + + foreach (string target in request.LockSet) + { + this.unlockObligations[target] = request.CriticalSectionId; + } + } + + /// + /// Removes an unlock obligation. + /// + /// The target entity. + public void RemoveUnlockObligation(string target) + { + if (!this.insertMissingEntityUnlocks) + { + return; + } + + this.unlockObligations?.Remove(target); + } + + /// + /// Returns the remaining unlock obligations, and clears the list. + /// + /// The unlock obligations. + public IEnumerable<(string Target, string CriticalSectionId)> ResetObligations() + { + if (!this.insertMissingEntityUnlocks) + { + yield break; + } + + if (this.unlockObligations is not null) + { + foreach (var kvp in this.unlockObligations) + { + yield return (kvp.Key, kvp.Value); + } + + this.unlockObligations = null; + } + } + } +} diff --git a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs index 7e70990b7..7eae49d95 100644 --- a/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs +++ b/src/Worker/Core/DependencyInjection/DefaultDurableTaskWorkerBuilder.cs @@ -54,7 +54,19 @@ public IHostedService Build(IServiceProvider serviceProvider) Verify.NotNull(this.buildTarget, error); DurableTaskRegistry registry = serviceProvider.GetOptions(this.Name); - return (IHostedService)ActivatorUtilities.CreateInstance( - serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + + // Get the IExceptionPropertiesProvider from DI if registered + IExceptionPropertiesProvider? exceptionPropertiesProvider = serviceProvider.GetService(); + + if (exceptionPropertiesProvider != null) + { + return (IHostedService)ActivatorUtilities.CreateInstance( + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory(), exceptionPropertiesProvider); + } + else + { + return (IHostedService)ActivatorUtilities.CreateInstance( + serviceProvider, this.buildTarget, this.Name, registry.BuildFactory()); + } } } diff --git a/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs new file mode 100644 index 000000000..bffe8ccb0 --- /dev/null +++ b/src/Worker/Core/ExceptionPropertiesProviderAdapter.cs @@ -0,0 +1,32 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; + +namespace Microsoft.DurableTask.Worker; + +/// +/// Adapts a Microsoft.DurableTask.Worker IExceptionPropertiesProvider to DurableTask.Core IExceptionPropertiesProvider. +/// +public sealed class ExceptionPropertiesProviderAdapter : global::DurableTask.Core.IExceptionPropertiesProvider +{ + readonly IExceptionPropertiesProvider inner; + + /// + /// Initializes a new instance of the class. + /// + /// The inner provider to adapt. + public ExceptionPropertiesProviderAdapter(IExceptionPropertiesProvider inner) + { + this.inner = inner ?? throw new ArgumentNullException(nameof(inner)); + } + + /// + /// Gets exception properties from the inner provider. + /// + /// The exception to get properties for. + /// The exception properties dictionary. + public IDictionary? GetExceptionProperties(Exception exception) + => this.inner.GetExceptionProperties(exception); +} diff --git a/src/Worker/Core/Hosting/DurableTaskWorker.cs b/src/Worker/Core/Hosting/DurableTaskWorker.cs index b788a5270..01f12d5c4 100644 --- a/src/Worker/Core/Hosting/DurableTaskWorker.cs +++ b/src/Worker/Core/Hosting/DurableTaskWorker.cs @@ -31,4 +31,9 @@ protected DurableTaskWorker(string? name, IDurableTaskFactory factory) /// the configured tasks during host construction. /// protected virtual IDurableTaskFactory Factory { get; } + + /// + /// Gets or sets the exception properties provider used to enrich failure details with custom exception properties. + /// + protected IExceptionPropertiesProvider? ExceptionPropertiesProvider { get; set; } } diff --git a/src/Worker/Core/IExceptionPropertiesProvider.cs b/src/Worker/Core/IExceptionPropertiesProvider.cs new file mode 100644 index 000000000..e0205670e --- /dev/null +++ b/src/Worker/Core/IExceptionPropertiesProvider.cs @@ -0,0 +1,17 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +namespace Microsoft.DurableTask.Worker; + +/// +/// Provides custom exception property inclusion rules for enriching FailureDetails. +/// +public interface IExceptionPropertiesProvider +{ + /// + /// Extracts custom properties from an exception. + /// + /// The exception to extract properties from. + /// A dictionary of custom properties to include in the FailureDetails, or null if no properties should be added. + IDictionary? GetExceptionProperties(Exception exception); +} diff --git a/src/Worker/Core/Shims/DurableTaskShimFactory.cs b/src/Worker/Core/Shims/DurableTaskShimFactory.cs index 584b7eeb8..3ae1cbc7c 100644 --- a/src/Worker/Core/Shims/DurableTaskShimFactory.cs +++ b/src/Worker/Core/Shims/DurableTaskShimFactory.cs @@ -36,7 +36,7 @@ public DurableTaskShimFactory( /// /// Gets the default with default values. /// - public static DurableTaskShimFactory Default { get; } = new(); + public static DurableTaskShimFactory Default { get; } = new(null, null); /// /// Creates a from a . diff --git a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs index 330fd1888..a5483073c 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationEntityContext.cs @@ -168,7 +168,8 @@ static TaskFailureDetails ConvertFailureDetails(FailureDetails failureDetails) failureDetails.ErrorType, failureDetails.ErrorMessage, failureDetails.StackTrace, - failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null); + failureDetails.InnerFailure != null ? ConvertFailureDetails(failureDetails.InnerFailure) : null, + failureDetails.Properties); async Task CallEntityInternalAsync(EntityInstanceId id, string operationName, object? input) { diff --git a/src/Worker/Core/Shims/TaskOrchestrationShim.cs b/src/Worker/Core/Shims/TaskOrchestrationShim.cs index 127038a38..eb7a179be 100644 --- a/src/Worker/Core/Shims/TaskOrchestrationShim.cs +++ b/src/Worker/Core/Shims/TaskOrchestrationShim.cs @@ -95,7 +95,9 @@ public TaskOrchestrationShim( // failure details are correctly propagated. throw new CoreTaskFailedException(e.Message, e.InnerException) { - FailureDetails = new FailureDetails(e, e.FailureDetails.ToCoreFailureDetails()), + FailureDetails = new FailureDetails(e, + e.FailureDetails.ToCoreFailureDetails(), + properties: e.FailureDetails.Properties), }; } finally diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs index 3e921252e..4dfa70f7f 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs @@ -33,16 +33,20 @@ class Processor readonly TaskHubSidecarServiceClient client; readonly DurableTaskShimFactory shimFactory; readonly GrpcDurableTaskWorkerOptions.InternalOptions internalOptions; + readonly DTCore.IExceptionPropertiesProvider? exceptionPropertiesProvider; [Obsolete("Experimental")] readonly IOrchestrationFilter? orchestrationFilter; - public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null) + public Processor(GrpcDurableTaskWorker worker, TaskHubSidecarServiceClient client, IOrchestrationFilter? orchestrationFilter = null, IExceptionPropertiesProvider? exceptionPropertiesProvider = null) { this.worker = worker; this.client = client; this.shimFactory = new DurableTaskShimFactory(this.worker.grpcOptions, this.worker.loggerFactory); this.internalOptions = this.worker.grpcOptions.Internal; this.orchestrationFilter = orchestrationFilter; + this.exceptionPropertiesProvider = exceptionPropertiesProvider is not null + ? new ExceptionPropertiesProviderAdapter(exceptionPropertiesProvider) + : null; } ILogger Logger => this.worker.logger; @@ -636,7 +640,8 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), - ErrorPropagationMode.UseFailureDetails); + ErrorPropagationMode.UseFailureDetails, + this.exceptionPropertiesProvider); result = executor.Execute(); } else @@ -654,7 +659,7 @@ await this.client.AbandonTaskOrchestratorWorkItemAsync( { // This is not expected: Normally TaskOrchestrationExecutor handles exceptions in user code. this.Logger.OrchestratorFailed(name, request.InstanceId, unexpected.ToString()); - failureDetails = unexpected.ToTaskFailureDetails(); + failureDetails = unexpected.ToTaskFailureDetails(this.exceptionPropertiesProvider); } P.OrchestratorResponse response; @@ -764,6 +769,8 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, P.TaskFailureDetails? failureDetails = null; TaskContext innerContext = new(instance); + innerContext.ExceptionPropertiesProvider = this.exceptionPropertiesProvider; + TaskName name = new(request.Name); string? output = null; @@ -792,7 +799,7 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken, } catch (Exception applicationException) { - failureDetails = applicationException.ToTaskFailureDetails(); + failureDetails = applicationException.ToTaskFailureDetails(this.exceptionPropertiesProvider); } } else diff --git a/src/Worker/Grpc/GrpcDurableTaskWorker.cs b/src/Worker/Grpc/GrpcDurableTaskWorker.cs index 463af441f..da61d8849 100644 --- a/src/Worker/Grpc/GrpcDurableTaskWorker.cs +++ b/src/Worker/Grpc/GrpcDurableTaskWorker.cs @@ -29,6 +29,7 @@ sealed partial class GrpcDurableTaskWorker : DurableTaskWorker /// The service provider. /// The logger. /// The optional used to filter orchestration execution. + /// The custom exception properties provider that help build failure details. public GrpcDurableTaskWorker( string name, IDurableTaskFactory factory, @@ -36,7 +37,8 @@ public GrpcDurableTaskWorker( IOptionsMonitor workerOptions, IServiceProvider services, ILoggerFactory loggerFactory, - IOrchestrationFilter? orchestrationFilter = null) + IOrchestrationFilter? orchestrationFilter = null, + IExceptionPropertiesProvider? exceptionPropertiesProvider = null) : base(name, factory) { this.grpcOptions = Check.NotNull(grpcOptions).Get(name); @@ -45,6 +47,7 @@ public GrpcDurableTaskWorker( this.loggerFactory = Check.NotNull(loggerFactory); this.logger = loggerFactory.CreateLogger("Microsoft.DurableTask"); // TODO: use better category name. this.orchestrationFilter = orchestrationFilter; + this.ExceptionPropertiesProvider = exceptionPropertiesProvider; } /// @@ -52,7 +55,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await using AsyncDisposable disposable = this.GetCallInvoker(out CallInvoker callInvoker, out string address); this.logger.StartingTaskHubWorker(address); - await new Processor(this, new(callInvoker), this.orchestrationFilter).ExecuteAsync(stoppingToken); + await new Processor(this, new(callInvoker), this.orchestrationFilter, this.ExceptionPropertiesProvider).ExecuteAsync(stoppingToken); } #if NET6_0_OR_GREATER diff --git a/src/Worker/Grpc/GrpcOrchestrationRunner.cs b/src/Worker/Grpc/GrpcOrchestrationRunner.cs index 74a92006a..d8d04deaa 100644 --- a/src/Worker/Grpc/GrpcOrchestrationRunner.cs +++ b/src/Worker/Grpc/GrpcOrchestrationRunner.cs @@ -1,140 +1,140 @@ -// Copyright (c) Microsoft Corporation. -// Licensed under the MIT License. - -using DurableTask.Core; -using DurableTask.Core.History; -using Google.Protobuf; -using Microsoft.DurableTask.Worker.Shims; -using Microsoft.Extensions.Caching.Memory; -using Microsoft.Extensions.DependencyInjection; -using P = Microsoft.DurableTask.Protobuf; - -namespace Microsoft.DurableTask.Worker.Grpc; - -/// -/// Helper class for invoking orchestrations directly, without building a worker instance. -/// -/// -/// -/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the -/// caller must provide orchestration state as serialized protobuf bytes. -/// -/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state -/// is provided by trigger bindings. -/// -/// -public static class GrpcOrchestrationRunner -{ - /// - /// Loads orchestration history from and uses it to execute the - /// orchestrator function code pointed to by . - /// - /// - /// The type of the orchestrator function input. This type must be deserializable from JSON. - /// - /// - /// The type of the orchestrator function output. This type must be serializable to JSON. - /// - /// - /// The base64-encoded protobuf payload representing an orchestration execution request. - /// - /// A function that implements the orchestrator logic. - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine. - /// - /// - /// Thrown if or is null. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, - Func> orchestratorFunc, - IServiceProvider? services = null) - { - Check.NotNull(orchestratorFunc); - return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services); - } - - /// - /// Deserializes orchestration history from and uses it to resume the - /// orchestrator implemented by . - /// - /// - /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. - /// - /// - /// An implementation that defines the orchestrator logic. - /// - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. - /// - /// - /// Thrown if or is null. - /// - /// - /// Thrown if contains invalid data. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, - ITaskOrchestrator implementation, - IServiceProvider? services = null) - { - return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services); +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using DurableTask.Core; +using DurableTask.Core.History; +using Google.Protobuf; +using Microsoft.DurableTask.Worker.Shims; +using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.DependencyInjection; +using P = Microsoft.DurableTask.Protobuf; + +namespace Microsoft.DurableTask.Worker.Grpc; + +/// +/// Helper class for invoking orchestrations directly, without building a worker instance. +/// +/// +/// +/// This static class can be used to execute orchestration logic directly. In order to use it for this purpose, the +/// caller must provide orchestration state as serialized protobuf bytes. +/// +/// The Azure Functions .NET worker extension is the primary intended user of this class, where orchestration state +/// is provided by trigger bindings. +/// +/// +public static class GrpcOrchestrationRunner +{ + /// + /// Loads orchestration history from and uses it to execute the + /// orchestrator function code pointed to by . + /// + /// + /// The type of the orchestrator function input. This type must be deserializable from JSON. + /// + /// + /// The type of the orchestrator function output. This type must be serializable to JSON. + /// + /// + /// The base64-encoded protobuf payload representing an orchestration execution request. + /// + /// A function that implements the orchestrator logic. + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a base64-encoded set of orchestrator actions to be interpreted by the external orchestration engine. + /// + /// + /// Thrown if or is null. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, + Func> orchestratorFunc, + IServiceProvider? services = null) + { + Check.NotNull(orchestratorFunc); + return LoadAndRun(encodedOrchestratorRequest, FuncTaskOrchestrator.Create(orchestratorFunc), services); } - /// - /// Deserializes orchestration history from and uses it to resume the - /// orchestrator implemented by . - /// - /// - /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. - /// - /// + /// + /// Deserializes orchestration history from and uses it to resume the + /// orchestrator implemented by . + /// + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// + /// /// An implementation that defines the orchestrator logic. /// - /// - /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session. - /// - /// - /// Optional from which injected dependencies can be retrieved. - /// - /// - /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. - /// - /// - /// Thrown if or is null. - /// - /// - /// Thrown if contains invalid data. - /// - public static string LoadAndRun( - string encodedOrchestratorRequest, + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, ITaskOrchestrator implementation, - ExtendedSessionsCache? extendedSessionsCache, - IServiceProvider? services = null) - { - Check.NotNullOrEmpty(encodedOrchestratorRequest); - Check.NotNull(implementation); - - P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode( - encodedOrchestratorRequest); - - List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); - IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); - Dictionary properties = request.Properties.ToDictionary( - pair => pair.Key, - pair => ProtoUtils.ConvertValueToObject(pair.Value)); - + IServiceProvider? services = null) + { + return LoadAndRun(encodedOrchestratorRequest, implementation, extendedSessionsCache: null, services: services); + } + + /// + /// Deserializes orchestration history from and uses it to resume the + /// orchestrator implemented by . + /// + /// + /// The encoded protobuf payload representing an orchestration execution request. This is a base64-encoded string. + /// + /// + /// An implementation that defines the orchestrator logic. + /// + /// + /// The cache of extended sessions which can be used to retrieve the if this orchestration request is from within an extended session. + /// + /// + /// Optional from which injected dependencies can be retrieved. + /// + /// + /// Returns a serialized set of orchestrator actions that should be used as the return value of the orchestrator function trigger. + /// + /// + /// Thrown if or is null. + /// + /// + /// Thrown if contains invalid data. + /// + public static string LoadAndRun( + string encodedOrchestratorRequest, + ITaskOrchestrator implementation, + ExtendedSessionsCache? extendedSessionsCache, + IServiceProvider? services = null) + { + Check.NotNullOrEmpty(encodedOrchestratorRequest); + Check.NotNull(implementation); + + P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode( + encodedOrchestratorRequest); + + List pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList(); + IEnumerable newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent); + Dictionary properties = request.Properties.ToDictionary( + pair => pair.Key, + pair => ProtoUtils.ConvertValueToObject(pair.Value)); + OrchestratorExecutionResult? result = null; MemoryCache? extendedSessions = null; - // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached - bool addToExtendedSessions = false; + // If any of the request parameters are malformed, we assume the default - extended sessions are not enabled and the orchestration history is attached + bool addToExtendedSessions = false; bool requiresHistory = false; bool pastEventsIncluded = true; bool isExtendedSession = false; @@ -157,88 +157,94 @@ public static string LoadAndRun( { pastEventsIncluded = includePastEvents; } - + if (isExtendedSession && extendedSessions != null) { // If a history was provided, even if we already have an extended session stored, we always want to evict whatever state is in the cache and replace it with a new extended // session based on the provided history - if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null) - { - OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState; - runtimeState.NewEvents.Clear(); - foreach (HistoryEvent newEvent in newEvents) - { - runtimeState.AddEvent(newEvent); - } - - result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents(); - if (extendedSessionState.OrchestrationExecutor.IsCompleted) - { - extendedSessions.Remove(request.InstanceId); - } - } - else + if (!pastEventsIncluded && extendedSessions.TryGetValue(request.InstanceId, out ExtendedSessionState? extendedSessionState) && extendedSessionState is not null) { - extendedSessions.Remove(request.InstanceId); - addToExtendedSessions = true; + OrchestrationRuntimeState runtimeState = extendedSessionState!.RuntimeState; + runtimeState.NewEvents.Clear(); + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + result = extendedSessionState.OrchestrationExecutor.ExecuteNewEvents(); + if (extendedSessionState.OrchestrationExecutor.IsCompleted) + { + extendedSessions.Remove(request.InstanceId); + } + } + else + { + extendedSessions.Remove(request.InstanceId); + addToExtendedSessions = true; + } + } + + if (result == null) + { + // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the + // session and lost the orchestration history so we cannot replay the orchestration. + if (!pastEventsIncluded) + { + requiresHistory = true; + } + else + { + // Re-construct the orchestration state from the history. + // New events must be added using the AddEvent method. + OrchestrationRuntimeState runtimeState = new(pastEvents); + + foreach (HistoryEvent newEvent in newEvents) + { + runtimeState.AddEvent(newEvent); + } + + TaskName orchestratorName = new(runtimeState.Name); + ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p + ? new(new(p.Name), p.OrchestrationInstance.InstanceId) + : null; + + DurableTaskShimFactory factory = services is null + ? DurableTaskShimFactory.Default + : ActivatorUtilities.GetServiceOrCreateInstance(services); + TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); + + TaskOrchestrationExecutor executor = new( + runtimeState, + shim, + BehaviorOnContinueAsNew.Carryover, + request.EntityParameters.ToCore(), + ErrorPropagationMode.UseFailureDetails); + result = executor.Execute(); + + if (addToExtendedSessions && !executor.IsCompleted) + { + extendedSessions.Set( + request.InstanceId, + new(runtimeState, shim, executor), + new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); + } + else + { + extendedSessions?.Remove(request.InstanceId); + } } - } - - if (result == null) - { - // DurableTask.Core did not attach the orchestration history since the extended session is still active on its end, but we have since evicted the - // session and lost the orchestration history so we cannot replay the orchestration. - if (!pastEventsIncluded) - { - requiresHistory = true; - } - else - { - // Re-construct the orchestration state from the history. - // New events must be added using the AddEvent method. - OrchestrationRuntimeState runtimeState = new(pastEvents); - - foreach (HistoryEvent newEvent in newEvents) - { - runtimeState.AddEvent(newEvent); - } - - TaskName orchestratorName = new(runtimeState.Name); - ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p - ? new(new(p.Name), p.OrchestrationInstance.InstanceId) - : null; - - DurableTaskShimFactory factory = services is null - ? DurableTaskShimFactory.Default - : ActivatorUtilities.GetServiceOrCreateInstance(services); - TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, properties, parent); - TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails); - result = executor.Execute(); - - if (addToExtendedSessions && !executor.IsCompleted) - { - extendedSessions.Set( - request.InstanceId, - new(runtimeState, shim, executor), - new MemoryCacheEntryOptions { SlidingExpiration = TimeSpan.FromSeconds(extendedSessionIdleTimeoutInSeconds) }); - } - else - { - extendedSessions?.Remove(request.InstanceId); - } - } - } - - P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( + } + + P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse( request.InstanceId, - request.ExecutionId, - result?.CustomStatus, - result?.Actions, - completionToken: string.Empty, /* doesn't apply */ + request.ExecutionId, + result?.CustomStatus, + result?.Actions, + completionToken: string.Empty, /* doesn't apply */ entityConversionState: null, - orchestrationActivity: null, - requiresHistory: requiresHistory); - byte[] responseBytes = response.ToByteArray(); - return Convert.ToBase64String(responseBytes); - } -} + orchestrationActivity: null, + requiresHistory: requiresHistory); + byte[] responseBytes = response.ToByteArray(); + return Convert.ToBase64String(responseBytes); + } +} diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs index ae8c16a19..86b98030d 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/ProtobufUtils.cs @@ -2,12 +2,16 @@ // Licensed under the MIT License. using System.Buffers; +using System.Collections; +using System.Globalization; +using System.Text.Json; using DurableTask.Core; using DurableTask.Core.Command; using DurableTask.Core.History; using DurableTask.Core.Query; using DurableTask.Core.Tracing; using Google.Protobuf; +using Google.Protobuf.Collections; using Google.Protobuf.WellKnownTypes; using Microsoft.DurableTask.Sidecar.Dispatcher; using Proto = Microsoft.DurableTask.Protobuf; @@ -357,7 +361,8 @@ public static string Base64Encode(IMessage message) failureDetails.ErrorMessage, failureDetails.StackTrace, GetFailureDetails(failureDetails.InnerFailure), - failureDetails.IsNonRetriable); + failureDetails.IsNonRetriable, + ConvertMapToDictionary(failureDetails.Properties)); } internal static Proto.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) @@ -367,7 +372,7 @@ public static string Base64Encode(IMessage message) return null; } - return new Proto.TaskFailureDetails + var taskFailureDetails = new Proto.TaskFailureDetails { ErrorType = failureDetails.ErrorType, ErrorMessage = failureDetails.ErrorMessage, @@ -375,6 +380,17 @@ public static string Base64Encode(IMessage message) InnerFailure = GetFailureDetails(failureDetails.InnerFailure), IsNonRetriable = failureDetails.IsNonRetriable, }; + + // Add properties if they exist + if (failureDetails.Properties != null) + { + foreach (var kvp in failureDetails.Properties) + { + taskFailureDetails.Properties.Add(kvp.Key, ConvertObjectToValue(kvp.Value)); + } + } + + return taskFailureDetails; } internal static OrchestrationQuery ToOrchestrationQuery(Proto.QueryInstancesRequest request) @@ -438,4 +454,94 @@ internal static Proto.PurgeInstancesResponse CreatePurgeInstancesResponse(PurgeR }; return response; } + + /// + /// Converts a MapField into a IDictionary. + /// + /// + /// + public static IDictionary ConvertMapToDictionary(MapField properties) + { + return properties.ToDictionary( + kvp => kvp.Key, + kvp => ConvertValueToObject(kvp.Value) + ); + } + + /// + /// Converts a C# object to a protobuf Value. + /// + /// The object to convert. + /// The converted protobuf Value. + internal static Value ConvertObjectToValue(object? obj) + { + return obj switch + { + null => Value.ForNull(), + string str => Value.ForString(str), + bool b => Value.ForBool(b), + int i => Value.ForNumber(i), + long l => Value.ForNumber(l), + float f => Value.ForNumber(f), + double d => Value.ForNumber(d), + decimal dec => Value.ForNumber((double)dec), + + // For DateTime and DateTimeOffset, add prefix to distinguish from normal string. + DateTime dt => Value.ForString($"dt:{dt.ToString("O")}"), + DateTimeOffset dto => Value.ForString($"dto:{dto.ToString("O")}"), + IDictionary dict => Value.ForStruct(new Struct + { + Fields = { dict.ToDictionary(kvp => kvp.Key, kvp => ConvertObjectToValue(kvp.Value)) }, + }), + IEnumerable e => Value.ForList(e.Cast().Select(ConvertObjectToValue).ToArray()), + + // Fallback: convert unlisted type to string. + _ => Value.ForString(obj.ToString() ?? string.Empty), + }; + } + + static object? ConvertValueToObject(Google.Protobuf.WellKnownTypes.Value value) + { + switch (value.KindCase) + { + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NullValue: + return null; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.NumberValue: + return value.NumberValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StringValue: + string stringValue = value.StringValue; + + // If the value starts with the 'dt:' prefix, it may represent a DateTime value — attempt to parse it. + if (stringValue.StartsWith("dt:", StringComparison.Ordinal)) + { + if (DateTime.TryParse(stringValue[3..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTime date)) + { + return date; + } + } + + // If the value starts with the 'dto:' prefix, it may represent a DateTime value — attempt to parse it. + if (stringValue.StartsWith("dto:", StringComparison.Ordinal)) + { + if (DateTimeOffset.TryParse(stringValue[4..], CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind, out DateTimeOffset date)) + { + return date; + } + } + + // Otherwise just return as string + return stringValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.BoolValue: + return value.BoolValue; + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.StructValue: + return value.StructValue.Fields.ToDictionary( + pair => pair.Key, + pair => ConvertValueToObject(pair.Value)); + case Google.Protobuf.WellKnownTypes.Value.KindOneofCase.ListValue: + return value.ListValue.Values.Select(ConvertValueToObject).ToList(); + default: + // Fallback: serialize the whole value to JSON string + return JsonSerializer.Serialize(value); + } + } } diff --git a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs index e3a320f79..ecf7975e4 100644 --- a/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs +++ b/test/Grpc.IntegrationTests/GrpcSidecar/Grpc/TaskHubGrpcServer.cs @@ -306,7 +306,7 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, Input = request.GetInputsAndOutputs ? state.Input : null, Output = request.GetInputsAndOutputs ? state.Output : null, CustomStatus = request.GetInputsAndOutputs ? state.Status : null, - FailureDetails = request.GetInputsAndOutputs ? GetFailureDetails(state.FailureDetails) : null, + FailureDetails = request.GetInputsAndOutputs ? ProtobufUtils.GetFailureDetails(state.FailureDetails) : null, Tags = { state.Tags } } }; @@ -398,22 +398,6 @@ static P.GetInstanceResponse CreateGetInstanceResponse(OrchestrationState state, } } - static P.TaskFailureDetails? GetFailureDetails(FailureDetails? failureDetails) - { - if (failureDetails == null) - { - return null; - } - - return new P.TaskFailureDetails - { - ErrorType = failureDetails.ErrorType, - ErrorMessage = failureDetails.ErrorMessage, - StackTrace = failureDetails.StackTrace, - InnerFailure = GetFailureDetails(failureDetails.InnerFailure), - }; - } - /// /// Invoked by the remote SDK over gRPC when an orchestrator task (episode) is completed. /// diff --git a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs index e9e3bdcde..2235efd65 100644 --- a/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs +++ b/test/Grpc.IntegrationTests/OrchestrationErrorHandling.cs @@ -2,6 +2,7 @@ // Licensed under the MIT License. using System.Runtime.Serialization; +using Microsoft.Extensions.DependencyInjection; using Microsoft.DurableTask.Client; using Microsoft.DurableTask.Tests.Logging; using Microsoft.DurableTask.Worker; @@ -631,6 +632,107 @@ static Exception MakeException(Type exceptionType, string message) return (Exception)Activator.CreateInstance(exceptionType, message)!; } + /// + /// Tests that exception properties are included in FailureDetails when using a custom IExceptionPropertiesProvider. + /// + [Fact] + public async Task CustomExceptionPropertiesInFailureDetails() + { + TaskName orchestratorName = "OrchestrationWithCustomException"; + TaskName activityName = "BusinessActivity"; + + // Register activity functions that will throw a custom exception with diverse property types. + async Task MyOrchestrationImpl(TaskOrchestrationContext ctx) => + await ctx.CallActivityAsync(activityName); + + void MyActivityImpl(TaskActivityContext ctx) => + throw new BusinessValidationException( + message: "Business logic validation failed", + stringProperty: "validation-error-123", + intProperty: 100, + longProperty: 999999999L, + dateTimeProperty: new DateTime(2025, 10, 15, 14, 30, 0, DateTimeKind.Utc), + dictionaryProperty: new Dictionary + { + ["error_code"] = "VALIDATION_FAILED", + ["retry_count"] = 3, + ["is_critical"] = true + }, + listProperty: new List { "error1", "error2", 500, null }, + nullProperty: null); + + await using HostTestLifetime server = await this.StartWorkerAsync(b => + { + // Register the custom exception properties provider + b.Services.AddSingleton(); + + b.AddTasks(tasks => tasks + .AddOrchestratorFunc(orchestratorName, MyOrchestrationImpl) + .AddActivityFunc(activityName, MyActivityImpl)); + }); + + string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName); + OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync( + instanceId, getInputsAndOutputs: true, this.TimeoutToken); + + Assert.NotNull(metadata); + Assert.Equal(instanceId, metadata.InstanceId); + Assert.Equal(OrchestrationRuntimeStatus.Failed, metadata.RuntimeStatus); + + Assert.NotNull(metadata.FailureDetails); + TaskFailureDetails failureDetails = metadata.FailureDetails!; + Assert.Equal(typeof(TaskFailedException).FullName, failureDetails.ErrorType); + + // Check that the activity failure is in the inner failure + Assert.NotNull(failureDetails.InnerFailure); + TaskFailureDetails innerFailure = failureDetails.InnerFailure!; + Assert.Equal(typeof(BusinessValidationException).FullName, innerFailure.ErrorType); + + // Check that custom properties are included and verify all property types + Assert.NotNull(innerFailure.Properties); + + // We should contain 7 properties. + Assert.Equal(7, innerFailure.Properties.Count); + + // Verify string property + Assert.True(innerFailure.Properties.ContainsKey("StringProperty")); + Assert.Equal("validation-error-123", innerFailure.Properties["StringProperty"]); + + // Verify numeric properties (note: all numeric types are serialized as double in protobuf) + Assert.True(innerFailure.Properties.ContainsKey("IntProperty")); + Assert.Equal(100.0, innerFailure.Properties["IntProperty"]); + + Assert.True(innerFailure.Properties.ContainsKey("LongProperty")); + Assert.Equal(999999999.0, innerFailure.Properties["LongProperty"]); + + // Verify DateTime property + Assert.True(innerFailure.Properties.ContainsKey("DateTimeProperty")); + Assert.Equal(new DateTime(2025, 10, 15, 14, 30, 0, DateTimeKind.Utc), innerFailure.Properties["DateTimeProperty"]); + + // Verify dictionary property with nested values + Assert.True(innerFailure.Properties.ContainsKey("DictionaryProperty")); + var dictProperty = innerFailure.Properties["DictionaryProperty"] as IDictionary; + Assert.NotNull(dictProperty); + Assert.Equal(3, dictProperty.Count); + Assert.Equal("VALIDATION_FAILED", dictProperty["error_code"]); + Assert.Equal(3.0, dictProperty["retry_count"]); + Assert.Equal(true, dictProperty["is_critical"]); + + // Verify list property with mixed types + Assert.True(innerFailure.Properties.ContainsKey("ListProperty")); + var listProperty = innerFailure.Properties["ListProperty"] as IList; + Assert.NotNull(listProperty); + Assert.Equal(4, listProperty.Count); + Assert.Equal("error1", listProperty[0]); + Assert.Equal("error2", listProperty[1]); + Assert.Equal(500.0, listProperty[2]); + Assert.Null(listProperty[3]); + + // Verify null property + Assert.True(innerFailure.Properties.ContainsKey("NullProperty")); + Assert.Null(innerFailure.Properties["NullProperty"]); + } + [Serializable] class CustomException : Exception { @@ -649,4 +751,69 @@ protected CustomException(SerializationInfo info, StreamingContext context) { } } + + /// + /// A custom exception with diverse property types for comprehensive testing of exception properties. + /// + [Serializable] + class BusinessValidationException : Exception + { + public BusinessValidationException(string message, + string stringProperty, + int intProperty, + long longProperty, + DateTime dateTimeProperty, + IDictionary dictionaryProperty, + IList listProperty, + object? nullProperty) : base(message) + { + this.StringProperty = stringProperty; + this.IntProperty = intProperty; + this.LongProperty = longProperty; + this.DateTimeProperty = dateTimeProperty; + this.DictionaryProperty = dictionaryProperty; + this.ListProperty = listProperty; + this.NullProperty = nullProperty; + } + + public string? StringProperty { get; } + public int? IntProperty { get; } + public long? LongProperty { get; } + public DateTime? DateTimeProperty { get; } + public IDictionary? DictionaryProperty { get; } + public IList? ListProperty { get; } + public object? NullProperty { get; } + + protected BusinessValidationException(SerializationInfo info, StreamingContext context) + : base(info, context) + { + } + } + + // Set a custom exception provider. + class TestExceptionPropertiesProvider : IExceptionPropertiesProvider + { + public IDictionary? GetExceptionProperties(Exception exception) + { + return exception switch + { + ArgumentOutOfRangeException e => new Dictionary + { + ["Name"] = e.ParamName ?? string.Empty, + ["Value"] = e.ActualValue ?? string.Empty, + }, + Microsoft.DurableTask.Grpc.Tests.OrchestrationErrorHandling.BusinessValidationException e => new Dictionary + { + ["StringProperty"] = e.StringProperty, + ["IntProperty"] = e.IntProperty, + ["LongProperty"] = e.LongProperty, + ["DateTimeProperty"] = e.DateTimeProperty, + ["DictionaryProperty"] = e.DictionaryProperty, + ["ListProperty"] = e.ListProperty, + ["NullProperty"] = e.NullProperty, + }, + _ => null // No custom properties for other exceptions + }; + } + } } diff --git a/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs b/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs index 70cc60fcf..d5c89fa0e 100644 --- a/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs +++ b/test/ScheduledTasks.Tests/Client/DefaultScheduleClientTests.cs @@ -166,7 +166,7 @@ public async Task DeleteAsync_WhenOrchestrationFails_ThrowsException() .ReturnsAsync(new OrchestrationMetadata(nameof(ExecuteScheduleOperationOrchestrator), instanceId) { RuntimeStatus = OrchestrationRuntimeStatus.Failed, - FailureDetails = new TaskFailureDetails("TestError", errorMessage, null, null) + FailureDetails = new TaskFailureDetails("TestError", errorMessage, null, null, null) }); // Act & Assert diff --git a/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs b/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs new file mode 100644 index 000000000..309a5410d --- /dev/null +++ b/test/Worker/Grpc.Tests/ExceptionPropertiesProviderRegistrationTests.cs @@ -0,0 +1,57 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Reflection; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.Grpc; +using Microsoft.DurableTask.Worker.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; + +namespace Worker.Grpc.Tests; + +public class ExceptionPropertiesProviderRegistrationTests +{ + sealed class TestExceptionPropertiesProvider : IExceptionPropertiesProvider + { + public IDictionary? GetExceptionProperties(Exception exception) + { + return new Dictionary { ["Foo"] = "Bar" }; + } + } + + [Fact] + public void DiRegistration_RegistersAndFlowsToWorker() + { + ServiceCollection services = new(); + services.AddSingleton(NullLoggerFactory.Instance); + + // Register via DI directly + services.AddSingleton(); + + services.AddDurableTaskWorker(builder => + { + builder.UseGrpc(); + }); + + using ServiceProvider sp = services.BuildServiceProvider(); + + IHostedService hosted = Assert.Single(sp.GetServices()); + Assert.IsType(hosted); + + object? provider = typeof(DurableTaskWorker) + .GetProperty("ExceptionPropertiesProvider", BindingFlags.Instance | BindingFlags.NonPublic)! + .GetValue(hosted); + + Assert.NotNull(provider); + Assert.IsType(provider); + + // And DI resolves the same instance + var resolved = sp.GetRequiredService(); + Assert.Same(resolved, provider); + } +} + + diff --git a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs index 86453ddef..56ace8e77 100644 --- a/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs +++ b/test/Worker/Grpc.Tests/RunBackgroundTaskLoggingTests.cs @@ -388,7 +388,9 @@ public static async Task CreateAsync() grpcOptions: grpcOptions, workerOptions: workerOptions, services: services, - loggerFactory: loggerFactory); + loggerFactory: loggerFactory, + orchestrationFilter: null, + exceptionPropertiesProvider: null); // Client mock var callInvoker = Mock.Of(); @@ -400,7 +402,7 @@ public static async Task CreateAsync() processorType, BindingFlags.Public | BindingFlags.Instance, binder: null, - args: new object?[] { worker, clientMock.Object, null }, + args: new object?[] { worker, clientMock.Object, null, null }, culture: null)!; MethodInfo runBackgroundTask = processorType.GetMethod("RunBackgroundTask", BindingFlags.Instance | BindingFlags.NonPublic)!;