lib/elsa/dynamic_process_manager.ex

defmodule Elsa.DynamicProcessManager do
  @moduledoc false
  use GenServer
  require Logger

  @type child_spec :: Supervisor.child_spec() | {module, term} | module

  def start_child(server, child_spec) do
    GenServer.call(server, {:start_child, child_spec})
  end

  def start_link(init_arg) do
    server_opts = Keyword.take(init_arg, [:name])
    GenServer.start_link(__MODULE__, init_arg, server_opts)
  end

  def child_spec(init_arg) do
    id = Keyword.fetch!(init_arg, :id)

    %{
      id: id,
      start: {__MODULE__, :start_link, [init_arg]}
    }
  end

  def wait_ready(server, timeout \\ 10_000) do
    GenServer.call(server, :ready?, timeout)
  end

  def init(init_arg) do
    Process.flag(:trap_exit, true)

    dynamic_supervisor = Keyword.fetch!(init_arg, :dynamic_supervisor)

    initializer =
      case Keyword.get(init_arg, :initializer) do
        {module, function, args} ->
          fn -> apply(module, function, args) end

        function when is_function(function, 0) ->
          function

        _ ->
          fn -> [] end
      end

    state = %{
      dynamic_supervisor: dynamic_supervisor,
      dynamic_supervisor_ref: whereis(dynamic_supervisor) |> Process.monitor(),
      poll: Keyword.get(init_arg, :poll, false),
      initializer: initializer,
      child_specs: Keyword.get(init_arg, :children, [])
    }

    {:ok, state, {:continue, :initialize}}
  end

  def handle_continue(:initialize, state) do
    start_children(state.dynamic_supervisor, state.child_specs)

    new_state = start_new_children(state)
    _ = setup_poll(state.poll)

    {:noreply, new_state}
  end

  def handle_call({:start_child, child}, _from, state) do
    output = DynamicSupervisor.start_child(state.dynamic_supervisor, child)
    {:reply, output, Map.update!(state, :child_specs, fn specs -> specs ++ [child] end)}
  end

  # When handle_continue has completed this process can
  # reply to this message and is therefore ready.
  def handle_call(:ready?, _from, state) do
    {:reply, true, state}
  end

  def handle_info(:poll, state) do
    Logger.debug(fn -> "#{__MODULE__} for #{inspect(state.dynamic_supervisor)}: Polling for new children" end)
    new_state = start_new_children(state)
    _ = setup_poll(state.poll)
    {:noreply, new_state}
  end

  def handle_info({:DOWN, ref, _, _, _}, %{dynamic_supervisor_ref: ref} = state) do
    Logger.info(fn ->
      "#{__MODULE__}: Dynamic Supervisor #{state.dynamic_supervisor} has died, restarting recorded children"
    end)

    wait_for(state.dynamic_supervisor)

    start_children(state.dynamic_supervisor, state.child_specs)
    {:noreply, state}
  end

  defp start_new_children(state) do
    new_child_specs = initialize_until_success(state.initializer) -- state.child_specs
    start_children(state.dynamic_supervisor, new_child_specs)

    Map.update!(state, :child_specs, fn specs ->
      specs ++ new_child_specs
    end)
  end

  defp start_children(supervisor, child_specs) do
    child_specs
    |> Enum.each(fn child ->
      output = DynamicSupervisor.start_child(supervisor, child)

      Logger.debug(fn ->
        "#{__MODULE__}: output of starting #{inspect(child)}: #{inspect(output)}"
      end)
    end)
  end

  defp initialize_until_success(initializer) do
    initializer.()
  rescue
    e ->
      Logger.warn(fn -> "#{__MODULE__}: initializer raised exception, retrying: #{inspect(e)}" end)

      Process.sleep(1_000)
      initialize_until_success(initializer)
  catch
    :exit, e ->
      Logger.warn(fn -> "#{__MODULE__}: initializer raised exception, retrying: #{inspect(e)}" end)

      Process.sleep(1_000)
      initialize_until_success(initializer)
  end

  defp setup_poll(time) when is_integer(time) do
    :timer.send_after(time, :poll)
  end

  defp setup_poll(_), do: nil

  defp whereis(name) when is_atom(name) do
    Process.whereis(name)
  end

  defp whereis({:via, registry_module, lookup}) do
    registry_module.whereis_name(lookup)
  end

  defp wait_for(name) do
    case Process.whereis(name) do
      nil ->
        Process.sleep(200)
        wait_for(name)

      pid ->
        case Process.alive?(pid) do
          false ->
            Process.sleep(200)
            wait_for(name)

          true ->
            :ok
        end
    end
  end
end