Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/IntegrationTests.HostV4/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,5 @@
public class Startup : FunctionsStartup
{
public override void Configure(IFunctionsHostBuilder builder)
{
builder.UseNServiceBus(c => c.AdvancedConfiguration.EnableInstallers());
}
=> builder.UseNServiceBus(c => c.AdvancedConfiguration.EnableInstallers());
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Microsoft.CodeAnalysis;
using Microsoft.CodeAnalysis.CSharp;
using Microsoft.CodeAnalysis.Diagnostics;
Expand Down Expand Up @@ -111,6 +112,7 @@ static AnalyzerTestFixture()
MetadataReference.CreateFromFile(typeof(System.Linq.Expressions.Expression).GetTypeInfo().Assembly.Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Runtime").Location),
MetadataReference.CreateFromFile(typeof(IFunctionEndpoint).GetTypeInfo().Assembly.Location),
MetadataReference.CreateFromFile(typeof(TokenCredential).GetTypeInfo().Assembly.Location),
MetadataReference.CreateFromFile(typeof(EndpointConfiguration).GetTypeInfo().Assembly.Location),
MetadataReference.CreateFromFile(typeof(AzureServiceBusTransport).GetTypeInfo().Assembly.Location),
];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class ConfigurationAnalyzerTests : AnalyzerTestFixture<ConfigurationAnaly
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
{
var source =
Expand All @@ -40,7 +40,7 @@ void Bar(ServiceBusTriggeredEndpointConfiguration endpointConfig)
[TestCase("OverrideLocalAddress(null)", OverrideLocalAddressNotAllowedId)]
[TestCase("PurgeOnStartup(true)", PurgeOnStartupNotAllowedId)]
[TestCase("SetDiagnosticsPath(null)", SetDiagnosticsPathNotAllowedId)]
[TestCase("UseTransport(new AzureServiceBusTransport(null))", UseTransportNotAllowedId)]
[TestCase("UseTransport(new AzureServiceBusTransport(null, default(TopicTopology)))", UseTransportNotAllowedId)]
public Task DiagnosticIsNotReportedForOtherEndpointConfiguration(string configuration, string diagnosticId)
{
var source =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
public class ConfigurationAnalyzerTestsCSharp8 : AnalyzerTestFixture<ConfigurationAnalyzer>
{
// HINT: In C# 7 this call is ambiguous with the LearningTransport version as the compiler cannot differentiate method calls via generic type constraints
[TestCase("UseTransport<AzureServiceBusTransport>()", UseTransportNotAllowedId)]
[TestCase("UseTransport<AzureServiceBusTransport>(null)", UseTransportNotAllowedId)]
public Task DiagnosticIsReportedForEndpointConfiguration(string configuration, string diagnosticId)
{
var source =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Transport.AzureServiceBus;

/// <summary>
/// Provides extension methods to configure a <see cref="IFunctionEndpoint"/> using <see cref="IFunctionsHostBuilder"/>.
Expand Down Expand Up @@ -112,11 +114,24 @@ static void ConfigureEndpointFactory(IServiceCollection services, FunctionsHostB
services,
Path.Combine(functionsHostBuilderContext.ApplicationRootPath, assemblyDirectoryName));

services.AddSingleton(serviceBusTriggeredEndpointConfiguration);
services.AddSingleton(startableEndpoint);
services.AddSingleton(serverless);
services.AddSingleton<InProcessFunctionEndpoint>();
services.AddSingleton<IFunctionEndpoint>(sp => sp.GetRequiredService<InProcessFunctionEndpoint>());
_ = services.AddSingleton(serviceBusTriggeredEndpointConfiguration);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Out of curiosity: what is the value of adding _?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not much other than clearly showing that the return type is not being consumed

_ = services.AddSingleton(startableEndpoint);
_ = services.AddSingleton(serverless);
_ = services.AddSingleton<InProcessFunctionEndpoint>();
_ = services.AddSingleton<IFunctionEndpoint>(sp => sp.GetRequiredService<InProcessFunctionEndpoint>());

#pragma warning disable CS0618 // Type or member is obsolete
// Validator is registered here in case the user wants to use the options directly. This makes sure that the options are validated.
// The transport still has to validate the options because options validators are only executed when the options are resolved.
_ = services.AddSingleton<IValidateOptions<MigrationTopologyOptions>, MigrationTopologyOptionsValidator>();
_ = services.AddOptions<MigrationTopologyOptions>()
#pragma warning restore CS0618 // Type or member is obsolete
.BindConfiguration("AzureServiceBus:MigrationTopologyOptions");

// Validator is registered here in case the user wants to use the options directly. This makes sure that the options are validated.
// The transport still has to validate the options because options validators are only executed when the options are resolved.
_ = services.AddSingleton<IValidateOptions<TopologyOptions>, TopologyOptionsValidator>();
_ = services.AddOptions<TopologyOptions>().BindConfiguration("AzureServiceBus:TopologyOptions");
}

static FunctionsHostBuilderContext GetContextInternal(this IFunctionsHostBuilder functionsHostBuilder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.ServiceBus" Version="5.16.4" />
<PackageReference Include="NServiceBus" Version="9.2.6" />
<PackageReference Include="NServiceBus.Newtonsoft.Json" Version="4.0.1" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="4.2.4" />
<PackageReference Include="NServiceBus.Transport.AzureServiceBus" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,20 @@
{
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Microsoft.Extensions.Azure;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Transport;

class ServerlessTransport : TransportDefinition
class ServerlessTransport(AzureServiceBusTransport transport, string connectionString, string connectionName) : TransportDefinition(
TransportTransactionMode.ReceiveOnly,
transport.SupportsDelayedDelivery,
transport.SupportsPublishSubscribe,
transport.SupportsTTBR)
{
// HINT: This constant is defined in NServiceBus but is not exposed
const string MainReceiverId = "Main";
Expand All @@ -19,22 +25,11 @@ class ServerlessTransport : TransportDefinition

public IServiceProvider ServiceProvider { get; set; }

public ServerlessTransport(TransportExtensions<AzureServiceBusTransport> transportExtensions, string connectionString, string connectionName) : base(
transportExtensions.Transport.TransportTransactionMode,
transportExtensions.Transport.SupportsDelayedDelivery,
transportExtensions.Transport.SupportsPublishSubscribe,
transportExtensions.Transport.SupportsTTBR)
{
this.transportExtensions = transportExtensions;
this.connectionString = connectionString;
this.connectionName = connectionName;
}

public override async Task<TransportInfrastructure> Initialize(HostSettings hostSettings, ReceiveSettings[] receivers,
string[] sendingAddresses,
CancellationToken cancellationToken = default)
{
var configuredTransport = ConfigureTransportConnection(connectionString, connectionName, ServiceProvider.GetRequiredService<IConfiguration>(), transportExtensions,
var configuredTransport = ConfigureTransportConnection(connectionString, connectionName, ServiceProvider.GetRequiredService<IConfiguration>(), transport,
ServiceProvider.GetRequiredService<AzureComponentFactory>());

var baseTransportInfrastructure = await configuredTransport.Initialize(
Expand All @@ -58,16 +53,12 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
public override IReadOnlyCollection<TransportTransactionMode> GetSupportedTransactionModes() =>
supportedTransactionModes;

// We are deliberately using the old way of configuring a transport here because it allows us configuring
// the uninitialized transport with a connection string or a fully qualified name and a token provider.
// Once we deprecate the old way we can for example add make the internal ConnectionString, FQDN or
// TokenProvider properties visible to functions or the code base has already moved into a different direction.
static AzureServiceBusTransport ConfigureTransportConnection(string connectionString, string connectionName, IConfiguration configuration,
TransportExtensions<AzureServiceBusTransport> transportExtensions, AzureComponentFactory azureComponentFactory)
AzureServiceBusTransport transport, AzureComponentFactory azureComponentFactory)
{
if (connectionString != null)
{
_ = transportExtensions.ConnectionString(connectionString);
GetConnectionStringRef(transport) = connectionString;
}
else
{
Expand All @@ -80,7 +71,7 @@ static AzureServiceBusTransport ConfigureTransportConnection(string connectionSt

if (!string.IsNullOrWhiteSpace(connectionSection.Value))
{
_ = transportExtensions.ConnectionString(connectionSection.Value);
GetConnectionStringRef(transport) = connectionSection.Value;
}
else
{
Expand All @@ -91,22 +82,33 @@ static AzureServiceBusTransport ConfigureTransportConnection(string connectionSt
}

var credential = azureComponentFactory.CreateTokenCredential(connectionSection);
_ = transportExtensions.CustomTokenCredential(fullyQualifiedNamespace, credential);
GetFullyQualifiedNamespaceRef(transport) = fullyQualifiedNamespace;
GetTokenCredentialRef(transport) = credential;
}
}

return transportExtensions.Transport;
return transport;
}

// As a temporary workaround we are accessing the properties of the AzureServiceBusTransport using UnsafeAccessor
// This is another blocker to AoT but we are already using the execution assembly in the code base anyway
// Furthermore this allows us to still comply with initializing the transport as late as possible without having to
// expose the properties on the transport itself which would pollute the public API for not much added value.
[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<ConnectionString>k__BackingField")]
static extern ref string GetConnectionStringRef(AzureServiceBusTransport transport);

[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<FullyQualifiedNamespace>k__BackingField")]
static extern ref string GetFullyQualifiedNamespaceRef(AzureServiceBusTransport transport);

[UnsafeAccessor(UnsafeAccessorKind.Field, Name = "<TokenCredential>k__BackingField")]
static extern ref TokenCredential GetTokenCredentialRef(AzureServiceBusTransport transport);

internal const string DefaultServiceBusConnectionName = "AzureWebJobsServiceBus";

readonly TransportTransactionMode[] supportedTransactionModes =
{
[
TransportTransactionMode.ReceiveOnly,
TransportTransactionMode.SendsAtomicWithReceive
};
readonly TransportExtensions<AzureServiceBusTransport> transportExtensions;
readonly string connectionString;
readonly string connectionName;
];
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
using System.Threading.Tasks;
using AzureFunctions.InProcess.ServiceBus;
using AzureFunctions.InProcess.ServiceBus.Serverless;
using Configuration.AdvancedExtensibility;
using Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Serialization;
using Settings;
using Transport.AzureServiceBus;

/// <summary>
/// Represents a serverless NServiceBus endpoint.
Expand Down Expand Up @@ -68,16 +70,28 @@ internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfigur
endpointConfiguration.License(licenseText);
}

// We are deliberately using the old way of creating a transport here because it allows us to create an
// uninitialized transport that can later be configured with a connection string or a fully qualified name and
// a token provider. Once we deprecate the old way we can for example add make the internal constructor
// visible to functions or the code base has already moved into a different direction.
transportExtensions = endpointConfiguration.UseTransport<AzureServiceBusTransport>();
// This is required for the Outbox validation to work in NServiceBus 8. It does not affect the actual consistency mode because it is controlled by the functions
// endpoint API (calling ProcessAtomic vs ProcessNonAtomic).
transportExtensions.Transactions(TransportTransactionMode.ReceiveOnly);
Transport = transportExtensions.Transport;
Routing = transportExtensions.Routing();
TopicTopology topicTopology = TopicTopology.Default;
var topologyOptionsSection = configuration?.GetSection("AzureServiceBus:TopologyOptions");
if (topologyOptionsSection.Exists())
{
topicTopology = TopicTopology.FromOptions(topologyOptionsSection.Get<TopologyOptions>());
}
// Migration options take precedence over topology options. We are not doing additional checks here for now.
var migrationOptionsSection = configuration?.GetSection("AzureServiceBus:MigrationTopologyOptions");
if (migrationOptionsSection.Exists())
{
#pragma warning disable CS0618 // Type or member is obsolete
topicTopology = TopicTopology.FromOptions(migrationOptionsSection.Get<MigrationTopologyOptions>());
#pragma warning restore CS0618 // Type or member is obsolete
}

Transport = new AzureServiceBusTransport("TransportWillBeInitializedCorrectlyLater", topicTopology)
{
// This is required for the Outbox validation to work in NServiceBus 8. It does not affect the actual consistency mode because it is controlled by the functions
// endpoint API (calling ProcessAtomic vs ProcessNonAtomic).
TransportTransactionMode = TransportTransactionMode.ReceiveOnly
};
Routing = new RoutingSettings<AzureServiceBusTransport>(endpointConfiguration.GetSettings());

endpointConfiguration.UseSerialization<NewtonsoftJsonSerializer>();

Expand All @@ -86,7 +100,7 @@ internal ServiceBusTriggeredEndpointConfiguration(string endpointName, IConfigur

internal ServerlessTransport InitializeTransport()
{
var serverlessTransport = new ServerlessTransport(transportExtensions, connectionString, connectionName);
var serverlessTransport = new ServerlessTransport(Transport, connectionString, connectionName);
AdvancedConfiguration.UseTransport(serverlessTransport);
return serverlessTransport;
}
Expand Down Expand Up @@ -117,6 +131,5 @@ public void LogDiagnostics() =>
readonly ServerlessRecoverabilityPolicy recoverabilityPolicy = new ServerlessRecoverabilityPolicy();
readonly string connectionString;
readonly string connectionName;
readonly TransportExtensions<AzureServiceBusTransport> transportExtensions;
}
}
39 changes: 39 additions & 0 deletions src/ServiceBus.AcceptanceTests/AcceptanceTestExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
namespace ServiceBus.Tests;

using System;
using System.IO.Hashing;
using System.Text;

public static class AcceptanceTestExtensions
{
public static string ToTopicName(this Type eventType) =>
eventType.FullName.Replace("+", ".").Shorten(maxLength: 260);

// The idea here is to preserve part of the text and append a non-cryptographic hash to it.
// This way, we can have a deterministic and unique names without harming much the readability.
// The chance of collisions should be very low but definitely not zero. We can always switch to
// using more bits in the hash or even back to a cryptographic hash if needed.
public static string Shorten(this string name, int maxLength = 50)
{
if (name.Length <= maxLength)
{
return name;
}

var nameBytes = Encoding.UTF8.GetBytes(name);
var hashValue = XxHash32.Hash(nameBytes);
string hashHex = Convert.ToHexString(hashValue);

int prefixLength = maxLength - hashHex.Length;

if (prefixLength < 0)
{
return hashHex.Length > maxLength
? hashHex[..maxLength] // in case even the hash is too long
: hashHex;
}

string prefix = name[..Math.Min(prefixLength, name.Length)];
return $"{prefix}{hashHex}";
}
}
25 changes: 12 additions & 13 deletions src/ServiceBus.AcceptanceTests/DefaultEndpoint.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace ServiceBus.Tests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus;
Expand Down Expand Up @@ -28,23 +29,21 @@ public async Task<EndpointConfiguration> GetConfiguration(
recoverability.Immediate(immediate => immediate.NumberOfRetries(0));
configuration.SendFailedMessagesTo("error");

configuration.EnforcePublisherMetadataRegistration(endpointConfiguration.EndpointName, endpointConfiguration.PublisherMetadata);

var connectionString =
Environment.GetEnvironmentVariable(ServerlessTransport.DefaultServiceBusConnectionName);

var azureServiceBusTransport = new AzureServiceBusTransport(connectionString)
var topology = TopicTopology.Default;
topology.OverrideSubscriptionNameFor(endpointConfiguration.EndpointName, endpointConfiguration.EndpointName.Shorten());
foreach (var eventType in endpointConfiguration.PublisherMetadata.Publishers.SelectMany(p => p.Events))
{
SubscriptionRuleNamingConvention = type =>
{
if (type.FullName.Length <= 50)
{
return type.FullName;
}

return type.Name;
}
};

var transport = configuration.UseTransport(azureServiceBusTransport);
topology.PublishTo(eventType, eventType.ToTopicName());
topology.SubscribeTo(eventType, eventType.ToTopicName());
}
var azureServiceBusTransport = new AzureServiceBusTransport(connectionString, topology);

_ = configuration.UseTransport(azureServiceBusTransport);

configuration.Pipeline.Register("TestIndependenceBehavior", b => new TestIndependenceSkipBehavior(runDescriptor.ScenarioContext), "Skips messages not created during the current test.");

Expand Down
Loading