lib/client/session.ex

defmodule Wampex.Client.Session do
  @moduledoc """
  A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
  """
  use StatesLanguage, data: "priv/client.json"

  @max_id 9_007_199_254_740_992

  alias StatesLanguage, as: SL
  alias Wampex.Client
  alias Wampex.Client.Realm
  alias Wampex.Client.Transports.WebSocket
  alias Wampex.Roles.Broker.Event
  alias Wampex.Roles.Dealer.Invocation
  alias Wampex.Roles.Peer
  alias Wampex.Roles.Peer.{Authenticate, Challenge, Hello}
  alias Wampex.Serializers.MessagePack
  alias __MODULE__, as: Sess

  @yield 70
  @hello 1
  @abort 3
  @error 8

  @enforce_keys [:url, :roles]

  defstruct [
    :id,
    :url,
    :transport_pid,
    :message,
    :event,
    :invocation,
    :goodbye,
    :error,
    :realm,
    :name,
    :roles,
    :challenge,
    :interrupt,
    message_queue: [],
    connected: false,
    request_id: 0,
    protocol: "wamp.2.msgpack",
    transport: WebSocket,
    serializer: MessagePack,
    requests: []
  ]

  @type t :: %__MODULE__{
          id: integer() | nil,
          url: binary(),
          transport_pid: pid() | module() | nil,
          message: Wampex.message() | nil,
          event: Event.t() | nil,
          invocation: Invocation.t() | nil,
          goodbye: binary() | nil,
          error: binary(),
          realm: Realm.t(),
          name: module() | nil,
          roles: [module()],
          challenge: Challenge.t() | nil,
          interrupt: integer() | nil,
          message_queue: [],
          connected: boolean(),
          request_id: integer(),
          protocol: binary(),
          transport: module(),
          serializer: module(),
          requests: []
        }

  ## States
  @established "Established"
  @init "Init"
  @challenge "Challenge"
  @welcome "Welcome"
  @abort "Abort"
  @goodbye "Goodbye"
  @event "Event"
  @invocation "Invocation"
  @interrupt "Interrupt"
  @handshake "Handshake"

  ## Resources
  @hello "Hello"
  @handle_challenge "HandleChallenge"
  @handle_welcome "HandleWelcome"
  @handle_established "HandleEstablished"
  @handle_abort "HandleAbort"
  @handle_goodbye "HandleGoodbye"
  @handle_event "HandleEvent"
  @handle_invocation "HandleInvocation"
  @handle_message "HandleMessage"
  @handle_interrupt "HandleInterrupt"
  @handle_handshake "HandleHandshake"

  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
          :ok
  def cast_send_request(name, request, pid) do
    __MODULE__.cast(name, {:send_request, request, pid})
  end

  @spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
  def cast_send_request(name, request) do
    __MODULE__.cast(name, {:send_request, request})
  end

  @spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) ::
          term()
  def send_request(name, request, timeout) do
    __MODULE__.call(name, {:send_request, request}, timeout)
  end

  @spec connected?(name :: atom()) :: true | false
  def connected?(name) do
    __MODULE__.call(name, :connected)
  end

  @impl true
  def handle_call(:connected, from, _, %SL{data: %Sess{connected: conn}} = sl),
    do: {:ok, sl, [{:reply, from, conn}]}

  @impl true
  def handle_call(
        {:send_request, request},
        from,
        @established,
        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
      ) do
    request_id = do_send(r_id, tt, t, request)

    {:ok,
     %SL{
       data
       | data: %Sess{
           sess
           | request_id: request_id,
             requests: [{request_id, from} | sess.requests]
         }
     }, []}
  end

  @impl true
  def handle_call(
        {:send_request, request},
        from,
        _,
        %SL{data: %Sess{message_queue: mq} = sess} = data
      ) do
    Logger.debug("Queueing request: #{inspect(request)}")
    {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
  end

  @impl true
  def handle_cast(
        {:send_request, request, from},
        @established,
        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
      ) do
    request_id = do_send(r_id, tt, t, request)

    {:ok,
     %SL{
       data
       | data: %Sess{
           sess
           | request_id: request_id,
             requests: [{request_id, from} | sess.requests]
         }
     }, []}
  end

  @impl true
  def handle_cast(
        {:send_request, request},
        @established,
        %SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
      ) do
    request_id = do_send(r_id, tt, t, request)

    {:ok,
     %SL{
       data
       | data: %Sess{sess | request_id: request_id}
     }, []}
  end

  @impl true
  def handle_cast(
        {:send_request, request},
        _,
        %SL{data: %Sess{message_queue: mq} = sess} = data
      ) do
    Logger.debug("Queueing request: #{inspect(request)}")
    {:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
  end

  @impl true
  def handle_resource(
        @hello,
        _,
        @init,
        %SL{
          data: %Sess{
            transport: tt,
            transport_pid: t,
            roles: roles,
            realm: %Realm{name: realm, authentication: auth}
          }
        } = data
      ) do
    message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth)
    tt.send_request(t, Peer.hello(message))
    {:ok, data, [{:next_event, :internal, :hello_sent}]}
  end

  @impl true
  def handle_resource(
        @handle_challenge,
        _,
        @challenge,
        %SL{
          data: %Sess{
            transport: tt,
            transport_pid: t,
            challenge: %Challenge{} = challenge,
            realm: %Realm{authentication: auth}
          }
        } = sl
      ) do
    {signature, extra} = auth.__struct__.handle(challenge, auth)
    tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra}))
    {:ok, sl, [{:next_event, :internal, :challenged}]}
  end

  @impl true
  def handle_resource(
        @handle_welcome,
        _,
        @welcome,
        %SL{data: %Sess{name: name} = session} = sl
      ) do
    deps = Registry.select(Client.dependent_registry_name(name), [{{:"$1", :_, :_}, [], [:"$1"]}])

    Enum.each(deps, fn pid ->
      send(pid, {:connected, name})
    end)

    {:ok, %SL{sl | data: %Sess{session | connected: true}}, [{:next_event, :internal, :established}]}
  end

  @impl true
  def handle_resource(
        @goodbye,
        _,
        @handle_goodbye,
        %SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
      ) do
    tt.send_message(t, Peer.goodbye(goodbye))
    {:ok, data, []}
  end

  @impl true
  def handle_resource(
        @handle_message,
        _,
        _,
        %SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
      ) do
    Logger.debug("Handling Message #{inspect(msg)}")

    case handle_message(msg, roles) do
      {actions, id, response} ->
        {%SL{data: sess} = data, response} = maybe_update_response(data, response)
        {requests, actions} = resp = handle_response(id, actions, requests, response)
        Logger.debug("Response: #{inspect(resp)}")
        {:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}

      nil ->
        Logger.error("Not supported by roles #{inspect(roles)} - #{inspect(msg)}")

        {:ok, %SL{data | data: %Sess{data.data | error: "wamp.error.not_supported_by_roles"}},
         [{:next_event, :internal, :abort}]}
    end
  end

  @impl true
  def handle_resource(@handle_abort, _, @abort, data) do
    Logger.error("Aborting: #{data.data.error}")
    {:ok, data, []}
  end

  @impl true
  def handle_resource(
        @handle_established,
        _,
        @established,
        %SL{
          data:
            %Sess{
              request_id: r_id,
              transport: tt,
              transport_pid: t,
              message_queue: mq,
              requests: reqs
            } = sess
        } = data
      ) do
    {request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs)
    requests = remove_cast_requests(requests)

    {:ok,
     %SL{
       data
       | data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []}
     }, []}
  end

  @impl true
  def handle_resource(@handle_handshake, _, @handshake, data) do
    {:ok, data, []}
  end

  @impl true
  def handle_resource(
        @handle_event,
        _,
        @event,
        %SL{data: %Sess{name: name, event: %Event{subscription_id: sub_id} = event}} = sl
      ) do
    Logger.debug("Received Event #{inspect(event)}")

    sub = Client.subscriber_registry_name(name)

    Registry.dispatch(sub, sub_id, fn entries ->
      for {pid, _topic} <- entries, do: send(pid, event)
    end)

    {:ok, sl, [{:next_event, :internal, :transition}]}
  end

  @impl true
  def handle_resource(
        @handle_invocation,
        _,
        @invocation,
        %SL{
          data: %Sess{name: name, invocation: %Invocation{registration_id: reg_id} = invocation}
        } = sl
      ) do
    reg = Client.callee_registry_name(name)

    Registry.dispatch(reg, reg_id, fn entries ->
      for {pid, _procedure} <- entries, do: send(pid, invocation)
    end)

    {:ok, sl, [{:next_event, :internal, :transition}]}
  end

  @impl true
  def handle_resource(
        @handle_interrupt,
        _,
        @interrupt,
        %SL{data: %Sess{interrupt: {id, opts}, name: name}} = data
      ) do
    [{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id)
    send(callee, {:interrupt, id, opts})
    {:ok, data, [{:next_event, :internal, :transition}]}
  end

  @impl true
  def handle_resource(resource, _, state, data) do
    Logger.error("No specific resource handler for #{resource} in state #{state}")
    {:ok, data, []}
  end

  @impl true
  def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
    {:ok, %SL{data | data: %Sess{sess | message: message}}, [{:next_event, :internal, :message_received}]}
  end

  @impl true
  def handle_info({:EXIT, _, _}, _, state), do: {:stop, :normal, state}

  defp handle_response(nil, actions, requests, _), do: {requests, actions}

  defp handle_response(id, actions, requests, response) do
    Enum.reduce_while(requests, {requests, actions}, fn
      {^id, nil}, _ ->
        {:halt, {remove_request(id, requests), actions}}

      {^id, from}, _ ->
        {:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}

      {_, _}, acc ->
        {:cont, acc}
    end)
  end

  defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
    send(from, {:progress, arg_l, arg_kw})
    []
  end

  defp get_response_action(from, response), do: {:reply, from, response}

  defp maybe_update_response(data, {:update, key, resp}) do
    {%SL{data | data: Map.put(data.data, key, resp)}, resp}
  end

  defp maybe_update_response(data, resp), do: {data, resp}

  defp do_send(r_id, tt, t, message) do
    {request_id, message} = maybe_inject_request_id(r_id, message)
    tt.send_request(t, remove_nil_values(message))
    request_id
  end

  defp maybe_authentication(message, nil), do: message

  defp maybe_authentication(%Hello{} = message, auth) do
    %Hello{message | options: Map.from_struct(auth)}
  end

  defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)

  defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {get_request_id(r_id), message}
  defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {get_request_id(r_id), message}
  defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {get_request_id(r_id), message}
  defp maybe_inject_request_id(r_id, [@error | _] = message), do: {get_request_id(r_id), message}

  defp maybe_inject_request_id(r_id, message) do
    request_id = get_request_id(r_id)
    {request_id, List.insert_at(message, 1, request_id)}
  end

  defp get_request_id(current_id) when current_id == @max_id do
    1
  end

  defp get_request_id(current_id) do
    current_id + 1
  end

  defp remove_request(id, requests) do
    Enum.filter(requests, fn
      {^id, _} -> false
      {_id, _} -> true
    end)
  end

  defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}

  defp send_message_queue(r_id, mq, tt, t, reqs) do
    mq
    |> Enum.reverse()
    |> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
      r = do_send(id, tt, t, request)
      {r, [{r, from} | requests]}
    end)
  end

  def remove_cast_requests([]), do: []

  def remove_cast_requests(requests) do
    Enum.filter(requests, fn
      {_, nil} -> false
      {_, _} -> true
    end)
  end

  defp handle_message(msg, roles) do
    Enum.reduce_while(roles, nil, fn r, _ ->
      try do
        res = r.handle(msg)
        {:halt, res}
      rescue
        FunctionClauseError ->
          {:cont, nil}
      end
    end)
  end
end