lib/broadway_sqs/producer.ex

defmodule BroadwaySQS.Producer do
  @moduledoc """
  A GenStage producer that continuously polls messages from an SQS queue and
  acknowledges them after they have been successfully processed.

  By default this producer uses `BroadwaySQS.ExAwsClient` to talk to SQS, but
  you can provide your own client by implementing the `BroadwaySQS.SQSClient`
  behaviour.

  For a quick getting started on using Broadway with Amazon SQS, please see
  the [Amazon SQS Guide](https://hexdocs.pm/broadway/amazon-sqs.html).

  ## Options

  Aside from `:receive_interval` and `:sqs_client`, which are generic and apply to all
  producers (regardless of the client implementation), all other options are specific to
  `BroadwaySQS.ExAwsClient`, which is the default client.

  #{NimbleOptions.docs(BroadwaySQS.Options.definition())}

  ## Acknowledgments

  You can use the `:on_success` and `:on_failure` options to control how messages are
  acked on SQS. You can set these options when starting the SQS producer or change them
  for each message through `Broadway.Message.configure_ack/2`. By default, successful
  messages are acked (`:ack`) and failed messages are not (`:noop`).

  The possible values for `:on_success` and `:on_failure` are:

    * `:ack` - acknowledge the message. SQS will delete the message from the queue
      and will not redeliver it to any other consumer.

    * `:noop` - do not acknowledge the message. SQS will eventually redeliver the message
      or remove it based on the "Visibility Timeout" and "Max Receive Count"
      configurations. For more information, see:

      * ["Visibility Timeout" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html)
      * ["Dead Letter Queue" page on Amazon SQS](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html)

  ### Batching

  Even if you are not interested in working with Broadway batches via the
  `handle_batch/3` callback, we recommend that all Broadway pipelines with SQS
  producers define a default batcher with `batch_size` set to 10, so
  messages can be acknowledged in batches, which improves the performance
  and reduces the costs of integrating with SQS.

  ## Example

      Broadway.start_link(MyBroadway,
        name: MyBroadway,
        producer: [
          module: {BroadwaySQS.Producer,
            queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
            config: [
              access_key_id: "YOUR_AWS_ACCESS_KEY_ID",
              secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY",
              region: "us-east-2"
            ]
          }
        ],
        processors: [
          default: []
        ],
        batchers: [
          default: [
            batch_size: 10,
            batch_timeout: 2000
          ]
        ]
      )

  The above configuration will set up a producer that continuously receives
  messages from `"my_queue"` and sends them downstream.

  ## Retrieving Metadata

  By default the following information is added to the `metadata` field in the
  `%Message{}` struct:

    * `message_id` - The message id received when the message was sent to the queue
    * `receipt_handle` - The receipt handle
    * `md5_of_body` - An MD5 digest of the message body

  You can access any of that information directly while processing the message:

      def handle_message(_, message, _) do
        receipt = %{
          id: message.metadata.message_id,
          receipt_handle: message.metadata.receipt_handle
        }

        # Do something with the receipt
      end

  If you want to retrieve `attributes` or `message_attributes`, you need to
  configure the `:attribute_names` and `:message_attribute_names` options
  accordingly; otherwise, they will not be attached to the response and
  will not be available in the `metadata` field.

      producer: [
        module: {BroadwaySQS.Producer,
          queue_url: "https://sqs.amazonaws.com/0000000000/my_queue",
          # Define which attributes/message_attributes you want to be attached
          attribute_names: [:approximate_receive_count],
          message_attribute_names: ["SomeAttribute"]
        }
      ]

  and then in `handle_message`:

      def handle_message(_, message, _) do
        approximate_receive_count = message.metadata.attributes["approximate_receive_count"]
        some_attribute = message.metadata.message_attributes["SomeAttribute"]

        # Do something with the attributes
      end

  For more information on the `:attribute_names` and `:message_attribute_names`
  options, see ["AttributeName.N" and "MessageAttributeName.N" on the ReceiveMessage documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html).

  ## Telemetry

  This library exposes the following Telemetry events (emitted through
  `:telemetry.span/3`):

    * `[:broadway_sqs, :receive_messages, :start]` - Dispatched before receiving
      messages from SQS (`c:BroadwaySQS.SQSClient.receive_messages/2`)

      * measurement: `%{system_time: System.system_time}` (telemetry 1.x also
        includes `monotonic_time`)
      * metadata: `%{name: atom, demand: integer}`

    * `[:broadway_sqs, :receive_messages, :stop]` - Dispatched after messages have
      been received from SQS and "wrapped".

      * measurement: `%{duration: native_time}`
      * metadata:

        ```
        %{
          name: atom,
          messages: [Broadway.Message.t],
          demand: integer
        }
        ```

    * `[:broadway_sqs, :receive_messages, :exception]` - Dispatched after a failure
      while receiving messages from SQS.

      * measurement: `%{duration: native_time}`
      * metadata:

        ```
        %{
          name: atom,
          demand: integer,
          kind: kind,
          reason: reason,
          stacktrace: stacktrace
        }
        ```

  """

  use GenStage

  require Logger
  alias Broadway.Producer
  alias NimbleOptions.ValidationError

  @behaviour Producer

  # Builds the producer state. `opts` are the options already validated and
  # defaulted by prepare_for_start/2. The configured `:sqs_client` module is
  # initialized here; anything other than `{:ok, client_opts}` crashes the
  # producer at startup on purpose.
  @impl true
  def init(opts) do
    receive_interval = opts[:receive_interval]
    sqs_client = opts[:sqs_client]
    {:ok, client_opts} = sqs_client.init(opts)

    {:producer,
     %{
       # Outstanding demand not yet satisfied by messages emitted downstream.
       demand: 0,
       # Reference of the pending `:receive_messages` timer, or nil if none.
       receive_timer: nil,
       receive_interval: receive_interval,
       sqs_client: {sqs_client, client_opts}
     }}
  end

  # Validates the producer options before Broadway starts the pipeline.
  #
  # Rejects the removed `:queue_name` option by logging and exiting with
  # `:invalid_config`. Other invalid options raise `ArgumentError`. On
  # success, the ack configuration is stored in `:persistent_term` under the
  # pipeline name (used as the ack reference). Broadway's options are
  # returned with the producer's options replaced by the validated,
  # defaulted ones.
  @impl true
  def prepare_for_start(_module, broadway_opts) do
    {producer_module, client_opts} = broadway_opts[:producer][:module]

    if Keyword.has_key?(client_opts, :queue_name) do
      Logger.error(
        "The option :queue_name has been removed in order to keep compatibility with " <>
          "ex_aws_sqs >= v3.0.0. Please set the queue URL using the new :queue_url option."
      )

      exit(:invalid_config)
    end

    case NimbleOptions.validate(client_opts, BroadwaySQS.Options.definition()) do
      {:error, error} ->
        raise ArgumentError, format_error(error)

      {:ok, opts} ->
        ack_ref = broadway_opts[:name]

        :persistent_term.put(ack_ref, %{
          queue_url: opts[:queue_url],
          config: opts[:config],
          on_success: opts[:on_success],
          on_failure: opts[:on_failure]
        })

        broadway_opts_with_defaults =
          put_in(broadway_opts, [:producer, :module], {producer_module, opts})

        {[], broadway_opts_with_defaults}
    end
  end

  # Turns a NimbleOptions validation error into the ArgumentError message
  # raised by prepare_for_start/2. Top-level errors have an empty keys path;
  # nested errors also mention the offending key path.
  defp format_error(%ValidationError{keys_path: [], message: message}) do
    error_prefix() <> ", " <> message
  end

  defp format_error(%ValidationError{keys_path: keys_path, message: message}) do
    error_prefix() <> " for key #{inspect(keys_path)}, " <> message
  end

  # Names the real function (BroadwaySQS.Producer.prepare_for_start/2)
  # instead of a hard-coded, misspelled module name.
  defp error_prefix do
    "invalid configuration given to #{inspect(__MODULE__)}.prepare_for_start/2"
  end

  # Accumulates new demand and polls SQS right away if no timer is pending.
  @impl true
  def handle_demand(incoming_demand, %{demand: demand} = state) do
    handle_receive_messages(%{state | demand: demand + incoming_demand})
  end

  # A `:receive_messages` message that arrives while no timer is pending is
  # ignored. This can happen when prepare_for_draining/1 clears the timer
  # after it has already fired: cancelling a timer does not remove a message
  # that was already delivered.
  @impl true
  def handle_info(:receive_messages, %{receive_timer: nil} = state) do
    {:noreply, [], state}
  end

  # The scheduled timer fired: clear it so handle_receive_messages/1 is
  # allowed to poll again.
  def handle_info(:receive_messages, state) do
    handle_receive_messages(%{state | receive_timer: nil})
  end

  # Any other message is ignored.
  def handle_info(_, state) do
    {:noreply, [], state}
  end

  # Stops polling while the pipeline drains: cancels any pending timer and
  # clears it, which also makes a late `:receive_messages` message a no-op.
  @impl Producer
  def prepare_for_draining(%{receive_timer: receive_timer} = state) do
    receive_timer && Process.cancel_timer(receive_timer)
    {:noreply, [], %{state | receive_timer: nil}}
  end

  # Polls SQS only when there is outstanding demand and no timer is pending.
  # While a timer is pending, new demand just accumulates until it fires.
  defp handle_receive_messages(%{receive_timer: nil, demand: demand} = state) when demand > 0 do
    messages = receive_messages_from_sqs(state, demand)
    new_demand = demand - length(messages)

    receive_timer =
      case {messages, new_demand} do
        # Queue looked empty: back off for the configured interval.
        {[], _} -> schedule_receive_messages(state.receive_interval)
        # Demand fully satisfied: wait for more demand instead of polling.
        {_, 0} -> nil
        # Partially satisfied: poll again right away. A 0 ms timer lets the
        # process handle other messages between polls.
        _ -> schedule_receive_messages(0)
      end

    {:noreply, messages, %{state | demand: new_demand, receive_timer: receive_timer}}
  end

  defp handle_receive_messages(state) do
    {:noreply, [], state}
  end

  # Fetches up to `total_demand` messages through the configured client,
  # wrapped in a `[:broadway_sqs, :receive_messages]` telemetry span.
  # `opts[:ack_ref]` is presumably populated by the client's init/1; it is
  # reported as `:name` in the span metadata.
  defp receive_messages_from_sqs(state, total_demand) do
    %{sqs_client: {client, opts}} = state
    metadata = %{name: opts[:ack_ref], demand: total_demand}

    :telemetry.span(
      [:broadway_sqs, :receive_messages],
      metadata,
      fn ->
        messages = client.receive_messages(total_demand, opts)
        {messages, Map.put(metadata, :messages, messages)}
      end
    )
  end

  # Sends `:receive_messages` to this process after `interval` milliseconds
  # and returns the timer reference.
  defp schedule_receive_messages(interval) do
    Process.send_after(self(), :receive_messages, interval)
  end
end