From dd0ea5a9f9f338354c7991cf10d08f68c428e583 Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Thu, 29 May 2025 16:05:17 -0700 Subject: [PATCH 1/2] Add support for nack'ing messages With the ack option `{:nack, timeout}`, add support for "nack"ing SQS messages (setting visibility timeout to some point in the future) --- lib/broadway_sqs/ex_aws_client.ex | 39 ++++++++++++++++++++++-- lib/broadway_sqs/options.ex | 16 ++++++++-- lib/broadway_sqs/producer.ex | 9 ++++-- test/broadway_sqs/ex_aws_client_test.exs | 31 +++++++++++++++++++ 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/lib/broadway_sqs/ex_aws_client.ex b/lib/broadway_sqs/ex_aws_client.ex index f07f305..e6763a2 100644 --- a/lib/broadway_sqs/ex_aws_client.ex +++ b/lib/broadway_sqs/ex_aws_client.ex @@ -37,13 +37,21 @@ defmodule BroadwaySQS.ExAwsClient do def ack(ack_ref, successful, failed) do ack_options = :persistent_term.get(ack_ref) - messages = + messages_to_delete = Enum.filter(successful, &ack?(&1, ack_options, :on_success)) ++ Enum.filter(failed, &ack?(&1, ack_options, :on_failure)) - messages + messages_to_nack_with_timeout = + collect_messages_to_nack(successful, ack_options, :on_success) ++ + collect_messages_to_nack(failed, ack_options, :on_failure) + + messages_to_delete + |> Enum.chunk_every(@max_num_messages_allowed_by_aws) + |> Enum.each(&delete_messages(&1, ack_options)) + + messages_to_nack_with_timeout |> Enum.chunk_every(@max_num_messages_allowed_by_aws) - |> Enum.each(fn messages -> delete_messages(messages, ack_options) end) + |> Enum.each(&change_message_visibilities(&1, ack_options)) end defp ack?(message, ack_options, option) do @@ -51,6 +59,18 @@ defmodule BroadwaySQS.ExAwsClient do (message_ack_options[option] || Map.fetch!(ack_options, option)) == :ack end + defp collect_messages_to_nack(messages, ack_options, option) do + Enum.map(messages, fn message -> + {_, _, message_ack_options} = message.acknowledger + + case message_ack_options[option] || Map.fetch!(ack_options, option) do + {:nack, timeout} -> {message, timeout} + _ -> nil + end + end) + |> Enum.filter(& &1) + end + @impl Acknowledger def configure(_ack_ref, ack_data, options) do {:ok, Map.merge(ack_data, Map.new(options))} @@ -64,6 +84,19 @@ defmodule BroadwaySQS.ExAwsClient do |> ExAws.request!(ack_options.config) end + defp change_message_visibilities(messages_with_timeouts, ack_options) do + entries = + Enum.map(messages_with_timeouts, fn {message, timeout} -> + message + |> extract_message_receipt() + |> Map.put(:visibility_timeout, timeout) + end) + + ack_options.queue_url + |> ExAws.SQS.change_message_visibility_batch(entries) + |> ExAws.request!(ack_options.config) + end + defp wrap_received_messages({:ok, %{body: body}}, %{ack_ref: ack_ref}) do Enum.map(body.messages, fn message -> metadata = Map.delete(message, :body) diff --git a/lib/broadway_sqs/options.ex b/lib/broadway_sqs/options.ex index 52646fd..9092562 100644 --- a/lib/broadway_sqs/options.ex +++ b/lib/broadway_sqs/options.ex @@ -34,7 +34,7 @@ defmodule BroadwaySQS.Options do default: 5000 ], on_success: [ - type: :atom, + type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_success}]]}, default: :ack, doc: """ configures the acking behaviour for successful messages. See the @@ -42,7 +42,7 @@ defmodule BroadwaySQS.Options do """ ], on_failure: [ - type: :atom, + type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_failure}]]}, default: :noop, doc: """ configures the acking behaviour for failed messages. See the @@ -221,4 +221,16 @@ defmodule BroadwaySQS.Options do "expected :#{name} to be a list with possible members #{inspect(allowed_members)}, got: #{inspect(value)}"} end end + + def type_ack_action(:ack, _opts), do: {:ok, :ack} + def type_ack_action(:noop, _opts), do: {:ok, :noop} + + def type_ack_action({:nack, timeout}, [{:name, _name}]) + when is_integer(timeout) and timeout >= 0 and timeout <= 43_200 do + {:ok, {:nack, timeout}} + end + + def type_ack_action(value, [{:name, name}]) do + {:error, "expected :#{name} to be :ack, :noop, or {:nack, timeout}, got: #{inspect(value)}"} + end end diff --git a/lib/broadway_sqs/producer.ex b/lib/broadway_sqs/producer.ex index c15578e..8cb6f32 100644 --- a/lib/broadway_sqs/producer.ex +++ b/lib/broadway_sqs/producer.ex @@ -31,8 +31,13 @@ defmodule BroadwaySQS.Producer do and will not redeliver it to any other consumer. * `:noop` - do not acknowledge the message. SQS will eventually redeliver the message - or remove it based on the "Visibility Timeout" and "Max Receive Count" - configurations. For more information, see: + or remove it based on the "Visibility Timeout" and "Max Receive Count" configurations. + + * `{:nack, timeout}` - change the message visibility timeout to `timeout` seconds + (0 to 43_200). The message will become available again for processing after + the given amount of time. + + For more information, see: * ["Visibility Timeout" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html) * ["Dead Letter Queue" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html) diff --git a/test/broadway_sqs/ex_aws_client_test.exs b/test/broadway_sqs/ex_aws_client_test.exs index 14cf208..1154baa 100644 --- a/test/broadway_sqs/ex_aws_client_test.exs +++ b/test/broadway_sqs/ex_aws_client_test.exs @@ -52,6 +52,12 @@ defmodule BroadwaySQS.ExAwsClientTest do {:ok, %{status_code: 200, body: ""}} end + + def request(:post, url, "Action=ChangeMessageVisibilityBatch" <> _ = body, _, _) do + send(self(), {:http_request_called, %{url: url, body: body}}) + + {:ok, %{status_code: 200, body: ""}} + end end defmodule FakeHttpClientWithError do @@ -292,6 +298,31 @@ defmodule BroadwaySQS.ExAwsClientTest do assert_received {:http_request_called, %{url: url}} assert url == "http://localhost:9324/" end + + test "request with :nack strategy", %{opts: base_opts} do + {:ok, opts} = ExAwsClient.init(base_opts ++ [on_failure: {:nack, 10}]) + + :persistent_term.put(opts.ack_ref, %{ + queue_url: opts[:queue_url], + config: opts[:config], + on_success: opts[:on_success], + on_failure: opts[:on_failure] + }) + + ack_data = %{receipt: %{id: "1", receipt_handle: "abc"}} + message = %Message{acknowledger: {ExAwsClient, opts.ack_ref, ack_data}, data: nil} + + ExAwsClient.ack(opts.ack_ref, [], [message]) + + assert_received {:http_request_called, %{body: body}} + + assert body == + "Action=ChangeMessageVisibilityBatch" <> + "&ChangeMessageVisibilityBatchRequestEntry.1.Id=1" <> + "&ChangeMessageVisibilityBatchRequestEntry.1.ReceiptHandle=abc" <> + "&ChangeMessageVisibilityBatchRequestEntry.1.VisibilityTimeout=10" <> + "&QueueUrl=my_queue" + end end defp fill_persistent_term(ack_ref, base_opts) do From 6b3e46603e516f5e67d28be19bd3862994360825 Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Thu, 29 May 2025 16:05:17 -0700 Subject: [PATCH 2/2] [nack] Refactor to `nack` function to match behavior of `ack?` --- lib/broadway_sqs/ex_aws_client.ex | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/lib/broadway_sqs/ex_aws_client.ex b/lib/broadway_sqs/ex_aws_client.ex index e6763a2..ea83a3b 100644 --- a/lib/broadway_sqs/ex_aws_client.ex +++ b/lib/broadway_sqs/ex_aws_client.ex @@ -42,8 +42,8 @@ defmodule BroadwaySQS.ExAwsClient do Enum.filter(failed, &ack?(&1, ack_options, :on_failure)) messages_to_nack_with_timeout = - collect_messages_to_nack(successful, ack_options, :on_success) ++ - collect_messages_to_nack(failed, ack_options, :on_failure) + Enum.flat_map(successful, &nack(&1, ack_options, :on_success)) ++ + Enum.flat_map(failed, &nack(&1, ack_options, :on_failure)) messages_to_delete |> Enum.chunk_every(@max_num_messages_allowed_by_aws) @@ -59,16 +59,13 @@ defmodule BroadwaySQS.ExAwsClient do (message_ack_options[option] || Map.fetch!(ack_options, option)) == :ack end - defp collect_messages_to_nack(messages, ack_options, option) do - Enum.map(messages, fn message -> - {_, _, message_ack_options} = message.acknowledger + defp nack(message, ack_options, option) do + {_, _, message_ack_options} = message.acknowledger - case message_ack_options[option] || Map.fetch!(ack_options, option) do - {:nack, timeout} -> {message, timeout} - _ -> nil - end - end) - |> Enum.filter(& &1) + case message_ack_options[option] || Map.fetch!(ack_options, option) do + {:nack, timeout} -> [{message, timeout}] + _ -> [] + end end @impl Acknowledger