lib/bio/keyed_reduce.ex

# Copyright 2018 - 2022, Mathijs Saey, Vrije Universiteit Brussel

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import Skitter.DSL.Operation, only: :macros

defoperation Skitter.BIO.KeyedReduce, in: _, out: _, strategy: Skitter.BIS.KeyedState do
  @operationdoc """
  Keyed Reduce operation.

  This operation implements a reduce operation. It accepts three arguments wrapped in a tuple when
  embedded inside a workflow: a key function, a reduce function and an initial state. When this
  operation receives data, the key function is called with the received data as its first
  argument. The key function should return a key, which is used to obtain the state associated
  with the key. Afterwards, the reduce function is called with the received data as its first
  argument and the state associated with the key returned by the key function as its second
  argument. This function should return a `{state, emit}` tuple. The first value of this tuple
  will be stored as the new state of the key, while the second value of this tuple will be emitted
  on the `_` out port.
  """
  defcb conf(args), do: args

  defcb initial_state do
    {_, _, initial_state} = config()
    initial_state
  end

  defcb key(val) do
    {key_fn, _, _} = config()
    key_fn.(val)
  end

  defcb react(val) do
    {_, red_fn, _} = config()
    {new_state, emit} = red_fn.(val, state())
    state <~ new_state
    emit ~> _
  end
end