Skip to main content

lib/pi/agent/manager.ex

defmodule Pi.Agent.Manager do
  @moduledoc "Agent job lifecycle manager."

  use GenServer

  alias Pi.Agent.Job
  alias Pi.Agent.JobSupervisor

  defstruct jobs: %{}

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def install do
    JobSupervisor.install()

    case Process.whereis(__MODULE__) do
      nil ->
        case start_link([]) do
          {:ok, pid} ->
            Process.unlink(pid)
            {:ok, pid}

          other ->
            other
        end

      _pid ->
        :ok
    end
  end

  def start_job(task, opts \\ []) when is_binary(task) do
    install()
    GenServer.call(__MODULE__, {:start_job, task, opts})
  end

  def jobs do
    install()
    GenServer.call(__MODULE__, :jobs)
  end

  def status(id) when is_binary(id) do
    install()
    GenServer.call(__MODULE__, {:status, id})
  end

  def result(id) when is_binary(id) do
    install()
    GenServer.call(__MODULE__, {:result, id})
  end

  def cancel(id) when is_binary(id) do
    install()
    GenServer.call(__MODULE__, {:cancel, id})
  end

  def job_finished(%Job{} = job) do
    install()
    GenServer.cast(__MODULE__, {:job_finished, job})
  end

  @impl true
  def init(_opts), do: {:ok, %__MODULE__{}}

  @impl true
  def handle_call({:start_job, task, opts}, _from, state) do
    job = Job.new(task, opts)

    case JobSupervisor.start_job(job, opts) do
      {:ok, pid} ->
        job = %{job | pid: pid}
        Process.monitor(pid)
        {:reply, {:ok, job}, put_in(state.jobs[job.id], job)}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  def handle_call(:jobs, _from, state) do
    jobs =
      state.jobs
      |> Map.values()
      |> Enum.sort_by(&(&1.started_at || DateTime.utc_now()), {:desc, DateTime})

    {:reply, jobs, state}
  end

  def handle_call({:status, id}, _from, state) do
    {:reply, fetch_job(state, id), state}
  end

  def handle_call({:result, id}, _from, state) do
    reply =
      case Map.fetch(state.jobs, id) do
        {:ok, %Job{status: :done, result: result}} -> {:ok, result}
        {:ok, %Job{status: :failed, error: error}} -> {:error, error}
        {:ok, %Job{status: :cancelled}} -> {:error, :cancelled}
        {:ok, %Job{status: status}} -> {:error, status}
        :error -> {:error, :not_found}
      end

    {:reply, reply, state}
  end

  def handle_call({:cancel, id}, _from, state) do
    reply =
      case Map.fetch(state.jobs, id) do
        {:ok, %Job{pid: pid, status: :running}} when is_pid(pid) -> Job.cancel(pid)
        {:ok, _job} -> :ok
        :error -> {:error, :not_found}
      end

    {:reply, reply, state}
  end

  @impl true
  def handle_cast({:job_finished, %Job{} = job}, state) do
    {:noreply, put_in(state.jobs[job.id], job)}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    case Enum.find(state.jobs, fn {_id, job} -> job.pid == pid and job.status == :running end) do
      {id, job} ->
        job = %{
          job
          | status: :failed,
            error: reason,
            finished_at: DateTime.utc_now()
        }

        {:noreply, put_in(state.jobs[id], job)}

      nil ->
        {:noreply, state}
    end
  end

  defp fetch_job(state, id) do
    case Map.fetch(state.jobs, id) do
      {:ok, job} -> {:ok, job}
      :error -> {:error, :not_found}
    end
  end
end