lib/reactor/executor/state.ex

defmodule Reactor.Executor.State do
  @moduledoc """
  Contains the reactor execution state.

  This is run-time only information.
  """

  @defaults %{
    async?: true,
    halt_timeout: 5000,
    max_iterations: :infinity,
    timeout: :infinity
  }

  defstruct async?: @defaults.async?,
            concurrency_key: nil,
            current_tasks: %{},
            errors: [],
            halt_timeout: @defaults.halt_timeout,
            max_concurrency: nil,
            max_iterations: @defaults.max_iterations,
            pool_owner: false,
            retries: %{},
            started_at: nil,
            timeout: @defaults.timeout

  alias Reactor.{Executor.ConcurrencyTracker, Step}

  @type t :: %__MODULE__{
          async?: boolean,
          concurrency_key: ConcurrencyTracker.pool_key(),
          current_tasks: %{Task.t() => Step.t()},
          errors: [any],
          halt_timeout: pos_integer() | :infinity,
          max_concurrency: pos_integer(),
          max_iterations: pos_integer() | :infinity,
          pool_owner: boolean,
          retries: %{reference() => pos_integer()},
          started_at: DateTime.t(),
          timeout: pos_integer() | :infinity
        }

  @doc false
  @spec init(map) :: t
  def init(attrs \\ %{}) do
    @defaults
    |> Map.merge(attrs)
    |> do_init()
  end

  defp do_init(attrs) do
    attrs
    |> maybe_set_max_concurrency()
    |> maybe_allocate_concurrency_pool()
    |> Map.put(:started_at, DateTime.utc_now())
    |> then(&struct(__MODULE__, &1))
  end

  defp maybe_set_max_concurrency(attrs)
       when is_integer(attrs.max_concurrency) and attrs.max_concurrency > 0,
       do: attrs

  defp maybe_set_max_concurrency(attrs) when attrs.async? == false,
    do: Map.put(attrs, :max_concurrency, 0)

  defp maybe_set_max_concurrency(attrs),
    do: Map.put(attrs, :max_concurrency, System.schedulers_online())

  defp maybe_allocate_concurrency_pool(attrs) when is_reference(attrs.concurrency_key) do
    attrs
    |> Map.put(:pool_owner, false)
  end

  defp maybe_allocate_concurrency_pool(attrs) do
    attrs
    |> Map.put(:concurrency_key, ConcurrencyTracker.allocate_pool(attrs.max_concurrency))
    |> Map.put(:pool_owner, true)
  end
end