lib/pgflow/context.ex

defmodule PgFlow.Context do
  @moduledoc """
  Context struct passed to step handler functions.

  The context provides metadata about the current execution environment and
  utilities for accessing flow data.

  ## Fields

    * `:run_id` - UUID of the current flow run
    * `:step_slug` - Slug of the current step being executed
    * `:task_index` - Index of the task within the step (0 for single steps)
    * `:attempt` - Current attempt number (1-indexed)
    * `:flow_input` - Lazy-loaded flow input (use `get_flow_input/1` to access)
    * `:repo` - Ecto repository module for database access

  ## Usage

  Step handlers receive the context as their second argument:

      step :process, depends_on: [:fetch] do
        fn deps, ctx ->
          # Access context fields
          IO.puts("Running step \#{ctx.step_slug} for run \#{ctx.run_id}")
          IO.puts("This is attempt \#{ctx.attempt}")

          # Get flow input if needed
          input = PgFlow.Context.get_flow_input(ctx)

          # Use dependencies from previous steps
          %{result: deps.fetch.data}
        end
      end

  """

  alias PgFlow.Schema.Run

  @type t :: %__MODULE__{
          run_id: Ecto.UUID.t(),
          step_slug: atom(),
          task_index: non_neg_integer(),
          attempt: pos_integer(),
          flow_input: map() | :not_loaded,
          repo: module()
        }

  @enforce_keys [:run_id, :step_slug, :task_index, :attempt, :repo]
  defstruct [:run_id, :step_slug, :task_index, :attempt, :repo, flow_input: :not_loaded]

  @doc """
  Creates a new context struct.

  ## Examples

      ctx = PgFlow.Context.new(
        run_id: "550e8400-e29b-41d4-a716-446655440000",
        step_slug: :process,
        task_index: 0,
        attempt: 1,
        repo: MyApp.Repo
      )

  """
  @spec new(keyword()) :: t()
  def new(opts) do
    struct!(__MODULE__, opts)
  end

  @doc """
  Loads the flow input from the database.

  The flow input is lazily loaded to avoid unnecessary database queries when
  the input is not needed by the step handler.

  Returns the flow input as a map, or raises if the run cannot be found.

  ## Examples

      input = PgFlow.Context.get_flow_input(ctx)
      #=> %{"order_id" => 123, "customer_id" => 456}

  """
  @spec get_flow_input(t()) :: map()
  def get_flow_input(%__MODULE__{flow_input: input}) when is_map(input) do
    input
  end

  def get_flow_input(%__MODULE__{flow_input: :not_loaded, run_id: run_id, repo: repo} = ctx) do
    case repo.get(Run, run_id) do
      nil ->
        raise "Run #{run_id} not found"

      run ->
        input = run.input
        # Cache the loaded input in the context
        loaded_ctx = %{ctx | flow_input: input}
        loaded_ctx.flow_input
    end
  end

  @doc """
  Preloads the flow input into the context.

  This is useful when you want to load the flow input eagerly, such as when
  processing multiple tasks that will all need access to the flow input.

  ## Examples

      ctx = PgFlow.Context.preload_flow_input(ctx)
      # flow_input is now loaded and cached in the context

  """
  @spec preload_flow_input(t()) :: t()
  def preload_flow_input(%__MODULE__{flow_input: input} = ctx) when is_map(input) do
    ctx
  end

  def preload_flow_input(%__MODULE__{run_id: run_id, repo: repo} = ctx) do
    case repo.get(Run, run_id) do
      nil ->
        raise "Run #{run_id} not found"

      run ->
        %{ctx | flow_input: run.input}
    end
  end
end