lib/dripdrop/dispatch/concurrency.ex

defmodule DripDrop.Dispatch.Concurrency do
  @moduledoc """
  Limits in-flight dispatches by channel and adapter.

  The gate is intentionally database-backed so all worker runtimes see the same
  concurrency counts. Advisory transaction locks serialize cap checks per scope,
  and hits defer the execution instead of failing it.
  """

  alias DripDrop.{Clock, DBHelpers, Repo}

  @schema Application.compile_env(:dripdrop, :schema, "dripdrop")
  @default_defer_seconds 30

  @doc """
  Checks configured concurrency caps for the dispatch context and adapter.
  """
  @spec check(map(), map()) :: :ok | {:defer, DateTime.t(), map()} | {:error, term()}
  def check(context, adapter) do
    context
    |> scopes(adapter)
    |> Enum.reduce_while(:ok, &check_scope(&1, &2, context, adapter))
  end

  defp check_scope(_scope, {:defer, _defer_until, _metadata} = defer, _context, _adapter),
    do: {:halt, defer}

  defp check_scope(scope, :ok, context, adapter) do
    case count_inflight(scope, context, adapter) do
      {:ok, count} when count < scope.cap ->
        {:cont, :ok}

      {:ok, count} ->
        defer_until = Clock.seconds_from_now(defer_seconds())
        emit_hit(scope, context, adapter, count, defer_until)

        {:halt,
         {:defer, defer_until,
          %{
            reason: "concurrency_cap",
            scope: scope.name,
            cap: scope.cap,
            in_flight: count
          }}}

      {:error, reason} ->
        {:halt, {:error, reason}}
    end
  end

  defp scopes(context, adapter) do
    [
      scope(:channel, adapter.channel, channel_cap(adapter.channel, context.step)),
      scope(:adapter, adapter.id, adapter_cap(adapter, context.step))
    ]
    |> Enum.reject(&is_nil/1)
  end

  defp scope(_name, _key, nil), do: nil
  defp scope(_name, _key, cap) when cap <= 0, do: nil
  defp scope(name, key, cap), do: %{name: name, key: key, cap: cap}

  defp count_inflight(scope, context, adapter) do
    schema = @schema
    {filter, params} = filter(scope, context, adapter)

    sql = """
    WITH locked AS (
      SELECT pg_advisory_xact_lock(hashtextextended($1::text, 0))
    ),
    marked AS (
      UPDATE #{schema}.step_executions
      SET metadata = coalesce(metadata, '{}'::jsonb) || $2::jsonb,
          updated_at = now()
      FROM locked
      WHERE id = $3::uuid
      RETURNING id
    )
    SELECT count(*)::int
    FROM #{schema}.step_executions
    WHERE state IN ('claiming', 'sending')
      AND id != $3::uuid
      AND (($4::text IS NULL AND tenant_key IS NULL) OR tenant_key = $4::text)
      #{filter}
    """

    base_params = [
      "dripdrop:concurrency:#{scope.name}:#{scope.key}",
      %{adapter_id: adapter.id},
      DBHelpers.dump_uuid(context.execution.id),
      context.execution.tenant_key
    ]

    case Repo.query(sql, base_params ++ params) do
      {:ok, %{rows: [[count]]}} -> {:ok, count}
      {:error, reason} -> {:error, reason}
    end
  end

  defp filter(%{name: :channel}, _context, adapter),
    do: {"AND channel = $5::text", [adapter.channel]}

  defp filter(%{name: :adapter}, _context, adapter),
    do: {"AND metadata->>'adapter_id' = $5::text", [adapter.id]}

  defp channel_cap(channel, step) do
    step_cap(step, ["concurrency", "channel"]) ||
      config_cap([:channel, channel])
  end

  defp adapter_cap(adapter, step) do
    step_cap(step, ["concurrency", "adapter"]) ||
      config_cap([:adapter, adapter.id]) ||
      config_cap([:adapter, adapter.name])
  end

  defp step_cap(%{config: config}, path) when is_map(config), do: get_in(config, path)
  defp step_cap(_step, _path), do: nil

  defp config_cap(path) do
    :dripdrop
    |> Application.get_env(:dispatch_concurrency, [])
    |> get_config_path(path)
    |> parse_cap()
  end

  defp get_config_path(config, path) when is_list(config),
    do: config |> Map.new() |> get_config_path(path)

  defp get_config_path(config, path) when is_map(config),
    do: Enum.reduce(path, config, &config_key/2)

  defp get_config_path(_config, _path), do: nil

  defp config_key(key, config) when is_map(config) do
    Map.get(config, key) || Map.get(config, to_string(key)) || Map.get(config, to_atom(key))
  end

  defp config_key(_key, _config), do: nil

  defp to_atom(key) when is_atom(key), do: key

  defp to_atom(key) when is_binary(key) do
    String.to_existing_atom(key)
  rescue
    ArgumentError -> key
  end

  defp parse_cap(cap) when is_integer(cap), do: cap

  defp parse_cap(cap) when is_binary(cap) do
    case Integer.parse(cap) do
      {cap, ""} -> cap
      _invalid -> nil
    end
  end

  defp parse_cap(_cap), do: nil

  defp defer_seconds do
    :dripdrop
    |> Application.get_env(:dispatch_concurrency, [])
    |> get_config_path([:defer_seconds])
    |> parse_cap()
    |> Kernel.||(@default_defer_seconds)
  end

  defp emit_hit(scope, context, adapter, count, defer_until) do
    :telemetry.execute([:dripdrop, :dispatch, :concurrency_limited], %{count: 1}, %{
      step_execution_id: context.execution.id,
      tenant_key: context.execution.tenant_key,
      channel: adapter.channel,
      adapter_id: adapter.id,
      scope: scope.name,
      cap: scope.cap,
      in_flight: count,
      defer_until: defer_until
    })
  end
end