# lib/client.ex

defmodule Wampex.Client do
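  @moduledoc """
  WAMP client supervisor and public client API.

  `start_link/1` starts a supervisor that runs three registries (subscribers,
  callees and dependent processes) together with the session's transport process.
  The remaining functions send subscribe, register, call, yield, publish and error
  messages through the session of a named client.
  """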
  use Supervisor

  alias Wampex.Client.Session, as: Sess
  alias Wampex.Roles.{Callee, Subscriber}
  alias Wampex.Roles.Callee.{Register, Unregister, Yield}
  alias Wampex.Roles.Caller
  alias Wampex.Roles.Caller.Call
  alias Wampex.Roles.Dealer.Result
  alias Wampex.Roles.Peer
  alias Wampex.Roles.Peer.Error
  alias Wampex.Roles.Publisher
  alias Wampex.Roles.Publisher.Publish
  alias Wampex.Roles.Subscriber.{Subscribe, Unsubscribe}

  @doc """
  Starts the client supervisor and registers it under `name`.

  ## Options

  All three options are required and may be given in any order:

    * `:name` - atom the supervisor is registered under; also passed on to `init/1`.
    * `:session` - the `Wampex.Client.Session` struct describing the session.
    * `:reconnect` - passed on to `init/1`, which forwards it to the transport child.

  Raises `KeyError` when one of the options is missing.
  """
  @spec start_link(name: atom(), session: Sess.t(), reconnect: boolean()) ::
          {:ok, pid()}
          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
  def start_link(opts) when is_list(opts) do
    # Fetch by key instead of matching the literal list so option order is irrelevant.
    name = Keyword.fetch!(opts, :name)
    session_data = Keyword.fetch!(opts, :session)
    reconnect = Keyword.fetch!(opts, :reconnect)

    start_supervisor(name, session_data, reconnect)
  end

  # Starts the supervisor process; the guard keeps the requirement that `name` is an atom.
  defp start_supervisor(name, session_data, reconnect) when is_atom(name) do
    Supervisor.start_link(__MODULE__, {name, session_data, reconnect}, name: name)
  end

  # Supervisor callback: builds the children for one client instance.
  #
  # Children, in start order:
  #   1. a duplicate-key registry for subscriptions
  #   2. a unique-key registry for callee registrations
  #   3. a unique-key registry for dependent processes
  #   4. the transport module taken from the session struct's `transport` field
  #
  # The session struct handed to the transport is stamped with `name` and gets
  # `Peer` prepended to its roles.
  @impl true
  @spec init({atom(), Sess.t(), boolean()}) ::
          {:ok, {:supervisor.sup_flags(), [:supervisor.child_spec()]}} | :ignore
  def init(
        {name, %Sess{url: url, transport: t, serializer: s, protocol: p} = session_data,
         reconnect}
      ) do
    session_data = %Sess{session_data | name: name, roles: [Peer | session_data.roles]}

    children = [
      registry_child(:duplicate, subscriber_registry_name(name)),
      registry_child(:unique, callee_registry_name(name)),
      registry_child(:unique, dependent_registry_name(name)),
      {t,
       name: name,
       url: url,
       session_data: session_data,
       protocol: p,
       serializer: s,
       reconnect: reconnect}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  # Child spec for a Registry with one partition per online scheduler.
  defp registry_child(keys, registry_name) do
    {Registry, [keys: keys, name: registry_name, partitions: System.schedulers_online()]}
  end

  @doc """
  Returns `name` with `Session` appended as a module segment.

  This is the name under which the client's session process is looked up.
  """
  @spec session_name(module()) :: module()
  def session_name(name) do
    Module.concat(name, Session)
  end
  @doc """
  Returns `name` with `SubscriberRegistry` appended as a module segment.

  This is the name of the registry that tracks subscriptions for the client.
  """
  @spec subscriber_registry_name(module()) :: module()
  def subscriber_registry_name(name) do
    Module.concat(name, SubscriberRegistry)
  end
  @doc """
  Returns `name` with `CalleeRegistry` appended as a module segment.

  This is the name of the registry that tracks procedure registrations for the client.
  """
  @spec callee_registry_name(module()) :: module()
  def callee_registry_name(name) do
    Module.concat(name, CalleeRegistry)
  end
  @doc """
  Returns `name` with `DependentRegistry` appended as a module segment.

  This is the name of the registry used by `add/2` and `remove/2`.
  """
  @spec dependent_registry_name(module()) :: module()
  def dependent_registry_name(name) do
    Module.concat(name, DependentRegistry)
  end

  @doc """
  Subscribes to the topic described by `sub` through the session of client `name`.

  The request is built with `Subscriber.subscribe/1` and sent with `sync/3`.
  When the reply is `{:ok, id}`, the calling process is registered in the client's
  subscriber registry under `id`, with the topic as the registered value, and
  `{:ok, id}` is returned.

  Any other reply is returned unchanged, e.g. a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered.
  """
  @spec subscribe(name :: module(), subscription :: Subscribe.t(), timeout :: integer()) ::
          {:ok, integer()} | Error.t()
  def subscribe(name, %Subscribe{topic: t} = sub, timeout \\ 5000) do
    with {:ok, id} <- sync(name, Subscriber.subscribe(sub), timeout) do
      # Keyed by subscription id; the topic is stored as the entry's value.
      Registry.register(subscriber_registry_name(name), id, t)
      {:ok, id}
    end
  end

  @doc """
  Cancels the subscription described by `unsub` through the session of client `name`.

  The request is built with `Subscriber.unsubscribe/1` and sent with `sync/3`.
  When the reply is `:ok`, the calling process's entries for the subscription id
  are removed from the client's subscriber registry and `:ok` is returned.

  Any other reply is returned unchanged, e.g. a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered.
  """
  @spec unsubscribe(name :: module(), subscription :: Unsubscribe.t(), timeout :: integer()) ::
          :ok | Error.t()
  def unsubscribe(name, %Unsubscribe{subscription_id: si} = unsub, timeout \\ 5000) do
    with :ok <- sync(name, Subscriber.unsubscribe(unsub), timeout) do
      Registry.unregister(subscriber_registry_name(name), si)
      :ok
    end
  end

  @doc """
  Registers the procedure described by `reg` through the session of client `name`.

  The request is built with `Callee.register/1` and sent with `sync/3`.
  When the reply is `{:ok, id}`, the calling process is registered in the client's
  callee registry under `id`, with the procedure as the registered value, and
  `{:ok, id}` is returned. The result of the registry call itself is not inspected.

  Any other reply is returned unchanged, e.g. a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered.
  """
  @spec register(name :: module(), register :: Register.t(), timeout :: integer()) ::
          {:ok, integer()} | Error.t()
  def register(name, %Register{procedure: p} = reg, timeout \\ 5000) do
    with {:ok, id} <- sync(name, Callee.register(reg), timeout) do
      # Keyed by registration id; the procedure is stored as the entry's value.
      Registry.register(callee_registry_name(name), id, p)
      {:ok, id}
    end
  end

  @doc """
  Removes the registration described by `unreg` through the session of client `name`.

  The request is built with `Callee.unregister/1` and sent with `sync/3`.
  When the reply is `:ok`, the calling process's entry for the registration id is
  removed from the client's callee registry and `:ok` is returned.

  Any other reply is returned unchanged, e.g. a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered.
  """
  @spec unregister(name :: module(), registration :: Unregister.t(), timeout :: integer()) ::
          :ok | Error.t()
  def unregister(name, %Unregister{registration_id: ri} = unreg, timeout \\ 5000) do
    with :ok <- sync(name, Callee.unregister(unreg), timeout) do
      Registry.unregister(callee_registry_name(name), ri)
      :ok
    end
  end

  @doc """
  Adds `pid` as a dependent of client `name`.

  Returns a `Wampex.Roles.Peer.Error` struct with `error: "wamp.error.no_session"`
  when no session process is registered for `name`.

  Otherwise, if the session reports that it is connected, `{:connected, name}` is
  sent to `pid` first. Then an entry with key `pid` and value `pid` is registered
  in the client's dependent registry, and the result of `Registry.register/3` is
  returned.
  """
  @spec add(name :: module(), pid :: module() | pid()) ::
          {:ok, pid()} | {:error, term()} | Error.t()
  def add(name, pid) do
    case session_exists(name) do
      false ->
        %Error{error: "wamp.error.no_session"}

      sess_name ->
        # Notify the dependent immediately when the session is already connected.
        if Sess.connected?(sess_name), do: send(pid, {:connected, name})

        # NOTE(review): Registry.register/3 registers the *calling* process, so the
        # entry is owned by the caller even when `pid` is a different process — confirm
        # that callers always pass their own pid.
        Registry.register(dependent_registry_name(name), pid, pid)
    end
  end

  @doc """
  Removes the calling process's entry for key `pid` from the dependent registry
  of client `name`.
  """
  @spec remove(name :: module(), pid :: module() | pid()) :: :ok
  def remove(name, pid) do
    name
    |> dependent_registry_name()
    |> Registry.unregister(pid)
  end

  @doc """
  Sends the message built by `Callee.yield/1` for `yield` through `cast/2`.

  Returns whatever `cast/2` returns, which is a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered
  for `name`.
  """
  @spec yield(name :: module(), yield :: Yield.t()) :: :ok | Error.t()
  def yield(name, yield) do
    cast(name, Callee.yield(yield))
  end

  @doc """
  Builds a request from `call` with `Caller.call/1` and sends it with `sync/3`,
  returning the reply.
  """
  @spec call(name :: module(), call :: Call.t(), timeout :: integer) :: Result.t() | Error.t()
  def call(name, call, timeout \\ 5000) do
    request = Caller.call(call)
    sync(name, request, timeout)
  end

  @doc """
  Sends the message built by `Callee.invocation_error/1` for `error` through
  `sync/3`, using that function's default timeout.

  Returns whatever `sync/3` returns, which is a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered
  for `name`.
  """
  @spec error(name :: module(), error :: Error.t()) :: :ok | Error.t()
  def error(name, error) do
    # NOTE(review): `yield/2` sends its message with `cast/2`, while this uses the
    # synchronous `sync/3`. Confirm the session replies to an invocation error;
    # if it does not, this call would wait for the full timeout.
    sync(name, Callee.invocation_error(error))
  end

  @doc """
  Sends the message built by `Publisher.publish/1` for `event` through `cast/2`.

  Returns whatever `cast/2` returns, which is a `Wampex.Roles.Peer.Error` struct
  with `error: "wamp.error.no_session"` when no session process is registered
  for `name`.
  """
  @spec publish(name :: module(), event :: Publish.t()) :: :ok | Error.t()
  def publish(name, event) do
    cast(name, Publisher.publish(event))
  end

  @doc """
  Sends `request` through the session of client `name` using
  `Wampex.Client.Session.send_request/3` and returns its result.

  Returns a `Wampex.Roles.Peer.Error` struct with `error: "wamp.error.no_session"`
  when no session process is registered for `name`.
  """
  @spec sync(name :: module(), request :: Wampex.message(), timeout :: integer()) ::
          term()
  def sync(name, request, timeout \\ 5000) do
    name
    |> session_exists()
    |> do_sync(request, timeout)
  end

  # `false` means no session process is registered; anything else is the session name.
  defp do_sync(false, _request, _timeout), do: %Error{error: "wamp.error.no_session"}

  defp do_sync(sess_name, request, timeout) do
    Sess.send_request(sess_name, request, timeout)
  end

  @doc """
  Sends `request` through the session of client `name` using
  `Wampex.Client.Session.cast_send_request/2` and returns its result.

  Returns a `Wampex.Roles.Peer.Error` struct with `error: "wamp.error.no_session"`
  when no session process is registered for `name`.
  """
  @spec cast(name :: module(), Wampex.message()) :: :ok | Error.t()
  def cast(name, request) do
    case session_exists(name) do
      false ->
        %Error{error: "wamp.error.no_session"}

      sess_name ->
        Sess.cast_send_request(sess_name, request)
    end
  end

  # Returns the session name derived from `name` when something is registered
  # under it, otherwise `false`.
  defp session_exists(name) do
    registered = session_name(name)

    if Process.whereis(registered) == nil do
      false
    else
      registered
    end
  end
end