defmodule StepFlow.Amqp.Supervisor do
  @moduledoc """
  Supervisor of the Step Flow AMQP processes.

  It owns the AMQP connection and the consumers listed in `@children`.
  The supervisor itself boots with no children; they are attached
  afterwards through `start_children/0` and can be detached again with
  `stop_children/0`.
  """

  use Supervisor

  require Logger

  alias StepFlow.Repo.Checker

  # Child modules managed by this supervisor, in start order. Each module is
  # used both as the child specification and as the child id, and is expected
  # to export `disconnect/0` (called from `stop_child/1`).
  @children [
    StepFlow.Amqp.Connection,
    StepFlow.Amqp.CompletedConsumer,
    StepFlow.Amqp.ErrorConsumer,
    StepFlow.Amqp.ProgressionConsumer,
    StepFlow.Amqp.StoppedConsumer,
    StepFlow.Amqp.WorkerDiscoveryConsumer,
    StepFlow.Amqp.WorkerCreatedConsumer,
    StepFlow.Amqp.WorkerInitializedConsumer,
    StepFlow.Amqp.WorkerStartedConsumer,
    StepFlow.Amqp.WorkerStatusConsumer,
    StepFlow.Amqp.WorkerTerminatedConsumer,
    StepFlow.Amqp.WorkerUpdatedConsumer
  ]

  @doc """
  Returns the child specification used to run this supervisor under a parent.

  The argument is ignored: the supervisor is always started through
  `start_link/0`.
  """
  @spec child_spec(term()) :: Supervisor.child_spec()
  def child_spec(_) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      type: :supervisor,
      shutdown: 2_000
    }
  end

  # Starts the supervisor registered under the module name and, when the
  # repository is reported as running, attaches the AMQP children.
  # Returns the result of `Supervisor.start_link/3` untouched.
  @doc false
  @spec start_link() :: Supervisor.on_start()
  def start_link do
    Logger.info("#{__MODULE__} start_link")

    result = Supervisor.start_link(__MODULE__, [], name: __MODULE__)

    # Children can only be attached when a supervisor process is actually
    # registered under the module name; otherwise the calls made by
    # start_children/0 would exit instead of letting the caller see `result`.
    if supervisor_available?(result) and Checker.repo_running?() do
      start_children()
    end

    result
  end

  # Boots with an empty child list; children are added later on demand.
  @doc false
  @impl true
  def init(_) do
    Logger.info("#{__MODULE__} init")
    Supervisor.init([], strategy: :one_for_one)
  end

  @doc """
  Ensures every module of the child list is running under this supervisor.

  For each child, the state reported by
  `StepFlow.ProcessManager.get_child_pid/2` decides what happens:

    * `nil` - the child is started with `Supervisor.start_child/2`
    * `:undefined` - the child is restarted with `Supervisor.restart_child/2`
    * `:restarting` or a pid - nothing is done besides logging

  Raises a `MatchError` if a child fails to start or restart.
  """
  @spec start_children() :: :ok
  def start_children do
    Enum.each(@children, fn child ->
      # NOTE(review): presumably `nil` means "no such child registered" and
      # the other values mirror `Supervisor.which_children/1` - confirm
      # against StepFlow.ProcessManager.
      case StepFlow.ProcessManager.get_child_pid(__MODULE__, child) do
        nil ->
          {:ok, _pid} = Supervisor.start_child(__MODULE__, child)

        :undefined ->
          {:ok, _pid} = Supervisor.restart_child(__MODULE__, child)

        :restarting ->
          Logger.info("#{inspect(child)} #{__MODULE__} child already restarting...")

        pid ->
          Logger.info("#{inspect(child)} #{__MODULE__} child already running: #{inspect(pid)}")
      end
    end)
  end

  @doc """
  Disconnects and terminates every module of the child list.

  The result of each termination is ignored, so the call is best-effort.
  """
  @spec stop_children() :: :ok
  def stop_children do
    Enum.each(@children, &stop_child/1)
  end

  # Asks the child module to disconnect, then terminates it under this
  # supervisor.
  defp stop_child(child) do
    child.disconnect()
    Supervisor.terminate_child(__MODULE__, child)
  end

  # True when a supervisor process is registered under the module name after
  # the start attempt: either freshly started or already running.
  defp supervisor_available?({:ok, _pid}), do: true
  defp supervisor_available?({:error, {:already_started, _pid}}), do: true
  defp supervisor_available?(_other), do: false
end