lib/module.ex

defmodule Strom.Module do
  defstruct module: nil, opts: [], state: nil

  def start(module, opts \\ []) do
    state = apply(module, :start, [opts])
    %__MODULE__{module: module, state: state, opts: opts}
  end

  def call(flow, %__MODULE__{module: module, state: state}, names)
      when is_map(flow) and is_list(names) do
    streams = Map.take(flow, names)

    sub_flows =
      Enum.reduce(streams, %{}, fn {name, stream}, acc ->
        stream =
          if is_pipeline_module?(module) do
            apply(module, :stream, [stream])
          else
            apply(module, :call, [stream, state])
          end

        Map.put(acc, name, stream)
      end)

    Map.merge(flow, sub_flows)
  end

  def call(flow, %__MODULE__{} = state, name) do
    call(flow, state, [name])
  end

  defp is_pipeline_module?(module) when is_atom(module) do
    is_list(module.alf_components())
  rescue
    _error -> false
  end

  def stop(%__MODULE__{module: module, state: state}) do
    if is_pipeline_module?(module) do
      apply(module, :stop, [])
    else
      apply(module, :stop, [state])
    end
  end
end