defmodule PS2.Socket do
@moduledoc """
A Websockex client that connects to Planetside's Event Streaming Service (ESS).
After writing a `PS2.SocketClient`, you can start receiving and handling ESS events by spinning up a `PS2.Socket` with
your desired event subscriptions. You should start this process in your supervision tree. For example:
```elixir
defmodule MyApp.Application do
use Application
@impl Application
def start(_type, _args) do
subscriptions = [
events: [PS2.player_login],
worlds: [PS2.connery, PS2.miller, PS2.soltech],
characters: ["all"]
]
clients = [MyApp.EventHandler]
ess_opts = [
subscriptions: subscriptions,
clients: clients,
service_id: YOUR_SERVICE_ID,
# you may also add a :name option. The name defaults to `PS2.Socket`, so if you want to run multiple sockets
# for some reason, you can specify `name: :none` for no name to be registered.
]
children = [
# ...
{PS2.Socket, ess_opts},
# ...
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
```
You can also include metadata in the ESS opts to be passed with every event. For example:
```elixir
ess_opts = [
subscriptions: subscriptions,
clients: clients,
service_id: YOUR_SERVICE_ID,
metadata: [hello: :websocket]
]
# in your SocketClient:
def handle_event({event_name, _payload}, metadata) do
IO.inspect("Received \#{event_name} with metadata \#{inspect(metadata)}")
end
```
Since your service ID should be kept a secret, if you're using version control (e.g. git), you should
use `Application.get_env(:your_app, :service_id)`, or use environment variables with
`System.get_env(:your_app, :service_id)`, in place of `YOUR_SERVICE_ID`. You can read more about configuring Elixir
applications in [Nicd's awesome blog post](https://blog.nytsoi.net/2020/05/05/elixir-time-for-configuration).
"""
@max_reconnects 3
use WebSockex
require Logger
alias PS2.Socket
@enforce_keys [:me]
defstruct subscriptions: [], clients: [], metadata: :none, me: nil
def start_link(opts) do
case Keyword.fetch(opts, :service_id) do
{:ok, sid} ->
{name, opts} = Keyword.pop(opts, :name, __MODULE__)
clients = Keyword.get(opts, :clients, [])
subscriptions = Keyword.get(opts, :subscriptions, [])
metadata = Keyword.get(opts, :metadata, :none)
endpoint = Keyword.get(opts, :endpoint, "push.planetside2.com/streaming")
environment = Keyword.get(opts, :environment, "ps2")
ws_opts =
[
async: true,
handle_initial_conn_failure: true
] ++ if name == :none, do: [], else: [name: name]
ws =
WebSockex.start_link(
"wss://#{endpoint}?environment=#{environment}&service-id=s:#{sid}",
__MODULE__,
%Socket{subscriptions: subscriptions, clients: clients, metadata: metadata, me: name},
ws_opts
)
case ws do
{:ok, pid} when name == :none ->
send(pid, {:update_me, pid})
{:ok, pid}
start_result ->
start_result
end
:error ->
{:stop, no_sid_error_message()}
end
end
@doc """
Resubscribe to all events
"""
def resubscribe(name \\ __MODULE__) do
WebSockex.cast(name, :resubscribe)
end
@doc """
Add a new subscription. Raises a `KeyError` if `subscription` is not formatted correctly.
`subscription` should be a keyword list similar to the one passed in the options to `PS2.Socket`.
"""
def subscribe!(name \\ __MODULE__, subscription) do
Keyword.fetch!(subscription, :events)
Keyword.fetch!(subscription, :worlds)
Keyword.fetch!(subscription, :characters)
WebSockex.cast(name, {:subscribe, subscription})
end
def no_sid_error_message do
"Please provide a Census service ID under the :service_id option. (See module documentation)"
end
## WebSockex callbacks
def handle_frame({_type, nil}, state), do: {:ok, state}
def handle_frame({_type, msg}, state) do
handle_message(msg, state)
{:ok, state}
end
def handle_cast({:send, frame}, state), do: {:reply, frame, state}
def handle_cast({:new_client, new_client}, %Socket{clients: clients} = state) do
{:ok, %Socket{state | clients: [new_client | clients]}}
end
def handle_cast(:resubscribe, %Socket{subscriptions: subs, me: me} = state) do
do_subscribe(me, subs)
{:ok, state}
end
def handle_cast({:subscribe, subscription}, %Socket{subscriptions: subs, me: me} = state) do
do_subscribe(me, subscription)
events = subs[:events] ++ subscription[:events]
worlds = subs[:worlds] ++ subscription[:worlds]
characters = subs[:characters] ++ subscription[:characters]
{:ok,
%Socket{state | subscriptions: [events: events, worlds: worlds, characters: characters]}}
end
def handle_connect(_conn, %Socket{subscriptions: subs, me: me} = state) do
Logger.info("Connected to the Socket.")
do_subscribe(me, subs)
{:ok, state}
end
def handle_disconnect(
%{reason: %WebSockex.RequestError{code: 403 = code, message: message}},
state
) do
Logger.error(
"Disconnected from the Socket: \"#{message}\" (error code #{code}). Make sure you have provided a valid service ID!"
)
{:ok, state}
end
# Handle ESS timing out
def handle_disconnect(
%{attempt_number: @max_reconnects},
state
) do
Logger.warning(
"ESS disconnected #{@max_reconnects} time(s), will retry initial connection in 30 seconds...",
inspect(state)
)
Process.sleep(30_000)
{:ok, state}
end
def handle_disconnect(%{attempt_number: attempt} = conn, state) do
Logger.info(
"Disconnected from the Socket, attempting to reconnect (#{attempt}/#{@max_reconnects}).",
inspect(state)
)
Logger.debug(inspect(conn))
{:reconnect, state}
end
def handle_info({:update_me, pid}, %Socket{} = state) do
do_subscribe(pid, state.subscriptions)
{:ok, %Socket{state | me: pid}}
end
def handle_info(unknown, state) do
Logger.warn("received unknown message: #{inspect(unknown)}")
{:ok, state}
end
## Data Transformation and Dispatch
defp handle_message(msg, %Socket{clients: clients, metadata: metadata}) do
case Jason.decode(msg) do
{:ok, %{"connected" => "true"}} ->
Logger.info("Connected to the ESS.")
{:ok, %{"subscription" => subscriptions}} ->
Logger.info("""
Received subscription acknowledgement:
#{inspect(subscriptions)}
""")
{:ok, %{"send this for help" => _}} ->
nil
# server status sent on connect, ignore for now
{:ok, %{"detail" => "EventServerEndpoint" <> _rest}} ->
nil
{:ok, %{"online" => payload}} ->
event = {PS2.server_health_update(), payload}
send_event(event, clients, metadata)
{:ok, message} ->
with {:ok, event} <- create_event(message) do
send_event(event, clients, metadata)
end
{:error, e} ->
Logger.error(e)
end
end
defp do_subscribe(:none, _subs) do
:no_dest
end
defp do_subscribe(me, subscriptions) do
payload =
Jason.encode!(%{
"service" => "event",
"action" => "subscribe",
"characters" => subscriptions[:characters],
"worlds" => subscriptions[:worlds],
"eventNames" => subscriptions[:events]
})
WebSockex.cast(me, {:send, {:text, payload}})
:ok
end
defp create_event(message) do
with payload when not is_nil(payload) and is_map(payload) <- message["payload"],
event_name when not is_nil(event_name) <- payload["event_name"] do
{:ok, {event_name, Map.delete(payload, "event_name")}}
else
_ ->
Logger.debug("Couldn't create event from message: #{inspect(message)}")
:error
end
end
defp send_event(event, clients, metadata) do
args =
if metadata == :none do
[event]
else
[event, metadata]
end
Enum.each(clients, fn client ->
Task.start(client, :handle_event, args)
end)
end
end