lib/edgedb/pool.ex

defmodule EdgeDB.Pool do
  @moduledoc false

  use GenServer

  alias DBConnection.Holder

  alias EdgeDB.Pool.{
    Codel,
    ConnectionSupervisor,
    State
  }

  @typedoc false
  @type t() :: GenServer.server()

  @queue_target 50
  @queue_interval 1000
  @idle_interval 1000
  @time_unit 1000

  @doc false
  @spec start_link({module(), Keyword.t()}) :: GenServer.on_start()
  def start_link({conn_mod, opts}) do
    GenServer.start_link(__MODULE__, {conn_mod, opts}, start_opts(opts))
  end

  @doc false
  @spec checkout(t(), list(pid()), Keyword.t()) ::
          {:ok, any(), module(), any(), any()}
          | {:error, Exception.t()}
  def checkout(pool, callers, opts) do
    Holder.checkout(pool, callers, opts)
  end

  @doc false
  @spec disconnect_all(t(), integer(), Keyword.t()) :: :ok
  def disconnect_all(pool, interval, _opts) do
    GenServer.call(pool, {:disconnect_all, interval}, :infinity)
  end

  @doc false
  @spec concurrency(t()) :: integer()
  def concurrency(pool) do
    GenServer.call(pool, :get_current_concurrency)
  end

  @doc false
  @spec set_max_concurrency(t(), integer()) :: integer()
  def set_max_concurrency(pool, max_concurrency) do
    GenServer.call(pool, {:set_max_concurrency, max_concurrency})
  end

  @impl GenServer
  def init({conn_mod, opts}) do
    queue = :ets.new(__MODULE__.Queue, [:protected, :ordered_set])
    ts = {System.monotonic_time(), 0}
    now_in_native = System.monotonic_time()
    now_in_ms = System.convert_time_unit(now_in_native, :native, @time_unit)

    conn_opts = [owner: self(), queue: queue, conn: [mod: conn_mod, opts: opts]]
    {:ok, _pid} = ConnectionSupervisor.start_supervised(conn_opts)

    # if we're using sandbox connection then we shouldn't use many connections
    # since in EdgeDB there are only serializable transactions and concurrent requests
    # will break sandbox logic.
    max_concurrency =
      if conn_mod == EdgeDB.Sandbox do
        1
      else
        opts[:max_concurrency]
      end

    idle_limit = opts[:idle_limit]

    codel = %Codel{
      target: Keyword.get(opts, :queue_target, @queue_target),
      interval: Keyword.get(opts, :queue_interval, @queue_interval),
      delay: 0,
      slow: false,
      next: now_in_ms,
      poll: nil,
      idle_interval: Keyword.get(opts, :idle_interval, @idle_interval),
      idle_limit: idle_limit || (max_concurrency || 0),
      idle: nil
    }

    codel = start_idle(now_in_native, start_poll(now_in_ms, now_in_ms, codel))

    state = %State{
      type: :busy,
      queue: queue,
      codel: codel,
      ts: ts,
      current_concurrency: 0,
      max_concurrency: max_concurrency,
      conn_mod: conn_mod,
      conn_opts: conn_opts,
      pool_idle_limit: idle_limit
    }

    {:ok, state}
  end

  @impl GenServer
  def handle_call(:get_current_concurrency, _from, %State{} = state) do
    {:reply, state.current_concurrency, state}
  end

  @impl GenServer
  def handle_call({:set_max_concurrency, max_concurrency}, _from, %State{} = state) do
    {:reply, :ok, %State{state | max_concurrency: max_concurrency}}
  end

  @impl GenServer
  def handle_call(request, from, %State{} = state) do
    formatted_state = State.to_connection_pool_format(state)

    {:reply, result, conn_pool_state} =
      DBConnection.ConnectionPool.handle_call(request, from, formatted_state)

    {:reply, result, State.from_connection_pool_format(state, conn_pool_state)}
  end

  @impl GenServer
  def handle_info({:set_connections_supervisor, sup_pid}, state) do
    {:noreply, %State{state | conn_sup: sup_pid}}
  end

  @impl GenServer
  def handle_info({:concurrency_suggest, suggested_pool_concurrency}, state) do
    {:noreply, %State{state | suggested_concurrency: suggested_pool_concurrency}}
  end

  @impl GenServer
  def handle_info(
        {:disconnected, _conn_pid, %DBConnection.ConnectionError{reason: :exceed_limit}},
        %State{} = state
      ) do
    {:noreply, state}
  end

  @impl GenServer
  def handle_info({:disconnected, _conn_pid, exc}, %State{} = state) do
    state =
      with %EdgeDB.Error{tags: tags} <- exc,
           true <- :should_reconnect in tags do
        ConnectionSupervisor.start_connection(state.conn_sup, state.conn_opts)
        state
      else
        _other ->
          %State{state | current_concurrency: state.current_concurrency - 1}
      end

    {:noreply, state}
  end

  @impl GenServer
  def handle_info(
        {:"ETS-TRANSFER", holder, _pid, {:checkin, _queue, _extra}} = request,
        %State{} = state
      ) do
    owner = self()

    case :ets.info(holder, :owner) do
      ^owner ->
        maybe_disconnect(request, state)

      :undefined ->
        {:noreply, state}
    end
  end

  @impl GenServer
  def handle_info(
        {:db_connection, _from, {:checkout, _caller, _now, _queue?}} = request,
        %State{} = state
      ) do
    state = maybe_create_new_connection(state)
    formatted_state = State.to_connection_pool_format(state)

    {:noreply, conn_pool_state} =
      DBConnection.ConnectionPool.handle_info(request, formatted_state)

    {:noreply, State.from_connection_pool_format(state, conn_pool_state)}
  end

  @impl GenServer
  def handle_info(request, %State{} = state) do
    formatted_state = State.to_connection_pool_format(state)

    {:noreply, conn_pool_state} =
      DBConnection.ConnectionPool.handle_info(request, formatted_state)

    {:noreply, State.from_connection_pool_format(state, conn_pool_state)}
  end

  defp start_opts(opts) do
    Keyword.take(opts, [:name, :spawn_opt])
  end

  defp start_poll(now, last_sent, %Codel{interval: interval} = codel) do
    timeout = now + interval
    poll = :erlang.start_timer(timeout, self(), {timeout, last_sent}, abs: true)
    %Codel{codel | poll: poll}
  end

  defp start_idle(now_in_native, %Codel{idle_interval: interval} = codel) do
    timeout = System.convert_time_unit(now_in_native, :native, :millisecond) + interval
    idle = :erlang.start_timer(timeout, self(), now_in_native, abs: true)
    %Codel{codel | idle: idle}
  end

  defp maybe_create_new_connection(%State{} = state) do
    max_allowed_connections = min(state.max_concurrency, state.suggested_concurrency) || 1

    if (state.type == :busy or :ets.info(state.queue, :size) == 0) and
         state.current_concurrency < max_allowed_connections do
      ConnectionSupervisor.start_connection(state.conn_sup, state.conn_opts)

      concurrency = state.current_concurrency + 1

      %State{
        state
        | current_concurrency: concurrency,
          codel: maybe_set_codel_idle_limit(state, concurrency)
      }
    else
      state
    end
  end

  defp maybe_disconnect(
         {:"ETS-TRANSFER", holder, _pid, {:checkin, _queue, _extra}} = request,
         %State{} = state
       ) do
    if disconnect?(state) do
      conn_pid = connection_pid(holder)
      message = "disconnect connection via dynamic resizing"

      err =
        DBConnection.ConnectionError.exception(
          message: message,
          severity: :debug,
          reason: :exceed_limit
        )

      Holder.handle_disconnect(holder, err)
      ConnectionSupervisor.disconnect_connection(state.conn_sup, conn_pid)

      concurrency = state.current_concurrency - 1

      {:noreply,
       %State{
         state
         | current_concurrency: concurrency,
           codel: maybe_set_codel_idle_limit(state, concurrency)
       }}
    else
      formatted_state = State.to_connection_pool_format(state)

      {:noreply, conn_pool_state} =
        DBConnection.ConnectionPool.handle_info(request, formatted_state)

      state = State.from_connection_pool_format(state, conn_pool_state)
      {:noreply, state}
    end
  end

  defp disconnect?(%State{
         current_concurrency: current_concurrency,
         max_concurrency: max_concurrency
       })
       when current_concurrency > max_concurrency do
    true
  end

  defp disconnect?(%State{
         current_concurrency: current_concurrency,
         suggested_concurrency: suggested_concurrency
       })
       when current_concurrency > suggested_concurrency do
    true
  end

  defp disconnect?(_state) do
    false
  end

  defp connection_pid(holder) do
    [conn] = :ets.lookup(holder, :conn)
    elem(conn, 1)
  end

  defp maybe_set_codel_idle_limit(%State{pool_idle_limit: nil, codel: codel}, concurrency) do
    %Codel{codel | idle_limit: concurrency}
  end

  defp maybe_set_codel_idle_limit(%State{codel: codel}, _concurrency) do
    codel
  end
end