defmodule Wampex.Roles.Broker do
@moduledoc """
Handles requests and responses for a Broker
"""
alias Wampex.Role
alias Wampex.Roles.Peer.Error
alias Wampex.Roles.Publisher.Publish
alias Wampex.Roles.Subscriber.{Subscribe, Unsubscribe}
@behaviour Role
# Numeric message type codes. Each one appears as the first element of the
# message lists this module builds (event/1, subscribed/1, ...) or
# pattern-matches in handle/1. In error messages, @error is followed by the
# code of the request type being rejected (@subscribe or @publish).
# NOTE(review): values presumably mirror the WAMP message code table — confirm against the spec.
@error 8
@publish 16
@published 17
@subscribe 32
@subscribed 33
@unsubscribe 34
@unsubscribed 35
@event 36
defmodule Event do
  @moduledoc false

  # Input struct for the broker's `event/1` message builder.
  # Both ids must be supplied; the argument list, keyword arguments and
  # details default to empty collections.
  @enforce_keys ~w(subscription_id publication_id)a

  defstruct subscription_id: nil,
            publication_id: nil,
            arg_list: [],
            arg_kw: %{},
            details: %{}

  @type t :: %__MODULE__{
          subscription_id: integer(),
          publication_id: integer(),
          arg_list: [any()],
          arg_kw: map(),
          details: map()
        }
end
defmodule Subscribed do
  @moduledoc false

  # Input struct for the broker's `subscribed/1` message builder.
  # Both fields are required when the struct is built.
  @enforce_keys ~w(request_id subscription_id)a

  defstruct request_id: nil, subscription_id: nil

  @type t :: %__MODULE__{
          request_id: integer(),
          subscription_id: integer()
        }
end
defmodule Unsubscribed do
  @moduledoc false

  # Input struct for the broker's `unsubscribed/1` message builder.
  # The request id is required when the struct is built.
  @enforce_keys ~w(request_id)a

  defstruct request_id: nil

  @type t :: %__MODULE__{request_id: integer()}
end
defmodule Published do
  @moduledoc false

  # Input struct for the broker's `published/1` message builder.
  # Both fields are required when the struct is built.
  @enforce_keys ~w(request_id publication_id)a

  defstruct request_id: nil, publication_id: nil

  @type t :: %__MODULE__{
          request_id: integer(),
          publication_id: integer()
        }
end
# Registers this role in the given roles map by storing an empty map under
# `:broker`. Any value already stored under that key is replaced.
@impl true
def add(roles), do: Map.put(roles, :broker, %{})
@doc """
Builds the event message from an `Event` struct.

The result is a list laid out as
`[event_code, subscription_id, publication_id, details, arg_list, arg_kw]`.
"""
@spec event(Event.t()) :: Wampex.message()
def event(%Event{} = evt) do
  [
    @event,
    evt.subscription_id,
    evt.publication_id,
    evt.details,
    evt.arg_list,
    evt.arg_kw
  ]
end
@doc """
Builds the subscribed message from a `Subscribed` struct.

The result is `[subscribed_code, request_id, subscription_id]`.
"""
@spec subscribed(Subscribed.t()) :: Wampex.message()
def subscribed(%Subscribed{} = sub), do: [@subscribed, sub.request_id, sub.subscription_id]
@doc """
Builds the error message sent in reply to a failed subscribe request.

The result is `[error_code, subscribe_code, request_id, details, error]`.
"""
@spec subscribe_error(Error.t()) :: Wampex.message()
def subscribe_error(%Error{} = failure) do
  [@error, @subscribe, failure.request_id, failure.details, failure.error]
end
@doc """
Builds the unsubscribed message from an `Unsubscribed` struct.

The result is `[unsubscribed_code, request_id]`.
"""
@spec unsubscribed(Unsubscribed.t()) :: Wampex.message()
def unsubscribed(%Unsubscribed{} = unsub), do: [@unsubscribed, unsub.request_id]
@doc """
Builds the published message from a `Published` struct.

The result is `[published_code, request_id, publication_id]`.
"""
@spec published(Published.t()) :: Wampex.message()
def published(%Published{} = pub), do: [@published, pub.request_id, pub.publication_id]
@doc """
Builds the error message sent in reply to a failed publish request.

The result is `[error_code, publish_code, request_id, details, error]`.
"""
@spec publish_error(Error.t()) :: Wampex.message()
def publish_error(%Error{} = failure) do
  [@error, @publish, failure.request_id, failure.details, failure.error]
end
# Subscribe request: `[subscribe_code, request_id, options, topic]`.
# Returns `{actions, request_id, update}` where `update` carries a `%Subscribe{}`
# built from the message fields.
# NOTE(review): the `{:next_event, :internal, _}` action looks like a
# gen_statem action consumed by the caller — confirm.
@impl true
def handle([@subscribe, request_id, options, topic]) do
  subscription = %Subscribe{request_id: request_id, options: options, topic: topic}
  actions = [{:next_event, :internal, :subscribe}]

  {actions, request_id, {:update, :subscribe, subscription}}
end
# Unsubscribe request: `[unsubscribe_code, request_id, subscription_id]`.
# Returns `{actions, request_id, update}` where `update` carries an
# `%Unsubscribe{}` built from the message fields.
@impl true
def handle([@unsubscribe, request_id, subscription_id]) do
  unsubscription = %Unsubscribe{request_id: request_id, subscription_id: subscription_id}
  actions = [{:next_event, :internal, :unsubscribe}]

  {actions, request_id, {:update, :unsubscribe, unsubscription}}
end
# Publish request without arguments: delegate to the full six-element form
# with an empty argument list and empty keyword arguments.
@impl true
def handle([@publish, request_id, options, topic]),
  do: handle([@publish, request_id, options, topic, [], %{}])

# Publish request with positional arguments only: delegate to the full form
# with empty keyword arguments.
@impl true
def handle([@publish, request_id, options, topic, arg_list]),
  do: handle([@publish, request_id, options, topic, arg_list, %{}])

# Full publish request:
# `[publish_code, request_id, options, topic, arg_list, arg_kw]`.
# Returns `{actions, request_id, update}` where `update` carries a `%Publish{}`
# built from the message fields.
@impl true
def handle([@publish, request_id, options, topic, arg_list, arg_kw]) do
  publication = %Publish{
    request_id: request_id,
    options: options,
    topic: topic,
    arg_list: arg_list,
    arg_kw: arg_kw
  }

  actions = [{:next_event, :internal, :publish}]

  {actions, request_id, {:update, :publish, publication}}
end
end