lib/step_flow/amqp/supervisor.ex

defmodule StepFlow.Amqp.Supervisor do
  require Logger
  use Supervisor

  @moduledoc """
  Supervisor of Step Flow.

  It manages AMQP connection to emit and consume messages as well as
  manages the StepManager to drive workflows.
  """

  alias StepFlow.Repo.Checker

  @children [
    StepFlow.Amqp.Connection,
    StepFlow.Amqp.CompletedConsumer,
    StepFlow.Amqp.ErrorConsumer,
    StepFlow.Amqp.ProgressionConsumer,
    StepFlow.Amqp.StoppedConsumer,
    StepFlow.Amqp.WorkerDiscoveryConsumer,
    StepFlow.Amqp.WorkerCreatedConsumer,
    StepFlow.Amqp.WorkerInitializedConsumer,
    StepFlow.Amqp.WorkerStartedConsumer,
    StepFlow.Amqp.WorkerStatusConsumer,
    StepFlow.Amqp.WorkerTerminatedConsumer,
    StepFlow.Amqp.WorkerUpdatedConsumer
  ]

  def child_spec(_) do
    %{
      id: StepFlow.Amqp.Supervisor,
      start: {StepFlow.Amqp.Supervisor, :start_link, []},
      type: :supervisor,
      shutdown: 2_000
    }
  end

  @doc false
  def start_link do
    Logger.info("#{__MODULE__} start_link")
    supervisor = Supervisor.start_link(__MODULE__, [], name: __MODULE__)

    if Checker.repo_running?() do
      start_children()
    end

    supervisor
  end

  @doc false
  def init(_) do
    Logger.info("#{__MODULE__} init")

    Supervisor.init([], strategy: :one_for_one)
  end

  def start_children do
    @children
    |> Enum.each(fn child ->
      case StepFlow.ProcessManager.get_child_pid(StepFlow.Amqp.Supervisor, child) do
        nil ->
          {:ok, _pid} = Supervisor.start_child(StepFlow.Amqp.Supervisor, child)

        :undefined ->
          {:ok, _pid} = Supervisor.restart_child(StepFlow.Amqp.Supervisor, child)

        :restarting ->
          Logger.info("#{inspect(child)}} #{__MODULE__} child already restarting...")

        pid ->
          Logger.info("#{inspect(child)}} #{__MODULE__} child already running: #{inspect(pid)}")
      end
    end)
  end

  def stop_children do
    @children
    |> Enum.each(fn child ->
      stop_child(child)
    end)
  end

  defp stop_child(child) do
    child.disconnect()
    Supervisor.terminate_child(StepFlow.Amqp.Supervisor, child)
  end
end