lib/manager/components.ex

defmodule ALF.Manager.Components do
  @moduledoc """
    Utils function for working with pipeline components
  """

  alias ALF.Components.Stage
  alias ALF.Builder

  def add_component(components, stage_set_ref, pipeline_sup_pid) do
    existing_workers = find_and_reload_existing_workers(components, stage_set_ref)

    new_stage = Builder.add_stage_worker(pipeline_sup_pid, existing_workers)

    stages_to_subscribe_pids = Enum.map(new_stage.subscribers, fn {pid, _opts} -> pid end)

    new_components =
      components
      |> Enum.reduce([], fn component, acc ->
        cond do
          Enum.member?(stages_to_subscribe_pids, component.pid) ->
            GenStage.sync_subscribe(component.pid,
              to: new_stage.pid,
              max_demand: 1,
              cancel: :transient
            )

            [component | acc]

          component.type == :stage and component.stage_set_ref == new_stage.stage_set_ref ->
            component = Stage.inc_count(component)

            if component.number == new_stage.number - 1 do
              [new_stage | [component | acc]]
            else
              [component | acc]
            end

          true ->
            [component | acc]
        end
      end)
      |> Enum.reverse()

    {new_stage, new_components}
  end

  def remove_component(components, stage_set_ref) do
    existing_workers = find_and_reload_existing_workers(components, stage_set_ref)

    if length(existing_workers) > 1 do
      stage_to_delete = Enum.max_by(existing_workers, & &1.number)

      Enum.map(stage_to_delete.subscribed_to, fn subscription ->
        :ok = GenStage.cancel(subscription, :shutdown)
      end)

      new_components =
        components
        |> Enum.reduce([], fn component, acc ->
          cond do
            component.pid == stage_to_delete.pid ->
              acc

            component.type == :stage && component.stage_set_ref == stage_to_delete.stage_set_ref ->
              component = Stage.dec_count(component)
              [component | acc]

            subscribed_to =
                Enum.find(component.subscribed_to, fn {pid, _ref} ->
                  pid == stage_to_delete.pid
                end) ->
              :ok = GenStage.cancel(subscribed_to, :shutdown)
              [component | acc]

            true ->
              [component | acc]
          end
        end)
        |> Enum.reverse()

      {:ok, {stage_to_delete, new_components}}
    else
      {:error, :only_one_left}
    end
  end

  defp find_and_reload_existing_workers(components, stage_set_ref) do
    Enum.reduce(components, [], fn
      %Stage{stage_set_ref: ^stage_set_ref, pid: pid}, acc ->
        [Stage.__state__(pid) | acc]

      _other, acc ->
        acc
    end)
  end
end