lib/yggdrasil/subscriber/publisher.ex

defmodule Yggdrasil.Subscriber.Publisher do
  @moduledoc """
  A server to distribute the messages.
  """
  use GenServer

  alias Yggdrasil.Backend
  alias Yggdrasil.Channel
  alias Yggdrasil.Transformer

  require Logger

  ############
  # Client API

  @doc """
  Starts a server to distribute messages in a `channel`. Additionally can
  receive `GenServer` `options`.
  """
  @spec start_link(Channel.t()) :: GenServer.on_start()
  @spec start_link(Channel.t(), GenServer.options()) :: GenServer.on_start()
  def start_link(channel, options \\ [])

  def start_link(%Channel{} = channel, options) do
    GenServer.start_link(__MODULE__, channel, options)
  end

  @doc """
  Stops a `publisher`.
  """
  @spec stop(GenServer.name()) :: :ok
  def stop(publisher)

  def stop(publisher) do
    GenServer.stop(publisher)
  end

  @doc """
  Notifies synchronously of a new `message` coming from a `channel`.
  """
  @spec notify(
          channel :: Channel.t(),
          message :: term()
        ) :: :ok | {:error, term()}
  def notify(%Channel{name: name} = channel, message, metadata \\ nil) do
    publisher = ExReg.local({__MODULE__, channel})
    notify(publisher, name, message, metadata)
  end

  @doc """
  Notifies synchronously of a new `message` coming from a `channel_name` to a
  `publisher` with some `metadata`.
  """
  @spec notify(
          publisher :: GenServer.name(),
          channel_name :: term(),
          message :: term(),
          metadata :: term()
        ) :: :ok | {:error, term()}
  def notify(publisher, channel_name, message, metadata) do
    GenServer.call(publisher, {:notify, channel_name, message, metadata})
  end

  #####################
  # GenServer callbacks

  @impl true
  def init(%Channel{} = channel) do
    Logger.debug(fn -> "Started #{__MODULE__} for #{inspect(channel)}" end)
    {:ok, channel}
  end

  @impl true
  def handle_call(
        {:notify, channel_name, message, metadata},
        _from,
        %Channel{} = channel
      ) do
    real_channel = %Channel{channel | name: channel_name}

    result =
      with {:ok, decoded} <- Transformer.decode(real_channel, message) do
        Backend.publish(real_channel, decoded, metadata)
      end

    {:reply, result, channel}
  end

  def handle_call(_msg, _from, %Channel{} = channel) do
    {:noreply, channel}
  end

  @impl true
  def terminate(:normal, %Channel{} = channel) do
    Logger.debug(fn -> "Stopped #{__MODULE__} for #{inspect(channel)}" end)
  end

  def terminate(reason, %Channel{} = channel) do
    Logger.warn(fn ->
      "Stopped #{__MODULE__} for #{inspect(channel)} due to #{inspect(reason)}"
    end)
  end
end