defmodule Oban.LiveDashboard do
use Phoenix.LiveDashboard.PageBuilder, refresher?: true
import Phoenix.LiveDashboard.Helpers, only: [format_value: 2]
import Ecto.Query
@per_page_limits [20, 50, 100]
@oban_sorted_job_states [
"executing",
"available",
"scheduled",
"retryable",
"cancelled",
"discarded",
"completed"
]
@impl true
def render(assigns) do
assigns = assign(assigns, :now, DateTime.utc_now())
~H"""
<h5 class="mb-3">Oban</h5>
<.live_nav_bar id="oban_states" page={@page} nav_param="job_state" style={:bar} extra_params={["nav"]}>
<:item :for={{job_state, count} <- @job_state_counts} name={job_state} label={job_state_label(job_state, count)} method="navigate">
<.live_table id="oban_jobs" limit={per_page_limits()} dom_id={"oban-jobs-#{job_state}"} page={@page} row_attrs={&row_attrs/1} row_fetcher={&fetch_jobs(&1, &2, job_state)} default_sort_by={@timestamp_field} title="" search={false}>
<:col :let={job} field={:worker} sortable={:desc}>
<p class="font-weight-bold m-0"><%= job.worker %></p>
<pre class="font-weight-lighter text-muted m-0"><%= truncate(inspect(job.args)) %></pre>
</:col>
<:col :if={job_state == "all"} :let={job} field={:state} sortable={:desc}><%= job_state_label(job.state) %></:col>
<:col :let={job} field={:attempt} header="Attempt" sortable={:desc}>
<%= job.attempt %>/<%= job.max_attempts %>
</:col>
<:col field={:queue} header="Queue" sortable={:desc} />
<:col :let={job} field={@timestamp_field} sortable={:desc}>
<%= render_time(timestamp(job, @timestamp_field), @now) %>
</:col>
</.live_table>
</:item>
</.live_nav_bar>
<.live_modal :if={@job != nil} id="job-modal" title={"Job - #{@job.id}"} return_to={live_dashboard_path(@socket, @page, params: %{})}>
<div class="mb-4 btn-toolbar" role="toolbar" aria-label="Oban Job actions">
<div class="btn-group" role="group">
<button type="button" class="btn btn-primary btn-sm mr-2" phx-click="run_job" phx-value-job={@job.id} disabled={!can_retry_job?(@job)}>Retry Job</button>
</div>
<div class="btn-group" role="group">
<button type="button" class="btn btn-primary btn-sm" phx-click="cancel_job" phx-value-job={@job.id} disabled={!can_cancel_job?(@job)}>Cancel Job</button>
</div>
</div>
<div class="tabular-info">
<.label_value_list>
<:elem label="ID"><%= @job.id %></:elem>
<:elem label="State"><%= @job.state %></:elem>
<:elem label="Queue"><%= @job.queue %></:elem>
<:elem label="Worker"><%= @job.worker %></:elem>
<:elem label="Args"><%= format_value(@job.args, nil) %></:elem>
<:elem :if={@job.meta != %{}} label="Meta"><%= format_value(@job.meta, nil) %></:elem>
<:elem :if={@job.tags != []} label="Tags"><%= format_value(@job.tags, nil) %></:elem>
<:elem :if={@job.errors != []} label="Errors"><%= format_errors(@job.errors) %></:elem>
<:elem label="Attempts"><%= @job.attempt %>/<%= @job.max_attempts %></:elem>
<:elem label="Priority"><%= @job.priority %></:elem>
<:elem label="Attempted at"><%= format_value(@job.attempted_at) %></:elem>
<:elem :if={@job.cancelled_at} label="Cancelled at"><%= render_time(@job.cancelled_at, @now) %></:elem>
<:elem :if={@job.completed_at} label="Completed at"><%= render_time(@job.completed_at, @now) %></:elem>
<:elem :if={@job.discarded_at} label="Discarded at"><%= render_time(@job.discarded_at, @now) %></:elem>
<:elem label="Inserted at"><%= render_time(@job.inserted_at, @now) %></:elem>
<:elem label="Scheduled at"><%= render_time(@job.scheduled_at, @now) %></:elem>
</.label_value_list>
</div>
</.live_modal>
"""
end
defp render_time(nil, _), do: nil
defp render_time(datetime, now) do
assigns = %{
datetime: datetime,
now: now,
iso8601: DateTime.to_iso8601(datetime),
title: DateTime.to_string(datetime)
}
~H|<time datetime={@iso8601} title={@title}><%= format_relative(@datetime, @now) %></time>|
end
@impl true
def mount(_params, _, socket) do
{:ok, socket}
end
@impl true
def menu_link(_, _) do
{:ok, "Oban"}
end
@impl true
def handle_params(%{"params" => %{"job" => job_id}} = params, _url, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", "executing"))
|> assign(sort_by: Map.get(params, "job_state"))
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()
case fetch_job(job_id) do
{:ok, job} ->
{:noreply, assign(socket, job: job)}
:error ->
to = live_dashboard_path(socket, socket.assigns.page, params: %{})
{:noreply, push_patch(socket, to: to)}
end
end
def handle_params(params, _uri, socket) do
socket =
socket
|> assign(job_state: Map.get(params, "job_state", "executing"))
|> assign(sort_by: Map.get(params, "job_state"))
|> assign(job: nil)
|> assign_job_state_counts()
|> assign_timestamp_field()
{:noreply, socket}
end
@impl true
def handle_event("show_job", params, socket) do
to = live_dashboard_path(socket, socket.assigns.page, params: params)
{:noreply, push_patch(socket, to: to)}
end
def handle_event("run_job", %{"job" => job_id}, socket) do
with {:ok, job} <- fetch_job(job_id),
:ok <- Oban.Engine.retry_job(Oban.config(), job),
# Refresh job
{:ok, job} <- fetch_job(job.id) do
{:noreply, assign(socket, :job, job)}
else
_ ->
{:noreply, socket}
end
end
def handle_event("cancel_job", %{"job" => job_id}, socket) do
with {:ok, job} <- fetch_job(job_id),
:ok <- Oban.Engine.cancel_job(Oban.config(), job) do
to = live_dashboard_path(socket, socket.assigns.page, params: %{})
{:noreply, push_patch(socket, to: to)}
else
_ ->
{:noreply, socket}
end
end
defp get_job(id) do
Oban.Repo.get(Oban.config(), Oban.Job, id)
end
@impl true
def handle_refresh(socket) do
{:noreply,
socket
|> assign_job_state_counts()
|> Phoenix.Component.update(:job, fn
nil -> nil
%{id: job_id} -> get_job(job_id)
end)}
end
defp assign_job_state_counts(socket) do
job_state_counts_in_db =
Oban.Repo.all(
Oban.config(),
Oban.Job
|> group_by([j], [j.state])
|> order_by([j], [j.state])
|> select([j], {j.state, count(j.id)})
)
|> Enum.into(%{})
job_state_counts =
for job_state <- @oban_sorted_job_states,
do: {job_state, Map.get(job_state_counts_in_db, job_state, 0)}
total_count = Keyword.values(job_state_counts) |> Enum.sum()
job_state_counts = [{"all", total_count} | job_state_counts]
assign(socket, job_state_counts: job_state_counts)
end
defp job_state_label(job_state, count) do
"#{job_state_label(job_state)} (#{count})"
end
defp job_state_label(job_state) do
Phoenix.Naming.humanize(job_state)
end
defp fetch_jobs(params, _node, job_state) do
total_jobs = Oban.Repo.aggregate(Oban.config(), jobs_count_query(job_state), :count)
jobs =
Oban.Repo.all(Oban.config(), jobs_query(params, job_state)) |> Enum.map(&Map.from_struct/1)
{jobs, total_jobs}
end
defp fetch_job(id) do
case Oban.Repo.get(Oban.config(), Oban.Job, id) do
%Oban.Job{} = job ->
{:ok, job}
_ ->
:error
end
end
defp can_retry_job?(%Oban.Job{state: state}), do: state not in ["available", "executing"]
defp can_cancel_job?(%Oban.Job{state: state}), do: state != "cancelled"
defp jobs_query(%{sort_by: sort_by, sort_dir: sort_dir, limit: limit}, "all") do
Oban.Job
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end
defp jobs_query(params, job_state) do
Oban.Job
|> filter_by_job_state(job_state)
|> filter_by_params(params)
end
defp jobs_count_query("all") do
Oban.Job
end
defp jobs_count_query(job_state) do
filter_by_job_state(Oban.Job, job_state)
end
defp filter_by_params(queryable, %{sort_by: sort_by, sort_dir: sort_dir, limit: limit}) do
queryable
|> limit(^limit)
|> order_by({^sort_dir, ^sort_by})
end
defp filter_by_job_state(queryable, job_state) do
where(queryable, [job], job.state == ^job_state)
end
defp row_attrs(job) do
[
{"phx-click", "show_job"},
{"phx-value-job", job[:id]},
{"phx-page-loading", true}
]
end
defp format_errors(errors) do
Enum.map(errors, &Map.get(&1, "error"))
end
defp format_value(%DateTime{} = datetime) do
Calendar.strftime(datetime, "%Y-%m-%d %H:%M:%S")
end
defp format_value(value), do: value
defp format_relative(%DateTime{} = a, %DateTime{} = b) do
delta = DateTime.diff(a, b, :second)
{prefix, suffix} = if delta < 0, do: {"", " ago"}, else: {"in ", ""}
{d, {h, m, s}} = :calendar.seconds_to_daystime(delta)
cond do
d > 1 -> "#{d} days"
d == 1 -> "1 day"
h > 1 -> "#{h} hours"
h == 1 -> "1 hour"
m > 1 -> "#{m} minutes"
m == 1 -> "1 minute"
s == 1 -> "1 second"
true -> "#{s} seconds"
end
|> then(fn x -> "#{prefix}#{x}#{suffix}" end)
end
defp format_relative(%DateTime{} = a, nil) do
format_relative(a, DateTime.utc_now())
end
defp format_relative(nil, _), do: nil
defp timestamp(job, timestamp_field) do
Map.get(job, timestamp_field)
end
defp assign_timestamp_field(%{assigns: %{job_state: job_state}} = socket) do
timestamp_field =
case job_state do
"available" -> :scheduled_at
"cancelled" -> :cancelled_at
"completed" -> :completed_at
"discarded" -> :discarded_at
"executing" -> :attempted_at
"retryable" -> :scheduled_at
"scheduled" -> :scheduled_at
_ -> :inserted_at
end
assign(socket, timestamp_field: timestamp_field)
end
defp truncate(string, max_length \\ 50) do
if String.length(string) > max_length do
String.slice(string, 0, max_length) <> "…"
else
string
end
end
defp per_page_limits, do: @per_page_limits
end