lib/core/subscriber.ex

defmodule Polyn.Subscriber do
  @moduledoc """
  A GenServer wrapper to use when working with vanilla NATS subscriptions outside of JetStream.
  This process will hang around and listen for messages to come in and then trigger a
  `c:handle_message/3` callback.

  Any other message the process receives (for example the `:timeout` message produced when
  `c:handle_message/3` returns a timeout) is forwarded to the callback module's `handle_info/2`,
  and `{:continue, term}` instructions are forwarded to its `handle_continue/2`.

  ```elixir
  defmodule MySubscriber do
    use Polyn.Subscriber

    def start_link(init_args) do
      Polyn.Subscriber.start_link(__MODULE__, init_args,
        connection_name: :gnat,
        event: "user.created.v1")
    end

    def init(_arg) do
      {:ok, nil}
    end

    def handle_message(event, message, state) do
      # Do something cool with the event
      {:noreply, state}
    end
  end
  ```
  """

  use GenServer

  require Logger

  alias Polyn.Event
  alias Polyn.Serializers.JSON

  @type start_options ::
          GenServer.option()
          | {:connection_name, Gnat.t()}
          | {:event, binary()}
          | {:queue_group, binary()}
          | {:store_name, term()}

  @doc """
  Called when the subscribed event is published. Return the same values as you would for a
  `c:GenServer.handle_info/2` callback
  """
  @callback handle_message(event :: Event.t(), msg :: Gnat.message(), state :: any()) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason :: term(), new_state}
            when new_state: term()

  @doc false
  defmacro __using__(opts) do
    quote location: :keep, bind_quoted: [opts: opts] do
      @behaviour Polyn.Subscriber
      use GenServer
    end
  end

  @doc """
  Starts a subscriber process linked to the calling process.

  `module` is the callback module implementing `c:handle_message/3`; `init_args` is handed to
  its `init/1` once the NATS subscription has been created.

  ## Options

  * `:connection_name` - Required, The pid or name of Gnat connection
  * `:event` - Required, The name of the event to subscribe to
  * `:queue_group` - a string that identifies which queue group you want to join
  * `:store_name` - Optional, kept with the subscriber options and passed along to the
    event deserializer

  Any other passed options will be assumed to be GenServer options
  """
  @spec start_link(module(), init_args :: any(), options :: [start_options()]) ::
          GenServer.on_start()
  def start_link(module, init_args, options) do
    {sub_opts, genserver_opts} =
      Keyword.split(options, [:connection_name, :event, :queue_group, :store_name])

    GenServer.start_link(__MODULE__, {module, init_args, sub_opts}, genserver_opts)
  end

  @impl true
  def init({module, init_args, opts}) do
    # `opts` keeps what the wrapper itself needs later; everything else
    # (e.g. `:queue_group`) is forwarded to the subscription call.
    {opts, sub_opts} = Keyword.split(opts, [:connection_name, :event, :store_name])

    case sub(Keyword.fetch!(opts, :connection_name), Keyword.fetch!(opts, :event), sub_opts) do
      {:ok, subscription} ->
        on_subscribe_success(subscription, module, init_args, opts)

      {:error, reason} ->
        {:stop, reason}
    end
  end

  # Subscribes this process to `event` on the given connection.
  defp sub(conn, event, opts) do
    Gnat.sub(conn, self(), event, opts)
  end

  # Runs the callback module's `init/1` and wraps its state in the internal state map.
  defp on_subscribe_success(subscription, module, init_args, opts) do
    case module.init(init_args) do
      {:ok, state} ->
        {:ok, initial_state(state, module, subscription, opts)}

      {:ok, state, other} ->
        {:ok, initial_state(state, module, subscription, opts), other}

      other ->
        # The callback module declined to start (`:ignore`, `{:stop, reason}`, ...), so this
        # process will never consume messages. Release the subscription that was just
        # opened instead of leaving it registered for a process that is about to exit.
        Gnat.unsub(Keyword.fetch!(opts, :connection_name), subscription)
        other
    end
  end

  # Internal GenServer state: the callback module's own state plus the wrapper's bookkeeping.
  defp initial_state(state, module, subscription, opts) do
    %{
      state: state,
      module: module,
      subscription: subscription,
      opts: opts
    }
  end

  @impl true
  def handle_info({:msg, %{body: body} = msg}, internal_state) do
    event =
      JSON.deserialize!(
        body,
        Keyword.fetch!(internal_state.opts, :connection_name),
        internal_state.opts
      )

    result = internal_state.module.handle_message(event, msg, internal_state.state)
    wrap_result(result, internal_state)
  end

  # Anything that is not a NATS delivery (timeouts, monitors, custom messages) belongs to the
  # callback module. Without this clause such messages would crash the process with a
  # `FunctionClauseError`.
  def handle_info(message, internal_state) do
    if function_exported?(internal_state.module, :handle_info, 2) do
      result = internal_state.module.handle_info(message, internal_state.state)
      wrap_result(result, internal_state)
    else
      Logger.error(
        "#{inspect(internal_state.module)} received unexpected message in handle_info/2: " <>
          inspect(message)
      )

      {:noreply, internal_state}
    end
  end

  # `{:continue, term}` may be returned from the callback module's `init/1` or
  # `handle_message/3`; hand it to the callback module's `handle_continue/2`.
  @impl true
  def handle_continue(continue_arg, internal_state) do
    result = internal_state.module.handle_continue(continue_arg, internal_state.state)
    wrap_result(result, internal_state)
  end

  # Puts the callback module's new state back inside the internal state map so the wrapper's
  # bookkeeping survives every callback, including `{:stop, reason, state}`. Any other return
  # value is passed through untouched.
  defp wrap_result({:noreply, state}, internal_state) do
    {:noreply, %{internal_state | state: state}}
  end

  defp wrap_result({:noreply, state, other}, internal_state) do
    {:noreply, %{internal_state | state: state}, other}
  end

  defp wrap_result({:stop, reason, state}, internal_state) do
    {:stop, reason, %{internal_state | state: state}}
  end

  defp wrap_result(other, _internal_state), do: other
end