lib/chaperon/master.ex

defmodule Chaperon.Master do
  @moduledoc """
  Master process for running load tests. Initiates running a load test and awaits
  results from a run. The master needs to be started before it is used.
  The Chaperon.Master process is started only once per cluster and registered
  globally as `Chaperon.Master`.
  """
  # Only the `:id` field is shown when a master struct is inspected.
  @derive {Inspect, only: [:id]}
  # Fields:
  #   * `id` - set to `Node.self()` in `init/1`.
  #   * `sessions` - starts as an empty map; not read or written anywhere else
  #     in this module.
  #   * `tasks` - running load tests keyed by task id; each value is the map
  #     built in `start_load_test/4` (`:client`, `:load_test`, `:options`, `:task`).
  #   * `non_worker_nodes` - nodes added through `ignore_node_as_worker/1`.
  #   * `scheduled_load_tests` - `EQ` queue of load tests waiting to be started.
  defstruct id: nil,
            sessions: %{},
            tasks: %{},
            non_worker_nodes: [],
            scheduled_load_tests: EQ.new()

  # NOTE(review): the `tasks` value type below says `pid`, but `start_load_test/4`
  # stores a map holding the client, load test module, options and
  # `{pid, monitor_ref}` - confirm and update the type.
  @type t :: %Chaperon.Master{
          id: atom,
          sessions: %{atom => Chaperon.Session.t()},
          tasks: %{UUID.uuid4() => pid},
          non_worker_nodes: [atom],
          scheduled_load_tests: EQ.t()
        }

  # Every load test task sleeps for this long before it starts running
  # (presumably 30 seconds in the unit `Process.sleep/1` expects - confirm
  # against `Chaperon.Timing.seconds/1`).
  @load_test_pause_interval Chaperon.Timing.seconds(30)

  use GenServer
  require Logger
  alias Chaperon.Util

  # Name under which the master process is registered (see `start_link/1`) and
  # addressed by every client function and by the load test tasks.
  @name {:global, __MODULE__}

  @doc """
  Starts the master by delegating to `Chaperon.Master.Supervisor.start_master/0`
  and returns whatever that call returns.
  """
  def start, do: Chaperon.Master.Supervisor.start_master()

  @doc """
  Starts the master GenServer and registers it under the global name
  `{:global, Chaperon.Master}`. Only an empty argument list is accepted.
  """
  def start_link([]) do
    server_opts = [name: @name]
    GenServer.start_link(__MODULE__, [], server_opts)
  end

  # GenServer callback: the initial state is a default master struct whose `id`
  # is the name of the node this process runs on.
  def init([]) do
    node_name = Node.self()
    Logger.info("Starting Chaperon.Master #{node_name}")

    initial_state = %Chaperon.Master{id: node_name}
    {:ok, initial_state}
  end

  @doc """
  Runs the load test `lt_mod` through the master and waits for its result.

  The call blocks until the master replies or until the timeout returned by
  `Chaperon.LoadTest.timeout/1` for `lt_mod` elapses (in which case the calling
  process exits, as with any `GenServer.call/3`).

  Returns

    * the session the master replied with, or
    * `{:error, reason}` when the master reports that the load test failed.

  When the master replies with `{:remote, session, data}`, `data` is written
  locally with the exporter returned by `Chaperon.exporter/1` before `session`
  is returned. (Presumably this happens when `run_options/1` switched the
  output to `:remote` because the master lives on another node - confirm in
  `Chaperon.run_load_test/2`.)
  """
  @spec run_load_test(module, Keyword.t()) :: Chaperon.Session.t() | {:error, term}
  def run_load_test(lt_mod, options \\ []) do
    timeout = Chaperon.LoadTest.timeout(lt_mod)

    case GenServer.call(@name, {:run_load_test, lt_mod, run_options(options)}, timeout) do
      {:remote, session, data} ->
        # The exporter may adjust the options, so use the ones it hands back
        # (under a separate name instead of rebinding `options`).
        {exporter, exporter_options} = Chaperon.exporter(options)

        apply(exporter, :write_output, [
          lt_mod,
          exporter_options,
          data,
          exporter_options[:output]
        ])

        session

      # Either a session or an `{:error, reason}` tuple; both are passed through.
      session_or_error ->
        session_or_error
    end
  end

  @doc """
  Asks the master for the load tests that are currently running.

  The reply is a list of maps with the keys `:name`, `:id` and `:tag`.
  """
  def running_load_tests, do: GenServer.call(@name, :running_load_tests)

  @doc """
  Schedules a single load test.

  `lt` must be a map with at least the keys `:test` and `:options`. The master
  starts it right away when nothing is running, otherwise it is queued. The
  reply is the id assigned to the load test.
  """
  def schedule_load_test(lt), do: GenServer.call(@name, {:schedule_load_test, lt})

  @doc """
  Schedules several load tests at once.

  The master replies with `{:ok, ids}` (one id per given load test, in order)
  or with `{:error, :no_load_tests_given}` when `lts` is an empty list.
  """
  def schedule_load_tests(lts), do: GenServer.call(@name, {:schedule_load_tests, lts})

  @doc """
  Asks the master for the load tests that are queued but not yet running.

  The reply is a list of maps with the keys `:name`, `:id` and `:tag`.
  """
  def scheduled_load_tests, do: GenServer.call(@name, :scheduled_load_tests)

  @doc """
  Cancels every running load test and empties the queue of scheduled ones.
  The master replies with `:ok`.
  """
  def cancel_all, do: GenServer.call(@name, :cancel_all)

  @doc """
  Cancels the load test with the given `id`, whether it is currently running or
  still waiting in the queue. The master replies with `:ok`.
  """
  def cancel_running_or_scheduled(id),
    do: GenServer.call(@name, {:cancel_running_or_scheduled, id})

  @doc """
  Empties the queue of scheduled load tests without touching running ones.
  The master replies with `:ok`.
  """
  def cancel_scheduled, do: GenServer.call(@name, :cancel_scheduled)

  @doc """
  Tells the master to add `node` to its list of non-worker nodes.
  """
  @spec ignore_node_as_worker(atom) :: :ok
  def ignore_node_as_worker(node), do: GenServer.call(@name, {:ignore_node_as_worker, node})

  # Starts `lt_mod` immediately in its own task. No reply is sent from here:
  # the caller (`client`) is stored together with the task and answered later
  # via `GenServer.reply/2` when the task reports back.
  def handle_call({:run_load_test, lt_mod, options}, client, state) do
    lt_name = Chaperon.LoadTest.name(lt_mod)
    Logger.info("Chaperon.Master | Starting LoadTest #{lt_name} @ Master #{state.id}")

    load_test = %{test: lt_mod, options: options}
    %{state: new_state} = start_load_test(state, client, load_test)

    {:noreply, new_state}
  end

  # Prepends `node` to `non_worker_nodes`. How that list is consumed is not
  # visible in this module.
  def handle_call({:ignore_node_as_worker, node}, _, state) do
    ignored_nodes = [node | state.non_worker_nodes]
    {:reply, :ok, %{state | non_worker_nodes: ignored_nodes}}
  end

  # Replies with one `%{name, id, tag}` map per running task. Task entries that
  # lack `:load_test` or `:options` are skipped.
  def handle_call(:running_load_tests, _, state) do
    Logger.info("Chaperon.Master | Requesting running load tests")

    running =
      Enum.flat_map(state.tasks, fn
        {task_id, %{load_test: lt_mod, options: opts}} ->
          Logger.debug("Got load test options: #{inspect(opts)}")
          [%{name: Chaperon.LoadTest.name(lt_mod), id: task_id, tag: opts[:tag]}]

        _other ->
          []
      end)

    {:reply, running, state}
  end

  # Replies with one `%{name, id, tag}` map per queued load test, in queue
  # order. Queue entries that lack `:test`, `:id` or `:options` are skipped.
  def handle_call(:scheduled_load_tests, _, state) do
    Logger.info("Chaperon.Master | Requesting scheduled load tests")

    scheduled =
      state.scheduled_load_tests
      |> EQ.to_list()
      |> Enum.flat_map(fn
        %{test: lt_mod, id: id, options: opts} ->
          [%{name: Chaperon.LoadTest.name(lt_mod), id: id, tag: opts[:tag]}]

        _other ->
          []
      end)

    {:reply, scheduled, state}
  end

  # Queues `lt` when a load test is already running; otherwise starts it right
  # away (without a client to reply to). Replies with the id given to `lt`.
  def handle_call({:schedule_load_test, lt = %{test: lt_mod, options: _}}, _client, state) do
    lt_name = Chaperon.LoadTest.name(lt_mod)
    Logger.info("Chaperon.Master | Scheduling load test with name: #{lt_name}")

    result =
      case running_load_test?(state) do
        true -> add_load_test(state, lt)
        false -> start_load_test(state, nil, lt)
      end

    %{state: new_state, id: id} = result
    {:reply, id, new_state}
  end

  # Scheduling an empty list is rejected with an error reply; the state is
  # left untouched.
  def handle_call({:schedule_load_tests, []}, client, state) do
    warning =
      "Chaperon.Master | Client #{inspect(client)} tried to schedule empty list of load tests - Aborting"

    Logger.warn(warning)

    {:reply, {:error, :no_load_tests_given}, state}
  end

  # Queues all given load tests and, when nothing is running at the moment,
  # starts the first queued one. Replies with `{:ok, ids}`.
  def handle_call({:schedule_load_tests, load_tests}, _, state) do
    # Only entries carrying a `:test` key contribute a name to the log line.
    lt_names =
      load_tests
      |> Enum.filter(&match?(%{test: _}, &1))
      |> Enum.map(&Chaperon.LoadTest.name(&1.test))

    lt_count = Enum.count(load_tests)

    Logger.info(
      "Chaperon.Master | Scheduling #{lt_count} load tests with names: #{inspect(lt_names)}"
    )

    %{state: queued_state, ids: ids} = add_load_tests(state, load_tests)

    next_state =
      case running_load_test?(queued_state) do
        true -> queued_state
        false -> schedule_next(queued_state)
      end

    {:reply, {:ok, ids}, next_state}
  end

  # Cancels every running task, then resets both the task map and the queue.
  # The per-task state returned by `cancel_running_task/3` is not needed
  # because `tasks` is replaced wholesale afterwards.
  def handle_call(:cancel_all, _, state) do
    Enum.each(state.tasks, fn
      {task_id, %{task: task}} -> cancel_running_task(state, task_id, task)
      _other -> :ok
    end)

    cleared_state = %{state | tasks: %{}, scheduled_load_tests: EQ.new()}
    {:reply, :ok, cleared_state}
  end

  # A running load test with this id is cancelled and the next scheduled one is
  # started; otherwise the id is removed from the queue of scheduled load tests.
  def handle_call({:cancel_running_or_scheduled, id}, _, state) do
    new_state =
      case Map.get(state.tasks, id) do
        %{task: task} ->
          state
          |> cancel_running_task(id, task)
          |> schedule_next()

        nil ->
          remove_scheduled(state, id)
      end

    {:reply, :ok, new_state}
  end

  # Drops all queued load tests; running ones are not affected.
  def handle_call(:cancel_scheduled, _, state) do
    empty_queue = EQ.new()
    {:reply, :ok, %{state | scheduled_load_tests: empty_queue}}
  end

  # Sent by a load test task once it has a session. The stored client (if any)
  # receives the session as the reply to its pending call; afterwards the task
  # is dropped and the next scheduled load test is started.
  def handle_cast({:load_test_finished, {lt_mod, task_id}, session}, state) do
    lt_name = Chaperon.LoadTest.name(lt_mod)
    Logger.info("Chaperon.Master | LoadTest finished: #{lt_name} / #{task_id}")

    case Map.get(state.tasks, task_id) do
      %{client: client} ->
        # Scheduled load tests are started without a client; nothing to reply to.
        client && GenServer.reply(client, session)

      nil ->
        Logger.error(
          "Chaperon.Master | No client found for finished load test: #{lt_name} @ #{task_id}"
        )
    end

    {:noreply, state |> remove_task(task_id) |> schedule_next()}
  end

  # Sent by a load test task that caught a thrown error. The stored client (if
  # any) receives `{:error, err}` as the reply to its pending call; afterwards
  # the task is dropped and the next scheduled load test is started.
  def handle_cast({:load_test_failed, {lt_mod, task_id}, err}, state) do
    case state.tasks[task_id] do
      %{client: client} ->
        lt_name = Chaperon.LoadTest.name(lt_mod)

        Logger.info(
          "Chaperon.Master | LoadTest failed: #{lt_name} / #{task_id} with error: #{inspect(err)}"
        )

        # Scheduled load tests are started without a client; nothing to reply to.
        if client do
          GenServer.reply(client, {:error, err})
        end

      nil ->
        Logger.info(
          "Chaperon.Master | Unknown LoadTest with id #{task_id} failed with error: #{
            inspect(err)
          }"
        )
    end

    # The updated state must be rebound and returned: otherwise the failed task
    # stays in `tasks` forever and the load test started by `schedule_next/1`
    # is neither tracked nor removed from the queue.
    state =
      state
      |> remove_task(task_id)
      |> schedule_next()

    {:noreply, state}
  end

  # Monitor notification for a task process that exited normally. Only logged;
  # the state is left unchanged here.
  def handle_info({:DOWN, _ref, :process, pid, :normal}, state) do
    Logger.debug("Chaperon.Master | Load test finished: " <> inspect(pid))

    {:noreply, state}
  end

  # Monitor notification for a load test task that terminated with an
  # `{error, context}` exit reason (for example an exception raised inside the
  # task, which the task itself does not catch).
  #
  # Such a task never casts :load_test_finished or :load_test_failed, so a
  # caller still blocked in `run_load_test/2` is answered here with
  # `{:error, error}` (the same shape the :load_test_failed handler replies
  # with) instead of being left waiting until its call times out. Afterwards
  # the task is dropped and the next scheduled load test is started.
  def handle_info({:DOWN, ref, :process, pid, {error, _context}}, state) do
    task_id = find_task_id(state, pid, ref)
    Logger.error("Chaperon.Master | LoadTest died: #{task_id} | #{inspect(error)}")

    # `task_id` is nil when the process is not a tracked task; the lookup then
    # yields nil and nobody is notified.
    case state.tasks[task_id] do
      %{client: client} when client != nil -> GenServer.reply(client, {:error, error})
      _no_waiting_client -> :ok
    end

    state =
      state
      |> remove_task(task_id)
      |> schedule_next()

    {:noreply, state}
  end

  # Catch-all: any other message is logged as an error and otherwise ignored.
  def handle_info(msg, state) do
    Logger.error("Chaperon.Master | Ignoring unknown message: " <> inspect(msg))

    {:noreply, state}
  end

  # Adjusts the options that are sent to the master along with a load test.
  #
  #   * No `:output` option: the options are returned unchanged.
  #   * `:output` set and the master runs in the caller's own VM (as judged by
  #     `Util.local_pid?/1`): unchanged.
  #   * `:output` set and the master runs elsewhere: `:output` is replaced by
  #     `:remote`.
  #   * No master registered (`:undefined`): unchanged, so that the subsequent
  #     `GenServer.call/3` fails with its regular `:noproc` exit instead of an
  #     unrelated `CaseClauseError` raised here.
  defp run_options(options) do
    case {:global.whereis_name(Chaperon.Master), options[:output]} do
      {_, nil} ->
        options

      {pid, _} when is_pid(pid) ->
        if Util.local_pid?(pid) do
          options
        else
          Keyword.merge(options, output: :remote)
        end

      {:undefined, _} ->
        options
    end
  end

  # Appends `lt` to the queue of scheduled load tests and returns the updated
  # state together with a freshly generated id.
  #
  # Note: if `lt` already carries an `:id`, that id is kept in the queue entry
  # while the freshly generated one is what gets returned.
  defp add_load_test(state, lt = %{test: lt_mod, options: _}) do
    id = UUID.uuid4()
    lt_name = Chaperon.LoadTest.name(lt_mod)
    Logger.debug("Chaperon.Master | Scheduling load test #{lt_name} with ID #{id}")

    entry = Map.put_new(lt, :id, id)
    queue = EQ.push(state.scheduled_load_tests, entry)

    %{state: %{state | scheduled_load_tests: queue}, id: id}
  end

  # Queues every load test in `load_tests` (in order) and returns the final
  # state plus the generated ids, in the same order as the input.
  defp add_load_tests(state, load_tests) when is_list(load_tests) do
    {ids, final_state} =
      Enum.map_reduce(load_tests, state, fn lt, acc_state ->
        %{state: next_state, id: id} = add_load_test(acc_state, lt)
        {id, next_state}
      end)

    %{state: final_state, ids: ids}
  end

  # True when at least one load test task is currently tracked in `state.tasks`.
  defp running_load_test?(state) do
    task_count = map_size(state.tasks)
    task_count > 0
  end

  # Starts the given load test in an unlinked task, monitors that task and
  # records it in `state.tasks` under `task_id`.
  #
  # `client` is the `from` of a pending `GenServer.call/3` (or nil when nobody
  # is waiting); it is stored so that the result can be delivered later.
  # Returns `%{state: new_state, id: task_id}`.
  defp start_load_test(state, client, %{test: lt_mod, options: options}, task_id \\ UUID.uuid4()) do
    run_and_report = fn ->
      # Every load test waits for the pause interval before it starts.
      Process.sleep(@load_test_pause_interval)

      try do
        session = Chaperon.run_load_test(lt_mod, options)
        GenServer.cast(@name, {:load_test_finished, {lt_mod, task_id}, session})
      catch
        # Only thrown values end up here; raised exceptions and exits terminate
        # the task and reach the master as a :DOWN message instead.
        err ->
          GenServer.cast(@name, {:load_test_failed, {lt_mod, task_id}, err})
      end
    end

    {:ok, task_pid} = Task.start(run_and_report)
    task_ref = Process.monitor(task_pid)

    entry = %{
      client: client,
      load_test: lt_mod,
      options: options,
      task: {task_pid, task_ref}
    }

    %{state: %{state | tasks: Map.put(state.tasks, task_id, entry)}, id: task_id}
  end

  # Drops `task_id` from the tracked tasks; unknown ids leave the state as is.
  defp remove_task(state, task_id) do
    remaining_tasks = Map.delete(state.tasks, task_id)
    %{state | tasks: remaining_tasks}
  end

  # Removes every queued load test whose `:id` equals `id`.
  defp remove_scheduled(state, id) do
    update_in(state.scheduled_load_tests, fn queue ->
      EQ.filter(queue, &(&1.id != id))
    end)
  end

  # Starts the load test at the head of the queue, if there is one, and returns
  # the updated state. With an empty queue only a warning is logged.
  defp schedule_next(state) do
    pending = state.scheduled_load_tests

    if EQ.empty?(pending) do
      Logger.warn("Chaperon.Master | No other load tests scheduled - aborting schedule_next")
      state
    else
      pending
      |> EQ.pop()
      |> start_popped_load_test(state)
    end
  end

  # Handles the result of popping the queue: a popped entry is started under
  # its own id (without a client to reply to) and the rest of the queue is
  # stored; an empty result leaves the state untouched.
  defp start_popped_load_test({{:value, next}, remaining}, state) do
    Logger.info("Chaperon.Master | Starting next scheduled load test with id #{next.id}")
    %{state: started_state} = start_load_test(state, nil, next, next.id)
    %{started_state | scheduled_load_tests: remaining}
  end

  defp start_popped_load_test({:empty, _remaining}, state) do
    Logger.warn(
      "Chaperon.Master | No remaining scheduled load tests available. Doing nothing for now..."
    )

    state
  end

  # Kills the task process of a running load test and removes it from the
  # tracked tasks. `task` is the `{pid, monitor_ref}` pair stored for the task.
  defp cancel_running_task(state, task_id, task = {pid, ref}) do
    Logger.info("Chaperon.Master | Canceling running task #{inspect(task)}")

    # Drop the monitor first (flushing a :DOWN that may already be queued):
    # a deliberate kill would otherwise produce a `:killed` :DOWN message that
    # no dedicated handler matches and that gets logged as an unknown message.
    Process.demonitor(ref, [:flush])
    Process.exit(pid, :kill)

    remove_task(state, task_id)
  end

  # Looks up the id of the tracked task whose `{pid, monitor_ref}` pair equals
  # the given one; returns nil when there is no such task.
  defp find_task_id(state, pid, ref) do
    Enum.find_value(state.tasks, fn
      {task_id, %{task: {^pid, ^ref}}} -> task_id
      _other -> nil
    end)
  end
end