# Tutorial: Coffee Shop Job Queue
```elixir
Mix.install([
  {:bedrock, "~> 0.4"},
  {:bedrock_job_queue, "~> 0.1"},
  {:kino, "~> 0.16.0"}
])
```
## Welcome to Bedrock Job Queue!
In this tutorial, we'll build a background job system for a busy coffee shop. Along the way, you'll learn how Bedrock's job queue reliably handles asynchronous work, from order confirmations to delivery syncing.
This tutorial follows the same patterns as the Class Scheduling tutorial, but focuses on background job processing.
## Setting Up Our Database
First, let's set up our Bedrock cluster. This gives us a reliable storage layer for our job queue:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
# Create a temporary folder to persist data
working_dir = Path.join(System.tmp_dir!(), "coffee_shop_#{:rand.uniform(99999)}")
File.mkdir_p!(working_dir)
defmodule CoffeeShop.Cluster do
  use Bedrock.Cluster,
    otp_app: :coffee_shop,
    name: "coffee_shop",
    config: [
      capabilities: [:coordination, :log, :storage],
      trace: [],
      coordinator: [path: working_dir],
      storage: [path: working_dir],
      log: [path: working_dir]
    ]
end

defmodule CoffeeShop.Repo do
  use Bedrock.Repo, cluster: CoffeeShop.Cluster
end
```
Now let's start the cluster:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
Kino.start_child!({CoffeeShop.Cluster, []})
```
Quick test to make sure everything works:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
alias Bedrock.JobQueue.Job
alias CoffeeShop.{Repo, JobQueue}
Repo.transact(fn -> Repo.put("test", "coffee ready") end)
result = Repo.transact(fn -> Repo.get("test") end)
"Coffee status: #{result}"
```
## Our Mission: A Busy Coffee Shop
Imagine you're building software for a coffee chain. When customers place orders, many things need to happen:
* **Order confirmation** - Send a notification to the customer
* **Brewing updates** - Track when the barista starts making the drink
* **Ready for pickup** - Alert the customer when their order is ready
* **Delivery sync** - Sync with third-party delivery apps
* **Cleanup tasks** - Administrative tasks that can wait
These operations shouldn't block the order flow. That's where background jobs come in!
## Defining Job Types
Each job type is a module with a `perform/2` callback. The first argument is the job's payload (the map passed at enqueue time); the second is metadata containing `topic`, `queue_id`, `item_id`, and `attempt`. Let's define our coffee shop jobs:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
defmodule CoffeeShop.Jobs.OrderConfirmation do
  use Job,
    topic: "order:confirm",
    priority: 100

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts(" [#{order_id}] Sending confirmation to #{customer}")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.BrewingStarted do
  use Job,
    topic: "order:brewing",
    priority: 50

  @impl true
  def perform(%{order_id: order_id, drink: drink}, _meta) do
    IO.puts(" [#{order_id}] Barista started: #{drink}")
    Process.sleep(100)
    {:ok, %{started_at: DateTime.utc_now()}}
  end
end

defmodule CoffeeShop.Jobs.ReadyForPickup do
  use Job,
    topic: "order:ready",
    priority: 10

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts(" [#{order_id}] READY! Paging #{customer}!")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.EspressoShot do
  use Job,
    topic: "brew:espresso",
    max_retries: 3,
    priority: 40

  @impl true
  def perform(%{order_id: order_id, shots: shots}, _meta) do
    IO.puts(" [#{order_id}] Pulling #{shots} shot(s)...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.DeliverySync do
  use Job,
    topic: "delivery:sync",
    max_retries: 5

  @impl true
  def perform(%{order_id: order_id, platform: platform}, _meta) do
    IO.puts(" [#{order_id}] Syncing with #{platform}...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.AdminCleanup do
  use Job,
    topic: "admin:cleanup",
    priority: 200

  @impl true
  def perform(%{task: task}, _meta) do
    IO.puts(" Running cleanup: #{task}")
    Process.sleep(100)
    :ok
  end
end

"6 job modules defined!"
```
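All six jobs above ignore the metadata argument. As a rough sketch of how a job might use it, the module below branches on `attempt` and returns some of the other result shapes the queue accepts (`{:snooze, ms}` and `{:discard, reason}`). It is a plain module without `use Job` so it runs without a cluster. The module name, the platform list, and the assumptions that metadata is a map with atom keys and that `attempt` starts at 1 are all illustrative, not taken from the library.

```elixir
defmodule DeliverySyncSketch do
  # Hypothetical platform names, used only for this illustration.
  @known_platforms ["app_a", "app_b"]

  # Assumption: `meta` is a map with atom keys and `attempt` counts from 1.
  def perform(%{order_id: order_id, platform: platform}, %{attempt: attempt}) do
    cond do
      platform not in @known_platforms ->
        # Retrying cannot fix bad input, so ask for the job to be dropped.
        {:discard, {:unknown_platform, platform}}

      attempt < 2 ->
        # Pretend the platform is not ready yet; ask to run again in 500 ms.
        {:snooze, 500}

      true ->
        {:ok, %{order_id: order_id, synced_on_attempt: attempt}}
    end
  end
end

meta = %{topic: "delivery:sync", queue_id: "main_shop", item_id: "demo", attempt: 1}
DeliverySyncSketch.perform(%{order_id: "ORD-010", platform: "app_a"}, meta)
# => {:snooze, 500}
```

Calling it directly like this only exercises the function; the queue itself decides what to do with each return value.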
## Setting Up the Job Queue
Now let's define our JobQueue module. This is where we configure the repo and wire up all our workers:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
defmodule CoffeeShop.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :coffee_shop,
    repo: CoffeeShop.Repo,
    workers: %{
      "order:confirm" => CoffeeShop.Jobs.OrderConfirmation,
      "order:brewing" => CoffeeShop.Jobs.BrewingStarted,
      "order:ready" => CoffeeShop.Jobs.ReadyForPickup,
      "brew:espresso" => CoffeeShop.Jobs.EspressoShot,
      "delivery:sync" => CoffeeShop.Jobs.DeliverySync,
      "admin:cleanup" => CoffeeShop.Jobs.AdminCleanup
    }
end

"JobQueue module defined!"
```
Start the consumer:
<!-- livebook:{"reevaluate_automatically":true} -->
```elixir
Kino.start_child!({JobQueue, concurrency: 2, batch_size: 5})
# Give it a moment to initialize
Process.sleep(500)
"Consumer started!"
```
## Taking Orders: Basic Enqueueing
Let's enqueue our first job - an order confirmation:
```elixir
{:ok, job} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-001", customer: "Alice"})
IO.puts("Enqueued job: #{Base.encode16(job.id, case: :lower) |> binary_part(0, 8)}...")
# Wait for processing
Process.sleep(500)
"Check the output above!"
```
## Rush Hour: Priorities
Not all jobs are equal! A "ready for pickup" notification should jump ahead of cleanup tasks.
Priority is a number where **lower = higher priority**:
* Priority 10: Urgent (pickup notifications)
* Priority 50: Normal (brewing updates)
* Priority 200: Low (cleanup tasks)
Let's see priorities in action:
```elixir
# Enqueue all jobs in a single transaction so they're visible atomically.
# This ensures priority ordering is demonstrated correctly - otherwise
# jobs get processed as they arrive (race between enqueue and consumer).
Repo.transact(fn ->
  JobQueue.enqueue("main_shop", "admin:cleanup",
    %{task: "clear_old_orders"}, priority: 200)
  JobQueue.enqueue("main_shop", "order:brewing",
    %{order_id: "ORD-002", drink: "Latte"}, priority: 50)
  JobQueue.enqueue("main_shop", "order:ready",
    %{order_id: "ORD-003", customer: "Bob"}, priority: 10)
end)
IO.puts("Enqueued atomically: cleanup (200), brewing (50), ready (10)")
IO.puts("\nWatching execution order (should be: ready → brewing → cleanup):")
Process.sleep(2000)
"Jobs processed! Check the priority order above."
```
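To make the "lower number runs first" rule concrete, here is a minimal sketch that sorts the same three jobs with plain Elixir. It only illustrates the ordering; it says nothing about how Bedrock actually stores or dequeues items, and the `pending` list is a stand-in for the real queue.

```elixir
# Stand-in for the three jobs enqueued above, in enqueue order.
pending = [
  %{topic: "admin:cleanup", priority: 200},
  %{topic: "order:brewing", priority: 50},
  %{topic: "order:ready", priority: 10}
]

# Ascending sort: the smallest priority number comes out first.
execution_order =
  pending
  |> Enum.sort_by(& &1.priority)
  |> Enum.map(& &1.topic)

# => ["order:ready", "order:brewing", "admin:cleanup"]
```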
## Happy Hour: Scheduled Jobs
Sometimes you want a job to run at a specific time:
```elixir
# Schedule a job for 2 seconds from now
scheduled_time = DateTime.utc_now() |> DateTime.add(2, :second)
{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-SCHEDULED", customer: "Charlie"},
  at: scheduled_time)
IO.puts("Scheduled job for #{DateTime.to_iso8601(scheduled_time)}")
IO.puts("Waiting for it to execute...")
Process.sleep(3500)
"Check above - Charlie's confirmation should have processed!"
```
Or use a delay in milliseconds:
```elixir
# Enqueue with a 1 second delay
{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-DELAYED", customer: "Diana"},
  in: 1000)
IO.puts("Enqueued with 1 second delay...")
Process.sleep(2000)
"Diana's confirmation should have processed!"
```
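The `:at` and `:in` options describe the same thing in two ways: an absolute time versus an offset from now. The sketch below shows one way the two could be reduced to a single run time. `ScheduleSketch.run_at/2` is a hypothetical helper written for this tutorial, not part of the library, and the precedence of `:at` over `:in` is an assumption.

```elixir
defmodule ScheduleSketch do
  # Hypothetical helper: turn enqueue-style options into an absolute run time.
  def run_at(opts, now \\ DateTime.utc_now()) do
    case {Keyword.fetch(opts, :at), Keyword.fetch(opts, :in)} do
      # An explicit time wins (assumed precedence).
      {{:ok, %DateTime{} = at}, _} -> at
      # A delay in milliseconds is added to the current time.
      {:error, {:ok, ms}} when is_integer(ms) -> DateTime.add(now, ms, :millisecond)
      # Neither option given: eligible to run immediately.
      {:error, :error} -> now
    end
  end
end

now = ~U[2024-01-01 12:00:00.000Z]
delayed = ScheduleSketch.run_at([in: 1000], now)
DateTime.diff(delayed, now, :millisecond)
# => 1000
```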
## Multiple Shops: Tenant Isolation
Each coffee shop location can have its own queue, completely isolated:
```elixir
# Different shops, different queues
{:ok, _} = JobQueue.enqueue("downtown_shop", "order:confirm",
  %{order_id: "DT-001", customer: "Eve"})
{:ok, _} = JobQueue.enqueue("airport_kiosk", "order:confirm",
  %{order_id: "AP-001", customer: "Frank"})
IO.puts("Enqueued to downtown_shop and airport_kiosk")
Process.sleep(1000)
"Jobs processed in their respective queues!"
```
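One way to picture the isolation is as a partition keyed by `queue_id`: a job enqueued for one shop never shows up in another shop's list. The sketch below models that with an in-memory grouping. It is only a mental model, not Bedrock's storage layout, and `DT-002` is an extra made-up order added for illustration.

```elixir
# In-memory stand-in for jobs enqueued across two locations.
jobs = [
  %{queue_id: "downtown_shop", order_id: "DT-001"},
  %{queue_id: "airport_kiosk", order_id: "AP-001"},
  %{queue_id: "downtown_shop", order_id: "DT-002"}
]

# Partition by tenant; each queue_id sees only its own orders.
by_queue = Enum.group_by(jobs, & &1.queue_id, & &1.order_id)

Map.fetch!(by_queue, "downtown_shop")
# => ["DT-001", "DT-002"]
```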
## Checking Queue Stats
You can inspect queue status at any time:
```elixir
for queue_id <- ["main_shop", "downtown_shop", "airport_kiosk"] do
  stats = JobQueue.stats(queue_id)
  IO.puts("#{queue_id}: pending=#{stats.pending_count}, processing=#{stats.processing_count}")
end

"Stats shown above!"
```
## What We've Learned
Congratulations! You've built a complete job queue system. Here's what we covered:
| Concept           | What You Learned                                                                                  |
| ----------------- | ------------------------------------------------------------------------------------------------- |
| **Job Modules**   | `use Bedrock.JobQueue.Job` with `topic`, `priority`, and `max_retries`, plus a `perform/2` callback |
| **Return Values** | `:ok`, `{:ok, result}`, `{:error, reason}`, `{:snooze, ms}`, `{:discard, reason}`                 |
| **Enqueueing**    | `enqueue/4` with `:at`, `:in`, `:priority` opts                                                   |
| **Priorities**    | Lower number = higher priority (10 runs before 50, which runs before 200)                         |
| **Topics**        | String keys routing to job modules via the `workers` map                                          |
| **Tenants**       | Use `queue_id` to isolate jobs per tenant                                                         |
## Keep Exploring
* Check out `Bedrock.JobQueue.Job` for all job options
* Experiment with `{:snooze, delay_ms}` for rate limiting
* Try `{:error, reason}` to see automatic retries with backoff
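
As a starting point for the retry experiment, it can help to know what "backoff" usually means. This tutorial does not state the formula the queue uses, so the sketch below shows one common scheme, capped exponential backoff, purely as an illustration. `BackoffSketch`, its default values, and the 1-based `attempt` are all assumptions.

```elixir
defmodule BackoffSketch do
  # Hypothetical capped exponential backoff: base, 2x base, 4x base, ... up to max.
  def delay_ms(attempt, base_ms \\ 1_000, max_ms \\ 60_000)
      when is_integer(attempt) and attempt >= 1 do
    min(base_ms * Integer.pow(2, attempt - 1), max_ms)
  end
end

Enum.map(1..5, &BackoffSketch.delay_ms/1)
# => [1000, 2000, 4000, 8000, 16000]
```

Comparing these numbers with the gaps you observe between retries is a quick way to see how the real behavior differs from this sketch.
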
The job queue is inspired by the QuiCK paper for high-performance distributed queues.
Happy brewing!