Skip to main content

lib/ex_check/check/pipeline.ex

defmodule ExCheck.Check.Pipeline do
  @moduledoc false

  # This function takes `pending` in a form of list of pending `payload`s. It executes
  # `opts[:start_fn]` with next `payload`s that have passed through `opts[:throttle_fn]` (which can
  # be used to manage cross-payload dependencies or to throttle the parallel execution).
  #
  # At the same time `opts[:collect_fn]` is called with result of `opts[:start_fn]`. This is done
  # sequentially in the order in which `opts[:start_fn]` were called. Each time `opts[:collect_fn]`
  # finishes, its result is collected and an attempt is made to start a new set of `opts[:start_fn]`
  # for nodes that now have all their dependencies collected.
  #
  # In the end it returns tuple with two lists:
  # - a list of results of finished calls to `opts[:collect_fn]`
  # - a list of payloads that were never reached because they were throttled out until the end

  def run(pending, opts) do
    loop({pending, [], nil, []}, opts)
  end

  defp loop({pending, running, collecting, finished}, opts) do
    {pending, running} = run_next(pending, running, finished, opts)
    {running, collecting} = collect_next(running, collecting)

    if collecting do
      receive do
        {:finished, payload, result} ->
          {pending, collecting, finished} = finish(payload, result, pending, collecting, finished)
          loop({pending, running, collecting, finished}, opts)
      end
    else
      {finished, pending}
    end
  end

  defp run_next(pending, running, finished, opts) do
    throttle_fn = Keyword.fetch!(opts, :throttle_fn)
    start_fn = Keyword.fetch!(opts, :start_fn)
    collect_fn = Keyword.fetch!(opts, :collect_fn)

    selected = select_next(pending, running, finished, throttle_fn)
    new_running = start_next(selected, start_fn, collect_fn)

    {pending -- selected, running ++ new_running}
  end

  defp select_next(pending, running, finished, throttle_fn) do
    throttle_fn.(pending, Enum.map(running, &elem(&1, 0)), finished)
  end

  defp start_next(new_running, start_fn, collect_fn) do
    Enum.map(new_running, fn payload ->
      runner_pid =
        spawn_link(fn ->
          payload = start_fn.(payload)

          receive do
            {:collect, collector_pid} ->
              result = collect_fn.(payload)
              send(collector_pid, {:result, result})
          end
        end)

      {payload, runner_pid}
    end)
  end

  defp collect_next([{payload, runner_pid} | rest], nil) do
    orchestrator_pid = self()

    spawn_link(fn ->
      collector_pid = self()
      send(runner_pid, {:collect, collector_pid})

      result =
        receive do
          {:result, result} -> result
        end

      send(orchestrator_pid, {:finished, payload, result})
    end)

    {rest, payload}
  end

  defp collect_next(running, collecting) do
    {running, collecting}
  end

  defp finish(payload, result, pending, collecting, finished) do
    collecting = if payload == collecting, do: nil, else: collecting
    finished = finished ++ [result]

    {pending, collecting, finished}
  end
end