lib/roles/broker.ex

defmodule Wampex.Roles.Broker do
  @moduledoc """
  Handles requests and responses for a Broker
  """
  alias Wampex.Role
  alias Wampex.Roles.Peer.Error
  alias Wampex.Roles.Publisher.Publish
  alias Wampex.Roles.Subscriber.{Subscribe, Unsubscribe}
  @behaviour Role

  @error 8
  @publish 16
  @published 17
  @subscribe 32
  @subscribed 33
  @unsubscribe 34
  @unsubscribed 35
  @event 36

  defmodule Event do
    @moduledoc false
    @enforce_keys [:subscription_id, :publication_id]
    defstruct [:subscription_id, :publication_id, arg_list: [], arg_kw: %{}, details: %{}]

    @type t :: %__MODULE__{
            subscription_id: integer(),
            publication_id: integer(),
            arg_list: list(any()),
            arg_kw: map(),
            details: map()
          }
  end

  defmodule Subscribed do
    @moduledoc false
    @enforce_keys [:request_id, :subscription_id]
    defstruct [:request_id, :subscription_id]

    @type t :: %__MODULE__{
            request_id: integer(),
            subscription_id: integer()
          }
  end

  defmodule Unsubscribed do
    @moduledoc false
    @enforce_keys [:request_id]
    defstruct [:request_id]

    @type t :: %__MODULE__{
            request_id: integer()
          }
  end

  defmodule Published do
    @moduledoc false
    @enforce_keys [:request_id, :publication_id]
    defstruct [:request_id, :publication_id]

    @type t :: %__MODULE__{
            request_id: integer(),
            publication_id: integer()
          }
  end

  @impl true
  def add(roles) do
    Map.put(roles, :broker, %{})
  end

  @spec event(Event.t()) :: Wampex.message()
  def event(%Event{
        subscription_id: si,
        publication_id: pi,
        arg_list: al,
        arg_kw: akw,
        details: opts
      }) do
    [@event, si, pi, opts, al, akw]
  end

  @spec subscribed(Subscribed.t()) :: Wampex.message()
  def subscribed(%Subscribed{request_id: ri, subscription_id: si}) do
    [@subscribed, ri, si]
  end

  @spec subscribe_error(Error.t()) :: Wampex.message()
  def subscribe_error(%Error{request_id: rid, error: er, details: dets}) do
    [@error, @subscribe, rid, dets, er]
  end

  @spec unsubscribed(Unsubscribed.t()) :: Wampex.message()
  def unsubscribed(%Unsubscribed{request_id: ri}) do
    [@unsubscribed, ri]
  end

  @spec published(Published.t()) :: Wampex.message()
  def published(%Published{request_id: ri, publication_id: pi}) do
    [@published, ri, pi]
  end

  @spec publish_error(Error.t()) :: Wampex.message()
  def publish_error(%Error{request_id: ri, error: error, details: dets}) do
    [@error, @publish, ri, dets, error]
  end

  @impl true
  def handle([@subscribe, request_id, opts, topic]) do
    {[{:next_event, :internal, :subscribe}], request_id,
     {:update, :subscribe, %Subscribe{request_id: request_id, options: opts, topic: topic}}}
  end

  @impl true
  def handle([@unsubscribe, request_id, id]) do
    {[{:next_event, :internal, :unsubscribe}], request_id,
     {:update, :unsubscribe, %Unsubscribe{request_id: request_id, subscription_id: id}}}
  end

  @impl true
  def handle([@publish, id, opts, topic]) do
    handle([@publish, id, opts, topic, [], %{}])
  end

  @impl true
  def handle([@publish, id, opts, topic, arg_l]) do
    handle([@publish, id, opts, topic, arg_l, %{}])
  end

  @impl true
  def handle([@publish, id, opts, topic, arg_l, arg_kw]) do
    {[{:next_event, :internal, :publish}], id,
     {:update, :publish,
      %Publish{request_id: id, options: opts, topic: topic, arg_list: arg_l, arg_kw: arg_kw}}}
  end
end