lib/stargate/receiver/supervisor.ex

defmodule Stargate.Receiver.Supervisor do
  @moduledoc """
  Defines a supervisor for the `Stargate.Receiver` reader
  and consumer connections and the associated GenStage pipeline
  for processing and acknowledging messages received on the connection.

  The top-level `Stargate.Supervisor` passes the shared connection and
  `:consumer` or `:reader` configurations to the receiver supervisor
  to delegate management of all receiving processes.
  """
  use Supervisor
  import Stargate.Supervisor, only: [via: 2]

  @doc """
  Starts a `Stargate.Receiver.Supevisor` and links it to the calling
  process.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) do
    type = Keyword.fetch!(args, :type)
    registry = Keyword.fetch!(args, :registry)
    persistence = Keyword.get(args, :persistence, "persistent")
    tenant = Keyword.fetch!(args, :tenant)
    namespace = Keyword.fetch!(args, :namespace)
    topic = Keyword.fetch!(args, :topic)

    Supervisor.start_link(__MODULE__, args,
      name:
        via(registry, {:"#{type}_sup", "#{persistence}", "#{tenant}", "#{namespace}", "#{topic}"})
    )
  end

  @doc """
  Generates a list of child processes to initialize and
  start them under the supervisor with a `:one_for_all` strategy
  to ensure messages are not dropped if any single stage in
  the pipeline fails.

  The processors stage is configurable to a desired number of processes
  for parallelizing complex or long-running message handling operations.
  """
  @impl Supervisor
  def init(init_args) do
    children =
      [
        {Stargate.Receiver.Dispatcher, init_args},
        processors(init_args),
        {Stargate.Receiver.Acknowledger, init_args}
      ]
      |> List.flatten()

    Supervisor.init(children, strategy: :one_for_all)
  end

  defp processors(args) do
    count = Keyword.get(args, :processors, 1)
    Enum.map(0..(count - 1), &to_child_spec(&1, args))
  end

  defp to_child_spec(number, init_args) do
    persistence = Keyword.get(init_args, :persistence, "persistent")
    tenant = Keyword.fetch!(init_args, :tenant)
    ns = Keyword.fetch!(init_args, :namespace)
    topic = Keyword.fetch!(init_args, :topic)
    name = {:processor, "#{persistence}", "#{tenant}", "#{ns}", "#{topic}_#{number}"}
    named_args = Keyword.put(init_args, :processor_name, name)

    Supervisor.child_spec({Stargate.Receiver.Processor, named_args}, id: name)
  end
end