lib/tasque.ex

defmodule Tasque do
  @moduledoc """
  An asynchronous, bounded-concurrency task queue for Elixir.

  Tasque lets you enqueue work — anonymous functions or MFA tuples — and
  execute it asynchronously under a supervised `Task.Supervisor`, with a
  configurable ceiling on simultaneous tasks. Results are delivered back to
  the caller via plain OTP messages, so you can choose between blocking
  (`await/2`) and non-blocking (selective `receive`) consumption styles.

  ## Quick Start

  Add a Tasque instance to your application's supervision tree:

      children = [
        {Tasque, name: MyApp.Queue, max_concurrency: 10}
      ]

      Supervisor.start_link(children, strategy: :one_for_one)

  Then enqueue work from anywhere:

      {:ok, ref} = Tasque.queue_task(MyApp.Queue, fn -> expensive_work() end)

      # Block until the result arrives (or timeout):
      {:ok, result} = Tasque.await(ref)

  ## Result Format

  Every task result is delivered as a `{:tasque_result, ref, outcome}` message
  to the process that called `queue_task/3`. The `outcome` is one of:

  | Outcome | Meaning |
  |---|---|
  | `{:ok, result}` | Task completed successfully and returned `result` |
  | `{:exit, reason}` | Task crashed, was killed, or timed out |

  When using `await/2`, the outcome tuple is returned directly; if the caller
  stops waiting first, `await/2` returns `{:error, :timeout}` instead.
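
  When several tasks are in flight, a selective `receive` can gather their
  outcomes regardless of the order in which they finish. This is an
  illustrative sketch only: `TasqueSketch.Results.collect/2` is a
  hypothetical helper, not part of Tasque's API, and to stay self-contained
  it fakes the result messages with `send/2` instead of enqueueing real work.

  ```elixir
  defmodule TasqueSketch.Results do
    # Hypothetical helper (not part of Tasque): waits for one
    # {:tasque_result, ref, outcome} message per ref and returns a map of
    # ref => outcome. A ref whose message does not arrive within `timeout`
    # maps to {:error, :timeout}. The timeout applies to each wait.
    def collect(refs, timeout) do
      Map.new(refs, fn ref ->
        receive do
          {:tasque_result, ^ref, outcome} -> {ref, outcome}
        after
          timeout -> {ref, {:error, :timeout}}
        end
      end)
    end
  end

  # Simulate two results arriving in the opposite order to the refs list.
  [ref_a, ref_b] = [make_ref(), make_ref()]
  send(self(), {:tasque_result, ref_b, {:exit, :timeout}})
  send(self(), {:tasque_result, ref_a, {:ok, 42}})

  results = TasqueSketch.Results.collect([ref_a, ref_b], 100)
  ```

  Because each `receive` pins its own ref, unrelated messages stay in the
  mailbox untouched.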

  ## Task Formats

  Tasks can be provided as either a zero-arity anonymous function or an
  MFA tuple. MFA tuples are normalized internally to `fn -> apply(m, f, a) end`.

      # Anonymous function
      Tasque.queue_task(queue, fn -> String.upcase("hello") end)

      # MFA tuple
      Tasque.queue_task(queue, {String, :upcase, ["hello"]})
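
  The normalization described above can be sketched as one function clause
  per accepted format. This is a minimal illustration under assumptions:
  the module `TasqueSketch.Normalize` and its guards are hypothetical, and
  Tasque's own validation may differ in detail (for example, in how strictly
  it checks the MFA elements).

  ```elixir
  defmodule TasqueSketch.Normalize do
    # Hypothetical: turns either accepted task format into a zero-arity fun.
    def normalize(fun) when is_function(fun, 0), do: {:ok, fun}

    def normalize({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a),
      do: {:ok, fn -> apply(m, f, a) end}

    # Anything else (for example a one-arity fun) is rejected, mirroring the
    # {:error, :invalid_task} return documented for queue_task/3.
    def normalize(_other), do: {:error, :invalid_task}
  end

  {:ok, fun} = TasqueSketch.Normalize.normalize({String, :upcase, ["hello"]})
  fun.()
  #=> "HELLO"
  ```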

  ## Supervision Architecture

  Each Tasque instance starts a small supervision subtree:

      Tasque.Supervisor (:one_for_all)
      ├── Task.Supervisor   — runs tasks via async_nolink
      └── Tasque.Queue      — GenServer managing the FIFO queue

  The `:one_for_all` strategy ensures that if either child crashes, both
  are restarted together, keeping the system in a consistent state. The
  `Task.Supervisor` starts first so its name is available to `Tasque.Queue`
  at init time.
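
  A `:one_for_all` subtree with this child order could look roughly like the
  following. It is a sketch, not Tasque's source: `TasqueSketch.Supervisor`
  and the `SketchQueue` name are hypothetical, and an `Agent` holding an
  empty `:queue` stands in for the `Tasque.Queue` GenServer so the example
  runs on its own.

  ```elixir
  defmodule TasqueSketch.Supervisor do
    use Supervisor

    def start_link(opts) do
      name = Keyword.fetch!(opts, :name)
      Supervisor.start_link(__MODULE__, name, name: Module.concat(name, Supervisor))
    end

    @impl true
    def init(name) do
      task_sup = Module.concat(name, TaskSupervisor)

      children = [
        # Listed first, so its registered name exists before the queue boots.
        {Task.Supervisor, name: task_sup},
        # Stand-in for Tasque.Queue: an Agent registered under the queue name.
        %{
          id: :queue,
          start: {Agent, :start_link, [fn -> :queue.new() end, [name: name]]}
        }
      ]

      # If either child crashes, both are restarted together.
      Supervisor.init(children, strategy: :one_for_all)
    end
  end

  {:ok, _sup} = TasqueSketch.Supervisor.start_link(name: SketchQueue)
  ```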

  ## Concurrency Control

  When the number of in-flight tasks is below `:max_concurrency`, enqueued
  work is dispatched immediately. Once the limit is reached, tasks wait in
  a FIFO queue and are dispatched as running tasks complete, crash, or
  time out — each of these events frees a concurrency slot.
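
  The dispatch rule reduces to a counter and a FIFO. The pure-function
  sketch below assumes a hypothetical state shape (`running`, `max`,
  `queue`) and hypothetical `enqueue/2` and `complete/1` helpers. Tasque's
  GenServer tracks more than this (caller refs, timers), and here a job is
  marked as started by appending it to a `started` list rather than by
  calling `Task.Supervisor.async_nolink/2`.

  ```elixir
  defmodule TasqueSketch.Dispatch do
    # Hypothetical state: in-flight count, concurrency cap, FIFO of waiting
    # jobs, and a log of started jobs (standing in for async_nolink calls).
    def new(max), do: %{running: 0, max: max, queue: :queue.new(), started: []}

    # Below the cap: start now. At the cap: append to the FIFO.
    def enqueue(%{running: r, max: m} = s, job) when r < m,
      do: %{s | running: r + 1, started: s.started ++ [job]}

    def enqueue(s, job), do: %{s | queue: :queue.in(job, s.queue)}

    # A task finished, crashed, or timed out: free its slot, then start the
    # oldest waiting job, if there is one.
    def complete(s) do
      s = %{s | running: s.running - 1}

      case :queue.out(s.queue) do
        {{:value, job}, rest} ->
          %{s | queue: rest, running: s.running + 1, started: s.started ++ [job]}

        {:empty, _} ->
          s
      end
    end
  end

  waiting =
    TasqueSketch.Dispatch.new(2)
    |> TasqueSketch.Dispatch.enqueue(:a)
    |> TasqueSketch.Dispatch.enqueue(:b)
    |> TasqueSketch.Dispatch.enqueue(:c)

  # :a and :b run immediately; :c waits until a slot frees up.
  state = TasqueSketch.Dispatch.complete(waiting)
  ```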

  ## Fault Isolation

  Tasks run under `Task.Supervisor.async_nolink/2`, so a crashing task:

    * Does **not** crash the queue GenServer
    * Does **not** affect other concurrently running tasks
    * Produces an `{:exit, reason}` result delivered to the original caller
    * Frees its concurrency slot, allowing the next queued task to dispatch
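
  The isolation comes from `Task.Supervisor.async_nolink/2` itself: a crash
  reaches the owning process as a `:DOWN` monitor message instead of
  propagating through a link. The standalone sketch below exercises that
  primitive with a throwaway supervisor. For brevity it converts the crash
  into `{:exit, reason}` with `Task.yield/2`; Tasque's queue presumably
  handles the `:DOWN` message in its own `handle_info/2` instead.

  ```elixir
  {:ok, sup} = Task.Supervisor.start_link()

  # No link: the caller survives the crash and is notified via the monitor.
  task = Task.Supervisor.async_nolink(sup, fn -> raise "boom" end)

  # Task.yield/2 consumes the :DOWN message and reports {:exit, reason};
  # for a raised exception, reason is {exception, stacktrace}.
  {:exit, {%RuntimeError{message: "boom"}, _stacktrace}} = Task.yield(task, 1_000)

  # Other tasks under the same supervisor are unaffected.
  ok_task = Task.Supervisor.async_nolink(sup, fn -> :still_fine end)
  {:ok, :still_fine} = Task.yield(ok_task, 1_000)
  ```

  Running this logs an error report for the crashed task; that is expected
  and does not affect the calling process.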

  ## Per-Task Timeouts

  You can set a timeout on individual tasks at enqueue time:

      Tasque.queue_task(queue, fn -> slow_work() end, timeout: 5_000)

  The timeout starts when the task is enqueued, so it includes time spent
  waiting in the queue as well as time spent running.

  If the timeout fires while the task is still queued, the queue records a
  tombstone for it, delivers `{:exit, :timeout}` to the caller, and skips
  the entry when dispatch reaches it.
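
  One plausible reason to tombstone rather than delete is that removing an
  arbitrary entry from an Erlang `:queue` means rebuilding it (for example
  with `:queue.filter/2`), which is linear in the queue length. The
  pure-function sketch below uses hypothetical names and a hypothetical state
  shape (`TasqueSketch.Tombstones`, `{caller_ref, fun}` entries, a `MapSet`
  of expired refs); Tasque's own bookkeeping may differ.

  ```elixir
  defmodule TasqueSketch.Tombstones do
    # Hypothetical state: FIFO of {caller_ref, fun} plus refs that expired
    # while still waiting.
    def new, do: %{queue: :queue.new(), dead: MapSet.new()}

    def push(s, ref, fun), do: %{s | queue: :queue.in({ref, fun}, s.queue)}

    # Timeout fired while queued: remember the ref, no queue search needed.
    def expire(s, ref), do: %{s | dead: MapSet.put(s.dead, ref)}

    # Pop the next live entry, discarding tombstoned ones along the way.
    def next(s) do
      case :queue.out(s.queue) do
        {:empty, _} ->
          {:empty, s}

        {{:value, {ref, fun}}, rest} ->
          s = %{s | queue: rest}

          if MapSet.member?(s.dead, ref) do
            next(%{s | dead: MapSet.delete(s.dead, ref)})
          else
            {{ref, fun}, s}
          end
      end
    end
  end

  [r1, r2] = [make_ref(), make_ref()]

  s =
    TasqueSketch.Tombstones.new()
    |> TasqueSketch.Tombstones.push(r1, fn -> :first end)
    |> TasqueSketch.Tombstones.push(r2, fn -> :second end)
    |> TasqueSketch.Tombstones.expire(r1)

  # r1 expired while queued, so dispatch skips straight to r2.
  {{^r2, fun}, s} = TasqueSketch.Tombstones.next(s)
  ```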

  If the timeout fires while the task is already running:

    1. The task process is killed (`Task.Supervisor.terminate_child/2`)
    2. `{:exit, :timeout}` is delivered to the caller
    3. The concurrency slot is freed
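
  The running-task case can be sketched with a timer message and
  `Task.Supervisor.terminate_child/2`. Assumptions: the
  `{:task_timeout, caller_ref}` message shape is hypothetical, and
  everything runs inline in one process here, whereas in Tasque the timer
  would be handled by the queue GenServer.

  ```elixir
  {:ok, sup} = Task.Supervisor.start_link()
  caller_ref = make_ref()

  task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)

  # The timer is armed at enqueue time (hypothetical message shape).
  Process.send_after(self(), {:task_timeout, caller_ref}, 50)

  outcome =
    receive do
      {:task_timeout, ^caller_ref} ->
        # Step 1: kill the task process (synchronous; returns once it exits).
        :ok = Task.Supervisor.terminate_child(sup, task.pid)
        # Drop the monitor and any :DOWN message it already produced.
        Process.demonitor(task.ref, [:flush])
        # Step 2: this is what the caller would receive. A real queue would
        # also decrement its running count here (step 3).
        {:exit, :timeout}
    end
  ```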

  `:timeout` must be a positive integer (milliseconds) or `:infinity`; any
  other value, including `0`, makes `queue_task/3` raise `ArgumentError`.
  Passing `:infinity`, or omitting the option, disables the per-task timeout.

  > #### Await timeout vs. task timeout {: .warning}
  >
  > `Tasque.await/2`'s timeout controls how long the *caller* is willing
  > to wait. It does **not** cancel the task — the task will continue
  > running, its result will still be delivered to the caller's mailbox,
  > and it will still occupy a concurrency slot.
  >
  > The `:timeout` option on `queue_task/3` is enforced by the queue
  > itself. If it fires while the task is already running, the task
  > process is terminated. If it fires while the task is still queued,
  > the task is dropped before dispatch.

  ## Multiple Queues

  You can run independent queues with different concurrency limits for
  different workload categories:

      children = [
        {Tasque, name: MyApp.CpuQueue, max_concurrency: System.schedulers_online()},
        {Tasque, name: MyApp.IoQueue,  max_concurrency: 50}
      ]

  Each queue maintains its own task supervisor, dispatch loop, and
  concurrency budget.

  ## Caller Refs

  The reference returned by `queue_task/3` is a **caller ref** — a
  `make_ref()` created by the queue GenServer and decoupled from the
  internal `Task` ref used by the task supervisor. This means:

    * The caller never holds a direct reference to the underlying task process
    * Refs are unique per enqueue call, even if the same function is submitted twice
    * The queue maintains the mapping from internal task refs to caller refs

  ## Edge Cases

    * **Caller dies before task completes** — the result message is sent
      to a dead PID and silently dropped. The task still runs to completion
      (or timeout) and the concurrency slot is freed normally.

    * **Await times out** — `await/2` returns `{:error, :timeout}` but
      the task keeps running. The `{:tasque_result, ref, outcome}` message
      will eventually arrive in the caller's mailbox. You can call `await/2`
      again with the same ref to retrieve it, or use a selective `receive`.
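
  Because the result is an ordinary message, a timed-out await can simply be
  retried with the same ref. To keep the sketch self-contained, it copies the
  `receive`/`after` shape of `await/2` into a hypothetical
  `TasqueSketch.Await` module and delivers the late result by hand with
  `send/2`.

  ```elixir
  defmodule TasqueSketch.Await do
    # Same shape as Tasque.await/2: match on the ref, or give up after timeout.
    def await(ref, timeout) do
      receive do
        {:tasque_result, ^ref, outcome} -> outcome
      after
        timeout -> {:error, :timeout}
      end
    end
  end

  ref = make_ref()

  # Nothing has arrived yet, so the first wait gives up.
  first = TasqueSketch.Await.await(ref, 10)

  # Later, the task finishes and its result lands in the mailbox.
  send(self(), {:tasque_result, ref, {:ok, :done}})

  # Retrying with the same ref picks it up.
  second = TasqueSketch.Await.await(ref, 10)
  ```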
  """

  @default_await_timeout 5_000

  @doc false
  def task_supervisor_name(name) when is_atom(name), do: :"#{name}.TaskSupervisor"
  def task_supervisor_name({:global, term}), do: {:global, {term, :task_supervisor}}

  def task_supervisor_name({:via, Registry, {registry, key}}),
    do: {:via, Registry, {registry, {key, :task_supervisor}}}

  def task_supervisor_name({:via, module, term}), do: {:via, module, {term, :task_supervisor}}

  @doc false
  def supervisor_name(name) when is_atom(name), do: :"#{name}.Supervisor"
  def supervisor_name({:global, term}), do: {:global, {term, :supervisor}}

  def supervisor_name({:via, Registry, {registry, key}}),
    do: {:via, Registry, {registry, {key, :supervisor}}}

  def supervisor_name({:via, module, term}), do: {:via, module, {term, :supervisor}}

  @doc """
  Starts a Tasque instance directly.

  This is primarily useful for testing or starting a queue dynamically.
  In production applications, it is strongly recommended to start `Tasque`
  as part of your application's supervision tree instead of calling
  `start_link/1` directly.

  See `child_spec/1` for supported options.
  """
  defdelegate start_link(opts), to: Tasque.Supervisor

  @doc """
  Returns a child specification for starting a Tasque instance under a supervisor.

  This is invoked automatically when you use `{Tasque, opts}` tuple syntax in
  a supervision tree.

  ## Options

    * `:name` (required) — the name used to register the queue. Can be an
      atom, a `{:global, term}` tuple, or a `{:via, module, term}` tuple.
      Internal processes derive their names from this value.
      For example, if `name: MyApp.Queue` is provided, the derived names
      are `MyApp.Queue.TaskSupervisor` and `MyApp.Queue.Supervisor`.
      For `:global` and `:via` names, Tasque derives matching companion
      names using the same naming strategy.

    * `:max_concurrency` — the maximum number of tasks that may execute
      simultaneously. Defaults to `10`.

  ## Examples

  As part of a supervision tree:

      children = [
        {Tasque, name: MyApp.Queue, max_concurrency: 5}
      ]

  Or build the spec manually:

      Tasque.child_spec(name: MyApp.Queue, max_concurrency: 5)
      #=> %{
      #=>   id: MyApp.Queue,
      #=>   start: {Tasque.Supervisor, :start_link, [[name: MyApp.Queue, max_concurrency: 5]]},
      #=>   type: :supervisor
      #=> }

  """
  def child_spec(opts) do
    name = Keyword.fetch!(opts, :name)

    %{
      id: name,
      start: {Tasque.Supervisor, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc """
  Enqueues a task for asynchronous execution. Returns `{:ok, ref}` as soon as
  the queue accepts the task, where `ref` is a unique reference for tracking
  the result.

  The result will be delivered as a `{:tasque_result, ref, outcome}` message
  to the calling process. Use `await/2` or a selective `receive` to consume it.

  ## Task Formats

    * **Zero-arity function** — `fn -> :work end`
    * **MFA tuple** — `{Module, :function, [args]}`

  ## Options

    * `:timeout` — per-task timeout as a positive integer in milliseconds,
      or `:infinity`. The timeout starts when the task is enqueued, so it
      includes time spent waiting in the queue as well as time spent running.
      If it fires while queued, the task is dropped before dispatch; if it
      fires while running, the task process is killed. In either case,
      `{:exit, :timeout}` is delivered. If omitted, no timeout is applied.

  ## Returns

    * `{:ok, ref}` on success.
    * `{:error, :invalid_task}` if the task is neither a zero-arity function nor an MFA tuple.

  Raises `ArgumentError` if `:timeout` is neither a positive integer nor `:infinity`.

  ## Examples

      iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskDoc, max_concurrency: 5)
      iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskDoc, fn -> 1 + 1 end)
      iex> is_reference(ref)
      true
      iex> Tasque.await(ref)
      {:ok, 2}

  With an MFA tuple:

      iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskMfaDoc, max_concurrency: 5)
      iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskMfaDoc, {String, :upcase, ["hello"]})
      iex> Tasque.await(ref)
      {:ok, "HELLO"}

  With a per-task timeout:

      iex> {:ok, _} = Tasque.start_link(name: Tasque.QueueTaskTimeoutDoc, max_concurrency: 5)
      iex> {:ok, ref} = Tasque.queue_task(Tasque.QueueTaskTimeoutDoc, fn -> Process.sleep(:infinity) end, timeout: 50)
      iex> receive do
      ...>   {:tasque_result, ^ref, outcome} -> outcome
      ...> after
      ...>   1_000 -> :never_arrived
      ...> end
      {:exit, :timeout}

  """
  @typedoc "A zero-arity function or a `{module, function, args}` tuple."
  @type task :: (-> any()) | {module(), atom(), [any()]}

  @spec queue_task(GenServer.server(), task(), keyword()) ::
          {:ok, ref :: reference()} | {:error, :invalid_task}
  def queue_task(queue, task, opts \\ []) do
    validate_queue_task_opts!(opts)
    GenServer.call(queue, {:queue_task, task, opts})
  end

  @doc """
  Blocks the calling process until the result for `ref` arrives or the
  timeout expires.

  Returns the task outcome directly:

    * `{:ok, result}` — task completed successfully
    * `{:exit, reason}` — task crashed or was killed (including `:timeout`)
    * `{:error, :timeout}` — the *await* timed out (the task may still be queued or running)

  The default timeout is #{@default_await_timeout} ms.

  > #### Important {: .info}
  >
  > An await timeout does **not** cancel the task. The task continues
  > running, occupies its concurrency slot, and its result message will
  > still arrive in the caller's mailbox. If you need to enforce a hard
  > deadline, use the `:timeout` option on `queue_task/3` instead.

  ## Examples

      iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitDoc, max_concurrency: 5)
      iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitDoc, fn -> 42 end)
      iex> Tasque.await(ref)
      {:ok, 42}

  With a custom timeout:

      iex> {:ok, _} = Tasque.start_link(name: Tasque.AwaitTimeoutDoc, max_concurrency: 5)
      iex> {:ok, ref} = Tasque.queue_task(Tasque.AwaitTimeoutDoc, fn -> Process.sleep(:infinity) end)
      iex> Tasque.await(ref, 50)
      {:error, :timeout}

  """
  @spec await(ref :: reference(), timeout :: timeout()) ::
          {:ok, result :: any()} | {:exit, reason :: any()} | {:error, :timeout}
  def await(ref, timeout \\ @default_await_timeout) do
    receive do
      {:tasque_result, ^ref, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end

  # Raises unless :timeout is absent, :infinity, or a positive integer
  # (zero, negative numbers, and non-integers are rejected).
  defp validate_queue_task_opts!(opts) do
    case Keyword.get(opts, :timeout) do
      nil ->
        :ok

      :infinity ->
        :ok

      timeout when is_integer(timeout) and timeout > 0 ->
        :ok

      timeout ->
        raise ArgumentError,
              "timeout must be a positive integer or :infinity, got: #{inspect(timeout)}"
    end
  end
end