Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Master

### Azure Service Bus: dead-letter reason and description (#4196)

When a handler rejects a message consumed from Azure Service Bus, `AzureServiceBusConsumer` now records the rejection reason and description in the broker's native `DeadLetterReason` and `DeadLetterErrorDescription` fields rather than dead-lettering with blank values — so the reason is visible to operators triaging the dead-letter queue instead of living only in logs. Values are truncated to the 4096-character limit Azure Service Bus enforces. A `DeadLetterAsync(lockToken, reason, description)` overload is added to the public `IServiceBusReceiverWrapper`.

### Box Schema Versioning and Migrations (spec 0027)

Brighter's box-provisioning system now ships a versioned migration chain for the Outbox and Inbox tables. New deployments install at `V_latest` directly; deployments installed under spec 0023 (which only had a `V=1` history row) are recognised by the runner and advance to `V_latest` without re-running DDL — the existing `V=1` row is preserved verbatim and the V2..V_latest rows are appended. Deployments with pre-spec-0023 (legacy) tables are bootstrapped via column introspection, gated by a `HeaderBag`/`CommandBody` discriminator, then upgraded to `V_latest` under the existing per-backend migration lock.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public async Task NackAsync(Message message, CancellationToken cancellationToken
if(ServiceBusReceiver == null)
await GetMessageReceiverProviderAsync();

await ServiceBusReceiver!.DeadLetterAsync(lockToken);
await ServiceBusReceiver!.DeadLetterAsync(lockToken, reasonString, description);
if (SubscriptionConfiguration.RequireSession)
if (ServiceBusReceiver is not null) await ServiceBusReceiver.CloseAsync();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,16 @@ public interface IServiceBusReceiverWrapper
/// <returns></returns>
Task DeadLetterAsync(string lockToken);

/// <summary>
/// Send a message to the Dead Letter Queue, recording the reason and description in the
/// broker's native dead-letter fields so they are visible to operators triaging the DLQ.
/// </summary>
/// <param name="lockToken">The Lock Token the message was provided with.</param>
/// <param name="reason">The reason the message was dead-lettered (Azure Service Bus caps this at 4096 characters).</param>
/// <param name="description">A fuller description of why the message was dead-lettered (Azure Service Bus caps this at 4096 characters).</param>
/// <returns></returns>
Task DeadLetterAsync(string lockToken, string reason, string? description);

/// <summary>
/// Abandons a message, releasing the lock so the message is available for redelivery.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,22 @@ public Task DeadLetterAsync(string lockToken)
return _messageReceiver.DeadLetterMessageAsync(CreateMessageShiv(lockToken));
}

/// <summary>
/// Deadletters the message, recording the reason and description in the broker's native
/// dead-letter fields.
/// </summary>
/// <param name="lockToken">The lock token of the message to deadletter.</param>
/// <param name="reason">The reason the message was dead-lettered. Truncated to 4096 characters as required by Azure Service Bus.</param>
/// <param name="description">A fuller description of the dead-lettering. Truncated to 4096 characters as required by Azure Service Bus.</param>
/// <returns>A task that represents the asynchronous deadletter operation.</returns>
public Task DeadLetterAsync(string lockToken, string reason, string? description)
{
return _messageReceiver.DeadLetterMessageAsync(
CreateMessageShiv(lockToken),
Truncate(reason),
description is null ? null : Truncate(description));
}

/// <summary>
/// Abandons the message, releasing the lock so it is available for redelivery.
/// </summary>
Expand All @@ -127,6 +143,13 @@ private ServiceBusReceivedMessage CreateMessageShiv(string lockToken)
return ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.Parse(lockToken));
}

// Azure Service Bus rejects dead-letter reason/description values longer than 4096 characters
// with an ArgumentOutOfRangeException, so clamp them to the limit.
private const int MaxDeadLetterFieldLength = 4096;

private static string Truncate(string value)
=> value.Length > MaxDeadLetterFieldLength ? value.Substring(0, MaxDeadLetterFieldLength) : value;

