lib/chronicle/session.ex

# Copyright (c) Cratis. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for full license information.

defmodule Chronicle.Session do
  @moduledoc false

  # Establishes and maintains a Chronicle client session via
  # ConnectionService.Connect. Chronicle requires a client to register its
  # ConnectionId before reactors or reducers can register observers.
  #
  # The server registers the client asynchronously in a Task.Run AFTER returning
  # the Connect stream, and only sends the first keepalive AFTER that registration
  # completes (plus ~1 second). So `ready?` is set to true on the first keepalive,
  # and observers (reactors/reducers) must wait until then before registering.

  use GenServer, restart: :permanent

  require Logger

  alias Chronicle.Connections.Connection

  alias Cratis.Chronicle.Contracts.Clients.{
    ConnectionService,
    ConnectRequest
  }

  @retry_delay 3_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: name_for(opts[:client_name]))
  end

  @doc """
  Returns the connection ID for the session registered under `client_name`.
  """
  @spec connection_id(atom()) :: String.t()
  def connection_id(session_name) do
    GenServer.call(session_name, :connection_id)
  end

  @doc """
  Blocks until the session has received the first server keepalive, which means
  the server has finished calling OnClientConnected and the client is registered.
  Returns `:ok`.
  """
  @spec wait_until_ready(atom(), timeout()) :: :ok | {:error, :timeout}
  def wait_until_ready(session_name, timeout \\ 30_000) do
    deadline = System.monotonic_time(:millisecond) + timeout
    do_wait(session_name, deadline)
  end

  defp do_wait(session_name, deadline) do
    case GenServer.call(session_name, :ready?) do
      true ->
        :ok

      false ->
        remaining = deadline - System.monotonic_time(:millisecond)

        if remaining <= 0 do
          {:error, :timeout}
        else
          Process.sleep(min(200, remaining))
          do_wait(session_name, deadline)
        end
    end
  end

  @impl true
  def init(opts) do
    state = %{
      connection: Keyword.fetch!(opts, :connection),
      connection_id: generate_connection_id(),
      keepalive_task: nil,
      ready?: false
    }

    send(self(), :connect)
    {:ok, state}
  end

  @impl true
  def handle_call(:connection_id, _from, state) do
    {:reply, state.connection_id, state}
  end

  def handle_call(:ready?, _from, state) do
    {:reply, state.ready?, state}
  end

  @impl true
  def handle_info(:connect, state) do
    case Connection.channel(state.connection) do
      {:ok, channel} ->
        case start_session(channel, state) do
          {:ok, new_state} ->
            Logger.debug("Chronicle session established: #{new_state.connection_id}")
            {:noreply, new_state}

          {:error, reason} ->
            Logger.warning("Chronicle session failed to start: #{inspect(reason)}, retrying...")
            Process.send_after(self(), :connect, @retry_delay)
            {:noreply, state}
        end

      {:error, _} ->
        Process.send_after(self(), :connect, @retry_delay)
        {:noreply, state}
    end
  end

  def handle_info(:keepalive_received, state) do
    {:noreply, %{state | ready?: true}}
  end

  def handle_info({:session_down, reason}, state) do
    Logger.warning("Chronicle session dropped: #{inspect(reason)}, reconnecting...")
    {:noreply, schedule_reconnect(%{state | keepalive_task: nil, ready?: false})}
  end

  def handle_info({:DOWN, _ref, :process, pid, reason}, %{keepalive_task: %Task{pid: pid}} = state) do
    Logger.warning("Chronicle session task exited: #{inspect(reason)}")
    {:noreply, schedule_reconnect(%{state | keepalive_task: nil, ready?: false})}
  end

  def handle_info(_msg, state), do: {:noreply, state}

  defp start_session(channel, state) do
    try do
      request = %ConnectRequest{
        ConnectionId: state.connection_id,
        ClientVersion: "1.0.0",
        IsRunningWithDebugger: false
      }

      case ConnectionService.Stub.connect(channel, request) do
        {:ok, reply_stream} ->
          handler = self()
          task = Task.async(fn -> keepalive_loop(handler, reply_stream) end)
          {:ok, %{state | keepalive_task: task}}

        {:error, reason} ->
          {:error, reason}
      end
    rescue
      e -> {:error, e}
    end
  end

  defp keepalive_loop(handler, reply_stream) do
    Enum.each(reply_stream, fn
      {:ok, _keepalive} ->
        send(handler, :keepalive_received)

      {:error, reason} ->
        send(handler, {:session_down, reason})
    end)
  end

  defp schedule_reconnect(state) do
    Process.send_after(self(), :connect, @retry_delay)
    state
  end

  defp name_for(nil), do: __MODULE__
  defp name_for(client_name), do: :"#{client_name}.Session"

  defp generate_connection_id do
    :crypto.strong_rand_bytes(8) |> Base.encode16(case: :lower)
  end
end