Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
0975e2b
Add failing test
andreasohlund Mar 19, 2026
44493f0
Add diagnostics for incorrect auto complete
andreasohlund Mar 19, 2026
bded1be
Use explicit settlement
andreasohlund Mar 19, 2026
cb5bc41
Wording
andreasohlund Mar 19, 2026
e6c0b17
Add first round of test
andreasohlund Mar 20, 2026
39875bf
Consolidate test code
andreasohlund Mar 21, 2026
58f937b
Add test for ErrorHandleResult.Handled
andreasohlund Mar 21, 2026
1b3fa2e
Add test to check exposure of incoming message
andreasohlund Mar 21, 2026
e762bb4
Add test to check cancellation behavior
andreasohlund Mar 21, 2026
ef484fa
Add test to check body and headers
andreasohlund Mar 21, 2026
60e0592
Cleanup
andreasohlund Mar 21, 2026
4f163e1
Add todo
andreasohlund Mar 21, 2026
5ef94c1
Add tests for upconverting
andreasohlund Mar 21, 2026
160c6b0
Add comment for unique transport transaction in message processing tests
andreasohlund Mar 21, 2026
e835868
Remove support for legacy body format
andreasohlund Mar 21, 2026
afecf7b
Reguire message it to be set
andreasohlund Mar 21, 2026
6a8851e
Use cancellation token none
andreasohlund Mar 21, 2026
ed63a37
Cleanup
andreasohlund Mar 21, 2026
abf6dd7
More cleanup
andreasohlund Mar 21, 2026
8c0cdd4
Log when message without message id is dlq'd
andreasohlund Mar 22, 2026
0fbd59d
Silence PS0004 in tests
andreasohlund Mar 22, 2026
c8f0450
Include scopes
andreasohlund Mar 22, 2026
885e451
Remove handled todos
andreasohlund Mar 22, 2026
82ea87e
Formatting
andreasohlund Mar 22, 2026
b81d4c8
Upconvert content type as well to align with the transport
andreasohlund Mar 22, 2026
2f41e86
Support deadlettering to be requested via onError
andreasohlund Mar 22, 2026
c4cbc67
Add test for message abandonment when onError throws an exception
andreasohlund Mar 22, 2026
9bc94cc
Enhance logging for dead lettering and add test coverage for message …
andreasohlund Mar 22, 2026
450859a
Cleanup
andreasohlund Mar 22, 2026
3d7741c
Clarify todo
andreasohlund Mar 22, 2026
b52eee6
Use the MS FakeLogger
andreasohlund Mar 22, 2026
401f720
More details
andreasohlund Mar 22, 2026
83c569a
Refine dead letter logging message for clarity
andreasohlund Mar 22, 2026
6e81c92
Log when processing and on error cancelled
andreasohlund Mar 23, 2026
06acb48
Add test to ensure header mutations are not propagated from onMessage…
andreasohlund Mar 24, 2026
c3e3927
Cleanup todos
andreasohlund Mar 24, 2026
b1b9b1c
DLQ if header extraction fails
andreasohlund Mar 24, 2026
f198813
Clone header
andreasohlund Mar 24, 2026
029be0e
Include context creating in try catch
andreasohlund Mar 24, 2026
e63af94
Dead letter message if on error fails with non-transient exception
andreasohlund Mar 24, 2026
111a98e
Support requesting DLQ from recoverability policy
andreasohlund Mar 24, 2026
2bea8e1
Approve api
andreasohlund Mar 24, 2026
f5ce9de
Revert test code
andreasohlund Mar 24, 2026
e864ed5
Remove demo
andreasohlund Mar 24, 2026
7a2a313
Truncate dlq message and reason to 1024 to make it less likely go ove…
andreasohlund Mar 25, 2026
08f0a75
Clarify code with comment on message header usage in error handling
andreasohlund Mar 25, 2026
edd2e60
Refactor dead letter request test to simulate exception handling and …
andreasohlund Mar 25, 2026
6d63269
Formatting
andreasohlund Mar 25, 2026
1186e0e
Refactor exception handling in dead letter request test for clarity a…
andreasohlund Mar 25, 2026
c34c038
Check that we swallow exceptions when dead lettering
andreasohlund Mar 27, 2026
0df2ece
Make sure auto generated message id is persisted on failure
andreasohlund Mar 27, 2026
c007b24
Use null or whitespace
andreasohlund Mar 27, 2026
463bd5c
SafeAbandon
andreasohlund Mar 27, 2026
549e890
Make sure auto generated message id is used
andreasohlund Mar 27, 2026
2582c7a
Fix bug in test storage
andreasohlund Mar 27, 2026
70eba9d
Use nsb header key for message id when storing auto generated message id
andreasohlund Mar 27, 2026
587e782
Cache message ids that have been failes during complete and complete …
andreasohlund Mar 27, 2026
07ab30b
Refactor logging to use `LoggerMessage` source generator for improved…
danielmarbach Mar 30, 2026
a413996
Update log level from Debug to Warning for message failure scenarios
danielmarbach Mar 30, 2026
4b7a1fd
Use `Guid.CreateVersion7` for generating `nativeMessageId` in message…
danielmarbach Mar 30, 2026
d92b0db
Simplify message body initialization by replacing `BinaryData.FromByt…
danielmarbach Mar 30, 2026
120837f
Replace `Guid.CreateVersion7` with `GuidHelper.CreateVersion8` for de…
danielmarbach Mar 30, 2026
ffdfc8b
Simplify `nativeMessageId` initialization using a ternary operator fo…
danielmarbach Mar 30, 2026
4d816e4
Replace `Assert` methods with `Assert.That` for consistency and impro…
danielmarbach Mar 30, 2026
a938616
Remove extra whitespace in `[ServiceBusTrigger]` attribute for consis…
danielmarbach Mar 30, 2026
d6c66a5
Refactor `PipelineInvokingMessageProcessor` to use primary constructo…
danielmarbach Mar 30, 2026
a28cdc2
Provide dead letter API as an extension of RecoverabilityActions to a…
andreasohlund Mar 31, 2026
f49ae61
Formatting
andreasohlund Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/IntegrationTest.Billing/BillingFunctions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public partial class BillingFunctions
[Function("BillingApi")]
[NServiceBusFunction]
public partial Task BillingApi(
[ServiceBusTrigger("billing-api", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
[ServiceBusTrigger("billing-api", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions,
FunctionContext functionContext,
Expand All @@ -27,7 +27,7 @@ public static void ConfigureBillingApi(EndpointConfiguration configuration)
[Function("BillingBackend")]
[NServiceBusFunction]
public partial Task BillingBackend(
[ServiceBusTrigger("billing-backend", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
[ServiceBusTrigger("billing-backend", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions,
FunctionContext functionContext,
Expand Down
15 changes: 14 additions & 1 deletion src/IntegrationTest.Sales/SalesEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ namespace IntegrationTest.Sales;
using IntegrationTest.Shared;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.AzureFunctions.AzureServiceBus;

// Cleanest pattern for single-function endpoints
[NServiceBusFunction]
public partial class SalesEndpoint
{
[Function("Sales")]
public partial Task Sales(
[ServiceBusTrigger("sales", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = true)]
[ServiceBusTrigger("sales", Connection = "AzureWebJobsServiceBus", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions,
FunctionContext functionContext,
Expand All @@ -23,5 +24,17 @@ public static void ConfigureSales(EndpointConfiguration configuration)

configuration.RegisterComponents(services => services.AddSingleton(new MyComponent("Sales")));
configuration.AddHandler<Handlers.SubmitOrderHandler>();
configuration.AuditProcessedMessagesTo("audit");

// Use the dead letter queue for failures
configuration.Recoverability().CustomPolicy((_, context) =>
{
if (context.ImmediateProcessingFailures < 3)
{
return RecoverabilityAction.ImmediateRetry();
}

return RecoverabilityAction.DeadLetter(context.Exception);
});
}
}
9 changes: 5 additions & 4 deletions src/IntegrationTest.Shared/Infrastructure/TestStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public Payload CreatePayload(string testName)

var sortedMessages = list
.OrderBy(m => m.Order)
.ThenBy(m => m.MessageType)
.ThenBy(m => m.SendingEndpoint)
.ThenBy(m => m.ReceivingEndpoint)
.ThenBy(m => m.MessageType)
.ThenBy(m => m.SendingEndpoint)
.ThenBy(m => m.ReceivingEndpoint)
.ToArray();

return new Payload(sortedMessages);
Expand All @@ -39,8 +39,9 @@ public class TestStorage(string endpointName, GlobalTestStorage globalStorage)
public void LogMessage<T>(string testName, T message, IMessageHandlerContext context)
where T : class
{
var sendingEndpoint = context.MessageHeaders[Headers.OriginatingEndpoint] ?? "<unknown>";
var sendingEndpoint = context.MessageHeaders.GetValueOrDefault(Headers.OriginatingEndpoint, "<unknown>");
var storageOrder = context.Extensions.Get<int>("TestStorageOrder");

var rec = new MessageReceived(message.GetType().FullName!, storageOrder, sendingEndpoint, endpointName);
globalStorage.Add(testName, rec);
}
Expand Down
2 changes: 1 addition & 1 deletion src/IntegrationTest.Shipping/ShippingEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public partial class ShippingEndpoint
{
[Function(nameof(Shipping))]
public partial Task Shipping(
[ServiceBusTrigger("shipping", AutoCompleteMessages = true)]
[ServiceBusTrigger("shipping", AutoCompleteMessages = false)]
ServiceBusReceivedMessage message,
ServiceBusMessageActions messageActions,
FunctionContext functionContext,
Expand Down
11 changes: 3 additions & 8 deletions src/IntegrationTest/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Text.Json;
using IntegrationTest;
using IntegrationTest.Shared;
using IntegrationTest.Shared.Infrastructure;
Expand All @@ -10,18 +9,14 @@
var builder = FunctionsApplication.CreateBuilder(args);
builder.UseWhen<ExceptionTrackingMiddleware>(_ => true);

builder.Logging.ClearProviders();
builder.Logging.AddJsonConsole(o =>
{
o.IncludeScopes = true;
o.JsonWriterOptions = new JsonWriterOptions { Indented = true };
});
builder.Logging.SetMinimumLevel(LogLevel.Information);
builder.Logging.AddSimpleConsole(options => options.IncludeScopes = true);
builder.Logging.SetMinimumLevel(LogLevel.Warning);

builder.Services.AddSingleton<GlobalTestStorage>();
builder.Services.AddSingleton(new MyComponent("global"));

builder.AddNServiceBusFunctions();

builder.AddSendOnlyNServiceBusEndpoint("client", configuration =>
{
configuration.RegisterComponents(services => services.AddSingleton(new MyComponent("client")));
Expand Down
23 changes: 16 additions & 7 deletions src/NServiceBus.AzureFunctions.Analyzer/DiagnosticIds.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,56 @@ namespace NServiceBus.AzureFunctions.Analyzer;

using Microsoft.CodeAnalysis;

static class DiagnosticIds
public static class DiagnosticIds
{
public const string ClassMustBePartial = "NSBFUNC001";
public const string ShouldNotImplementIHandleMessages = "NSBFUNC002";
public const string MethodMustBePartial = "NSBFUNC003";
public const string MultipleConfigureMethods = "NSBFUNC005";
public const string MissingAddNServiceBusFunctionsCall = "NSBFUNC004";
public const string MultipleConfigureMethods = "NSBFUNC005";
public const string AutoCompleteEnabled = "NSBFUNC006";

public static readonly DiagnosticDescriptor ClassMustBePartialDescriptor = new(
internal static readonly DiagnosticDescriptor ClassMustBePartialDescriptor = new(
id: ClassMustBePartial,
title: "Class containing [NServiceBusFunction] must be partial",
messageFormat: "Class '{0}' must be declared as partial to use [NServiceBusFunction]",
category: "NServiceBus.AzureFunctions",
defaultSeverity: DiagnosticSeverity.Error,
isEnabledByDefault: true);

public static readonly DiagnosticDescriptor ShouldNotImplementIHandleMessagesDescriptor = new(
internal static readonly DiagnosticDescriptor ShouldNotImplementIHandleMessagesDescriptor = new(
id: ShouldNotImplementIHandleMessages,
title: "Function class should not implement IHandleMessages<T>",
messageFormat: "Class '{0}' should not implement IHandleMessages<T>; message handlers should be registered separately via IEndpointConfiguration",
category: "NServiceBus.AzureFunctions",
defaultSeverity: DiagnosticSeverity.Warning,
isEnabledByDefault: true);

public static readonly DiagnosticDescriptor MethodMustBePartialDescriptor = new(
internal static readonly DiagnosticDescriptor MethodMustBePartialDescriptor = new(
id: MethodMustBePartial,
title: "Method with [NServiceBusFunction] must be partial",
messageFormat: "Method '{0}' must be declared as partial to use [NServiceBusFunction]",
category: "NServiceBus.AzureFunctions",
defaultSeverity: DiagnosticSeverity.Error,
isEnabledByDefault: true);

public static readonly DiagnosticDescriptor MultipleConfigureMethodsDescriptor = new(
internal static readonly DiagnosticDescriptor MultipleConfigureMethodsDescriptor = new(
id: MultipleConfigureMethods,
title: "Multiple configuration methods found",
messageFormat: "Multiple '{0}' configuration methods found on class '{1}'",
category: "NServiceBus.AzureFunctions",
defaultSeverity: DiagnosticSeverity.Error,
isEnabledByDefault: true);

public static readonly DiagnosticDescriptor MissingAddNServiceBusFunctionsCallDescriptor = new(
internal static readonly DiagnosticDescriptor AutoCompleteMustBeExplicitlyDisabled = new(
id: AutoCompleteEnabled,
title: "Message auto completion must be explicitly disabled",
messageFormat: "The auto complete property on the service bus trigger for method '{0}' must be explicitly set to false",
category: "NServiceBus.AzureFunctions",
defaultSeverity: DiagnosticSeverity.Error,
isEnabledByDefault: true);

internal static readonly DiagnosticDescriptor MissingAddNServiceBusFunctionsCallDescriptor = new(
id: MissingAddNServiceBusFunctionsCall,
title: "AddNServiceBusFunctions() is not called",
messageFormat: "This project has NServiceBus endpoint registrations but does not call builder.AddNServiceBusFunctions(). Endpoints will not be started.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,24 @@ static FunctionSpecs ExtractFromMethod(IMethodSymbol methodSymbol, FunctionEndpo
queueName = pAttr.ConstructorArguments[0].Value as string;
}

var autoCompleteEnabled = true;
foreach (var namedArg in pAttr.NamedArguments)
{
if (namedArg.Key == "Connection")
{
connectionName = namedArg.Value.Value as string;
}

if (namedArg.Key == "AutoCompleteMessages")
{
var autoComplete = namedArg.Value.Value as bool?;
autoCompleteEnabled = autoComplete!.Value;
}
}

if (autoCompleteEnabled)
{
diagnostics.Add(CreateDiagnostic(DiagnosticIds.AutoCompleteMustBeExplicitlyDisabled, method, method.Name));
}

messageParamName = param.Name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ namespace NServiceBus.AzureFunctions.AzureServiceBus;

public class AzureServiceBusMessageProcessor(AzureServiceBusServerlessTransport transport, string endpointName)
{
//NOTE: Message actions and function context is here to be ready for future features like native dlq support without having to change the end user api.
public async Task Process(ServiceBusReceivedMessage message, ServiceBusMessageActions messageActions, FunctionContext functionContext, CancellationToken cancellationToken = default)
{
if (transport.MessageProcessor is null)
Expand All @@ -16,6 +15,6 @@ public async Task Process(ServiceBusReceivedMessage message, ServiceBusMessageAc
throw new InvalidOperationException($"Endpoint {endpointName} cannot process messages because it is configured in send-only mode.");
}

await transport.MessageProcessor.Process(message, cancellationToken).ConfigureAwait(false);
await transport.MessageProcessor.Process(message, messageActions, cancellationToken).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ namespace NServiceBus;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using AzureFunctions.AzureServiceBus;
using BitFaster.Caching.Lru;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.AzureFunctions.AzureServiceBus.Serverless.TransportWrapper;
using Microsoft.Extensions.Logging;
using NServiceBus.Transport;

public class AzureServiceBusServerlessTransport(TopicTopology topology) : TransportDefinition(TransportTransactionMode.ReceiveOnly,
Expand Down Expand Up @@ -53,7 +55,7 @@ public override async Task<TransportInfrastructure> Initialize(
.ConfigureAwait(false);

var serverlessTransportInfrastructure = new ServerlessTransportInfrastructure(baseTransportInfrastructure,
static receiver => new PipelineInvokingMessageProcessor(receiver));
receiver => new PipelineInvokingMessageProcessor(receiver, new FastConcurrentLru<string, bool>(1_000), hostSettings.ServiceProvider.GetRequiredService<ILogger<PipelineInvokingMessageProcessor>>()));

var isSendOnly = hostSettings.CoreSettings.GetOrDefault<bool>(SendOnlyConfigKey);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace NServiceBus.AzureFunctions.AzureServiceBus;

using Pipeline;
using Transport;

public sealed class DeadLetterMessage : RecoverabilityAction
{
internal DeadLetterMessage(string deadLetterReason, string deadLetterErrorDescription, Dictionary<string, object>? propertiesToModify = null) =>
deadLetterRequest = new DeadLetterRequest(deadLetterReason, deadLetterErrorDescription, propertiesToModify);

internal DeadLetterMessage(Exception exception) =>
deadLetterRequest = new DeadLetterRequest(exception);

public override IReadOnlyCollection<IRoutingContext> GetRoutingContexts(IRecoverabilityActionContext context)
{
context.Extensions.Get<TransportTransaction>().Set(deadLetterRequest);
return [];
}

public override ErrorHandleResult ErrorHandleResult => ErrorHandleResult.Handled;

readonly DeadLetterRequest deadLetterRequest;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
namespace NServiceBus.AzureFunctions.AzureServiceBus;

class DeadLetterRequest(string deadLetterReason, string deadLetterErrorDescription, Dictionary<string, object>? propertiesToModify = null)
{
public string DeadLetterReason { get; } = Truncate(deadLetterReason, 1024);
public string DeadLetterErrorDescription { get; } = Truncate(deadLetterErrorDescription, 1024);
public Dictionary<string, object> PropertiesToModify { get; } = propertiesToModify ?? [];

public DeadLetterRequest(Exception exception, Dictionary<string, object>? propertiesToModify = null) : this(
$"{exception.GetType().FullName} - {exception.Message}",
exception.StackTrace ?? "No stack trace available",
propertiesToModify)
{
}

static string Truncate(string value, int maxLength) => string.IsNullOrEmpty(value) ? value : value.Length <= maxLength ? value : value[..maxLength];
}
29 changes: 29 additions & 0 deletions src/NServiceBus.AzureFunctions.AzureServiceBus/GuidHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
namespace NServiceBus.AzureFunctions.AzureServiceBus;

using System;
using System.Buffers.Binary;

/// <summary>
/// Provides helper methods for working with <see cref="Guid"/>.
/// </summary>
/// <remarks>
/// Inspired by <a href="https://github.com/bgrainger/NGuid">NGuid</a> by Bradley Grainger,
/// used under the <a href="https://github.com/bgrainger/NGuid/blob/master/LICENSE">MIT License</a>.
/// </remarks>
static class GuidHelper
{
/// <summary>
/// Creates a Version 8 UUID with a v7-style layout: <paramref name="timestamp"/> as Unix
/// milliseconds in bytes 0–7 (time-sortable, stable across redeliveries) and
/// <paramref name="sequenceNumber"/> in bytes 8–15, both big-endian.
/// </summary>
public static Guid CreateVersion8(DateTimeOffset timestamp, long sequenceNumber)
{
Span<byte> guidBytes = stackalloc byte[16];
BinaryPrimitives.WriteInt64BigEndian(guidBytes, timestamp.ToUnixTimeMilliseconds());
BinaryPrimitives.WriteInt64BigEndian(guidBytes[8..], sequenceNumber);
guidBytes[6] = (byte)(0x80 | (guidBytes[6] & 0xF));
guidBytes[8] = (byte)(0x80 | (guidBytes[8] & 0x3F));
return new Guid(guidBytes, bigEndian: true);
}
}
Loading
Loading