lib/bis/stream_source.ex

# Copyright 2018 - 2022, Mathijs Saey, Vrije Universiteit Brussel

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import Skitter.DSL.Strategy, only: :macros

defstrategy Skitter.BIS.StreamSource do
  @moduledoc """
  Strategy for stream-based source operations.

  This strategy can be used to create a source operation. It is designed for operations which
  generate a stream of data that is to be sent into the workflow.

  When the operation is deployed, this strategy will spawn a single worker and call the operations
  `stream` callback. This callback should return a stream. Once deployed, the elements of this
  stream will be emitted by one by one. The strategy ensures these values are shuffled over the
  available worker nodes.

  ## Operation Properties

  * in ports: none
  * out ports: a single out port.
  * callbacks:
    * `stream`: Called at deployment time. This callback should return a stream, which will be
    emitted once the operation has been deployed.
  """
  defhook deploy(args) do
    call(:stream, args: [args]).result |> remote_worker(:source) |> send(:start)
    Remote.on_all_workers(fn -> local_worker(nil, :sender) end) |> Enum.map(&elem(&1, 1))
  end

  defhook process(:start, stream, :source) do
    stream
    |> Stream.each(&send(Enum.random(deployment()), {:emit, &1}))
    |> Stream.run()
  end

  defhook process({:emit, emit}, nil, :sender) do
    emit(to_port(0, [emit]))
    nil
  end
end