Skip to main content

lib/kathikon/dispatcher.ex

defmodule Kathikon.Dispatcher do
  @moduledoc """
  Claims and executes jobs for a single queue.

  Uses atomic storage claiming and lifecycle callbacks.
  """

  use GenServer

  alias Kathikon.{Batch, Job, QueueControl, Storage, Telemetry}

  @poll_message :poll

  def start_link(opts) do
    queue = Keyword.fetch!(opts, :queue)
    GenServer.start_link(__MODULE__, opts, name: via(queue))
  end

  @doc false
  def via(queue), do: {:via, Registry, {Kathikon.Registry, {:dispatcher, queue}}}

  @impl true
  def init(opts) do
    queue = Keyword.fetch!(opts, :queue)
    config = Keyword.fetch!(opts, :config)
    poll_interval = Keyword.get(opts, :poll_interval, Kathikon.Config.poll_interval())
    storage = Keyword.get(opts, :storage, Storage)

    state = %{
      queue: queue,
      config: config,
      concurrency: Keyword.get(config, :concurrency, 10),
      poll_interval: poll_interval,
      storage: storage,
      running: %{},
      dispatcher_id: self()
    }

    schedule_poll(poll_interval)
    {:ok, state}
  end

  @impl true
  def handle_info(@poll_message, state) do
    state =
      if QueueControl.paused?(state.queue) do
        state
      else
        maybe_claim_jobs(state)
      end

    schedule_poll(state.poll_interval)
    {:noreply, state}
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    {:noreply, %{state | running: Map.delete(state.running, ref)}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, %{state | running: Map.delete(state.running, ref)}}
  end

  defp maybe_claim_jobs(state) do
    slots = state.concurrency - map_size(state.running)

    if slots > 0 do
      claim_and_run(state, slots)
    else
      state
    end
  end

  defp claim_and_run(state, slots) do
    claimant = build_claimant(state)

    case state.storage.claim_and_start_available_jobs(state.queue, slots, claimant) do
      {:ok, jobs} ->
        Enum.reduce(jobs, state, fn running, acc ->
          emit_claim_telemetry(state.queue, running)
          run_job(acc, running)
        end)

      {:error, _} ->
        state
    end
  end

  defp emit_claim_telemetry(queue, running) do
    Telemetry.event([:dispatcher, :poll], %{count: 1}, %{queue: queue, job_id: running.id})
    Telemetry.event([:job, :claimed], %{}, job_metadata(running))
    Telemetry.event([:job, :started], %{}, job_metadata(running))
  end

  defp build_claimant(state) do
    %{
      node: node(),
      pid: inspect(self()),
      claimed_at: DateTime.utc_now(),
      dispatcher_id: state.dispatcher_id
    }
  end

  defp run_job(state, job) do
    parent = self()

    task =
      Task.async(fn ->
        execute_job(job, parent, state.storage)
      end)

    %{state | running: Map.put(state.running, task.ref, task.pid)}
  end

  defp execute_job(job, dispatcher, storage) do
    start_time = System.monotonic_time()
    attempt = job.attempts + 1

    result =
      try do
        job.worker.perform(job)
      rescue
        exception ->
          {:error, {:exception, exception, __STACKTRACE__}}
      catch
        kind, reason ->
          {:error, {kind, reason}}
      end

    duration = System.monotonic_time() - start_time
    GenServer.cast(dispatcher, {:job_finished, job, result, duration, attempt, storage})
  end

  @impl true
  def handle_cast({:job_finished, job, result, duration, attempt, storage}, state) do
    metadata = Map.merge(job_metadata(job), %{attempt: attempt, duration: duration})
    updated = persist_job_result(storage, job, result, metadata, duration)

    case updated do
      {:ok, finished} -> Batch.handle_child_finished(finished)
      _ -> :ok
    end

    {:noreply, state}
  end

  defp persist_job_result(storage, job, result, metadata, duration) do
    case result do
      :ok ->
        storage.complete_job(job.id, :ok, metadata)
        |> emit_stop(metadata, duration)

      {:ok, value} ->
        storage.complete_job(job.id, value, metadata)
        |> emit_stop(metadata, duration)

      {:sleep, seconds} when is_integer(seconds) and seconds > 0 ->
        defer_job(storage, job, seconds, metadata, duration)

      {:sleep, invalid} ->
        storage.fail_job(job.id, {:invalid_sleep, invalid}, metadata)
        |> emit_failure(metadata, duration)

      {:discard, reason} ->
        storage.discard_job(job.id, reason, metadata)
        |> emit_discard(metadata, duration)

      {:retry, reason} ->
        storage.fail_job(job.id, reason, Map.put(metadata, :force_retry, true))
        |> emit_retry(metadata, duration)

      {:error, reason} ->
        storage.fail_job(job.id, reason, metadata)
        |> emit_failure(metadata, duration)
    end
  end

  defp defer_job(storage, job, seconds, metadata, duration) do
    at = DateTime.add(DateTime.utc_now(), seconds, :second)

    case storage.defer_job(job.id, at, Map.put(metadata, :seconds, seconds)) do
      {:ok, _} ->
        Telemetry.event([:job, :sleep], %{duration: duration}, metadata)
        nil

      {:error, reason} ->
        storage.fail_job(job.id, {:defer_failed, reason}, metadata)
        |> emit_failure(metadata, duration)
    end
  end

  defp emit_stop({:ok, job}, metadata, duration) do
    Telemetry.event(
      [:job, :completed],
      %{duration: duration},
      Map.put(metadata, :state, job.state)
    )

    Telemetry.event([:job, :stop], %{duration: duration}, Map.put(metadata, :result, :ok))
    {:ok, job}
  end

  defp emit_failure({:ok, job}, metadata, duration) do
    if job.state == :dead do
      Telemetry.event([:job, :dead], %{duration: duration}, metadata)
    end

    if job.state == :retryable do
      Telemetry.event([:job, :retried], %{duration: duration}, metadata)
      Telemetry.event([:job, :retry], %{duration: duration}, metadata)
    else
      Telemetry.event([:job, :failed], %{duration: duration}, metadata)
    end

    {:ok, job}
  end

  defp emit_retry(result, metadata, duration), do: emit_failure(result, metadata, duration)

  defp emit_discard({:ok, job}, metadata, duration) do
    Telemetry.event([:job, :discard], %{duration: duration}, metadata)
    {:ok, job}
  end

  defp job_metadata(%Job{} = job) do
    %{
      queue: job.queue,
      job_id: job.id,
      worker: job.worker,
      state: job.state,
      attempt: job.attempts + 1,
      node: node()
    }
  end

  defp schedule_poll(interval) do
    Process.send_after(self(), @poll_message, interval)
  end
end