lib/off_broadway/producer.ex

with {:module, _} <- Code.ensure_compiled(Broadway) do
  defmodule OffBroadway.Jetstream.Producer do
    @moduledoc """
    A GenStage producer meant to work with [Broadway](https://github.com/dashbitco/broadway).
    It continuously receives messages from a NATS JetStream and acknowledges them after being
    successfully processed.

    ## Options

    ### Connection options

    The following options are mandatory.

    * `connection_name` - The name of Gnat process or Gnat connection supervisor.

    * `stream_name` - The name of stream to consume from.

    * `consumer_name` - The name of consumer.


    Optional options:

    * `connection_retry_timeout` - time in milliseconds, after which the failing
      connection will retry. Defaults to `1000`.

    * `connection_retries` - number of failing connection retries the producer will
      make before shutting down. Defaults to `10`.

    * `inbox_prefix` - custom prefix for listening topic. Defaults to `_INBOX.`.

    * `domain` - Jetstream domain.

    ### Message pulling options

    * `receive_interval` - The duration in milliseconds for which the producer waits
      before making a request for more messages. Defaults to `5000`.

    * `receive_timeout` - The maximum time to wait for NATS Jetstream to respond with
      a requested message. Defaults to `:infinity`.

    ### Acknowledger options

    These options are passed to the acknowledger on its initialization.

    * `:on_success` - Configures the behaviour for successful messages. Defaults to `:ack`.

    * `:on_failure` - Configures the behaviour for failed messages. Defaults to `:nack`.

    ## Acknowledgements

    By default, successful messages are acknowledged, and failed ones are nack'ed. You can
    change this behaviour by setting the `:on_success` or/and `:on_failure` options.

    You can also change the acknowledgement action for individual messages using the
    `Broadway.Message.configure_ack/2` function.

    The supported options are:

    * `:ack` - Acknowledges a message was completely handled.

    * `:nack` - Signals that the message will not be processed now and will be redelivered.

    * `:term` - Tells the server to stop redelivery of a message without acknowledging it.

    ## Example

    Example Broadway module definition:

    ```
    defmodule MyBroadway do
      use Broadway

      def start_link(_opts) do
        Broadway.start_link(
          __MODULE__,
          name: MyBroadway,
          producer: [
            module: {
              OffBroadway.Jetstream.Producer,
              connection_name: :gnat,
              stream_name: "TEST_STREAM",
              consumer_name: "TEST_CONSUMER"
            },
            concurrency: 10
          ],
          processors: [
            default: [concurrency: 10]
          ],
          batchers: [
            example: [
              concurrency: 5,
              batch_size: 10,
              batch_timeout: 2_000
            ]
          ]
        )
      end

      def handle_message(_processor_name, message, _context) do
        message
        |> Message.update_data(&process_data/1)
        |> Message.put_batcher(:example)
      end

      defp process_data(data) do
        # Some data processing
      end

      def handle_batch(:example, messages, _batch_info, _context) do
        # Do something with batch messages
      end

    end
    ```

    Learn more about available options in [Broadway documentation](https://hexdocs.pm/broadway/Broadway.html).

    Once you have your Broadway pipeline defined, you can add it to your supervision tree:

    ```
    children = [
      {MyBroadway, []}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
    ```
    """

    require Logger

    use GenStage

    alias Broadway.Message
    alias Broadway.Producer
    alias Jetstream.PullConsumer.ConnectionOptions
    alias Jetstream.API.Util
    alias OffBroadway.Jetstream.Acknowledger

    @behaviour Producer

    @default_receive_interval 5_000
    @default_receive_timeout :infinity

    @impl Producer
    def prepare_for_start(_module, broadway_opts) do
      {producer_module, module_opts} = broadway_opts[:producer][:module]

      broadway_opts_with_defaults =
        put_in(broadway_opts, [:producer, :module], {producer_module, module_opts})

      {[], broadway_opts_with_defaults}
    end

    @impl true
    def init(opts) do
      receive_interval = opts[:receive_interval] || @default_receive_interval
      receive_timeout = opts[:receive_timeout] || @default_receive_timeout

      connection_options =
        opts
        |> Keyword.take([
          :connection_name,
          :stream_name,
          :consumer_name,
          :connection_retry_timeout,
          :connection_retries,
          :inbox_prefix,
          :domain
        ])
        |> ConnectionOptions.validate!()

      listening_topic = Util.reply_inbox(connection_options.inbox_prefix)

      case Acknowledger.init(opts) do
        {:ok, ack_ref} ->
          send(self(), :connect)

          {:producer,
           %{
             demand: 0,
             receive_timer: nil,
             receive_interval: receive_interval,
             receive_timeout: receive_timeout,
             connection_options: connection_options,
             connection_pid: nil,
             connection_retries_left: nil,
             subscription_id: nil,
             status: :disconnected,
             listening_topic: listening_topic,
             ack_ref: ack_ref
           }}

        {:error, message} ->
          raise ArgumentError, message
      end
    end

    @impl true
    def handle_demand(incoming_demand, %{demand: demand} = state) do
      handle_receive_messages(%{state | demand: demand + incoming_demand})
    end

    @impl true
    def handle_info(
          :connect,
          %{
            connection_options:
              %ConnectionOptions{
                connection_name: connection_name,
                connection_retries: retries,
                connection_retry_timeout: retry_timeout
              } = conn_options,
            listening_topic: listening_topic,
            connection_retries_left: retries_left
          } = state
        ) do
      Logger.debug(
        """
        #{__MODULE__} for #{conn_options.stream_name}.#{conn_options.consumer_name} \
        is connecting to Gnat.
        """,
        listening_topic: listening_topic,
        connection_name: connection_name
      )

      case Process.whereis(connection_name) do
        nil when retries_left > 0 ->
          Logger.debug(
            """
            #{__MODULE__} for #{conn_options.stream_name}.#{conn_options.consumer_name} \
            failed to connect to Gnat and will retry.
            """,
            listening_topic: listening_topic,
            connection_name: connection_name
          )

          retries_left = if retries_left, do: retries_left - 1, else: retries

          Process.send_after(self(), :connect, retry_timeout)
          {:noreply, [], %{state | connection_retries_left: retries_left}}

        nil ->
          Logger.error(
            """
            #{__MODULE__} for #{conn_options.stream_name}.#{conn_options.consumer_name} \
            failed to connect to NATS and retries limit has been exhausted. Stopping.
            """,
            listening_topic: listening_topic,
            connection_name: connection_name
          )

          {:stop, {:shutdown, :connection_failed}, state}

        connection_pid ->
          Process.monitor(connection_pid)

          {:ok, sid} = Gnat.sub(connection_name, self(), listening_topic)

          {:noreply, [],
           %{
             state
             | status: :connected,
               connection_pid: connection_pid,
               connection_retries_left: nil,
               subscription_id: sid
           }}
      end
    end

    def handle_info(:receive_messages, %{status: :disconnected} = state) do
      {:noreply, [], state}
    end

    def handle_info(:receive_messages, %{receive_timer: nil} = state) do
      {:noreply, [], state}
    end

    def handle_info(:receive_messages, state) do
      handle_receive_messages(%{state | receive_timer: nil})
    end

    def handle_info(
          {:DOWN, _ref, :process, connection_pid, _reason},
          %{
            connection_pid: connection_pid,
            connection_options:
              %ConnectionOptions{connection_retry_timeout: retry_timeout} = conn_options
          } = state
        ) do
      Logger.debug(
        """
        #{__MODULE__} for #{conn_options.stream_name}.#{conn_options.consumer_name} \
        NATS connection has died. Producer is reconnecting.
        """,
        listening_topic: state.listening_topic,
        subscription_id: state.subscription_id,
        connection_name: state.connection_options.connection_name
      )

      Process.send_after(self(), :connect, retry_timeout)
      {:noreply, [], %{state | status: :disconnected, connection_pid: nil}}
    end

    def handle_info(unexpected_message, %{connection_options: conn_options} = state) do
      Logger.debug(
        """
        #{__MODULE__} for #{conn_options.stream_name}.#{conn_options.consumer_name} \
        received unexpected message: #{inspect(unexpected_message, pretty: true)}
        """,
        listening_topic: state.listening_topic,
        subscription_id: state.subscription_id,
        connection_name: state.connection_options.connection_name
      )

      {:noreply, [], state}
    end

    @impl true
    def prepare_for_draining(%{receive_timer: receive_timer} = state) do
      receive_timer && Process.cancel_timer(receive_timer)
      {:noreply, [], %{state | receive_timer: nil}}
    end

    defp handle_receive_messages(%{receive_timer: nil, demand: demand} = state) when demand > 0 do
      messages = receive_messages_from_jetstream(state, demand)
      new_demand = demand - length(messages)

      receive_timer =
        case {messages, new_demand} do
          {[], _} -> schedule_receive_messages(state.receive_interval)
          {_, 0} -> nil
          _ -> schedule_receive_messages(0)
        end

      {:noreply, messages, %{state | demand: new_demand, receive_timer: receive_timer}}
    end

    defp handle_receive_messages(state) do
      {:noreply, [], state}
    end

    defp receive_messages_from_jetstream(state, total_demand) do
      request_messages_from_jetstream(total_demand, state)

      do_receive_messages(total_demand, state.listening_topic, state.receive_timeout)
      |> wrap_received_messages(state.ack_ref)
    end

    defp request_messages_from_jetstream(total_demand, state) do
      Jetstream.API.Consumer.request_next_message(
        state.connection_options.connection_name,
        state.connection_options.stream_name,
        state.connection_options.consumer_name,
        state.listening_topic,
        state.connection_options.domain,
        batch: total_demand,
        no_wait: true
      )
    end

    defp do_receive_messages(total_demand, listening_topic, receive_timeout) do
      Enum.reduce_while(1..total_demand, [], fn _, acc ->
        receive do
          {:msg, %{reply_to: "$JS.ACK" <> _} = msg} ->
            {:cont, [msg | acc]}

          {:msg, %{topic: ^listening_topic}} ->
            {:halt, acc}
        after
          receive_timeout ->
            {:halt, acc}
        end
      end)
    end

    defp wrap_received_messages(jetstream_messages, ack_ref) do
      Enum.map(jetstream_messages, &jetstream_msg_to_broadway_msg(&1, ack_ref))
    end

    defp jetstream_msg_to_broadway_msg(jetstream_message, ack_ref) do
      acknowledger = Acknowledger.builder(ack_ref).(jetstream_message.reply_to)

      %Message{
        data: jetstream_message.body,
        metadata: %{
          topic: jetstream_message.topic,
          headers: Map.get(jetstream_message, :headers, [])
        },
        acknowledger: acknowledger
      }
    end

    defp schedule_receive_messages(interval) do
      Process.send_after(self(), :receive_messages, interval)
    end
  end
end