Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions lib/broadway_sqs/ex_aws_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,37 @@ defmodule BroadwaySQS.ExAwsClient do
def ack(ack_ref, successful, failed) do
ack_options = :persistent_term.get(ack_ref)

messages =
messages_to_delete =
Enum.filter(successful, &ack?(&1, ack_options, :on_success)) ++
Enum.filter(failed, &ack?(&1, ack_options, :on_failure))

messages
messages_to_nack_with_timeout =
Enum.flat_map(successful, &nack(&1, ack_options, :on_success)) ++
Enum.flat_map(failed, &nack(&1, ack_options, :on_failure))

messages_to_delete
|> Enum.chunk_every(@max_num_messages_allowed_by_aws)
|> Enum.each(&delete_messages(&1, ack_options))

messages_to_nack_with_timeout
|> Enum.chunk_every(@max_num_messages_allowed_by_aws)
|> Enum.each(fn messages -> delete_messages(messages, ack_options) end)
|> Enum.each(&change_message_visibilities(&1, ack_options))
end

defp ack?(message, ack_options, option) do
{_, _, message_ack_options} = message.acknowledger
(message_ack_options[option] || Map.fetch!(ack_options, option)) == :ack
end

defp nack(message, ack_options, option) do
{_, _, message_ack_options} = message.acknowledger

case message_ack_options[option] || Map.fetch!(ack_options, option) do
{:nack, timeout} -> [{message, timeout}]
_ -> []
end
end

@impl Acknowledger
def configure(_ack_ref, ack_data, options) do
{:ok, Map.merge(ack_data, Map.new(options))}
Expand All @@ -64,6 +81,19 @@ defmodule BroadwaySQS.ExAwsClient do
|> ExAws.request!(ack_options.config)
end

defp change_message_visibilities(messages_with_timeouts, ack_options) do
entries =
Enum.map(messages_with_timeouts, fn {message, timeout} ->
message
|> extract_message_receipt()
|> Map.put(:visibility_timeout, timeout)
end)

ack_options.queue_url
|> ExAws.SQS.change_message_visibility_batch(entries)
|> ExAws.request!(ack_options.config)
end

defp wrap_received_messages({:ok, %{body: body}}, %{ack_ref: ack_ref}) do
Enum.map(body.messages, fn message ->
metadata = Map.delete(message, :body)
Expand Down
16 changes: 14 additions & 2 deletions lib/broadway_sqs/options.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ defmodule BroadwaySQS.Options do
default: 5000
],
on_success: [
type: :atom,
type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_success}]]},
default: :ack,
doc: """
configures the acking behaviour for successful messages. See the
"Acknowledgments" section below for all the possible values.
"""
],
on_failure: [
type: :atom,
type: {:custom, __MODULE__, :type_ack_action, [[{:name, :on_failure}]]},
default: :noop,
doc: """
configures the acking behaviour for failed messages. See the
Expand Down Expand Up @@ -221,4 +221,16 @@ defmodule BroadwaySQS.Options do
"expected :#{name} to be a list with possible members #{inspect(allowed_members)}, got: #{inspect(value)}"}
end
end

def type_ack_action(:ack, _opts), do: {:ok, :ack}
def type_ack_action(:noop, _opts), do: {:ok, :noop}

def type_ack_action({:nack, timeout}, [{:name, _name}])
when is_integer(timeout) and timeout >= 0 and timeout <= 43_200 do
{:ok, {:nack, timeout}}
end

def type_ack_action(value, [{:name, name}]) do
{:error, "expected :#{name} to be :ack, :noop, or {:nack, timeout}, got: #{inspect(value)}"}
end
end
9 changes: 7 additions & 2 deletions lib/broadway_sqs/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,13 @@ defmodule BroadwaySQS.Producer do
and will not redeliver it to any other consumer.

* `:noop` - do not acknowledge the message. SQS will eventually redeliver the message
or remove it based on the "Visibility Timeout" and "Max Receive Count"
configurations. For more information, see:
or remove it based on the "Visibility Timeout" and "Max Receive Count" configurations.

* `{:nack, timeout}` - change the message visibility timeout to `timeout` seconds
(0 to 43_200). The message will become available again for processing after
the given amount of time.

For more information, see:

* ["Visibility Timeout" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
* ["Dead Letter Queue" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)
Expand Down
31 changes: 31 additions & 0 deletions test/broadway_sqs/ex_aws_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ defmodule BroadwaySQS.ExAwsClientTest do

{:ok, %{status_code: 200, body: "<DeleteMessageBatchResponse />"}}
end

def request(:post, url, "Action=ChangeMessageVisibilityBatch" <> _ = body, _, _) do
send(self(), {:http_request_called, %{url: url, body: body}})

{:ok, %{status_code: 200, body: "<ChangeMessageVisibilityBatchResponse />"}}
end
end

defmodule FakeHttpClientWithError do
Expand Down Expand Up @@ -292,6 +298,31 @@ defmodule BroadwaySQS.ExAwsClientTest do
assert_received {:http_request_called, %{url: url}}
assert url == "http://localhost:9324/"
end

test "request with :nack strategy", %{opts: base_opts} do
{:ok, opts} = ExAwsClient.init(base_opts ++ [on_failure: {:nack, 10}])

:persistent_term.put(opts.ack_ref, %{
queue_url: opts[:queue_url],
config: opts[:config],
on_success: opts[:on_success],
on_failure: opts[:on_failure]
})

ack_data = %{receipt: %{id: "1", receipt_handle: "abc"}}
message = %Message{acknowledger: {ExAwsClient, opts.ack_ref, ack_data}, data: nil}

ExAwsClient.ack(opts.ack_ref, [], [message])

assert_received {:http_request_called, %{body: body}}

assert body ==
"Action=ChangeMessageVisibilityBatch" <>
"&ChangeMessageVisibilityBatchRequestEntry.1.Id=1" <>
"&ChangeMessageVisibilityBatchRequestEntry.1.ReceiptHandle=abc" <>
"&ChangeMessageVisibilityBatchRequestEntry.1.VisibilityTimeout=10" <>
"&QueueUrl=my_queue"
end
end

defp fill_persistent_term(ack_ref, base_opts) do
Expand Down
Loading