lib/gen_stage/dispatcher.ex

defmodule GenStage.Dispatcher do
  @moduledoc """
  This module defines the behaviour used by `:producer` and
  `:producer_consumer` to dispatch events.

  When using a `:producer` or `:producer_consumer`, the dispatcher
  may be configured on init as follows:

      {:producer, state, dispatcher: GenStage.BroadcastDispatcher}

  Some dispatchers may require options to be given on initialization,
  those can be done with a tuple:

      {:producer, state, dispatcher: {GenStage.PartitionDispatcher, partitions: 0..3}}

  Elixir ships with the following dispatcher implementations:

    * `GenStage.DemandDispatcher` - dispatches the given batch of
      events to the consumer with the biggest demand in a FIFO
      ordering. This is the default dispatcher.

    * `GenStage.BroadcastDispatcher` - dispatches all events to all
      consumers. The demand is only sent upstream once all consumers
      ask for data.

    * `GenStage.PartitionDispatcher` - dispatches all events to a
      fixed amount of consumers that works as partitions according
      to a hash function.

  """

  @typedoc "Options used by `init/1`"
  @type options :: keyword

  @doc """
  Called on initialization with the options given on `c:GenStage.init/1`.
  """
  @callback init(opts :: options) :: {:ok, state} when state: any

  @doc """
  Called every time the producer gets a new subscriber.
  """
  @callback subscribe(opts :: keyword(), from :: {pid, reference}, state :: term) ::
              {:ok, demand :: non_neg_integer, new_state} | {:error, term}
            when new_state: term

  @doc """
  Called every time a subscription is cancelled or the consumer goes down.

  It is guaranteed the reference given in `from` points to a reference
  previously given in subscribe.
  """
  @callback cancel(from :: {pid, reference}, state :: term) ::
              {:ok, demand :: non_neg_integer, new_state}
            when new_state: term

  @doc """
  Called every time a consumer sends demand.

  The demand will always be a positive integer (more than 0).
  This callback must return the `actual_demand` as part of its
  return tuple. The returned demand is then sent to producers.

  It is guaranteed the reference given in `from` points to a
  reference previously given in subscribe.
  """
  @callback ask(demand :: pos_integer, from :: {pid, reference}, state :: term) ::
              {:ok, actual_demand :: non_neg_integer, new_state}
            when new_state: term

  @doc """
  Called every time a producer wants to dispatch an event.

  The events will always be a non empty list. This callback may
  receive more events than previously asked and therefore must
  return events it cannot not effectively deliver as part of its
  return tuple. Any `leftover_events` will be stored by producers
  in their buffer.

  It is important to emphasize that `leftover_events` can happen
  in any dispatcher implementation. After all, a consumer can
  subscribe, ask for events and crash. Eventually the events
  the consumer asked will be delivered while the consumer no longer
  exists, meaning they must be returned as left_over events until
  another consumer subscribes.

  It is guaranteed the reference given in `from` points to a
  reference previously given in subscribe. It is also recommended
  for events to be sent with `Process.send/3` and the `[:noconnect]`
  option as the consumers are all monitored by the producer. For
  example:

      Process.send(consumer, {:"$gen_consumer", {self(), consumer_ref}, events}, [:noconnect])

  """
  @callback dispatch(events :: nonempty_list(term), length :: pos_integer, state :: term) ::
              {:ok, leftover_events :: [term], new_state}
            when new_state: term

  @doc """
  Used to send an info message to the current process.

  In case the dispatcher is doing buffering, the message must
  only be sent after all currently buffered consumer messages are
  delivered.
  """
  @callback info(msg :: term, state :: term) :: {:ok, new_state} when new_state: term
end