Skip to main content

lib/core/job_store/sqlite.ex

if Code.ensure_loaded?(Exqlite.Basic) do
  defmodule Core.JobStore.SQLite do
    @moduledoc """
    SQLite-backed job store with WAL mode enabled.

    Jobs are persisted synchronously. On VM restart, `:queued` and `:running`
    jobs are reloaded. `:running` jobs are reset to `:queued` so workers can
    reclaim them.

    This is a **stateful** adapter: `init/1` returns the database path as
    opaque state, and every subsequent callback receives that state as its
    first argument. This allows multiple `JobQueue` processes to each use
    their own SQLite file without sharing global configuration.

    ## Configuration

        {Core.Workers.JobQueue,
          store: Core.JobStore.SQLite,
          store_opts: [database: "priv/jobs.db"]}

    Requires `{:exqlite, "~> 0.29"}` in your `mix.exs` deps.

    ## Options

    * `:database` – path to the SQLite file (default: `"jobs.db"`)

    ## Connection model

    Each operation opens and closes its own connection.  This is simple but
    limits throughput (roughly ~1,000 ops/sec on a typical SSD).  Because
    `Core.Workers.JobQueue` is a single GenServer, calls are serialized, so
    the risk of a `last_insert_rowid()` race between concurrent inserts is
    low.  For higher throughput implement a connection-pool adapter.
    """
    @behaviour Core.JobStore
    alias Exqlite.Basic
    alias Core.JobStore.SQL
    alias Core.Workers.Job

    # ------------------------------------------------------------------
    # Callbacks
    # ------------------------------------------------------------------

    @impl true
    def init(opts) do
      db_path = Keyword.get(opts, :database, "jobs.db")
      db_path |> Path.dirname() |> File.mkdir_p!()

      conn = open!(db_path)
      {table, index} = SQL.schema()
      Basic.exec(conn, table)
      Basic.exec(conn, index)
      Basic.close(conn)
      {:ok, db_path}
    end

    @impl true
    def insert_job(db_path, %Job{} = job) do
      {sql, params} = SQL.insert_params(job)
      conn = open!(db_path)
      Basic.exec(conn, sql, params)
      {:ok, [[id]], _cols} = Basic.exec(conn, "SELECT last_insert_rowid()") |> Basic.rows()
      Basic.close(conn)

      {:ok, %Job{job | id: id}}
    end

    @impl true
    def update_job(db_path, id, changes) do
      {sql, params} = SQL.update_params(id, changes)
      conn = open!(db_path)
      Basic.exec(conn, sql, params)
      Basic.close(conn)
      :ok
    end

    @impl true
    def get_job(db_path, id) do
      conn = open!(db_path)
      {:ok, rows, _cols} = Basic.exec(conn, SQL.select_by_id(), [id]) |> Basic.rows()
      Basic.close(conn)

      case rows do
        [row | _] -> {:ok, SQL.from_row(row)}
        [] -> {:error, :not_found}
      end
    end

    @impl true
    def list_jobs(db_path, opts \\ []) do
      {sql, params} =
        if status = Keyword.get(opts, :status) do
          {SQL.select_by_status(), [Atom.to_string(status)]}
        else
          {SQL.select_all(), []}
        end

      conn = open!(db_path)
      {:ok, rows, _cols} = Basic.exec(conn, sql, params) |> Basic.rows()
      Basic.close(conn)

      Enum.map(rows, &SQL.from_row/1)
    end

    @impl true
    def cleanup(db_path, opts) do
      {sql, params} = SQL.cleanup_params(Keyword.get(opts, :max_age_days, 7))
      conn = open!(db_path)
      Basic.exec(conn, sql, params)
      Basic.close(conn)
      :ok
    end

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    defp open!(db_path) do
      {:ok, conn} = Basic.open(db_path)
      {:ok, _query, _result, _conn} = Basic.exec(conn, "PRAGMA journal_mode = WAL")
      conn
    end
  end
end