Skip to main content

lib/arrea.ex

defmodule Arrea do
  @moduledoc """
  Fachada principal del orquestador `Arrea`.

  Responsable de:
  - Ejecución paralela de tareas y comandos
  - Gestión de workers vía `Arrea.Leader`
  - Tolerancia a fallos con `Arrea.CircuitBreaker`
  - Reporte de estado vía `Arrea.Monitor`

  ## Arquitectura

      ┌─────────────────────────────────────────────────────────┐
      │                      Arrea                              │
      │                    (Fachada)                             │
      └─────────────────────────┬───────────────────────────────┘
                               │
      ┌────────────────────────▼───────────────────────────────┐
      │              Arrea.Leader (GenServer)                   │
      │         Coordina ejecución, gestiona workers           │
      │         Emite eventos {:leader_event, event}           │
      └─────────────────────────┬───────────────────────────────┘
                               │
      ┌────────────────────────▼───────────────────────────────┐
      │       Arrea.WorkerSupervisor (DynamicSupervisor)       │
      │              Workers efímeros                          │
      └─────────────────────────┬───────────────────────────────┘
                               │
      ┌────────────────────────▼───────────────────────────────┐
      │              Arrea.Worker (GenServer)                  │
      │              Ejecuta tareas individuales                │
      └─────────────────────────────────────────────────────────┘

      Arrea.Monitor — Estadísticas del ciclo de vida de workers
      Arrea.CircuitBreaker — Tolerancia a fallos

  El árbol de supervisión arranca automáticamente al incluir Arrea como
  dependencia. No es necesario arrancar `Arrea.Supervisor` manualmente.

  ## Uso rápido

      # Ejecución simple
      {:ok, result} = Arrea.execute(fn -> :ok end)

      # Ejecución paralela
      {:ok, results} = Arrea.run([fn -> 1 end, fn -> 2 end], workers: 4)

      # Suscripción a eventos del Leader
      :ok = Arrea.subscribe()

      receive do
        {:leader_event, %{type: :worker_started} = event} ->
          IO.inspect(event)
        {:leader_event, %{type: :finished} = event} ->
          IO.inspect(event)
      end

      :ok = Arrea.unsubscribe()
  """

  alias Arrea.Config, as: Config
  alias Arrea.{Leader, Monitor, Parallel}

  @type execution_option ::
          {:workers, non_neg_integer()}
          | {:timeout, non_neg_integer()}
          | {:retry, boolean()}

  @doc """
  Devuelve el número máximo de workers configurados.

  ## Ejemplo

      iex> Arrea.max_workers()
      100
  """
  @spec max_workers() :: non_neg_integer()
  def max_workers, do: Config.get(:max_workers, 100)

  @doc """
  Ejecuta un comando único de forma síncrona.

  ## Parámetros

    * `cmd` — Un binary (comando shell) o una función sin argumentos
    * `opts` — Opciones adicionales:
      - `:timeout` — Timeout en ms (por defecto `30_000`). Timeout real: cancela la ejecución.
      - `:retry` — Si se debe reintentar en caso de fallo
      - `:shell` — Shell a usar (máxima prioridad sobre config y entorno)

  ## Retorna

    * `{:ok, Arrea.Result.t()}` — Éxito con resultado
    * `{:error, Arrea.Error.t()}` — Error con código y mensaje

  ## Ejemplos

      iex> Arrea.execute("echo hello")
      {:ok, %Arrea.Result{success: true, data: %{stdout: "hello\\n", ...}, failures: []}}

      iex> Arrea.execute(fn -> :work end)
      {:ok, %Arrea.Result{success: true, data: :work, failures: []}}
  """
  @spec execute(binary() | (-> term()), [execution_option()]) ::
          {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}
  def execute(cmd, opts \\ []) when is_binary(cmd) or is_function(cmd, 0) do
    start_time = System.monotonic_time()

    :telemetry.execute(
      [:arrea, :engine, :execute, :start],
      %{},
      %{command: safe_command_label(cmd)}
    )

    result = Parallel.execute(cmd, opts)

    duration_ms =
      System.convert_time_unit(
        System.monotonic_time() - start_time,
        :native,
        :millisecond
      )

    case result do
      {:ok, r} ->
        :telemetry.execute(
          [:arrea, :engine, :execute, :stop],
          %{duration: duration_ms},
          %{command: safe_command_label(cmd), success: true}
        )

        {:ok, %Arrea.Result{success: true, data: r, failures: []}}

      {:error, reason} ->
        :telemetry.execute(
          [:arrea, :engine, :execute, :error],
          %{duration: duration_ms},
          %{command: safe_command_label(cmd), reason: inspect(reason)}
        )

        {:error, %Arrea.Error{code: :engine_failure, message: inspect(reason)}}
    end
  catch
    :exit, reason ->
      {:error, %Arrea.Error{code: :process_exited, message: inspect(reason)}}
  end

  @doc """
  Ejecuta múltiples comandos en paralelo.

  ## Parámetros

    * `commands` — Lista de binaries o funciones
    * `opts` — Opciones:
      - `:workers` — Número máximo de workers paralelos (por defecto `max_workers()`)
      - `:timeout` — Timeout total en ms

  ## Retorna

    * `{:ok, Arrea.Result.t()}` — Con `batch_id` para correlacionar eventos
    * `{:error, Arrea.Error.t()}` — Si todos fallaron o no hay workers disponibles

  ## Ejemplo

      iex> {:ok, result} = Arrea.run([fn -> 1 end, fn -> 2 end, fn -> 3 end], workers: 2)
      iex> result.data.batch_id
      "batch_..."
  """
  @spec run([binary() | (-> term())], [execution_option()]) ::
          {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}
  def run(commands, opts \\ []) when is_list(commands) do
    workers = Keyword.get(opts, :workers, max_workers())

    :telemetry.execute(
      [:arrea, :engine, :run, :start],
      %{},
      %{count: length(commands), workers: workers}
    )

    case Parallel.run(commands, opts) do
      {:ok, batch_id} ->
        :telemetry.execute(
          [:arrea, :engine, :run, :stop],
          %{},
          %{batch_id: batch_id}
        )

        {:ok,
         %Arrea.Result{
           success: true,
           data: %{batch_id: batch_id, pending: length(commands)},
           failures: []
         }}

      {:ok, batch_id, info} ->
        :telemetry.execute(
          [:arrea, :engine, :run, :stop],
          %{},
          %{batch_id: batch_id, partial: true}
        )

        {:ok,
         %Arrea.Result{
           success: true,
           data: Map.merge(%{batch_id: batch_id, pending: Map.get(info, :started, 0)}, info),
           failures: []
         }}

      {:error, reason} ->
        {:error, %Arrea.Error{code: :parallel_failure, message: inspect(reason)}}
    end
  end

  @doc """
  Suscribe el proceso actual a los eventos semánticos del Leader.

  Los mensajes recibidos tienen la forma `{:leader_event, event}` donde
  `event` es un mapa con al menos la clave `:type`.

  Tipos de evento habituales:
  - `%{type: :worker_started, worker_id: id}`
  - `%{type: :progress, worker_id: id, percent: float, ...}`
  - `%{type: :finished, worker_id: id}`
  - `%{type: :error, worker_id: id, reason: term}`
  - `%{type: :result, worker_id: id, data: term}`

  Para desuscribirse, llamar a `unsubscribe/0`.

  ## Ejemplo

      :ok = Arrea.subscribe()

      receive do
        {:leader_event, %{type: :finished, worker_id: id}} ->
          IO.puts("Worker \#{id} terminó")
      end
  """
  @spec subscribe() :: :ok
  def subscribe do
    Leader.subscribe()
  end

  @doc """
  Cancela la suscripción del proceso actual a los eventos del Leader.

  ## Ejemplo

      :ok = Arrea.unsubscribe()
  """
  @spec unsubscribe() :: :ok
  def unsubscribe do
    Leader.unsubscribe()
  end

  @doc """
  Obtiene las estadísticas actuales del Engine.

  Las estadísticas las provee `Arrea.Monitor`, que trackea el ciclo de vida
  de todos los workers arrancados bajo el Leader.

  ## Ejemplo

      iex> {:ok, stats} = Arrea.stats()
      iex> Map.keys(stats)
      [:active_workers, :completed_tasks, :failed_tasks, :total_workers]
  """
  @spec stats() :: {:ok, map()} | {:error, :monitor_unavailable}
  def stats do
    Monitor.get_stats()
  end

  # ── Helpers privados ──────────────────────────────────────────────────────

  @spec safe_command_label(binary() | function()) :: String.t()
  defp safe_command_label(cmd) when is_binary(cmd), do: String.slice(cmd, 0, 100)

  defp safe_command_label(fun) when is_function(fun),
    do: "function/#{:erlang.fun_info(fun)[:arity]}"
end