# bg_jobs - Background job processing for your Gleam app
[![Package Version](https://img.shields.io/hexpm/v/bg_jobs)](https://hex.pm/packages/bg_jobs)
[![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/bg_jobs/)
# Docs are WIP right now
Most of it is correct but not complete, some parts are duplicated.
# What is bg_jobs
When developing web applications, certain tasks, like processing large files or
performing heavy calculations can slow down regular request handling.
bg_jobs helps by moving these time-consuming operations to a background
queue, keeping your application responsive.
This library is SQL-driven, with adapters for sqlite and postgres included.
So you don’t need extra dependencies to get background processing up
and running. Just connect to your existing database, set up your
background jobs, and your app can handle more demanding tasks without
impacting user experience.
Bg_jobs provide the following:
- *Queues* - including delayed execution
- *Scheduled jobs* - on an interval or cron like schedule
# Time and date
All dates should be passed in UTC+0
# Installation
```sh
gleam add bg_jobs
```
# Example usage
A complete example setup can be found in the /example directory.
```gleam
import bg_jobs
import bg_jobs/job
import bg_jobs/queue
import gleam/otp/static_supervisor
fn example_worker() {
jobs.Worker(job_name: "example_job", handler: fn(job: jobs.Job) {
io.print(job.payload)
})
}
pub fn main() {
let conn = todo as "the sqlight connection"
let bg =
static_supervisor.new(static_supervisor.OneForOne)
|> bg_jobs.new(sqlite_db_adapter.new(conn, []))
|> bg_jobs.with_queue(
queue.new("default_queue")
|> queue.with_worker(example_worker())
)
|> bg_jobs.build()
// Dispatch a new job
jobs.new(example_worker.job_name, "Hello!"))
|> bg_jobs.enqueue(bg)
process.sleep_forever()
}
```
This example sets up a `default_queue` to handle `example_worker` jobs.
When a job with the payload "Hello!" is dispatched, it is stored in the
database, picked up by the queue, and logged.
# Architecture
*TODO:* expand this section
Program architecture:
- *otp supervisor*
- *otp worker* - queue (0 or more)
- Polls the database on specified interval looking for avaliable jobs of the type it can handle
- *otp worker* - scheduled jobs queue (0 or more)
- Polls the database on specified interval looking for avaliable jobs of the type it can handle
- Enqueus a new job with apropriate avalible at after processing is done
- *otp worker* - monitor (1 per program)
- Holds Subjects to all workers in bg_jobs. This is used for named lookup
## Durability
TODO: update, this is not exactly correct any more..
~~Jobs are claimed with a timestamp and queue/scheduled job name.
Each queue and scheduled job should have a unique name even across machines.
Whenever the worker starts up it clears out previously claimed jobs.
This way if a worker crashes and the supervisor restarts it there is no
abandoned jobs.~~
# Supervisor Integration in bg_jobs
bg_jobs is an OTP-based job scheduling system that integrates with
Erlang’s supervision tree. When setting up bg_jobs, you need to pass
a `static_supervisor.Builder` along with a db_adapter. This provides
flexibility, allowing you to configure the supervisor's settings
before passing it into bg_jobs. You can also choose to supervise the
created supervisor itself.
## Configuring the Supervisor for bg_jobs
To set custom parameters for the bg_jobs supervisor, you need to
first create the supervisor with the desired strategy and
configuration settings.
**Example 1: Setting restart_tolerance for the bg_jobs Supervisor**
You can configure the restart_tolerance setting for the supervisor,
which determines how many restarts are tolerated in a given time
window before the supervisor itself is terminated.
```gleam
static_supervisor.new(static_supervisor.OneForOne) // Create a new supervisor with the OneForOne strategy
|> static_supervisor.restart_tolerance(10, 1) // Set restart tolerance to 10 restarts in 1 second
|> bg_jobs.new(db_adapter) // Create a new bg_jobs instance with the specified db_adapter
|> bg_jobs.build() // Build and start the bg_jobs supervision tree
```
In this example:
- `sup.new(sup.OneForOne)` creates a new supervisor with the OneForOne strategy.
- `sup.restart_tolerance(100, 1)` configures the supervisor to tolerate 100 restarts within a 1-second window.
- `bg_jobs.new(db_adapter)` creates a new instance of bg_jobs, passing in the necessary db_adapter.
- `bg_jobs.build()` builds and starts the bg_jobs system under the supervision tree.
**Example 2: Supervising the bg_jobs Supervisor**
You can also add the bg_jobs supervisor as a child to another
supervisor, allowing you to control its lifecycle and supervise it
as part of your system.
```gleam
fn setup_bg_jobs() -> Result(bg_jobs.BgJobs, errors.BgJobError) {
// Define how the bg_jobs supervisor is set up
todo
}
static_supervisor.new(sup.OneForOne) // Create a new supervisor with the OneForOne strategy
|> static_supervisor.add(static_supervisor.supervisor_child("bg_jobs", fn() { // Add the bg_jobs supervisor as a child
setup_bg_jobs() // Set up bg_jobs
|> result.map(fn(bg) { bg.supervisor }) // Map the result to return the supervisors pid
}))
```
In this example:
- `sup.new(sup.OneForOne)` creates a new supervisor with the OneForOne strategy.
- `sup.add(sup.supervisor_child("bg_jobs", fn() { ... }))` adds the bg_jobs supervisor as a child of the main supervisor.
- `setup_bg_jobs()` contains the logic for setting up the bg_jobs system, which could include creating job schedulers and configuring database adapters.
- `result.map(fn(bg) { bg.supervisor })` ensures that the supervisor for the bg_jobs system is returned and supervised correctly.
---
# Monitor Process in bg_jobs
The Monitor process in bg_jobs is responsible for overseeing the
health of all queues and scheduled job actors. It ensures that if
any job or queue process dies unexpectedly, the corresponding
reservation in the database is released, allowing other processes to
reserve and process the job.
## How it works
### Monitoring Process Lifecycle
- The monitor process uses `process.monitor_process()` from `gleam/erlang/
process` to track all queues and scheduled job
actors.
- If a queue or scheduled job process dies, the monitor receives a
process down signal.
- Upon receiving this signal, the monitor will release all
reservations made by that queue or job in the database. This makes
the job available for re-claiming.
### Periodic Job Reservation Cleanup (Not implemented yet)
- In cases where the monitor doesn't receive a process down signal
(e.g., if the signal is missed or delayed), it periodically loops
through reserved jobs in the database.
- The monitor looks for jobs associated with dead processes, and
releases their reservation, allowing these jobs to be claimed again
by a new process.
- The cleanup frequency is configurable, meaning you can define how
often the monitor checks for reserved jobs with dead processes. This
can be done using the configurable function some_fn().
### Handling Monitor Failures
- If the monitor itself dies, it’s designed to be fault-tolerant.
All queue and scheduled job processes are stored in an ETS (Erlang
Term Storage) table.
- Upon restart, the monitor performs a cleanup operation, where it
releases reservations for jobs that were associated with dead
processes.
- After cleanup, the monitor resumes its normal function by
restarting monitoring of the remaining alive processes.
---
# Queue
Each queue that you add to the bg_jobs program spawns an `erlang/otp/
actor` that manages that queue. It's responsible for polling the
database for jobs it has workers for.
>*A worker is added to a queue using the `queue.with_worker(Queue, Worker)`
>method.
>Jobs are added to the database by calling `bg_jobs.enqueue(JobRequest)`
When a queue actor finds a job it can handle it claims that job by
setting the `reserved_at` and the `reserved_by` columns in the
database to the current datetime (UTC+0) and current processes
pid. This ensures no other queue claims that job
It then spawns a new process and executed the jobs worker with
the job data from the database in the new process.
The worker should not panic but instead return a result, if the
worker panics the queue actor will crash and the supervisor will try
to restart it.
If the job execution results in an ok a message is sent back to the
queue actor and the actor moves the job from the jobs table to
the jobs_succeeded table.
However if the job execution results in an error, it's retried as
many times as specified by `queue.with_max_retries` (default: 3).
Should the job not succeed within the max_retries a message is sent
back to the queue actor and the actor moves the job from the jobs
table to the jobs_failed table with the string provided in the
error as the exception.
Multiple queues may have the same workers meaning multiple queues
may be able to process the same type of jobs. This can be used to
prioritize job execution.
For example if one type of job is extra important you could say all
queues can process that job type by specifying that jobs worker on
all queues. Or maybe you have a default queue that can handle all
jobs and then add another queue for a specific job.
Jobs can have delayed processing either at a specific time or
relative time in the future. This can be specified with the
`bg_jobs.job_with_available_at` or `job_with_available_in`
respectively.
# Scheduled job
Scheduled jobs work the same way as queues internally with some key
differences.
Each scheduled job has 1 and only 1 worker meaning it can only
handle one type of job.
A scheduled job is not manually enqueued. Instead on startup and successfull
processing of a job the next processing time is calculated and the job is
enqueued with the available_at value set to that next processing time.
This also means that if the job execution overlaps with what would
be the next processing time the job wont run that time.
For example if a job starts 13:30 and should run every minute the
next run time would be 13:31. But if the job execution takes 1min
30sec. the next run will be at 13:32 since it's only rescheduled
after it completes.
# Schedules
Schedules for scheduled jobs can be specified in two ways.
As an interval or as a cron-like schedule
Scheduled jobs are either executing or waiting to be executed. Since
the job's next run is calculated on startup the first thing it does
is wait.
However if there already is a scheduled job when the actor starts it
will do nothing and wait for that job to become available.
## Intervals
Intervals is the simplest for of schedules provided. Intervals can
be specified in millieseconds, seconds, minutes, hours , days or
weeks. This is done using the corresponding
`scheduled_job.new_interval_[ time unit]()` functions, ex.
`scheduled_job.new_interval_minutes(20)` would create a new interval
of 20 minutes.
Execution time is not included in the interval. This means, what you're
really setting, is how far in the future the job should be scheduled
for.
TODO: diagram
## Schedules
Sometimes you need to have more advanced schedules. For example you
may want a job run on the first day of the month at 08:30, or
on thursdays between the months march and june.
This can be achieved using schedules. Schedules allows you to
specify cron-like schedules ranges or specific values.
To achieve this flexibility, schedules are defined by configuring the
following time components:
- Seconds: (Default: 0)
- Minutes: (Default: Every)
- Hours: (Default: Every)
- Day of the Month: (Default: Every)
- Month: (Default: Every)
- Day of the Week: (Default: Every)
Each component can be set to:
*Every:* Matches all possible values for that component.
*List:* A combination of:
- Specific values (e.g., seconds 0, 15, 30).
- Ranges (e.g., hours from 1 to 10).
Using List, you can mix specific values and ranges, allowing highly customized schedules.
### Behavior of Every
Setting a component to Every means it matches all possible values for that component. For example:
>Setting hours to Every will trigger the job at any hour, as long as other time units match their criteria.
---
### Schedule Validation
Schedules are validated when the bg_jobs framework is started. If a
schedule is invalid, a `ScheduleValidationError` is raised, preventing
the application from starting with an incorrect schedule configuration.
**Common Validation Errors**
An invalid schedule can occur if:
- A specified value is out of bounds for the time unit.
- Example: Scheduling a job for the 13th month, which does not exist.
- A range is incorrectly defined (e.g., start is greater than end).
- Example: A between_seconds(50, 30) call would trigger an error.
*Handling Validation Errors*
If the schedule is invalid a ScheduleValidationError is returned and
will stop the creation of bg_jobs, returning said error. The error
message will provide details about the issue.
---
### ScheduleBuilder Functions
The ScheduleBuilder provides a series of methods to configure each time component.
*Creating a New Schedule*
```gleam
pub fn new_schedule() -> ScheduleBuilder
```
Creates a new schedule with the default configuration: triggers at the first second of every minute.
---
### Configuring Seconds
*every_second*
```gleam
pub fn every_second(self: ScheduleBuilder) -> ScheduleBuilder
```
Sets the schedule to trigger at every second.
*on_second*
```gleam
pub fn on_second(self: ScheduleBuilder, second: Int) -> ScheduleBuilder
```
Adds a specific second to the schedule. If other seconds are already
defined, this appends the new value.
*Example:*
```gleam
schedule.on_second(1).on_second(10);
// Triggers at second 1 or 10.
```
*between_seconds*
```gleam
pub fn between_seconds(self: ScheduleBuilder, start: Int, end: Int) -> ScheduleBuilder
```
Adds a range of seconds during which the schedule should trigger. If other seconds or ranges are already defined, this appends the range.
Example:
```gleam
schedule.on_second(1).between_seconds(10, 15);
// Triggers at second 1 or seconds 10–15.
```
---
### Combining Filters
Filters across time units are combined using AND logic. For example:
```gleam
let schedule = new_schedule()
.on_second(1)
.between_seconds(10, 15)
.on_minute(30)
.on_hour(8)
.between_hours(14, 16)
.on_thursdays()
.between_months(3, 6);
```
This schedule will trigger only when:
- The second is 1 **or** within the range 10–15, **and**
- The minute is 30, **and**
- The hour is 8 **or** between 14:00–16:00, **and**
- It is Thursday, **and**
- It is during March through June.
---
### Practical Example
Here’s a complete example demonstrating both OR and AND logic:
```gleam
let schedule = new_schedule()
.on_second(1)
.on_second(10)
.between_seconds(30, 40)
.on_minute(15)
.on_minute(45)
.on_hour(3)
.between_hours(10, 12)
.on_mondays()
.on_januaries()
.on_decembers();
```
This schedule will trigger only when:
- The second is 1, 10, **or** between 30–40, **and**
- The minute is 15 **or** 45, **and**
- The hour is 3 **or** between 10–12, **and**
- It is Monday, **and**
- It is January **or** December.
---
### Summary
The ScheduleBuilder provides a powerful and flexible way to define schedules:
- **OR logic within each time unit** allows combining multiple specific values and ranges.
- **AND logic across time units** ensures precise scheduling, requiring all units to match their criteria simultaneously. This design makes it easy to express even the most complex scheduling requirements.
---
## Event Listeners
bg_jobs generates events during job processing, which can be used for
logging, monitoring, or custom telemetry. An event listener can be
added globally to capture all job events, or it can be attached to
specific queues or scheduled jobs for targeted monitoring.
To use the built-in logger_event_listener:
```gleam
bg_jobs.with_event_listener(logger_event_listener.listener) // Logs events for all job processing events
```
Creating a Custom Event Listener
Implement a custom event listener by defining a function with the
following signature:
```gleam
pub type EventListener = fn(Event) -> Nil
```
If it's registered, this function will be called whenever an event
occurs, receiving an Event object as its parameter. You can
register your custom listener with bg_jobs (global listener), a queue,
or a scheduled job as needed.
Example of adding a custom event listener to a specific queue:
```gleam
queue.with_event_listener(my_custom_event_listener())
```
## Db adapters
bg_jobs includes two built-in database adapters: Postgres (via pog) and
SQLite (via sqlight). These adapters allow seamless integration with
popular databases, enabling you to use background job processing
without additional dependencies. If you are using a different database,
refer to these implementations for guidance on building a compatible
adapter.
Adding a database adapter to bg_jobs:
```gleam
let bg = bg_jobs.new(sqlite_db_adapter.new(conn, []))
```
Further documentation can be found at <https://hexdocs.pm/bg_jobs>.
## Development
```sh
gleam run # Run the project
gleam test # Run the tests
just watch-test # Run tests in watch mode
```
# TODO
- [ ] Documentation
- [ ] Split db adapters to their own packages
- Need to solve testing, since the tests depend on sqlite
- [ ] Implement loop in monitor actor to cover the case where down message was not received