Skip to main content

lib/core/job_store/memory.ex

defmodule Core.JobStore.Memory do
  @moduledoc """
  In-memory job store that retains jobs for the lifetime of the VM.

  Jobs are stored in an Agent and survive `JobQueue` restarts, but are
  lost when the entire VM exits. Suitable for prototyping and testing.

  When used with multiple named queues, pass a unique `:name` in
  `store_opts` so each queue gets its own isolated Agent:

      {Core.Workers.JobQueue,
        name: MyApp.Queue,
        store: Core.JobStore.Memory,
        store_opts: [name: :my_app_queue_store]}

  ## Options

    * `:name` – Agent registration name (default: `Core.JobStore.Memory`)
  """
  @behaviour Core.JobStore
  alias Core.Workers.Job

  @impl true
  def init(opts) do
    agent_name = Keyword.get(opts, :name, __MODULE__)

    case Agent.start(fn -> %{} end, name: agent_name) do
      {:ok, _pid} -> {:ok, agent_name}
      {:error, {:already_started, _pid}} -> {:ok, agent_name}
    end
  end

  @impl true
  def insert_job(agent_name, %Job{} = job) do
    id = System.unique_integer([:positive, :monotonic])
    job = %Job{job | id: id}
    Agent.update(agent_name, &Map.put(&1, id, job))
    {:ok, job}
  end

  @impl true
  def update_job(agent_name, id, changes) do
    Agent.update(agent_name, fn jobs ->
      case Map.get(jobs, id) do
        nil -> jobs
        job -> Map.put(jobs, id, apply_changes(job, changes))
      end
    end)

    :ok
  end

  @impl true
  def get_job(agent_name, id) do
    case Agent.get(agent_name, &Map.get(&1, id)) do
      nil -> {:error, :not_found}
      job -> {:ok, job}
    end
  end

  @impl true
  def list_jobs(agent_name, opts \\ []) do
    jobs = Agent.get(agent_name, &Map.values/1)

    if status = Keyword.get(opts, :status) do
      Enum.filter(jobs, &(&1.status == status))
    else
      jobs
    end
  end

  @impl true
  def cleanup(agent_name, opts) do
    max_age_days = Keyword.get(opts, :max_age_days, 7)
    cutoff = DateTime.add(DateTime.utc_now(), -max_age_days, :day)

    Agent.update(agent_name, fn jobs ->
      Enum.reject(jobs, fn {_id, job} ->
        job.status in [:done, :failed] and
          not is_nil(job.finished_at) and
          DateTime.compare(job.finished_at, cutoff) == :lt
      end)
      |> Map.new()
    end)

    :ok
  end

  # -- Private --

  defp apply_changes(%Job{} = job, changes) do
    Enum.reduce(changes, job, fn
      {:status, v}, %Job{} = acc -> %{acc | status: v}
      {:attempt, v}, %Job{} = acc -> %{acc | attempt: v}
      {:result, v}, %Job{} = acc -> %{acc | result: v}
      {:retry_at, v}, %Job{} = acc -> %{acc | retry_at: v}
      {:run_at, v}, %Job{} = acc -> %{acc | run_at: v}
      {:started_at, v}, %Job{} = acc -> %{acc | started_at: v}
      {:finished_at, v}, %Job{} = acc -> %{acc | finished_at: v}
      _, %Job{} = acc -> acc
    end)
  end
end