lib/core/pull_consumer.ex

defmodule Polyn.PullConsumer do
  @moduledoc """
  Use `Polyn.PullConsumer` to connect and process messages from an existing [NATS Consumer](https://docs.nats.io/nats-concepts/jetstream/consumers)
  that was setup with [Polyn CLI](https://github.com/SpiffInc/polyn-cli). This module is a
  wrapper around `Jetstream.PullConsumer` that does schema validation with the received messages.
  A key difference that Polyn adds is that the `:consumer_name` will be taken care of for you
  by using the passed `type` and configured `:source_root`. You can pass `:source` to `start_link/3`
  to get a more specific `:consumer_name`. This type of Consumer is meant for simple use cases that
  don't involve concurrency or batching.

  ## Example

      defmodule MyApp.PullConsumer do
        use Polyn.PullConsumer

        def start_link(arg) do
          Polyn.PullConsumer.start_link(__MODULE__, arg,
            connection_name: :gnat,
            type: "user.created.v1")
        end

        @impl true
        def init(_arg) do
          {:ok, nil}
        end

        @impl true
        def handle_message(message, state) do
          # Do some processing with the message.
          {:ack, state}
        end
      end
  """

  use Jetstream.PullConsumer
  alias Polyn.Serializers.JSON

  defstruct [:module, :state, :store_name, :connection_name, :type, :source]

  @doc """
  Invoked when the server is started. `start_link/3` or `start/3` will block until it returns.

  `init_arg` is the argument term (second argument) passed to `start_link/3`.

  See `c:Connection.init/1` for more details.
  """
  @callback init(init_arg :: term) ::
              {:ok, state :: term()}
              | :ignore
              | {:stop, reason :: any}

  @doc """
  Invoked to synchronously process a message pulled by the consumer.
  Depending on the value it returns, the acknowledgement is or is not sent.
  Polyn will deserialize the message body into a `Polyn.Event` struct and use
  that as the first argument, followed by the original message, follwed by the state.

  ## ACK actions

  See `c:Jetstream.PullConsumer.handle_message/2` for available options

  ## Example

      def handle_message(event, _message, state) do
        IO.inspect(event)
        {:ack, state}
      end

  """
  @callback handle_message(
              event :: Polyn.Event.t(),
              message :: Jetstream.message(),
              state :: term()
            ) ::
              {ack_action, new_state}
            when ack_action: :ack | :nack | :term | :noreply, new_state: term()

  @typedoc """
  Options for starting a Polyn.PullConsumer

  * `:type` - Required. The event type to consume
  * `:connection_name` - Required. The Gnat connection identifier
  * `:source` - Optional. More specific name for the consumer to add to the `:source_root`
  * All other options will be assumed to be GenServer options
  """
  @type option ::
          {:type, binary()}
          | {:source, binary()}
          | {:connection_name, Gnat.t()}
          | GenServer.option()

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour Polyn.PullConsumer

      @spec child_spec(arg :: GenServer.options()) :: Supervisor.child_spec()
      def child_spec(arg) do
        default = %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [arg]}
        }

        Supervisor.child_spec(default, unquote(Macro.escape(opts)))
      end

      defoverridable child_spec: 1
    end
  end

  defp new(module, opts) do
    opts = Keyword.put(opts, :module, module)
    struct(__MODULE__, opts)
  end

  @doc """
  Starts a pull consumer linked to the current process with the given function.

  This is often used to start the pull consumer as part of a supervision tree.

  Once the server is started, the `c:init/1` function of the given `module` is called with
  `init_arg` as its argument to initialize the server. To ensure a synchronized start-up procedure,
  this function does not return until `c:init/1` has returned.

  See `GenServer.start_link/3` for more details.

  ## Example

      {:ok, consumer} =
        Polyn.PullConsumer.start_link(ExamplePullConsumer, %{initial_arg: "foo"},
          connection_name: :gnat,
          type: "user.updated.v1",
          stream: "TEST_STREAM",
        )
  """
  @spec start_link(module(), init_arg :: term(), options :: [option()]) ::
          GenServer.on_start()
  def start_link(module, init_arg, opts \\ []) when is_atom(module) and is_list(opts) do
    pull_consumer(opts).start_link(
      __MODULE__,
      {new(module, opts), init_arg},
      opts
    )
  end

  @doc """
  Starts a `Jetstream.PullConsumer` process without links (outside of a supervision tree).

  See `start_link/3` for more information.
  """
  @spec start(module(), init_arg :: term(), options :: [option()]) ::
          GenServer.on_start()
  def start(module, init_arg, opts \\ []) when is_atom(module) and is_list(opts) do
    pull_consumer(opts).start(__MODULE__, {new(module, opts), init_arg}, opts)
  end

  @doc """
  Closes the pull consumer and stops underlying process.

  ## Example

      {:ok, consumer} =
        PullConsumer.start_link(ExamplePullConsumer, %{initial_arg: "foo"},
          connection_name: :gnat,
          type: "user.updated.v1",
          stream: "TEST_STREAM",
        )

      :ok = PullConsumer.close(consumer)

  """
  @spec close(consumer :: Jetstream.PullConsumer.consumer()) :: :ok
  def close(consumer) do
    pull_consumer().close(consumer)
  end

  @impl Jetstream.PullConsumer
  def init({%{module: module} = internal_state, init_arg}) do
    case module.init(init_arg) do
      {:ok, state} ->
        # Keep the `module` in the internal state so we can know
        # what functions to call
        internal_state = %{internal_state | state: state}

        {:ok, internal_state, connection_options(internal_state)}

      other ->
        other
    end
  end

  @impl Jetstream.PullConsumer
  def handle_message(message, %{module: module, state: state} = internal_state) do
    case JSON.deserialize(message.body, store_name: internal_state.store_name) do
      {:ok, event} ->
        {response, state} = module.handle_message(event, message, state)

        {response, %{internal_state | state: state}}

      {:error, error} ->
        # If a validation error happens we want to tell NATS to stop sending the message
        # and that it won't be processed (ACKTERM) and will prevent us from raising the
        # same error over and over.
        Jetstream.ack_term(message)

        raise Polyn.ValidationException, error
    end
  end

  defp connection_options(%{
         type: type,
         source: source,
         connection_name: connection_name
       }) do
    consumer_name = Polyn.Naming.consumer_name(type, source)
    stream = Polyn.Naming.lookup_stream_name!(connection_name, type)
    [connection_name: connection_name, stream_name: stream, consumer_name: consumer_name]
  end

  defp pull_consumer(opts \\ []) do
    if sandbox(opts), do: Polyn.Jetstream.MockPullConsumer, else: Polyn.Jetstream.PullConsumer
  end

  # If Application config dictates "sandbox mode" we prioritize that
  # otherwise we defer to a passed in option
  defp sandbox(opts) do
    case Application.get_env(:polyn, :sandbox) do
      nil -> Keyword.get(opts, :sandbox, false)
      other -> other
    end
  end
end