diff --git a/src/IntegrationTest.Billing/BillingFunctions.cs b/src/IntegrationTest.Billing/BillingFunctions.cs index bbfe364..8d8fb0c 100644 --- a/src/IntegrationTest.Billing/BillingFunctions.cs +++ b/src/IntegrationTest.Billing/BillingFunctions.cs @@ -12,7 +12,7 @@ public partial class BillingFunctions [Function("BillingApi")] [NServiceBusFunction] public partial Task BillingApi( - [ServiceBusTrigger("billing-api", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] + [ServiceBusTrigger("billing-api", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, @@ -27,7 +27,7 @@ public static void ConfigureBillingApi(EndpointConfiguration configuration) [Function("BillingBackend")] [NServiceBusFunction] public partial Task BillingBackend( - [ServiceBusTrigger("billing-backend", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] + [ServiceBusTrigger("billing-backend", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, diff --git a/src/IntegrationTest.Sales/SalesEndpoint.cs b/src/IntegrationTest.Sales/SalesEndpoint.cs index dc9dbf2..2bb69db 100644 --- a/src/IntegrationTest.Sales/SalesEndpoint.cs +++ b/src/IntegrationTest.Sales/SalesEndpoint.cs @@ -4,6 +4,7 @@ namespace IntegrationTest.Sales; using IntegrationTest.Shared; using Microsoft.Azure.Functions.Worker; using Microsoft.Extensions.DependencyInjection; +using NServiceBus.AzureFunctions.AzureServiceBus; // Cleanest pattern for single-function endpoints [NServiceBusFunction] @@ -11,7 +12,7 @@ public partial class SalesEndpoint { [Function("Sales")] public partial Task Sales( - [ServiceBusTrigger("sales", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)] + [ServiceBusTrigger("sales", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, @@ -23,5 +24,17 @@ public static void ConfigureSales(EndpointConfiguration configuration) configuration.RegisterComponents(services => services.AddSingleton(new MyComponent("Sales"))); configuration.AddHandler(); + configuration.AuditProcessedMessagesTo("audit"); + + // Use the dead letter queue for failures + configuration.Recoverability().CustomPolicy((_, context) => + { + if (context.ImmediateProcessingFailures < 3) + { + return RecoverabilityAction.ImmediateRetry(); + } + + return RecoverabilityAction.DeadLetter(context.Exception); + }); } } \ No newline at end of file diff --git a/src/IntegrationTest.Shared/Infrastructure/TestStorage.cs b/src/IntegrationTest.Shared/Infrastructure/TestStorage.cs index 8844c3a..e2329c8 100644 --- a/src/IntegrationTest.Shared/Infrastructure/TestStorage.cs +++ b/src/IntegrationTest.Shared/Infrastructure/TestStorage.cs @@ -25,9 +25,9 @@ public Payload CreatePayload(string testName) var sortedMessages = list .OrderBy(m => m.Order) - .ThenBy(m => m.MessageType) - .ThenBy(m => m.SendingEndpoint) - .ThenBy(m => m.ReceivingEndpoint) + .ThenBy(m => m.MessageType) + .ThenBy(m => m.SendingEndpoint) + .ThenBy(m => m.ReceivingEndpoint) .ToArray(); return new Payload(sortedMessages); @@ -39,8 +39,9 @@ public class TestStorage(string endpointName, GlobalTestStorage globalStorage) public void LogMessage(string testName, T message, IMessageHandlerContext context) where T : class { - var sendingEndpoint = context.MessageHeaders[Headers.OriginatingEndpoint] ?? ""; + var sendingEndpoint = context.MessageHeaders.GetValueOrDefault(Headers.OriginatingEndpoint, ""); var storageOrder = context.Extensions.Get("TestStorageOrder"); + var rec = new MessageReceived(message.GetType().FullName!, storageOrder, sendingEndpoint, endpointName); globalStorage.Add(testName, rec); } diff --git a/src/IntegrationTest.Shipping/ShippingEndpoint.cs b/src/IntegrationTest.Shipping/ShippingEndpoint.cs index 1fecd10..90661c5 100644 --- a/src/IntegrationTest.Shipping/ShippingEndpoint.cs +++ b/src/IntegrationTest.Shipping/ShippingEndpoint.cs @@ -9,7 +9,7 @@ public partial class ShippingEndpoint { [Function(nameof(Shipping))] public partial Task Shipping( - [ServiceBusTrigger("shipping", AutoCompleteMessages = true)] + [ServiceBusTrigger("shipping", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, diff --git a/src/IntegrationTest/Program.cs b/src/IntegrationTest/Program.cs index 6f1cab2..623e8de 100644 --- a/src/IntegrationTest/Program.cs +++ b/src/IntegrationTest/Program.cs @@ -1,4 +1,3 @@ -using System.Text.Json; using IntegrationTest; using IntegrationTest.Shared; using IntegrationTest.Shared.Infrastructure; @@ -10,18 +9,14 @@ var builder = FunctionsApplication.CreateBuilder(args); builder.UseWhen(_ => true); -builder.Logging.ClearProviders(); -builder.Logging.AddJsonConsole(o => -{ - o.IncludeScopes = true; - o.JsonWriterOptions = new JsonWriterOptions { Indented = true }; -}); -builder.Logging.SetMinimumLevel(LogLevel.Information); +builder.Logging.AddSimpleConsole(options => options.IncludeScopes = true); +builder.Logging.SetMinimumLevel(LogLevel.Warning); builder.Services.AddSingleton(); builder.Services.AddSingleton(new MyComponent("global")); builder.AddNServiceBusFunctions(); + builder.AddSendOnlyNServiceBusEndpoint("client", configuration => { configuration.RegisterComponents(services => services.AddSingleton(new MyComponent("client"))); diff --git a/src/NServiceBus.AzureFunctions.Analyzer/DiagnosticIds.cs b/src/NServiceBus.AzureFunctions.Analyzer/DiagnosticIds.cs index 8f53d6b..814e42e 100644 --- a/src/NServiceBus.AzureFunctions.Analyzer/DiagnosticIds.cs +++ b/src/NServiceBus.AzureFunctions.Analyzer/DiagnosticIds.cs @@ -2,15 +2,16 @@ namespace NServiceBus.AzureFunctions.Analyzer; using Microsoft.CodeAnalysis; -static class DiagnosticIds +public static class DiagnosticIds { public const string ClassMustBePartial = "NSBFUNC001"; public const string ShouldNotImplementIHandleMessages = "NSBFUNC002"; public const string MethodMustBePartial = "NSBFUNC003"; - public const string MultipleConfigureMethods = "NSBFUNC005"; public const string MissingAddNServiceBusFunctionsCall = "NSBFUNC004"; + public const string MultipleConfigureMethods = "NSBFUNC005"; + public const string AutoCompleteEnabled = "NSBFUNC006"; - public static readonly DiagnosticDescriptor ClassMustBePartialDescriptor = new( + internal static readonly DiagnosticDescriptor ClassMustBePartialDescriptor = new( id: ClassMustBePartial, title: "Class containing [NServiceBusFunction] must be partial", messageFormat: "Class '{0}' must be declared as partial to use [NServiceBusFunction]", @@ -18,7 +19,7 @@ static class DiagnosticIds defaultSeverity: DiagnosticSeverity.Error, isEnabledByDefault: true); - public static readonly DiagnosticDescriptor ShouldNotImplementIHandleMessagesDescriptor = new( + internal static readonly DiagnosticDescriptor ShouldNotImplementIHandleMessagesDescriptor = new( id: ShouldNotImplementIHandleMessages, title: "Function class should not implement IHandleMessages", messageFormat: "Class '{0}' should not implement IHandleMessages; message handlers should be registered separately via IEndpointConfiguration", @@ -26,7 +27,7 @@ static class DiagnosticIds defaultSeverity: DiagnosticSeverity.Warning, isEnabledByDefault: true); - public static readonly DiagnosticDescriptor MethodMustBePartialDescriptor = new( + internal static readonly DiagnosticDescriptor MethodMustBePartialDescriptor = new( id: MethodMustBePartial, title: "Method with [NServiceBusFunction] must be partial", messageFormat: "Method '{0}' must be declared as partial to use [NServiceBusFunction]", @@ -34,7 +35,7 @@ static class DiagnosticIds defaultSeverity: DiagnosticSeverity.Error, isEnabledByDefault: true); - public static readonly DiagnosticDescriptor MultipleConfigureMethodsDescriptor = new( + internal static readonly DiagnosticDescriptor MultipleConfigureMethodsDescriptor = new( id: MultipleConfigureMethods, title: "Multiple configuration methods found", messageFormat: "Multiple '{0}' configuration methods found on class '{1}'", @@ -42,7 +43,15 @@ static class DiagnosticIds defaultSeverity: DiagnosticSeverity.Error, isEnabledByDefault: true); - public static readonly DiagnosticDescriptor MissingAddNServiceBusFunctionsCallDescriptor = new( + internal static readonly DiagnosticDescriptor AutoCompleteMustBeExplicitlyDisabled = new( + id: AutoCompleteEnabled, + title: "Message auto completion must be explicitly disabled", + messageFormat: "The auto complete property on the service bus trigger for method '{0}' must be explicitly set to false", + category: "NServiceBus.AzureFunctions", + defaultSeverity: DiagnosticSeverity.Error, + isEnabledByDefault: true); + + internal static readonly DiagnosticDescriptor MissingAddNServiceBusFunctionsCallDescriptor = new( id: MissingAddNServiceBusFunctionsCall, title: "AddNServiceBusFunctions() is not called", messageFormat: "This project has NServiceBus endpoint registrations but does not call builder.AddNServiceBusFunctions(). Endpoints will not be started.", diff --git a/src/NServiceBus.AzureFunctions.Analyzer/FunctionEndpointGenerator.Parser.cs b/src/NServiceBus.AzureFunctions.Analyzer/FunctionEndpointGenerator.Parser.cs index a99dbb3..a1caf6f 100644 --- a/src/NServiceBus.AzureFunctions.Analyzer/FunctionEndpointGenerator.Parser.cs +++ b/src/NServiceBus.AzureFunctions.Analyzer/FunctionEndpointGenerator.Parser.cs @@ -125,12 +125,24 @@ static FunctionSpecs ExtractFromMethod(IMethodSymbol methodSymbol, FunctionEndpo queueName = pAttr.ConstructorArguments[0].Value as string; } + var autoCompleteEnabled = true; foreach (var namedArg in pAttr.NamedArguments) { if (namedArg.Key == "Connection") { connectionName = namedArg.Value.Value as string; } + + if (namedArg.Key == "AutoCompleteMessages") + { + var autoComplete = namedArg.Value.Value as bool?; + autoCompleteEnabled = autoComplete!.Value; + } + } + + if (autoCompleteEnabled) + { + diagnostics.Add(CreateDiagnostic(DiagnosticIds.AutoCompleteMustBeExplicitlyDisabled, method, method.Name)); } messageParamName = param.Name; diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusMessageProcessor.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusMessageProcessor.cs index 468077a..b83d154 100644 --- a/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusMessageProcessor.cs +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusMessageProcessor.cs @@ -7,7 +7,6 @@ namespace NServiceBus.AzureFunctions.AzureServiceBus; public class AzureServiceBusMessageProcessor(AzureServiceBusServerlessTransport transport, string endpointName) { - //NOTE: Message actions and function context is here to be ready for future features like native dlq support without having to change the end user api. public async Task Process(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, CancellationToken cancellationToken = default) { if (transport.MessageProcessor is null) @@ -16,6 +15,6 @@ public async Task Process(ServiceBusReceivedMessage message, ServiceBusMessageAc throw new InvalidOperationException($"Endpoint {endpointName} cannot process messages because it is configured in send-only mode."); } - await transport.MessageProcessor.Process(message, cancellationToken).ConfigureAwait(false); + await transport.MessageProcessor.Process(message, messageActions, cancellationToken).ConfigureAwait(false); } } \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusServerlessTransport.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusServerlessTransport.cs index 87a295e..683b80b 100644 --- a/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusServerlessTransport.cs +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/AzureServiceBusServerlessTransport.cs @@ -6,10 +6,12 @@ namespace NServiceBus; using System.Threading; using System.Threading.Tasks; using Azure.Core; +using AzureFunctions.AzureServiceBus; +using BitFaster.Caching.Lru; using Microsoft.Extensions.Azure; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; -using NServiceBus.AzureFunctions.AzureServiceBus.Serverless.TransportWrapper; +using Microsoft.Extensions.Logging; using NServiceBus.Transport; public class AzureServiceBusServerlessTransport(TopicTopology topology) : TransportDefinition(TransportTransactionMode.ReceiveOnly, @@ -53,7 +55,7 @@ public override async Task Initialize( .ConfigureAwait(false); var serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure, - static receiver => new PipelineInvokingMessageProcessor(receiver)); + receiver => new PipelineInvokingMessageProcessor(receiver, new FastConcurrentLru(1_000), hostSettings.ServiceProvider.GetRequiredService>())); var isSendOnly = hostSettings.CoreSettings.GetOrDefault(SendOnlyConfigKey); diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterMessage.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterMessage.cs new file mode 100644 index 0000000..6594ed6 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterMessage.cs @@ -0,0 +1,23 @@ +namespace NServiceBus.AzureFunctions.AzureServiceBus; + +using Pipeline; +using Transport; + +public sealed class DeadLetterMessage : RecoverabilityAction +{ + internal DeadLetterMessage(string deadLetterReason, string deadLetterErrorDescription, Dictionary? propertiesToModify = null) => + deadLetterRequest = new DeadLetterRequest(deadLetterReason, deadLetterErrorDescription, propertiesToModify); + + internal DeadLetterMessage(Exception exception) => + deadLetterRequest = new DeadLetterRequest(exception); + + public override IReadOnlyCollection GetRoutingContexts(IRecoverabilityActionContext context) + { + context.Extensions.Get().Set(deadLetterRequest); + return []; + } + + public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.Handled; + + readonly DeadLetterRequest deadLetterRequest; +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterRequest.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterRequest.cs new file mode 100644 index 0000000..b6644eb --- /dev/null +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/DeadLetterRequest.cs @@ -0,0 +1,17 @@ +namespace NServiceBus.AzureFunctions.AzureServiceBus; + +class DeadLetterRequest(string deadLetterReason, string deadLetterErrorDescription, Dictionary? propertiesToModify = null) +{ + public string DeadLetterReason { get; } = Truncate(deadLetterReason, 1024); + public string DeadLetterErrorDescription { get; } = Truncate(deadLetterErrorDescription, 1024); + public Dictionary PropertiesToModify { get; } = propertiesToModify ?? []; + + public DeadLetterRequest(Exception exception, Dictionary? propertiesToModify = null) : this( + $"{exception.GetType().FullName} - {exception.Message}", + exception.StackTrace ?? "No stack trace available", + propertiesToModify) + { + } + + static string Truncate(string value, int maxLength) => string.IsNullOrEmpty(value) ? value : value.Length <= maxLength ? value : value[..maxLength]; +} \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/GuidHelper.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/GuidHelper.cs new file mode 100644 index 0000000..e428e1f --- /dev/null +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/GuidHelper.cs @@ -0,0 +1,29 @@ +namespace NServiceBus.AzureFunctions.AzureServiceBus; + +using System; +using System.Buffers.Binary; + +/// +/// Provides helper methods for working with . +/// +/// +/// Inspired by NGuid by Bradley Grainger, +/// used under the MIT License. +/// +static class GuidHelper +{ + /// + /// Creates a Version 8 UUID with a v7-style layout: as Unix + /// milliseconds in bytes 0–7 (time-sortable, stable across redeliveries) and + /// in bytes 8–15, both big-endian. + /// + public static Guid CreateVersion8(DateTimeOffset timestamp, long sequenceNumber) + { + Span guidBytes = stackalloc byte[16]; + BinaryPrimitives.WriteInt64BigEndian(guidBytes, timestamp.ToUnixTimeMilliseconds()); + BinaryPrimitives.WriteInt64BigEndian(guidBytes[8..], sequenceNumber); + guidBytes[6] = (byte)(0x80 | (guidBytes[6] & 0xF)); + guidBytes[8] = (byte)(0x80 | (guidBytes[8] & 0x3F)); + return new Guid(guidBytes, bigEndian: true); + } +} diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/PipelineInvokingMessageProcessor.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/PipelineInvokingMessageProcessor.cs index bfe50ec..cb1ce08 100644 --- a/src/NServiceBus.AzureFunctions.AzureServiceBus/PipelineInvokingMessageProcessor.cs +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/PipelineInvokingMessageProcessor.cs @@ -1,91 +1,141 @@ -namespace NServiceBus.AzureFunctions.AzureServiceBus.Serverless.TransportWrapper; +namespace NServiceBus.AzureFunctions.AzureServiceBus; using System; -using System.Runtime.Serialization; using System.Threading; using System.Threading.Tasks; -using System.Xml; using Azure.Messaging.ServiceBus; -using NServiceBus.Extensibility; -using NServiceBus.Transport; +using BitFaster.Caching; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging; +using Extensibility; +using Transport; using NServiceBus.Transport.AzureServiceBus; +using static PipelineInvokingMessageProcessorLog; -class PipelineInvokingMessageProcessor(IMessageReceiver baseTransportReceiver) : IMessageReceiver +class PipelineInvokingMessageProcessor( + IMessageReceiver baseTransportReceiver, + ICache messagesToBeCompleted, + ILogger logger, + Func>? headerExtractor = null) + : IMessageReceiver { public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, CancellationToken cancellationToken = default) { this.onMessage = onMessage; this.onError = onError; + return baseTransportReceiver.Initialize(limitations, (_, __) => Task.CompletedTask, (_, __) => Task.FromResult(ErrorHandleResult.Handled), cancellationToken) ?? Task.CompletedTask; } - public async Task Process(ServiceBusReceivedMessage message, CancellationToken cancellationToken = default) + public async Task Process(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, CancellationToken cancellationToken = default) { - var messageId = message.MessageId ?? Guid.NewGuid().ToString("N"); - var body = GetBody(message); + string nativeMessageId = message.MessageId; + Dictionary headers; + BinaryData body; var contextBag = new ContextBag(); - // Azure Service Bus transport also makes the incoming message available. We can do the same narrow the gap - contextBag.Set(message); + + if (string.IsNullOrWhiteSpace(nativeMessageId)) + { + nativeMessageId = message.ApplicationProperties.TryGetValue(Headers.MessageId, out var nsbMessageId) ? nsbMessageId.ToString()! : GuidHelper.CreateVersion8(message.EnqueuedTime, message.SequenceNumber).ToString(); + } + + if (messagesToBeCompleted.TryRemove(nativeMessageId)) + { + MessageAlreadyProcessed(logger, nativeMessageId); + + await SafeCompleteMessage(messageActions, nativeMessageId, message, CancellationToken.None).ConfigureAwait(false); + return; + } + + try + { + headers = extractHeaders(message); + body = message.Body ?? BinaryData.Empty; + + contextBag.Set(message); + } + catch (Exception ex) + { + MessageDeadLetteredDueToMetadataExtractionFailure(logger, ex); + + var deadLetterRequest = new DeadLetterRequest(ex); + + await SafeDeadLetterMessage(messageActions, message, deadLetterRequest, CancellationToken.None).ConfigureAwait(false); + return; + } try { using var azureServiceBusTransportTransaction = new AzureServiceBusTransportTransaction(); - var messageContext = CreateMessageContext(message, messageId, body, azureServiceBusTransportTransaction.TransportTransaction, contextBag); - await onMessage!(messageContext, cancellationToken).ConfigureAwait(false); + // we need to clone the headers since the core pipeline might mutate them + var messageContext = new MessageContext(nativeMessageId, new Dictionary(headers), body, azureServiceBusTransportTransaction.TransportTransaction, ReceiveAddress, contextBag); + + await onMessage(messageContext, cancellationToken).ConfigureAwait(false); azureServiceBusTransportTransaction.Commit(); + + await SafeCompleteMessage(messageActions, nativeMessageId, message, CancellationToken.None).ConfigureAwait(false); } - catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested) { - throw; + MessageProcessingCanceled(logger, ex); + await SafeAbandonMessage(messageActions, message, CancellationToken.None).ConfigureAwait(false); } catch (Exception exception) { using var azureServiceBusTransportTransaction = new AzureServiceBusTransportTransaction(); - var errorContext = CreateErrorContext(message, exception, messageId, body, azureServiceBusTransportTransaction.TransportTransaction, contextBag); + ErrorHandleResult errorHandleResult; + try + { + // No need to clone the message header here since we do not make use of them after on error has executed + var errorContext = new ErrorContext(exception, headers, nativeMessageId, body, azureServiceBusTransportTransaction.TransportTransaction, message.DeliveryCount, ReceiveAddress, contextBag); + errorHandleResult = await onError.Invoke(errorContext, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException ex) when (cancellationToken.IsCancellationRequested) + { + OnErrorCanceled(logger, ex); + await SafeAbandonMessage(messageActions, message, CancellationToken.None).ConfigureAwait(false); + return; + } + catch (ServiceBusException ex) when (ex.IsTransient || ex.Reason == ServiceBusFailureReason.MessageLockLost) + { + OnErrorFailedDueToTransientException(logger, ex); + await SafeAbandonMessage(messageActions, message, CancellationToken.None).ConfigureAwait(false); + return; + } + catch (Exception ex) + { + MessageDeadLetteredDueToExceptionInOnError(logger, exception); + + await SafeDeadLetterMessage(messageActions, message, new DeadLetterRequest(ex), CancellationToken.None).ConfigureAwait(false); + return; + } + + if (azureServiceBusTransportTransaction.TransportTransaction.TryGet(out var applicationDeadLetterRequest)) + { + UserRequestedDeadLetter(logger, nativeMessageId, applicationDeadLetterRequest.DeadLetterReason, applicationDeadLetterRequest.DeadLetterErrorDescription); + + await SafeDeadLetterMessage(messageActions, message, applicationDeadLetterRequest, CancellationToken.None).ConfigureAwait(false); - var errorHandleResult = await onError!.Invoke(errorContext, cancellationToken).ConfigureAwait(false); + return; + } if (errorHandleResult == ErrorHandleResult.Handled) { azureServiceBusTransportTransaction.Commit(); + await SafeCompleteMessage(messageActions, nativeMessageId, message, CancellationToken.None).ConfigureAwait(false); return; } - throw; + await SafeAbandonMessage(messageActions, message, CancellationToken.None).ConfigureAwait(false); } } - static BinaryData GetBody(ServiceBusReceivedMessage message) - { - var body = message.Body ?? BinaryData.FromBytes(ReadOnlyMemory.Empty); - var memory = body.ToMemory(); - - if (memory.IsEmpty || - !message.ApplicationProperties.TryGetValue(TransportEncodingHeader, out var value) || - !value.Equals("wcf/byte-array")) - { - return body; - } - - using var reader = XmlDictionaryReader.CreateBinaryReader(body.ToStream(), XmlDictionaryReaderQuotas.Max); - var bodyBytes = (byte[])Deserializer.ReadObject(reader)!; - return new BinaryData(bodyBytes); - } - - ErrorContext CreateErrorContext(ServiceBusReceivedMessage message, Exception exception, string messageId, - BinaryData body, TransportTransaction transportTransaction, ContextBag contextBag) => - new(exception, GetNServiceBusHeaders(message), messageId, body, transportTransaction, message.DeliveryCount, ReceiveAddress, contextBag); - - MessageContext CreateMessageContext(ServiceBusReceivedMessage message, string messageId, BinaryData body, - TransportTransaction transportTransaction, ContextBag contextBag) => - new(messageId, GetNServiceBusHeaders(message), body, transportTransaction, ReceiveAddress, contextBag); - static Dictionary GetNServiceBusHeaders(ServiceBusReceivedMessage message) { var headers = new Dictionary(message.ApplicationProperties.Count); @@ -95,8 +145,6 @@ MessageContext CreateMessageContext(ServiceBusReceivedMessage message, string me headers[kvp.Key] = kvp.Value?.ToString(); } - headers.Remove(TransportEncodingHeader); - if (!string.IsNullOrWhiteSpace(message.ReplyTo)) { headers[Headers.ReplyToAddress] = message.ReplyTo; @@ -107,9 +155,61 @@ MessageContext CreateMessageContext(ServiceBusReceivedMessage message, string me headers[Headers.CorrelationId] = message.CorrelationId; } + if (!string.IsNullOrWhiteSpace(message.ContentType)) + { + headers[Headers.ContentType] = message.ContentType; + } + return headers; } + async Task SafeDeadLetterMessage(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, DeadLetterRequest request, CancellationToken cancellationToken) + { + try + { + await messageActions.DeadLetterMessageAsync(message, + request.PropertiesToModify, + request.DeadLetterReason, + request.DeadLetterErrorDescription, + cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { } + catch (Exception ex) + { + DeadLetterMessageFailed(logger, ex); + } + } + + async Task SafeAbandonMessage(ServiceBusMessageActions messageActions, ServiceBusReceivedMessage message, CancellationToken cancellationToken) + { + try + { + await messageActions.AbandonMessageAsync(message, cancellationToken: cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { } + catch (Exception ex) + { + AbandonMessageFailed(logger, ex); + } + } + + async Task SafeCompleteMessage(ServiceBusMessageActions messageActions, string nativeMessageId, ServiceBusReceivedMessage message, CancellationToken cancellationToken) + { + try + { + await messageActions.CompleteMessageAsync(message, cancellationToken).ConfigureAwait(false); + } + catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) + { + messagesToBeCompleted.AddOrUpdate(nativeMessageId, true); + } + catch (Exception ex) + { + CompleteMessageFailed(logger, ex); + messagesToBeCompleted.AddOrUpdate(nativeMessageId, true); + } + } + public Task StartReceive(CancellationToken cancellationToken = default) => Task.CompletedTask; // No-op because the rate at which Azure Functions pushes messages to the pipeline can't be controlled. @@ -121,10 +221,72 @@ MessageContext CreateMessageContext(ServiceBusReceivedMessage message, string me public string Id => baseTransportReceiver.Id; public string ReceiveAddress => baseTransportReceiver.ReceiveAddress; - OnMessage? onMessage; - OnError? onError; + OnMessage onMessage = static (_, _) => Task.CompletedTask; + OnError onError = static (_, _) => Task.FromResult(ErrorHandleResult.Handled); + + // we do this to enable tests to simulate exceptions when extracting headers + readonly Func> extractHeaders = headerExtractor ?? GetNServiceBusHeaders; +} + +static partial class PipelineInvokingMessageProcessorLog +{ + [LoggerMessage( + EventId = 0, + Level = LogLevel.Information, + Message = "Message {MessageId} was already processed and will be completed")] + internal static partial void MessageAlreadyProcessed(ILogger logger, string messageId); + + [LoggerMessage( + EventId = 1, + Level = LogLevel.Error, + Message = "Message dead lettered due to issues with extracting message metadata.")] + internal static partial void MessageDeadLetteredDueToMetadataExtractionFailure(ILogger logger, Exception exception); + + [LoggerMessage( + EventId = 2, + Level = LogLevel.Debug, + Message = "Message processing canceled.")] + internal static partial void MessageProcessingCanceled(ILogger logger, Exception exception); + + [LoggerMessage( + EventId = 3, + Level = LogLevel.Debug, + Message = "OnError canceled.")] + internal static partial void OnErrorCanceled(ILogger logger, Exception exception); + + [LoggerMessage( + EventId = 4, + Level = LogLevel.Warning, + Message = "OnError failed due to transient exception.")] + internal static partial void OnErrorFailedDueToTransientException(ILogger logger, Exception exception); + + [LoggerMessage( + EventId = 5, + Level = LogLevel.Error, + Message = "Message dead lettered due to exception in OnError.")] + internal static partial void MessageDeadLetteredDueToExceptionInOnError(ILogger logger, Exception exception); + + [LoggerMessage( + EventId = 6, + Level = LogLevel.Error, + Message = "User requested {MessageId} to be dead lettered due to {DeadLetterReason}: {DeadLetterErrorDescription}")] + internal static partial void UserRequestedDeadLetter(ILogger logger, string messageId, string deadLetterReason, string deadLetterErrorDescription); + + [LoggerMessage( + EventId = 7, + Level = LogLevel.Warning, + Message = "Dead letter message failed.")] + internal static partial void DeadLetterMessageFailed(ILogger logger, Exception exception); - const string TransportEncodingHeader = "NServiceBus.Transport.Encoding"; + [LoggerMessage( + EventId = 8, + Level = LogLevel.Warning, + Message = "Abandon message failed.")] + internal static partial void AbandonMessageFailed(ILogger logger, Exception exception); - static readonly DataContractSerializer Deserializer = new(typeof(byte[])); + [LoggerMessage( + EventId = 9, + Level = LogLevel.Warning, + Message = "Complete message failed.")] + internal static partial void CompleteMessageFailed(ILogger logger, Exception exception); } \ No newline at end of file diff --git a/src/NServiceBus.AzureFunctions.AzureServiceBus/RecoverabilityActionExtensions.cs b/src/NServiceBus.AzureFunctions.AzureServiceBus/RecoverabilityActionExtensions.cs new file mode 100644 index 0000000..07393b3 --- /dev/null +++ b/src/NServiceBus.AzureFunctions.AzureServiceBus/RecoverabilityActionExtensions.cs @@ -0,0 +1,12 @@ +namespace NServiceBus.AzureFunctions.AzureServiceBus; + +public static class RecoverabilityActionExtensions +{ + extension(RecoverabilityAction _) + { + public static DeadLetterMessage DeadLetter(string deadLetterReason, string deadLetterErrorDescription, Dictionary? propertiesToModify = null) + => new(deadLetterReason, deadLetterErrorDescription, propertiesToModify); + + public static DeadLetterMessage DeadLetter(Exception exception) => new(exception); + } +} \ No newline at end of file diff --git a/src/Tests.Analyzers/ApprovalFiles/FunctionCompositionGeneratorTests.GeneratesProjectComposition.approved.txt b/src/Tests.Analyzers/ApprovalFiles/FunctionCompositionGeneratorTests.GeneratesProjectComposition.approved.txt index 9af5f28..1644343 100644 --- a/src/Tests.Analyzers/ApprovalFiles/FunctionCompositionGeneratorTests.GeneratesProjectComposition.approved.txt +++ b/src/Tests.Analyzers/ApprovalFiles/FunctionCompositionGeneratorTests.GeneratesProjectComposition.approved.txt @@ -15,7 +15,7 @@ public partial class Functions [NServiceBusFunction] [Function("ProcessOrder")] public partial Task Run( - [ServiceBusTrigger("sales-queue", Connection = "AzureServiceBus")] ServiceBusReceivedMessage message, + [ServiceBusTrigger("sales-queue", Connection = "AzureServiceBus", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken); diff --git a/src/Tests.Analyzers/FunctionEndpointGeneratorTests.cs b/src/Tests.Analyzers/FunctionEndpointGeneratorTests.cs index 6177657..aa9a1b1 100644 --- a/src/Tests.Analyzers/FunctionEndpointGeneratorTests.cs +++ b/src/Tests.Analyzers/FunctionEndpointGeneratorTests.cs @@ -15,10 +15,12 @@ public void GeneratesFunctionEndpoint() => .SuppressCompilationErrors() .Approve(); - [TestCase(FunctionClassMustBePartial, "NSBFUNC001")] - [TestCase(FunctionClassShouldNotImplementIHandleMessages, "NSBFUNC002")] - [TestCase(FunctionMethodMustBePartial, "NSBFUNC003")] - [TestCase(MultipleConfigureMethods, "NSBFUNC005")] + [TestCase(FunctionClassMustBePartial, DiagnosticIds.ClassMustBePartial)] + [TestCase(FunctionClassShouldNotImplementIHandleMessages, DiagnosticIds.ShouldNotImplementIHandleMessages)] + [TestCase(FunctionMethodMustBePartial, DiagnosticIds.MethodMustBePartial)] + [TestCase(MultipleConfigureMethods, DiagnosticIds.MultipleConfigureMethods)] + [TestCase(MissingAutoComplete, DiagnosticIds.AutoCompleteEnabled)] + [TestCase(AutoCompleteEnabled, DiagnosticIds.AutoCompleteEnabled)] public void ReportsGeneratorDiagnostics(string source, string diagnosticId) { var result = SourceGeneratorTest.ForIncrementalGenerator() @@ -129,4 +131,42 @@ public static void ConfigureProcessOrder( } } """; + + const string MissingAutoComplete = """ + namespace Demo; + + public partial class Functions + { + [NServiceBusFunction] + [Function("ProcessOrder")] + public partial Task Run( + [ServiceBusTrigger("sales-queue", Connection = "AzureServiceBus")] ServiceBusReceivedMessage message, + ServiceBusMessageActions messageActions, + FunctionContext context, + CancellationToken cancellationToken); + + public static void ConfigureProcessOrder(EndpointConfiguration endpointConfiguration) + { + } + } + """; + + const string AutoCompleteEnabled = """ + namespace Demo; + + public partial class Functions + { + [NServiceBusFunction] + [Function("ProcessOrder")] + public partial Task Run( + [ServiceBusTrigger("sales-queue", AutoCompleteMessages = true)] ServiceBusReceivedMessage message, + ServiceBusMessageActions messageActions, + FunctionContext context, + CancellationToken cancellationToken); + + public static void ConfigureProcessOrder(EndpointConfiguration endpointConfiguration) + { + } + } + """; } \ No newline at end of file diff --git a/src/Tests.Analyzers/MissingCompositionCallAnalyzerTests.cs b/src/Tests.Analyzers/MissingCompositionCallAnalyzerTests.cs index d67aab7..52150e4 100644 --- a/src/Tests.Analyzers/MissingCompositionCallAnalyzerTests.cs +++ b/src/Tests.Analyzers/MissingCompositionCallAnalyzerTests.cs @@ -16,7 +16,7 @@ public void ReportsDiagnosticWhenCompositionCallIsMissing() .Run(); var diagnostics = result.GetAnalyzerDiagnostics(); - Assert.That(diagnostics, Has.Some.Matches(d => d.Id == "NSBFUNC004")); + Assert.That(diagnostics, Has.Some.Matches(d => d.Id == DiagnosticIds.MissingAddNServiceBusFunctionsCall)); } [Test] @@ -48,7 +48,7 @@ public static void AddNServiceBusFunctions(this Builder builder) .Run(); var diagnostics = result.GetAnalyzerDiagnostics(); - Assert.That(diagnostics, Has.Some.Matches(d => d.Id == "NSBFUNC004")); + Assert.That(diagnostics, Has.Some.Matches(d => d.Id == DiagnosticIds.MissingAddNServiceBusFunctionsCall)); } [Test] @@ -75,7 +75,7 @@ public static void Configure(FunctionsApplicationBuilder builder) .Run(); var diagnostics = result.GetAnalyzerDiagnostics(); - Assert.That(diagnostics, Has.None.Matches(d => d.Id == "NSBFUNC004")); + Assert.That(diagnostics, Has.None.Matches(d => d.Id == DiagnosticIds.MissingAddNServiceBusFunctionsCall)); } static SourceGeneratorTest CreateSourceGeneratorAnalyzerTest() => diff --git a/src/Tests.Analyzers/TestSources.cs b/src/Tests.Analyzers/TestSources.cs index 21fe3e4..4f28f47 100644 --- a/src/Tests.Analyzers/TestSources.cs +++ b/src/Tests.Analyzers/TestSources.cs @@ -10,7 +10,7 @@ public partial class Functions [NServiceBusFunction] [Function("ProcessOrder")] public partial Task Run( - [ServiceBusTrigger("sales-queue", Connection = "AzureServiceBus")] ServiceBusReceivedMessage message, + [ServiceBusTrigger("sales-queue", Connection = "AzureServiceBus", AutoCompleteMessages = false)] ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext context, CancellationToken cancellationToken); diff --git a/src/Tests/.editorconfig b/src/Tests/.editorconfig index cc3614b..c788229 100644 --- a/src/Tests/.editorconfig +++ b/src/Tests/.editorconfig @@ -4,4 +4,7 @@ dotnet_diagnostic.CA2007.severity = none # Add a CancellationToken - not a library -dotnet_diagnostic.PS0018.severity = none \ No newline at end of file +dotnet_diagnostic.PS0018.severity = none + +# Make the CancellationToken required - not a library +dotnet_diagnostic.PS0004.severity = none \ No newline at end of file diff --git a/src/Tests/ApprovalFiles/ApiApprovals.ApprovaAzureServiceBusComponentApi.approved.txt b/src/Tests/ApprovalFiles/ApiApprovals.ApprovaAzureServiceBusComponentApi.approved.txt index b4a6c05..bdddbc5 100644 --- a/src/Tests/ApprovalFiles/ApiApprovals.ApprovaAzureServiceBusComponentApi.approved.txt +++ b/src/Tests/ApprovalFiles/ApiApprovals.ApprovaAzureServiceBusComponentApi.approved.txt @@ -6,6 +6,19 @@ namespace NServiceBus.AzureFunctions.AzureServiceBus public AzureServiceBusMessageProcessor(NServiceBus.AzureServiceBusServerlessTransport transport, string endpointName) { } public System.Threading.Tasks.Task Process(Azure.Messaging.ServiceBus.ServiceBusReceivedMessage message, Microsoft.Azure.Functions.Worker.ServiceBusMessageActions messageActions, Microsoft.Azure.Functions.Worker.FunctionContext functionContext, System.Threading.CancellationToken cancellationToken = default) { } } + public sealed class DeadLetterMessage : NServiceBus.RecoverabilityAction + { + public override NServiceBus.Transport.ErrorHandleResult ErrorHandleResult { get; } + public override System.Collections.Generic.IReadOnlyCollection GetRoutingContexts(NServiceBus.Pipeline.IRecoverabilityActionContext context) { } + } + public static class RecoverabilityActionExtensions + { + extension(NServiceBus.RecoverabilityAction _) + { + public static NServiceBus.AzureFunctions.AzureServiceBus.DeadLetterMessage DeadLetter(string deadLetterReason, string deadLetterErrorDescription, System.Collections.Generic.Dictionary? propertiesToModify = null) { } + public static NServiceBus.AzureFunctions.AzureServiceBus.DeadLetterMessage DeadLetter(System.Exception exception) { } + } + } } namespace NServiceBus { diff --git a/src/Tests/DeadLetterMessageTests.cs b/src/Tests/DeadLetterMessageTests.cs new file mode 100644 index 0000000..4c8d727 --- /dev/null +++ b/src/Tests/DeadLetterMessageTests.cs @@ -0,0 +1,57 @@ +namespace NServiceBus.AzureFunctions.Tests; + +using AzureServiceBus; +using NUnit.Framework; + +public class DeadLetterMessageTests +{ + [Test] + public void Should_full_control_over_dead_letter_parameters() + { + var reason = "reason"; + var description = "description"; + var properties = new Dictionary { { "SomeProperty", "SomeValue" } }; + var request = new DeadLetterRequest(reason, description, properties); + + Assert.AreEqual(reason, request.DeadLetterReason, "DeadLetterReason should be set correctly"); + Assert.AreEqual(description, request.DeadLetterErrorDescription, "DeadLetterErrorDescription should be set correctly"); + Assert.IsNotNull(request.PropertiesToModify, "PropertiesToModify should not be null"); + Assert.IsTrue(request.PropertiesToModify!.ContainsKey("SomeProperty"), "PropertiesToModify should contain 'SomeProperty'"); + Assert.AreEqual("SomeValue", request.PropertiesToModify["SomeProperty"], "PropertiesToModify['SomeProperty'] should be set correctly"); + } + + [Test] + public void Should_convert_exception_to_dead_letter_request() + { + var exception = SimulateException(); + var request = new DeadLetterRequest(exception); + + // Make sure we follow microsoft guidance - https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dead-letter-queues#application-level-dead-lettering + Assert.AreEqual("System.InvalidOperationException - Test exception", request.DeadLetterReason, "DeadLetterReason should reflect exception type and message"); + Assert.AreEqual(request.DeadLetterErrorDescription, exception.StackTrace, "DeadLetterErrorDescription should contain stack trace"); + return; + + Exception SimulateException() + { + try + { + throw new InvalidOperationException("Test exception"); + } + catch (Exception e) + { + return e; + } + } + } + + [Test] + public void Should_truncate_dead_letter_reason_and_description_to_1024_characters() + { + var longReason = new string('A', 2000); + var longDescription = new string('B', 3000); + var request = new DeadLetterRequest(longReason, longDescription); + + Assert.AreEqual(new string('A', 1024), request.DeadLetterReason, "DeadLetterReason should match the first 1024 characters of the input"); + Assert.AreEqual(new string('B', 1024), request.DeadLetterErrorDescription, "DeadLetterErrorDescription should match the first 1024 characters of the input"); + } +} \ No newline at end of file diff --git a/src/Tests/MessageProcessorTests.cs b/src/Tests/MessageProcessorTests.cs new file mode 100644 index 0000000..78a75e6 --- /dev/null +++ b/src/Tests/MessageProcessorTests.cs @@ -0,0 +1,501 @@ +namespace NServiceBus.AzureFunctions.Tests; + +using Azure.Messaging.ServiceBus; +using AzureServiceBus; +using BitFaster.Caching; +using BitFaster.Caching.Lru; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Logging.Testing; +using NUnit.Framework; +using Transport; +using NServiceBus; + +[TestFixture] +public class MessageProcessorTests +{ + [Test] + public async Task Should_expose_native_message_id_headers_and_body_on_message_context() + { + var expectedMessageId = "test-message-id-123"; + var expectedBody = new byte[] { 1, 2, 3, 4 }; + var expectedHeaderKey = "custom-header"; + var expectedHeaderValue = "header-value"; + var expectedReplyTo = "reply-queue"; + var expectedCorrelationId = "correlation-abc"; + var expectedContentType = "some/content-type"; + + var message = ServiceBusModelFactory.ServiceBusReceivedMessage( + messageId: expectedMessageId, + properties: new Dictionary { { expectedHeaderKey, expectedHeaderValue } }, + body: new BinaryData(expectedBody), + replyTo: expectedReplyTo, + correlationId: expectedCorrelationId, + contentType: expectedContentType + ); + + var result = await ProcessMessage( + message: message + ); + + var messageContext = result.MessageContext; + + using (Assert.EnterMultipleScope()) + { + Assert.That(messageContext, Is.Not.Null, "MessageContext should not be null"); + Assert.That(messageContext!.NativeMessageId, Is.EqualTo(expectedMessageId), "MessageContext should expose the native message id"); + Assert.That(messageContext.Headers.ContainsKey(expectedHeaderKey), Is.True, "MessageContext should expose the custom header"); + Assert.That(messageContext.Headers[expectedHeaderKey], Is.EqualTo(expectedHeaderValue), "MessageContext should expose the correct header value"); + Assert.That(messageContext.Body.ToArray(), Is.EqualTo(expectedBody).AsCollection, "MessageContext should expose the correct message body"); + Assert.That(messageContext.Headers.ContainsKey(Headers.CorrelationId), Is.True, "Native CorrelationId should be upconverted to the CorrelationId header"); + Assert.That(messageContext.Headers[Headers.CorrelationId], Is.EqualTo(expectedCorrelationId), "Headers should expose the correct CorrelationId value"); + Assert.That(messageContext.Headers.ContainsKey(Headers.ReplyToAddress), Is.True, "Native ReplyTo should be upconverted to the ReplyToAddress header"); + Assert.That(messageContext.Headers[Headers.ReplyToAddress], Is.EqualTo(expectedReplyTo), "Headers should expose the correct ReplyTo header value"); + Assert.That(messageContext.Headers.ContainsKey(Headers.ContentType), Is.True, "Native ContentType should be upconverted to the ContentType header"); + Assert.That(messageContext.Headers[Headers.ContentType], Is.EqualTo(expectedContentType), "Headers should expose the correct ContentType header value"); + } + } + + [Test] + public async Task Should_derive_message_id_from_enqueued_time_and_sequence_number_if_no_native_id_is_present() + { + var sequenceNumber = 42L; + var enqueuedTime = DateTimeOffset.UtcNow; + var expectedMessageId = GuidHelper.CreateVersion8(enqueuedTime, sequenceNumber).ToString(); + + string? firstMessageId = null; + string? secondMessageId = null; + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(sequenceNumber: sequenceNumber, enqueuedTime: enqueuedTime); + + await ProcessMessage(message: message, onMessage: (context, _) => { firstMessageId = context.NativeMessageId; return Task.CompletedTask; }); + await ProcessMessage(message: message, onMessage: (context, _) => { secondMessageId = context.NativeMessageId; return Task.CompletedTask; }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(firstMessageId, Is.EqualTo(expectedMessageId), "MessageId should be derived from enqueued time and sequence number"); + Assert.That(secondMessageId, Is.EqualTo(firstMessageId), "MessageId should be stable across processing attempts"); + } + } + + [Test] + public async Task Should_use_auto_generated_message_id_if_present() + { + string? messageId = null; + var autoGeneratedMessageId = Guid.NewGuid().ToString(); + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(properties: new Dictionary { { Headers.MessageId, autoGeneratedMessageId } }); + var result = await ProcessMessage( + message: message, + onMessage: (context, _) => + { + messageId = context.NativeMessageId; + return Task.CompletedTask; + }); + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.True, "Message should be completed"); + Assert.That(messageId, Is.EqualTo(autoGeneratedMessageId), "Auto generated message id should used as message id"); + } + } + + [Test] + public async Task Should_complete_messages_with_sequence_number_derived_id_immediately_if_previous_processing_attempt_was_successful() + { + var completedMessagesCache = new FastConcurrentLru(1_000); + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(sequenceNumber: 99L); + + var firstProcessingResult = await ProcessMessage( + message: message, + completedMessagesCache: completedMessagesCache, + messageActions: new TestableMessageActions { CompleteMessage = (_, _) => throw new Exception("simulated complete exception") }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(firstProcessingResult.OnMessageWasCalled, Is.True, "OnMessage should be called"); + Assert.That(firstProcessingResult.MessageActions.WasCompleted, Is.True, "Message should be completed"); + } + + var secondProcessingResult = await ProcessMessage( + message: message, + completedMessagesCache: completedMessagesCache); + + using (Assert.EnterMultipleScope()) + { + Assert.That(secondProcessingResult.OnMessageWasCalled, Is.False, "OnMessage should not be called on second delivery"); + Assert.That(secondProcessingResult.MessageActions.WasCompleted, Is.True, "Message should be completed on second delivery"); + } + } + + [Test] + public async Task Should_complete_messages_immediately_if_previous_processing_attempt_was_successful() + { + var completedMessagesCache = new FastConcurrentLru(1_000); + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: Guid.NewGuid().ToString()); + + var firstProcessingResult = await ProcessMessage( + message: message, + completedMessagesCache: completedMessagesCache, + messageActions: new TestableMessageActions { CompleteMessage = (_, _) => throw new Exception("simulated complete exception") }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(firstProcessingResult.OnMessageWasCalled, Is.True, "OnMessage should be called"); + Assert.That(firstProcessingResult.MessageActions.WasCompleted, Is.True, "Message should be completed"); + } + + var secondProcessingResult = await ProcessMessage( + message: message, + completedMessagesCache: completedMessagesCache); + + using (Assert.EnterMultipleScope()) + { + Assert.That(secondProcessingResult.OnMessageWasCalled, Is.False, "OnMessage should not be called"); + Assert.That(secondProcessingResult.MessageActions.WasCompleted, Is.True, "Message should be completed"); + } + } + + [Test] + public async Task Should_complete_when_on_message_succeeds() + { + var result = await ProcessMessage( + onMessage: (_, _) => Task.CompletedTask); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.OnMessageWasCalled, Is.True, "OnMessage should be called"); + Assert.That(result.OnErrorWasCalled, Is.False, "OnError should not be called"); + Assert.That(result.MessageActions.WasCompleted, Is.True, "Message should be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.False, "Message should not be abandoned"); + } + } + + [Test] + public async Task Should_abandon_when_on_message_fails_and_retry_is_requested() + { + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => Task.FromResult(ErrorHandleResult.RetryRequired)); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.OnMessageWasCalled, Is.True, "OnMessage should be called"); + Assert.That(result.OnErrorWasCalled, Is.True, "OnError should be called"); + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned"); + } + } + + [Test] + public async Task Should_complete_when_on_message_fails_and_failure_is_marked_as_handled() + { + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => Task.FromResult(ErrorHandleResult.Handled)); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.OnMessageWasCalled, Is.True, "OnMessage should be called"); + Assert.That(result.OnErrorWasCalled, Is.True, "OnError should be called"); + Assert.That(result.MessageActions.WasCompleted, Is.True, "Message should be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.False, "Message should not be abandoned"); + } + } + + [Test] + public async Task Should_abandon_when_on_error_throws_transient_service_bus_exception() + { + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => throw new ServiceBusException("simulated transient exception", ServiceBusFailureReason.ServiceBusy)); //ServiceBusy is transient + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned if onError throws"); + Assert.That(result.MessageActions.WasDeadLettered, Is.False, "Message should not be dead lettered"); + } + } + + [Test] + public async Task Should_abandon_when_on_error_throws_lock_lost_service_bus_exception() + { + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => throw new ServiceBusException("simulated lock lost exception", ServiceBusFailureReason.MessageLockLost)); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned if onError throws"); + Assert.That(result.MessageActions.WasDeadLettered, Is.False, "Message should not be dead lettered"); + } + } + + [Test] + public async Task Should_dlq_when_on_error_throws_non_transient_exception() + { + var exception = new Exception("simulated exception in on error"); + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => throw exception); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.False, "Message should not be abandoned if onError throws"); + Assert.That(result.MessageActions.WasDeadLettered, Is.True, "Message should be dead lettered"); + } + } + + [Test] + public async Task Should_dlq_message_if_requested() + { + var expectedDlqReason = "some reason"; + var expectedDlqDescription = "some description"; + + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (errorContext, _) => + { + errorContext.TransportTransaction.Set(new DeadLetterRequest(expectedDlqReason, expectedDlqDescription, new Dictionary { { "MyProperty", "MyValue" } })); + return Task.FromResult(ErrorHandleResult.Handled); + }); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.False, "Message should not be abandoned"); + Assert.That(result.MessageActions.WasDeadLettered, Is.True, "Message should be dead lettered"); + Assert.That(result.LogCollector.LatestRecord.Level, Is.EqualTo(Microsoft.Extensions.Logging.LogLevel.Error), "DLQ requests should be logged as error"); + Assert.That(result.LogCollector.LatestRecord.Message, Does.Contain(expectedDlqReason), "Should log DLQ reason"); + Assert.That(result.LogCollector.LatestRecord.Message, Does.Contain(expectedDlqDescription), "Should log DLQ description"); + } + } + + [Test] + public async Task Should_expose_the_service_bus_message_on_both_message_and_error_context() + { + var message = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: Guid.NewGuid().ToString()); + + var result = await ProcessMessage( + message: message, + onMessage: (_, _) => throw new Exception("simulated exception")); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageContext?.Extensions.Get(), Is.SameAs(message), "MessageContext should contain the ServiceBusReceivedMessage"); + Assert.That(result.ErrorContext?.Extensions.Get(), Is.SameAs(message), "ErrorContext should contain the ServiceBusReceivedMessage"); + } + } + + [Test] + public async Task Should_abandon_when_token_is_cancelled_and_not_invoke_onerror() + { + var result = await ProcessMessage( + onMessage: (_, ct) => + { + ct.ThrowIfCancellationRequested(); + return Task.CompletedTask; + }, + cancellationToken: new CancellationToken(true) + ); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.OnErrorWasCalled, Is.False, "OnError should not be called"); + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned"); + Assert.That(result.LogCollector.LatestRecord.Level, Is.EqualTo(Microsoft.Extensions.Logging.LogLevel.Debug), "Cancellation should be logged as debug"); + Assert.That(result.LogCollector.LatestRecord.Message, Does.Contain("Message processing canceled"), "Should log debug when processing canceled"); + Assert.That(result.LogCollector.LatestRecord.Exception, Is.InstanceOf()); + } + } + + [Test] + public async Task Should_abandon_on_error_when_token_is_cancelled() + { + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, ct) => + { + ct.ThrowIfCancellationRequested(); + return Task.FromResult(ErrorHandleResult.Handled); + }, + cancellationToken: new CancellationToken(true) + ); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasCompleted, Is.False, "Message should not be completed"); + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned"); + Assert.That(result.LogCollector.LatestRecord.Level, Is.EqualTo(Microsoft.Extensions.Logging.LogLevel.Debug), "Cancellation should be logged as debug"); + Assert.That(result.LogCollector.LatestRecord.Message, Does.Contain("OnError canceled"), "Should log debug when on error canceled"); + Assert.That(result.LogCollector.LatestRecord.Exception, Is.InstanceOf()); + } + } + + [Test] + public async Task Should_not_propagate_header_mutations_from_on_message_to_on_error() + { + var originalHeaderKey = "original-header"; + var originalHeaderValue = "original-value"; + var addedHeaderKey = "added-header"; + + var message = ServiceBusModelFactory.ServiceBusReceivedMessage( + messageId: Guid.NewGuid().ToString(), + properties: new Dictionary { { originalHeaderKey, originalHeaderValue } } + ); + + var result = await ProcessMessage( + message: message, + onMessage: (msgContext, _) => + { + msgContext.Headers[addedHeaderKey] = "some-value"; + msgContext.Headers[originalHeaderKey] = "some-other-value"; + throw new Exception("force error"); + } + ); + + var headers = result.ErrorContext!.Message.Headers; + using (Assert.EnterMultipleScope()) + { + Assert.That(headers.ContainsKey(originalHeaderKey), Is.True, "Original header should still exist in onError"); + Assert.That(headers[originalHeaderKey], Is.EqualTo(originalHeaderValue), "Original header value should be preserved in onError"); + Assert.That(headers.ContainsKey(addedHeaderKey), Is.False, "Added header should NOT be present in onError"); + } + } + + [Test] + public async Task Should_dead_letter_if_header_extraction_fails() + { + var exception = new Exception("simulated exception"); + var result = await ProcessMessage(headerExtractor: _ => throw exception); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.OnMessageWasCalled, Is.False, "OnMessage should not be called if header extraction fails"); + Assert.That(result.OnErrorWasCalled, Is.False, "OnError should not be called if header extraction fails"); + Assert.That(result.MessageActions.WasDeadLettered, Is.True, "Message should be dead lettered"); + } + } + + [Test] + public async Task Show_log_and_swallow_exceptions_from_dead_lettering_unless_invocation_is_cancelled() + { + var exception = new Exception("dlg failed"); + var messageActions = new TestableMessageActions { DeadLetterMessage = (_, _, _, _, _) => throw exception }; + var result = await ProcessMessage(headerExtractor: _ => throw new Exception("simulated exception"), messageActions: messageActions); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasDeadLettered, Is.True, "Message should be dead lettered"); + Assert.That(result.LogCollector.LatestRecord.Level, Is.EqualTo(Microsoft.Extensions.Logging.LogLevel.Warning), "Should be logged as warning"); + Assert.That(result.LogCollector.LatestRecord.Exception, Is.SameAs(exception)); + } + } + + [Test] + public async Task Show_log_and_swallow_exceptions_from_abandon_unless_invocation_is_cancelled() + { + var exception = new Exception("abandon failed"); + var messageActions = new TestableMessageActions { AbandonMessage = (_, _, _) => throw exception }; + + var result = await ProcessMessage( + onMessage: (_, _) => throw new Exception("simulated exception"), + onError: (_, _) => Task.FromResult(ErrorHandleResult.RetryRequired), + messageActions: messageActions); + + using (Assert.EnterMultipleScope()) + { + Assert.That(result.MessageActions.WasAbandoned, Is.True, "Message should be abandoned"); + Assert.That(result.LogCollector.LatestRecord.Level, Is.EqualTo(Microsoft.Extensions.Logging.LogLevel.Warning), "Should be logged as warning"); + Assert.That(result.LogCollector.LatestRecord.Exception, Is.SameAs(exception)); + } + } + + async Task ProcessMessage( + ServiceBusReceivedMessage? message = null, + TestableMessageActions? messageActions = null, + ICache? completedMessagesCache = null, + Func? onMessage = null, + Func>? onError = null, + Func>? headerExtractor = null, + CancellationToken cancellationToken = default) + { + message ??= ServiceBusModelFactory.ServiceBusReceivedMessage(); + onMessage ??= (_, _) => Task.CompletedTask; + onError ??= (_, _) => Task.FromResult(ErrorHandleResult.RetryRequired); + messageActions ??= new TestableMessageActions(); + completedMessagesCache ??= new FastConcurrentLru(1_000); + + + var fakeLogger = new FakeLogger(); + var processor = new PipelineInvokingMessageProcessor(new FakeBaseReceiver(), completedMessagesCache, fakeLogger, headerExtractor); + MessageContext? capturedMessageContext = null; + ErrorContext? capturedErrorContext = null; + + await processor.Initialize(PushRuntimeSettings.Default, + async (msgContext, token) => + { + capturedMessageContext = msgContext; + await onMessage(msgContext, token); + }, + async (errorContext, token) => + { + capturedErrorContext = errorContext; + return await onError(errorContext, token); + }, + cancellationToken); + + Assert.DoesNotThrowAsync(async () => await processor.Process(message, messageActions, cancellationToken)); + + return new ProcessingResult(messageActions, capturedMessageContext, capturedErrorContext, fakeLogger.Collector); + } + + record ProcessingResult(TestableMessageActions MessageActions, MessageContext? MessageContext, ErrorContext? ErrorContext, FakeLogCollector LogCollector) + { + public bool OnMessageWasCalled => MessageContext != null; + public bool OnErrorWasCalled => ErrorContext != null; + } + + class TestableMessageActions : ServiceBusMessageActions + { + public bool WasCompleted { get; private set; } + public bool WasAbandoned { get; private set; } + public bool WasDeadLettered => DeadLetterDetails is not null; + public DeadLetterCallDetails? DeadLetterDetails { get; private set; } + + public Func? CompleteMessage { get; set; } + public Func?, CancellationToken, Task>? AbandonMessage { get; set; } + public Func?, string?, string?, CancellationToken, Task>? DeadLetterMessage { get; set; } + + public override Task CompleteMessageAsync(ServiceBusReceivedMessage message, CancellationToken cancellationToken = new CancellationToken()) + { + WasCompleted = true; + return CompleteMessage != null ? CompleteMessage(message, cancellationToken) : Task.CompletedTask; + } + + public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary? propertiesToModify = null, CancellationToken cancellationToken = new CancellationToken()) + { + WasAbandoned = true; + return AbandonMessage != null ? AbandonMessage(message, propertiesToModify, cancellationToken) : Task.CompletedTask; + } + + public override Task DeadLetterMessageAsync(ServiceBusReceivedMessage message, Dictionary? propertiesToModify = null, string? deadLetterReason = null, string? deadLetterErrorDescription = null, CancellationToken cancellationToken = new CancellationToken()) + { + DeadLetterDetails = new(deadLetterReason, deadLetterErrorDescription, propertiesToModify); + return DeadLetterMessage != null ? DeadLetterMessage(message, propertiesToModify, deadLetterReason, deadLetterErrorDescription, cancellationToken) : Task.CompletedTask; + } + + public record DeadLetterCallDetails(string? DeadLetterReason, string? DeadLetterErrorDescription, Dictionary? DeadLetterProperties); + } + + class FakeBaseReceiver : IMessageReceiver + { + public Task Initialize(PushRuntimeSettings limitations, OnMessage onMessage, OnError onError, CancellationToken cancellationToken = new()) => Task.CompletedTask; + public Task StartReceive(CancellationToken cancellationToken = new()) => throw new NotImplementedException(); + public Task ChangeConcurrency(PushRuntimeSettings limitations, CancellationToken cancellationToken = new()) => throw new NotImplementedException(); + public Task StopReceive(CancellationToken cancellationToken = new()) => throw new NotImplementedException(); + public ISubscriptionManager Subscriptions => null!; + public string Id => string.Empty; + public string ReceiveAddress => "TestEndpoint"; + } +} diff --git a/src/Tests/NServiceBus.AzureFunctions.Tests.csproj b/src/Tests/NServiceBus.AzureFunctions.Tests.csproj index b28b42c..223911a 100644 --- a/src/Tests/NServiceBus.AzureFunctions.Tests.csproj +++ b/src/Tests/NServiceBus.AzureFunctions.Tests.csproj @@ -13,6 +13,7 @@ +