From df07f70f3eea3a91aa848271839363a504dfafed Mon Sep 17 00:00:00 2001 From: stidsborg Date: Tue, 27 Jan 2026 10:00:11 +0100 Subject: [PATCH 1/2] Removed Middleware --- .../Training.subscription | 1 + .../Flows/FlowsWithMiddlewareTests.cs | 68 ------- Cleipnir.Flows/CrossCutting/CallChain.cs | 50 ----- Cleipnir.Flows/CrossCutting/IMiddleware.cs | 19 -- .../CrossCutting/MiddlewareInstanceOrType.cs | 8 - Cleipnir.Flows/FlowOptions.cs | 35 +--- Cleipnir.Flows/Flows.cs | 189 +++++++++++------- Cleipnir.Flows/FlowsContainer.cs | 39 ++-- Cleipnir.Flows/Options.cs | 41 +--- 9 files changed, 144 insertions(+), 306 deletions(-) create mode 100644 .learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription delete mode 100644 Cleipnir.Flows.Tests/Flows/FlowsWithMiddlewareTests.cs delete mode 100644 Cleipnir.Flows/CrossCutting/CallChain.cs delete mode 100644 Cleipnir.Flows/CrossCutting/IMiddleware.cs delete mode 100644 Cleipnir.Flows/CrossCutting/MiddlewareInstanceOrType.cs diff --git a/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription b/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription new file mode 100644 index 0000000..7257c25 --- /dev/null +++ b/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription @@ -0,0 +1 @@ +Training \ No newline at end of file diff --git a/Cleipnir.Flows.Tests/Flows/FlowsWithMiddlewareTests.cs b/Cleipnir.Flows.Tests/Flows/FlowsWithMiddlewareTests.cs deleted file mode 100644 index dd3df52..0000000 --- a/Cleipnir.Flows.Tests/Flows/FlowsWithMiddlewareTests.cs +++ /dev/null @@ -1,68 +0,0 @@ -using Cleipnir.Flows.CrossCutting; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Helpers; -using Cleipnir.ResilientFunctions.Storage; -using Microsoft.Extensions.DependencyInjection; -using Shouldly; - -namespace Cleipnir.Flows.Tests.Flows; - -[TestClass] -public class FlowsWithMiddlewareTests -{ - [TestMethod] - public async Task SimpleFlowCompletesSuccessfully() - { - var serviceCollection = new ServiceCollection(); - serviceCollection.AddTransient(); - serviceCollection.AddSingleton(); - - var flowStore = new InMemoryFunctionStore(); - var options = new Options(); - options.UseMiddleware(); - using var flowsContainer = new FlowsContainer( - flowStore, - serviceCollection.BuildServiceProvider(), - options - ); - - var flows = new SimpleMiddlewareFlows(flowsContainer); - - await Should.ThrowAsync>( - flows.Run("someInstanceId", "someParameter") - ); - - SimpleMiddleware.Param.ShouldBe("someParameter"); - var result = (Result?)SimpleMiddleware.Result; - result.ShouldNotBeNull(); - result.SucceedWithValue.ShouldBe(1); - } - - public class SimpleMiddlewareFlows(FlowsContainer flowsContainer) - : Flows(nameof(SimpleMiddlewareFlow), flowsContainer); - - public class SimpleMiddlewareFlow : Flow - { - public override Task Run(string param) => 1.ToTask(); - } - - private class SimpleMiddleware : IMiddleware - { - public static object? Param = null; - public static object? Result = null; - - public async Task> Run( - TParam param, - Workflow workflow, - Next next - ) where TParam : notnull - { - Param = param; - var result = await next(param, workflow); - Result = result; - - return new Result(failWith: FatalWorkflowException.Create(workflow.FlowId, new TimeoutException("Timeout occured!"))); - } - } -} \ No newline at end of file diff --git a/Cleipnir.Flows/CrossCutting/CallChain.cs b/Cleipnir.Flows/CrossCutting/CallChain.cs deleted file mode 100644 index 0fd25aa..0000000 --- a/Cleipnir.Flows/CrossCutting/CallChain.cs +++ /dev/null @@ -1,50 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using Cleipnir.ResilientFunctions.Domain; -using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; - -namespace Cleipnir.Flows.CrossCutting; - -public static class CallChain -{ - public static Next Create( - List middlewares, - Func> runFlow) - where TParam : notnull - { - Next currNext = - async (p, w) => - { - try - { - var result = await runFlow(p, w); - return new Result(result); - } - catch (SuspendInvocationException) - { - return new Result(Suspend.Invocation); - } - catch (FatalWorkflowException exception) - { - exception.FlowId = w.FlowId; - return new Result(exception); - } - catch (Exception exception) - { - return new Result(FatalWorkflowException.CreateNonGeneric(w.FlowId, exception)); - } - }; - - for (var i = middlewares.Count - 1; i >= 0; i--) - { - var middleware = middlewares[i]; - var next = currNext; - - currNext = (p, w) => middleware.Run(p, w, next); - } - - return currNext; - } -} \ No newline at end of file diff --git a/Cleipnir.Flows/CrossCutting/IMiddleware.cs b/Cleipnir.Flows/CrossCutting/IMiddleware.cs deleted file mode 100644 index 6785dd1..0000000 --- a/Cleipnir.Flows/CrossCutting/IMiddleware.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System.Threading.Tasks; -using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; -using Cleipnir.ResilientFunctions.Domain; - -namespace Cleipnir.Flows.CrossCutting; - -public delegate Task> Next( - TParam param, - Workflow workflow -); - -public interface IMiddleware -{ - public Task> Run( - TParam param, - Workflow workflow, - Next next - ) where TParam : notnull; -} \ No newline at end of file diff --git a/Cleipnir.Flows/CrossCutting/MiddlewareInstanceOrType.cs b/Cleipnir.Flows/CrossCutting/MiddlewareInstanceOrType.cs deleted file mode 100644 index 47d9d2d..0000000 --- a/Cleipnir.Flows/CrossCutting/MiddlewareInstanceOrType.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Cleipnir.Flows.CrossCutting; - -public record MiddlewareInstanceOrType; - -public record MiddlewareInstance(IMiddleware Middleware) : MiddlewareInstanceOrType; -public record MiddlewareType(Type Type) : MiddlewareInstanceOrType; \ No newline at end of file diff --git a/Cleipnir.Flows/FlowOptions.cs b/Cleipnir.Flows/FlowOptions.cs index 214c915..3dc2794 100644 --- a/Cleipnir.Flows/FlowOptions.cs +++ b/Cleipnir.Flows/FlowOptions.cs @@ -1,7 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using Cleipnir.Flows.CrossCutting; +using System; using Cleipnir.ResilientFunctions.Domain; namespace Cleipnir.Flows; @@ -9,17 +6,16 @@ namespace Cleipnir.Flows; public class FlowOptions { public static FlowOptions Default { get; } = new(); - + internal TimeSpan? RetentionPeriod { get; } internal bool? EnableWatchdogs { get; } internal int? MaxParallelRetryInvocations { get; } internal TimeSpan? MessagesDefaultMaxWaitForCompletion { get; } - internal List Middlewares { get; } = new(); public FlowOptions( TimeSpan? retentionPeriod = null, bool? enableWatchdogs = null, - TimeSpan? messagesDefaultMaxWaitForCompletion = null, + TimeSpan? messagesDefaultMaxWaitForCompletion = null, int? maxParallelRetryInvocations = null ) { @@ -29,37 +25,16 @@ public FlowOptions( MaxParallelRetryInvocations = maxParallelRetryInvocations; } - public FlowOptions UseMiddleware() where TMiddleware : IMiddleware - { - Middlewares.Add(new MiddlewareType(typeof(TMiddleware))); - return this; - } - - public FlowOptions UseMiddleware(IMiddleware middleware) - { - Middlewares.Add(new MiddlewareInstance(middleware)); - return this; - } - public FlowOptions Merge(Options options) { - var merged = new FlowOptions( + return new FlowOptions( RetentionPeriod ?? options.RetentionPeriod, EnableWatchdogs ?? options.EnableWatchdogs, MessagesDefaultMaxWaitForCompletion ?? options.MessagesDefaultMaxWaitForCompletion, MaxParallelRetryInvocations ?? options.MaxParallelRetryInvocations ); - - if (Middlewares.Any()) - foreach (var middleware in Middlewares) - merged.Middlewares.Add(middleware); - else - foreach (var middleware in options.Middlewares) - merged.Middlewares.Add(middleware); - - return merged; } internal LocalSettings MapToLocalSettings() => new(RetentionPeriod, EnableWatchdogs, MessagesDefaultMaxWaitForCompletion, MaxParallelRetryInvocations); -} \ No newline at end of file +} diff --git a/Cleipnir.Flows/Flows.cs b/Cleipnir.Flows/Flows.cs index b2668e3..e6b4522 100644 --- a/Cleipnir.Flows/Flows.cs +++ b/Cleipnir.Flows/Flows.cs @@ -1,15 +1,13 @@ -using System; +using System; using System.Collections.Generic; -using System.Linq; using System.Linq.Expressions; using System.Threading.Tasks; -using Cleipnir.Flows.CrossCutting; using Cleipnir.ResilientFunctions; using Cleipnir.ResilientFunctions.CoreRuntime.Invocation; using Cleipnir.ResilientFunctions.Domain; +using Cleipnir.ResilientFunctions.Domain.Exceptions.Commands; using Cleipnir.ResilientFunctions.Helpers; using Cleipnir.ResilientFunctions.Messaging; -using Cleipnir.ResilientFunctions.Storage; using Microsoft.Extensions.DependencyInjection; namespace Cleipnir.Flows; @@ -25,18 +23,9 @@ public abstract class BaseFlows : IBaseFlows where TFlow : notnull { public static Type FlowType { get; } = typeof(TFlow); - private FlowsContainer FlowsContainer { get; } - private readonly Func? _flowFactory; - - protected BaseFlows(FlowsContainer flowsContainer, Func? flowFactory) - { - FlowsContainer = flowsContainer; - _flowFactory = flowFactory; - } - public abstract Task Interrupt(IEnumerable instances); - private static Action CreateWorkflowSetter() + protected static Action CreateWorkflowSetter() { ParameterExpression flowParam = Expression.Parameter(typeof(TFlow), "flow"); ParameterExpression contextParam = Expression.Parameter(typeof(Workflow), "workflow"); @@ -55,28 +44,6 @@ private static Action CreateWorkflowSetter() var setter = lambdaExpr.Compile(); return setter; } - - protected Next CreateMiddlewareCallChain(Func> runFlow) where TParam : notnull - { - var serviceProvider = FlowsContainer.ServiceProvider; - var workflowSetter = CreateWorkflowSetter(); - return CallChain.Create( - FlowsContainer.Middlewares, - runFlow: async (param, workflow) => - { - await using var scope = serviceProvider.CreateAsyncScope(); - - var flow = _flowFactory == null - ? scope.ServiceProvider.GetRequiredService() - : _flowFactory(); - - workflowSetter(flow, workflow); - - var result = await runFlow(flow, param); - return result; - } - ); - } public abstract Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) where T : class; } @@ -85,18 +52,43 @@ public class Flows : BaseFlows where TFlow : Flow { private readonly ParamlessRegistration _registration; - public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) : base(flowsContainer, flowFactory) + public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) { - var callChain = CreateMiddlewareCallChain(runFlow: async (flow, _) => - { - await flow.Run(); - return Unit.Instance; - }); - + var serviceProvider = flowsContainer.ServiceProvider; + var workflowSetter = CreateWorkflowSetter(); + flowsContainer.EnsureNoExistingRegistration(flowName, typeof(TFlow)); _registration = flowsContainer.FunctionRegistry.RegisterParamless( flowName, - inner: workflow => callChain(Unit.Instance, workflow), + inner: async w => + { + try + { + await using var scope = serviceProvider.CreateAsyncScope(); + + var flow = flowFactory == null + ? scope.ServiceProvider.GetRequiredService() + : flowFactory(); + + workflowSetter(flow, w); + + await flow.Run(); + return new Result(Unit.Instance); + } + catch (SuspendInvocationException) + { + return new Result(Suspend.Invocation); + } + catch (FatalWorkflowException exception) + { + exception.FlowId = w.FlowId; + return new Result(exception); + } + catch (Exception exception) + { + return new Result(FatalWorkflowException.CreateNonGeneric(w.FlowId, exception)); + } + }, (options ?? FlowOptions.Default).MapToLocalSettings() ); } @@ -112,7 +104,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? option return controlPanel; } - public MessageWriter MessageWriter(FlowInstance instanceId) + public MessageWriter MessageWriter(FlowInstance instanceId) => _registration.MessageWriters.For(instanceId); /// @@ -121,7 +113,7 @@ public MessageWriter MessageWriter(FlowInstance instanceId) /// Instance id for the flow /// Optional initial state with pre-filled messages and effects /// A task which will complete when the flow has completed its execution - public Task Run(FlowInstance instanceId, InitialState? initialState = null) + public Task Run(FlowInstance instanceId, InitialState? initialState = null) => _registration.Invoke(instanceId, initialState); /// @@ -133,10 +125,10 @@ public Task Run(FlowInstance instanceId, InitialState? initialState = null) /// A task which will complete when the flow has been persisted public Task Schedule(FlowInstance instanceId, InitialState? initialState = null) => _registration.Schedule(instanceId, initialState: initialState); - + /// /// Schedule the flow for future execution. - /// Flow can be executed at any replica + /// Flow can be executed at any replica /// /// Instance id for the flow /// Time when the flow should be executed @@ -144,7 +136,7 @@ public Task Schedule(FlowInstance instanceId, InitialState? initialSt public Task ScheduleAt(FlowInstance instanceId, DateTime delayUntil) => _registration.ScheduleAt(instanceId, delayUntil); /// /// Schedule the flow for future execution. - /// Flow can be executed at any replica + /// Flow can be executed at any replica /// /// Instance id for the flow /// Delay before the flow is executed @@ -159,7 +151,7 @@ public Task Schedule(FlowInstance instanceId, InitialState? initialSt /// Optional idempotency key to de-duplicate messages /// Message type /// A task which will complete when the message has been persisted - public override Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) + public override Task RouteMessage(T message, string correlationId, string? idempotencyKey = null) => _registration.RouteMessage(message, correlationId, idempotencyKey); /// @@ -204,20 +196,44 @@ public class Flows : BaseFlows where TParam : notnull { private readonly ActionRegistration _registration; - - public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) : base(flowsContainer, flowFactory) + + public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) { - var callChain = CreateMiddlewareCallChain( - runFlow: async (flow, param) => - { - await flow.Run(param); - return Unit.Instance; - }); - + var serviceProvider = flowsContainer.ServiceProvider; + var workflowSetter = CreateWorkflowSetter(); + flowsContainer.EnsureNoExistingRegistration(flowName, typeof(TFlow)); _registration = flowsContainer.FunctionRegistry.RegisterAction( flowName, - inner: (param, workflow) => callChain(param, workflow), + inner: async (p, w) => + { + try + { + await using var scope = serviceProvider.CreateAsyncScope(); + + var flow = flowFactory == null + ? scope.ServiceProvider.GetRequiredService() + : flowFactory(); + + workflowSetter(flow, w); + + await flow.Run(p); + return new Result(Unit.Instance); + } + catch (SuspendInvocationException) + { + return new Result(Suspend.Invocation); + } + catch (FatalWorkflowException exception) + { + exception.FlowId = w.FlowId; + return new Result(exception); + } + catch (Exception exception) + { + return new Result(FatalWorkflowException.CreateNonGeneric(w.FlowId, exception)); + } + }, settings: (options ?? FlowOptions.Default).MapToLocalSettings() ); } @@ -243,7 +259,7 @@ public MessageWriter MessageWriter(FlowInstance instanceId) /// Flow's parameter /// Optional initial state with pre-filled messages and effects /// A task which will complete when the flow has completed its execution - public Task Run(FlowInstance instanceId, TParam param, InitialState? initialState = null) + public Task Run(FlowInstance instanceId, TParam param, InitialState? initialState = null) => _registration.Invoke(instanceId, param, initialState); /// @@ -256,10 +272,10 @@ public Task Run(FlowInstance instanceId, TParam param, InitialState? initialStat /// A task which will complete when the flow has been persisted public Task Schedule(FlowInstance instanceId, TParam param, InitialState? initialState = null) => _registration.Schedule(instanceId, param, initialState: initialState); - + /// /// Schedule the flow for future execution. - /// Flow can be executed at any replica + /// Flow can be executed at any replica /// /// Instance id for the flow /// Flow's parameter @@ -273,7 +289,7 @@ DateTime delayUntil /// /// Schedule the flow for future execution. - /// Flow can be executed at any replica + /// Flow can be executed at any replica /// /// Instance id for the flow /// Flow's parameter @@ -337,17 +353,44 @@ public class Flows : BaseFlows where TParam : notnull { private readonly FuncRegistration _registration; - - public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) : base(flowsContainer, flowFactory) + + public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? options = null, Func? flowFactory = null) { - var callChain = CreateMiddlewareCallChain( - runFlow: (flow, param) => flow.Run(param) - ); - + var serviceProvider = flowsContainer.ServiceProvider; + var workflowSetter = CreateWorkflowSetter(); + flowsContainer.EnsureNoExistingRegistration(flowName, typeof(TFlow)); _registration = flowsContainer.FunctionRegistry.RegisterFunc( flowName, - inner: (param, workflow) => callChain(param, workflow), + inner: async (p, w) => + { + try + { + await using var scope = serviceProvider.CreateAsyncScope(); + + var flow = flowFactory == null + ? scope.ServiceProvider.GetRequiredService() + : flowFactory(); + + workflowSetter(flow, w); + + var result = await flow.Run(p); + return new Result(result); + } + catch (SuspendInvocationException) + { + return new Result(Suspend.Invocation); + } + catch (FatalWorkflowException exception) + { + exception.FlowId = w.FlowId; + return new Result(exception); + } + catch (Exception exception) + { + return new Result(FatalWorkflowException.CreateNonGeneric(w.FlowId, exception)); + } + }, (options ?? FlowOptions.Default).MapToLocalSettings() ); } @@ -357,7 +400,7 @@ public Flows(string flowName, FlowsContainer flowsContainer, FlowOptions? option /// /// Instance id for the flow /// The flow's control panel or null if the flow does not exist - public Task?> ControlPanel(string instanceId) + public Task?> ControlPanel(string instanceId) => _registration.ControlPanel(instanceId); public MessageWriter MessageWriter(FlowInstance instanceId) @@ -457,4 +500,4 @@ public Task SendMessages(IReadOnlyList messages) /// Instance ids and parameters for the flows /// A task which will complete when the flows have been persisted public Task> BulkSchedule(IEnumerable> bulkWork) => _registration.BulkSchedule(bulkWork); -} \ No newline at end of file +} diff --git a/Cleipnir.Flows/FlowsContainer.cs b/Cleipnir.Flows/FlowsContainer.cs index 118ba26..307a03a 100644 --- a/Cleipnir.Flows/FlowsContainer.cs +++ b/Cleipnir.Flows/FlowsContainer.cs @@ -1,9 +1,7 @@ -using System; +using System; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; -using Cleipnir.Flows.CrossCutting; using Cleipnir.ResilientFunctions; using Cleipnir.ResilientFunctions.Storage; using Microsoft.Extensions.DependencyInjection; @@ -15,43 +13,34 @@ public class FlowsContainer : IDisposable { internal readonly IServiceProvider ServiceProvider; internal readonly FunctionsRegistry FunctionRegistry; - internal readonly List Middlewares; private readonly Dictionary _registeredFlows = new(); private readonly Lock _lock = new(); public FunctionsRegistry Functions => FunctionRegistry; - + public FlowsContainer(IFunctionStore flowStore, IServiceProvider serviceProvider, Options options) { ServiceProvider = serviceProvider; - + if (options.UnhandledExceptionHandler == null && serviceProvider.GetService() != null) { var logger = serviceProvider.GetRequiredService(); options = new Options( unhandledExceptionHandler: ex => logger.LogError(ex, "Unhandled exception in Cleipnir Flows"), - options.RetentionPeriod, + options.RetentionPeriod, options.RetentionCleanUpFrequency, - options.LeaseLength, + options.LeaseLength, options.EnableWatchdogs, options.WatchdogCheckFrequency, options.MessagesPullFrequency, options.MessagesDefaultMaxWaitForCompletion, - options.DelayStartup, - options.MaxParallelRetryInvocations, + options.DelayStartup, + options.MaxParallelRetryInvocations, options.Serializer ); } - + FunctionRegistry = new FunctionsRegistry(flowStore, options.MapToSettings()); - Middlewares = options.Middlewares - .Select(m => m switch - { - MiddlewareInstance middlewareInstance => middlewareInstance.Middleware, - MiddlewareType middlewareType => (IMiddleware) serviceProvider.GetRequiredService(middlewareType.Type), - _ => throw new ArgumentOutOfRangeException(nameof(m)) - }) - .ToList(); } internal void EnsureNoExistingRegistration(string flowName, Type flowType) @@ -62,7 +51,7 @@ internal void EnsureNoExistingRegistration(string flowName, Type flowType) else _registeredFlows[flowName] = flowType; } - + public void Dispose() => FunctionRegistry.Dispose(); public Task ShutdownGracefully(TimeSpan? maxWait = null) => FunctionRegistry.ShutdownGracefully(maxWait); @@ -71,23 +60,23 @@ public Flows RegisterAnonymousFlow(Func? flowFactory = null flowName ??= typeof(TFlow).Name; return new Flows(flowName, flowsContainer: this, options ?? new FlowOptions(), flowFactory); } - + public Flows RegisterAnonymousFlow(Func? flowFactory = null, string? flowName = null, FlowOptions? options = null) where TFlow : Flow where TParam : notnull { flowName ??= typeof(TFlow).Name; return new Flows(flowName, flowsContainer: this, options ?? new FlowOptions(), flowFactory); } - + public Flows RegisterAnonymousFlow(Func? flowFactory = null, string? flowName = null, FlowOptions? options = null) where TFlow : Flow where TResult : notnull where TParam : notnull { flowName ??= typeof(TFlow).Name; return new Flows(flowName, flowsContainer: this, options ?? new FlowOptions(), flowFactory); } - + public static FlowsContainer Create( IServiceProvider? serviceProvider = null, - IFunctionStore? functionStore = null, - Options? options = null) + IFunctionStore? functionStore = null, + Options? options = null) => new( functionStore ?? new InMemoryFunctionStore(), serviceProvider ?? new ServiceCollection().BuildServiceProvider(), diff --git a/Cleipnir.Flows/Options.cs b/Cleipnir.Flows/Options.cs index 49781e1..48ccff8 100644 --- a/Cleipnir.Flows/Options.cs +++ b/Cleipnir.Flows/Options.cs @@ -1,7 +1,4 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using Cleipnir.Flows.CrossCutting; +using System; using Cleipnir.ResilientFunctions.CoreRuntime; using Cleipnir.ResilientFunctions.CoreRuntime.Serialization; using Cleipnir.ResilientFunctions.Domain; @@ -12,7 +9,7 @@ namespace Cleipnir.Flows; public class Options { public static Options Default { get; } = new(); - + internal Action? UnhandledExceptionHandler { get; } internal TimeSpan? RetentionPeriod { get; } internal TimeSpan? RetentionCleanUpFrequency { get; } @@ -24,7 +21,6 @@ public class Options internal TimeSpan? MessagesPullFrequency { get; } internal TimeSpan? MessagesDefaultMaxWaitForCompletion { get; } internal ISerializer? Serializer { get; } - internal List Middlewares { get; } = new(); internal UtcNow? UtcNow { get; } /// @@ -43,16 +39,16 @@ public class Options /// Specify custom serializer. Default built-in json-serializer. /// Provide custom delegate for providing current utc datetime. Default: () => DateTime.UtcNow public Options( - Action? unhandledExceptionHandler = null, + Action? unhandledExceptionHandler = null, TimeSpan? retentionPeriod = null, TimeSpan? retentionCleanUpFrequency = null, - TimeSpan? leaseLength = null, + TimeSpan? leaseLength = null, bool? enableWatchdogs = null, TimeSpan? watchdogCheckFrequency = null, TimeSpan? messagesPullFrequency = null, TimeSpan? messagesDefaultMaxWaitForCompletion = null, - TimeSpan? delayStartup = null, - int? maxParallelRetryInvocations = null, + TimeSpan? delayStartup = null, + int? maxParallelRetryInvocations = null, ISerializer? serializer = null, UtcNow? utcNow = null ) @@ -71,21 +67,9 @@ public Options( UtcNow = utcNow; } - public Options UseMiddleware() where TMiddleware : IMiddleware - { - Middlewares.Add(new MiddlewareType(typeof(TMiddleware))); - return this; - } - - public Options UseMiddleware(IMiddleware middleware) - { - Middlewares.Add(new MiddlewareInstance(middleware)); - return this; - } - public Options Merge(Options options) { - var merged = new Options( + return new Options( UnhandledExceptionHandler ?? options.UnhandledExceptionHandler, RetentionPeriod ?? options.RetentionPeriod, RetentionCleanUpFrequency ?? options.RetentionCleanUpFrequency, @@ -98,15 +82,6 @@ public Options Merge(Options options) MaxParallelRetryInvocations ?? options.MaxParallelRetryInvocations, Serializer ?? options.Serializer ); - - if (Middlewares.Any()) - foreach (var middleware in Middlewares) - merged.Middlewares.Add(middleware); - else - foreach (var middleware in options.Middlewares) - merged.Middlewares.Add(middleware); - - return merged; } internal Settings MapToSettings() @@ -124,4 +99,4 @@ internal Settings MapToSettings() Serializer, UtcNow ); -} \ No newline at end of file +} From 73575637f1d3b3b970d8d42be77a6929b9a9230b Mon Sep 17 00:00:00 2001 From: stidsborg Date: Tue, 27 Jan 2026 10:01:00 +0100 Subject: [PATCH 2/2] ... --- .../Training.subscription | 1 - 1 file changed, 1 deletion(-) delete mode 100644 .learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription diff --git a/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription b/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription deleted file mode 100644 index 7257c25..0000000 --- a/.learningtransport/.events/Cleipnir.Flows.NServiceBus.Tests.MyMessage/Training.subscription +++ /dev/null @@ -1 +0,0 @@ -Training \ No newline at end of file