# lib/core/workers/job_queue.ex
defmodule Core.Workers.JobQueue do
  @moduledoc """
  FIFO job queue backed by a pluggable `Core.JobStore`.

  Each `JobQueue` is a GenServer that keeps an in-memory FIFO of job IDs plus a
  map of job structs. Every state change is also written through the store, so
  with a durable backend (such as `Core.JobStore.SQLite`) jobs survive VM
  restarts. On start-up only `:queued` and `:running` jobs are reloaded from the
  store; `:running` jobs (which never finished) are put back in the queue.

  ## Named queues

  You can start multiple isolated queues by giving each a unique `:name`:

      {Core.Workers.JobQueue, name: MyApp.EmailQueue, pool: MyApp.EmailPool}
      {Core.Workers.JobQueue, name: MyApp.MediaQueue, pool: MyApp.MediaPool}

  Every client function has a variant that takes a server reference (a
  registered name, a pid, or anything else accepted by `GenServer.call/3`) as
  its first argument. The variants without a server argument operate on the
  default queue, `Core.Workers.JobQueue`.

  ## Options

    * `:name` – registered name for this queue (default: `Core.Workers.JobQueue`)
    * `:pool` – the `WorkerPool` to notify when new jobs arrive
      (default: `Core.Workers.WorkerPool`)
    * `:store` – `Core.JobStore` module (default: `Core.JobStore.Memory`)
    * `:store_opts` – options passed to the store's `init/1`
    * `:cleanup_interval_ms` – how often to purge old jobs (default: 1 hour)
    * `:max_age_days` – retention for completed / failed jobs (default: 7)

  ## Worker notification

  Whenever a job becomes claimable — on submit, when a scheduled job reaches
  its `run_at`, and when a retry's backoff expires — `JobQueue` sends
  `:work_available` to every worker process under the configured `WorkerPool`.
  Workers use this as an immediate wake-up signal in addition to their
  fallback polling timer. Notification is best-effort: if the pool is not
  running it is silently skipped.
  """

  use GenServer
  require Logger
  alias Core.Workers.Job

  @default_cleanup_interval_ms :timer.hours(1)
  @default_max_age_days 7
  @default_max_attempts 3
  @default_per_page 50
  @max_per_page 200
  # Upper bound on stale queue entries discarded by a single claim attempt.
  @max_dequeue_scan 1000
  @seconds_per_day 86_400

  @typedoc "Reference to a running queue: registered name, pid, or via/global tuple."
  @type server :: GenServer.server()

  @typedoc "Identifier assigned to a job by the store's `insert_job/2`."
  @type job_id :: term()

  ## ============================================================
  ## Client API
  ## ============================================================

  @doc """
  Starts a queue process linked to the caller.

  See the module documentation for the supported options.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Submits a new job to the queue and wakes idle workers.

  The job is persisted through the store before this call returns.

  ## Options

    * `:max_attempts` – attempt limit consulted by `Job.retries_exhausted?/1`
      when the job fails (default: 3)

  Returns `{:ok, job_id}`.
  """
  @spec submit(term()) :: {:ok, job_id()}
  def submit(payload), do: submit(__MODULE__, payload, [])

  @spec submit(term(), keyword()) :: {:ok, job_id()}
  def submit(payload, opts), do: submit(__MODULE__, payload, opts)

  @spec submit(server(), term(), keyword()) :: {:ok, job_id()}
  def submit(server, payload, opts) do
    GenServer.call(server, {:submit, new_job(payload, opts)})
  end

  @doc """
  Submits a job that must not run before `run_at`.

  The job is persisted immediately and is visible through `get/2`, `all/2`
  and `stats/1` with status `:queued`, but it only enters the run queue once
  `run_at` has passed. A `run_at` in the past makes it claimable right away.
  Accepts the same options as `submit/3`. Returns `{:ok, job_id}`.
  """
  @spec submit_at(term(), DateTime.t()) :: {:ok, job_id()}
  def submit_at(payload, %DateTime{} = run_at), do: submit_at(__MODULE__, payload, run_at, [])

  @spec submit_at(term(), DateTime.t(), keyword()) :: {:ok, job_id()}
  @spec submit_at(server(), term(), DateTime.t()) :: {:ok, job_id()}
  # The is_list/1 guard keeps `submit_at(server, payload, run_at)` from being
  # mistaken for `submit_at(payload, run_at, opts)` when payload is a DateTime.
  def submit_at(payload, %DateTime{} = run_at, opts) when is_list(opts),
    do: submit_at(__MODULE__, payload, run_at, opts)

  def submit_at(server, payload, %DateTime{} = run_at),
    do: submit_at(server, payload, run_at, [])

  @spec submit_at(server(), term(), DateTime.t(), keyword()) :: {:ok, job_id()}
  def submit_at(server, payload, %DateTime{} = run_at, opts) do
    # Negative delays (run_at already passed) are clamped to "now".
    delay_ms = max(DateTime.diff(run_at, DateTime.utc_now(), :millisecond), 0)
    job = %Job{new_job(payload, opts) | run_at: run_at}
    GenServer.call(server, {:submit_scheduled, job, delay_ms})
  end

  @doc """
  Claims the next available job for processing.

  Marks it as `:running`, stamps `started_at`, increments `attempt`, and
  returns `{:ok, job}`. Returns `:empty` when no claimable job is found.
  """
  @spec claim_next() :: {:ok, %Job{}} | :empty
  def claim_next, do: claim_next(__MODULE__)

  @spec claim_next(server()) :: {:ok, %Job{}} | :empty
  def claim_next(server), do: GenServer.call(server, :claim_next)

  @doc """
  Marks a job as `:done` with the given result.

  Returns `:ok`, or `{:error, :not_found}` if the queue does not hold the job.
  """
  @spec mark_done(job_id(), term()) :: :ok | {:error, :not_found}
  def mark_done(id, result), do: mark_done(__MODULE__, id, result)

  @spec mark_done(server(), job_id(), term()) :: :ok | {:error, :not_found}
  def mark_done(server, id, result) do
    GenServer.call(server, {:finish_job, id, :done, result})
  end

  @doc """
  Records a failed attempt for a job.

  If `Job.retries_exhausted?/1` says the job has no attempts left it becomes
  `:failed`; otherwise it goes back to `:queued` and is re-enqueued after
  `Job.backoff_ms/1` milliseconds. Returns `:ok`, or `{:error, :not_found}`.
  """
  @spec mark_failed(job_id(), term()) :: :ok | {:error, :not_found}
  def mark_failed(id, reason), do: mark_failed(__MODULE__, id, reason)

  @spec mark_failed(server(), job_id(), term()) :: :ok | {:error, :not_found}
  def mark_failed(server, id, reason), do: GenServer.call(server, {:fail_job, id, reason})

  @doc """
  Lists jobs held in memory by the queue, newest `inserted_at` first.

  Finished jobs from before the last restart are not reloaded and therefore
  not listed.

  `all/1` accepts either a keyword list of options (default queue) or a
  server reference (no options).

  ## Options

    * `:status` – only return jobs with this status
    * `:page` – 1-based page number (default: 1; values below 1 mean 1)
    * `:per_page` – page size, clamped to 1..200 (default: 50)
  """
  @spec all() :: [%Job{}]
  def all, do: all(__MODULE__, [])

  @spec all(keyword() | server()) :: [%Job{}]
  def all(opts) when is_list(opts), do: all(__MODULE__, opts)
  def all(server), do: all(server, [])

  @spec all(server(), keyword()) :: [%Job{}]
  def all(server, opts), do: GenServer.call(server, {:all, opts})

  @doc """
  Fetches a job by id.

  Returns `{:ok, job}` or `{:error, :not_found}`.
  """
  @spec get(job_id()) :: {:ok, %Job{}} | {:error, :not_found}
  def get(id), do: get(__MODULE__, id)

  @spec get(server(), job_id()) :: {:ok, %Job{}} | {:error, :not_found}
  def get(server, id), do: GenServer.call(server, {:get, id})

  @doc """
  Returns per-status counts of the jobs held in memory.

  The result is a map with `:queued`, `:running`, `:done`, `:failed` and
  `:total` keys.
  """
  @spec stats() :: %{atom() => non_neg_integer()}
  def stats, do: stats(__MODULE__)

  @spec stats(server()) :: %{atom() => non_neg_integer()}
  def stats(server), do: GenServer.call(server, :stats)

  @doc """
  Resets the queue. Intended for test use only.

  Clears the in-memory queue and job map, calls the store's `cleanup/2` with
  `max_age_days: 0`, and re-initialises the store with the original
  `:store_opts`. Returns `:ok` or `{:error, reason}` from the store's `init/1`.
  """
  @spec reset() :: :ok | {:error, term()}
  def reset, do: reset(__MODULE__)

  @spec reset(server()) :: :ok | {:error, term()}
  def reset(server), do: GenServer.call(server, :reset)

  ## ============================================================
  ## GenServer Callbacks
  ## ============================================================

  @impl true
  def init(opts) do
    store = Keyword.get(opts, :store, Core.JobStore.Memory)
    store_opts = Keyword.get(opts, :store_opts, [])
    cleanup_interval = Keyword.get(opts, :cleanup_interval_ms, @default_cleanup_interval_ms)
    max_age_days = Keyword.get(opts, :max_age_days, @default_max_age_days)
    pool_name = Keyword.get(opts, :pool, Core.Workers.WorkerPool)

    case store.init(store_opts) do
      {:ok, store_state} ->
        {queue, jobs_map} = restore_jobs(store, store_state, DateTime.utc_now())
        schedule_cleanup(cleanup_interval)

        {:ok,
         %{
           queue: queue,
           jobs: jobs_map,
           store: store,
           store_state: store_state,
           store_opts: store_opts,
           cleanup_interval_ms: cleanup_interval,
           max_age_days: max_age_days,
           pool_name: pool_name
         }}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  # Persist a new job, append it to the run queue, and wake the workers.
  @impl true
  def handle_call({:submit, job}, _from, state) do
    %{queue: queue, jobs: jobs, store: store, store_state: store_state, pool_name: pool_name} =
      state

    {:ok, job} = store.insert_job(store_state, job)
    updated_queue = :queue.in(job.id, queue)
    # Wake idle workers immediately rather than waiting for their poll timer.
    notify_workers(pool_name)

    {:reply, {:ok, job.id}, %{state | queue: updated_queue, jobs: Map.put(jobs, job.id, job)}}
  end

  # Persist a scheduled job now; it joins the run queue when the timer fires.
  def handle_call({:submit_scheduled, job, delay_ms}, _from, state) do
    %{jobs: jobs, store: store, store_state: store_state} = state

    {:ok, job} = store.insert_job(store_state, job)
    Process.send_after(self(), {:enqueue_delayed, job.id}, delay_ms)

    {:reply, {:ok, job.id}, %{state | jobs: Map.put(jobs, job.id, job)}}
  end

  # Pop the first :queued job, mark it :running in memory and in the store.
  def handle_call(:claim_next, _from, %{queue: queue, jobs: jobs} = state) do
    case dequeue_next_queued(queue, jobs) do
      {:ok, job_id, remaining_queue} ->
        %{store: store, store_state: store_state} = state
        %Job{} = job = Map.fetch!(jobs, job_id)

        running = %Job{
          job
          | status: :running,
            started_at: DateTime.utc_now(),
            attempt: job.attempt + 1
        }

        :ok =
          store.update_job(store_state, job_id,
            status: :running,
            started_at: running.started_at,
            attempt: running.attempt
          )

        {:reply, {:ok, running},
         %{state | queue: remaining_queue, jobs: Map.put(jobs, job_id, running)}}

      {:empty, trimmed_queue} ->
        # Keep the trimmed queue so stale entries discarded during the scan are
        # not rescanned (and do not starve valid jobs) on the next claim.
        {:reply, :empty, %{state | queue: trimmed_queue}}
    end
  end

  # Record a terminal status (currently only :done) and its result.
  def handle_call({:finish_job, id, status, result}, _from, state) do
    %{jobs: jobs, store: store, store_state: store_state} = state

    case Map.get(jobs, id) do
      nil ->
        {:reply, {:error, :not_found}, state}

      %Job{} = job ->
        finished = %Job{job | status: status, result: result, finished_at: DateTime.utc_now()}

        :ok =
          store.update_job(store_state, id,
            status: status,
            result: result,
            finished_at: finished.finished_at
          )

        {:reply, :ok, %{state | jobs: Map.put(jobs, id, finished)}}
    end
  end

  # Either fail the job permanently or schedule a retry, per Job.retries_exhausted?/1.
  def handle_call({:fail_job, id, reason}, _from, %{jobs: jobs} = state) do
    case Map.get(jobs, id) do
      nil ->
        {:reply, {:error, :not_found}, state}

      %Job{} = job ->
        if Job.retries_exhausted?(job) do
          {:reply, :ok, fail_permanently(state, job, reason)}
        else
          {:reply, :ok, schedule_retry(state, job, reason)}
        end
    end
  end

  def handle_call({:all, opts}, _from, %{jobs: jobs} = state) do
    status_filter = Keyword.get(opts, :status)
    page = max(Keyword.get(opts, :page, 1), 1)

    # Clamp to 1..@max_per_page: a non-positive value would make Enum.take/2
    # return elements from the end of the list (or nothing).
    per_page =
      opts
      |> Keyword.get(:per_page, @default_per_page)
      |> min(@max_per_page)
      |> max(1)

    result =
      jobs
      |> Map.values()
      |> filter_by_status(status_filter)
      |> Enum.sort_by(& &1.inserted_at, {:desc, DateTime})
      |> Enum.drop((page - 1) * per_page)
      |> Enum.take(per_page)

    {:reply, result, state}
  end

  def handle_call({:get, id}, _from, %{jobs: jobs} = state) do
    case Map.fetch(jobs, id) do
      {:ok, job} -> {:reply, {:ok, job}, state}
      :error -> {:reply, {:error, :not_found}, state}
    end
  end

  def handle_call(:stats, _from, %{jobs: jobs} = state) do
    counts = jobs |> Map.values() |> Enum.frequencies_by(& &1.status)

    stats = %{
      queued: Map.get(counts, :queued, 0),
      running: Map.get(counts, :running, 0),
      done: Map.get(counts, :done, 0),
      failed: Map.get(counts, :failed, 0),
      total: map_size(jobs)
    }

    {:reply, stats, state}
  end

  def handle_call(:reset, _from, state) do
    %{store: store, store_state: store_state, store_opts: store_opts} = state

    store.cleanup(store_state, max_age_days: 0)

    case store.init(store_opts) do
      {:ok, new_store_state} ->
        {:reply, :ok, %{state | queue: :queue.new(), jobs: %{}, store_state: new_store_state}}

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  # A scheduled job's run_at has arrived. The job is already persisted.
  @impl true
  def handle_info({:enqueue_delayed, id}, state) do
    {:noreply, enqueue_if_queued(state, id)}
  end

  # A retry's backoff has expired.
  def handle_info({:re_enqueue, id}, state) do
    {:noreply, enqueue_if_queued(state, id)}
  end

  # Periodic retention: purge the store, then drop expired finished jobs from
  # memory so the jobs map does not grow without bound.
  def handle_info(:cleanup, state) do
    %{
      store: store,
      store_state: store_state,
      max_age_days: max_age_days,
      cleanup_interval_ms: interval
    } = state

    :ok = store.cleanup(store_state, max_age_days: max_age_days)
    schedule_cleanup(interval)

    {:noreply, %{state | jobs: prune_finished(state.jobs, max_age_days, DateTime.utc_now())}}
  end

  # Defining handle_info/2 replaces GenServer's default catch-all, so without
  # this clause any stray message would crash the queue.
  def handle_info(message, state) do
    Logger.debug("JobQueue ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Marks jobs that were :running as :failed in the store. GenServer calls this
  # when a callback raises or returns {:stop, ...}; on a supervisor shutdown it
  # runs only if the process traps exits, which this module does not enable.
  @impl true
  def terminate(reason, %{jobs: jobs, store: store, store_state: store_state}) do
    Logger.warning("JobQueue terminating (#{inspect(reason)}), marking running jobs as failed")

    for {_id, %Job{status: :running} = job} <- jobs do
      Logger.error("Job #{job.id} was :running at shutdown — marking :failed")

      :ok =
        store.update_job(store_state, job.id,
          status: :failed,
          finished_at: DateTime.utc_now()
        )
    end

    :ok
  end

  ## ============================================================
  ## Private Helpers
  ## ============================================================

  # Builds a fresh, not-yet-persisted :queued job for `payload`.
  defp new_job(payload, opts) do
    %Job{
      payload: payload,
      inserted_at: DateTime.utc_now(),
      status: :queued,
      max_attempts: Keyword.get(opts, :max_attempts, @default_max_attempts)
    }
  end

  # Rebuilds the run queue and job map from the store after a (re)start and
  # re-arms timers for jobs scheduled in the future. Returns {queue, jobs_map}.
  defp restore_jobs(store, store_state, now) do
    queued = store.list_jobs(store_state, status: :queued)

    {ready_jobs, future_jobs} =
      Enum.split_with(queued, fn job ->
        is_nil(job.run_at) or DateTime.compare(job.run_at, now) != :gt
      end)

    # Jobs left :running by a previous process never finished; requeue them.
    # NOTE(review): claim_next increments :attempt again when the recovered
    # job is claimed, so an interrupted run is counted twice — confirm intent.
    recovered_jobs =
      store_state
      |> then(&store.list_jobs(&1, status: :running))
      |> Enum.map(fn %Job{} = job ->
        recovered = %Job{job | status: :queued, attempt: job.attempt + 1}

        :ok =
          store.update_job(store_state, job.id, status: :queued, attempt: recovered.attempt)

        recovered
      end)

    runnable = ready_jobs ++ recovered_jobs

    # Sort chronologically with DateTime.compare/2; plain term ordering of
    # DateTime structs compares fields like :day before :year.
    queue =
      runnable
      |> Enum.sort_by(& &1.inserted_at, DateTime)
      |> Enum.map(& &1.id)
      |> :queue.from_list()

    # Scheduled jobs must be in the map too, otherwise the job is unknown when
    # its :enqueue_delayed timer fires and it is never claimed.
    jobs_map = Map.new(runnable ++ future_jobs, &{&1.id, &1})

    Enum.each(future_jobs, fn job ->
      delay_ms = max(DateTime.diff(job.run_at, now, :millisecond), 0)
      Process.send_after(self(), {:enqueue_delayed, job.id}, delay_ms)
    end)

    {queue, jobs_map}
  end

  # Marks `job` permanently :failed, in memory and in the store.
  defp fail_permanently(state, %Job{} = job, reason) do
    %{jobs: jobs, store: store, store_state: store_state} = state
    failed = %Job{job | status: :failed, result: reason, finished_at: DateTime.utc_now()}

    :ok =
      store.update_job(store_state, job.id,
        status: :failed,
        result: reason,
        finished_at: failed.finished_at
      )

    %{state | jobs: Map.put(jobs, job.id, failed)}
  end

  # Returns `job` to :queued and arms a timer that re-enqueues it after the
  # backoff reported by Job.backoff_ms/1.
  defp schedule_retry(state, %Job{} = job, reason) do
    %{jobs: jobs, store: store, store_state: store_state} = state
    backoff = Job.backoff_ms(job)
    retry_at = DateTime.add(DateTime.utc_now(), backoff, :millisecond)
    retrying = %Job{job | status: :queued, result: reason, retry_at: retry_at}

    Process.send_after(self(), {:re_enqueue, job.id}, backoff)

    :ok =
      store.update_job(store_state, job.id,
        status: :queued,
        result: reason,
        retry_at: retry_at
      )

    Logger.warning(
      "Job #{job.id} failed (attempt #{job.attempt}/#{job.max_attempts}), retrying in #{backoff}ms"
    )

    %{state | jobs: Map.put(jobs, job.id, retrying)}
  end

  # Appends `id` to the run queue and wakes workers, but only if the job is
  # still known and :queued — it may have been reset, finished, or failed
  # since the timer was armed.
  defp enqueue_if_queued(%{queue: queue, jobs: jobs, pool_name: pool_name} = state, id) do
    case Map.get(jobs, id) do
      %Job{status: :queued} ->
        updated_queue = :queue.in(id, queue)
        notify_workers(pool_name)
        %{state | queue: updated_queue}

      _ ->
        state
    end
  end

  # A nil/false :status option means "no filter".
  defp filter_by_status(jobs, status) when status in [nil, false], do: jobs
  defp filter_by_status(jobs, status), do: Enum.filter(jobs, &(&1.status == status))

  # Drops :done/:failed jobs whose finished_at is older than `max_age_days`
  # from the in-memory map. The store applies its own rule in cleanup/2; this
  # only bounds the process's memory.
  defp prune_finished(jobs, max_age_days, now) do
    cutoff = DateTime.add(now, -round(max_age_days * @seconds_per_day), :second)

    for {id, job} <- jobs, not expired?(job, cutoff), into: %{}, do: {id, job}
  end

  defp expired?(%Job{status: status, finished_at: %DateTime{} = finished_at}, cutoff)
       when status in [:done, :failed],
       do: DateTime.compare(finished_at, cutoff) == :lt

  defp expired?(_job, _cutoff), do: false

  # Pops from the front of the queue until a :queued job is found. Entries
  # whose job is unknown or no longer :queued are discarded. The scan is
  # bounded by @max_dequeue_scan; both "nothing found" outcomes return the
  # trimmed queue so discarded entries are not scanned again.
  defp dequeue_next_queued(queue, jobs, scanned \\ 0)

  defp dequeue_next_queued(queue, _jobs, scanned) when scanned >= @max_dequeue_scan do
    Logger.warning("JobQueue dequeue scan exceeded #{@max_dequeue_scan} stale entries")
    {:empty, queue}
  end

  defp dequeue_next_queued(queue, jobs, scanned) do
    case :queue.out(queue) do
      {:empty, empty_queue} ->
        {:empty, empty_queue}

      {{:value, job_id}, remaining} ->
        case Map.get(jobs, job_id) do
          %Job{status: :queued} -> {:ok, job_id, remaining}
          _stale -> dequeue_next_queued(remaining, jobs, scanned + 1)
        end
    end
  end

  # Best-effort broadcast of :work_available to every worker child of the pool.
  # GenServer.whereis/1 resolves registered names, pids and via/global names.
  # `rescue` covers bad pool references; `catch :exit` covers the pool dying
  # between the lookup and which_children/1, which exits rather than raises
  # and would otherwise take the queue down with it.
  defp notify_workers(pool_name) do
    if pool = GenServer.whereis(pool_name) do
      for {_id, child, :worker, _modules} <- Supervisor.which_children(pool), is_pid(child) do
        send(child, :work_available)
      end
    end

    :ok
  rescue
    _ -> :ok
  catch
    :exit, _ -> :ok
  end

  defp schedule_cleanup(interval_ms) do
    Process.send_after(self(), :cleanup, interval_ms)
  end
end