private static partial class Log
{
[LoggerMessage(LogLevel.Warning, "Closing the MessageReceiver connection")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ public class FakeServiceBusReceiverWrapper : IServiceBusReceiverWrapper
public Exception DeadLetterException = null;
public Exception CompleteException = null;
public Exception ReceiveException = null;

public bool DeadLettered { get; private set; }
public string? DeadLetterReason { get; private set; }
public string? DeadLetterDescription { get; private set; }

public Task<IEnumerable<IBrokeredMessageWrapper>> ReceiveAsync(int batchSize, TimeSpan serverWaitTime)
{
Expand Down Expand Up @@ -51,6 +55,18 @@ public Task DeadLetterAsync(string lockToken)
if (DeadLetterException != null)
throw DeadLetterException;

DeadLettered = true;
return Task.CompletedTask;
}

public Task DeadLetterAsync(string lockToken, string reason, string? description)
{
if (DeadLetterException != null)
throw DeadLetterException;

DeadLettered = true;
DeadLetterReason = reason;
DeadLetterDescription = description;
return Task.CompletedTask;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,40 @@ public void When_receiving_messages_and_the_receiver_is_closing_a_MT_QUIT_messag

}

[Fact]
public async Task When_rejecting_a_message_the_reason_and_description_are_forwarded_to_the_dead_letter_queue()
{
_nameSpaceManagerWrapper.ResetState();
_nameSpaceManagerWrapper.Topics.Add("topic", ["subscription"]);

var messageHeader = new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("topic"), MessageType.MT_EVENT);
var message = new Message(messageHeader, new MessageBody("body"));
message.Header.Bag.Add("LockToken", Guid.NewGuid());

var reason = new MessageRejectionReason(RejectionReason.Unacceptable, "currency-missing");

await _azureServiceBusConsumer.RejectAsync(message, reason);

Assert.Equal("Unacceptable", _messageReceiver.DeadLetterReason);
Assert.Equal("currency-missing", _messageReceiver.DeadLetterDescription);
}

[Fact]
public async Task When_rejecting_a_message_with_no_reason_a_default_reason_and_description_are_forwarded_to_the_dead_letter_queue()
{
_nameSpaceManagerWrapper.ResetState();
_nameSpaceManagerWrapper.Topics.Add("topic", ["subscription"]);

var messageHeader = new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("topic"), MessageType.MT_EVENT);
var message = new Message(messageHeader, new MessageBody("body"));
message.Header.Bag.Add("LockToken", Guid.NewGuid());

await _azureServiceBusConsumer.RejectAsync(message);

Assert.Equal("DeliveryError", _messageReceiver.DeadLetterReason);
Assert.Equal("unknown", _messageReceiver.DeadLetterDescription);
}

[Fact]
public void When_a_subscription_does_not_exist_and_Missing_is_set_to_Validate_a_Channel_Failure_is_Raised()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,40 @@ public void When_receiving_messages_and_the_receiver_is_closing_a_MT_QUIT_messag

}

[Fact]
public void When_rejecting_a_message_the_reason_and_description_are_forwarded_to_the_dead_letter_queue()
{
_nameSpaceManagerWrapper.ResetState();
_nameSpaceManagerWrapper.Topics.Add("topic", ["subscription"]);

var messageHeader = new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("topic"), MessageType.MT_EVENT);
var message = new Message(messageHeader, new MessageBody("body"));
message.Header.Bag.Add("LockToken", Guid.NewGuid());

var reason = new MessageRejectionReason(RejectionReason.Unacceptable, "currency-missing");

_azureServiceBusConsumer.Reject(message, reason);

Assert.Equal("Unacceptable", _messageReceiver.DeadLetterReason);
Assert.Equal("currency-missing", _messageReceiver.DeadLetterDescription);
}

[Fact]
public void When_rejecting_a_message_with_no_reason_a_default_reason_and_description_are_forwarded_to_the_dead_letter_queue()
{
_nameSpaceManagerWrapper.ResetState();
_nameSpaceManagerWrapper.Topics.Add("topic", ["subscription"]);

var messageHeader = new MessageHeader(Guid.NewGuid().ToString(), new RoutingKey("topic"), MessageType.MT_EVENT);
var message = new Message(messageHeader, new MessageBody("body"));
message.Header.Bag.Add("LockToken", Guid.NewGuid());

_azureServiceBusConsumer.Reject(message);

Assert.Equal("DeliveryError", _messageReceiver.DeadLetterReason);
Assert.Equal("unknown", _messageReceiver.DeadLetterDescription);
}

[Fact]
public void When_a_subscription_does_not_exist_and_Missing_is_set_to_Validate_a_Channel_Failure_is_Raised()
{
Expand Down
Loading