defmodule HareMq.DynamicSupervisor do
use DynamicSupervisor
alias HareMq.AutoScalerConfiguration
@timeout 70_000
def start_link([config: _, consume: _] = opts) do
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
@doc """
Initializes the dynamic supervisor with the specified options.
It starts a Task to asynchronously run the start_consumers function and initializes
the dynamic supervisor with a one_for_one restart strategy.
## Examples
iex> HareMq.DynamicSupervisor.start_link([config: %{consumer_count: 3}, consume: MyApp.Consumer])
{:ok, #PID<0.123.0>}
"""
def init([config: _, consume: _] = opts) do
{:ok, _} =
Task.start_link(fn ->
start_consumers(opts)
start_auto_scaler(opts)
end)
DynamicSupervisor.init(strategy: :one_for_one)
end
defp start_consumers([config: config, consume: _] = opts) do
Enum.each(1..config[:consumer_count], fn number ->
start_child(
worker: config[:consumer_worker],
name: "#{config[:module_name]}.W#{number}",
opts: opts
)
end)
end
@doc """
Starts a child worker process with the specified worker and options.
## Examples
iex> HareMq.DynamicSupervisor.start_child(worker: MyApp.Consumer, name: :consumer1, opts: [config: %{}, consume: MyApp.Consumer])
{:ok, #PID<0.124.0>}
"""
def start_child(worker: worker, name: name, opts: opts) do
DynamicSupervisor.start_child(__MODULE__, {worker, {name, opts}})
end
@doc """
Adds a new consumer process to the dynamic supervisor.
## Examples
iex> HareMq.DynamicSupervisor.add_consumer(worker: MyApp.Consumer, name: "MyApp.Consumer.W4", opts: [config: %{}, consume: MyApp.Consumer])
{:ok, #PID<0.125.0>}
"""
def add_consumer(worker: worker, name: name, opts: opts) do
start_child(worker: worker, name: name, opts: opts)
end
@doc """
This function looks up the consumer by name in the `:consumers` registry,
sends a cancellation message to the consumer to allow it to stop processing,
and then terminates the child process.
## Examples
iex> HareMq.DynamicSupervisor.remove_consumer("MyApp.Consumer.W4")
:ok
"""
def remove_consumer(name) do
case :global.whereis_name(name) do
pid when is_pid(pid) ->
# Send a cancellation message to allow the consumer to finish processing gracefully
GenServer.call(pid, :cancel_consume, @timeout)
DynamicSupervisor.terminate_child(__MODULE__, pid)
_ ->
:ok
end
end
@doc """
Returns a list of all consumers managed by this dynamic supervisor.
"""
def list_consumers do
DynamicSupervisor.which_children(__MODULE__)
end
@doc """
Starts the AutoScaler as a child under this DynamicSupervisor.
"""
def start_auto_scaler([config: config, consume: consume] = opts) do
if config[:auto_scaling] do
configuration =
AutoScalerConfiguration.get_auto_scaler_configuration(
queue_name: config[:queue_name],
consumer_worker: config[:consumer_worker],
module_name: config[:module_name],
consumer_count: config[:consumer_count],
consume: consume,
auto_scaling: config[:auto_scaling],
consumer_opts: opts
)
DynamicSupervisor.start_child(__MODULE__, {HareMq.AutoScaler, configuration})
else
:ok
end
end
end