lib/broadway_sqs/options.ex

defmodule BroadwaySQS.Options do
  @moduledoc """
  BroadwaySQS option definitions and custom validators.

  `definition/0` returns the option schema. The `type_*` functions are the
  validators that the schema references through
  `{:custom, module, function, args}` type tuples (the shape follows the
  NimbleOptions schema format). Each validator takes the value under
  validation plus the keyword list given in the schema and returns either
  `{:ok, value}` or `{:error, message}`.
  """

  # Attribute names accepted by the `:attribute_names` option.
  @supported_attribute_names [
    :sender_id,
    :sent_timestamp,
    :approximate_receive_count,
    :approximate_first_receive_timestamp,
    :sequence_number,
    :message_deduplication_id,
    :message_group_id,
    :aws_trace_header
  ]

  @doc """
  Returns the option schema as a keyword list keyed by option name.
  """
  @spec definition() :: keyword()
  def definition() do
    [
      queue_url: [
        required: true,
        type: {:custom, __MODULE__, :type_non_empty_string, [[name: :queue_url]]},
        doc: """
        The url for the SQS queue. *Note this does not have to be a
        regional endpoint*. For example, `https://sqs.amazonaws.com/0000000000/my_queue`.
        """
      ],
      sqs_client: [
        doc: """
        A module that implements the `BroadwaySQS.SQSClient`
        behaviour. This module is responsible for fetching and acknowledging the
        messages. Pay attention that all options passed to the producer will be forwarded
        to the client.
        """,
        default: BroadwaySQS.ExAwsClient
      ],
      receive_interval: [
        type: :non_neg_integer,
        doc: """
        The duration (in milliseconds) for which the producer
        waits before making a request for more messages.
        """,
        default: 5000
      ],
      on_success: [
        type: :atom,
        default: :ack,
        doc: """
        configures the acking behaviour for successful messages. See the
        "Acknowledgments" section below for all the possible values.
        """
      ],
      on_failure: [
        type: :atom,
        default: :noop,
        doc: """
        configures the acking behaviour for failed messages. See the
        "Acknowledgments" section below for all the possible values.
        """
      ],
      config: [
        type: :keyword_list,
        default: [],
        doc: """
        A set of options that overrides the default ExAws configuration
        options. The most commonly used options are: `:access_key_id`, `:secret_access_key`,
        `:scheme`, `:region` and `:port`. For a complete list of configuration options and
        their default values, please see the `ExAws` documentation.
        """
      ],
      max_number_of_messages: [
        type: {
          :custom,
          __MODULE__,
          :type_bounded_integer,
          [[name: :max_number_of_messages, min: 1, max: 10]]
        },
        default: 10,
        doc: """
        The maximum number of messages to be fetched
        per request. This value must be between `1` and `10`, which is the maximum number
        allowed by AWS.
        """
      ],
      wait_time_seconds: [
        type: {
          :custom,
          __MODULE__,
          :type_bounded_integer,
          [[name: :wait_time_seconds, min: 0, max: 20]]
        },
        doc: """
        The duration (in seconds) for which the call waits
        for a message to arrive in the queue before returning. This value must be
        between `0` and `20`, which is the maximum number allowed by AWS. For more
        information see ["WaitTimeSeconds" on the Amazon SQS documentation](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html).
        """
      ],
      visibility_timeout: [
        type: {
          :custom,
          __MODULE__,
          :type_bounded_integer,
          [[name: :visibility_timeout, min: 0, max: 43200]]
        },
        doc: """
        The time period (in seconds) that a message will
        remain _invisible_ to other consumers whilst still on the queue and not acknowledged.
        This is passed to SQS when the message (or messages) are read.
        This value must be between 0 and 43200 (12 hours).
        """
      ],
      attribute_names: [
        type: {
          :custom,
          __MODULE__,
          :type_list_limited_member_or_all_atom,
          [[name: :attribute_names, allowed_members: @supported_attribute_names]]
        },
        doc: """
        A list containing the names of attributes that should be
        attached to the response and appended to the `metadata`
        field of the message. You can also use `:all` instead of
        a list if you want to retrieve all attributes. Supported
        values are:

            * `:sender_id`
            * `:sent_timestamp`
            * `:approximate_receive_count`
            * `:approximate_first_receive_timestamp`
            * `:sequence_number`
            * `:message_deduplication_id`
            * `:message_group_id`
            * `:aws_trace_header`

        """
      ],
      message_attribute_names: [
        type: {
          :custom,
          __MODULE__,
          :type_array_non_empty_string_or_all_atom,
          # The name must match the option key so that validation errors
          # point at the option that was actually misconfigured.
          [[name: :message_attribute_names]]
        },
        doc: """
        A list containing the names of custom message attributes
        that should be attached to the response and appended to the `metadata` field of the
        message. Wildcards `[".*"]` and prefixes `["bar.*"]` will retrieve multiple fields.
        You can also use `:all` instead of the list if you want to retrieve all attributes.
        """
      ],
      test_pid: [
        type: :pid,
        doc: false
      ],
      message_server: [
        type: :pid,
        doc: false
      ]
    ]
  end

  @doc """
  Validates that `value` is an integer within the inclusive range `min..max`.

  The second argument must be the keyword list `[name: name, min: min, max: max]`
  in exactly that order. Returns `{:ok, value}` on success, otherwise
  `{:error, message}` where the message mentions `name` and the bounds.
  """
  @spec type_bounded_integer(term(), keyword()) :: {:ok, integer()} | {:error, String.t()}
  def type_bounded_integer(value, name: _, min: min, max: max)
      when is_integer(value) and value >= min and value <= max do
    {:ok, value}
  end

  def type_bounded_integer(value, name: name, min: min, max: max) do
    {:error,
     "expected :#{name} to be an integer between #{min} and #{max}, got: #{inspect(value)}"}
  end

  @doc """
  Validates that `value` is either `:all` or a list of non-empty strings.

  For a list, the error message reports only the offending members. Any other
  value (neither `:all` nor a list) is rejected with an error that reports the
  value itself, rather than raising.
  """
  @spec type_array_non_empty_string_or_all_atom(term(), keyword()) ::
          {:ok, :all | [String.t()]} | {:error, String.t()}
  def type_array_non_empty_string_or_all_atom(:all, _) do
    {:ok, :all}
  end

  def type_array_non_empty_string_or_all_atom(value, name: name) when is_list(value) do
    case Enum.reject(value, &(is_binary(&1) and &1 != "")) do
      [] ->
        {:ok, value}

      invalid_members ->
        {:error,
         "expected :#{name} to be a list with non-empty strings, got: #{inspect(invalid_members)}"}
    end
  end

  # Non-list input cannot be enumerated; report it instead of crashing.
  def type_array_non_empty_string_or_all_atom(value, name: name) do
    {:error, "expected :#{name} to be a list with non-empty strings, got: #{inspect(value)}"}
  end

  @doc """
  Validates that `value` is a non-empty string.

  Returns `{:ok, value}` for any non-empty binary, otherwise
  `{:error, message}` where the message mentions the option `name`.
  """
  @spec type_non_empty_string(term(), keyword()) :: {:ok, String.t()} | {:error, String.t()}
  def type_non_empty_string("", name: name) do
    {:error, "expected :#{name} to be a non-empty string, got: \"\""}
  end

  def type_non_empty_string(value, _) when is_binary(value) do
    {:ok, value}
  end

  def type_non_empty_string(value, name: name) do
    {:error, "expected :#{name} to be a non-empty string, got: #{inspect(value)}"}
  end

  @doc """
  Validates that `value` is either `:all` or a list drawn from `allowed_members`.

  The second argument must be the keyword list
  `[name: name, allowed_members: allowed_members]` in exactly that order.
  Membership is checked with list subtraction, so a member repeated more often
  in `value` than in `allowed_members` is rejected. Any value that is neither
  `:all` nor a list is rejected with an error instead of raising.
  """
  @spec type_list_limited_member_or_all_atom(term(), keyword()) ::
          {:ok, :all | list()} | {:error, String.t()}
  def type_list_limited_member_or_all_atom(:all, _) do
    {:ok, :all}
  end

  def type_list_limited_member_or_all_atom(value, name: name, allowed_members: allowed_members) do
    # `--/2` raises on non-list operands, so guard with `is_list/1` first.
    if is_list(value) and value -- allowed_members == [] do
      {:ok, value}
    else
      {:error,
       "expected :#{name} to be a list with possible members #{inspect(allowed_members)}, got: #{inspect(value)}"}
    end
  end
end