Skip to main content

lib/continuum/runtime/lease/heartbeater.ex

defmodule Continuum.Runtime.Lease.Heartbeater do
  @moduledoc """
  Renews leases owned by local workflow engines.

  Engines register their acquired lease here. On each heartbeat, the process
  renews every registered lease with the same owner/token pair. If renewal
  fails because the row no longer matches, the engine is told to stop itself.
  """

  use GenServer
  require Logger

  alias Continuum.{Runtime.Lease, Telemetry}

  @default_interval_ms 10_000
  @default_ttl_seconds 30

  @doc false
  def start_link(opts \\ []) do
    instance = Continuum.Runtime.Instance.lookup(Keyword.get(opts, :instance, Continuum))
    name = Keyword.get(opts, :name, instance.heartbeater)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  @doc """
  Track a lease for periodic renewal.
  """
  def track(instance, %Lease{} = lease, pid) do
    GenServer.call(instance.heartbeater, {:track, lease, pid})
  end

  @doc """
  Stop tracking a run's lease.
  """
  def untrack(instance, run_id) do
    GenServer.call(instance.heartbeater, {:untrack, run_id})
  end

  @doc """
  Renew all tracked leases immediately.

  This is mainly useful for deterministic tests and shutdown paths.
  """
  def renew_once(instance) do
    GenServer.call(instance.heartbeater, :renew_once)
  end

  @impl true
  def init(opts) do
    instance = Continuum.Runtime.Instance.lookup(Keyword.get(opts, :instance, Continuum))

    state = %{
      interval_ms: Keyword.get(opts, :interval_ms, @default_interval_ms),
      ttl_seconds: Keyword.get(opts, :ttl_seconds, @default_ttl_seconds),
      instance: instance.name,
      repo: instance.repo,
      leases: %{},
      refs: %{}
    }

    schedule_tick(state.interval_ms)
    {:ok, state}
  end

  @impl true
  def handle_call({:track, %Lease{} = lease, pid}, _from, state) do
    state = untrack_run(state, lease.run_id)
    ref = Process.monitor(pid)

    entry = %{
      owner: lease.owner,
      token: lease.token,
      pid: pid,
      ref: ref
    }

    state = %{
      state
      | leases: Map.put(state.leases, lease.run_id, entry),
        refs: Map.put(state.refs, ref, lease.run_id)
    }

    {:reply, :ok, state}
  end

  def handle_call({:untrack, run_id}, _from, state) do
    {:reply, :ok, untrack_run(state, run_id)}
  end

  def handle_call(:renew_once, _from, state) do
    {:reply, :ok, renew_all(state)}
  end

  @impl true
  def handle_info(:heartbeat, state) do
    state = renew_all(state)
    schedule_tick(state.interval_ms)
    {:noreply, state}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    case Map.fetch(state.refs, ref) do
      {:ok, run_id} -> {:noreply, untrack_run(state, run_id)}
      :error -> {:noreply, state}
    end
  end

  defp renew_all(state) do
    Enum.reduce(state.leases, state, fn {run_id, entry}, acc ->
      case Lease.renew(run_id, entry.owner, entry.token,
             ttl_seconds: acc.ttl_seconds,
             repo: acc.repo
           ) do
        :ok ->
          acc

        {:ok, :cancel_requested} ->
          # A durable cancel request was recorded while no caller could reach
          # this engine; the owner honors it on its heartbeat.
          send(entry.pid, {:continuum_cancel_requested, run_id})
          acc

        {:error, :lost} ->
          Telemetry.execute([:continuum, :lease, :lost], %{}, %{
            instance: acc.instance,
            run_id: run_id,
            owner: entry.owner,
            lease_token: entry.token
          })

          send(entry.pid, {:continuum_lease_lost, run_id, entry.token})
          untrack_run(acc, run_id)

        {:error, reason} ->
          Logger.error("Lease renewal failed for #{run_id}: #{inspect(reason)}")
          acc
      end
    end)
  end

  defp untrack_run(state, run_id) do
    case Map.pop(state.leases, run_id) do
      {nil, _leases} ->
        state

      {%{ref: ref}, leases} ->
        Process.demonitor(ref, [:flush])
        %{state | leases: leases, refs: Map.delete(state.refs, ref)}
    end
  end

  defp schedule_tick(interval_ms) do
    Process.send_after(self(), :heartbeat, interval_ms)
  end
end