Skip to main content

lib/mix/tasks/kathikon.ops.ex

defmodule Mix.Tasks.Kathikon.Ops do
  @shortdoc "Inspect and control Kathikon queues and jobs"

  @moduledoc """
  Operations CLI for Kathikon (inspect queues/jobs and run management commands).

      mix kathikon.ops summary
      mix kathikon.ops jobs --queue default --state completed --limit 20
      mix kathikon.ops jobs --tab TAB --offset 20 --limit 20
      mix kathikon.ops show JOB_ID
      mix kathikon.ops pause --all
      mix kathikon.ops resume --queue emails
      mix kathikon.ops cancel JOB_ID
      mix kathikon.ops retry JOB_ID
      mix kathikon.ops rerun JOB_ID
      mix kathikon.ops purge --queue default --state completed
      mix kathikon.ops prune
      mix kathikon.ops --node kathikon@host summary

  `cancel`, `retry` and `rerun` act on a single job when given a JOB_ID.
  Without one, they run in bulk against the jobs selected by `--queue`,
  `--state` and `--tab`. When no filter is given, an empty filter is passed to
  the dashboard, so check what that selects before running a bulk command
  unfiltered.

  ## Options

    * `--node`, `-n` - run against a remote node instead of the local one
    * `--queue`, `-q` - queue name
    * `--state`, `-s` - comma-separated job states (takes precedence over `--tab`)
    * `--tab` - one of the dashboard state tabs, expanded to its states
    * `--limit`, `--offset` - pagination for `jobs` (must be non-negative)
    * `--all` - make `pause`/`resume` apply to every queue (wins over `--queue`)

  Unknown options, or option values of the wrong type, abort the task.

  Remote use requires a **named local node** and matching cookie on both sides:

      elixir --name ops@127.0.0.1 --cookie SECRET -S mix kathikon.ops --node kathikon@127.0.0.1 summary

  Uses `Kathikon.Dashboard` locally or `Kathikon.Dashboard.RPC` on a remote node.
  """

  use Mix.Task

  alias Kathikon.Dashboard
  alias Kathikon.Dashboard.RPC

  @impl Mix.Task
  def run(args) do
    {opts, argv, invalid} =
      OptionParser.parse(args,
        strict: [
          node: :string,
          queue: :string,
          state: :string,
          tab: :string,
          limit: :integer,
          offset: :integer,
          all: :boolean
        ],
        aliases: [n: :node, q: :queue, s: :state]
      )

    # Unknown switches and values that fail their type (e.g. `--limit abc`)
    # end up in `invalid`. Ignoring them would run the command without the
    # filter the operator believes was applied, so fail before doing anything.
    if invalid != [], do: Mix.raise("invalid option(s): #{format_invalid(invalid)}")

    Mix.Task.run("app.start")

    case argv do
      [] ->
        Mix.shell().error("usage: mix kathikon.ops COMMAND [options]")

        Mix.shell().error(
          "commands: summary, jobs, show, pause, resume, cancel, retry, rerun, purge, prune"
        )

      [command | rest] ->
        run_command(command, rest, opts)
    end
  end

  # Renders OptionParser's `invalid` entries ({switch, value | nil}) for an error message.
  defp format_invalid(invalid) do
    Enum.map_join(invalid, ", ", fn
      {switch, nil} -> switch
      {switch, value} -> "#{switch} #{value}"
    end)
  end

  defp run_command(command, rest, opts) do
    dispatch_command(command, rest, remote_node(opts), opts)
  end

  # One clause per CLI command. `node` is nil for local execution.
  defp dispatch_command("summary", _rest, node, _opts) do
    with {:ok, rows} <- rpc(node, :queue_summary, [[]]), do: print_summary(rows)
  end

  defp dispatch_command("jobs", _rest, node, opts) do
    list_opts = list_job_opts(opts)
    validate_pagination!(list_opts)

    with {:ok, page} <- rpc(node, :list_jobs, [list_opts]), do: print_jobs(page)
  end

  defp dispatch_command("show", rest, node, _opts) do
    case rest do
      [job_id | _] when is_binary(job_id) and job_id != "" ->
        with {:ok, detail} <- rpc(node, :fetch_job, [job_id]), do: print_job_detail(detail)

      _ ->
        Mix.raise("usage: mix kathikon.ops show JOB_ID")
    end
  end

  defp dispatch_command("pause", _rest, node, opts),
    do: run_queue_action(node, :pause_queue, opts)

  defp dispatch_command("resume", _rest, node, opts),
    do: run_queue_action(node, :resume_queue, opts)

  defp dispatch_command("cancel", rest, node, opts),
    do: run_job_or_bulk(rest, node, opts, :cancel_job, :cancel_jobs)

  defp dispatch_command("retry", rest, node, opts),
    do: run_job_or_bulk(rest, node, opts, :retry_job, :retry_jobs)

  defp dispatch_command("rerun", rest, node, opts),
    do: run_job_or_bulk(rest, node, opts, :rerun_job, :rerun_jobs)

  defp dispatch_command("purge", _rest, node, opts) do
    with {:ok, result} <- rpc(node, :purge_jobs, [filter_opts(opts)]) do
      IO.puts("Purged #{result.purged} job(s)")
      print_bulk_errors(Map.get(result, :errors, []))
    end
  end

  defp dispatch_command("prune", _rest, node, _opts) do
    rpc(node, :prune_now, []) |> print_result()
  end

  defp dispatch_command(command, _rest, _node, _opts) do
    Mix.raise("unknown command #{inspect(command)}")
  end

  # With a JOB_ID, call the single-job function. Otherwise, call the bulk
  # function with the --queue/--state/--tab filter (possibly empty).
  defp run_job_or_bulk([job_id | _], node, _opts, single, _bulk) do
    rpc(node, single, [job_id]) |> print_result()
  end

  defp run_job_or_bulk([], node, opts, _single, bulk) do
    with {:ok, result} <- rpc(node, bulk, [filter_opts(opts)]), do: print_bulk(result)
  end

  # pause/resume: `--all` takes precedence over `--queue`; one of them is required.
  defp run_queue_action(node, fun, opts) do
    cond do
      opts[:all] ->
        rpc(node, action_all(fun), []) |> print_result()

      queue = opts[:queue] ->
        rpc(node, fun, [parse_queue!(queue)]) |> print_result()

      true ->
        Mix.raise("pass --queue NAME or --all")
    end
  end

  defp action_all(:pause_queue), do: :pause_all
  defp action_all(:resume_queue), do: :resume_all

  # Keyword options for `list_jobs`; keys whose value is nil are omitted.
  defp list_job_opts(opts) do
    []
    |> maybe_put(:queue, parse_queue(opts[:queue]))
    |> maybe_put(:limit, opts[:limit])
    |> maybe_put(:offset, opts[:offset])
    |> maybe_put(:states, parse_states(opts))
    |> maybe_put(:tab, parse_tab(opts[:tab]))
  end

  # Filter used by the bulk commands and `purge`; keys whose value is nil are omitted.
  defp filter_opts(opts) do
    []
    |> maybe_put(:queue, parse_queue(opts[:queue]))
    |> maybe_put(:states, parse_states(opts))
  end

  defp maybe_put(opts, _key, nil), do: opts
  defp maybe_put(opts, key, value), do: Keyword.put(opts, key, value)

  defp parse_queue(nil), do: nil
  defp parse_queue(queue), do: parse_queue!(queue)

  # A queue name may only exist as an atom on the remote node, so
  # `String.to_existing_atom/1` is not usable here. Each CLI run is
  # short-lived, which bounds the number of atoms created.
  defp parse_queue!(queue) when is_binary(queue), do: String.to_atom(queue)

  # `--state` (comma-separated) wins over `--tab`. A tab is validated before
  # it is expanded, so a typo produces a clear error instead of reaching
  # `Dashboard.states_for_tab/1` with an unknown value.
  defp parse_states(opts) do
    cond do
      states = opts[:state] ->
        states
        |> String.split(",", trim: true)
        |> Enum.map(&String.to_atom/1)

      tab = opts[:tab] ->
        tab |> parse_tab() |> Dashboard.states_for_tab()

      true ->
        nil
    end
  end

  # Resolves a tab name against `Dashboard.state_tabs/0` by string comparison,
  # so unknown input never creates a new atom.
  defp parse_tab(nil), do: nil

  defp parse_tab(tab) do
    tabs = Dashboard.state_tabs()

    case Enum.find(tabs, &(is_atom(&1) and Atom.to_string(&1) == tab)) do
      nil -> Mix.raise("unknown tab #{inspect(tab)} (expected one of #{inspect(tabs)})")
      atom -> atom
    end
  end

  defp validate_pagination!(opts) do
    if limit = Keyword.get(opts, :limit), do: validate_non_negative!(limit, "--limit")
    if offset = Keyword.get(opts, :offset), do: validate_non_negative!(offset, "--offset")
    :ok
  end

  defp validate_non_negative!(value, flag) when value < 0 do
    Mix.raise("#{flag} must be non-negative")
  end

  defp validate_non_negative!(_value, _flag), do: :ok

  # Node names must be atoms for distribution, so atom creation is unavoidable here.
  defp remote_node(opts) do
    case opts[:node] do
      nil -> nil
      name -> String.to_atom(name)
    end
  end

  # Calls `fun` on `Kathikon.Dashboard` locally (node nil) or through
  # `Kathikon.Dashboard.RPC` remotely. Any `{:error, reason}` aborts the task
  # on both paths, so failures are never silently dropped by the callers' `with`.
  defp rpc(node, fun, args) do
    case invoke(node, fun, args) do
      {:error, reason} -> Mix.raise(failure_message(node, fun, reason))
      other -> other
    end
  end

  defp invoke(nil, fun, args), do: apply(Dashboard, fun, args)
  defp invoke(node, fun, args), do: RPC.call(node, fun, args)

  defp failure_message(nil, fun, reason), do: "#{inspect(fun)} failed: #{inspect(reason)}"

  defp failure_message(node, fun, reason),
    do: "RPC #{inspect(fun)} on #{node} failed: #{inspect(reason)}"

  # Column widths for the tables. Negative widths are left-aligned and positive
  # widths right-aligned, matching the former `~-Ns`/`~Ns` layout. Unlike
  # `:io_lib.format` field widths, padding never truncates a long value or
  # replaces it with `*`s, so IDs stay copy-pasteable into `show`/`cancel`.
  @summary_widths [-16, 8, 8, 10, 9, 9, 7, 7, 6]
  @jobs_widths [-34, -12, -12, -28, 8, 22]

  defp print_summary(rows) do
    ["Queue", "Avail", "Exec", "Completed", "Retry", "Cancel", "Failed", "Total", "Pause"]
    |> format_row(@summary_widths)
    |> IO.puts()

    IO.puts(String.duplicate("-", 90))

    for row <- rows do
      counts = Map.get(row, :ui_counts, %{})

      [
        row.queue,
        counts[:available] || 0,
        counts[:executing] || row.executing,
        counts[:completed] || 0,
        counts[:retryable] || 0,
        counts[:cancelled] || 0,
        counts[:failed] || row.failed,
        row.total,
        if(row.paused, do: "yes", else: "no")
      ]
      |> format_row(@summary_widths)
      |> IO.puts()
    end
  end

  defp print_jobs(%{jobs: jobs, total: total, limit: limit, offset: offset}) do
    IO.puts("Showing #{length(jobs)} job(s) (offset #{offset}, limit #{limit}) out of #{total}")
    IO.puts("")

    ["ID", "State", "Queue", "Worker", "Attempts", "Timestamp"]
    |> format_row(@jobs_widths)
    |> IO.puts()

    IO.puts(String.duplicate("-", 120))

    for job <- jobs do
      [
        job.id,
        job.state,
        job.queue,
        job.worker,
        job.attempts_label,
        format_timestamp(job.timestamp)
      ]
      |> format_row(@jobs_widths)
      |> IO.puts()
    end
  end

  # Joins the cells with single spaces, padding each one to its column width.
  defp format_row(values, widths) do
    values
    |> Enum.zip(widths)
    |> Enum.map_join(" ", fn {value, width} -> pad(cell(value), width) end)
  end

  defp pad(text, width) when width < 0, do: String.pad_trailing(text, -width)
  defp pad(text, width), do: String.pad_leading(text, width)

  defp cell(value) when is_binary(value), do: value
  defp cell(value) when is_atom(value), do: Atom.to_string(value)
  defp cell(value) when is_integer(value), do: Integer.to_string(value)
  defp cell(value), do: inspect(value)

  defp print_job_detail(%{job: job, history: history}) do
    IO.puts("job:")
    IO.puts(inspect(job, pretty: true, limit: :infinity))
    IO.puts("")
    IO.puts("history events: #{length(history)}")
    Enum.each(history, fn event -> IO.puts(inspect(event)) end)
  end

  # Job-shaped results get a one-line "ok ID state=STATE". Any other map or
  # value is inspected instead of crashing on a missing :id/:state key.
  defp print_result(:ok), do: IO.puts("ok")
  defp print_result({:ok, %{id: id, state: state}}), do: IO.puts("ok #{id} state=#{state}")
  defp print_result({:ok, other}), do: IO.puts(inspect(other))
  defp print_result({:error, reason}), do: Mix.raise(inspect(reason))
  defp print_result(other), do: IO.puts(inspect(other))

  defp print_bulk(%{succeeded: n, errors: errors}) do
    IO.puts("succeeded: #{n}")
    print_bulk_errors(errors)
  end

  defp print_bulk_errors([]), do: :ok

  defp print_bulk_errors(errors) do
    IO.puts("errors:")
    Enum.each(errors, fn {id, reason} -> IO.puts("  #{id}: #{inspect(reason)}") end)
  end

  # Second-precision ISO 8601 for DateTime/NaiveDateTime, "-" for nil.
  defp format_timestamp(nil), do: "-"

  defp format_timestamp(%DateTime{} = dt),
    do: dt |> DateTime.truncate(:second) |> DateTime.to_iso8601()

  defp format_timestamp(%NaiveDateTime{} = dt),
    do: dt |> NaiveDateTime.truncate(:second) |> NaiveDateTime.to_iso8601()

  defp format_timestamp(other) when is_binary(other), do: other
  defp format_timestamp(other), do: inspect(other)
end