# lib/chaperon/worker/supervisor.ex

defmodule Chaperon.Worker.Supervisor do
  @moduledoc """
  Chaperon worker process supervisor.

  Each `Chaperon.Scenario` is executed with a `Chaperon.Session` inside a
  `Chaperon.Worker` `Task` process supervised by this supervisor.

  All tasks are started through the `Task.Supervisor` registered under the
  name `Chaperon.Worker.Supervisor`, either locally or on a given node.
  """

  require Logger

  @name Chaperon.Worker.Supervisor

  @doc """
  Starts `amount` workers for `scenario_mod`, assigning them to `nodes` in
  round-robin order (the node list is cycled until `amount` is reached).

  Returns the list of started worker tasks, one per worker.
  """
  @spec start_workers(Enumerable.t(), non_neg_integer(), module(), term(), timeout()) ::
          [Task.t()]
  def start_workers(nodes, amount, scenario_mod, config, timeout) do
    nodes
    |> Stream.cycle()
    |> Stream.take(amount)
    |> Enum.map(&start_worker(&1, scenario_mod, config, timeout))
  end

  @doc """
  Starts a single worker on `node` that calls `Chaperon.Scenario.execute/2`
  with `scenario_mod` and `config`, bounded by `timeout`.

  Returns the task wrapping the worker (see `worker_task/5`).
  """
  @spec start_worker(node(), module(), term(), timeout()) :: Task.t()
  def start_worker(node, scenario_mod, config, timeout) do
    start_worker_via(node, scenario_mod, :execute, [config], timeout)
  end

  @doc """
  Starts a single worker on `node` that calls
  `Chaperon.Scenario.execute_nested/3` with `scenario_mod`, `session` and
  `config`, bounded by `timeout`.

  Returns the task wrapping the worker (see `worker_task/5`).
  """
  @spec start_nested_worker(node(), module(), term(), term(), timeout()) :: Task.t()
  def start_nested_worker(node, scenario_mod, session, config, timeout) do
    start_worker_via(
      node,
      scenario_mod,
      :execute_nested,
      [session, config],
      timeout
    )
  end

  # Starts the wrapper task (`worker_task/5`) on `node`. The wrapper itself
  # starts the actual scenario task and enforces the timeout.
  defp start_worker_via(node, scenario_mod, function, args, timeout) do
    async(node, __MODULE__, :worker_task, [node, scenario_mod, function, args, timeout])
  end

  @doc """
  Runs `Chaperon.Scenario.function(scenario_mod, ...args)` as a task on `node`
  and waits up to `timeout` for it to finish.

  Returns the value produced by the scenario task (its session) when it
  finishes in time. Returns `nil` when the task exits or does not finish
  within `timeout`; in the timeout case the scenario task is shut down so it
  does not keep running after its result has been given up on.

  This function is public only because it is invoked by module/function/args
  through the task supervisor.
  """
  @spec worker_task(node(), module(), atom(), [term()], timeout()) :: term()
  def worker_task(node, scenario_mod, function, args, timeout) do
    t = async(node, Chaperon.Scenario, function, [scenario_mod | args])

    # `Task.yield/2` leaves the task running when it times out. Shutting it
    # down here stops the worker and also picks up a reply (or exit) that
    # arrived right at the deadline, which `Task.shutdown/2` returns as
    # `{:ok, reply}` / `{:exit, reason}`.
    case Task.yield(t, timeout) || Task.shutdown(t, :brutal_kill) do
      {:ok, session} ->
        Logger.info("Worker finished: #{session.id}")
        session

      {:exit, reason} ->
        Logger.error("Worker exited with reason: #{inspect(scenario_mod)} : #{inspect(reason)}")
        nil

      nil ->
        Logger.error("Worker timed out: #{inspect(scenario_mod)}")
        nil
    end
  end

  @doc """
  Starts a supervised task calling `mod.func(args...)` on the node returned by
  `Chaperon.Worker.random_node/0`.
  """
  @spec schedule_async(module(), atom(), [term()]) :: Task.t()
  def schedule_async(mod, func, args) do
    async(Chaperon.Worker.random_node(), mod, func, args)
  end

  @doc """
  Starts a supervised task running the zero-arity function `func` on the node
  returned by `Chaperon.Worker.random_node/0`.
  """
  @spec schedule_async((-> term())) :: Task.t()
  def schedule_async(func) do
    async(Chaperon.Worker.random_node(), func)
  end

  @doc """
  Starts a supervised task calling `mod.func(args...)` under the worker
  supervisor on `node`.
  """
  @spec async(node(), module(), atom(), [term()]) :: Task.t()
  def async(node, mod, func, args) do
    Task.Supervisor.async({@name, node}, mod, func, args)
  end

  @doc """
  Starts a supervised task running the zero-arity function `func` under the
  local worker supervisor.
  """
  @spec async((-> term())) :: Task.t()
  def async(func) do
    Task.Supervisor.async(@name, func)
  end

  @doc """
  Starts a supervised task running the zero-arity function `func` under the
  worker supervisor on `node`.
  """
  @spec async(node(), (-> term())) :: Task.t()
  def async(node, func) do
    Task.Supervisor.async({@name, node}, func)
  end
end