lib/envio/subscriber.ex

defmodule Envio.Subscriber do
  @moduledoc """
  Subscriber helper scaffold.

  To easy register the `pub_sub` consumer in `Envio.Registry`, one might
  use this helper to scaffold the registering/unregistering code.
  It turns the module into the `GenServer` and provides the handy wrapper
  for the respective `handle_info/2`. One might override
  `handle_envio` to implement custom handling.

  The typical usage would be:

  ```elixir
  defmodule PubSubscriber do
    use Envio.Subscriber, channels: [{PubPublisher, :foo}]

    def handle_envio(message, state) do
      with {:noreply, state} <- super(message, state) do
        IO.inspect({message, state}, label: "Received message")
        {:noreply, state}
      end
    end
  end
  ```

  If channels are not specified as a parameter in call to `use Envio.Subscriber`,
  this module might subscribe to any publisher later with `subscribe/1`:

  ```elixir
  PubSubscriber.subscribe(%Envio.Channel{source: PubPublisher, name: :foo})
  ```

  For how to publish, see `Envio.Publisher`.
  """

  @doc """
  The callback to subscribe stuff to `Envio`.
  """
  @callback handle_envio(message :: :timeout | term(), state :: Envio.State.t()) ::
              {:noreply, new_state}
              | {:noreply, new_state, timeout() | :hibernate | {:continue, term()}}
              | {:stop, reason :: term(), new_state}
            when new_state: Envio.State.t()

  defmacro __using__(opts \\ []) do
    {manager, opts} = Keyword.pop(opts, :manager, :registry)
    manager = Module.concat(["Envio", "Subscribers", Macro.camelize("#{manager}")])

    quote location: :keep, generated: true do
      require Logger

      use GenServer

      @behaviour Envio.Subscriber
      @after_compile {Envio.Utils, :subscriber_finalizer}

      use unquote(manager), unquote(opts)

      @doc """
      Helper generated by the `Envio.Subscriber` scaffold. This `pub_sub`
      `GenServer` might be started by invoking `#{__MODULE__}.start_link`.
      """
      @spec start_link(opts :: keyword()) :: GenServer.on_start()
      def start_link(opts \\ []),
        do: GenServer.start_link(__MODULE__, %Envio.State{options: opts}, name: __MODULE__)

      @impl GenServer
      @doc false
      def init(%Envio.State{} = state),
        do: {:ok, state, {:continue, :connect}}

      @impl GenServer
      @doc false
      def handle_continue(:connect, %Envio.State{} = state),
        do: {:noreply, do_subscribe(state)}

      @spec state :: Envio.State.t()
      @doc """
      Returns the state of this process, including all the subscriptions,
      last messages processed, the PID of the underlying `Phoenix.PubSub` etc.
      """
      def state, do: GenServer.call(__MODULE__, :state)

      @impl GenServer
      @doc false
      def handle_call(:state, _from, %Envio.State{} = state),
        do: {:reply, state, state}

      @namespace Macro.underscore(__MODULE__)
      @fq_joiner "."
      @max_messages Application.compile_env(:envio, :subscriber_queue_size, 10)

      @impl Envio.Subscriber
      @doc """
      Default implementation of the callback invoked when the message is received.
      """
      def handle_envio(message, state) do
        messages =
          case Enum.count(state.messages) do
            n when n > @max_messages ->
              with [_ | tail] <- :lists.reverse(state.messages),
                   do: :lists.reverse([message | tail])

            _ ->
              [message | state.messages]
          end

        {:noreply, %Envio.State{state | messages: messages}}
      end

      defoverridable handle_envio: 2

      ##########################################################################

      @spec subscribe(channel :: Envio.Channel.t() | [Envio.Channel.t()]) ::
              {:ok, Envio.State.t()}
      @doc """
      Subscribes to the channel(s) given in a runtime.
      """
      def subscribe(%Envio.Channel{} = channel), do: subscribe([channel])

      def subscribe([_ | _] = channels),
        do: {:ok, GenServer.call(__MODULE__, {:subscribe, channels})}

      @impl GenServer
      @doc false
      def handle_call({:subscribe, [_ | _] = channels}, _from, %Envio.State{} = state),
        do: {:reply, :ok, do_subscribe(channels, state)}

      @impl GenServer
      @doc false
      def handle_info({:envio, {_channel, message}}, state),
        do: handle_envio(message, state)
    end
  end
end