Skip to main content

livebooks/coffee_shop.livemd

# Tutorial: Coffee Shop Job Queue

```elixir
Mix.install([
  {:bedrock, "~> 0.4"},
  {:bedrock_job_queue, "~> 0.1"},
  {:kino, "~> 0.16.0"}
])
```

## Welcome to Bedrock Job Queue!

In this tutorial, we'll build a background job system for a busy coffee shop. Along the way, you'll learn how Bedrock's job queue handles async operations reliably - from order confirmations to delivery syncing.

This tutorial follows the same patterns as the Class Scheduling tutorial, but focuses on background job processing.

## Setting Up Our Database

First, let's set up our Bedrock cluster. This gives us a reliable storage layer for our job queue:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
# Create a temporary folder to persist data
working_dir = Path.join(System.tmp_dir!(), "coffee_shop_#{:rand.uniform(99999)}")
File.mkdir_p!(working_dir)

defmodule CoffeeShop.Cluster do
  use Bedrock.Cluster,
    otp_app: :coffee_shop,
    name: "coffee_shop",
    config: [
      capabilities: [:coordination, :log, :storage],
      trace: [],
      coordinator: [path: working_dir],
      storage: [path: working_dir],
      log: [path: working_dir]
    ]
end

defmodule CoffeeShop.Repo do
  use Bedrock.Repo, cluster: CoffeeShop.Cluster
end
```

Now let's start the cluster:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
Kino.start_child!({CoffeeShop.Cluster, []})
```

Quick test to make sure everything works:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
alias Bedrock.JobQueue.Job
alias CoffeeShop.{Repo, JobQueue}

Repo.transact(fn -> Repo.put("test", "coffee ready") end)

result = Repo.transact(fn -> Repo.get("test") end)
"Coffee status: #{result}"
```

## Our Mission: A Busy Coffee Shop

Imagine you're building software for a coffee chain. When customers place orders, many things need to happen:

* **Order confirmation** - Send a notification to the customer
* **Brewing updates** - Track when the barista starts making the drink
* **Ready for pickup** - Alert the customer when their order is ready
* **Delivery sync** - Sync with third-party delivery apps
* **Cleanup tasks** - Administrative tasks that can wait

These operations shouldn't block the order flow. That's where background jobs come in!

## Defining Job Types

Each job type is a module with a `perform/2` callback. The second argument is metadata containing `topic`, `queue_id`, `item_id`, and `attempt`. Let's define our coffee shop jobs:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
defmodule CoffeeShop.Jobs.OrderConfirmation do
  use Job,
    topic: "order:confirm",
    priority: 100

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts("  [#{order_id}] Sending confirmation to #{customer}")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.BrewingStarted do
  use Job,
    topic: "order:brewing",
    priority: 50

  @impl true
  def perform(%{order_id: order_id, drink: drink}, _meta) do
    IO.puts("  [#{order_id}] Barista started: #{drink}")
    Process.sleep(100)
    {:ok, %{started_at: DateTime.utc_now()}}
  end
end

defmodule CoffeeShop.Jobs.ReadyForPickup do
  use Job,
    topic: "order:ready",
    priority: 10

  @impl true
  def perform(%{order_id: order_id, customer: customer}, _meta) do
    IO.puts("  [#{order_id}] READY! Paging #{customer}!")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.EspressoShot do
  use Job,
    topic: "brew:espresso",
    max_retries: 3,
    priority: 40

  @impl true
  def perform(%{order_id: order_id, shots: shots}, _meta) do
    IO.puts("  [#{order_id}] Pulling #{shots} shot(s)...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.DeliverySync do
  use Job,
    topic: "delivery:sync",
    max_retries: 5

  @impl true
  def perform(%{order_id: order_id, platform: platform}, _meta) do
    IO.puts("  [#{order_id}] Syncing with #{platform}...")
    Process.sleep(100)
    :ok
  end
end

defmodule CoffeeShop.Jobs.AdminCleanup do
  use Job,
    topic: "admin:cleanup",
    priority: 200

  @impl true
  def perform(%{task: task}, _meta) do
    IO.puts("  Running cleanup: #{task}")
    Process.sleep(100)
    :ok
  end
end

"6 job modules defined!"
```

## Setting Up the Job Queue

Now let's define our JobQueue module. This is where we configure the repo and wire up all our workers:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
defmodule CoffeeShop.JobQueue do
  use Bedrock.JobQueue,
    otp_app: :coffee_shop,
    repo: CoffeeShop.Repo,
    workers: %{
      "order:confirm" => CoffeeShop.Jobs.OrderConfirmation,
      "order:brewing" => CoffeeShop.Jobs.BrewingStarted,
      "order:ready" => CoffeeShop.Jobs.ReadyForPickup,
      "brew:espresso" => CoffeeShop.Jobs.EspressoShot,
      "delivery:sync" => CoffeeShop.Jobs.DeliverySync,
      "admin:cleanup" => CoffeeShop.Jobs.AdminCleanup
    }
