lib/gen_stage/dispatchers/demand_dispatcher.ex

defmodule GenStage.DemandDispatcher do
  @moduledoc """
  A dispatcher that sends batches to the highest demand.

  This is the default dispatcher used by `GenStage`. In order
  to avoid greedy consumers, it is recommended that all consumers
  have exactly the same maximum demand.

  ## Options

  The demand dispatcher accepts the following options
  on initialization:

    * `:shuffle_demands_on_first_dispatch` - when `true`, shuffle the initial demands list
      which is constructed on subscription before first dispatch. It prevents overloading
      the first consumer on first dispatch. Defaults to `false`.

    * `:max_demand` - the maximum demand expected on `GenStage.ask/3`.
      Defaults to the first demand asked.

  ### Examples

  To start a producer with demands shuffled on first dispatch:

      {:producer, state, dispatcher: {GenStage.DemandDispatcher, shuffle_demands_on_first_dispatch: true}}
  """

  @behaviour GenStage.Dispatcher

  @doc false
  def init(opts) do
    shuffle_demand = Keyword.get(opts, :shuffle_demands_on_first_dispatch, false)
    max_demand = Keyword.get(opts, :max_demand)

    {:ok, {[], 0, max_demand, shuffle_demand}}
  end

  @doc false
  def info(msg, state) do
    send(self(), msg)
    {:ok, state}
  end

  @doc false
  def subscribe(_opts, {pid, ref}, {demands, pending, max, shuffle_demand}) do
    {:ok, 0, {demands ++ [{0, pid, ref}], pending, max, shuffle_demand}}
  end

  @doc false
  def cancel({_, ref}, {demands, pending, max, shuffle_demand}) do
    {current, demands} = pop_demand(ref, demands)
    {:ok, 0, {demands, current + pending, max, shuffle_demand}}
  end

  @doc false
  def ask(counter, {pid, ref}, {demands, pending, max, shuffle_demand}) do
    max = max || counter

    if counter > max do
      warning =
        ~c"GenStage producer DemandDispatcher expects a maximum demand of ~tp. " ++
          ~c"Using different maximum demands will overload greedy consumers. " ++
          ~c"Got demand for ~tp events from ~tp~n"

      :error_logger.warning_msg(warning, [max, counter, pid])
    end

    {current, demands} = pop_demand(ref, demands)
    demands = add_demand(current + counter, pid, ref, demands)

    already_sent = min(pending, counter)
    {:ok, counter - already_sent, {demands, pending - already_sent, max, shuffle_demand}}
  end

  @doc false
  def dispatch(events, length, {demands, pending, max, true}) do
    dispatch(events, length, {Enum.shuffle(demands), pending, max, false})
  end

  def dispatch(events, length, {demands, pending, max, false}) do
    {events, demands} = dispatch_demand(events, length, demands)
    {:ok, events, {demands, pending, max, false}}
  end

  defp dispatch_demand([], _length, demands) do
    {[], demands}
  end

  defp dispatch_demand(events, _length, [{0, _, _} | _] = demands) do
    {events, demands}
  end

  defp dispatch_demand(events, length, [{counter, pid, ref} | demands]) do
    {deliver_now, deliver_later, length, counter} = split_events(events, length, counter)
    Process.send(pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect])
    demands = add_demand(counter, pid, ref, demands)
    dispatch_demand(deliver_later, length, demands)
  end

  defp split_events(events, length, counter) when length <= counter do
    {events, [], 0, counter - length}
  end

  defp split_events(events, length, counter) do
    {now, later} = Enum.split(events, counter)
    {now, later, length - counter, 0}
  end

  defp add_demand(counter, pid, ref, [{current, _, _} | _] = demands) when counter > current do
    [{counter, pid, ref} | demands]
  end

  defp add_demand(counter, pid, ref, [demand | demands]) do
    [demand | add_demand(counter, pid, ref, demands)]
  end

  defp add_demand(counter, pid, ref, []) when is_integer(counter) do
    [{counter, pid, ref}]
  end

  defp pop_demand(ref, demands) do
    {{current, _pid, ^ref}, rest} = List.keytake(demands, ref, 2)
    {current, rest}
  end
end