lib/manager/stream_registry.ex

defmodule ALF.Manager.StreamRegistry do
  defstruct ref: nil,
            queue: :queue.new(),
            inputs: %{},
            in_progress: %{},
            decomposed: %{},
            recomposed: %{}

  def empty?(%__MODULE__{} = registry) do
    Enum.empty?(registry.inputs) and
      Enum.empty?(registry.in_progress) and
      Enum.empty?(registry.decomposed) and
      Enum.empty?(registry.recomposed)
  end

  def add_to_registry(%__MODULE__{} = registry, ips) do
    {inputs, in_progress, decomposed, recomposed} =
      Enum.reduce(
        ips,
        {registry.inputs, registry.in_progress, registry.decomposed, registry.recomposed},
        fn ip, {inputs, in_progress, decomposed, recomposed} ->
          cond do
            ip.in_progress ->
              {inputs, Map.put(in_progress, ip.ref, ip.event), decomposed, recomposed}

            ip.decomposed ->
              {inputs, in_progress, Map.put(decomposed, ip.ref, ip.event), recomposed}

            ip.recomposed ->
              {inputs, in_progress, decomposed, Map.put(recomposed, ip.ref, ip.event)}

            true ->
              {Map.put(inputs, ip.ref, ip.event), in_progress, decomposed, recomposed}
          end
        end
      )

    %{
      registry
      | inputs: inputs,
        in_progress: in_progress,
        decomposed: decomposed,
        recomposed: recomposed
    }
  end

  def remove_from_registry(%__MODULE__{} = registry, ips) do
    {inputs, in_progress, decomposed, recomposed} =
      Enum.reduce(
        ips,
        {registry.inputs, registry.in_progress, registry.decomposed, registry.recomposed},
        fn ip, {inputs, in_progress, decomposed, recomposed} ->
          cond do
            ip.in_progress ->
              {inputs, Map.delete(in_progress, ip.ref), decomposed, recomposed}

            ip.decomposed ->
              {inputs, in_progress, Map.delete(decomposed, ip.ref), recomposed}

            ip.recomposed ->
              {inputs, in_progress, decomposed, Map.delete(recomposed, ip.ref)}

            true ->
              {Map.delete(inputs, ip.ref), in_progress, decomposed, recomposed}
          end
        end
      )

    %{
      registry
      | inputs: inputs,
        in_progress: in_progress,
        decomposed: decomposed,
        recomposed: recomposed
    }
  end
end