end

"JobQueue module defined!"
```

Start the consumer:

<!-- livebook:{"reevaluate_automatically":true} -->

```elixir
Kino.start_child!({JobQueue, concurrency: 2, batch_size: 5})

# Give it a moment to initialize
Process.sleep(500)
"Consumer started!"
```

## Taking Orders: Basic Enqueueing

Let's enqueue our first job - an order confirmation:

```elixir
{:ok, job} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-001", customer: "Alice"})

IO.puts("Enqueued job: #{Base.encode16(job.id, case: :lower) |> binary_part(0, 8)}...")

# Wait for processing
Process.sleep(500)
"Check the output above!"
```

## Rush Hour: Priorities

Not all jobs are equal! A "ready for pickup" notification should jump ahead of cleanup tasks.

Priority is a number where **lower = higher priority**:

* Priority 10: Urgent (pickup notifications)
* Priority 50: Normal (brewing updates)
* Priority 200: Low (cleanup tasks)

Let's see priorities in action:

```elixir
# Enqueue all jobs in a single transaction so they're visible atomically.
# This ensures priority ordering is demonstrated correctly - otherwise
# jobs get processed as they arrive (race between enqueue and consumer).

Repo.transact(fn ->
  JobQueue.enqueue("main_shop", "admin:cleanup",
    %{task: "clear_old_orders"}, priority: 200)
  JobQueue.enqueue("main_shop", "order:brewing",
    %{order_id: "ORD-002", drink: "Latte"}, priority: 50)
  JobQueue.enqueue("main_shop", "order:ready",
    %{order_id: "ORD-003", customer: "Bob"}, priority: 10)
end)

IO.puts("Enqueued atomically: cleanup (200), brewing (50), ready (10)")
IO.puts("\nWatching execution order (should be: ready → brewing → cleanup):")
Process.sleep(2000)
"Jobs processed! Check the priority order above."
```

## Happy Hour: Scheduled Jobs

Sometimes you want a job to run at a specific time:

```elixir
# Schedule a job for 2 seconds from now
scheduled_time = DateTime.utc_now() |> DateTime.add(2, :second)

{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-SCHEDULED", customer: "Charlie"},
  at: scheduled_time)

IO.puts("Scheduled job for #{DateTime.to_iso8601(scheduled_time)}")
IO.puts("Waiting for it to execute...")
Process.sleep(3500)
"Check above - Charlie's confirmation should have processed!"
```

Or use a delay in milliseconds:

```elixir
# Enqueue with a 1 second delay
{:ok, _} = JobQueue.enqueue("main_shop", "order:confirm",
  %{order_id: "ORD-DELAYED", customer: "Diana"},
  in: 1000)

IO.puts("Enqueued with 1 second delay...")
Process.sleep(2000)
"Diana's confirmation should have processed!"
```

## Multiple Shops: Tenant Isolation

Each coffee shop location can have its own queue, completely isolated:

```elixir
# Different shops, different queues
{:ok, _} = JobQueue.enqueue("downtown_shop", "order:confirm",
  %{order_id: "DT-001", customer: "Eve"})

{:ok, _} = JobQueue.enqueue("airport_kiosk", "order:confirm",
  %{order_id: "AP-001", customer: "Frank"})

IO.puts("Enqueued to downtown_shop and airport_kiosk")
Process.sleep(1000)
"Jobs processed in their respective queues!"
```

## Checking Queue Stats

You can inspect queue status at any time:

```elixir
for queue_id <- ["main_shop", "downtown_shop", "airport_kiosk"] do
  stats = JobQueue.stats(queue_id)
  IO.puts("#{queue_id}: pending=#{stats.pending_count}, processing=#{stats.processing_count}")
end

"Stats shown above!"
```

## What We've Learned

Congratulations! You've built a complete job queue system. Here's what we covered:

| Concept           | What You Learned                                                                  |
| ----------------- | --------------------------------------------------------------------------------- |
| **Job Modules**   | Use `use Bedrock.JobQueue.Job` with topic and priority                            |
| **Return Values** | `:ok`, `{:ok, result}`, `{:error, reason}`, `{:snooze, ms}`, `{:discard, reason}` |
| **Enqueueing**    | `enqueue/4` with `:at`, `:in`, `:priority` opts                                   |
| **Priorities**    | Lower number = higher priority (10 > 50 > 200)                                    |
| **Topics**        | String keys routing to job modules via workers map                                |
| **Tenants**       | Use `queue_id` to isolate jobs per tenant                                         |

## Keep Exploring

* Check out `Bedrock.JobQueue.Job` for all job options
* Experiment with `{:snooze, delay_ms}` for rate limiting
* Try `{:error, reason}` to see automatic retries with backoff

The job queue is inspired by the QuiCK paper for high-performance distributed queues.

Happy brewing!