defmodule Wampex.Client do
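@moduledoc """
Client-side supervisor and public API for a Wampex WAMP session.

Starting the client starts three registries (subscribers, callees and
dependents) plus the transport configured in the session data, all under a
`:one_for_one` supervisor. The remaining functions send WAMP requests
(subscribe, register, call, publish, ...) through the session process
registered for the given client name, and return a
`wamp.error.no_session` error struct when no such process is registered.
"""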
use Supervisor
alias Wampex.Client.Session, as: Sess
alias Wampex.Roles.{Callee, Subscriber}
alias Wampex.Roles.Callee.{Register, Unregister, Yield}
alias Wampex.Roles.Caller
alias Wampex.Roles.Caller.Call
alias Wampex.Roles.Dealer.Result
alias Wampex.Roles.Peer
alias Wampex.Roles.Peer.Error
alias Wampex.Roles.Publisher
alias Wampex.Roles.Publisher.Publish
alias Wampex.Roles.Subscriber.{Subscribe, Unsubscribe}
@doc """
Starts the client supervisor and registers it locally under `name`.

The options are pattern matched in the function head, so the keyword list
must contain exactly `:name`, `:session` and `:reconnect`, in that order:

  * `:name` - atom used as the supervisor name and as the base for the
    derived registry names
  * `:session` - the session data handed to `init/1`
  * `:reconnect` - flag passed through to the transport child

Returns the result of `Supervisor.start_link/3`.
"""
@spec start_link(name: atom(), session: Sess.t(), reconnect: boolean()) ::
        {:ok, pid()}
        | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
def start_link(name: name, session: session_data, reconnect: reconnect) when is_atom(name) do
  Supervisor.start_link(__MODULE__, {name, session_data, reconnect}, name: name)
end
# Supervisor callback. Builds the child list for one client:
#   * a duplicate-key registry for subscribers,
#   * a unique-key registry for callees,
#   * a unique-key registry for dependents,
#   * the transport module taken from the session data.
# The session data handed to the transport is updated with the client name
# and has `Peer` prepended to its roles.
@impl true
@spec init({atom(), Sess.t(), boolean()}) ::
        {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
def init(
      {name,
       %Sess{url: url, transport: transport, serializer: serializer, protocol: protocol} =
         session_data, reconnect}
    ) do
  # All three registries use the same partition count; read it once.
  partitions = System.schedulers_online()

  session_data = %Sess{session_data | name: name, roles: [Peer | session_data.roles]}

  children = [
    {Registry, [keys: :duplicate, name: subscriber_registry_name(name), partitions: partitions]},
    {Registry, [keys: :unique, name: callee_registry_name(name), partitions: partitions]},
    {Registry, [keys: :unique, name: dependent_registry_name(name), partitions: partitions]},
    {transport,
     name: name,
     url: url,
     session_data: session_data,
     protocol: protocol,
     serializer: serializer,
     reconnect: reconnect}
  ]

  Supervisor.init(children, strategy: :one_for_one)
end
@doc """
Returns `name` with `Session` appended as the last module segment.

This is the name this module looks up with `Process.whereis/1` to find the
session process of a client.
"""
@spec session_name(module()) :: module()
def session_name(name) do
  Module.concat(name, Session)
end
@doc """
Returns `name` with `SubscriberRegistry` appended as the last module segment.

This is the name of the registry in which subscriptions are recorded.
"""
@spec subscriber_registry_name(module()) :: module()
def subscriber_registry_name(name) do
  Module.concat(name, SubscriberRegistry)
end
@doc """
Returns `name` with `CalleeRegistry` appended as the last module segment.

This is the name of the registry in which procedure registrations are
recorded.
"""
@spec callee_registry_name(module()) :: module()
def callee_registry_name(name) do
  Module.concat(name, CalleeRegistry)
end
@doc """
Returns `name` with `DependentRegistry` appended as the last module segment.

This is the name of the registry used by `add/2` and `remove/2`.
"""
@spec dependent_registry_name(module()) :: module()
def dependent_registry_name(name) do
  Module.concat(name, DependentRegistry)
end
@doc """
Subscribes to the topic described by `sub` through the session of client
`name`.

On `{:ok, id}` the calling process is registered in the subscriber registry
under the subscription id, with the topic as the stored value, and
`{:ok, id}` is returned.

Any other result of `sync/3` is returned unchanged, for example the
`wamp.error.no_session` error struct when no session process is registered.
"""
@spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
        {:ok, integer()} | Error.t()
def subscribe(name, %Subscribe{topic: topic} = sub, timeout \\ 5000) do
  # A non-matching result falls through `with` and is returned as-is.
  with {:ok, id} <- sync(name, Subscriber.subscribe(sub), timeout) do
    Registry.register(subscriber_registry_name(name), id, topic)
    {:ok, id}
  end
end
@doc """
Cancels the subscription described by `unsub` through the session of client
`name`.

On `:ok` the calling process's entries for the subscription id are removed
from the subscriber registry and `:ok` is returned.

Any other result of `sync/3` is returned unchanged, for example the
`wamp.error.no_session` error struct when no session process is registered.
"""
@spec unsubscribe(name :: module(), subscription :: Unsubscribe.t(), timeout :: integer()) ::
        :ok | Error.t()
def unsubscribe(name, %Unsubscribe{subscription_id: subscription_id} = unsub, timeout \\ 5000) do
  # A non-matching result falls through `with` and is returned as-is.
  with :ok <- sync(name, Subscriber.unsubscribe(unsub), timeout) do
    Registry.unregister(subscriber_registry_name(name), subscription_id)
    :ok
  end
end
@doc """
Registers the procedure described by `reg` through the session of client
`name`.

On `{:ok, id}` the calling process is registered in the callee registry
under the registration id, with the procedure as the stored value, and
`{:ok, id}` is returned. The result of the registry call itself is ignored.

Any other result of `sync/3` is returned unchanged, for example the
`wamp.error.no_session` error struct when no session process is registered.
"""
@spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
        {:ok, integer()} | Error.t()
def register(name, %Register{procedure: procedure} = reg, timeout \\ 5000) do
  # A non-matching result falls through `with` and is returned as-is.
  with {:ok, id} <- sync(name, Callee.register(reg), timeout) do
    Registry.register(callee_registry_name(name), id, procedure)
    {:ok, id}
  end
end
@doc """
Removes the procedure registration described by `unreg` through the session
of client `name`.

On `:ok` the calling process's entry for the registration id is removed from
the callee registry and `:ok` is returned.

Any other result of `sync/3` is returned unchanged, for example the
`wamp.error.no_session` error struct when no session process is registered.
"""
@spec unregister(name :: module(), registration :: Unregister.t(), timeout :: integer()) ::
        :ok | Error.t()
def unregister(name, %Unregister{registration_id: registration_id} = unreg, timeout \\ 5000) do
  # A non-matching result falls through `with` and is returned as-is.
  with :ok <- sync(name, Callee.unregister(unreg), timeout) do
    Registry.unregister(callee_registry_name(name), registration_id)
    :ok
  end
end
@doc """
Adds `pid` as a dependent of client `name`.

When no session process is registered for `name`, returns the
`wamp.error.no_session` error struct. Otherwise:

  * if the session reports that it is connected, `{:connected, name}` is
    sent to `pid` right away;
  * the calling process is registered in the dependent registry with `pid`
    as both key and value, and the result of `Registry.register/3` is
    returned.
"""
@spec add(name :: module(), pid :: module() | pid()) ::
        {:ok, pid()} | {:error, term()} | Error.t()
def add(name, pid) do
  case session_exists(name) do
    false ->
      %Error{error: "wamp.error.no_session"}

    sess_name ->
      # Notify the dependent immediately if the session is already connected.
      case Sess.connected?(sess_name) do
        true -> send(pid, {:connected, name})
        false -> :noop
      end

      Registry.register(dependent_registry_name(name), pid, pid)
  end
end
@doc """
Removes the calling process's entry for key `pid` from the dependent
registry of client `name`.
"""
@spec remove(name :: module(), pid :: module() | pid()) :: :ok
def remove(name, pid) do
  name
  |> dependent_registry_name()
  |> Registry.unregister(pid)
end
@doc """
Sends the given yield through the session of client `name` using `cast/2`.

Returns the result of `cast/2`, which is the `wamp.error.no_session` error
struct when no session process is registered.
"""
@spec yield(name :: module(), yield :: Yield.t()) :: :ok | Error.t()
def yield(name, yield) do
  cast(name, Callee.yield(yield))
end
@doc """
Issues the given call through the session of client `name` and returns the
result of `sync/3`.

`timeout` defaults to 5000 and is passed through to `sync/3`.
"""
@spec call(name :: module(), call :: Call.t(), timeout :: integer) :: Result.t() | Error.t()
def call(name, call, timeout \\ 5000) do
  request = Caller.call(call)
  sync(name, request, timeout)
end
@doc """
Sends an invocation error through the session of client `name`.

Returns the result of `sync/2`, which is the `wamp.error.no_session` error
struct when no session process is registered.
"""
@spec error(name :: module(), error :: Error.t()) :: :ok | Error.t()
def error(name, error) do
  # NOTE(review): this goes through `sync/2` (default timeout 5000) while
  # `yield/2` uses `cast/2`; confirm that a synchronous request is intended
  # for invocation errors.
  sync(name, Callee.invocation_error(error))
end
@doc """
Publishes the given event through the session of client `name` using
`cast/2`.

Returns the result of `cast/2`, which is the `wamp.error.no_session` error
struct when no session process is registered.
"""
@spec publish(name :: module(), event :: Publish.t()) :: :ok | Error.t()
def publish(name, event) do
  cast(name, Publisher.publish(event))
end
@doc """
Sends `request` to the session of client `name` via `Sess.send_request/3`
and returns its result.

`timeout` defaults to 5000. When no session process is registered for
`name`, the `wamp.error.no_session` error struct is returned instead.
"""
@spec sync(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
        term()
def sync(name, request, timeout \\ 5000) do
  # `session_exists/1` yields `false` or the (always truthy) session name.
  session = session_exists(name)

  if session do
    Sess.send_request(session, request, timeout)
  else
    %Error{error: "wamp.error.no_session"}
  end
end
@doc """
Hands `request` to the session of client `name` via
`Sess.cast_send_request/2` and returns its result.

When no session process is registered for `name`, the
`wamp.error.no_session` error struct is returned instead.
"""
@spec cast(name :: module(), Wampex.message()) :: :ok | Error.t()
def cast(name, request) do
  case session_exists(name) do
    false ->
      %Error{error: "wamp.error.no_session"}

    sess_name ->
      Sess.cast_send_request(sess_name, request)
  end
end
# Resolves the session process name for client `name`. Returns that name when
# something is currently registered under it, and `false` otherwise.
defp session_exists(name) do
  registered = session_name(name)

  if Process.whereis(registered), do: registered, else: false
end
end