Skip to main content

lib/core/job_store/sql.ex

defmodule Core.JobStore.SQL do
  @moduledoc """
  Pure helper functions for building standard job-store SQL.

  Use this module when writing a custom `Core.JobStore` adapter for any
  SQL database (PostgreSQL, MySQL, SQLite, etc.).  It handles schema DDL,
  param encoding, and row deserialization so the adapter only needs to
  manage the connection.

  ## Example PostgreSQL adapter skeleton

      defmodule MyApp.JobStore.Postgres do
        @behaviour Core.JobStore
        alias Core.JobStore.SQL

        def init(_opts) do
          {table, index} = SQL.schema()
          Postgrex.query!(conn(), table, [])
          Postgrex.query!(conn(), index, [])
          {:ok, conn()}
        end

        def insert_job(conn, job) do
          {sql, params} = SQL.insert_params(job)
          %{rows: [[id]]} = Postgrex.query!(conn, sql <> " RETURNING id", params)
          {:ok, %Core.Workers.Job{job | id: id}}
        end

        def update_job(conn, id, changes) do
          {sql, params} = SQL.update_params(id, changes)
          Postgrex.query!(conn, sql, params)
          :ok
        end

        def get_job(conn, id) do
          %{rows: rows} = Postgrex.query!(conn, SQL.select_by_id(), [id])
          case rows do
            [row | _] -> {:ok, SQL.from_row(row)}
            []        -> {:error, :not_found}
          end
        end

        def list_jobs(conn, opts) do
          {sql, params} =
            if status = Keyword.get(opts, :status) do
              {SQL.select_by_status(), [Atom.to_string(status)]}
            else
              {SQL.select_all(), []}
            end

          %{rows: rows} = Postgrex.query!(conn, sql, params)
          Enum.map(rows, &SQL.from_row/1)
        end

        def cleanup(conn, opts) do
          {sql, params} = SQL.cleanup_params(Keyword.get(opts, :max_age_days, 7))
          Postgrex.query!(conn, sql, params)
          :ok
        end

        defp conn, do: MyApp.Repo
      end
  """

  alias Core.Workers.Job

  # ------------------------------------------------------------------
  # Schema
  # ------------------------------------------------------------------

  @create_table """
  CREATE TABLE IF NOT EXISTS jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload TEXT NOT NULL,
    status TEXT NOT NULL,
    attempt INTEGER DEFAULT 0,
    max_attempts INTEGER DEFAULT 3,
    result TEXT,
    retry_at TEXT,
    run_at TEXT,
    inserted_at TEXT NOT NULL,
    started_at TEXT,
    finished_at TEXT
  )
  """

  @create_index "CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status)"

  @doc "Returns the `{create_table, create_index}` DDL strings."
  def schema, do: {@create_table, @create_index}

  # ------------------------------------------------------------------
  # Queries
  # ------------------------------------------------------------------

  @doc "INSERT statement and params for a `%Job{}`."
  def insert_params(%Job{} = job) do
    sql = """
    INSERT INTO jobs
      (payload, status, attempt, max_attempts, result, retry_at, run_at, inserted_at, started_at, finished_at)
    VALUES
      (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
    """

    params = [
      encode_json(job.payload),
      Atom.to_string(job.status),
      job.attempt,
      job.max_attempts,
      encode_json(job.result),
      iso8601(job.retry_at),
      iso8601(job.run_at),
      iso8601(job.inserted_at),
      iso8601(job.started_at),
      iso8601(job.finished_at)
    ]

    {sql, params}
  end

  @doc "UPDATE statement and params. `changes` is a keyword list of fields."
  def update_params(id, changes) do
    {fields, values} = build_set(changes)
    {"UPDATE jobs SET #{Enum.join(fields, ", ")} WHERE id = ?", values ++ [id]}
  end

  @doc "SELECT * with WHERE id = ?"
  def select_by_id, do: "SELECT * FROM jobs WHERE id = ?"

  @doc "SELECT * ordered by insertion time."
  def select_all, do: "SELECT * FROM jobs ORDER BY inserted_at ASC"

  @doc "SELECT * filtered by status."
  def select_by_status, do: "SELECT * FROM jobs WHERE status = ? ORDER BY inserted_at ASC"

  @doc "DELETE statement and cutoff param for old completed / failed jobs."
  def cleanup_params(max_age_days) do
    cutoff = DateTime.add(DateTime.utc_now(), -max_age_days, :day)
    {"DELETE FROM jobs WHERE status IN ('done', 'failed') AND finished_at < ?", [iso8601(cutoff)]}
  end

  # ------------------------------------------------------------------
  # Deserialization
  # ------------------------------------------------------------------

  @doc "Convert a raw row (list of columns in SELECT * order) into a `%Job{}`."
  def from_row([
        id,
        payload,
        status,
        attempt,
        max_attempts,
        result,
        retry_at,
        run_at,
        inserted_at,
        started_at,
        finished_at
      ]) do
    Job.from_map(%{
      id: id,
      payload: payload,
      status: status,
      attempt: attempt,
      max_attempts: max_attempts,
      result: result,
      retry_at: retry_at,
      run_at: run_at,
      inserted_at: inserted_at,
      started_at: started_at,
      finished_at: finished_at
    })
  end

  # ------------------------------------------------------------------
  # Private helpers
  # ------------------------------------------------------------------

  defp encode_json(nil), do: nil
  defp encode_json(val), do: Jason.encode!(val)

  defp iso8601(nil), do: nil
  defp iso8601(%DateTime{} = dt), do: DateTime.to_iso8601(dt)

  defp build_set(changes) do
    Enum.reduce(changes, {[], []}, fn
      {:status, v}, {fs, vs} ->
        {["status = ?" | fs], [Atom.to_string(v) | vs]}

      {:attempt, v}, {fs, vs} ->
        {["attempt = ?" | fs], [v | vs]}

      {:result, v}, {fs, vs} ->
        {["result = ?" | fs], [encode_json(v) | vs]}

      {:retry_at, v}, {fs, vs} ->
        {["retry_at = ?" | fs], [iso8601(v) | vs]}

      {:run_at, v}, {fs, vs} ->
        {["run_at = ?" | fs], [iso8601(v) | vs]}

      {:started_at, v}, {fs, vs} ->
        {["started_at = ?" | fs], [iso8601(v) | vs]}

      {:finished_at, v}, {fs, vs} ->
        {["finished_at = ?" | fs], [iso8601(v) | vs]}

      _, acc ->
        acc
    end)
    |> then(fn {fs, vs} -> {Enum.reverse(fs), Enum.reverse(vs)} end)
  end
end