defmodule Exmld do
@moduledoc ~S"""
This allows items extracted from Kinesis stream records (or sub-records in a [KPL
aggregate record](https://github.com/AdRoll/erlmld/blob/HEAD/proto/kpl_agg.proto)) to
be processed by a pipeline of workers which may differ in number from the number of
shards owned by the current node (which is the normal processing model offered by
[erlmld](https://github.com/AdRoll/erlmld)).

This is beneficial when using aggregate records, whose sub-records can be processed in
approximate order according to their partition keys, as opposed to in strict order based
on the shards they arrived on. For example, suppose the following two Kinesis records
are received on two different shards:

    Record 1 (a KPL aggregate record)
      - partition key: "xyzzy"
      - subrecord a:
          - partition key: "asdf"
          - value: "12345"
      - subrecord b:
          - partition key: "fdsa"
          - value: "54321"

    Record 2 (a KPL aggregate record)
      - partition key: "qwer"
      - subrecord a:
          - partition key: "asdf"
          - value: "23456"
      - subrecord b:
          - partition key: "z"
          - value: "0"

Using the normal Kinesis processing paradigm, each shard will be processed in order.
`erlmld` supports this by spawning a process for each owned shard, which handles each
record seen on the shard in sequence:

    Worker 1:
      1. handle record "xyzzy"
         a. handle sub-record "asdf"
         b. handle sub-record "fdsa"

    Worker 2:
      1. handle record "qwer"
         a. handle sub-record "asdf"
         b. handle sub-record "z"

This can fail to make use of all available resources, since the maximum concurrency is
limited by the number of owned shards. If the application can tolerate the handling of
sub-records in a non-strict order, it can use a `Flow`-based MapReduce-style scheme:

    [Worker 1]   [Worker 2]       (processes which produce Kinesis records)
        |            |
        v            v
    [Exmld.KinesisStage, ...]     (stages receiving Exmld.KinesisWorker.Datums)
               |
               v
       [M1]   ....   [Mn]         (mappers which extract items)
         |\         /|
         | \       / |
         |  \     /  |
         |   \   /   |
         |    \ /    |
         |    / \    |
         |   /   \   |
         |  /     \  |
         | /       \ |
         |/         \|
       [R1]   ....   [Rn]         (reducers which handle extracted items)

The number of reducers is configurable and defaults to the number of schedulers online.
The processing application will specify a means of extracting a partition key from each
extracted item; these will be used to consistently map items to reducers (which is where
the actual application work occurs).
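
For example, if extracted items were maps of the form `%{partition_key: key, value: value}`
(a hypothetical shape), the item key could be given to `flow/6` either as a `Flow`
partition-key shortcut or as an equivalent arity-1 function:

    {:key, :partition_key}               # shortcut form
    fn item -> item.partition_key end    # function form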

Using the above example and specifying a sub-record's partition key as an item key:

  1. Worker 1 will produce the "asdf" and "fdsa" sub-records from outer record "xyzzy"
     and send them to a pre-configured `Exmld.KinesisStage` (or round-robin to a list
     of such stages).
  2. Worker 2 will similarly produce the "asdf" and "z" sub-records from outer record
     "qwer".
  3. Each receiving stage will wrap and forward these sub-records for handling by the
     flow.
  4. The application will have provided an "identity" item extraction function, since
     KPL aggregation is being used here (otherwise, a function accepting one record and
     returning a list containing a single item).
  5. The application will have provided a partition key extraction function which
     returns an appropriate partition key to be used in consistently mapping items to
     reducers.
  6. The first received "asdf" sub-record is provided to some reducer `Rx`. The second
     received "asdf" sub-record is provided to the same reducer, since its extracted
     key has the same hash.
  7. The "fdsa" and "z" sub-records are similarly provided to some reducer `Ry` and/or
     `Rz` based on the hash of their partition keys.
  8. The application-provided reducer function notifies each originating stage of the
     disposition of processing for items received from it as processing progresses.
  9. Eventually, processing disposition is provided back to the originating workers,
     which can decide whether or not (and where) to checkpoint.
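
Tying the steps above together, an application using KPL aggregation might build and
start such a flow roughly as follows (a sketch, not a drop-in example:
`item_partition_key/1` and `handle_item/2` are hypothetical application functions, and
`stages` is a list of already-started `Exmld.KinesisStage`s):

    stages
    |> Flow.from_stages()
    |> Exmld.flow(fn datum -> [datum] end,                 # each datum is one item
                  fn item -> item_partition_key(item) end, # consistent item -> reducer mapping
                  fn -> %{} end,                           # initial reducer state
                  fn item, state ->
                    # do the application work and report disposition back to the
                    # originating stage as processing progresses:
                    handle_item(item, state)
                  end)
    |> Flow.start_link()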
"""
require Record
Record.defrecord(:sequence_number, Record.extract(:sequence_number,
from_lib: "erlmld/include/erlmld.hrl"))
Record.defrecord(:checkpoint, Record.extract(:checkpoint,
from_lib: "erlmld/include/erlmld.hrl"))
Record.defrecord(:stream_record, Record.extract(:stream_record,
from_lib: "erlmld/include/erlmld.hrl"))
@type sequence_number :: record(:sequence_number)
@type checkpoint :: record(:checkpoint)
@type stream_record :: record(:stream_record)
@type shard_id :: binary
@type item :: any
@type partition_key :: any
@type reducer_state :: any
@doc """
Accepts a flow producing `Exmld.KinesisWorker.Datum`s (e.g., a flow created from
`Exmld.KinesisStage`s) and returns another flow.
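
Recognized `opts` keys are `:num_stages` (defaults to `System.schedulers_online()`),
`:min_demand`, `:max_demand`, `:window`, and `:append` (an arity-1 function applied to
the resulting flow before it is returned).

A sketch of a call (the extraction, key, and reducer functions shown are placeholders):

    Exmld.flow(flow,
               fn datum -> [datum] end,            # one item per datum
               fn item -> item.partition_key end,  # assumes items carry a partition key
               fn -> %{} end,                      # initial reducer state
               fn item, state -> process(item, state) end,
               num_stages: System.schedulers_online())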
"""
# each stream record should be associated with the genstage which received it and the
# worker which produced it. each item extracted from a stream record should indicate
# the record it came from, the item id within the record, and the total number of items
# in the record. the extraction and processing functions should correctly handle
# heartbeats. the processing function should process as much data as possible, and
# periodically inform the source genstages of all the item ids which have been
# (successfully or not) processed. those genstages in turn will maintain information
# about what has been successfully processed, which the producing kinesis workers can
# use when checkpointing.
@spec flow(# a flow which produces `Datum`s:
flow :: Flow.t,
# arity-1 function mapping a datum to list of zero or more items:
extract_items_fn :: ((Exmld.KinesisWorker.Datum) -> [item]),
# arity-1 function or flow partition key shortcut for partitioning items:
partition_key :: {:elem, non_neg_integer}
| {:key, atom}
| ((item) -> partition_key),
# arity-0 function returning initial reducer state:
state0 :: (() -> reducer_state),
# arity-2 function accepting item being processed and reducer state:
process_fn :: ((item, reducer_state) -> reducer_state),
opts :: keyword) :: Flow.t
def flow(flow,
extract_items_fn,
partition_key,
state0,
process_fn,
opts \\ []) do
extra = opts[:append] || &(&1)
flow
|> Flow.flat_map(extract_items_fn)
|> Flow.partition(key: partition_key,
stages: opts[:num_stages] || System.schedulers_online(),
min_demand: opts[:min_demand] || 1,
max_demand: opts[:max_demand] || 500,
window: opts[:window] || Flow.Window.global())
|> Flow.reduce(state0, process_fn)
|> extra.()
end
@doc """
You can use this one to keep building your flow after calling flow/6 above.
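
The `opts` map must contain `:stages`, `:extract_items_fn`, `:partition_key`, `:state0`,
`:process_fn`, and `:flow_opts` keys. For example (the stage names and functions below
are placeholders):

    Exmld.from_stages(%{stages: [stage_1, stage_2],
                        extract_items_fn: fn datum -> [datum] end,
                        partition_key: fn item -> item.partition_key end,
                        state0: fn -> %{} end,
                        process_fn: fn item, state -> process(item, state) end,
                        flow_opts: []})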
"""
def from_stages(opts) do
Flow.from_stages(opts.stages)
|> flow(opts.extract_items_fn, opts.partition_key,
opts.state0, opts.process_fn, opts.flow_opts)
end
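@doc """
Builds the flow described by `opts` (see `from_stages/1`) and starts it with
`Flow.start_link/1`.
"""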
def start_link(opts) do
from_stages(opts) |> Flow.start_link()
end
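@doc """
Returns a child spec allowing the flow to be run under a supervisor, e.g. (a sketch;
`opts` is a map as described in `from_stages/1`):

    children = [{Exmld, opts}]
    Supervisor.start_link(children, strategy: :one_for_one)
"""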
def child_spec(opts) do
%{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end
end