defmodule Wampex.Client.Session do
@moduledoc """
A state machine based process for managing a WAMP Session. Utilizes StatesLanguage to implement the gen_statem process. See priv/session.json for the JSON representation
"""
use StatesLanguage, data: "priv/client.json"
@max_id 9_007_199_254_740_992
alias StatesLanguage, as: SL
alias Wampex.Client
alias Wampex.Client.Realm
alias Wampex.Client.Transports.WebSocket
alias Wampex.Roles.Broker.Event
alias Wampex.Roles.Dealer.Invocation
alias Wampex.Roles.Peer
alias Wampex.Roles.Peer.{Authenticate, Challenge, Hello}
alias Wampex.Serializers.MessagePack
alias __MODULE__, as: Sess
@yield 70
@hello 1
@abort 3
@error 8
@enforce_keys [:url, :roles]
defstruct [
:id,
:url,
:transport_pid,
:message,
:event,
:invocation,
:goodbye,
:error,
:realm,
:name,
:roles,
:challenge,
:interrupt,
message_queue: [],
connected: false,
request_id: 0,
protocol: "wamp.2.msgpack",
transport: WebSocket,
serializer: MessagePack,
requests: []
]
@type t :: %__MODULE__{
id: integer() | nil,
url: binary(),
transport_pid: pid() | module() | nil,
message: Wampex.message() | nil,
event: Event.t() | nil,
invocation: Invocation.t() | nil,
goodbye: binary() | nil,
error: binary(),
realm: Realm.t(),
name: module() | nil,
roles: [module()],
challenge: Challenge.t() | nil,
interrupt: integer() | nil,
message_queue: [],
connected: boolean(),
request_id: integer(),
protocol: binary(),
transport: module(),
serializer: module(),
requests: []
}
## States
@established "Established"
@init "Init"
@challenge "Challenge"
@welcome "Welcome"
@abort "Abort"
@goodbye "Goodbye"
@event "Event"
@invocation "Invocation"
@interrupt "Interrupt"
@handshake "Handshake"
## Resources
@hello "Hello"
@handle_challenge "HandleChallenge"
@handle_welcome "HandleWelcome"
@handle_established "HandleEstablished"
@handle_abort "HandleAbort"
@handle_goodbye "HandleGoodbye"
@handle_event "HandleEvent"
@handle_invocation "HandleInvocation"
@handle_message "HandleMessage"
@handle_interrupt "HandleInterrupt"
@handle_handshake "HandleHandshake"
@spec cast_send_request(name :: atom() | pid(), request :: Wampex.message(), from :: pid()) ::
:ok
def cast_send_request(name, request, pid) do
__MODULE__.cast(name, {:send_request, request, pid})
end
@spec cast_send_request(name :: atom() | pid(), request :: Wampex.message()) :: :ok
def cast_send_request(name, request) do
__MODULE__.cast(name, {:send_request, request})
end
@spec send_request(name :: atom() | pid(), request :: Wampex.message(), timeout :: integer()) ::
term()
def send_request(name, request, timeout) do
__MODULE__.call(name, {:send_request, request}, timeout)
end
@spec connected?(name :: atom()) :: true | false
def connected?(name) do
__MODULE__.call(name, :connected)
end
@impl true
def handle_call(:connected, from, _, %SL{data: %Sess{connected: conn}} = sl),
do: {:ok, sl, [{:reply, from, conn}]}
@impl true
def handle_call(
{:send_request, request},
from,
@established,
%SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
) do
request_id = do_send(r_id, tt, t, request)
{:ok,
%SL{
data
| data: %Sess{
sess
| request_id: request_id,
requests: [{request_id, from} | sess.requests]
}
}, []}
end
@impl true
def handle_call(
{:send_request, request},
from,
_,
%SL{data: %Sess{message_queue: mq} = sess} = data
) do
Logger.debug("Queueing request: #{inspect(request)}")
{:ok, %SL{data | data: %Sess{sess | message_queue: [{request, from} | mq]}}, []}
end
@impl true
def handle_cast(
{:send_request, request, from},
@established,
%SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
) do
request_id = do_send(r_id, tt, t, request)
{:ok,
%SL{
data
| data: %Sess{
sess
| request_id: request_id,
requests: [{request_id, from} | sess.requests]
}
}, []}
end
@impl true
def handle_cast(
{:send_request, request},
@established,
%SL{data: %Sess{request_id: r_id, transport: tt, transport_pid: t} = sess} = data
) do
request_id = do_send(r_id, tt, t, request)
{:ok,
%SL{
data
| data: %Sess{sess | request_id: request_id}
}, []}
end
@impl true
def handle_cast(
{:send_request, request},
_,
%SL{data: %Sess{message_queue: mq} = sess} = data
) do
Logger.debug("Queueing request: #{inspect(request)}")
{:ok, %SL{data | data: %Sess{sess | message_queue: [{request, nil} | mq]}}, []}
end
@impl true
def handle_resource(
@hello,
_,
@init,
%SL{
data: %Sess{
transport: tt,
transport_pid: t,
roles: roles,
realm: %Realm{name: realm, authentication: auth}
}
} = data
) do
message = maybe_authentication(%Hello{realm: realm, roles: roles}, auth)
tt.send_request(t, Peer.hello(message))
{:ok, data, [{:next_event, :internal, :hello_sent}]}
end
@impl true
def handle_resource(
@handle_challenge,
_,
@challenge,
%SL{
data: %Sess{
transport: tt,
transport_pid: t,
challenge: %Challenge{} = challenge,
realm: %Realm{authentication: auth}
}
} = sl
) do
{signature, extra} = auth.__struct__.handle(challenge, auth)
tt.send_request(t, Peer.authenticate(%Authenticate{signature: signature, extra: extra}))
{:ok, sl, [{:next_event, :internal, :challenged}]}
end
@impl true
def handle_resource(
@handle_welcome,
_,
@welcome,
%SL{data: %Sess{name: name} = session} = sl
) do
deps = Registry.select(Client.dependent_registry_name(name), [{{:"$1", :_, :_}, [], [:"$1"]}])
Enum.each(deps, fn pid ->
send(pid, {:connected, name})
end)
{:ok, %SL{sl | data: %Sess{session | connected: true}},
[{:next_event, :internal, :established}]}
end
@impl true
def handle_resource(
@goodbye,
_,
@handle_goodbye,
%SL{data: %Sess{transport: tt, transport_pid: t, goodbye: goodbye}} = data
) do
tt.send_message(t, Peer.goodbye(goodbye))
{:ok, data, []}
end
@impl true
def handle_resource(
@handle_message,
_,
_,
%SL{data: %Sess{message: msg, requests: requests, roles: roles}} = data
) do
Logger.debug("Handling Message #{inspect(msg)}")
case handle_message(msg, roles) do
{actions, id, response} ->
{%SL{data: sess} = data, response} = maybe_update_response(data, response)
{requests, actions} = resp = handle_response(id, actions, requests, response)
Logger.debug("Response: #{inspect(resp)}")
{:ok, %SL{data | data: %Sess{sess | requests: requests}}, actions}
nil ->
Logger.error("Not supported by roles #{inspect(roles)} - #{inspect(msg)}")
{:ok, %SL{data | data: %Sess{data.data | error: "wamp.error.not_supported_by_roles"}},
[{:next_event, :internal, :abort}]}
end
end
@impl true
def handle_resource(@handle_abort, _, @abort, data) do
Logger.error("Aborting: #{data.data.error}")
{:ok, data, []}
end
@impl true
def handle_resource(
@handle_established,
_,
@established,
%SL{
data:
%Sess{
request_id: r_id,
transport: tt,
transport_pid: t,
message_queue: mq,
requests: reqs
} = sess
} = data
) do
{request_id, requests} = send_message_queue(r_id, mq, tt, t, reqs)
requests = remove_cast_requests(requests)
{:ok,
%SL{
data
| data: %Sess{sess | requests: requests, request_id: request_id, message_queue: []}
}, []}
end
@impl true
def handle_resource(@handle_handshake, _, @handshake, data) do
{:ok, data, []}
end
@impl true
def handle_resource(
@handle_event,
_,
@event,
%SL{data: %Sess{name: name, event: %Event{subscription_id: sub_id} = event}} = sl
) do
Logger.debug("Received Event #{inspect(event)}")
sub = Client.subscriber_registry_name(name)
Registry.dispatch(sub, sub_id, fn entries ->
for {pid, _topic} <- entries, do: send(pid, event)
end)
{:ok, sl, [{:next_event, :internal, :transition}]}
end
@impl true
def handle_resource(
@handle_invocation,
_,
@invocation,
%SL{
data: %Sess{name: name, invocation: %Invocation{registration_id: reg_id} = invocation}
} = sl
) do
reg = Client.callee_registry_name(name)
Registry.dispatch(reg, reg_id, fn entries ->
for {pid, _procedure} <- entries, do: send(pid, invocation)
end)
{:ok, sl, [{:next_event, :internal, :transition}]}
end
@impl true
def handle_resource(
@handle_interrupt,
_,
@interrupt,
%SL{data: %Sess{interrupt: {id, opts}, name: name}} = data
) do
[{callee, _procedure}] = Registry.lookup(Client.callee_registry_name(name), id)
send(callee, {:interrupt, id, opts})
{:ok, data, [{:next_event, :internal, :transition}]}
end
@impl true
def handle_resource(resource, _, state, data) do
Logger.error("No specific resource handler for #{resource} in state #{state}")
{:ok, data, []}
end
@impl true
def handle_info({:set_message, message}, _, %SL{data: %Sess{} = sess} = data) do
{:ok, %SL{data | data: %Sess{sess | message: message}},
[{:next_event, :internal, :message_received}]}
end
defp handle_response(nil, actions, requests, _), do: {requests, actions}
defp handle_response(id, actions, requests, response) do
Enum.reduce_while(requests, {requests, actions}, fn
{^id, nil}, _ ->
{:halt, {remove_request(id, requests), actions}}
{^id, from}, _ ->
{:halt, {remove_request(id, requests), [get_response_action(from, response) | actions]}}
{_, _}, acc ->
{:cont, acc}
end)
end
defp get_response_action(from, {:ok, %{progress: true}, arg_l, arg_kw}) do
send(from, {:progress, arg_l, arg_kw})
[]
end
defp get_response_action(from, response), do: {:reply, from, response}
defp maybe_update_response(data, {:update, key, resp}) do
{%SL{data | data: Map.put(data.data, key, resp)}, resp}
end
defp maybe_update_response(data, resp), do: {data, resp}
defp do_send(r_id, tt, t, message) do
{request_id, message} = maybe_inject_request_id(r_id, message)
tt.send_request(t, remove_nil_values(message))
request_id
end
defp maybe_authentication(message, nil), do: message
defp maybe_authentication(%Hello{} = message, auth) do
%Hello{message | options: Map.from_struct(auth)}
end
defp remove_nil_values(message), do: Enum.reject(message, &is_nil/1)
defp maybe_inject_request_id(r_id, [@yield | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, [@hello | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, [@abort | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, [@error | _] = message), do: {r_id, message}
defp maybe_inject_request_id(r_id, message) do
request_id = get_request_id(r_id)
{request_id, List.insert_at(message, 1, request_id)}
end
defp get_request_id(current_id) when current_id == @max_id do
1
end
defp get_request_id(current_id) do
current_id + 1
end
defp remove_request(id, requests) do
Enum.filter(requests, fn
{^id, _} -> false
{_id, _} -> true
end)
end
defp send_message_queue(r_id, [], _, _, reqs), do: {r_id, reqs}
defp send_message_queue(r_id, mq, tt, t, reqs) do
mq
|> Enum.reverse()
|> Enum.reduce({r_id, reqs}, fn {request, from}, {id, requests} ->
r = do_send(id, tt, t, request)
{r, [{r, from} | requests]}
end)
end
def remove_cast_requests([]), do: []
def remove_cast_requests(requests) do
Enum.filter(requests, fn
{_, nil} -> false
{_, _} -> true
end)
end
defp handle_message(msg, roles) do
Enum.reduce_while(roles, nil, fn r, _ ->
try do
res = r.handle(msg)
{:halt, res}
rescue
FunctionClauseError ->
{:cont, nil}
end
end)
end
end