lib/roger/partition.ex

defmodule Roger.Partition do
  @moduledoc """
  Per-node partition registry

  Roger implements multi-tenancy by dividing all its work between
  different "Partitions". Each partition is identified by a unique
  ID. Partitions consist of a list of queues, which are defined by its
  type (an atom) and a max_workers value which sets the concurrency
  level. The RabbitMQ queue name is constructed of the partition ID +
  the queue type.

  To spread out the work, partitions can be started in the cluster on
  multiple nodes. The partition's queue configuration can be different
  between nodes - i.e. some node might be able to handle more
  concurrency than others.

  Within the cluster, there is one global process
  (`Roger.Partition.Global`) which manages the partition's state. In
  it, it manages job's uniqueness, states of paused queues, et cetera.

  """

  use GenServer
  alias Roger.{Partition.Consumer, System, Partition.Global}

  require Logger

  @type queue_def :: {id :: String.t(), max_workers :: non_neg_integer}

  @doc false
  def start_link([]) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @doc """
  Start a Roger partition

  Given a unique ID and a list of queues, starts the partition
  supervision structure. When the partition has already been
  started, this calls `reconfigure/2` instead.
  """
  @spec start(id :: String.t(), queues :: [queue_def]) :: {:ok, pid}
  def start(id, queues) do
    GenServer.call(__MODULE__, {:start, id, queues})
  end

  @doc """
  Stop the given Roger partition
  """
  @spec stop(id :: String.t()) :: :ok | {:error, :not_running}
  def stop(id) do
    GenServer.call(__MODULE__, {:stop, id})
  end

  @doc """
  Stop the parition of accepting new jobs so it can finish of the remaining jobs
  """
  @spec safe_stop(id :: String.t()) :: :ok | {:error, :not_running}
  def safe_stop(id) do
    GenServer.call(__MODULE__, {:safe_stop, id})
  end

  @doc """
  Reconfigure the given Roger partition

  Use this function to adjust the queues for the partition. The
  queues argument is *complete*: any queues that are not mentioned,
  are stopped and messages in them will no longer be processed on this
  node.
  """
  @spec reconfigure(id :: String.t(), queues :: [queue_def]) :: :ok | {:error, :not_running}
  def reconfigure(id, queues) do
    if Consumer.is_alive?(id) do
      Consumer.reconfigure(id, queues)
    else
      {:error, :not_running}
    end
  end

  defmodule State do
    @moduledoc false
    defstruct waiting: %{}, monitored: %{}
  end

  def init([]) do
    {:ok, %State{}, 0}
  end

  def handle_call({:start, id, queues}, _from, state) do
    {reply, state} = start_partition(id, queues, state)
    {:reply, reply, state}
  end

  def handle_call({:stop, partition}, _from, state) do
    {reply, state} = stop_partition(partition, state)
    {:reply, reply, state}
  end

  def handle_call({:safe_stop, partition}, _from, state) do
    {reply, state} = safe_stop_partition(partition, state)
    {:reply, reply, state}
  end

  def handle_call(:waiting_partitions, _from, state) do
    {:reply, state.waiting, state}
  end

  def handle_cast(:check_partitions, state) do
    if Enum.count(state.waiting) > 0 do
      # try to connect some partitions
      {_pids, apps} = Enum.unzip(state.waiting)
      {:noreply, start_all(apps, Map.put(state, :waiting, %{}))}
    else
      {:noreply, state}
    end
  end

  def handle_info({:stop_partition, partition_id}, state) do
    :ok = Roger.Partition.ContainingSupervisor.stop(partition_id)
    {:noreply, state}
  end

  def handle_info(:timeout, state) do
    Process.send_after(Process.whereis(Roger.System), :check_started_partitions, 1000)
    apps = get_predefined_apps()
    state = start_all(apps, state)
    {:noreply, state}
  end

  def handle_info({:DOWN, _, :process, pid, _}, state) do
    # remove from monitors, put in waiting state
    case state.monitored[pid] do
      nil ->
        {:noreply, state}

      {id, queues} ->
        {:noreply,
         %State{state | waiting: Map.put(state.waiting, id, {id, queues}), monitored: Map.delete(state.monitored, pid)}}
    end
  end

  defp start_all(partitions, state) do
    partitions
    |> Enum.reduce(state, fn {id, queues}, state ->
      {_reply, state} = start_partition(id, queues, state)
      state
    end)
  end

  defp start_partition(id, queues, state) when is_atom(id) do
    start_partition(Atom.to_string(id), queues, state)
  end

  defp start_partition(id, queues, state) do
    if System.connected?() && System.active?() do
      # make sure a statemanager instance is running
      Singleton.start_child(Global, [id], {:app_global, id})
      # start partition supervision tree
      pid =
        case Roger.PartitionSupervisor.start_child(id) do
          {:ok, pid} ->
            Logger.debug("Started Roger partition: #{id}")
            pid

          {:error, {:already_started, pid}} ->
            pid
        end

      :ok = Consumer.reconfigure(id, queues)
      _ref = Process.monitor(pid)
      {{:ok, pid}, %State{state | monitored: Map.put(state.monitored, pid, {id, queues})}}
    else
      {:waiting, %State{state | waiting: Map.put(state.waiting, id, {id, queues})}}
    end
  end

  defp safe_stop_partition(id, state) do
    case Roger.GProc.whereis({:app_supervisor, id}) do
      pid when is_pid(pid) ->
        with :ok <- Consumer.pause_all(id) do
          {:ok, state}
        else
          err -> Logger.debug("Something went wrong stopping the partition #{inspect(err)}")
        end

      nil ->
        {{:error, :not_running}, state}
    end
  end

  defp stop_partition(id, state) do
    case Roger.GProc.whereis({:app_supervisor, id}) do
      pid when is_pid(pid) ->
        :ok = Consumer.force_shutdown(id)
        :ok = Singleton.stop_child(Global, [id])
        :ok = Roger.PartitionSupervisor.stop_child(pid)
        {:ok, %State{state | monitored: Map.delete(state.monitored, pid)}}

      nil ->
        {{:error, :not_running}, state}
    end
  end

  defp get_predefined_apps() do
    Application.get_env(:roger, :partitions, []) || []
  end
end