diff --git a/lib/broadway_sqs/ex_aws_client.ex b/lib/broadway_sqs/ex_aws_client.ex
index f07f305..ea83a3b 100644
--- a/lib/broadway_sqs/ex_aws_client.ex
+++ b/lib/broadway_sqs/ex_aws_client.ex
@@ -37,13 +37,21 @@ defmodule BroadwaySQS.ExAwsClient do
def ack(ack_ref, successful, failed) do
ack_options = :persistent_term.get(ack_ref)
- messages =
+ messages_to_delete =
Enum.filter(successful, &ack?(&1, ack_options, :on_success)) ++
Enum.filter(failed, &ack?(&1, ack_options, :on_failure))
- messages
+ messages_to_nack_with_timeout =
+ Enum.flat_map(successful, &nack(&1, ack_options, :on_success)) ++
+ Enum.flat_map(failed, &nack(&1, ack_options, :on_failure))
+
+ messages_to_delete
+ |> Enum.chunk_every(@max_num_messages_allowed_by_aws)
+ |> Enum.each(&delete_messages(&1, ack_options))
+
+ messages_to_nack_with_timeout
|> Enum.chunk_every(@max_num_messages_allowed_by_aws)
- |> Enum.each(fn messages -> delete_messages(messages, ack_options) end)
+ |> Enum.each(&change_message_visibilities(&1, ack_options))
end
defp ack?(message, ack_options, option) do
@@ -51,6 +59,15 @@ defmodule BroadwaySQS.ExAwsClient do
(message_ack_options[option] || Map.fetch!(ack_options, option)) == :ack
end
+ defp nack(message, ack_options, option) do
+ {_, _, message_ack_options} = message.acknowledger
+
+ case message_ack_options[option] || Map.fetch!(ack_options, option) do
+ {:nack, timeout} -> [{message, timeout}]
+ _ -> []
+ end
+ end
+
@impl Acknowledger
def configure(_ack_ref, ack_data, options) do
{:ok, Map.merge(ack_data, Map.new(options))}
@@ -64,6 +81,19 @@ defmodule BroadwaySQS.ExAwsClient do
|> ExAws.request!(ack_options.config)
end
+ defp change_message_visibilities(messages_with_timeouts, ack_options) do
+ entries =
+ Enum.map(messages_with_timeouts, fn {message, timeout} ->
+ message
+ |> extract_message_receipt()
+ |> Map.put(:visibility_timeout, timeout)
+ end)
+
+ ack_options.queue_url
+ |> ExAws.SQS.change_message_visibility_batch(entries)
+ |> ExAws.request!(ack_options.config)
+ end
+
defp wrap_received_messages({:ok, %{body: body}}, %{ack_ref: ack_ref}) do
Enum.map(body.messages, fn message ->
metadata = Map.delete(message, :body)
diff --git a/lib/broadway_sqs/options.ex b/lib/broadway_sqs/options.ex
index 52646fd..9092562 100644
--- a/lib/broadway_sqs/options.ex
+++ b/lib/broadway_sqs/options.ex
@@ -34,7 +34,7 @@ defmodule BroadwaySQS.Options do
default: 5000
],
on_success: [
- type: :atom,
+ type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_success}]]},
default: :ack,
doc: """
configures the acking behaviour for successful messages. See the
@@ -42,7 +42,7 @@ defmodule BroadwaySQS.Options do
"""
],
on_failure: [
- type: :atom,
+ type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_failure}]]},
default: :noop,
doc: """
configures the acking behaviour for failed messages. See the
@@ -221,4 +221,16 @@ defmodule BroadwaySQS.Options do
"expected :#{name} to be a list with possible members #{inspect(allowed_members)}, got: #{inspect(value)}"}
end
end
+
+ def type_ack_action(:ack, _opts), do: {:ok, :ack}
+ def type_ack_action(:noop, _opts), do: {:ok, :noop}
+
+ def type_ack_action({:nack, timeout}, [{:name, _name}])
+ when is_integer(timeout) and timeout >= 0 and timeout <= 43_200 do
+ {:ok, {:nack, timeout}}
+ end
+
+ def type_ack_action(value, [{:name, name}]) do
+ {:error, "expected :#{name} to be :ack, :noop, or {:nack, timeout}, got: #{inspect(value)}"}
+ end
end
diff --git a/lib/broadway_sqs/producer.ex b/lib/broadway_sqs/producer.ex
index c15578e..8cb6f32 100644
--- a/lib/broadway_sqs/producer.ex
+++ b/lib/broadway_sqs/producer.ex
@@ -31,8 +31,13 @@ defmodule BroadwaySQS.Producer do
and will not redeliver it to any other consumer.
* `:noop` - do not acknowledge the message. SQS will eventually redeliver the message
- or remove it based on the "Visibility Timeout" and "Max Receive Count"
- configurations. For more information, see:
+ or remove it based on the "Visibility Timeout" and "Max Receive Count" configurations.
+
+ * `{:nack, timeout}` - change the message visibility timeout to `timeout` seconds
+ (0 to 43_200). The message will become available again for processing after
+ the given amount of time.
+
+ For more information, see:
* ["Visibility Timeout" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
* ["Dead Letter Queue" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)
diff --git a/test/broadway_sqs/ex_aws_client_test.exs b/test/broadway_sqs/ex_aws_client_test.exs
index 14cf208..1154baa 100644
--- a/test/broadway_sqs/ex_aws_client_test.exs
+++ b/test/broadway_sqs/ex_aws_client_test.exs
@@ -52,6 +52,12 @@ defmodule BroadwaySQS.ExAwsClientTest do
{:ok, %{status_code: 200, body: ""}}
end
+
+ def request(:post, url, "Action=ChangeMessageVisibilityBatch" <> _ = body, _, _) do
+ send(self(), {:http_request_called, %{url: url, body: body}})
+
+ {:ok, %{status_code: 200, body: ""}}
+ end
end
defmodule FakeHttpClientWithError do
@@ -292,6 +298,31 @@ defmodule BroadwaySQS.ExAwsClientTest do
assert_received {:http_request_called, %{url: url}}
assert url == "http://localhost:9324/"
end
+
+ test "request with :nack strategy", %{opts: base_opts} do
+ {:ok, opts} = ExAwsClient.init(base_opts ++ [on_failure: {:nack, 10}])
+
+ :persistent_term.put(opts.ack_ref, %{
+ queue_url: opts[:queue_url],
+ config: opts[:config],
+ on_success: opts[:on_success],
+ on_failure: opts[:on_failure]
+ })
+
+ ack_data = %{receipt: %{id: "1", receipt_handle: "abc"}}
+ message = %Message{acknowledger: {ExAwsClient, opts.ack_ref, ack_data}, data: nil}
+
+ ExAwsClient.ack(opts.ack_ref, [], [message])
+
+ assert_received {:http_request_called, %{body: body}}
+
+ assert body ==
+ "Action=ChangeMessageVisibilityBatch" <>
+ "&ChangeMessageVisibilityBatchRequestEntry.1.Id=1" <>
+ "&ChangeMessageVisibilityBatchRequestEntry.1.ReceiptHandle=abc" <>
+ "&ChangeMessageVisibilityBatchRequestEntry.1.VisibilityTimeout=10" <>
+ "&QueueUrl=my_queue"
+ end
end
defp fill_persistent_term(ack_ref, base_opts) do