lib/pgflow.ex

defmodule PgFlow do
  @pgflow_core_version "0.5.0"

  @moduledoc """
  PgFlow is an Elixir implementation of the pgflow workflow engine.

  It provides a macro-based DSL for defining workflow DAGs that execute
  on PostgreSQL using pgmq for task coordination.

  ## Compatibility

  This Elixir implementation is compatible with pgflow core version #{@pgflow_core_version}.
  It uses the same database schema and SQL functions as the TypeScript/Deno implementation,
  allowing both to run side-by-side against the same database.

  ## Quick Start

      defmodule MyApp.Flows.ProcessOrder do
        use PgFlow.Flow

        @flow slug: :process_order, max_attempts: 3

        step :validate do
          fn input, _ctx ->
            # Validate order data
            %{valid: true, order_id: input["order_id"]}
          end
        end

        step :charge, depends_on: [:validate] do
          fn deps, _ctx ->
            # Charge the customer
            %{charged: true, amount: 100}
          end
        end

        step :fulfill, depends_on: [:charge] do
          fn deps, _ctx ->
            # Fulfill the order
            %{fulfilled: true}
          end
        end
      end

  ## Starting a Flow

      {:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})

  ## Configuration

  Add PgFlow to your supervision tree:

      children = [
        {PgFlow, repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder]}
      ]

  See `PgFlow.Config` for configuration options.
  """

  alias PgFlow.{Client, Config, FlowRegistry, WorkerSupervisor}

  @doc """
  Returns a child specification for starting PgFlow under a supervisor.

  ## Options

  See `PgFlow.Config` for available options.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc """
  Starts the PgFlow supervision tree.

  ## Options

  See `PgFlow.Config` for available options.

  ## Examples

      PgFlow.start_link(repo: MyApp.Repo, flows: [MyApp.Flows.ProcessOrder])

  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts) do
    config = Config.validate!(opts)
    PgFlow.Supervisor.start_link(config)
  end

  @doc """
  Starts a flow run with the given input.

  The flow can be specified by module name, slug atom, or slug string.

  ## Examples

      {:ok, run_id} = PgFlow.start_flow(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
      {:ok, run_id} = PgFlow.start_flow(:process_order, %{"order_id" => 123})

  """
  @spec start_flow(module() | atom() | String.t(), map()) :: {:ok, String.t()} | {:error, term()}
  defdelegate start_flow(flow_module_or_slug, input), to: Client

  @doc """
  Starts a flow and waits for completion.

  Blocks until the flow completes or the timeout is reached.

  ## Options

    * `:timeout` - Maximum time to wait in milliseconds (default: 60_000)
    * `:poll_interval` - How often to check status in milliseconds (default: 500)

  ## Examples

      {:ok, run} = PgFlow.start_flow_sync(MyApp.Flows.ProcessOrder, %{"order_id" => 123})
      {:error, run} = PgFlow.start_flow_sync(MyApp.Flows.FailingFlow, %{})

  """
  @spec start_flow_sync(module() | atom() | String.t(), map(), keyword()) ::
          {:ok, PgFlow.Schema.Run.t()}
          | {:error, PgFlow.Schema.Run.t()}
          | {:error, :timeout}
          | {:error, term()}
  defdelegate start_flow_sync(flow_module_or_slug, input, opts \\ []), to: Client

  @doc """
  Gets a run by ID.

  ## Examples

      {:ok, run} = PgFlow.get_run("550e8400-e29b-41d4-a716-446655440000")
      {:error, :not_found} = PgFlow.get_run("nonexistent-id")

  """
  @spec get_run(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
  defdelegate get_run(run_id), to: Client

  @doc """
  Gets a run with all step states preloaded.

  ## Examples

      {:ok, run} = PgFlow.get_run_with_states("550e8400-e29b-41d4-a716-446655440000")
      run.step_states  # => [%StepState{}, ...]

  """
  @spec get_run_with_states(String.t()) :: {:ok, PgFlow.Schema.Run.t()} | {:error, :not_found}
  defdelegate get_run_with_states(run_id), to: Client

  @doc """
  Recompiles a flow definition at runtime.

  Unlike the compile-time DSL (`use PgFlow.Flow`), this creates flow
  definitions from plain data - for per-tenant automations and dynamic workflows.

  If the flow already exists, this operation is destructive: existing
  definition and historical run/task data for the slug are deleted first.

  ## Examples

       PgFlow.upsert_flow("acct_123_hubspot_sync_v1",
         max_attempts: 3,
         steps: [
           %{slug: "reshape", deps: []},
           %{slug: "create_contact", deps: ["reshape"]}
         ]
       )

  """
  @spec upsert_flow(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  defdelegate upsert_flow(slug, opts), to: Client

  @doc """
  Deletes a flow and all associated data (runs, tasks, queue).

  ## Examples

      PgFlow.delete_flow("acct_123_hubspot_sync_v1")

  """
  @spec delete_flow(String.t()) :: :ok | {:error, term()}
  defdelegate delete_flow(slug), to: Client

  @doc """
  Checks if a flow exists in the database.

  ## Examples

      PgFlow.flow_exists?("my_flow")
      # => {:ok, true}

  """
  @spec flow_exists?(String.t()) :: {:ok, boolean()} | {:error, term()}
  defdelegate flow_exists?(slug), to: Client

  @doc """
  Enqueues a background job with the given input.

  This is the primary API for dispatching jobs. Under the hood, jobs are
  single-step flows, so this delegates to `start_flow/2`.

  ## Examples

      {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"})

  """
  @spec enqueue(module(), map()) :: {:ok, String.t()} | {:error, term()}
  def enqueue(job_module, input) when is_atom(job_module) and is_map(input) do
    Client.enqueue(job_module, input)
  end

  @doc """
  Enqueues a background job with options.

  Supported options:

    * `:delay_seconds` - non-negative integer seconds before the job is available
    * `:scheduled_at` - `DateTime` when the job should become available

  ## Examples

      {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, delay_seconds: 60)
      {:ok, run_id} = PgFlow.enqueue(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, scheduled_at: ~U[2026-05-08 12:00:00Z])

  """
  @spec enqueue(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}
  def enqueue(job_module, input, opts) when is_atom(job_module) and is_map(input) do
    Client.enqueue(job_module, input, opts)
  end

  @doc """
  Enqueues a background job that becomes available after `delay_seconds`.

  PgFlow persists the run immediately, then delays the initial pgmq task
  visibility so workers cannot execute it until the delay elapses. Values of
  `0` enqueue the job for immediate execution.

  ## Examples

      {:ok, run_id} = PgFlow.enqueue_in(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, 60)

  """
  @spec enqueue_in(module(), map(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()}
  def enqueue_in(job_module, input, delay_seconds)
      when is_atom(job_module) and is_map(input) do
    Client.enqueue_in(job_module, input, delay_seconds)
  end

  @doc """
  Enqueues a background job that becomes available at `scheduled_at`.

  Timestamps in the past enqueue the job for immediate execution. Any `DateTime`
  time zone is accepted; PgFlow compares the scheduled timestamp as an instant.

  ## Examples

      {:ok, run_id} = PgFlow.enqueue_at(MyApp.Jobs.SendEmail, %{"to" => "user@example.com"}, ~U[2026-05-08 12:00:00Z])

  """
  @spec enqueue_at(module(), map(), DateTime.t()) :: {:ok, String.t()} | {:error, term()}
  def enqueue_at(job_module, input, scheduled_at)
      when is_atom(job_module) and is_map(input) do
    Client.enqueue_at(job_module, input, scheduled_at)
  end

  @doc """
  Starts a worker for the given flow.

  ## Options

    * `:poll_interval` - How often to poll for messages (default: 1000ms)
    * `:visibility_timeout` - How long to hold messages (default: 30s)

  ## Examples

      {:ok, pid} = PgFlow.start_worker(MyApp.Flows.ProcessOrder)

  """
  @spec start_worker(module(), keyword()) :: {:ok, pid()} | {:error, term()}
  def start_worker(flow_module, opts \\ []) do
    WorkerSupervisor.start_worker(flow_module, opts)
  end

  @doc """
  Stops a worker for the given flow.

  ## Examples

      :ok = PgFlow.stop_worker(MyApp.Flows.ProcessOrder)

  """
  @spec stop_worker(module()) :: :ok | {:error, :not_found}
  def stop_worker(flow_module) do
    WorkerSupervisor.stop_worker(flow_module)
  end

  @doc """
  Lists all registered flows.

  ## Examples

      flows = PgFlow.list_flows()
      #=> [%{module: MyApp.Flows.ProcessOrder, slug: :process_order, ...}, ...]

  """
  @spec list_flows() :: [map()]
  def list_flows do
    FlowRegistry.list()
  end

  @doc """
  Gets the definition for a flow by module or slug.

  ## Examples

      {:ok, flow_def} = PgFlow.get_flow(MyApp.Flows.ProcessOrder)
      {:ok, flow_def} = PgFlow.get_flow(:process_order)
      {:error, :not_found} = PgFlow.get_flow(:unknown)

  """
  @spec get_flow(module() | atom()) :: {:ok, map()} | {:error, :not_found}
  def get_flow(flow_module_or_slug) do
    FlowRegistry.get(flow_module_or_slug)
  end

  @doc """
  Returns health check information.

  ## Examples

      PgFlow.health_check()
      #=> %{status: :ok, workers: [...], flows: [...]}

  """
  @spec health_check() :: %{status: :ok, workers: [map()], flows: [map()]}
  def health_check do
    %{
      status: :ok,
      workers: WorkerSupervisor.list_workers(),
      flows: FlowRegistry.list()
    }
  end

  @doc """
  Returns the compatible pgflow core version.

  This version indicates which pgflow database schema and SQL functions
  this Elixir implementation is compatible with.

  ## Examples

      PgFlow.core_version()
      #=> "0.5.0"

  """
  @spec core_version() :: String.t()
  def core_version, do: @pgflow_core_version
end