lib/mete/scheduler.ex

defmodule Mete.Scheduler do
  use GenServer

  require Logger

  defstruct children: [], conn_args: [], ets: nil

  @scale_up_threshhold 500
  @scale_down_threshhold 50
  @scale_interval :timer.seconds(10)

  def start_link(init_arg) do
    GenServer.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  @impl true
  def init(_init_arg) do
    # Process.flag(:trap_exit, true)

    :ets.new(Mete, [:named_table, {:read_concurrency, true}])
    state = spawn_connection(%__MODULE__{ets: Mete})

    schedule_scaling()

    {:ok, state}
  end

  @impl true
  def handle_continue(:spawn, state) do
    state = spawn_connection(state)

    {:noreply, state}
  end

  # def handle_info({:EXIT, _pid, _stacktrace}, state) do
  #   {:noreply, state}
  # end

  @impl true
  def handle_call(:info, _from, state) do
    {:reply, __scheduler_info__(state), state}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, pid, _stacktrace}, state) do
    :ets.delete(state.ets, pid)

    case :lists.keyfind(pid, 1, state.children) do
      {_pid, ^ref} -> :ok
      {_pid, _ref} -> Logger.warning("Removed Mete child with unmatching monitor reference")
      false -> :ok
    end

    children = :lists.keydelete(pid, 1, state.children)
    state = %__MODULE__{state | children: children}

    {:noreply, state}
  end

  def handle_info(:"$mete_scale", state) do
    [connections: connections, mean_queue_length: mean_msg_queue_len] = __scheduler_info__(state)

    state =
      cond do
        mean_msg_queue_len >= @scale_up_threshhold ->
          # SacleUp
          spawn_connection(state)

        mean_msg_queue_len <= @scale_down_threshhold and connections > 1 ->
          # ScaleDown
          close_connection(state)

        true ->
          # Noop
          state
      end

    schedule_scaling()

    {:noreply, state}
  end

  ##  internal

  defp schedule_scaling do
    Process.send_after(self(), :"$mete_scale", @scale_interval)
  end

  defp spawn_connection(%__MODULE__{} = state) do
    # {:ok, pid} = Mete.Connection.start_link(state.conn_args)
    {:ok, pid} = Mete.Connection.start(state.conn_args)
    ref = Process.monitor(pid)

    state = %__MODULE__{state | children: [{pid, ref} | state.children]}
    :ets.insert(state.ets, {pid})

    state
  end

  defp close_connection(%__MODULE__{} = state) do
    {pid, _ref, _len} =
      state
      |> children_queues()
      |> Enum.sort(fn {_, _, len1}, {_, _, len2} -> len1 <= len2 end)
      |> hd()

    :ets.delete(state.ets, pid)
    # GenServer.cast(pid, :quit)
    children = :lists.keydelete(pid, 1, state.children)

    GenServer.stop(pid, :normal)

    %__MODULE__{state | children: children}
  end

  defp __scheduler_info__(state) do
    msg_queue_len =
      state
      |> children_queues()
      |> Enum.reduce(0, fn {_pid, _ref, len}, acc -> acc + len end)

    connections = Enum.count(state.children)
    mean_msg_queue_len = msg_queue_len / connections

    [connections: connections, mean_queue_length: mean_msg_queue_len]
  end

  defp children_queues(state) do
    Enum.map(state.children, fn {pid, ref} ->
      case Process.info(pid, :message_queue_len) do
        {:message_queue_len, len} -> {pid, ref, len}
        nil -> {pid, ref, 0}
      end
    end)
  end
end