lib/spear/connection.ex

defmodule Spear.Connection do
  @moduledoc """
  A GenServer which brokers a connection to an EventStoreDB

  `Spear.Connection` will attempt to connect immediately after GenServer init.
  Failures to connect will result in back-off retries in segments of 500ms.
  Any `GenServer.call/3` to a connection process may return `{:error, :closed}`
  if the connection process is alive but the HTTP2 connection to the
  EventStoreDB is not yet (re)established. `Spear.Connection` will
  automatically attempt to re-connect to the EventStoreDB if the connection is
  severed.

  `Spear.Connection` processes accept `GenServer.call/3`s of `:close` to
  force a disconnect from an EventStoreDB and a subsequent `GenServer.cast/2`
  of `:connect` to reconnect based on the configuration supplied at init-time.
  A `:connect` cast sent to a process which is already connected is ignored.

  If configuration parameters must change between disconnects and reconnects,
  spawning and killing connections with a `DynamicSupervisor` is recommended.

  ## Configuration

  See the `Spear.Connection.Configuration` module for available configuration
  of connections.

  ## TLS/SSL configuration and credentials

  See the [Security guide](guides/security.md) for information about
  certificates, credentials, and access control lists (ACLs).

  ## Keep-alive

  Spear and the EventStoreDB use gRPC keep-alive to ensure that any hung
  connections are noticed and restarted.

  EventStoreDB has its own configuration for keep-alive and each Spear
  client's configuration need not match the server configuration.
  With a `:keep_alive_interval` too low on the Spear-side and with very many
  connected clients, the keep-alive pings can effectively become a DDoS
  even while no clients perform any operations. This does not necessarily
  apply to the keep-alive settings in EventStoreDB: a client connects to a
  single EventStoreDB but an EventStoreDB may have hundreds or more clients
  connected at once.

  `:keep_alive_interval` does not express an interval in the way of
  `:timer.send_interval/3`. Instead the keep-alive timer is optimized to
  reset upon any data received from the connection.

  This means that (assuming a consistent network, which is generous) either
  the Spear client or EventStoreDB server will dominate the keep-alive ping
  communication, the driver of the conversation being whichever has the
  lowest keep-alive interval configured. For this reason, it is generally
  advisable to set the client keep-alive just higher than the server
  keep-alive, adding noise for network latency, since the client's keep-alive
  routine will only trigger when the server's keep-alive message is behind
  schedule.

  Note that there may not be much value in attempting to optimize the
  keep-alive settings unless the network is very unstable: keep-alive only
  has utility when the client, server, or network is seriously delayed or
  silently severed.

  ## Examples

      iex> {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://localhost:2113")
      iex> Spear.stream!(conn, "es_supported_clients") |> Enum.take(3)
      [%Spear.Event{}, %Spear.Event{}, %Spear.Event{}]
  """

  # see the very similar original implementation of this process architecture
  # from the Mint documentation:
  # https://github.com/elixir-mint/mint/blob/796b8db097d69ede7163acba223ab2045c2773a4/pages/Architecture.md

  use Connection
  require Logger

  alias Spear.Connection.{Request, KeepAliveTimer}
  alias Spear.Connection.Configuration, as: Config

  @post "POST"
  @closed %Mint.TransportError{reason: :closed}
  @read_apis %{
    Spear.Records.Streams => [:Read],
    Spear.Records.Gossip => [:Read],
    Spear.Records.Persistent => [:Read],
    Spear.Records.Projections => [:Statistics, :State, :Result],
    Spear.Records.Users => [:Details],
    Spear.Records.Operations => []
  }

  # `requests` maps Mint request refs either to request state built by
  # `Request.new/4` or to a `{:ping, from}` tuple for a ping requested with
  # `GenServer.call/3`. Code iterating over `requests` must handle both shapes.
  defstruct [:config, :conn, requests: %{}, keep_alive_timer: %KeepAliveTimer{}]

  @typedoc """
  A connection process

  A connection process (either referred to as `conn` or `connection` in the
  documentation) may either be a PID or a name such as a module or otherwise
  atom.

  ## Examples

      iex> {:ok, conn} = Spear.Connection.start_link(connection_string: "esdb://localhost:2113")
      {:ok, #PID<0.225.0>}
      iex> Spear.read_stream(conn, "es_supported_clients", max_count: 1)
      {:ok,
       #Stream<[
         enum: #Function<62.80860365/2 in Stream.unfold/2>,
         funs: [#Function<48.80860365/1 in Stream.map/2>]
       ]>}
  """
  @typedoc since: "0.1.0"
  @type t :: pid() | GenServer.name()

  @doc false
  def child_spec(init_arg) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [init_arg]}
    }
  end

  @doc """
  Starts a connection process

  This function can be called directly in order to link it to the current
  process, but the more common workflow is to start a `Spear.Connection`
  GenServer as a part of a supervision tree.

  ## Examples

  E.g. in an application's supervision tree defined in
  `lib/my_app/application.ex`:

      children = [
        {Spear.Connection, name: MyConnection, connection_string: "esdb://localhost:2113"}
      ]
      Supervisor.start_link(children, strategy: :one_for_one)
  """
  @doc since: "0.1.0"
  @spec start_link(opts :: Keyword.t()) :: GenServer.on_start()
  def start_link(opts) do
    # `:name` is a process-registration option; everything else is passed to
    # `init/1` and validated by `Config.new/1`
    {name, rest} = Keyword.split(opts, [:name])

    Connection.start_link(__MODULE__, rest, name)
  end

  # Validates the configuration. An invalid configuration is logged and the
  # process start is ignored rather than crashed.
  @impl Connection
  def init(opts) do
    case Config.new(opts) do
      %Config{valid?: true} = config ->
        {:connect, :init, %__MODULE__{config: config}}

      %Config{errors: errors} ->
        error_lines =
          Enum.map_join(errors, "\n", fn {key, error} -> "\t#{inspect(key)}: #{error}" end)

        Logger.error("""
        Invalid configuration passed to #{inspect(__MODULE__)}. Found the following errors:

        #{error_lines}
        """)

        :ignore
    end
  end

  # The connect info (`:init`, `:reconnect`, or the config from a `:connect`
  # cast) is not used: every attempt connects with the init-time configuration.
  @impl Connection
  def connect(_info, s) do
    case do_connect(s.config) do
      {:ok, conn} ->
        run_function(s.config.on_connect)

        {:ok, %__MODULE__{s | conn: conn, keep_alive_timer: KeepAliveTimer.start(s.config)}}

      {:error, _reason} ->
        {:backoff, 500, s}
    end
  end

  # Closes the Mint connection, fails every in-flight request and resets the
  # keep-alive timer. A `:close` call stays disconnected; any other reason
  # triggers a reconnect.
  @impl Connection
  def disconnect(info, %__MODULE__{conn: conn} = s) do
    {:ok, _conn} = Mint.HTTP.close(conn)

    :ok = close_requests(s)

    s = %__MODULE__{
      s
      | conn: nil,
        requests: %{},
        keep_alive_timer: KeepAliveTimer.clear(s.keep_alive_timer)
    }

    run_function(s.config.on_disconnect)

    case info do
      {:close, from} ->
        Connection.reply(from, {:ok, :closed})

        {:noconnect, s}

      _ ->
        {:connect, :reconnect, s}
    end
  end

  @impl Connection
  def handle_cast(:connect, %__MODULE__{conn: nil} = s), do: {:connect, s.config, s}

  # Already connected: connecting again would replace the open Mint connection
  # without closing it and orphan every in-flight request, so ignore the cast.
  def handle_cast(:connect, s), do: {:noreply, s}

  # Pushes one more message onto the body of a streaming request.
  def handle_cast({:push, request_ref, message}, s) when is_reference(request_ref) do
    case Map.fetch(s.requests, request_ref) do
      {:ok, %{rpc: rpc} = request} ->
        {wire_data, _size} =
          Spear.Request.to_wire_data(message, rpc.service_module, rpc.request_type)

        request = Request.append_data(request, IO.iodata_to_binary(wire_data))
        s = Request.continue_request(s, request)
        {:noreply, s}

      :error ->
        # coveralls-ignore-start
        {:noreply, s}
        # coveralls-ignore-stop
    end
  end

  @impl Connection
  def handle_call(_call, _from, %__MODULE__{conn: nil} = s) do
    {:reply, {:error, :closed}, s}
  end

  def handle_call(:close, from, s), do: {:disconnect, {:close, from}, s}

  # The reply is sent from `process_response/2` once the matching `:pong`
  # arrives.
  def handle_call(:ping, from, s) do
    case Mint.HTTP2.ping(s.conn) do
      {:ok, conn, request_ref} ->
        s = put_in(s.conn, conn)
        s = put_in(s.requests[request_ref], {:ping, from})
        {:noreply, s}

      # coveralls-ignore-start
      {:error, conn, @closed} ->
        {:disconnect, :closed, {:error, :closed}, put_in(s.conn, conn)}

      {:error, conn, reason} ->
        {:reply, {:error, reason}, put_in(s.conn, conn)}
        # coveralls-ignore-stop
    end
  end

  # Cancels an in-flight request and forgets it. Mint drops every later frame
  # for a cancelled stream, so nothing else would ever remove the entry.
  def handle_call({:cancel, request_ref}, _from, s) when is_reference(request_ref) do
    with {:ok, request} <- Map.fetch(s.requests, request_ref),
         {:ok, conn} <- Mint.HTTP2.cancel_request(s.conn, request_ref) do
      demonitor_request(request)

      {:reply, :ok, %__MODULE__{s | conn: conn, requests: Map.delete(s.requests, request_ref)}}
    else
      # coveralls-ignore-start
      :error ->
        # idempotent success when the request_ref is not active
        {:reply, :ok, s}

      {:error, conn, @closed} ->
        {:disconnect, :closed, {:error, :closed}, put_in(s.conn, conn)}

      {:error, conn, reason} ->
        {:reply, {:error, reason}, put_in(s.conn, conn)}
        # coveralls-ignore-stop
    end
  end

  # Any other `{type, request}` call starts a new gRPC request. The reply is
  # sent later, from `process_response/2` or `close_requests/1`.
  def handle_call({type, request}, from, s) do
    request = Spear.Request.merge_credentials(request, Config.credentials(s.config))

    with :ok <- read_only_check(request, s),
         {:ok, s} <- request_and_stream_body(s, request, from, type) do
      {:noreply, s}
    else
      {:error, :read_only} ->
        {:reply, {:error, :read_only}, s}

      # coveralls-ignore-start
      {:error, s, @closed} ->
        {:disconnect, :closed, {:error, :closed}, s}

      {:error, s, reason} ->
        {:reply, {:error, reason}, s}
        # coveralls-ignore-stop
    end
  end

  # A monitored subscriber went down: drop and cancel its subscription.
  @impl Connection
  def handle_info({:DOWN, monitor_ref, :process, _object, _reason}, s) do
    with {:ok, %{request_ref: request_ref} = request} <-
           fetch_subscription(s, monitor_ref),
         {^request, s} <- pop_in(s.requests[request_ref]),
         {:ok, conn} <- Mint.HTTP2.cancel_request(s.conn, request_ref) do
      {:noreply, put_in(s.conn, conn)}
    else
      # coveralls-ignore-start
      {:error, conn, reason} ->
        s = put_in(s.conn, conn)

        if reason == @closed, do: {:disconnect, :closed, s}, else: {:noreply, s}

      _ ->
        {:noreply, s}
        # coveralls-ignore-stop
    end
  end

  # A keep-alive timer message may already be in the mailbox when the
  # connection is torn down. Without a connection there is nothing to ping
  # and nothing to disconnect, since `disconnect/2` would close a nil conn.
  def handle_info(keep_alive_message, %__MODULE__{conn: nil} = s)
      when keep_alive_message in [:keep_alive, :keep_alive_expired] do
    {:noreply, s}
  end

  def handle_info(:keep_alive, s) do
    case Mint.HTTP2.ping(s.conn) do
      {:ok, conn, request_ref} ->
        s = put_in(s.conn, conn)
        s = update_in(s.keep_alive_timer, &KeepAliveTimer.start_timeout_timer(&1, request_ref))

        {:noreply, s}

      # coveralls-ignore-start
      {:error, conn, reason} ->
        s = put_in(s.conn, conn)

        if reason == @closed, do: {:disconnect, :closed, s}, else: {:noreply, s}
        # coveralls-ignore-stop
    end
  end

  def handle_info(:keep_alive_expired, s), do: {:disconnect, :keep_alive_timeout, s}

  # Everything else is presumed to be socket traffic for the Mint connection.
  def handle_info(message, s) do
    with %Mint.HTTP2{} = conn <- s.conn,
         {:ok, conn, responses} <- Mint.HTTP2.stream(conn, message) do
      {:noreply, put_in(s.conn, conn) |> handle_responses(responses)}
    else
      # coveralls-ignore-start
      {:error, conn, reason, responses} ->
        s = put_in(s.conn, conn) |> handle_responses(responses)

        # a closed transport disconnects (and so reconnects); other errors are
        # ignored
        if reason == @closed, do: {:disconnect, :closed, s}, else: {:noreply, s}

      # coveralls-ignore-stop

      # unknown message / no active conn in state
      _ ->
        {:noreply, s}
    end
  end

  # Any data received from the server resets the keep-alive interval timer
  # before the responses are folded into the state.
  @spec handle_responses(%__MODULE__{}, list()) :: %__MODULE__{}
  defp handle_responses(s, responses) do
    s = update_in(s.keep_alive_timer, &KeepAliveTimer.reset_interval_timer/1)

    responses
    |> Enum.reduce(s, &process_response/2)
    |> Request.continue_requests()
  end

  defp process_response({:status, request_ref, status}, s) do
    put_in(s.requests[request_ref].response.status, status)
  end

  defp process_response({:headers, request_ref, new_headers}, s) do
    update_in(
      s.requests[request_ref].response.headers,
      fn headers -> headers ++ new_headers end
    )
  end

  defp process_response({:data, request_ref, new_data}, s) do
    update_in(
      s.requests[request_ref],
      &Request.handle_data(&1, new_data)
    )
  end

  defp process_response({:pong, request_ref}, s) do
    case pop_in(s.requests[request_ref]) do
      {{:ping, from}, s} ->
        # ping was initiated by a GenServer.call/3
        Connection.reply(from, :pong)

        s

      {nil, s} ->
        # ping was initiated by the keepalive timer
        update_in(s.keep_alive_timer, &KeepAliveTimer.clear_after_timer(&1, request_ref))
    end
  end

  defp process_response({:done, request_ref}, s) do
    {request, s} = pop_in(s.requests[request_ref])

    case request do
      %{type: {:subscription, subscriber, _through}, from: nil} ->
        send(subscriber, {:eos, request_ref, :dropped})

      %{from: from, response: response} ->
        Connection.reply(from, {:ok, response})
    end

    s
  end

  # coveralls-ignore-start
  defp process_response(_unknown, s), do: s
  # coveralls-ignore-stop

  # Opens a streaming POST, registers the request under its Mint ref and
  # emits as much of the request body as possible.
  defp request_and_stream_body(s, request, from, request_type) do
    with {:ok, conn, request_ref} <-
           Mint.HTTP2.request(s.conn, @post, request.path, request.headers, :stream),
         request = Request.new(request, request_ref, from, request_type),
         s = put_in(s.conn, conn),
         s = put_in(s.requests[request_ref], request),
         {:ok, s} <- Request.emit_messages(s, request) do
      {:ok, s}
    else
      # coveralls-ignore-start
      {:error, %__MODULE__{} = s, reason} ->
        {:error, s, reason}

      {:error, conn, reason} ->
        {:error, put_in(s.conn, conn), reason}
        # coveralls-ignore-stop
    end
  end

  defp do_connect(config) do
    Mint.HTTP.connect(config.scheme, config.host, config.port, config.mint_opts)
  end

  # Fails every in-flight request because the connection is going away.
  defp close_requests(s) do
    :ok = s.requests |> Map.values() |> Enum.each(&close_request/1)
  end

  # a GenServer.call/3 ping waiting for its pong
  defp close_request({:ping, from}) do
    Connection.reply(from, {:error, :closed})
  end

  defp close_request(%{
         type: {:subscription, proc, _through},
         from: nil,
         request_ref: request_ref
       }) do
    send(proc, {:eos, request_ref, :closed})
  end

  defp close_request(%{type: _, from: from}) do
    Connection.reply(from, {:error, :closed})
  end

  # Releases the monitor a request holds on its subscriber (if any) and
  # flushes a `:DOWN` that may already have been delivered for it.
  defp demonitor_request(%{monitor_ref: monitor_ref}) when is_reference(monitor_ref) do
    Process.demonitor(monitor_ref, [:flush])
  end

  defp demonitor_request(_request), do: true

  @doc false
  @spec fetch_subscription(%__MODULE__{}, reference()) :: {:ok, Request.t()} | :error
  def fetch_subscription(s, monitor_ref) do
    # match on shape: `{:ping, from}` entries have no `:monitor_ref`
    Enum.find_value(s.requests, :error, fn
      {_request_ref, %{monitor_ref: ^monitor_ref} = request} -> {:ok, request}
      _other -> false
    end)
  end

  @doc """
  Returns the list of read-only APIs

  This list is used to determine which requests are allowed for read-only
  clients.
  """
  @doc since: "0.8.0"
  @spec read_apis() :: %{(api :: module()) => [rpc :: atom()]}
  def read_apis, do: @read_apis

  @doc """
  Declares whether an API+RPC combination is read-only or not

  ## Examples

      iex> Spear.Connection.read_api?(Spear.Records.Streams, :Read)
      true
      iex> Spear.Connection.read_api?(Spear.Records.Streams, :Append)
      false
  """
  @doc since: "0.8.0"
  @spec read_api?(api :: module(), rpc :: atom()) :: boolean()
  def read_api?(api, rpc) when is_atom(api) and is_atom(rpc) do
    rpc in Map.get(read_apis(), api, [])
  end

  # read-only connections may only issue requests listed in `read_apis/0`
  defp read_only_check(%Spear.Request{api: {api, rpc}}, %__MODULE__{
         config: %Config{read_only?: true}
       }) do
    if read_api?(api, rpc), do: :ok, else: {:error, :read_only}
  end

  defp read_only_check(_request, _s), do: :ok

  # Runs an `on_connect`/`on_disconnect` hook: a 0-arity fun or an MFA tuple.
  # Anything else, or an MFA whose function does not exist, is a no-op.
  defp run_function(f) when is_function(f, 0), do: f.()

  defp run_function({m, f, args}) when is_list(args) do
    # function_exported?/3 does not load modules, so load first; otherwise the
    # hook is silently skipped under interactive (lazy) code loading
    if Code.ensure_loaded?(m) and function_exported?(m, f, length(args)) do
      apply(m, f, args)
    end
  end

  defp run_function(_), do: nil
end