From a66908dccc3f42dd4eb7c86b53b69ff62b514150 Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 24 Nov 2025 16:32:15 +0000 Subject: [PATCH 1/3] Add support to configuration brighter from IConfiguration --- Brighter.sln | 14 + docs/adr/0035-configuration-support.md | 545 ++++++++++++++++++ .../ConfigurationExtensions.cs | 45 ++ .../MicrosoftConfiguration.cs | 26 + ...e.Brighter.Extensions.Configuration.csproj | 16 + .../RmqMessagingGatewayConnection.cs | 19 +- ...essagingGatewayFactoryFromConfiguration.cs | 201 +++++++ src/Paramore.Brighter/IAmAConfiguration.cs | 39 ++ 8 files changed, 895 insertions(+), 10 deletions(-) create mode 100644 docs/adr/0035-configuration-support.md create mode 100644 src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs create mode 100644 src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs create mode 100644 src/Paramore.Brighter.Extensions.Configuration/Paramore.Brighter.Extensions.Configuration.csproj create mode 100644 src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs create mode 100644 src/Paramore.Brighter/IAmAConfiguration.cs diff --git a/Brighter.sln b/Brighter.sln index 21597c1769..7cdbfa5420 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -466,6 +466,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.MessageSc EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.TickerQ.Tests", "tests\Paramore.Brighter.TickerQ.Tests\Paramore.Brighter.TickerQ.Tests.csproj", "{261E1392-7713-525F-2859-7B40CA416A50}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Extensions.Configuration", "src\Paramore.Brighter.Extensions.Configuration\Paramore.Brighter.Extensions.Configuration.csproj", "{5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -2732,6 +2734,18 @@ Global {261E1392-7713-525F-2859-7B40CA416A50}.Release|Mixed Platforms.Build.0 = Release|Any CPU {261E1392-7713-525F-2859-7B40CA416A50}.Release|x86.ActiveCfg = Release|Any CPU {261E1392-7713-525F-2859-7B40CA416A50}.Release|x86.Build.0 = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|x86.ActiveCfg = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Debug|x86.Build.0 = Debug|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|Any CPU.Build.0 = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|x86.ActiveCfg = Release|Any CPU + {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/docs/adr/0035-configuration-support.md b/docs/adr/0035-configuration-support.md new file mode 100644 index 0000000000..49384f7d5f --- /dev/null +++ b/docs/adr/0035-configuration-support.md @@ -0,0 +1,545 @@ +# 35. IConfiguration Integration for Brighter Components + +Date: 2025-01-24 + +## Status + +Proposed + +## Context + +Brighter currently supports configuration-as-code (see [ADR 0016](0016-use-configuration-as-code-not-by-convention.md)), which provides type safety and clarity about framework operation. However, modern .NET applications often need to manage configuration across multiple environments (development, staging, production) and sources (appsettings.json, environment variables, Azure Key Vault, etc.). While configuration-as-code is excellent for compile-time safety and explicitness, it can make environment-specific configuration management challenging, requiring code changes or recompilation to adapt to different deployment scenarios. + +The .NET platform provides `IConfiguration` as a standard abstraction for configuration, which supports: +- Multiple configuration sources with fallback behavior +- Environment-specific overrides +- Secrets management integration +- Dynamic configuration updates +- Configuration validation + +Several Brighter components need configuration that varies by environment: +- **Messaging Gateways** - connection strings, endpoints, credentials +- **Outbox** - database connections, table names, retention policies +- **Inbox** - database connections, table names, deduplication settings +- **Storage Providers (Luggage Store)** - S3 buckets, blob containers, access credentials +- **Distributed Locks** - connection strings, timeout values, lock prefixes + +Currently, Brighter does not provide a standardized way to load component configuration from `IConfiguration`. + +## Decision + +We will introduce `IConfiguration` integration for Brighter to provide a consistent pattern across all components while maintaining our configuration-as-code principles. This is not a replacement for code-based configuration but an **additive capability** that gives developers flexibility to choose the appropriate approach for their scenario. + +### Core Abstraction + +Introduce `IAmAConfiguration` as a core abstraction in `Paramore.Brighter` that wraps configuration access. This abstraction allows Brighter to work with configuration independently of the specific configuration system being used (Microsoft.Extensions.Configuration, .NET Aspire, or potentially others in the future). + +```csharp +/// +/// Abstraction for configuration access in Brighter +/// +public interface IAmAConfiguration +{ + /// + /// Gets a configuration sub-section with the specified key + /// + /// The key of the configuration section + /// The configuration section + IAmAConfiguration GetSection(string sectionName); + + /// + /// Attempts to bind the configuration to a new instance of type T + /// + /// The type to bind to + /// The new instance if successful, null otherwise + T? Get(); + + /// + /// Gets a connection string by name + /// + /// The connection string name + /// The connection string + string? GetConnectionString(string name); + + /// + /// Binds the configuration to an existing instance + /// + /// The type of the instance + /// The instance to bind to + void Bind(T obj); +} +``` + +The implementation in `Paramore.Brighter.Extensions.Configuration` will wrap `IConfiguration`: + +```csharp +public class MicrosoftConfiguration : IAmAConfiguration +{ + private readonly IConfiguration _configuration; + + public MicrosoftConfiguration(IConfiguration configuration) + { + _configuration = configuration; + } + + public IAmAConfiguration GetSection(string sectionName) + { + return new MicrosoftConfiguration(_configuration.GetSection(sectionName)); + } + + public T? Get() + { + return _configuration.Get(); + } + + public string? GetConnectionString(string name) + { + return _configuration.GetConnectionString(name); + } + + public void Bind(T obj) + { + _configuration.Bind(obj); + } +} +``` + +### Core Principles + +1. **Additive, Not Replacement** - Existing code-based configuration continues to work; configuration support is optional +2. **Consistent Patterns** - All components follow the same configuration structure and factory interface patterns +3. **Type Safety** - Configuration is strongly-typed and validated at startup +4. **Convention with Flexibility** - Default section names follow conventions but can be overridden +5. **Explicit Over Implicit** - Factories are explicit about what they create and what configuration they need + +### Configuration Section Convention + +All Brighter configuration follows the pattern: `Brighter:`. + +For components that support multiple named instances, the pattern is `Brighter::`. This allows defining multiple configurations for the same provider. + +Examples: +```json +{ + "Brighter": { + "RabbitMQ": { + "Connection": { + "Connection": { "AmqpUri": { "Uri": "amqp://guest:guest@localhost:5672" } } + }, + "AnalyticsConnection": { + "Connection": { "AmqpUri": { "Uri": "amqp://guest:guest@analytics-bus:5672" } } + } + }, + "DynamoDb": { + "Outbox": { + "TableName": "brighter-outbox", + "Region": "us-east-1" + } + }, + "SqlServer": { + "Connection": "Server=.;Database=Brighter;Integrated Security=True;", + "Inbox": { + "Table": "inbox_table" + } + }, + "PostgreSql": { + "Connection": "Host=localhost;Database=brighter;Username=postgres;Password=password" + } + } +} +``` + +### Factory Interfaces + +Introduce the following factory interfaces in `Paramore.Brighter`: + +```csharp +/// +/// Factory for creating messaging gateway components from configuration +/// +public interface IAmMessagingGatewayFactoryFromConfiguration +{ + /// + /// Creates a channel factory from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// A channel factory instance + IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string name, string? sectionName = null); + + /// + /// Creates a message consumer factory from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// A message consumer factory instance + IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, string name, string? sectionName = null); + + /// + /// Creates a producer registry from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// A producer registry instance + IAmAProducerRegistry CreateProducerRegistry(IAmAConfiguration configuration, string name, string? sectionName = null); +} + +/// +/// Factory for creating an outbox from configuration +/// +public interface IAmAnOutboxFactoryFromConfiguration +{ + /// + /// Creates an outbox from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// An outbox instance + IAmAnOutbox Create(IAmAConfiguration configuration, string? name = null, string? sectionName = null); +} + +/// +/// Factory for creating an inbox from configuration +/// +public interface IAmAnInboxFactoryFromConfiguration +{ + /// + /// Creates an inbox from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// An inbox instance + IAmAnInbox Create(IAmAConfiguration configuration, string? name = null, string? sectionName = null); +} + +/// +/// Factory for creating a storage provider (luggage store) from configuration +/// +public interface IAmAStorageProviderFactoryFromConfiguration +{ + /// + /// Creates a storage provider from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// A storage provider instance + IAmAStorageProvider Create(IAmAConfiguration configuration, string? name = null, string? sectionName = null); +} + +/// +/// Factory for creating a distributed lock from configuration +/// +public interface IAmADistributedLockFactoryFromConfiguration +{ + /// + /// Creates a distributed lock from configuration + /// + /// The configuration section + /// Optional name for named configurations + /// Optional override for the configuration section name + /// A distributed lock instance + IDistributedLock Create(IAmAConfiguration configuration, string? name = null, string? sectionName = null); +} +``` + + +### Extension Methods + +Add extension methods in `Paramore.Brighter.Extensions.Configuration` to simplify usage: + +```csharp +public static class ConfigurationExtensions +{ + // Existing method for messaging gateways + public static T CreateMessageGatewayConnection( + this IConfiguration configuration, + string? name = null, + string? sectionName = null) { ... } + + // New methods for other components + public static IAmAnOutbox CreateOutbox( + this IConfiguration configuration, + string? name = null, + string? sectionName = null) where T : IAmAnOutbox + { + var factory = Get(typeof(T).Assembly); + return factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + } + + public static IAmAnInbox CreateInbox( + this IConfiguration configuration, + string? name = null, + string? sectionName = null) where T : IAmAnInbox + { + var factory = Get(typeof(T).Assembly); + return factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + } + + public static IAmAStorageProvider CreateStorageProvider( + this IConfiguration configuration, + string? name = null, + string? sectionName = null) where T : IAmLuggageStorage + { + var factory = Get(typeof(T).Assembly); + return factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + } + + public static IDistributedLock CreateDistributedLock( + this IConfiguration configuration, + string? name = null, + string? sectionName = null) where T : IDistributedLock + { + var factory = Get(typeof(T).Assembly); + return factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + } +} +``` + +### Implementation Examples + +#### DynamoDB Outbox Configuration + +```json +{ + "Brighter": { + "DynamoDb": { + "Outbox": { + "TableName": "brighter-outbox", + "Region": "us-east-1", + "ServiceUrl": null, + "NumberOfShards": 5, + "ScanConcurrency": 4, + "CreateTableOnMissing": false + } + } + } +} +``` + +```csharp +// In Paramore.Brighter.DynamoDb +public class DynamoDbOutboxFactoryFromConfiguration : IAmAnOutboxFactoryFromConfiguration +{ + private const string DefaultSectionName = "DynamoDb"; + + public IAmAnOutbox Create(IAmAConfiguration configuration, string? name, string? sectionName) + { + sectionName ??= DefaultSectionName; + + var configSection = configuration.GetSection($"Brighter:{sectionName}"); + if (!string.IsNullOrEmpty(name)) + { + var namedSection = configSection.GetSection(name); + if (namedSection != null) + configSection = namedSection; + } + + var config = configSection.Get(); + if (config == null) + throw new ConfigurationException($"DynamoDB configuration section 'Brighter:{sectionName}' is missing or invalid"); + + return new DynamoDbOutbox(config); + } +} +``` + +#### PostgreSQL Distributed Lock Configuration + +```json +{ + "Brighter": { + "PostgreSql": { + "Connection": "Host=localhost;Database=brighter;Username=postgres;Password=password" + } + } +} +``` + +```csharp +// In Paramore.Brighter.Locking.PostgresSql +public class PostgreSqlDistributedLockFactoryFromConfiguration : IAmADistributedLockFactoryFromConfiguration +{ + private const string DefaultSectionName = "PostgreSql"; + + public IDistributedLock Create(IAmAConfiguration configuration, string? name, string? sectionName) + { + sectionName ??= DefaultSectionName; + + var configSection = configuration.GetSection($"Brighter:{sectionName}"); + if (!string.IsNullOrEmpty(name)) + { + var namedSection = configSection.GetSection(name); + if (namedSection != null) + configSection = namedSection; + } + + var config = configSection.Get(); + if (config == null) + throw new ConfigurationException($"PostgreSQL configuration section 'Brighter:{sectionName}' is missing or invalid"); + + return new PostgresLockingProvider(new PostgresLockingProviderOptions(config.ConnectionString) + { + SchemaName = config.SchemaName, + LockTimeout = config.LockTimeout + }); + } +} +``` + +#### .NET Aspire Configuration Support + +Brighter's `IAmAConfiguration` abstraction enables seamless integration with .NET Aspire. Aspire provides connection strings and configuration through the standard `IConfiguration` interface, which Brighter can consume directly. + +**Aspire ConnectionString Convention:** + +.NET Aspire uses a standardized connection string format: `Aspire::Client` with a `ConnectionString` property. Brighter's configuration factories will check for Aspire connection strings as a fallback when provider-specific configuration is not found. + +Example Aspire configuration for RabbitMQ: + +```json +{ + "Aspire": { + "RabbitMQ": { + "Connection": { + "Ampq": { + "Uri": "amqp://guest:guest@rabbitmq:5672" + } + } + } + } +} +``` + +The `RmqMessagingGatewayFactoryFromConfiguration` already implements Aspire support by checking multiple configuration sources in order: + +1. `Brighter:RabbitMQ` - Brighter-specific configuration (highest priority) +2. `Aspire:RabbitMQ:Client` - Aspire configuration +3. `ConnectionStrings:` - Standard .NET connection strings + +This pattern should be followed by all factory implementations to ensure consistent Aspire integration: + +```csharp +// Example pattern for Aspire support in factories +public IDistributedLock Create(IAmAConfiguration configuration, string? name, string? sectionName) +{ + sectionName ??= DefaultSectionName; + + // 1. Try Brighter-specific configuration + var configSection = configuration.GetSection($"Brighter:{sectionName}"); + var config = new PostgreSqlLockConfiguration(); + configSection.Bind(config); + + // 2. Try Aspire configuration + var aspireSection = configuration.GetSection($"Aspire:{sectionName}:Client"); + var aspireConnectionString = aspireSection.GetSection("ConnectionString").Get(); + if (!string.IsNullOrEmpty(aspireConnectionString)) + { + config.ConnectionString = aspireConnectionString; + } + + // 3. Try standard connection strings + var connectionString = configuration.GetConnectionString(name ?? sectionName); + if (!string.IsNullOrEmpty(connectionString)) + { + config.ConnectionString = connectionString; + } + + if (string.IsNullOrEmpty(config.ConnectionString)) + throw new ConfigurationException($"No configuration found for {sectionName}"); + + return new PostgresLockingProvider(new PostgresLockingProviderOptions(config.ConnectionString)); +} +``` + +This approach ensures that Brighter components can be configured through: +- Direct Brighter configuration for full control +- .NET Aspire for cloud-native application orchestration +- Standard connection strings for simple scenarios + + +### Service Collection Integration + +```csharp +// appsettings.json +{ + "Brighter": { + "RabbitMQ": { + "Connection": { "AmqpUri": { "Uri": "amqp://guest:guest@localhost:5672" } } + } + } +} + +// Program.cs +builder.Services.AddConsumers(opt => + { + opt.Subscriptions = builder.Configuration.CreateSubscription(); + opt.DefaultChannelFactory = builder.Configuration.CreateChannelFactory(); + }) + .AddProducers(opt => + { + opt.ProducerRegistry = builder.Configuration.CreateProducerRegistry(); + }); +``` + +## Consequences + +### Positive + +* **Environment Flexibility** - Developers can manage configuration through standard .NET configuration sources (appsettings.json, environment variables, Azure Key Vault, AWS Systems Manager, etc.) +* **No Code Changes** - Configuration changes don't require code modifications or recompilation for different environments +* **Consistency** - Unified configuration pattern across all Brighter components +* **Testability** - Easier to test with different configurations by swapping configuration sources +* **Secrets Management** - Integration with standard .NET secrets management tools +* **Aspire Integration** - Better support for .NET Aspire and other orchestration tools that provide configuration + +### Negative + +* **Two Ways to Configure** - Having both code-based and configuration-based approaches may cause confusion for new users +* **Runtime Validation** - Configuration errors are caught at runtime rather than compile-time +* **Documentation Burden** - Need to document both configuration approaches and when to use each +* **Migration Effort** - Existing provider implementations need updates to support the new factory interfaces +* **Testing Complexity** - More test scenarios to cover both configuration approaches + +### Mitigation Strategies + +1. **Clear Documentation** - Provide guidance on when to use each approach: + - Use code-based configuration for simple, single-environment scenarios + - Use IConfiguration integration for multi-environment deployments, when using secrets management, or with orchestration tools + +2. **Validation at Startup** - Implement configuration validation that runs at application startup to catch errors early + +3. **Gradual Rollout** - Implement factory interfaces incrementally across providers, starting with the most commonly used ones + +4. **Examples and Samples** - Provide comprehensive samples showing both approaches + +5. **Type Safety Preservation** - Use strongly-typed configuration classes that bind to IConfiguration, maintaining type safety benefits + +### Migration Path + +This is a non-breaking, additive change: + +1. Add `IAmAConfiguration` interface to `Paramore.Brighter` core +2. Add factory interfaces to `Paramore.Brighter` core +3. Add `MicrosoftConfiguration` implementation and extension methods in `Paramore.Brighter.Extensions.Configuration` +4. Implement factory interfaces in provider packages incrementally, with Aspire configuration fallback support: + - Phase 1: Messaging gateways (RabbitMQ, Kafka, AWS SNS/SQS, Azure Service Bus, etc.) + - Phase 2: Outbox implementations (SQL Server, DynamoDB, PostgreSQL, MySQL, etc.) + - Phase 3: Inbox implementations + - Phase 4: Distributed locks (PostgreSQL, DynamoDB, MongoDb, etc.) + - Phase 5: Storage providers (S3, Azure Blob Storage, etc.) +5. Update DI extension methods to support `*FromConfiguration` variants +6. Update documentation with configuration examples, including .NET Aspire integration scenarios +7. Existing code-based configuration continues to work unchanged + +### Relationship to Other ADRs + +* Complements [ADR 0016: Use Configuration As Code](0016-use-configuration-as-code-not-by-convention.md) by adding environment flexibility while maintaining explicitness +* Aligns with [ADR 0015: Push Button APIs For DSLs](0015-push-button-api-for-dsl.md) for ASP.NET Core integration +* Supports [ADR 0014: DI Friendly Framework](0014-di-friendly-framework.md) by integrating with standard .NET configuration patterns + diff --git a/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs new file mode 100644 index 0000000000..bae0ad2903 --- /dev/null +++ b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs @@ -0,0 +1,45 @@ +using System; +using System.Reflection; +using Microsoft.Extensions.Configuration; + +namespace Paramore.Brighter.Extensions.Configuration; + +public static class ConfigurationExtensions +{ + public static T CreateMessageGatewayConnection(this IConfiguration configuration) + { + return CreateMessageGatewayConnection(configuration, null, null); + } + + public static T CreateMessageGatewayConnection(this IConfiguration configuration, string? name) + { + return CreateMessageGatewayConnection(configuration, name, null); + } + + public static T CreateMessageGatewayConnection(this IConfiguration configuration, + string? name, + string? sectionName) + { + var factory = Get(typeof(T).Assembly); + return (T)factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + } + + private static T Get(Assembly assembly) + { + var @interface = typeof(T); + foreach (var type in assembly.GetTypes()) + { + if (!type.IsClass || type.IsAbstract) + { + continue; + } + + if (@interface.IsAssignableFrom(type)) + { + return (T)Activator.CreateInstance(type)!; + } + } + + throw new InvalidOperationException($"Interface {typeof(T).FullName} isn't implement"); + } +} diff --git a/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs b/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs new file mode 100644 index 0000000000..ab51189a1d --- /dev/null +++ b/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs @@ -0,0 +1,26 @@ +using Microsoft.Extensions.Configuration; + +namespace Paramore.Brighter.Extensions.Configuration; + +public class MicrosoftConfiguration(IConfiguration configuration) : IAmAConfiguration +{ + public IAmAConfiguration GetSection(string sectionName) + { + return new MicrosoftConfiguration(configuration.GetSection(sectionName)); + } + + public T? Get() + { + return configuration.Get(); + } + + public string? GetConnectionString(string name) + { + return configuration.GetConnectionString(name); + } + + public void Bind(T obj) + { + configuration.Bind(obj); + } +} diff --git a/src/Paramore.Brighter.Extensions.Configuration/Paramore.Brighter.Extensions.Configuration.csproj b/src/Paramore.Brighter.Extensions.Configuration/Paramore.Brighter.Extensions.Configuration.csproj new file mode 100644 index 0000000000..af79a4b0e0 --- /dev/null +++ b/src/Paramore.Brighter.Extensions.Configuration/Paramore.Brighter.Extensions.Configuration.csproj @@ -0,0 +1,16 @@ + + + + $(BrighterTargetFrameworks) + enable + + + + + + + + + + + diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayConnection.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayConnection.cs index f45a7efb0b..8018adfbc6 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayConnection.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayConnection.cs @@ -29,17 +29,17 @@ THE SOFTWARE. */ namespace Paramore.Brighter.MessagingGateway.RMQ.Async { public class RmqMessagingGatewayConnection : IAmGatewayConfiguration - { + { /// - /// Sets Unique name for the subscription - /// - public string Name { get; set; } = Environment.MachineName; + /// Sets Unique name for the subscription + /// + public string Name { get; set; } = Environment.MachineName; - /// - /// Gets or sets the ampq URI. - /// - /// The ampq URI. - public AmqpUriSpecification? AmpqUri { get; set; } + /// + /// Gets or sets the ampq URI. + /// + /// The ampq URI. + public AmqpUriSpecification? AmpqUri { get; set; } /// /// Gets or sets the exchange. @@ -86,7 +86,6 @@ public class AmqpUriSpecification( /// The URI. public Uri Uri { get; set; } = uri; - /// /// Gets or sets the retry count for when a subscription fails /// diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs new file mode 100644 index 0000000000..6bf03dc924 --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs @@ -0,0 +1,201 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using RabbitMQ.Client; + +namespace Paramore.Brighter.MessagingGateway.RMQ.Async; + +public class RmqMessagingGatewayFactoryFromConfiguration : IAmMessagingGatewayFactoryFromConfiguration +{ + private const string BrighterSection = "Brighter:RabbitMQ"; + private const string AspireSection = "Aspire:RabbitMQ:Client"; + + private const string AspireConnection = "ConnectionString"; + + public IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string name, string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + return new ChannelFactory(new RmqMessageConsumerFactory(connection)); + } + + public IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, + string name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + return new RmqMessageConsumerFactory(connection); + } + + public IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, + string name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + + return new RmqProducerRegistryFactory(connection, + rabbitMqConfiguration.Publications.Select(x => x.ToPublication())); + } + + + private static RabbitMqConfiguration GetRabbitMqConfiguration(IAmAConfiguration configuration, + string name, + string? sectionName) + { + if (string.IsNullOrEmpty(sectionName)) + { + sectionName = BrighterSection; + } + + var configurationSection = configuration.GetSection(sectionName!); + var namedConfigurationSection = configurationSection.GetSection(name); + + var rabbitMqConfiguration = new RabbitMqConfiguration(); + configurationSection.Bind(rabbitMqConfiguration); + namedConfigurationSection.Bind(rabbitMqConfiguration); + + var aspireConfiguration = configuration.GetSection(AspireSection); + var namedAspireConfiguration = aspireConfiguration.GetSection(name); + + var connection = aspireConfiguration.GetSection(AspireConnection).Get(); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + connection = namedAspireConfiguration.GetSection(AspireConnection).Get(); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + connection = configuration.GetConnectionString(name); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + return rabbitMqConfiguration; + } + + public class RabbitMqConfiguration + { + public GatewayConnection Connection { get; set; } = null!; + public List Subscriptions { get; set; } = []; + public List Publications { get; set; } = []; + } + + public class GatewayConnection + { + public string Name { get; set; } = Environment.MachineName; + public AmqpUriSpecificationConfiguration? AmpqUri { get; set; } = null; + public ExchangeConfiguration? Exchange { get; set; } + public ExchangeConfiguration? DeadLetterExchange { get; set; } + public ushort Heartbeat { get; set; } = 20; + public bool PersistMessages { get; set; } + public ushort ContinuationTimeout { get; set; } = 20; + + public RmqMessagingGatewayConnection ToMessagingGatewayConnection() + { + return new RmqMessagingGatewayConnection + { + Name = Name, + AmpqUri = AmpqUri != null ? new AmqpUriSpecification( + uri: new Uri(AmpqUri.Uri, UriKind.Absolute), + connectionRetryCount: AmpqUri.ConnectionRetryCount, + retryWaitInMilliseconds: AmpqUri.RetryWaitInMilliseconds, + circuitBreakTimeInMilliseconds: AmpqUri.CircuitBreakTimeInMilliseconds) : null, + Exchange = Exchange != null ? new Exchange( + name: Exchange.Name, + type: Exchange.Type, + durable: Exchange.Durable, + supportDelay: Exchange.SupportDelay) : null, + DeadLetterExchange = DeadLetterExchange != null ? new Exchange( + name: DeadLetterExchange.Name, + type: DeadLetterExchange.Type, + durable: DeadLetterExchange.Durable, + supportDelay: DeadLetterExchange.SupportDelay) : null, + Heartbeat = Heartbeat, + PersistMessages = PersistMessages, + ContinuationTimeout = ContinuationTimeout, + }; + } + } + + public class AmqpUriSpecificationConfiguration + { + public string Uri { get; set; } = string.Empty; + public int ConnectionRetryCount { get; set; } = 3; + public int RetryWaitInMilliseconds { get; set; } = 1_000; + public int CircuitBreakTimeInMilliseconds { get; set; } = 60_000; + } + + public class ExchangeConfiguration + { + public string Name { get; set; } = string.Empty; + + public string Type { get; set; } = ExchangeType.Direct; + + public bool Durable { get; set; } + + public bool SupportDelay { get; set; } + } + + public class RabbitMqSubscriptionConfiguration + { + + } + + public class RabbitMqPublicationConfiguration + { + public string? DataSchema { get; set; } + public OnMissingChannel MakeChannels { get; set; } + public string? RequestType { get; set; } + + public string Source { get; set; } = "http://goparamore.io"; + + public string? Subject { get; set; } + public string? Topic { get; set; } + public string Type { get; set; } = string.Empty; + + public IDictionary? DefaultHeaders { get; set; } + public IDictionary? CloudEventsAdditionalProperties { get; set; } + + public string? ReplyTo { get; set; } + public int WaitForConfirmsTimeOutInMilliseconds { get; set; } = 500; + + public RmqPublication ToPublication() + { + Uri? dataschema = null; + if (!string.IsNullOrEmpty(DataSchema)) + { + dataschema = new Uri(DataSchema!, UriKind.RelativeOrAbsolute); + } + + RoutingKey? topic = null; + if (!string.IsNullOrEmpty(Topic)) + { + topic = new RoutingKey(Topic!); + } + + return new RmqPublication + { + DataSchema = dataschema, + MakeChannels = MakeChannels, + Source = new Uri(Source, UriKind.RelativeOrAbsolute), + Subject = Subject, + Topic = topic, + Type = new CloudEventsType(Type), + DefaultHeaders = DefaultHeaders, + CloudEventsAdditionalProperties = CloudEventsAdditionalProperties, + ReplyTo = ReplyTo, + WaitForConfirmsTimeOutInMilliseconds = WaitForConfirmsTimeOutInMilliseconds, + }; + } + } +} diff --git a/src/Paramore.Brighter/IAmAConfiguration.cs b/src/Paramore.Brighter/IAmAConfiguration.cs new file mode 100644 index 0000000000..16b8f25d5c --- /dev/null +++ b/src/Paramore.Brighter/IAmAConfiguration.cs @@ -0,0 +1,39 @@ +namespace Paramore.Brighter; + +public interface IAmAConfiguration +{ + IAmAConfiguration GetSection(string sectionName); + + T? Get(); + string? GetConnectionString(string name); + + void Bind(T obj); +} + +public interface IAmGatewayConfigurationFactoryFromConfiguration +{ + IAmGatewayConfiguration Create(IAmAConfiguration configuration, string? name, string? sectionName); +} + +public interface IAmAMessageConsumerFactoryFromConfiguration +{ + IAmAMessageConsumerFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); +} + +public interface IAmAMessageProducerFactoryFromConfiguration +{ + IAmAMessageProducerFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); +} + +public interface IAmAProducerRegistryFactoryFromConfiguration +{ + IAmAProducerRegistryFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); +} + + +public interface IAmMessagingGatewayFactoryFromConfiguration +{ + IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string name, string? sectionName); + IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, string name, string? sectionName); + IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, string name, string? sectionName); +} From ce93ef4d3d033c6e1ac4f5898875e991ff9331aa Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Fri, 28 Nov 2025 15:12:59 +0000 Subject: [PATCH 2/3] Increate the implementation by adding a sample --- .../GreetingsReceiverConsole.csproj | 1 + .../GreetingsReceiverConsole/Program.cs | 10 +- .../GreetingsSender/GreetingsSender.csproj | 1 + .../RMQTaskQueue/GreetingsSender/Program.cs | 9 +- .../ConfigurationExtensions.cs | 107 ++- .../MicrosoftConfiguration.cs | 90 +++ ...essagingGatewayFactoryFromConfiguration.cs | 201 ------ ...essagingGatewayFromConfigurationFactory.cs | 665 ++++++++++++++++++ .../RmqSubscription.cs | 1 - .../ConfigurationFactory/IAmAConfiguration.cs | 61 ++ ...essagingGatewayFromConfigurationFactory.cs | 87 +++ .../PublicationConfiguration.cs | 207 ++++++ .../SubscriptionConfiguration.cs | 281 ++++++++ src/Paramore.Brighter/IAmAConfiguration.cs | 39 - .../CreateChannelFactoryTests.cs | 324 +++++++++ ...MessageGatewayConfigurationFactoryTests.cs | 372 ++++++++++ .../Paramore.Brighter.RMQ.Async.Tests.csproj | 1 + 17 files changed, 2199 insertions(+), 258 deletions(-) delete mode 100644 src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs create mode 100644 src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs create mode 100644 src/Paramore.Brighter/ConfigurationFactory/IAmAConfiguration.cs create mode 100644 src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs create mode 100644 src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs create mode 100644 src/Paramore.Brighter/ConfigurationFactory/SubscriptionConfiguration.cs delete mode 100644 src/Paramore.Brighter/IAmAConfiguration.cs create mode 100644 tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateChannelFactoryTests.cs create mode 100644 tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateMessageGatewayConfigurationFactoryTests.cs diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj index 32ccee7f24..be542a77e7 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj @@ -4,6 +4,7 @@ Exe + diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs index fca9c47816..4645ef2582 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs @@ -28,6 +28,7 @@ THE SOFTWARE. */ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Paramore.Brighter; +using Paramore.Brighter.Extensions.Configuration; using Paramore.Brighter.MessagingGateway.RMQ.Async; using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection; using Paramore.Brighter.ServiceActivator.Extensions.Hosting; @@ -46,7 +47,7 @@ public static async Task Main(string[] args) .CreateLogger(); var host = new HostBuilder() - .ConfigureServices((_, services) => + .ConfigureServices((host, services) => { var subscriptions = new Subscription[] @@ -80,10 +81,9 @@ public static async Task Main(string[] args) var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnection); services.AddConsumers(options => - { - options.Subscriptions = subscriptions; - options.DefaultChannelFactory = new ChannelFactory(rmqMessageConsumerFactory); - }) + { + options.Subscriptions = host.Configuration.CreateSubscriptions(); + }) .AutoFromAssemblies(); diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj index b9abec47f8..df81d8f873 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj @@ -12,6 +12,7 @@ + diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs index ece4632517..7d2d2cb150 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs @@ -25,9 +25,11 @@ THE SOFTWARE. */ using System; using System.Collections.Generic; using Greetings.Ports.Commands; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Paramore.Brighter; +using Paramore.Brighter.Extensions.Configuration; using Paramore.Brighter.Extensions.DependencyInjection; using Paramore.Brighter.MessagingGateway.RMQ.Async; using Serilog; @@ -45,6 +47,11 @@ static void Main(string[] args) .WriteTo.Console() .CreateLogger(); + var configuration = new ConfigurationBuilder() + .AddJsonFile("appsetting.json") + .AddEnvironmentVariables() + .Build(); + var serviceCollection = new ServiceCollection(); serviceCollection.AddSingleton(new SerilogLoggerFactory()); @@ -77,7 +84,7 @@ static void Main(string[] args) .AddBrighter() .AddProducers((configure) => { - configure.ProducerRegistry = producerRegistry; + configure.ProducerRegistry = configuration.CreateProducerRegistry(); configure.MaxOutStandingMessages = 5; configure.MaxOutStandingCheckInterval = TimeSpan.FromMilliseconds(500); }) diff --git a/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs index bae0ad2903..f5dcc721a1 100644 --- a/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs +++ b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs @@ -1,29 +1,97 @@ using System; +using System.Collections.Generic; +using System.Linq; using System.Reflection; using Microsoft.Extensions.Configuration; +using Paramore.Brighter.ConfigurationFactory; namespace Paramore.Brighter.Extensions.Configuration; +/// +/// Extension methods for to enable configuration-based initialization of Brighter components. +/// Provides convenient methods to create messaging gateways, outboxes, inboxes, storage providers, and distributed locks +/// from configuration sources such as appsettings.json, environment variables, Azure Key Vault, or .NET Aspire. +/// +/// +/// These extension methods provide a bridge between Microsoft.Extensions.Configuration and Brighter's configuration +/// abstraction layer. They automatically discover and instantiate the appropriate factory implementations from the +/// provider assemblies using reflection. +/// Configuration follows the convention "Brighter:{ProviderName}" with support for named instances via +/// "Brighter:{ProviderName}:{InstanceName}". +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// public static class ConfigurationExtensions { - public static T CreateMessageGatewayConnection(this IConfiguration configuration) + /// + /// Creates a messaging gateway configuration from the with full control over section name and instance name. + /// + /// The type of gateway configuration to create. Must implement . + /// The containing the messaging gateway settings. + /// The optional name for the configuration instance, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An instance of configured from the configuration source. + /// Thrown when no factory implementation for is found in the assembly. + /// + /// This overload provides full flexibility for configuration location. The parameter + /// allows overriding the default provider section name, useful for non-standard configuration structures. + /// The method automatically discovers the factory implementation from the assembly containing + /// and delegates configuration loading to the factory. + /// + public static T CreateMessageGatewayConnection(this IConfiguration configuration, + string? name = null, + string? sectionName = null) + where T : class, IAmGatewayConfiguration { - return CreateMessageGatewayConnection(configuration, null, null); + var factory = Get(typeof(T).Assembly); + return (T)factory.CreateMessageConsumerFactory(new MicrosoftConfiguration(configuration), name, sectionName); } - - public static T CreateMessageGatewayConnection(this IConfiguration configuration, string? name) + + public static Subscription[] CreateSubscriptions(this IConfiguration configuration, + string? name = null, + string? sectionName = null) { - return CreateMessageGatewayConnection(configuration, name, null); + var cfg = new MicrosoftConfiguration(configuration); + var factories = GetAll(); + var subscriptions = new List(); + + foreach (var factory in factories) + { + subscriptions.AddRange(factory.CreateSubscriptions(cfg, name, sectionName)); + } + + return subscriptions.ToArray(); } - - public static T CreateMessageGatewayConnection(this IConfiguration configuration, - string? name, - string? sectionName) + + public static IAmAProducerRegistry CreateProducerRegistry(this IConfiguration configuration, + string? name = null, + string? sectionName = null) { - var factory = Get(typeof(T).Assembly); - return (T)factory.Create(new MicrosoftConfiguration(configuration), name, sectionName); + var cfg = new MicrosoftConfiguration(configuration); + var factories = GetAll() + .ToList(); + var messageProducerFactories = new List(); + + foreach (var factory in factories) + { + messageProducerFactories.Add(factory.CreateMessageProducerFactory(cfg, name, sectionName)); + } + + return new CombinedProducerRegistryFactory(messageProducerFactories.ToArray()) + .Create(); } + /// + /// Discovers and instantiates an implementation of the specified interface from the given assembly. + /// + /// The interface type to find an implementation for. + /// The to search for implementations. + /// An instance of the first concrete, non-abstract class that implements . + /// Thrown when no implementation of is found in the assembly. + /// + /// This method uses reflection to scan the assembly for concrete classes implementing the specified interface. + /// It instantiates the first matching type using the parameterless constructor via . + /// This is used internally to automatically discover factory implementations in provider assemblies. + /// private static T Get(Assembly assembly) { var @interface = typeof(T); @@ -42,4 +110,21 @@ private static T Get(Assembly assembly) throw new InvalidOperationException($"Interface {typeof(T).FullName} isn't implement"); } + + private static IEnumerable GetAll() + { + var @interface = typeof(T); + foreach (var type in AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())) + { + if (!type.IsClass || type.IsAbstract) + { + continue; + } + + if (@interface.IsAssignableFrom(type)) + { + yield return (T)Activator.CreateInstance(type)!; + } + } + } } diff --git a/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs b/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs index ab51189a1d..2a71cb5cf7 100644 --- a/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs +++ b/src/Paramore.Brighter.Extensions.Configuration/MicrosoftConfiguration.cs @@ -1,24 +1,114 @@ using Microsoft.Extensions.Configuration; +using Paramore.Brighter.ConfigurationFactory; namespace Paramore.Brighter.Extensions.Configuration; +/// +/// Adapter that bridges Microsoft.Extensions.Configuration to Brighter's configuration abstraction. +/// Wraps to provide configuration access through , +/// enabling Brighter components to work with standard .NET configuration sources. +/// +/// +/// This adapter enables Brighter to consume configuration from any source supported by Microsoft.Extensions.Configuration, +/// including: +/// +/// appsettings.json files +/// Environment variables +/// Command-line arguments +/// Azure Key Vault +/// AWS Systems Manager Parameter Store +/// .NET Aspire configuration +/// User secrets (development) +/// +/// +/// The adapter maintains the hierarchical structure and fallback behavior of the underlying , +/// allowing Brighter's factory implementations to resolve configuration from multiple sources with proper precedence. +/// +/// +/// Example usage: +/// +/// var configuration = new ConfigurationBuilder() +/// .AddJsonFile("appsettings.json") +/// .AddEnvironmentVariables() +/// .Build(); +/// +/// var brighterConfig = new MicrosoftConfiguration(configuration); +/// var rabbitMqSection = brighterConfig.GetSection("Brighter:RabbitMQ"); +/// +/// +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +/// The instance to wrap. Must not be null. public class MicrosoftConfiguration(IConfiguration configuration) : IAmAConfiguration { + /// + /// Gets a configuration sub-section with the specified key. + /// + /// The key of the configuration section to retrieve. + /// A wrapping the requested section. + /// + /// This method delegates to and wraps the result in a new + /// instance, preserving the adapter pattern through the configuration hierarchy. + /// If the section does not exist, an empty configuration section is returned (following IConfiguration semantics). + /// public IAmAConfiguration GetSection(string sectionName) { return new MicrosoftConfiguration(configuration.GetSection(sectionName)); } + /// + /// Attempts to bind the configuration to a new instance of type . + /// + /// The type to bind the configuration to. Should be a class or struct with properties matching the configuration structure. + /// A new instance of with properties populated from configuration, or null if binding fails. + /// + /// This method delegates to which uses reflection to + /// create an instance and populate properties. Property names are matched case-insensitively to configuration keys. + /// Complex objects, collections, and nested types are supported. + /// public T? Get() { return configuration.Get(); } + /// + /// Gets a connection string by name from the configuration provider. + /// + /// The name of the connection string to retrieve. + /// The connection string as a , or null if not found. + /// + /// This method delegates to which looks for connection strings + /// in the "ConnectionStrings" configuration section. This is the standard .NET convention for storing database + /// and service connection strings. + /// + /// Example configuration: + /// + /// { + /// "ConnectionStrings": { + /// "RabbitMQ": "amqp://guest:guest@localhost:5672", + /// "Database": "Server=.;Database=Brighter;Integrated Security=True;" + /// } + /// } + /// + /// + /// public string? GetConnectionString(string name) { return configuration.GetConnectionString(name); } + /// + /// Binds the configuration values to an existing instance. + /// + /// The type of the instance to bind to. + /// The existing instance whose properties should be populated from configuration. + /// + /// This method delegates to which uses reflection + /// to update properties of the existing object. Unlike , this method does not create a new + /// instance but modifies the provided one. This is useful for applying configuration overrides to objects with + /// default values already set. + /// Property names are matched case-insensitively to configuration keys. + /// public void Bind(T obj) { configuration.Bind(obj); diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs deleted file mode 100644 index 6bf03dc924..0000000000 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFactoryFromConfiguration.cs +++ /dev/null @@ -1,201 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using RabbitMQ.Client; - -namespace Paramore.Brighter.MessagingGateway.RMQ.Async; - -public class RmqMessagingGatewayFactoryFromConfiguration : IAmMessagingGatewayFactoryFromConfiguration -{ - private const string BrighterSection = "Brighter:RabbitMQ"; - private const string AspireSection = "Aspire:RabbitMQ:Client"; - - private const string AspireConnection = "ConnectionString"; - - public IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string name, string? sectionName) - { - var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); - var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); - return new ChannelFactory(new RmqMessageConsumerFactory(connection)); - } - - public IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, - string name, - string? sectionName) - { - var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); - var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); - return new RmqMessageConsumerFactory(connection); - } - - public IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, - string name, - string? sectionName) - { - var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name, sectionName); - var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); - - return new RmqProducerRegistryFactory(connection, - rabbitMqConfiguration.Publications.Select(x => x.ToPublication())); - } - - - private static RabbitMqConfiguration GetRabbitMqConfiguration(IAmAConfiguration configuration, - string name, - string? sectionName) - { - if (string.IsNullOrEmpty(sectionName)) - { - sectionName = BrighterSection; - } - - var configurationSection = configuration.GetSection(sectionName!); - var namedConfigurationSection = configurationSection.GetSection(name); - - var rabbitMqConfiguration = new RabbitMqConfiguration(); - configurationSection.Bind(rabbitMqConfiguration); - namedConfigurationSection.Bind(rabbitMqConfiguration); - - var aspireConfiguration = configuration.GetSection(AspireSection); - var namedAspireConfiguration = aspireConfiguration.GetSection(name); - - var connection = aspireConfiguration.GetSection(AspireConnection).Get(); - if (!string.IsNullOrEmpty(connection)) - { - rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); - rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; - } - - connection = namedAspireConfiguration.GetSection(AspireConnection).Get(); - if (!string.IsNullOrEmpty(connection)) - { - rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); - rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; - } - - connection = configuration.GetConnectionString(name); - if (!string.IsNullOrEmpty(connection)) - { - rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); - rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; - } - - return rabbitMqConfiguration; - } - - public class RabbitMqConfiguration - { - public GatewayConnection Connection { get; set; } = null!; - public List Subscriptions { get; set; } = []; - public List Publications { get; set; } = []; - } - - public class GatewayConnection - { - public string Name { get; set; } = Environment.MachineName; - public AmqpUriSpecificationConfiguration? AmpqUri { get; set; } = null; - public ExchangeConfiguration? Exchange { get; set; } - public ExchangeConfiguration? DeadLetterExchange { get; set; } - public ushort Heartbeat { get; set; } = 20; - public bool PersistMessages { get; set; } - public ushort ContinuationTimeout { get; set; } = 20; - - public RmqMessagingGatewayConnection ToMessagingGatewayConnection() - { - return new RmqMessagingGatewayConnection - { - Name = Name, - AmpqUri = AmpqUri != null ? new AmqpUriSpecification( - uri: new Uri(AmpqUri.Uri, UriKind.Absolute), - connectionRetryCount: AmpqUri.ConnectionRetryCount, - retryWaitInMilliseconds: AmpqUri.RetryWaitInMilliseconds, - circuitBreakTimeInMilliseconds: AmpqUri.CircuitBreakTimeInMilliseconds) : null, - Exchange = Exchange != null ? new Exchange( - name: Exchange.Name, - type: Exchange.Type, - durable: Exchange.Durable, - supportDelay: Exchange.SupportDelay) : null, - DeadLetterExchange = DeadLetterExchange != null ? new Exchange( - name: DeadLetterExchange.Name, - type: DeadLetterExchange.Type, - durable: DeadLetterExchange.Durable, - supportDelay: DeadLetterExchange.SupportDelay) : null, - Heartbeat = Heartbeat, - PersistMessages = PersistMessages, - ContinuationTimeout = ContinuationTimeout, - }; - } - } - - public class AmqpUriSpecificationConfiguration - { - public string Uri { get; set; } = string.Empty; - public int ConnectionRetryCount { get; set; } = 3; - public int RetryWaitInMilliseconds { get; set; } = 1_000; - public int CircuitBreakTimeInMilliseconds { get; set; } = 60_000; - } - - public class ExchangeConfiguration - { - public string Name { get; set; } = string.Empty; - - public string Type { get; set; } = ExchangeType.Direct; - - public bool Durable { get; set; } - - public bool SupportDelay { get; set; } - } - - public class RabbitMqSubscriptionConfiguration - { - - } - - public class RabbitMqPublicationConfiguration - { - public string? DataSchema { get; set; } - public OnMissingChannel MakeChannels { get; set; } - public string? RequestType { get; set; } - - public string Source { get; set; } = "http://goparamore.io"; - - public string? Subject { get; set; } - public string? Topic { get; set; } - public string Type { get; set; } = string.Empty; - - public IDictionary? DefaultHeaders { get; set; } - public IDictionary? CloudEventsAdditionalProperties { get; set; } - - public string? ReplyTo { get; set; } - public int WaitForConfirmsTimeOutInMilliseconds { get; set; } = 500; - - public RmqPublication ToPublication() - { - Uri? dataschema = null; - if (!string.IsNullOrEmpty(DataSchema)) - { - dataschema = new Uri(DataSchema!, UriKind.RelativeOrAbsolute); - } - - RoutingKey? topic = null; - if (!string.IsNullOrEmpty(Topic)) - { - topic = new RoutingKey(Topic!); - } - - return new RmqPublication - { - DataSchema = dataschema, - MakeChannels = MakeChannels, - Source = new Uri(Source, UriKind.RelativeOrAbsolute), - Subject = Subject, - Topic = topic, - Type = new CloudEventsType(Type), - DefaultHeaders = DefaultHeaders, - CloudEventsAdditionalProperties = CloudEventsAdditionalProperties, - ReplyTo = ReplyTo, - WaitForConfirmsTimeOutInMilliseconds = WaitForConfirmsTimeOutInMilliseconds, - }; - } - } -} diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs new file mode 100644 index 0000000000..d23af4efcb --- /dev/null +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs @@ -0,0 +1,665 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Paramore.Brighter.ConfigurationFactory; +using RabbitMQ.Client; + +namespace Paramore.Brighter.MessagingGateway.RMQ.Async; + +/// +/// Factory for creating RabbitMQ messaging gateway components from configuration. +/// Enables configuration-based initialization of RabbitMQ channels, consumers, producers, and subscriptions +/// from configuration sources including Brighter-specific sections, .NET Aspire, and connection strings. +/// +/// +/// This factory implements to provide RabbitMQ-specific +/// configuration support. It follows a multi-source configuration strategy with the following precedence: +/// +/// Brighter:RabbitMQ:{name} - Highest priority, Brighter-specific configuration +/// Aspire:RabbitMQ:Client - .NET Aspire orchestration configuration +/// ConnectionStrings:{name} - Standard .NET connection strings +/// +/// +/// Configuration supports both default (unnamed) and named instances, allowing multiple RabbitMQ connections +/// within the same application. Named instances enable scenarios like separate connections for analytics, +/// reporting, or different message brokers. +/// +/// +/// Default configuration section: "Brighter:RabbitMQ" +/// +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +public class RmqMessagingGatewayFromConfigurationFactory : IAmMessagingGatewayFromConfigurationFactory +{ + private const string BrighterSection = "Brighter:RabbitMQ"; + private const string AspireSection = "Aspire:RabbitMQ:Client"; + + private const string AspireConnection = "ConnectionString"; + + /// + /// Creates a RabbitMQ gateway configuration from the provided configuration source. + /// + /// The containing the RabbitMQ gateway settings. + /// The optional name for named configuration instances, allowing multiple RabbitMQ connections. + /// The optional override for the configuration section name. If null, uses . + /// A configured from the provided settings. + /// + /// This method resolves RabbitMQ connection configuration from multiple sources with the precedence order + /// documented in . It returns a gateway connection that can be used + /// to create producers and consumers. + /// + public IAmGatewayConfiguration CreateMessageGatewayConfigurationFactory(IAmAConfiguration configuration, string? name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + return rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + } + + /// + /// Creates a RabbitMQ channel factory from the provided configuration source. + /// + /// The containing the RabbitMQ channel factory settings. + /// The optional name for named configuration instances, allowing multiple RabbitMQ connections. + /// The optional override for the configuration section name. If null, uses . + /// A that creates RabbitMQ channels for message consumption. + /// + /// The channel factory creates channels that connect message consumers to RabbitMQ queues. + /// Channels are configured with the connection settings resolved from the configuration. + /// + public IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string? name, string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + return new ChannelFactory(new RmqMessageConsumerFactory(connection)); + } + + /// + /// Creates a RabbitMQ message consumer factory from the provided configuration source. + /// + /// The containing the RabbitMQ consumer factory settings. + /// The optional name for named configuration instances, allowing multiple RabbitMQ connections. + /// The optional override for the configuration section name. If null, uses . + /// A that creates RabbitMQ message consumers. + /// + /// The message consumer factory creates consumers that receive messages from RabbitMQ queues. + /// Consumers are configured with connection settings, prefetch counts, and acknowledgment modes + /// resolved from the configuration. + /// + public IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, + string? name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + return new RmqMessageConsumerFactory(connection); + } + + public IAmAMessageProducerFactory CreateMessageProducerFactory(IAmAConfiguration configuration, + string? name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + + return new RmqMessageProducerFactory(connection, + rabbitMqConfiguration.Publications.Select(x => x.ToPublication())); + } + + /// + /// Creates a collection of RabbitMQ subscriptions from the provided configuration source. + /// + /// The containing the RabbitMQ subscription settings. + /// The optional name for named configuration instances, allowing multiple RabbitMQ connections. + /// The optional override for the configuration section name. If null, uses . + /// An of instances configured from the provided settings. + /// + /// This method converts all subscription configurations into runtime objects + /// used by Brighter's message pump. Each subscription defines a queue binding with routing keys, dead letter + /// configuration, and consumer behavior settings. The channel factory is shared across all subscriptions + /// using the same connection configuration. + /// + public IEnumerable CreateSubscriptions(IAmAConfiguration configuration, string? name, string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + var factory = new ChannelFactory(new RmqMessageConsumerFactory(connection)); + return rabbitMqConfiguration + .Subscriptions + .Select(x => x.ToSubscription(factory)); + } + + + /// + /// Retrieves RabbitMQ configuration from multiple sources with fallback behavior. + /// + /// The containing the configuration sources. + /// The name for named configuration instances. Use empty string for default configuration. + /// The optional override for the configuration section name. If null, uses . + /// A instance with settings from all applicable configuration sources. + /// + /// This method implements a multi-source configuration strategy with the following resolution order: + /// + /// Brighter:RabbitMQ - Base Brighter-specific configuration + /// Brighter:RabbitMQ:{name} - Named instance overrides (if name is provided) + /// Aspire:RabbitMQ:Client:ConnectionString - .NET Aspire connection string + /// Aspire:RabbitMQ:Client:{name}:ConnectionString - Named Aspire connection string + /// ConnectionStrings:{name} - Standard .NET connection string + /// + /// + /// Later sources override earlier ones. This enables flexible configuration management where base settings + /// can be defined in Brighter configuration and environment-specific connection strings can override them + /// through Aspire or standard connection strings. + /// + /// + private static RabbitMqConfiguration GetRabbitMqConfiguration(IAmAConfiguration configuration, + string name, + string? sectionName) + { + if (string.IsNullOrEmpty(sectionName)) + { + sectionName = BrighterSection; + } + + var configurationSection = configuration.GetSection(sectionName!); + var namedConfigurationSection = configurationSection.GetSection(name); + + var rabbitMqConfiguration = new RabbitMqConfiguration(); + configurationSection.Bind(rabbitMqConfiguration); + namedConfigurationSection.Bind(rabbitMqConfiguration); + + var aspireConfiguration = configuration.GetSection(AspireSection); + var namedAspireConfiguration = aspireConfiguration.GetSection(name); + + var connection = aspireConfiguration.GetSection(AspireConnection).Get(); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + connection = namedAspireConfiguration.GetSection(AspireConnection).Get(); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + connection = configuration.GetConnectionString(name); + if (!string.IsNullOrEmpty(connection)) + { + rabbitMqConfiguration.Connection.AmpqUri ??= new AmqpUriSpecificationConfiguration(); + rabbitMqConfiguration.Connection.AmpqUri.Uri = connection!; + } + + return rabbitMqConfiguration; + } + + /// + /// Root configuration class for RabbitMQ that binds to configuration sections. + /// Contains connection details, subscriptions, and publications for RabbitMQ messaging. + /// + /// + /// This class represents the complete RabbitMQ configuration structure that maps to JSON configuration. + /// Example configuration structure: + /// + /// { + /// "Brighter": { + /// "RabbitMQ": { + /// "Connection": { ... }, + /// "Subscriptions": [ ... ], + /// "Publications": [ ... ] + /// } + /// } + /// } + /// + /// + public class RabbitMqConfiguration + { + /// + /// Gets or sets the RabbitMQ gateway connection configuration. + /// + /// A containing connection details for RabbitMQ. + public GatewayConnection Connection { get; set; } = null!; + + /// + /// Gets or sets the list of RabbitMQ subscriptions. + /// + /// A list of defining how to consume messages from RabbitMQ. + public List Subscriptions { get; set; } = []; + + /// + /// Gets or sets the list of RabbitMQ publications. + /// + /// A list of defining how to publish messages to RabbitMQ. + public List Publications { get; set; } = []; + } + + /// + /// Configuration class for RabbitMQ gateway connection details. + /// Contains connection URIs, exchange information, heartbeat settings, and message persistence options. + /// + /// + /// This class supports AMQP URI format for connection details. The configuration can include primary + /// exchanges and dead letter exchanges for advanced routing scenarios. + /// Connection settings include heartbeat intervals and continuation timeouts for robust connection management. + /// + public class GatewayConnection + { + /// + /// Gets or sets the name identifier for this RabbitMQ connection. + /// + /// The connection name as a . Default is the machine name from . + /// + /// This name is used for connection identification in RabbitMQ management UI and logs. + /// It helps distinguish between multiple connections from different machines or applications. + /// + public string Name { get; set; } = Environment.MachineName; + + /// + /// Gets or sets the AMQP URI specification for connecting to RabbitMQ. + /// + /// An containing the connection URI, or null if not specified. + /// + /// AMQP URI format supports both amqp:// (plain) and amqps:// (TLS) protocols. + /// Example: "amqp://guest:guest@localhost:5672/" or "amqps://user:pass@host:5671/vhost" + /// This configuration can be provided directly or resolved from connection strings or Aspire configuration. + /// + public AmqpUriSpecificationConfiguration? AmpqUri { get; set; } = null; + + /// + /// Gets or sets the primary exchange configuration for message routing. + /// + /// An defining the exchange settings, or null for default exchange. + /// + /// The exchange is the routing mechanism in RabbitMQ that receives messages from publishers and routes + /// them to queues based on routing rules (direct, topic, fanout, headers). + /// If null, the default exchange (empty string) will be used. + /// + public ExchangeConfiguration? Exchange { get; set; } + + /// + /// Gets or sets the dead letter exchange configuration for handling rejected or expired messages. + /// + /// An for the dead letter exchange, or null if not using dead lettering. + /// + /// Dead letter exchanges receive messages that are rejected, expired, or exceed queue length limits. + /// This enables building robust error handling and poison message management strategies. + /// Configure dead letter queues bound to this exchange to capture and analyze failed messages. + /// + public ExchangeConfiguration? DeadLetterExchange { get; set; } + + /// + /// Gets or sets the heartbeat interval in seconds for detecting broken connections. + /// + /// The heartbeat interval as a . Default is 20 seconds. + /// + /// RabbitMQ uses heartbeats to detect dead TCP connections. Both client and server send heartbeat frames + /// at intervals. If no frames are received within twice the heartbeat interval, the connection is considered dead. + /// Set to 0 to disable heartbeats (not recommended for production). + /// + public ushort Heartbeat { get; set; } = 20; + + /// + /// Gets or sets whether messages should be persisted to disk. + /// + /// true to persist messages; otherwise, false. Default is false. + /// + /// When enabled, messages are marked as persistent (delivery mode 2) and written to disk. + /// This ensures messages survive broker restarts but impacts performance. + /// Use persistent messages for critical data that cannot be lost. + /// Note: Queues must also be declared as durable for full persistence. + /// + public bool PersistMessages { get; set; } + + /// + /// Gets or sets the timeout in seconds for continuation operations. + /// + /// The continuation timeout as a . Default is 20 seconds. + /// + /// This timeout applies to asynchronous continuation operations in the RabbitMQ client. + /// It determines how long to wait for protocol handshakes and channel operations to complete. + /// + public ushort ContinuationTimeout { get; set; } = 20; + + /// + /// Converts this configuration to a for runtime use. + /// + /// A instance configured from this configuration. + /// + /// This method transforms the configuration-friendly representation into the runtime connection object + /// used by RabbitMQ messaging gateway. It handles conversion of nullable properties and creates + /// appropriate default values where needed. + /// + public RmqMessagingGatewayConnection ToMessagingGatewayConnection() + { + return new RmqMessagingGatewayConnection + { + Name = Name, + AmpqUri = AmpqUri != null ? new AmqpUriSpecification( + uri: new Uri(AmpqUri.Uri, UriKind.Absolute), + connectionRetryCount: AmpqUri.ConnectionRetryCount, + retryWaitInMilliseconds: AmpqUri.RetryWaitInMilliseconds, + circuitBreakTimeInMilliseconds: AmpqUri.CircuitBreakTimeInMilliseconds) : null, + Exchange = Exchange != null ? new Exchange( + name: Exchange.Name, + type: Exchange.Type, + durable: Exchange.Durable, + supportDelay: Exchange.SupportDelay) : null, + DeadLetterExchange = DeadLetterExchange != null ? new Exchange( + name: DeadLetterExchange.Name, + type: DeadLetterExchange.Type, + durable: DeadLetterExchange.Durable, + supportDelay: DeadLetterExchange.SupportDelay) : null, + Heartbeat = Heartbeat, + PersistMessages = PersistMessages, + ContinuationTimeout = ContinuationTimeout, + }; + } + } + + /// + /// Configuration class for AMQP URI specification including connection resilience settings. + /// + /// + /// This class contains the AMQP URI and parameters for connection retry logic and circuit breaker patterns + /// to handle transient connection failures gracefully. + /// + public class AmqpUriSpecificationConfiguration + { + /// + /// Gets or sets the AMQP URI string for connecting to RabbitMQ. + /// + /// The AMQP URI as a . Default is an empty string. + /// + /// AMQP URI format: amqp://username:password@host:port/virtualhost or amqps:// for TLS. + /// Example: "amqp://guest:guest@localhost:5672/" or "amqps://user:pass@broker.example.com:5671/production" + /// + public string Uri { get; set; } = string.Empty; + + /// + /// Gets or sets the number of retry attempts for connection failures. + /// + /// The retry count as an . Default is 3. + /// + /// When a connection attempt fails, Brighter will retry this many times before giving up. + /// Each retry is delayed by . + /// + public int ConnectionRetryCount { get; set; } = 3; + + /// + /// Gets or sets the delay in milliseconds between connection retry attempts. + /// + /// The retry delay in milliseconds as an . Default is 1000ms (1 second). + /// + /// This delay helps prevent overwhelming a recovering broker with rapid reconnection attempts. + /// Consider using exponential backoff for more sophisticated retry strategies. + /// + public int RetryWaitInMilliseconds { get; set; } = 1_000; + + /// + /// Gets or sets the circuit breaker timeout in milliseconds. + /// + /// The circuit breaker timeout in milliseconds as an . Default is 60000ms (1 minute). + /// + /// When the circuit breaker opens due to repeated failures, it remains open for this duration + /// before attempting to reconnect. This prevents continuous connection attempts to a failed broker + /// and allows time for infrastructure recovery. + /// + public int CircuitBreakTimeInMilliseconds { get; set; } = 60_000; + } + + /// + /// Configuration class for RabbitMQ exchange settings. + /// + /// + /// Exchanges are the message routing components in RabbitMQ that receive messages from publishers + /// and route them to queues based on exchange type and routing keys. + /// + public class ExchangeConfiguration + { + /// + /// Gets or sets the name of the exchange. + /// + /// The exchange name as a . Default is an empty string (the default exchange). + /// + /// Exchange names must be unique within a virtual host. An empty string refers to the default exchange, + /// which routes messages directly to queues by queue name. + /// + public string Name { get; set; } = string.Empty; + + /// + /// Gets or sets the type of the exchange. + /// + /// The exchange type as a . Default is . + /// + /// Common exchange types: + /// + /// - Routes to queues with exact routing key match + /// - Routes based on wildcard pattern matching (* and #) + /// - Routes to all bound queues regardless of routing key + /// - Routes based on message header attributes + /// + /// + public string Type { get; set; } = ExchangeType.Direct; + + /// + /// Gets or sets whether the exchange should survive broker restarts. + /// + /// true if the exchange is durable; otherwise, false. Default is false. + /// + /// Durable exchanges are persisted to disk and survive RabbitMQ broker restarts. + /// Non-durable exchanges are transient and are lost when the broker restarts. + /// For production systems, exchanges should typically be durable. + /// + public bool Durable { get; set; } + + /// + /// Gets or sets whether the exchange supports delayed message delivery. + /// + /// true if delayed delivery is supported; otherwise, false. Default is false. + /// + /// Delayed message delivery requires the RabbitMQ delayed message exchange plugin. + /// When enabled, messages can be published with a delay header to defer delivery. + /// This is useful for implementing scheduled tasks, retry delays, and time-based workflows. + /// + public bool SupportDelay { get; set; } + } + + /// + /// Configuration class for RabbitMQ-specific subscription settings. + /// Extends with RabbitMQ queue features like dead lettering, + /// high availability, durability, and message TTL. + /// + /// + /// This class provides configuration for RabbitMQ queues including advanced features like dead letter exchanges, + /// quorum queues for high availability, and message time-to-live settings. + /// Use to convert this configuration into a runtime . + /// + public class RabbitMqSubscriptionConfiguration : SubscriptionConfiguration + { + /// + /// Gets or sets the name of the dead letter queue channel. + /// + /// The dead letter channel name as a , or null if not using dead lettering. + /// + /// When a message is rejected or expires, it will be routed to this dead letter queue. + /// The dead letter queue must be configured separately and bound to the dead letter exchange. + /// + public string? DeadLetterChannel { get; set; } + + /// + /// Gets or sets the routing key for dead letter messages. + /// + /// The dead letter routing key as a , or null to use the original routing key. + /// + /// This routing key is used when sending messages to the dead letter exchange. + /// If null, the message's original routing key is preserved. + /// Useful for routing different types of failures to different dead letter queues. + /// + public string? DeadLetterRoutingKey { get; set; } + + /// + /// Gets or sets whether the queue should be configured for high availability. + /// + /// true to enable high availability; otherwise, false. Default is false. + /// + /// High availability is typically achieved using quorum queues (RabbitMQ 3.8+) which replicate data + /// across multiple nodes. This provides better data safety and availability but with some performance overhead. + /// Consider using for high availability queues. + /// + public bool HighAvailability { get; set; } + + /// + /// Gets or sets whether the queue should be durable (survive broker restarts). + /// + /// true if the queue is durable; otherwise, false. Default is false. + /// + /// Durable queues are persisted to disk and survive RabbitMQ broker restarts. + /// For message durability, both the queue and messages must be marked as durable. + /// Non-durable queues are transient and suitable for temporary or development scenarios. + /// + public bool IsDurable { get; set; } + + /// + /// Gets or sets the maximum number of messages the queue can hold. + /// + /// The maximum queue length as an , or null for unlimited. Default is null. + /// + /// When the queue reaches this limit, new messages will be rejected or, if configured, + /// routed to a dead letter exchange. This helps prevent memory exhaustion from unbounded queue growth. + /// Use in conjunction with dead letter queues to avoid message loss. + /// + public int? MaxQueueLength { get; set; } + + /// + /// Gets or sets the message time-to-live (TTL) for messages in the queue. + /// + /// The TTL as a , or null for no expiration. Default is null. + /// + /// Messages in the queue that exceed this TTL are automatically expired and, if configured, + /// routed to a dead letter exchange. This prevents stale messages from accumulating. + /// Individual messages can also have their own TTL which takes precedence. + /// + public TimeSpan? Ttl { get; set; } + + /// + /// Gets or sets the type of queue to create. + /// + /// A value. Default is . + /// + /// Queue types include: + /// + /// - Traditional RabbitMQ queues + /// - Replicated queues for high availability (RabbitMQ 3.8+) + /// + /// Quorum queues provide better data safety and availability at the cost of throughput. + /// + public QueueType QueueType { get; set; } = QueueType.Classic; + + /// + /// Converts this configuration to a for runtime use. + /// + /// The used to create channels for this subscription. + /// A instance configured from this configuration. + /// + /// This method transforms the configuration-friendly representation into the runtime subscription object + /// used by Brighter's message pump. It resolves derived values (channel name, routing key) and creates + /// the appropriate RabbitMQ-specific subscription with queue features like dead lettering and TTL. + /// + public RmqSubscription ToSubscription(IAmAChannelFactory factory) + { + return new RmqSubscription( + subscriptionName: new SubscriptionName(GetName()), + channelName: new ChannelName(GetChannelName()), + routingKey: new RoutingKey(GetRoutingKey()), + requestType: GetRequestType(), + bufferSize: BufferSize, + noOfPerformers: NoOfPerformers, + timeOut: TimeOut, + requeueCount: RequeueCount, + requeueDelay: RequeueDelay, + unacceptableMessageLimit: UnacceptableMessageLimit, + messagePumpType: MessagePumpType, + makeChannels: MakeChannels, + emptyChannelDelay: EmptyChannelDelay, + channelFailureDelay: ChannelFailureDelay, + channelFactory: factory, + deadLetterChannelName: string.IsNullOrEmpty(DeadLetterChannel) ? null : new ChannelName(DeadLetterChannel!), + deadLetterRoutingKey: string.IsNullOrEmpty(DeadLetterRoutingKey) ? null : new RoutingKey(DeadLetterRoutingKey!), + highAvailability: HighAvailability, + isDurable: IsDurable, + ttl: Ttl, + queueType: QueueType, + maxQueueLength: MaxQueueLength); + } + } + + /// + /// Configuration class for RabbitMQ-specific publication settings. + /// Extends with RabbitMQ publisher confirms timeout. + /// + /// + /// This class provides configuration for RabbitMQ message publishing including publisher confirms + /// which provide reliable delivery guarantees by waiting for broker acknowledgment. + /// Use to convert this configuration into a runtime . + /// + public class RabbitMqPublicationConfiguration : PublicationConfiguration + { + /// + /// Gets or sets the timeout in milliseconds to wait for RabbitMQ publisher confirms. + /// + /// The timeout in milliseconds as an . Default is 500ms. + /// + /// Publisher confirms ensure messages are successfully received by the broker. When enabled, + /// RabbitMQ sends an acknowledgment back to the publisher. This timeout controls how long to wait + /// for that acknowledgment before considering the publish operation failed. + /// + /// A longer timeout provides more resilience against transient network issues but increases + /// publish latency. A shorter timeout improves throughput but may result in false failures. + /// + /// + /// This setting only applies when publisher confirms are enabled on the connection. + /// + /// + public int WaitForConfirmsTimeOutInMilliseconds { get; set; } = 500; + + /// + /// Converts this configuration to a for runtime use. + /// + /// A instance configured from this configuration. + /// + /// This method transforms the configuration-friendly representation into the runtime publication object + /// used by Brighter's message producers. It converts string-based URIs and routing keys to their + /// strongly-typed equivalents and applies CloudEvents properties. + /// + public RmqPublication ToPublication() + { + Uri? dataschema = null; + if (!string.IsNullOrEmpty(DataSchema)) + { + dataschema = new Uri(DataSchema!, UriKind.RelativeOrAbsolute); + } + + RoutingKey? topic = null; + if (!string.IsNullOrEmpty(Topic)) + { + topic = new RoutingKey(Topic!); + } + + return new RmqPublication + { + DataSchema = dataschema, + MakeChannels = MakeChannels, + Source = new Uri(Source, UriKind.RelativeOrAbsolute), + Subject = Subject, + Topic = topic, + Type = new CloudEventsType(Type), + DefaultHeaders = DefaultHeaders, + CloudEventsAdditionalProperties = CloudEventsAdditionalProperties, + ReplyTo = ReplyTo, + RequestType = GetRequestType(), + WaitForConfirmsTimeOutInMilliseconds = WaitForConfirmsTimeOutInMilliseconds, + }; + } + } +} diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqSubscription.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqSubscription.cs index b0c4c0af86..a153ac98e4 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqSubscription.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqSubscription.cs @@ -28,7 +28,6 @@ namespace Paramore.Brighter.MessagingGateway.RMQ.Async { public class RmqSubscription : Subscription { - /// /// The name of the queue to send rejects messages to /// diff --git a/src/Paramore.Brighter/ConfigurationFactory/IAmAConfiguration.cs b/src/Paramore.Brighter/ConfigurationFactory/IAmAConfiguration.cs new file mode 100644 index 0000000000..5392d597a9 --- /dev/null +++ b/src/Paramore.Brighter/ConfigurationFactory/IAmAConfiguration.cs @@ -0,0 +1,61 @@ +namespace Paramore.Brighter.ConfigurationFactory; + +/// +/// Abstraction for configuration access in Brighter components. +/// Provides a configuration system-independent interface that wraps underlying configuration providers +/// such as Microsoft.Extensions.Configuration, .NET Aspire, or other configuration systems. +/// +/// +/// This interface enables Brighter to support configuration-based initialization of components +/// (messaging gateways, outboxes, inboxes, distributed locks, storage providers) while maintaining +/// independence from specific configuration implementations. +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +public interface IAmAConfiguration +{ + /// + /// Gets a configuration sub-section with the specified key. + /// + /// The key of the configuration section to retrieve. + /// An representing the configuration section, or an empty section if not found. + /// + /// This method is used to navigate hierarchical configuration structures. For example, to access + /// configuration at "Brighter:RabbitMQ:Connection", you would call GetSection("Brighter"), then + /// GetSection("RabbitMQ"), then GetSection("Connection"). + /// + IAmAConfiguration GetSection(string sectionName); + + /// + /// Attempts to bind the configuration to a new instance of type . + /// + /// The type to bind the configuration to. Should be a class or struct with properties matching the configuration structure. + /// A new instance of with properties populated from configuration, or null if binding fails. + /// + /// This method creates a new instance of the target type and populates its properties based on the + /// configuration values. Property names should match configuration keys (case-insensitive). + /// + T? Get(); + + /// + /// Gets a connection string by name from the configuration provider. + /// + /// The name of the connection string to retrieve. + /// The connection string as a , or null if not found. + /// + /// Connection strings are typically stored in a dedicated "ConnectionStrings" section in configuration + /// files (e.g., appsettings.json). This method provides access to those connection strings. + /// + string? GetConnectionString(string name); + + /// + /// Binds the configuration values to an existing instance. + /// + /// The type of the instance to bind to. + /// The existing instance whose properties should be populated from configuration. + /// + /// This method updates the properties of an existing object based on configuration values. + /// Unlike , this method does not create a new instance but modifies the provided one. + /// Property names should match configuration keys (case-insensitive). + /// + void Bind(T obj); +} diff --git a/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs b/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs new file mode 100644 index 0000000000..4d42bdf590 --- /dev/null +++ b/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs @@ -0,0 +1,87 @@ +using System.Collections.Generic; + +namespace Paramore.Brighter.ConfigurationFactory; + +/// +/// Factory interface for creating messaging gateway components from configuration. +/// Enables configuration-based initialization of messaging infrastructure (channels, consumers, producers) +/// for transport implementations like RabbitMQ, Kafka, AWS SNS/SQS, Azure Service Bus, etc. +/// +/// +/// This interface provides a consistent pattern for configuring messaging gateways across different transport +/// providers while maintaining independence from specific configuration systems. Implementations should support +/// reading from Brighter-specific configuration sections (e.g., "Brighter:RabbitMQ"), .NET Aspire configuration, +/// and standard connection strings. +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +public interface IAmMessagingGatewayFromConfigurationFactory +{ + /// + /// Creates a gateway configuration from the provided configuration source. + /// + /// The containing the messaging gateway settings. + /// The optional name for named configuration instances, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An instance configured from the provided settings. + /// + /// The gateway configuration contains transport-specific connection settings, credentials, and endpoint information. + /// When is provided, the factory looks for configuration at "Brighter:{sectionName}:{name}". + /// + IAmGatewayConfiguration CreateMessageGatewayConfigurationFactory(IAmAConfiguration configuration, string? name, string? sectionName); + + /// + /// Creates a channel factory from the provided configuration source. + /// + /// The containing the channel factory settings. + /// The optional name for named configuration instances, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An instance that creates channels for message consumption. + /// + /// The channel factory is responsible for creating channels that connect message consumers to the underlying + /// transport. Configuration typically includes connection details, channel properties, and consumer behavior settings. + /// + IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string? name, string? sectionName); + + /// + /// Creates a message consumer factory from the provided configuration source. + /// + /// The containing the message consumer factory settings. + /// The optional name for named configuration instances, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An instance that creates message consumers for receiving messages. + /// + /// The message consumer factory creates consumers that receive messages from the transport. Configuration + /// includes connection settings, subscription details, and consumer-specific options like prefetch count, + /// acknowledgment mode, and retry policies. + /// + IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, string? name, string? sectionName); + + /// + /// Creates a producer registry factory from the provided configuration source. + /// + /// The containing the producer registry factory settings. + /// The optional name for named configuration instances, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An instance that creates producer registries for sending messages. + /// + /// The producer registry factory creates registries that manage message producers (publishers) for sending + /// messages to the transport. Configuration includes connection settings, publication details, and producer-specific + /// options like confirmation mode, persistence settings, and routing strategies. + /// + IAmAMessageProducerFactory CreateMessageProducerFactory(IAmAConfiguration configuration, string? name, string? sectionName); + + /// + /// Creates a collection of subscriptions from the provided configuration source. + /// + /// The containing the subscription settings. + /// The optional name for named configuration instances, allowing multiple configurations for the same provider. + /// The optional override for the configuration section name. If null, uses the provider's default section name. + /// An of instances configured from the provided settings. + /// + /// Subscriptions define the bindings between message handlers and messaging infrastructure (topics, queues, routing keys). + /// Configuration typically includes subscription names, channel names, routing keys/patterns, message types, + /// and consumer-specific settings like buffer size, unacceptable message limits, and requeue options. + /// Multiple subscriptions can be defined in configuration to set up complex message routing scenarios. + /// + IEnumerable CreateSubscriptions(IAmAConfiguration configuration, string? name, string? sectionName); +} diff --git a/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs b/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs new file mode 100644 index 0000000000..3013c13b81 --- /dev/null +++ b/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs @@ -0,0 +1,207 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Paramore.Brighter.ConfigurationFactory; + +/// +/// Base class for publication configuration that can be loaded from configuration sources. +/// Provides properties for configuring how Brighter publishes messages to a transport, +/// including routing, message metadata, and CloudEvents properties. +/// +/// +/// This abstract class is intended to be extended by transport-specific publication configuration classes +/// (e.g., RabbitMQ, Kafka, AWS SNS) that bind to configuration sections. It provides common publication +/// properties and helper methods for deriving runtime values from configuration. +/// Properties mirror those in but use configuration-friendly types (e.g., string instead of Uri). +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +public abstract class PublicationConfiguration +{ + /// + /// Gets or sets the URI identifying the schema that data adheres to. + /// + /// The schema URI as a , or null if not specified. + /// + /// From the CloudEvents Specification. + /// Identifies the schema that data adheres to. Incompatible changes to the schema SHOULD be reflected by a different URI. + /// This is an optional CloudEvents attribute. + /// + public string? DataSchema { get; set; } + + /// + /// Gets or sets the behavior for creating missing channels. + /// + /// An value indicating how to handle missing infrastructure. + /// + /// Controls whether Brighter should create infrastructure (topics, exchanges) if it doesn't exist. + /// Use for development environments and + /// or for production where infrastructure is managed separately. + /// + public OnMissingChannel MakeChannels { get; set; } + + /// + /// Gets or sets the fully qualified type name of the request/message type for this publication. + /// + /// The fully qualified type name as a , or null for untyped publications. + /// + /// This should be the full type name including namespace (e.g., "MyApp.Commands.ProcessOrderCommand"). + /// The type is resolved at runtime using reflection across all loaded assemblies via . + /// Used to set the property when building publication objects. + /// + public string? RequestType { get; set; } + + /// + /// Gets or sets the source URI identifying the context in which an event happened. + /// + /// The source URI as a . Default is "http://goparamore.io". + /// + /// From the CloudEvents Specification. + /// Identifies the context in which an event happened. Often this includes information such as the type of + /// the event source, the organization publishing the event, or the process that produced the event. + /// Producers MUST ensure that source + id is unique for each distinct event. + /// This is a required CloudEvents attribute. + /// + public string Source { get; set; } = "http://goparamore.io"; + + /// + /// Gets or sets the subject of the event in the context of the event producer. + /// + /// The subject as a , or null if not specified. + /// + /// From the CloudEvents Specification. + /// Describes the subject of the event in the context of the event producer (identified by ). + /// In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source, + /// but the source identifier alone might not be sufficient as a qualifier for any specific event if the + /// source context has internal sub-structure. + /// This is an optional CloudEvents attribute. + /// + public string? Subject { get; set; } + + /// + /// Gets or sets the topic (routing key) for this publication. + /// + /// The topic as a , or null if not specified. + /// + /// In a pub-sub scenario, this is typically the topic to which messages are published. Subscribers then + /// create their own queues which the broker delivers messages to based on routing rules. + /// For topic-based transports like RabbitMQ, this supports wildcards (* and #). + /// Maps to the property when building publication objects. + /// + public string? Topic { get; set; } + + /// + /// Gets or sets the CloudEvents type describing the type of event. + /// + /// The event type as a . Default is an empty string. + /// + /// From the CloudEvents Specification. + /// This attribute contains a value describing the type of event related to the originating occurrence. + /// Often this attribute is used for routing, observability, policy enforcement, etc. + /// SHOULD be prefixed with a reverse-DNS name. The prefixed domain dictates the organization which defines + /// the semantics of this event type. + /// This is a required CloudEvents attribute. + /// + public string Type { get; set; } = string.Empty; + + /// + /// Gets or sets the default headers to be included in published messages. + /// + /// A dictionary of header name-value pairs, or null if no default headers are specified. + /// + /// These headers will be automatically added to all messages published through Brighter's message producers + /// when using default message mappers. Headers should be structured as key-value pairs where the key is the + /// header name (string) and the value is the header value (object). + /// + /// Example configuration: + /// + /// "DefaultHeaders": { + /// "x-correlation-id": "00000000-0000-0000-0000-000000000000", + /// "x-message-type": "MyApp.Events.OrderCreated" + /// } + /// + /// + /// + /// Note: These headers are only applied when using Brighter's default message mapping pipeline. + /// Custom mappers may ignore this property. + /// + /// + public Dictionary? DefaultHeaders { get; set; } + + /// + /// Gets or sets additional CloudEvents properties beyond the standard attributes. + /// + /// A dictionary of additional CloudEvents property name-value pairs, or null if no additional properties are specified. + /// + /// This enables the inclusion of custom or vendor-specific metadata beyond the standard CloudEvents attributes + /// (id, source, type, etc.). These properties are serialized alongside core CloudEvents attributes when mapping + /// to a CloudEvent message. + /// + /// Use this dictionary to attach any non-standard CloudEvents attributes or extensions pertinent to your + /// application or integration requirements. During serialization to CloudEvent JSON, the key-value pairs + /// are added as top-level properties in the resulting JSON. + /// + /// + /// Important: If any key in this dictionary conflicts with a standard CloudEvents property + /// (e.g., "id", "source", "type"), the value in this dictionary will override the standard property during + /// serialization. Exercise caution to avoid unintended overwrites. + /// + /// + /// This property is utilized by CloudEventJsonMessageMapper and CloudEventsTransformer. + /// + /// + public Dictionary? CloudEventsAdditionalProperties { get; set; } + + /// + /// Gets or sets the reply-to topic for request-reply messaging patterns. + /// + /// The reply-to topic as a , or null if not using request-reply. + /// + /// Used when doing Request-Reply instead of Publish-Subscribe to identify the queue or topic that the + /// sender is listening on for responses. Usually a sender listens on a private queue so they do not + /// have to filter replies intended for other listeners. + /// This is an optional property only needed for request-reply scenarios. + /// + public string? ReplyTo { get; set; } + + /// + /// Gets the request type by resolving the string to a using reflection. + /// + /// The resolved , or null if is not specified. + /// Thrown when is specified but the type cannot be found in any loaded assembly. + /// + /// This method searches all loaded assemblies in the current for a concrete, non-abstract + /// class matching the fully qualified type name specified in . + /// The type must be a class (not an interface or struct) and must not be abstract. + /// + /// Example: "MyApp.Commands.ProcessOrderCommand" or "MyApp.Commands.ProcessOrderCommand, MyApp" if the assembly name is needed. + /// + /// + /// This method is typically called by derived classes when building objects. + /// + /// + protected Type? GetRequestType() + { + if (string.IsNullOrEmpty(RequestType)) + { + return null; + } + + var assemblies = AppDomain.CurrentDomain.GetAssemblies(); + foreach (var assembly in assemblies) + { + var types = assembly.GetTypes(); + var type = types.FirstOrDefault(x => x.FullName == RequestType); + if (type != null && type.IsClass && !type.IsAbstract) + { + return type; + } + } + + throw new ConfigurationException( + $"RequestType '{RequestType}' could not be resolved to a valid type. " + + $"Ensure the type name is fully qualified (e.g., 'MyApp.Commands.ProcessOrderCommand'), " + + $"the assembly containing the type is loaded, and the type is a concrete class (not abstract or an interface). " + + $"Searched {assemblies.Length} loaded assemblies."); + } +} diff --git a/src/Paramore.Brighter/ConfigurationFactory/SubscriptionConfiguration.cs b/src/Paramore.Brighter/ConfigurationFactory/SubscriptionConfiguration.cs new file mode 100644 index 0000000000..653583e4b3 --- /dev/null +++ b/src/Paramore.Brighter/ConfigurationFactory/SubscriptionConfiguration.cs @@ -0,0 +1,281 @@ +using System; +using System.Linq; +using System.Reflection; + +namespace Paramore.Brighter.ConfigurationFactory; + +/// +/// Base class for subscription configuration that can be loaded from configuration sources. +/// Provides properties for configuring how Brighter subscribes to and processes messages from a transport, +/// including channel behavior, message routing, and consumer performance settings. +/// +/// +/// This abstract class is intended to be extended by transport-specific subscription configuration classes +/// (e.g., RabbitMQ, Kafka, AWS SQS) that bind to configuration sections. It provides common subscription +/// properties and helper methods for deriving runtime values from configuration. +/// See ADR 0035 for details on Brighter's configuration support strategy. +/// +public abstract class SubscriptionConfiguration +{ + /// + /// Gets or sets the size of the channel buffer for queuing messages. + /// + /// The buffer size as an . Default is 1. + /// + /// The buffer size determines how many messages can be queued in memory before the channel + /// blocks. A larger buffer can improve throughput but increases memory usage. + /// + public int BufferSize { get; set; } = 1; + + /// + /// Gets or sets the name of the channel to create for this subscription. + /// + /// The channel name as a , or null to derive from . + /// + /// If not specified, the channel name will be derived from the . + /// The channel name is used to identify the channel in the message pump and diagnostics. + /// + public string? ChannelName { get; set; } + + /// + /// Gets or sets the delay before retrying after a channel failure. + /// + /// The delay as a , or null to use the default. + /// + /// This delay is applied when the channel encounters an error and needs to reconnect or recover. + /// A longer delay can reduce load on failing infrastructure but increases recovery time. + /// + public TimeSpan? ChannelFailureDelay { get; set; } + + /// + /// Gets or sets the fully qualified type name of the request/message type for this subscription. + /// + /// The fully qualified type name as a , or null for untyped subscriptions. + /// + /// This should be the full type name including namespace (e.g., "MyApp.Commands.ProcessOrderCommand"). + /// The type is resolved at runtime using reflection across all loaded assemblies. + /// Used as a fallback for , , and if those are not specified. + /// + public string? RequestType { get; set; } + + /// + /// Gets or sets the delay to wait when the channel is empty before polling again. + /// + /// The delay as a , or null to use the default. + /// + /// This delay reduces CPU usage when no messages are available by preventing tight polling loops. + /// A shorter delay improves message processing latency but increases CPU usage. + /// + public TimeSpan? EmptyChannelDelay { get; set; } + + /// + /// Gets or sets the behavior for creating missing channels. + /// + /// An value. Default is . + /// + /// Controls whether Brighter should create infrastructure (queues, topics, bindings) if it doesn't exist. + /// Use for development environments and + /// or for production where infrastructure is managed separately. + /// + public OnMissingChannel MakeChannels { get; set; } = OnMissingChannel.Assume; + + /// + /// Gets or sets the name of the subscription. + /// + /// The subscription name as a , or null to derive from or generate a GUID. + /// + /// The subscription name is used to identify the subscription in configuration and diagnostics. + /// If not specified, it will be derived from or a GUID will be generated. + /// + public string? Name { get; set; } + + /// + /// Gets or sets the number of concurrent message processors (performers) for this subscription. + /// + /// The number of performers as an . Default is 1. + /// + /// Each performer runs on a separate thread/task and processes messages concurrently. + /// Increasing performers can improve throughput but requires thread-safe message handlers. + /// + public int NoOfPerformers { get; set; } = 1; + + /// + /// Gets or sets the maximum number of times to requeue a message before sending it to a dead letter queue. + /// + /// The requeue count as an . Default is -1 (unlimited). + /// + /// A value of -1 means messages will be requeued indefinitely. A value of 0 means no requeueing. + /// Use this to prevent poison messages from blocking the queue indefinitely. + /// + public int RequeueCount { get; set; } = -1; + + /// + /// Gets or sets the delay before requeuing a message after a processing failure. + /// + /// The delay as a , or null to requeue immediately. + /// + /// A delay can help with transient failures by giving downstream systems time to recover. + /// + public TimeSpan? RequeueDelay { get; set; } + + /// + /// Gets or sets the routing key used to bind the subscription to topics/exchanges. + /// + /// The routing key as a . Default is an empty string. + /// + /// The routing key determines which messages are routed to this subscription. For topic-based + /// transports like RabbitMQ, this supports wildcards (* and #). For direct routing, this should + /// match the message's routing key exactly. If not specified, it will be derived from . + /// + public string RoutingKey { get; set; } = string.Empty; + + /// + /// Gets or sets the type of message pump to use for this subscription. + /// + /// A value. Default is . + /// + /// uses asynchronous I/O and is suitable for I/O-bound workloads. + /// uses synchronous I/O and may be better for CPU-bound workloads. + /// + public MessagePumpType MessagePumpType { get; set; } = MessagePumpType.Proactor; + + /// + /// Gets or sets the timeout for message processing. + /// + /// The timeout as a , or null to use the default. + /// + /// If a message handler exceeds this timeout, the message pump may cancel the operation. + /// Set this based on your handler's expected execution time plus a safety margin. + /// + public TimeSpan? TimeOut { get; set; } + + /// + /// Gets or sets the maximum number of unacceptable messages before the channel stops processing. + /// + /// The limit as an . Default is 0 (unlimited). + /// + /// Unacceptable messages are those that cannot be deserialized or are malformed. A limit prevents + /// a flood of bad messages from causing indefinite processing failures. A value of 0 means no limit. + /// + public int UnacceptableMessageLimit { get; set; } = 0; + + /// + /// Gets the subscription name, deriving it from or generating a GUID if necessary. + /// + /// The subscription name as a . + /// + /// Resolution order: + /// 1. Returns if specified + /// 2. Returns if specified + /// 3. Generates a new GUID + /// This method is typically called by derived classes when building subscription objects. + /// + protected string GetName() + { + if (string.IsNullOrEmpty(Name) && !string.IsNullOrEmpty(RequestType)) + { + return RequestType!; + } + + if (string.IsNullOrEmpty(Name)) + { + return Guid.NewGuid().ToString(); + } + + return Name!; + } + + /// + /// Gets the channel name, deriving it from if necessary. + /// + /// The channel name as a . + /// Thrown when both and are null or empty. + /// + /// Resolution order: + /// 1. Returns if specified + /// 2. Returns if specified + /// 3. Throws if neither is available + /// This method is typically called by derived classes when building subscription objects. + /// + protected string GetChannelName() + { + if (string.IsNullOrEmpty(ChannelName) && !string.IsNullOrEmpty(RequestType)) + { + return RequestType!; + } + + if (string.IsNullOrEmpty(ChannelName)) + { + throw new ConfigurationException( + "Subscription configuration is missing ChannelName. Please specify either 'ChannelName' or 'RequestType' in your configuration. " + + $"Current values - ChannelName: '{ChannelName ?? "null"}', RequestType: '{RequestType ?? "null"}'"); + } + + return ChannelName!; + } + + /// + /// Gets the routing key, deriving it from if necessary. + /// + /// The routing key as a . + /// Thrown when both and are null or empty. + /// + /// Resolution order: + /// 1. Returns if specified + /// 2. Returns if specified + /// 3. Throws if neither is available + /// This method is typically called by derived classes when building subscription objects. + /// + protected string GetRoutingKey() + { + if (string.IsNullOrEmpty(RoutingKey) && !string.IsNullOrEmpty(RequestType)) + { + return RequestType!; + } + + if (string.IsNullOrEmpty(RoutingKey)) + { + throw new ConfigurationException( + "Subscription configuration is missing RoutingKey. Please specify either 'RoutingKey' or 'RequestType' in your configuration. " + + $"Current values - RoutingKey: '{RoutingKey ?? "null"}', RequestType: '{RequestType ?? "null"}'"); + } + + return RoutingKey!; + } + + /// + /// Gets the request type by resolving the string to a using reflection. + /// + /// The resolved , or null if is not specified. + /// Thrown when is specified but the type cannot be found in any loaded assembly. + /// + /// This method searches all loaded assemblies in the current for a concrete, non-abstract + /// class matching the fully qualified type name specified in . + /// The type must be a class (not an interface or struct) and must not be abstract. + /// Example: "MyApp.Commands.ProcessOrderCommand, MyApp" or just "MyApp.Commands.ProcessOrderCommand" if the assembly is already loaded. + /// + protected Type? GetRequestType() + { + if (string.IsNullOrEmpty(RequestType)) + { + return null; + } + + var assemblies = AppDomain.CurrentDomain.GetAssemblies(); + foreach (var assembly in assemblies) + { + var types = assembly.GetTypes(); + var type = types.FirstOrDefault(x => x.FullName == RequestType); + if (type != null && type.IsClass && !type.IsAbstract) + { + return type; + } + } + + throw new ConfigurationException( + $"RequestType '{RequestType}' could not be resolved to a valid type. " + + $"Ensure the type name is fully qualified (e.g., 'MyApp.Commands.ProcessOrderCommand'), " + + $"the assembly containing the type is loaded, and the type is a concrete class (not abstract or an interface). " + + $"Searched {assemblies.Length} loaded assemblies."); + } +} diff --git a/src/Paramore.Brighter/IAmAConfiguration.cs b/src/Paramore.Brighter/IAmAConfiguration.cs deleted file mode 100644 index 16b8f25d5c..0000000000 --- a/src/Paramore.Brighter/IAmAConfiguration.cs +++ /dev/null @@ -1,39 +0,0 @@ -namespace Paramore.Brighter; - -public interface IAmAConfiguration -{ - IAmAConfiguration GetSection(string sectionName); - - T? Get(); - string? GetConnectionString(string name); - - void Bind(T obj); -} - -public interface IAmGatewayConfigurationFactoryFromConfiguration -{ - IAmGatewayConfiguration Create(IAmAConfiguration configuration, string? name, string? sectionName); -} - -public interface IAmAMessageConsumerFactoryFromConfiguration -{ - IAmAMessageConsumerFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); -} - -public interface IAmAMessageProducerFactoryFromConfiguration -{ - IAmAMessageProducerFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); -} - -public interface IAmAProducerRegistryFactoryFromConfiguration -{ - IAmAProducerRegistryFactory Create(IAmAConfiguration configuration, string? connectionString, string? sectionName); -} - - -public interface IAmMessagingGatewayFactoryFromConfiguration -{ - IAmAChannelFactory CreateChannelFactory(IAmAConfiguration configuration, string name, string? sectionName); - IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, string name, string? sectionName); - IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, string name, string? sectionName); -} diff --git a/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateChannelFactoryTests.cs b/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateChannelFactoryTests.cs new file mode 100644 index 0000000000..b7f82ce622 --- /dev/null +++ b/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateChannelFactoryTests.cs @@ -0,0 +1,324 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System.Collections.Generic; +using Microsoft.Extensions.Configuration; +using Paramore.Brighter.Extensions.Configuration; +using Paramore.Brighter.MessagingGateway.RMQ.Async; +using Xunit; + +namespace Paramore.Brighter.RMQ.Async.Tests.Configuration; + +public class CreateChannelFactoryTests +{ + [Fact] + public void When_creating_channel_factory_from_brighter_section_should_return_configured_factory() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test-connection", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/%2f", + ["Brighter:RabbitMQ:Connection:Heartbeat"] = "30", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_named_instance_should_use_named_configuration() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + // Base configuration + ["Brighter:RabbitMQ:Connection:Name"] = "base-connection", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + + // Named instance configuration (should override) + ["Brighter:RabbitMQ:analytics:Connection:Name"] = "analytics-connection", + ["Brighter:RabbitMQ:analytics:Connection:AmpqUri:Uri"] = "amqp://user:pass@analytics-host:5672/analytics", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + "analytics", + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_from_aspire_section_should_use_aspire_connection() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + + // Aspire configuration (should override URI) + ["Aspire:RabbitMQ:Client:ConnectionString"] = "amqp://aspire:aspire@aspire-host:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_from_named_aspire_section_should_use_named_aspire_connection() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + + // Named Aspire configuration + ["Aspire:RabbitMQ:Client:reporting:ConnectionString"] = "amqp://reporting:reporting@reporting-host:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + "reporting", + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_from_connection_string_should_use_connection_string() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + + // Connection string + ["ConnectionStrings:messaging"] = "amqp://connstr:connstr@connstr-host:5672/vhost", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + "messaging", + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_custom_section_name_should_use_custom_section() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["CustomMessaging:RabbitMQ:Connection:Name"] = "custom-connection", + ["CustomMessaging:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://custom:custom@custom-host:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + "CustomMessaging:RabbitMQ"); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_exchange_settings_should_create_factory() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + ["Brighter:RabbitMQ:Connection:Exchange:Name"] = "my.exchange", + ["Brighter:RabbitMQ:Connection:Exchange:Type"] = "topic", + ["Brighter:RabbitMQ:Connection:Exchange:Durable"] = "true", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_resilience_settings_should_create_factory() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + ["Brighter:RabbitMQ:Connection:AmpqUri:ConnectionRetryCount"] = "5", + ["Brighter:RabbitMQ:Connection:AmpqUri:RetryWaitInMilliseconds"] = "2000", + ["Brighter:RabbitMQ:Connection:AmpqUri:CircuitBreakTimeInMilliseconds"] = "120000", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_minimal_settings_should_use_defaults() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } + + [Fact] + public void When_creating_channel_factory_with_precedence_connection_string_should_win() + { + //Arrange - Testing precedence: ConnectionString > Named Aspire > Aspire > Named Brighter > Brighter + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + // Base Brighter configuration + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://base:base@base-host:5672/", + + // Named Brighter configuration + ["Brighter:RabbitMQ:prod:Connection:AmpqUri:Uri"] = "amqp://named:named@named-host:5672/", + + // Aspire configuration + ["Aspire:RabbitMQ:Client:ConnectionString"] = "amqp://aspire:aspire@aspire-host:5672/", + + // Named Aspire configuration + ["Aspire:RabbitMQ:Client:prod:ConnectionString"] = "amqp://aspire-named:aspire-named@aspire-named-host:5672/", + + // Connection string (should win) + ["ConnectionStrings:prod"] = "amqp://connstr:connstr@connstr-host:5672/winner", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var channelFactory = factory.CreateChannelFactory( + brighterConfig, + "prod", + null); + + //Assert + Assert.NotNull(channelFactory); + Assert.IsType(channelFactory); + } +} diff --git a/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateMessageGatewayConfigurationFactoryTests.cs b/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateMessageGatewayConfigurationFactoryTests.cs new file mode 100644 index 0000000000..c37fd04946 --- /dev/null +++ b/tests/Paramore.Brighter.RMQ.Async.Tests/Configuration/CreateMessageGatewayConfigurationFactoryTests.cs @@ -0,0 +1,372 @@ +#region Licence +/* The MIT License (MIT) +Copyright © 2014 Ian Cooper + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. */ + +#endregion + +using System.Collections.Generic; +using Microsoft.Extensions.Configuration; +using Paramore.Brighter.Extensions.Configuration; +using Paramore.Brighter.MessagingGateway.RMQ.Async; +using Xunit; + +namespace Paramore.Brighter.RMQ.Async.Tests.Configuration; + +public class CreateMessageGatewayConfigurationFactoryTests +{ + [Fact] + public void When_creating_gateway_configuration_from_brighter_section_should_return_configured_connection() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test-connection", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/%2f", + ["Brighter:RabbitMQ:Connection:AmpqUri:ConnectionRetryCount"] = "5", + ["Brighter:RabbitMQ:Connection:AmpqUri:RetryWaitInMilliseconds"] = "1000", + ["Brighter:RabbitMQ:Connection:AmpqUri:CircuitBreakTimeInMilliseconds"] = "60000", + ["Brighter:RabbitMQ:Connection:Heartbeat"] = "30", + ["Brighter:RabbitMQ:Connection:PersistMessages"] = "true", + ["Brighter:RabbitMQ:Connection:ContinuationTimeout"] = "25", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + null); + + //Assert + Assert.NotNull(gatewayConfiguration); + Assert.IsType(gatewayConfiguration); + + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.Equal("test-connection", rmqConnection.Name); + Assert.NotNull(rmqConnection.AmpqUri); + Assert.Equal("amqp://guest:guest@localhost:5672/%2f", rmqConnection.AmpqUri!.Uri.ToString()); + Assert.Equal(5, rmqConnection.AmpqUri.ConnectionRetryCount); + Assert.Equal(1000, rmqConnection.AmpqUri.RetryWaitInMilliseconds); + Assert.Equal(60000, rmqConnection.AmpqUri.CircuitBreakTimeInMilliseconds); + Assert.Equal((ushort)30, rmqConnection.Heartbeat); + Assert.True(rmqConnection.PersistMessages); + Assert.Equal((ushort)25, rmqConnection.ContinuationTimeout); + } + + [Fact] + public void When_creating_gateway_configuration_with_exchange_settings_should_configure_exchange() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + ["Brighter:RabbitMQ:Connection:Exchange:Name"] = "my.exchange", + ["Brighter:RabbitMQ:Connection:Exchange:Type"] = "topic", + ["Brighter:RabbitMQ:Connection:Exchange:Durable"] = "true", + ["Brighter:RabbitMQ:Connection:Exchange:SupportDelay"] = "true", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.NotNull(rmqConnection.Exchange); + Assert.Equal("my.exchange", rmqConnection.Exchange!.Name); + Assert.Equal("topic", rmqConnection.Exchange.Type); + Assert.True(rmqConnection.Exchange.Durable); + Assert.True(rmqConnection.Exchange.SupportDelay); + } + + [Fact] + public void When_creating_gateway_configuration_with_dead_letter_exchange_should_configure_dlx() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + ["Brighter:RabbitMQ:Connection:DeadLetterExchange:Name"] = "my.dlx", + ["Brighter:RabbitMQ:Connection:DeadLetterExchange:Type"] = "direct", + ["Brighter:RabbitMQ:Connection:DeadLetterExchange:Durable"] = "true", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.NotNull(rmqConnection.DeadLetterExchange); + Assert.Equal("my.dlx", rmqConnection.DeadLetterExchange!.Name); + Assert.Equal("direct", rmqConnection.DeadLetterExchange.Type); + Assert.True(rmqConnection.DeadLetterExchange.Durable); + } + + [Fact] + public void When_creating_gateway_configuration_with_named_instance_should_override_base_configuration() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + // Base configuration + ["Brighter:RabbitMQ:Connection:Name"] = "base-connection", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + ["Brighter:RabbitMQ:Connection:Heartbeat"] = "20", + + // Named instance configuration (should override) + ["Brighter:RabbitMQ:analytics:Connection:Name"] = "analytics-connection", + ["Brighter:RabbitMQ:analytics:Connection:AmpqUri:Uri"] = "amqp://user:pass@analytics-host:5672/analytics", + ["Brighter:RabbitMQ:analytics:Connection:Heartbeat"] = "40", + ["Brighter:RabbitMQ:analytics:Connection:PersistMessages"] = "true", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + "analytics", + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.Equal("analytics-connection", rmqConnection.Name); + Assert.Equal("amqp://user:pass@analytics-host:5672/analytics", rmqConnection.AmpqUri!.Uri.ToString()); + Assert.Equal((ushort)40, rmqConnection.Heartbeat); + Assert.True(rmqConnection.PersistMessages); + } + + [Fact] + public void When_creating_gateway_configuration_from_aspire_section_should_use_aspire_connection_string() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:Heartbeat"] = "20", + + // Aspire configuration (should override URI) + ["Aspire:RabbitMQ:Client:ConnectionString"] = "amqp://aspire:aspire@aspire-host:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.Equal("test", rmqConnection.Name); + Assert.NotNull(rmqConnection.AmpqUri); + Assert.Equal("amqp://aspire:aspire@aspire-host:5672/", rmqConnection.AmpqUri!.Uri.ToString()); + Assert.Equal((ushort)20, rmqConnection.Heartbeat); + } + + [Fact] + public void When_creating_gateway_configuration_from_named_aspire_section_should_use_named_aspire_connection() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + + // Named Aspire configuration + ["Aspire:RabbitMQ:Client:reporting:ConnectionString"] = "amqp://reporting:reporting@reporting-host:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + "reporting", + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.NotNull(rmqConnection.AmpqUri); + Assert.Equal("amqp://reporting:reporting@reporting-host:5672/", rmqConnection.AmpqUri!.Uri.ToString()); + } + + [Fact] + public void When_creating_gateway_configuration_from_connection_string_should_use_connection_string() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:Heartbeat"] = "20", + + // Connection string + ["ConnectionStrings:messaging"] = "amqp://connstr:connstr@connstr-host:5672/vhost", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + "messaging", + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.NotNull(rmqConnection.AmpqUri); + Assert.Equal("amqp://connstr:connstr@connstr-host:5672/vhost", rmqConnection.AmpqUri!.Uri.ToString()); + } + + [Fact] + public void When_creating_gateway_configuration_with_custom_section_name_should_use_custom_section() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["CustomMessaging:RabbitMQ:Connection:Name"] = "custom-connection", + ["CustomMessaging:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://custom:custom@custom-host:5672/", + ["CustomMessaging:RabbitMQ:Connection:Heartbeat"] = "50", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + "CustomMessaging:RabbitMQ"); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.Equal("custom-connection", rmqConnection.Name); + Assert.Equal("amqp://custom:custom@custom-host:5672/", rmqConnection.AmpqUri!.Uri.ToString()); + Assert.Equal((ushort)50, rmqConnection.Heartbeat); + } + + [Fact] + public void When_creating_gateway_configuration_with_precedence_connection_string_should_win() + { + //Arrange - Testing precedence: ConnectionString > Named Aspire > Aspire > Named Brighter > Brighter + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + // Base Brighter configuration + ["Brighter:RabbitMQ:Connection:Name"] = "test", + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://base:base@base-host:5672/", + + // Named Brighter configuration + ["Brighter:RabbitMQ:prod:Connection:AmpqUri:Uri"] = "amqp://named:named@named-host:5672/", + + // Aspire configuration + ["Aspire:RabbitMQ:Client:ConnectionString"] = "amqp://aspire:aspire@aspire-host:5672/", + + // Named Aspire configuration + ["Aspire:RabbitMQ:Client:prod:ConnectionString"] = "amqp://aspire-named:aspire-named@aspire-named-host:5672/", + + // Connection string (should win) + ["ConnectionStrings:prod"] = "amqp://connstr:connstr@connstr-host:5672/winner", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + "prod", + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.Equal("amqp://connstr:connstr@connstr-host:5672/winner", rmqConnection.AmpqUri!.Uri.ToString()); + } + + [Fact] + public void When_creating_gateway_configuration_with_minimal_settings_should_use_defaults() + { + //Arrange + var configuration = new ConfigurationBuilder() + .AddInMemoryCollection(new Dictionary + { + ["Brighter:RabbitMQ:Connection:AmpqUri:Uri"] = "amqp://guest:guest@localhost:5672/", + }) + .Build(); + + var factory = new RmqMessagingGatewayFromConfigurationFactory(); + var brighterConfig = new MicrosoftConfiguration(configuration); + + //Act + var gatewayConfiguration = factory.CreateMessageGatewayConfigurationFactory( + brighterConfig, + null, + null); + + //Assert + var rmqConnection = (RmqMessagingGatewayConnection)gatewayConfiguration; + Assert.NotNull(rmqConnection.AmpqUri); + Assert.Equal(3, rmqConnection.AmpqUri!.ConnectionRetryCount); // Default + Assert.Equal(1000, rmqConnection.AmpqUri.RetryWaitInMilliseconds); // Default + Assert.Equal(60000, rmqConnection.AmpqUri.CircuitBreakTimeInMilliseconds); // Default + Assert.Equal((ushort)20, rmqConnection.Heartbeat); // Default + Assert.False(rmqConnection.PersistMessages); // Default + Assert.Equal((ushort)20, rmqConnection.ContinuationTimeout); // Default + } +} + + diff --git a/tests/Paramore.Brighter.RMQ.Async.Tests/Paramore.Brighter.RMQ.Async.Tests.csproj b/tests/Paramore.Brighter.RMQ.Async.Tests/Paramore.Brighter.RMQ.Async.Tests.csproj index 8b64f58205..896a6c4835 100644 --- a/tests/Paramore.Brighter.RMQ.Async.Tests/Paramore.Brighter.RMQ.Async.Tests.csproj +++ b/tests/Paramore.Brighter.RMQ.Async.Tests/Paramore.Brighter.RMQ.Async.Tests.csproj @@ -24,6 +24,7 @@ + From 5474adeb146456adaeaef8eaa760b4c2cfd9ac94 Mon Sep 17 00:00:00 2001 From: Rafael Andrade Date: Mon, 1 Dec 2025 13:47:34 +0000 Subject: [PATCH 3/3] Add RabbitMQ sample of Brighter integration with Configuration --- Brighter.sln | 15 +++ Directory.Packages.props | 1 + docs/adr/0035-configuration-support.md | 115 ++++++++++++++++-- .../GreetingsReceiverConsole.csproj | 5 + .../GreetingsReceiverConsole/Program.cs | 84 +++++-------- .../GreetingsReceiverConsole/appsettings.json | 33 +++++ .../GreetingsSender/GreetingsSender.csproj | 5 + .../RMQTaskQueue/GreetingsSender/Program.cs | 115 +++++++----------- .../GreetingsSender/appsettings.json | 23 ++++ .../Rmq.Greetings.AppHost/AppHost.cs | 13 ++ .../Properties/launchSettings.json | 31 +++++ .../Rmq.Greetings.AppHost.csproj | 20 +++ .../appsettings.Development.json | 8 ++ .../Rmq.Greetings.AppHost/appsettings.json | 9 ++ .../ConfigurationExtensions.cs | 53 ++++---- ...essagingGatewayFromConfigurationFactory.cs | 18 ++- ...essagingGatewayFromConfigurationFactory.cs | 14 +-- .../PublicationConfiguration.cs | 2 +- 18 files changed, 380 insertions(+), 184 deletions(-) create mode 100644 samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/appsettings.json create mode 100644 samples/TaskQueue/RMQTaskQueue/GreetingsSender/appsettings.json create mode 100644 samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/AppHost.cs create mode 100644 samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Properties/launchSettings.json create mode 100644 samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Rmq.Greetings.AppHost.csproj create mode 100644 samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.Development.json create mode 100644 samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.json diff --git a/Brighter.sln b/Brighter.sln index 7cdbfa5420..7327ecec95 100644 --- a/Brighter.sln +++ b/Brighter.sln @@ -468,6 +468,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.TickerQ.T EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Paramore.Brighter.Extensions.Configuration", "src\Paramore.Brighter.Extensions.Configuration\Paramore.Brighter.Extensions.Configuration.csproj", "{5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Rmq.Greetings.AppHost", "samples\TaskQueue\RMQTaskQueue\Rmq.Greetings.AppHost\Rmq.Greetings.AppHost.csproj", "{476226D9-8ECD-4C3F-B6FA-845D3E2509F1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -2746,6 +2748,18 @@ Global {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|Mixed Platforms.Build.0 = Release|Any CPU {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|x86.ActiveCfg = Release|Any CPU {5655FC4F-B6DF-48DD-9CAC-5C3B6AA41DE9}.Release|x86.Build.0 = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|Mixed Platforms.ActiveCfg = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|Mixed Platforms.Build.0 = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|x86.ActiveCfg = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Debug|x86.Build.0 = Debug|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|Any CPU.Build.0 = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|Mixed Platforms.ActiveCfg = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|Mixed Platforms.Build.0 = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|x86.ActiveCfg = Release|Any CPU + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -2888,6 +2902,7 @@ Global {9063F17B-5636-4AD5-999B-C894517DB5FD} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} {39B7CFF4-4CA9-4B1F-B9C4-EED3A657D00D} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} {261E1392-7713-525F-2859-7B40CA416A50} = {329736D2-BF92-4D06-A7BF-19F4B6B64EDD} + {476226D9-8ECD-4C3F-B6FA-845D3E2509F1} = {E9748DC0-6E72-4634-AF49-428F806E03B0} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {8B7C7E31-2E32-4E0D-9426-BC9AF22E9F4C} diff --git a/Directory.Packages.props b/Directory.Packages.props index 9236e5f601..388a3238ee 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -12,6 +12,7 @@ 4.0.3.8 + diff --git a/docs/adr/0035-configuration-support.md b/docs/adr/0035-configuration-support.md index 49384f7d5f..2120fd1651 100644 --- a/docs/adr/0035-configuration-support.md +++ b/docs/adr/0035-configuration-support.md @@ -463,28 +463,121 @@ This approach ensures that Brighter components can be configured through: - Standard connection strings for simple scenarios -### Service Collection Integration +### End-to-End Configuration Example -```csharp -// appsettings.json +Here is a practical example of how to configure a sender and a receiver using `IConfiguration`. + +#### Sender Configuration + +The sender application sends `GreetingEvent` and `FarewellEvent` messages. + +**`appsettings.json`** + +```json { "Brighter": { "RabbitMQ": { - "Connection": { "AmqpUri": { "Uri": "amqp://guest:guest@localhost:5672" } } + "Connection": { + "Exchange": { + "Name": "paramore.brighter.exchange" + } + }, + "Publications":[ + { + "WaitForConfirmsTimeOutInMilliseconds": 1000, + "Topic": "greeting.event", + "RequestType": "Greetings.Ports.Commands.GreetingEvent" + }, + { + "WaitForConfirmsTimeOutInMilliseconds": 1000, + "Topic": "farewell.event", + "RequestType": "Greetings.Ports.Commands.FarewellEvent" + } + ] } } } +``` + +**`Program.cs`** + +```csharp +var configuration = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .Build(); -// Program.cs -builder.Services.AddConsumers(opt => +var serviceCollection = new ServiceCollection(); + +serviceCollection + .AddBrighter() + .AddProducers((configure) => { - opt.Subscriptions = builder.Configuration.CreateSubscription(); - opt.DefaultChannelFactory = builder.Configuration.CreateChannelFactory(); + configure.ProducerRegistry = configuration.CreateProducerRegistry("messaging"); }) - .AddProducers(opt => + .AutoFromAssemblies(); + +var serviceProvider = serviceCollection.BuildServiceProvider(); +var commandProcessor = serviceProvider.GetRequiredService(); + +commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); +commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); +``` + +#### Receiver Configuration + +The receiver application listens for `GreetingEvent` and `FarewellEvent` messages. + +**`appsettings.json`** + +```json +{ + "Brighter": { + "RabbitMq": { + "Connection": { + "Exchange": { + "Name": "paramore.brighter.exchange" + } + }, + "Subscriptions":[ + { + "SubscriptionName": "paramore.example.greeting", + "ChannelName": "greeting.event", + "RoutingKey": "greeting.event", + "RequestType": "Greetings.Ports.Commands.GreetingEvent" + }, + { + "SubscriptionName": "paramore.example.farewell", + "ChannelName": "farewell.event", + "RoutingKey": "farewell.event", + "RequestType": "Greetings.Ports.Commands.FarewellEvent" + } + ] + } + } +} +``` + +**`Program.cs`** + +```csharp +var host = new HostBuilder() + .ConfigureAppConfiguration(c => c + .AddJsonFile("appsettings.json")) + .ConfigureServices((host, services) => { - opt.ProducerRegistry = builder.Configuration.CreateProducerRegistry(); - }); + services.AddConsumers(options => + { + options.Subscriptions = host.Configuration.CreateSubscriptions("messaging", + assemblies: [typeof(GreetingEvent).Assembly]) + .ToArray(); + }) + .AutoFromAssemblies(); + + services.AddHostedService(); + }) + .Build(); + +await host.RunAsync(); ``` ## Consequences diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj index be542a77e7..fd5450f7a5 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/GreetingsReceiverConsole.csproj @@ -16,4 +16,9 @@ + + + Always + + \ No newline at end of file diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs index 4645ef2582..749a087542 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/Program.cs @@ -22,78 +22,50 @@ THE SOFTWARE. */ #endregion -using System; +using System.Linq; using System.Threading.Tasks; using Greetings.Ports.Commands; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Paramore.Brighter; using Paramore.Brighter.Extensions.Configuration; using Paramore.Brighter.MessagingGateway.RMQ.Async; using Paramore.Brighter.ServiceActivator.Extensions.DependencyInjection; using Paramore.Brighter.ServiceActivator.Extensions.Hosting; using Serilog; -namespace GreetingsReceiverConsole +namespace GreetingsReceiverConsole; + +public class Program { - public class Program + public static async Task Main(string[] args) { - public static async Task Main(string[] args) - { - Log.Logger = new LoggerConfiguration() - .MinimumLevel.Debug() - .Enrich.FromLogContext() - .WriteTo.Console() - .CreateLogger(); - - var host = new HostBuilder() - .ConfigureServices((host, services) => - - { - var subscriptions = new Subscription[] - { - new RmqSubscription( - new SubscriptionName("paramore.example.greeting"), - new ChannelName("greeting.event"), - new RoutingKey("greeting.event"), - timeOut: TimeSpan.FromMilliseconds(2000), - isDurable: true, - highAvailability: true, - messagePumpType: MessagePumpType.Reactor, - makeChannels: OnMissingChannel.Create), //change to OnMissingChannel.Validate if you have infrastructure declared elsewhere - new RmqSubscription( - new SubscriptionName("paramore.example.farewell"), //change to OnMissingChannel.Validate if you have infrastructure declared elsewhere - new ChannelName("farewell.event"), - new RoutingKey("farewell.event"), - timeOut: TimeSpan.FromMilliseconds(200), - isDurable: true, - highAvailability: true, - messagePumpType: MessagePumpType.Reactor, - makeChannels: OnMissingChannel.Create) - }; + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); - var rmqConnection = new RmqMessagingGatewayConnection + var host = new HostBuilder() + .ConfigureAppConfiguration(c => c + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables()) + .ConfigureServices((host, services) => + { + services.AddConsumers(options => { - AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")), - Exchange = new Exchange("paramore.brighter.exchange") - }; - - var rmqMessageConsumerFactory = new RmqMessageConsumerFactory(rmqConnection); - - services.AddConsumers(options => - { - options.Subscriptions = host.Configuration.CreateSubscriptions(); - }) + options.Subscriptions = host.Configuration.CreateSubscriptions("messaging", + assemblies: [typeof(GreetingEvent).Assembly]) + .ToArray(); + }) .AutoFromAssemblies(); - - services.AddHostedService(); - }) - .UseConsoleLifetime() - .UseSerilog() - .Build(); + services.AddHostedService(); + }) + .UseConsoleLifetime() + .UseSerilog() + .Build(); - await host.RunAsync(); - } + await host.RunAsync(); } } diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/appsettings.json b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/appsettings.json new file mode 100644 index 0000000000..0b67826732 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsReceiverConsole/appsettings.json @@ -0,0 +1,33 @@ +{ + "Brighter": { + "RabbitMq": { + "Connection": { + "Exchange": { + "Name": "paramore.brighter.exchange" + } + }, + "Subscriptions":[ + { + "SubscriptionName": "paramore.example.greeting", + "ChannelName": "greeting.event", + "RoutingKey": "greeting.event", + "Timeout": "00:00:02", + "IsDurable": true, + "HighAvailability": true, + "MessagePumpType": "Reactor", + "RequestType": "Greetings.Ports.Commands.GreetingEvent" + }, + { + "SubscriptionName": "paramore.example.farewell", + "ChannelName": "farewell.event", + "RoutingKey": "farewell.event", + "Timeout": "00:00:02", + "IsDurable": true, + "HighAvailability": true, + "MessagePumpType": "Reactor", + "RequestType": "Greetings.Ports.Commands.FarewellEvent" + } + ] + } + } +} \ No newline at end of file diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj index df81d8f873..e70e04e5c3 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/GreetingsSender.csproj @@ -20,4 +20,9 @@ + + + Always + + \ No newline at end of file diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs index 7d2d2cb150..a95f0e108a 100644 --- a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/Program.cs @@ -35,90 +35,63 @@ THE SOFTWARE. */ using Serilog; using Serilog.Extensions.Logging; -namespace GreetingsSender +namespace GreetingsSender; + +static class Program { - static class Program + static void Main(string[] args) { - static void Main(string[] args) - { - Log.Logger = new LoggerConfiguration() - .MinimumLevel.Debug() - .Enrich.FromLogContext() - .WriteTo.Console() - .CreateLogger(); + Log.Logger = new LoggerConfiguration() + .MinimumLevel.Debug() + .Enrich.FromLogContext() + .WriteTo.Console() + .CreateLogger(); - var configuration = new ConfigurationBuilder() - .AddJsonFile("appsetting.json") - .AddEnvironmentVariables() - .Build(); + var configuration = new ConfigurationBuilder() + .AddJsonFile("appsettings.json") + .AddEnvironmentVariables() + .Build(); - var serviceCollection = new ServiceCollection(); - serviceCollection.AddSingleton(new SerilogLoggerFactory()); + var serviceCollection = new ServiceCollection(); + serviceCollection.AddSingleton(new SerilogLoggerFactory()); - var rmqConnection = new RmqMessagingGatewayConnection + serviceCollection + .AddBrighter() + .AddProducers((configure) => { - AmpqUri = new AmqpUriSpecification(new Uri("amqp://guest:guest@localhost:5672")), - Exchange = new Exchange("paramore.brighter.exchange"), - }; - - var producerRegistry = new RmqProducerRegistryFactory( - rmqConnection, - [ - new() - { - WaitForConfirmsTimeOutInMilliseconds = 1000, - MakeChannels =OnMissingChannel.Create, - Topic = new RoutingKey("greeting.event"), - RequestType = typeof(GreetingEvent) - }, - new() - { - WaitForConfirmsTimeOutInMilliseconds = 1000, - MakeChannels =OnMissingChannel.Create, - Topic = new RoutingKey("farewell.event"), - RequestType = typeof(FarewellEvent) - } - ]).Create(); - - serviceCollection - .AddBrighter() - .AddProducers((configure) => - { - configure.ProducerRegistry = configuration.CreateProducerRegistry(); - configure.MaxOutStandingMessages = 5; - configure.MaxOutStandingCheckInterval = TimeSpan.FromMilliseconds(500); - }) - .UsePublicationFinder() - .AutoFromAssemblies(); + configure.ProducerRegistry = configuration.CreateProducerRegistry("messaging"); + configure.MaxOutStandingMessages = 5; + configure.MaxOutStandingCheckInterval = TimeSpan.FromMilliseconds(500); + }) + .UsePublicationFinder() + .AutoFromAssemblies(); - var serviceProvider = serviceCollection.BuildServiceProvider(); + var serviceProvider = serviceCollection.BuildServiceProvider(); + var commandProcessor = serviceProvider.GetRequiredService(); - var commandProcessor = serviceProvider.GetService(); - - commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); - commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); - } + commandProcessor.Post(new GreetingEvent("Ian says: Hi there!")); + commandProcessor.Post(new FarewellEvent("Ian says: See you later!")); } +} - public class CustomPublicationFinder : FindPublicationByPublicationTopicOrRequestType +public class CustomPublicationFinder : FindPublicationByPublicationTopicOrRequestType +{ + private static readonly Dictionary s_typeRouteMapper = new() { - private static readonly Dictionary s_typeRouteMapper = new() - { - [typeof(GreetingEvent)] = "greeting.event", - [typeof(FarewellEvent)] = "farewell.event" - }; + [typeof(GreetingEvent)] = "greeting.event", + [typeof(FarewellEvent)] = "farewell.event" + }; - public override Publication Find(IAmAProducerRegistry registry, RequestContext context) + public override Publication Find(IAmAProducerRegistry registry, RequestContext context) + { + if (s_typeRouteMapper.TryGetValue(typeof(TRequest), out var topic)) { - if (s_typeRouteMapper.TryGetValue(typeof(TRequest), out var topic)) - { - // If you have a tenant topic you could have this - // MAP: [typeof(A), "some-topic-{tenant}" - // topic.Replace("{tenant}", tenantContext.Tenant) - return registry.LookupBy(topic).Publication; - } - - return base.Find(registry, context); + // If you have a tenant topic you could have this + // MAP: [typeof(A), "some-topic-{tenant}" + // topic.Replace("{tenant}", tenantContext.Tenant) + return registry.LookupBy(topic).Publication; } + + return base.Find(registry, context); } } diff --git a/samples/TaskQueue/RMQTaskQueue/GreetingsSender/appsettings.json b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/appsettings.json new file mode 100644 index 0000000000..86939b6048 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/GreetingsSender/appsettings.json @@ -0,0 +1,23 @@ +{ + "Brighter": { + "RabbitMQ": { + "Connection": { + "Exchange": { + "Name": "paramore.brighter.exchange" + } + }, + "Publications":[ + { + "WaitForConfirmsTimeOutInMilliseconds": 1000, + "Topic": "greeting.event", + "RequestType": "Greetings.Ports.Commands.GreetingEvent" + }, + { + "WaitForConfirmsTimeOutInMilliseconds": 1000, + "Topic": "farewell.event", + "RequestType": "Greetings.Ports.Commands.FarewellEvent" + } + ] + } + } +} \ No newline at end of file diff --git a/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/AppHost.cs b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/AppHost.cs new file mode 100644 index 0000000000..2227b62cac --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/AppHost.cs @@ -0,0 +1,13 @@ +var builder = DistributedApplication.CreateBuilder(args); + +var rmq = builder + .AddRabbitMQ("messaging") + .WithLifetime(ContainerLifetime.Persistent); + +builder.AddProject("sender") + .WithReference(rmq); + +builder.AddProject("receiver") + .WithReference(rmq); + +builder.Build().Run(); diff --git a/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Properties/launchSettings.json b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Properties/launchSettings.json new file mode 100644 index 0000000000..9e2d8de801 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Properties/launchSettings.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "profiles": { + "https": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "https://localhost:17087;http://localhost:15091", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + "ASPIRE_DASHBOARD_OTLP_ENDPOINT_URL": "https://localhost:21163", + "ASPIRE_DASHBOARD_MCP_ENDPOINT_URL": "https://localhost:23134", + "ASPIRE_RESOURCE_SERVICE_ENDPOINT_URL": "https://localhost:22000" + } + }, + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "applicationUrl": "http://localhost:15091", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "DOTNET_ENVIRONMENT": "Development", + "ASPIRE_DASHBOARD_OTLP_ENDPOINT_URL": "http://localhost:19135", + "ASPIRE_DASHBOARD_MCP_ENDPOINT_URL": "http://localhost:18084", + "ASPIRE_RESOURCE_SERVICE_ENDPOINT_URL": "http://localhost:20133" + } + } + } +} diff --git a/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Rmq.Greetings.AppHost.csproj b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Rmq.Greetings.AppHost.csproj new file mode 100644 index 0000000000..0990b29b21 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/Rmq.Greetings.AppHost.csproj @@ -0,0 +1,20 @@ + + + + Exe + net9.0 + enable + enable + 2243fb4c-9173-484b-b3d7-672392906182 + + + + + + + + + + + + diff --git a/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.Development.json b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.Development.json new file mode 100644 index 0000000000..0c208ae918 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.Development.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + } +} diff --git a/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.json b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.json new file mode 100644 index 0000000000..31c092aa45 --- /dev/null +++ b/samples/TaskQueue/RMQTaskQueue/Rmq.Greetings.AppHost/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning", + "Aspire.Hosting.Dcp": "Warning" + } + } +} diff --git a/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs index f5dcc721a1..51b98c40bd 100644 --- a/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs +++ b/src/Paramore.Brighter.Extensions.Configuration/ConfigurationExtensions.cs @@ -46,39 +46,27 @@ public static T CreateMessageGatewayConnection(this IConfiguration configurat return (T)factory.CreateMessageConsumerFactory(new MicrosoftConfiguration(configuration), name, sectionName); } - public static Subscription[] CreateSubscriptions(this IConfiguration configuration, + public static IEnumerable CreateSubscriptions(this IConfiguration configuration, string? name = null, - string? sectionName = null) + string? sectionName = null, + Assembly[]? assemblies = null) + where T : Subscription { var cfg = new MicrosoftConfiguration(configuration); - var factories = GetAll(); - var subscriptions = new List(); - - foreach (var factory in factories) - { - subscriptions.AddRange(factory.CreateSubscriptions(cfg, name, sectionName)); - } - - return subscriptions.ToArray(); + var factory = Get(typeof(T).Assembly); + return factory.CreateSubscriptions(cfg, name, sectionName); } - - public static IAmAProducerRegistry CreateProducerRegistry(this IConfiguration configuration, + + public static IAmAProducerRegistry CreateProducerRegistry(this IConfiguration configuration, string? name = null, string? sectionName = null) + where T: class, IAmAProducerRegistryFactory { var cfg = new MicrosoftConfiguration(configuration); - var factories = GetAll() - .ToList(); - var messageProducerFactories = new List(); - - foreach (var factory in factories) - { - messageProducerFactories.Add(factory.CreateMessageProducerFactory(cfg, name, sectionName)); - } - - return new CombinedProducerRegistryFactory(messageProducerFactories.ToArray()) - .Create(); + var factory = Get(typeof(T).Assembly); + return factory.CreateProducerRegistryFactory(cfg, name, sectionName).Create(); } + /// /// Discovers and instantiates an implementation of the specified interface from the given assembly. @@ -114,16 +102,19 @@ private static T Get(Assembly assembly) private static IEnumerable GetAll() { var @interface = typeof(T); - foreach (var type in AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())) + foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies()) { - if (!type.IsClass || type.IsAbstract) + foreach (var type in assembly.GetTypes()) { - continue; - } + if (!type.IsClass || type.IsAbstract) + { + continue; + } - if (@interface.IsAssignableFrom(type)) - { - yield return (T)Activator.CreateInstance(type)!; + if (@interface.IsAssignableFrom(type)) + { + yield return (T)Activator.CreateInstance(type)!; + } } } } diff --git a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs index d23af4efcb..4bf7080ebb 100644 --- a/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs +++ b/src/Paramore.Brighter.MessagingGateway.RMQ.Async/RmqMessagingGatewayFromConfigurationFactory.cs @@ -94,6 +94,18 @@ public IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration return new RmqMessageConsumerFactory(connection); } + public IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, string? name, + string? sectionName) + { + var rabbitMqConfiguration = GetRabbitMqConfiguration(configuration, name ?? string.Empty, sectionName); + var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); + + return new RmqProducerRegistryFactory(connection, + rabbitMqConfiguration.Publications + .Select(x => x.ToPublication()) + .ToArray()); + } + public IAmAMessageProducerFactory CreateMessageProducerFactory(IAmAConfiguration configuration, string? name, string? sectionName) @@ -102,7 +114,9 @@ public IAmAMessageProducerFactory CreateMessageProducerFactory(IAmAConfiguration var connection = rabbitMqConfiguration.Connection.ToMessagingGatewayConnection(); return new RmqMessageProducerFactory(connection, - rabbitMqConfiguration.Publications.Select(x => x.ToPublication())); + rabbitMqConfiguration.Publications + .Select(x => x.ToPublication()) + .ToArray()); } /// @@ -219,7 +233,7 @@ public class RabbitMqConfiguration /// Gets or sets the RabbitMQ gateway connection configuration. /// /// A containing connection details for RabbitMQ. - public GatewayConnection Connection { get; set; } = null!; + public GatewayConnection Connection { get; set; } = new(); /// /// Gets or sets the list of RabbitMQ subscriptions. diff --git a/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs b/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs index 4d42bdf590..1f3f3e36a5 100644 --- a/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs +++ b/src/Paramore.Brighter/ConfigurationFactory/IAmMessagingGatewayFromConfigurationFactory.cs @@ -56,18 +56,8 @@ public interface IAmMessagingGatewayFromConfigurationFactory /// IAmAMessageConsumerFactory CreateMessageConsumerFactory(IAmAConfiguration configuration, string? name, string? sectionName); - /// - /// Creates a producer registry factory from the provided configuration source. - /// - /// The containing the producer registry factory settings. - /// The optional name for named configuration instances, allowing multiple configurations for the same provider. - /// The optional override for the configuration section name. If null, uses the provider's default section name. - /// An instance that creates producer registries for sending messages. - /// - /// The producer registry factory creates registries that manage message producers (publishers) for sending - /// messages to the transport. Configuration includes connection settings, publication details, and producer-specific - /// options like confirmation mode, persistence settings, and routing strategies. - /// + IAmAProducerRegistryFactory CreateProducerRegistryFactory(IAmAConfiguration configuration, string? name, string? sectionName); + IAmAMessageProducerFactory CreateMessageProducerFactory(IAmAConfiguration configuration, string? name, string? sectionName); /// diff --git a/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs b/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs index 3013c13b81..93800879bf 100644 --- a/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs +++ b/src/Paramore.Brighter/ConfigurationFactory/PublicationConfiguration.cs @@ -192,7 +192,7 @@ public abstract class PublicationConfiguration { var types = assembly.GetTypes(); var type = types.FirstOrDefault(x => x.FullName == RequestType); - if (type != null && type.IsClass && !type.IsAbstract) + if (type is { IsClass: true }) { return type; }