lib/builder.ex

defmodule ALF.Builder do
  @moduledoc """
  Starts the processes that make up a pipeline and wires them together.

  Given a list of component specs, `build/5` starts a `Producer`, then every component
  of the spec, then a `Consumer`, all as children of the given `DynamicSupervisor`.
  Each component is started with `subscribe_to` options that point at the pids of the
  components preceding it in the spec.
  """

  alias ALF.Pipeline

  alias ALF.Components.{
    Producer,
    Stage,
    Goto,
    DeadEnd,
    GotoPoint,
    Switch,
    Clone,
    Consumer,
    Plug,
    Unplug,
    Decomposer,
    Recomposer,
    Tbd
  }

  # Options attached to every subscription created by this module.
  @subscription_opts [max_demand: 1, cancel: :transient]

  # Components that are started as a single process and then become the only upstream
  # of whatever follows them in the spec.
  @single_components [Goto, GotoPoint, Plug, Unplug, Decomposer, Recomposer, Tbd]

  @doc """
  Starts all components described by `pipe_spec` and returns the assembled pipeline.

  ## Parameters

    * `pipe_spec` - list of component structs (`Stage`, `Switch`, `Clone`, ...).
    * `supervisor_pid` - the `DynamicSupervisor` every component is started under.
    * `manager_name` and `pipeline_module` - stored on the producer and the consumer.
    * `telemetry_enabled` - stored on every started component; defaults to `nil`.

  ## Returns

  `{:ok, %ALF.Pipeline{}}` where `components` lists the started top-level components in
  spec order (nested ones live inside their `Switch`/`Clone`).

  ## Raises

    * `MatchError` if any child fails to start.
    * `ArgumentError` if a `Stage` has a non-positive `count`, or if the spec leaves no
      component for the consumer to subscribe to (for example when the top-level spec
      ends with a `DeadEnd`). Children started before the failure are not stopped here.
  """
  def build(pipe_spec, supervisor_pid, manager_name, pipeline_module, telemetry_enabled \\ nil)
      when is_list(pipe_spec) do
    producer = start_producer(supervisor_pid, manager_name, pipeline_module, telemetry_enabled)

    {last_stages, final_stages} =
      do_build_pipeline(pipe_spec, [producer], supervisor_pid, [], telemetry_enabled)

    consumer =
      start_consumer(
        supervisor_pid,
        last_stages,
        manager_name,
        pipeline_module,
        telemetry_enabled
      )

    {producer, consumer} = set_modules({producer, consumer}, last_stages)

    {:ok, %Pipeline{producer: producer, consumer: consumer, components: final_stages}}
  end

  @doc """
  Starts one more worker for an already running set of `Stage` workers.

  The first element of `existing_stages` is used as the template. The new worker
  subscribes to the pids found in the template's `subscribed_to` list, gets
  `number` equal to the current amount of workers and `count` equal to that amount
  plus one.

  Returns the new stage struct with its `pid` set. The structs in `existing_stages`
  are not modified by this function. Raises `MatchError` if the child fails to start.
  """
  def add_stage_worker(supervisor_pid, [%Stage{} = existing_stage | _] = existing_stages) do
    # length/1 walks the whole list, so compute it once.
    worker_count = length(existing_stages)

    subscribe_to =
      Enum.map(existing_stage.subscribed_to, fn {pid, _ref} ->
        {pid, @subscription_opts}
      end)

    start_component(supervisor_pid, %{
      existing_stage
      | subscribe_to: subscribe_to,
        subscribed_to: [],
        number: worker_count,
        count: worker_count + 1
    })
  end

  # Starts the producer; it is the head of the pipeline and subscribes to nothing.
  defp start_producer(supervisor_pid, manager_name, pipeline_module, telemetry_enabled) do
    start_component(supervisor_pid, %Producer{
      manager_name: manager_name,
      pipeline_module: pipeline_module,
      telemetry_enabled: telemetry_enabled
    })
  end

  # Starts the consumer subscribed to every component left "open" at the end of the spec.
  defp start_consumer(
         supervisor_pid,
         last_stages,
         manager_name,
         pipeline_module,
         telemetry_enabled
       ) do
    start_component(supervisor_pid, %Consumer{
      subscribe_to: subscribe_to_opts(last_stages),
      manager_name: manager_name,
      pipeline_module: pipeline_module,
      telemetry_enabled: telemetry_enabled
    })
  end

  # Copies the `pipeline_module` of the first open tail component into both module
  # fields of the producer and the consumer.
  # `hd/1` raises ArgumentError when `last_stages` is empty.
  # NOTE(review): it is unclear from this module whether an empty tail should be
  # supported instead of raising - confirm against callers.
  defp set_modules({producer, consumer}, last_stages) do
    module = hd(last_stages).pipeline_module

    {
      %{producer | pipe_module: module, pipeline_module: module},
      %{consumer | pipe_module: module, pipeline_module: module}
    }
  end

  # Walks the spec left to right. The accumulator is `{prev_stages, stages}`:
  #   * `prev_stages` - the components the next component has to subscribe to;
  #   * `stages` - every component started so far at this nesting level, in spec order.
  # Returns the final accumulator, i.e. `{open_tail_components, started_components}`.
  defp do_build_pipeline(pipe_spec, producers, supervisor_pid, final_stages, telemetry_enabled)
       when is_list(pipe_spec) do
    Enum.reduce(pipe_spec, {producers, final_stages}, fn stage_spec, {prev_stages, stages} ->
      case stage_spec do
        %Stage{count: count} = stage ->
          # All workers of one stage share the same reference.
          stage_set_ref = make_ref()

          new_stages =
            count
            |> stage_numbers()
            |> Enum.map(fn number ->
              start_stage(
                %{stage | stage_set_ref: stage_set_ref, number: number},
                supervisor_pid,
                prev_stages,
                telemetry_enabled
              )
            end)

          {new_stages, stages ++ new_stages}

        %DeadEnd{} = dead_end ->
          dead_end = start_stage(dead_end, supervisor_pid, prev_stages, telemetry_enabled)
          # Nothing can subscribe to a dead end, so the open tail becomes empty.
          {[], stages ++ [dead_end]}

        %Switch{branches: branch_specs} = switch ->
          switch = start_stage(switch, supervisor_pid, prev_stages, telemetry_enabled)

          {branch_tails, built_branches} =
            Enum.reduce(branch_specs, {[], %{}}, fn {key, inner_pipe_spec}, {tails, built} ->
              # The head of every branch subscribes to the switch on its own partition.
              {inner_tails, inner_stages} =
                do_build_pipeline(
                  inner_pipe_spec,
                  [{switch, partition: key}],
                  supervisor_pid,
                  [],
                  telemetry_enabled
                )

              {tails ++ inner_tails, Map.put(built, key, inner_stages)}
            end)

          # The specs inside the switch are replaced by the started components.
          {branch_tails, stages ++ [%{switch | branches: built_branches}]}

        %Clone{to: pipe_stages} = clone ->
          clone = start_stage(clone, supervisor_pid, prev_stages, telemetry_enabled)

          {inner_tails, inner_stages} =
            do_build_pipeline(pipe_stages, [clone], supervisor_pid, [], telemetry_enabled)

          clone = %{clone | to: inner_stages}

          # The next component subscribes to the tail of the cloned branch and to the
          # clone itself.
          {inner_tails ++ [clone], stages ++ [clone]}

        %module{} = component when module in @single_components ->
          component = start_stage(component, supervisor_pid, prev_stages, telemetry_enabled)
          {[component], stages ++ [component]}
      end
    end)
  end

  # Worker numbers for a stage with `count` workers: 0, 1, ..., count - 1.
  # A bare `0..(count - 1)` is a descending range for count <= 0 and would start
  # workers numbered 0, -1, ... instead of none, so such counts are rejected.
  defp stage_numbers(count) when is_integer(count) and count > 0, do: 0..(count - 1)

  defp stage_numbers(count) do
    raise ArgumentError, "stage count must be a positive integer, got: #{inspect(count)}"
  end

  # Points `stage` at its upstream components, stores the telemetry flag and starts it.
  defp start_stage(stage, supervisor_pid, prev_stages, telemetry_enabled) do
    start_component(supervisor_pid, %{
      stage
      | subscribe_to: subscribe_to_opts(prev_stages),
        telemetry_enabled: telemetry_enabled
    })
  end

  # Starts `component` as a child of the supervisor, using the struct's own module as
  # the child module, and returns the struct with `pid` filled in.
  # The assertive match raises MatchError if the child cannot be started.
  defp start_component(supervisor_pid, %module{} = component) do
    {:ok, pid} = DynamicSupervisor.start_child(supervisor_pid, {module, component})
    %{component | pid: pid}
  end

  # Builds the `subscribe_to` list for the given upstream components. An upstream given
  # as `{component, partition: key}` additionally carries the partition to subscribe to.
  defp subscribe_to_opts(stages) do
    Enum.map(stages, fn
      {stage, partition: key} -> {stage.pid, @subscription_opts ++ [partition: key]}
      stage -> {stage.pid, @subscription_opts}
    end)
  end
end