# lib/pi/agent/job.ex

defmodule Pi.Agent.Job do
  @moduledoc """
  Supervised agent job lifecycle handle.

  A job is described by a `%Pi.Agent.Job{}` struct (see `new/2`) and driven by a
  GenServer started with `start_link/1`. The server starts a child session, runs
  the job's task in that session from a `Task`, and stops with reason `:normal`
  once the run succeeds, fails, or is cancelled.

  When a job completes, the finished struct is handed to
  `Pi.Agent.Manager.job_finished/1`. If the job has a `parent_session_id` that
  can be looked up, `:agent_job_started` and `:agent_job_finished` events are
  emitted on the parent session as well.
  """

  # NOTE(review): `use GenServer` generates a child spec with `restart: :permanent`.
  # This server always stops with `:normal`, so confirm that whoever supervises it
  # overrides the restart strategy; otherwise a finished job would be restarted.
  use GenServer

  alias Pi.Agent.Manager
  alias Pi.Session, as: RuntimeSession
  alias Pi.Session.Event

  @enforce_keys [:id, :task, :child_session_id]
  defstruct [
    :id,
    :task,
    :role,
    :model,
    :parent_session_id,
    :child_session_id,
    :pid,
    :status,
    :result,
    :error,
    :started_at,
    :finished_at,
    :duration_ms
  ]

  @type status :: :running | :done | :failed | :cancelled

  @type t :: %__MODULE__{
          id: String.t(),
          task: String.t(),
          role: atom() | String.t() | nil,
          model: term(),
          parent_session_id: String.t() | nil,
          child_session_id: String.t(),
          pid: pid() | nil,
          status: status(),
          result: term(),
          error: term(),
          started_at: DateTime.t() | nil,
          finished_at: DateTime.t() | nil,
          duration_ms: non_neg_integer() | nil
        }

  # Timeout (in milliseconds) passed to the session run when `opts` has no `:timeout`.
  @default_timeout 60_000

  ## Client API

  @doc """
  Builds a new job struct for `task` in the `:running` state.

  ## Options

    * `:id` - job id; a `"job_<n>"` id is generated when omitted
    * `:role` - role stored on the job
    * `:model` - model stored on the job
    * `:parent_session_id` - id of the session that receives lifecycle events
    * `:child_session_id` - id for the session the job runs in; a
      `"session_<n>"` id is generated when omitted

  `started_at` is set to the current UTC time.
  """
  @spec new(String.t(), keyword()) :: t()
  def new(task, opts \\ []) when is_binary(task) do
    %__MODULE__{
      id: Keyword.get_lazy(opts, :id, &id/0),
      task: task,
      role: Keyword.get(opts, :role),
      model: Keyword.get(opts, :model),
      parent_session_id: Keyword.get(opts, :parent_session_id),
      child_session_id: Keyword.get_lazy(opts, :child_session_id, &session_id/0),
      status: :running,
      started_at: DateTime.utc_now()
    }
  end

  @doc """
  Starts the job server for `job`.

  `opts` is used both to configure the child session (`:name`, `:system`,
  `:messages`, `:metadata`) and, with `:timeout` defaulted, as the options
  passed to the session run.
  """
  @spec start_link({t(), keyword()}) :: GenServer.on_start()
  def start_link({%__MODULE__{} = job, opts}) do
    GenServer.start_link(__MODULE__, {job, opts})
  end

  @doc """
  Cancels the job run by the server `pid` and stops that server.

  Returns `:ok` once the job has been marked `:cancelled`.
  """
  @spec cancel(GenServer.server()) :: :ok
  def cancel(pid), do: GenServer.call(pid, :cancel)

  ## Server callbacks

  @impl true
  def init({job, opts}) do
    # Defer the session start so `start_link/1` returns without waiting for it.
    send(self(), :run)

    {:ok,
     %{
       job: job,
       opts: opts,
       session: nil,
       task: nil,
       task_ref: nil,
       # Monotonic start used only to compute `duration_ms`.
       started_at: System.monotonic_time(:millisecond)
     }}
  end

  @impl true
  def handle_info(:run, state) do
    case start_session(state.job, state.opts) do
      {:ok, session} ->
        {:ok, _session_state} = RuntimeSession.subscribe(session, self())
        emit_parent_event(state.job, :agent_job_started, job_data(state.job))

        # Bind what the task needs up front so the closure does not capture
        # (and copy) the whole server state.
        prompt = state.job.task
        timeout = Keyword.get(state.opts, :timeout, @default_timeout)
        run_opts = Keyword.put(state.opts, :timeout, timeout)

        # NOTE(review): `Task.async/1` links the task to this process. Exits are
        # not trapped here, so an abnormal task exit terminates this server via
        # the link and the `:DOWN` clause below will not run for it - confirm
        # whether failures should instead be reported to the manager.
        task = Task.async(fn -> RuntimeSession.run(session, prompt, run_opts) end)

        {:noreply, %{state | session: session, task: task, task_ref: task.ref}}

      {:error, reason} ->
        {:stop, :normal, finalize(state, {:error, reason})}
    end
  end

  # Reply from the task running the session. Any reply shape is accepted so an
  # unexpected return value is recorded as a failure instead of crashing here.
  def handle_info({ref, reply}, %{task_ref: ref} = state) when is_reference(ref) do
    # The reply has arrived, so the task's `:DOWN` message is no longer needed.
    Process.demonitor(ref, [:flush])
    {:stop, :normal, finalize(state, task_outcome(reply))}
  end

  # The task went down without delivering a reply.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task_ref: ref} = state) do
    {:stop, :normal, finalize(state, {:error, reason})}
  end

  # Session updates delivered because of the subscription in `:run`; not used.
  def handle_info({:pi_session, _id, _session_state}, state), do: {:noreply, state}

  # Defining `handle_info/2` replaces the default provided by `use GenServer`,
  # so without this clause any other message would raise `FunctionClauseError`.
  def handle_info(_message, state), do: {:noreply, state}

  @impl true
  def handle_call(:cancel, _from, state) do
    if state.session, do: RuntimeSession.cancel(state.session)
    if state.task, do: Task.shutdown(state.task, :brutal_kill)

    {:stop, :normal, :ok, finalize(state, {:cancelled, :cancelled})}
  end

  ## Internals

  # Maps the task's reply onto the outcome tuples understood by `finish/2`.
  defp task_outcome({:ok, result}), do: {:ok, result}
  defp task_outcome({:error, :cancelled}), do: {:cancelled, :cancelled}
  defp task_outcome({:error, reason}), do: {:error, reason}
  defp task_outcome(other), do: {:error, {:unexpected_result, other}}

  # Completes the job and returns the server state holding the finished job,
  # with the task handle cleared. Keeps the state a map on every stop path.
  defp finalize(state, outcome) do
    %{state | job: finish(state, outcome), task: nil, task_ref: nil}
  end

  # Starts the child session for `job` from the job's ids and the caller's opts.
  defp start_session(job, opts) do
    session_opts = [
      id: job.child_session_id,
      parent_id: job.parent_session_id,
      # Fall back from the explicit name to the role, then to the task text.
      name: Keyword.get(opts, :name) || job.role || job.task,
      system: Keyword.get(opts, :system),
      messages: Keyword.get(opts, :messages, []),
      metadata:
        Map.merge(
          %{agent_job_id: job.id, agent_role: job.role},
          # Caller metadata is merged last, so it wins on key conflicts.
          Map.new(Keyword.get(opts, :metadata, %{}))
        )
    ]

    RuntimeSession.start(session_opts)
  end

  # Translates an outcome tuple into the job's final status/result/error.
  defp finish(state, {:ok, result}) do
    complete_job(state, :done, result, nil)
  end

  defp finish(state, {:cancelled, reason}) do
    complete_job(state, :cancelled, nil, reason)
  end

  defp finish(state, {:error, reason}) do
    complete_job(state, :failed, nil, reason)
  end

  # Stamps the final fields on the job, emits the finished event on the parent
  # session, notifies the manager, and returns the finished job.
  defp complete_job(state, status, result, error) do
    job = %{
      state.job
      | status: status,
        result: result,
        error: error,
        finished_at: DateTime.utc_now(),
        duration_ms: System.monotonic_time(:millisecond) - state.started_at
    }

    emit_parent_event(job, :agent_job_finished, job_data(job))
    Manager.job_finished(job)
    job
  end

  # Emits `type` on the parent session when there is one; always returns `:ok`.
  defp emit_parent_event(%{parent_session_id: nil}, _type, _data), do: :ok

  defp emit_parent_event(%{parent_session_id: parent_session_id}, type, data) do
    # Best effort: a parent that cannot be looked up is skipped.
    with {:ok, parent} <- RuntimeSession.lookup(parent_session_id) do
      RuntimeSession.emit_event(parent, Event.new(type, data))
    end

    :ok
  end

  # Plain-map view of the job used as event payload; timestamps become ISO 8601.
  defp job_data(job) do
    %{
      id: job.id,
      task: job.task,
      role: job.role,
      model: job.model,
      parent_session_id: job.parent_session_id,
      child_session_id: job.child_session_id,
      status: job.status,
      result: job.result,
      error: job.error,
      started_at: datetime(job.started_at),
      finished_at: datetime(job.finished_at),
      duration_ms: job.duration_ms
    }
  end

  # Formats `DateTime` values as ISO 8601; `nil` and other values pass through.
  defp datetime(nil), do: nil
  defp datetime(%DateTime{} = datetime), do: DateTime.to_iso8601(datetime)
  defp datetime(value), do: value

  defp id, do: "job_#{System.unique_integer([:positive])}"
  defp session_id, do: "session_#{System.unique_integer([:positive])}"
end