lib/baby/connection.ex

defmodule Baby.Connection do
  @behaviour :gen_statem
  @behaviour :ranch_protocol
  alias Baby.{Protocol, Util}

  @moduledoc """
  State machine connection handler
  """

  # Delay, in milliseconds, used with `Process.send_after` when scheduling the
  # next `:inbox` tick.
  @inrate 59
  # Delay, in milliseconds, used with `Process.send_after` when scheduling the
  # next `:outbox` tick.
  @outrate 61
  # `:gen_statem` generic timeout action named `:idle`: after 59957 ms a
  # `{:timeout, :idle}` event with content `:nothing_happening` is delivered.
  @idle_timeout {{:timeout, :idle}, 59957, :nothing_happening}
  @impl true
  # All events go through `handle_event/4`, and state enter calls are enabled.
  def callback_mode do
    [:handle_event_function, :state_enter]
  end

  @doc """
  Returns a child specification that starts this module through
  `start_link/1` with `opts`, as a `:transient` worker.
  """
  def child_spec(opts) do
    start_mfa = {__MODULE__, :start_link, [opts]}

    %{id: __MODULE__, start: start_mfa, type: :worker, restart: :transient}
  end

  @impl true
  # Ranch entry point: spawn a linked process that runs `init/1` with the
  # listener ref, transport module and options. The second argument is unused.
  def start_link(ref, _socket, transport, opts) do
    pid = :proc_lib.spawn_link(__MODULE__, :init, [{ref, transport, opts}])
    {:ok, pid}
  end

  @doc """
  Starts a linked connection state machine, passing `opts` on to `init/1`.
  """
  def start_link(opts), do: :gen_statem.start_link(__MODULE__, opts, [])

  @impl true
  # Inbound (ranch) path, run inside the process spawned by `start_link/4`:
  # complete the ranch handshake, request a single active-mode message, then
  # enter the state machine loop in `:hello` with the idle timeout armed.
  def init({ref, transport, opts}) do
    {:ok, socket} = :ranch.handshake(ref)
    :ok = transport.setopts(socket, active: :once)
    conn_info = initial_conn_info(opts, socket, transport)

    :gen_statem.enter_loop(__MODULE__, [], :hello, conn_info, [@idle_timeout])
  end

  # Outbound path: open a TCP connection to the `:host` / `:port` found in
  # `opts` and start in `:hello` with no transport module. Any connect result
  # other than `{:ok, socket}` stops the process with reason `:normal`.
  # NOTE(review): unlike the inbound clause, no idle timeout action is armed
  # here -- confirm that this is intended.
  def init(opts) do
    host = Keyword.get(opts, :host)
    port = Keyword.get(opts, :port)

    with {:ok, socket} <- :gen_tcp.connect(host, port, [:binary, active: :once]) do
      {:ok, :hello, initial_conn_info(opts, socket, nil), []}
    else
      _ -> {:stop, :normal}
    end
  end

  @impl true
  # `:gen_statem` invokes `terminate(reason, state, data)`: the state name (an
  # atom such as `:hello`) is the second argument and the connection info map
  # is the third. The guard therefore has to be on the data argument, or the
  # socket is never explicitly closed on shutdown.
  def terminate(_reason, _state, conn_info) when is_map(conn_info),
    do: close_connection(conn_info)

  # No connection info map available, so there is nothing to close.
  def terminate(_reason, _state, _data), do: :ok

  # Builds the initial connection info map for a freshly opened socket and
  # schedules the first `:outbox` and `:inbox` ticks for the calling process.
  # `:clump_id` defaults to "Quagga" when absent from `opts`.
  defp initial_conn_info(opts, socket, transport) do
    owner = self()
    identity = Keyword.get(opts, :identity)
    clump_id = Keyword.get(opts, :clump_id, "Quagga")

    # Kick off the periodic queue processing.
    Process.send_after(owner, :outbox, @outrate, [])
    Process.send_after(owner, :inbox, @inrate, [])

    have = stored_info_map(clump_id)
    public_key = Baobab.Identity.key(identity, :public)
    secret_key = Baobab.Identity.key(identity, :secret)

    %{
      pid: owner,
      have: have,
      want: %{},
      shoots: [],
      clump_id: clump_id,
      socket: socket,
      transport: transport,
      our_pk: public_key,
      our_sk: secret_key,
      their_nonces: MapSet.new(),
      inbox: [],
      outbox: [],
      wire: <<>>
    }
  end

  # Generic TCP handling stuff. Not state dependent.
  @impl true
  # The idle timer fired while the inbox, outbox and shoots are all empty:
  # nothing is queued, so go ahead and disconnect.
  def handle_event(
        {:timeout, :idle},
        :nothing_happening,
        _state,
        %{inbox: [], outbox: [], shoots: []} = idle_conn
      ) do
    disconnect(idle_conn)
  end

  # The idle timer fired but something is still queued: re-arm the timer and
  # check back next period.
  def handle_event({:timeout, :idle}, :nothing_happening, _state, busy_conn) do
    rearm = [@idle_timeout]
    {:keep_state, busy_conn, rearm}
  end

  # The socket was closed; shut this connection down.
  def handle_event(:info, {:tcp_closed, _socket}, _state, conn_info) do
    disconnect(conn_info)
  end

  # Bytes arrived on the socket; feed them into the wire buffer.
  def handle_event(:info, {:tcp, _socket, data}, _state, conn_info) do
    wire_buffer(data, conn_info)
  end

  # Outbox tick with a packet waiting: log it, send it, schedule the next tick
  # and keep the remainder of the queue.
  def handle_event(:info, :outbox, _state, %{outbox: [{packet, type} | remaining]} = conn_info) do
    Util.connection_log(conn_info, :out, type)
    send_packet(packet, conn_info)
    Process.send_after(conn_info.pid, :outbox, @outrate)

    drained = %{conn_info | outbox: remaining}
    {:keep_state, drained, []}
  end

  # Outbox tick reached when the packet-sending clause above did not match and
  # the shoots list is non-empty: schedule the next tick and let
  # `Protocol.outbound/2` handle a `:BAMB` message for this connection.
  #
  # The non-empty check is done by pattern matching on `[_ | _]` rather than
  # `length/1`, which walks the whole list on every tick.
  def handle_event(:info, :outbox, _, %{shoots: [_ | _]} = conn_info) do
    Process.send_after(conn_info.pid, :outbox, @outrate)
    {:keep_state, Protocol.outbound(conn_info, :BAMB), []}
  end

  # Outbox tick with nothing to do. The explicit empty-list matches are
  # deliberate: if the enqueueing process got messed up somewhere, this clause
  # will not match and the problem is caught.
  def handle_event(:info, :outbox, _state, %{outbox: [], shoots: []} = quiet_conn) do
    Process.send_after(quiet_conn.pid, :outbox, @outrate)
    {:keep_state, quiet_conn, []}
  end

  # State enter calls arrive as `(:enter, old_state, new_state, data)`. Each
  # one hands the connection to `Protocol.outbound/2` with the message for the
  # state being entered.

  # Initial entry into `:hello`.
  def handle_event(:enter, :hello, :hello, conn_info) do
    greeted = Protocol.outbound(conn_info, :HELLO)
    {:keep_state, greeted, []}
  end

  # Moving from `:hello` to `:auth`.
  def handle_event(:enter, :hello, :auth, conn_info) do
    authed = Protocol.outbound(conn_info, :AUTH)
    {:keep_state, authed, []}
  end

  # Moving from `:auth` to `:replicate`.
  def handle_event(:enter, :auth, :replicate, conn_info) do
    announced = Protocol.outbound(conn_info, :HAVE)
    {:keep_state, announced, []}
  end

  # Write out the proto handlers
  #
  # Compile-time generation: one `handle_event/4` clause is defined for every
  # entry returned by `Protocol.definition()`. Each clause handles an `:inbox`
  # tick while in the entry's `instate`, when the packet at the head of the
  # inbox carries the entry's `type`.
  for {name, %{type: type, instate: instate, outstate: outstate}} <-
        Protocol.definition() |> Map.to_list() do
    def handle_event(
          :info,
          :inbox,
          unquote(instate),
          %{inbox: [{unquote(type), _} = packet | rest]} = conn_info
        ) do
      case Protocol.inbound(packet, conn_info, unquote(name)) do
        # The protocol layer rejected the packet; drop the connection.
        :error ->
          disconnect(conn_info)

        # Anything else is treated as the updated connection info: log the
        # message, schedule the next inbox tick and move to the entry's
        # `outstate` with the handled packet removed from the inbox.
        nci ->
          Util.connection_log(nci, :in, unquote(name))
          Process.send_after(nci.pid, :inbox, @inrate, [])
          {:next_state, unquote(outstate), %{nci | inbox: rest}, []}
      end
    end
  end

  # The packet at the head of the inbox is not defined in the protocol for the
  # current state: log a warning naming the message type and state, then
  # disconnect.
  def handle_event(:info, :inbox, state, %{inbox: [{type, _} | _]} = conn_info) do
    type_label =
      type
      |> Protocol.msglookup()
      |> case do
        nil -> Integer.to_string(type)
        known -> Atom.to_string(known)
      end

    state_label = Atom.to_string(state)

    description =
      IO.iodata_to_binary(["type ", type_label, " message in state ", state_label])

    Util.connection_log(conn_info, :in, description, :warn)
    disconnect(conn_info)
  end

  # Inbox tick that no earlier clause handled. We might be out of sync, so
  # just schedule another tick and go around again.
  def handle_event(:info, :inbox, _state, conn_info) do
    _timer = Process.send_after(conn_info.pid, :inbox, @inrate, [])
    {:keep_state, conn_info, []}
  end

  # Appends freshly received bytes to the pending wire buffer and repeatedly
  # runs `Stlv.decode/1` on it, queueing every `{type, value}` it yields at
  # the tail of the inbox.
  defp wire_buffer(data, conn_info) do
    # Ask the socket for the next message before decoding.
    active_once(conn_info)
    pending = conn_info.wire <> data

    case Stlv.decode(pending) do
      {type, value, rest} ->
        # Queue the decoded item and carry on with whatever bytes are left.
        queued = conn_info.inbox ++ [{type, value}]
        wire_buffer(rest, %{conn_info | inbox: queued, wire: <<>>})

      :error ->
        # Keep the accumulated bytes buffered until more data arrives.
        {:keep_state, %{conn_info | wire: pending}, []}

      _ ->
        disconnect(conn_info)
    end
  end

  # Stops the state machine with reason `:normal`. The disconnect is logged
  # only when the connection info has a `:short_peer` key, because we don't
  # want to log health check connections.
  defp disconnect(conn_info) do
    if Map.has_key?(conn_info, :short_peer) do
      Util.connection_log(conn_info, :both, "disconnected", :info)
    end

    {:stop, :normal}
  end

  # Writes `packet` to the socket: straight through `:gen_tcp` when there is
  # no transport module, otherwise through the transport's own `send/2`.
  defp send_packet(packet, %{transport: transport, socket: sock}) do
    case transport do
      nil -> :gen_tcp.send(sock, packet)
      mod -> mod.send(sock, packet)
    end
  end

  # Sets the socket to `active: :once`, via `:inet` when there is no transport
  # module and via the transport's `setopts/2` otherwise.
  defp active_once(%{transport: transport, socket: socket}) do
    case transport do
      nil -> :inet.setopts(socket, active: :once)
      mod -> mod.setopts(socket, active: :once)
    end
  end

  # Closes the socket, via `:gen_tcp` when there is no transport module and
  # via the transport's `close/1` otherwise.
  defp close_connection(%{transport: transport, socket: socket}) do
    case transport do
      nil -> :gen_tcp.close(socket)
      mod -> mod.close(socket)
    end
  end

  # Turns the 3-tuples returned by `Baobab.stored_info/1` for `clump_id` into
  # a map keyed by each tuple's first two elements, with the third element as
  # the value. Later tuples overwrite earlier ones that share a key.
  defp stored_info_map(clump_id) do
    clump_id
    |> Baobab.stored_info()
    |> Map.new(fn {k1, k2, value} -> {{k1, k2}, value} end)
  end
end