lib/kalevala/communication/channel.ex

defmodule Kalevala.Communication.Channel do
  @moduledoc """
  A GenServer to handle channel communication
  """

  use GenServer

  alias Kalevala.Communication.Cache
  alias Kalevala.Event
  alias Kalevala.Event.Message

  @typedoc """
  Config is saved and stored in the GenServer, passed in for each callback
  """
  @type config() :: map()

  @typedoc """
  Options are for each subscribe/publish request

  This may contain information like charcater data
  """
  @type options() :: map()

  @typedoc """
  The "topic" in the channel pub/sub

  This might be a simple global channel name or something specific to rooms,
  maybe `rooms:id`.
  """
  @type channel_name() :: String.t()

  @doc """
  Called during initialization

  You can change any of the config you wish on start of the gen server
  """
  @callback init(config()) :: config()

  @doc """
  Called before a pid is allowed to subscribe to a channel
  """
  @callback subscribe_request(channel_name(), options(), config()) :: :ok

  @doc """
  Called before a pid is allowed to unsubscribe to a channel
  """
  @callback unsubscribe_request(channel_name(), options(), config()) :: :ok

  @doc """
  Called before a message is allowed to publish on the channel
  """
  @callback publish_request(channel_name(), Event.message(), options(), config()) :: :ok

  defstruct [:callback_module, :channel_name, :config, :subscriber_ets_key]

  defmacro __using__(_opts) do
    quote do
      @behaviour Kalevala.Communication.Channel

      @impl true
      def init(config), do: config

      @impl true
      def subscribe_request(_channel_name, _options, _config), do: :ok

      @impl true
      def unsubscribe_request(_channel_name, _options, _config), do: :ok

      @impl true
      def publish_request(_channel_name, _message, _options, _config), do: :ok

      defoverridable init: 1, subscribe_request: 3, unsubscribe_request: 3, publish_request: 4
    end
  end

  @doc """
  Publish a new message
  """
  def publish(pid, message, options) do
    GenServer.call(pid, {:publish, message, options})
  end

  @doc """
  Subscribe to a new channel

  Allows for the callback_module to reject subscription
  """
  def subscribe(pid, channel_name, subscriber_pid, options) do
    GenServer.call(pid, {:subscribe, channel_name, subscriber_pid, options})
  end

  @doc """
  Unsubscribe to a new channel

  Allows for the callback_module to reject unsubscription
  """
  def unsubscribe(pid, channel_name, subscriber_pid, options) do
    GenServer.call(pid, {:unsubscribe, channel_name, subscriber_pid, options})
  end

  @doc false
  def start_link(opts, genserver_opts \\ []) do
    GenServer.start_link(__MODULE__, opts, genserver_opts)
  end

  @impl true
  def init({channel_name, callback_module, options}) do
    state = %__MODULE__{
      callback_module: callback_module,
      channel_name: channel_name,
      config: callback_module.init(options[:config]),
      subscriber_ets_key: options[:subscriber_ets_key]
    }

    {:ok, state}
  end

  @impl true
  def handle_call({:publish, event = %Event{topic: Message}, options}, _from, state) do
    case state.callback_module.publish_request(state.channel_name, event, options, state.config) do
      :ok ->
        subscribers = Cache.subscribers(state.subscriber_ets_key, state.channel_name)

        Enum.each(subscribers, fn {_, subscriber_pid, _} ->
          send(subscriber_pid, event)
        end)

        {:reply, :ok, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  def handle_call({:subscribe, channel_name, subscriber_pid, options}, _from, state) do
    case state.callback_module.subscribe_request(channel_name, options, state.config) do
      :ok ->
        Process.monitor(subscriber_pid)
        :ets.insert(state.subscriber_ets_key, {channel_name, subscriber_pid, options})
        {:reply, :ok, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  def handle_call({:unsubscribe, channel_name, subscriber_pid, options}, _from, state) do
    case state.callback_module.unsubscribe_request(channel_name, options, state.config) do
      :ok ->
        :ets.match_delete(state.subscriber_ets_key, {channel_name, subscriber_pid, :_})
        {:reply, :ok, state}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, subscriber_pid, _reason}, state) do
    :ets.match_delete(state.subscriber_ets_key, {state.channel_name, subscriber_pid, :_})
    {:noreply, state}
  end
end