# lib/buckaroo.ex

defmodule Buckaroo do
  @moduledoc ~S"""
  Simple `:cowboy` (v2) webserver with support for websockets.
  """
  alias Plug.Cowboy
  require Logger

  @doc ~S"""
  Starts the webserver described by `child_spec/1`.

  The `opts` are handed to `child_spec/1` unchanged; the `:start`
  module/function/arguments of the resulting child spec are then invoked
  directly and their result is returned.
  """
  @spec start_link(Keyword.t()) :: term
  def start_link(opts \\ []) do
    %{start: start} = child_spec(opts)
    {module, function, args} = start

    apply(module, function, args)
  end

  @doc ~S"""
  Setup a simple webserver handling HTTP requests including websockets.

  ## Options

    * `:plug` - (required) the plug router handling HTTP requests.
    * `:opts` - options passed to the plug, defaults to `[]`.
    * `:socket` - module handling websocket upgrade requests;
      the plug is used when no socket is set.
    * `:socket_opts` - options passed to the socket, defaults to `:opts`.
    * `:prefix` - only serve paths below this prefix. The prefix is stripped
      from the request path before the plug or socket is called.
      A trailing `/` is ignored.
    * `:decode_path` - URI decode the request path, defaults to `false`.
    * `:port` - port as integer or numeric string, defaults to `3000`.
    * `:compress` - defaults to `true`.

  All remaining options are passed on to `Plug.Cowboy.child_spec/1`
  using the `:http` scheme.

  Raises when `:plug` is not set.
  """
  @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
  def child_spec(opts \\ []) do
    {decode?, opts} = Keyword.pop(opts, :decode_path, false)

    plug = opts[:plug] || raise("Need to set plug: ... to the plug router.")
    router = {plug, opts[:opts] || []}

    # `__info__/1` is used (instead of `function_exported?/3`) so the check
    # also works when the plug module has not been loaded yet.
    if {:__sse__, 0} in plug.__info__(:functions) and plug.__sse__() do
      p = opts[:protocol_options] || []

      if p[:idle_timeout] != :infinity do
        Logger.warn(fn ->
          """
          Buckaroo: serving SSE events, but `:idle_timeout` is not set to `:infinity`.

            This causes SSE EventSources to disconnect after the timeout passes.

            Please pass `protocol_options: [idle_timeout: :infinity]`
            to prevent disconnect after idle timeout.

            Example:

              Buckaroo.child_spec(plug: MyRouter, protocol_options: [idle_timeout: :infinity])
          """
        end)
      end
    end

    socket = if s = opts[:socket], do: {s, opts[:socket_opts] || opts[:opts] || []}

    # Normalise the prefix once, so the cowboy route and the number of bytes
    # stripped in `init/2` always agree ("/api" and "/api/" behave the same).
    prefix = if raw = opts[:prefix], do: String.trim_trailing(raw, "/")

    match = if prefix, do: String.to_charlist(prefix <> "/[...]"), else: :_

    # `init/2` strips the prefix with `<<_::size(bits), path::binary>>`, so the
    # size must be counted in bytes (`byte_size/1`), not in graphemes.
    prefix_bits = if prefix, do: byte_size(prefix) * 8

    options =
      [
        port: 3000,
        compress: true,
        dispatch: [{:_, [{match, __MODULE__, {socket || router, router, decode?, prefix_bits}}]}]
      ]
      # Buckaroo-only options are consumed above and must not leak into
      # the options forwarded to `Plug.Cowboy`.
      |> Keyword.merge(Keyword.drop(opts, ~w(prefix socket socket_opts plug opts)a))
      |> Keyword.update!(:port, &if(is_binary(&1), do: String.to_integer(&1), else: &1))

    Cowboy.child_spec([scheme: :http, plug: plug] ++ options)
  end

  ### Handler ###
  @connection Plug.Cowboy.Conn
  @already_sent {:plug_conn, :sent}
  @behaviour :cowboy_websocket

  # Cowboy handler entry point, called for every request routed to this module.
  #
  # The second argument is the handler state configured in the `dispatch`
  # entry built by `child_spec/1`:
  #
  #   * `{socket, socket_opts}` - module (and options) called for websocket upgrades
  #   * `{plug, plug_opts}` - module (and options) called for all other requests
  #   * `decode?` - whether to URI decode the request path
  #   * `prefix` - number of leading *bits* to strip from the path, or `nil`
  #
  # Returns one of:
  #
  #   * `{:ok, req, state}` - plain HTTP request, response was sent by the plug
  #   * `{:cowboy_websocket, req, state}` - upgrade to a websocket
  #   * the result of `sse_init/3` - switch to an SSE loop handler
  #
  # Anything raised, thrown or exited by the called module is re-raised as an
  # exit by `exit_on_error/4`.
  @impl :cowboy_websocket
  def init(req, {{socket, socket_opts}, {plug, plug_opts}, decode?, prefix}) do
    # Optionally URI decode the path before any further processing.
    req = if decode?, do: %{req | path: URI.decode(req.path)}, else: req

    req =
      if prefix do
        # Drop the first `prefix` bits of the path; the match fails (and the
        # request crashes) when the path is shorter than the prefix.
        <<_::size(prefix), path::binary>> = req.path
        %{req | path: path}
      else
        req
      end

    # Upgrade requests are dispatched to the socket with the synthetic
    # "WEBSOCKET" method, everything else goes to the plug.
    # Note: `plug` and `opts` are rebound here and refer to the socket module
    # and its options for upgrade requests.
    {conn, plug, opts} =
      if :cowboy_websocket.is_upgrade_request(req) do
        {%{@connection.conn(req) | method: "WEBSOCKET"}, socket, socket_opts}
      else
        {@connection.conn(req), plug, plug_opts}
      end

    try do
      conn = plug.call(conn, opts)

      # The called module signals how to continue through `conn.private[:websocket]`.
      case Map.get(conn.private, :websocket) do
        nil ->
          # Regular HTTP: make sure a response is sent and continue with
          # the (updated) cowboy request stored in the adapter.
          %{adapter: {@connection, req}} = maybe_send(conn, plug)
          {:ok, req, {plug, opts}}

        # Must be matched before the generic `{socket, opts}` clause below.
        {:sse, {socket, opts}} ->
          sse_init(conn, socket, opts)

        {:sse, socket} ->
          sse_init(conn, socket)

        {socket, opts} ->
          # State is unpacked again in `websocket_init/1`.
          {:cowboy_websocket, req, {socket, {conn, opts}}}

        socket ->
          {:cowboy_websocket, req, {socket, {conn, []}}}
      end
    catch
      kind, reason ->
        # `conn` is the connection as passed to `plug.call/2`;
        # bindings made inside the `try` body are not visible here.
        exit_on_error(kind, reason, __STACKTRACE__, {plug, :call, [conn, opts]})
    after
      # Remove a pending `{:plug_conn, :sent}` message from the mailbox
      # without blocking (presumably sent by Plug once a response is sent - confirm).
      receive do
        @already_sent -> :ok
      after
        0 -> :ok
      end
    end
  end

  # Makes sure the value returned by `plug` is a connection with a response:
  #
  #   * no response set - raises `Plug.Conn.NotSentError`
  #   * response set, but not sent - sends it
  #   * any other connection state - returned untouched
  #
  # Raises a `RuntimeError` when `plug` returned something other than a `Plug.Conn`.
  defp maybe_send(%Plug.Conn{state: state} = conn, _plug) do
    case state do
      :unset -> raise Plug.Conn.NotSentError
      :set -> Plug.Conn.send_resp(conn)
      _any_other_state -> conn
    end
  end

  defp maybe_send(returned, plug) do
    message =
      "Cowboy2 adapter expected #{inspect(plug)} to return Plug.Conn but got: " <>
        inspect(returned)

    raise message
  end

  # Re-raises anything caught while calling the plug/socket as an exit of
  # the form `{reason, call}`, with an empty stacktrace.
  #
  #   * `Plug.Conn.WrapperError` - unwrapped, then handled by its inner kind
  #   * `:error` - reason is `{normalized_exception, stack}`
  #   * `:throw` - reason is `{{:nocatch, value}, stack}`
  #   * `:exit` - reason is the exit value itself
  defp exit_on_error(:error, %Plug.Conn.WrapperError{} = wrapped, _stack, call) do
    exit_on_error(wrapped.kind, wrapped.reason, wrapped.stack, call)
  end

  defp exit_on_error(kind, value, stack, call) when kind in [:error, :throw, :exit] do
    reason =
      case kind do
        :error -> {Exception.normalize(:error, value, stack), stack}
        :throw -> {{:nocatch, value}, stack}
        :exit -> value
      end

    :erlang.raise(:exit, {reason, call}, [])
  end

  ### Handling Socket ###

  # Unpacks the state prepared in `init/2` and lets the socket module
  # initialize itself with the connection and its options.
  @impl :cowboy_websocket
  def websocket_init({socket, {conn, state}}) do
    reply = socket.init(conn, state)
    result(reply, socket)
  end

  # Passes an incoming websocket frame to the socket module.
  @impl :cowboy_websocket
  def websocket_handle(frame, {socket, state}) do
    reply = socket.handle(frame, state)
    result(reply, socket)
  end

  # Passes a received Erlang message to the socket module.
  @impl :cowboy_websocket
  def websocket_info(info, {socket, state}) do
    reply = socket.info(info, state)
    result(reply, socket)
  end

  # Note: terminate overlaps with loop handler (SSE) terminate
  #
  # Forwards termination to the handler module stored in the state, but only
  # when that module exports `terminate/3`. Any other state is ignored.
  @impl :cowboy_websocket
  def terminate(reason, req, state)

  def terminate(reason, req, {socket, state}) when is_atom(socket) do
    case function_exported?(socket, :terminate, 3) do
      true -> socket.terminate(reason, req, state)
      false -> :ok
    end
  end

  def terminate(_reason, _req, _state), do: :ok

  # Translates the return value of a socket callback into the tuple expected
  # by cowboy, storing the socket module next to its state.
  @spec result(tuple, module) :: tuple
  defp result({tag, state}, socket) when tag in [:ok, :stop] do
    {tag, {socket, state}}
  end

  defp result({:reply, out, state}, socket) do
    {:reply, out, {socket, state}}
  end

  defp result({:ok, state, :hibernate}, socket) do
    {:ok, {socket, state}, :hibernate}
  end

  defp result({:reply, out, state, :hibernate}, socket) do
    {:reply, out, {socket, state}, :hibernate}
  end

  ## Loop Handler

  # Initializes the SSE handler and switches cowboy to a loop handler.
  #
  # `handler.init/2` returns `{:ok, state}` or `{:ok, conn, state}`; the latter
  # replaces the connection used for the response. The response is started
  # as a chunked `text/event-stream` with caching disabled.
  @spec sse_init(term, module, term) :: tuple
  defp sse_init(c, handler, opts \\ []) do
    {conn, state} =
      case handler.init(c, opts) do
        {:ok, initial} -> {c, initial}
        {:ok, replaced_conn, initial} -> {replaced_conn, initial}
      end

    with_headers =
      conn
      |> Plug.Conn.put_resp_content_type("text/event-stream")
      |> Plug.Conn.put_resp_header("cache-control", "no-cache")

    %{adapter: {_, req}} = Plug.Conn.send_chunked(with_headers, 200)

    {:cowboy_loop, req, {handler, state}, :hibernate}
  end

  @doc ~S"""
  Handles Erlang messages for an SSE connection (cowboy loop handler).

  The `:eof` message stops the loop; the handler's optional `terminate/3`
  is called with reason `:eof` before stopping.

  All other messages are passed to `handler.info/2`, which returns:

    * `{:ok, state}` or `{:ok, state, :hibernate}` - continue
    * `{:reply, events, state}` or `{:reply, events, state, :hibernate}` -
      send one event or a list of events to the client and continue
    * `{:stop, state}` - stop the loop
  """
  @spec info(term, map, {module, term}) :: tuple
  def info(:eof, req, {handler, state}) do
    # `terminate/3` is optional for handlers, see `terminate/3` in this module.
    if function_exported?(handler, :terminate, 3), do: handler.terminate(:eof, req, state)

    # Keep the `{handler, state}` shape; wrapping it again would make
    # `terminate/3` pass a nested state to the handler.
    {:stop, req, {handler, state}}
  end

  def info(msg, req, {handler, state}) do
    case handler.info(msg, state) do
      {:ok, s} -> {:ok, req, {handler, s}}
      {:ok, s, :hibernate} -> {:ok, req, {handler, s}, :hibernate}
      {:reply, events, s} -> {:ok, send_events(req, events), {handler, s}}
      {:reply, events, s, :hibernate} -> {:ok, send_events(req, events), {handler, s}, :hibernate}
      {:stop, s} -> {:stop, req, {handler, s}}
    end
  end

  # Streams a single event, or every event of a (possibly nested) list,
  # to the client without closing the response. Always returns `req`.
  @spec send_events(map, [map] | map) :: map
  defp send_events(req, events) when is_list(events) do
    Enum.reduce(events, req, fn event, acc -> send_events(acc, event) end)
  end

  defp send_events(req, event) do
    payload = sse_event(event)
    :cowboy_req.stream_body(payload, :nofin, req)

    req
  end

  # Encodes an event for a `text/event-stream` response.
  #
  # Accepts a map with a `:data` key and the optional keys `:id`, `:event`
  # and `:retry`; optional fields are emitted in that order, followed by the
  # data. Any other value is sent as the data of an otherwise empty event.
  #
  # Every line of the data gets its own `data: ` prefix. A raw line break
  # inside the data would otherwise end the field (or the whole event) early.
  @spec sse_event(term) :: String.t()
  defp sse_event(%{data: data} = event) do
    fields =
      for key <- [:id, :event, :retry], Map.has_key?(event, key) do
        "#{key}: #{Map.fetch!(event, key)}\n"
      end

    IO.iodata_to_binary([fields, sse_data(data), "\n"])
  end

  defp sse_event(data), do: IO.iodata_to_binary([sse_data(data), "\n"])

  # Builds one `data: ...` line per line of the (stringified) data.
  defp sse_data(data) do
    data
    |> to_string()
    |> String.split(["\r\n", "\r", "\n"])
    |> Enum.map(&["data: ", &1, "\n"])
  end
end