lib/builder.ex

defmodule ALF.Builder do
  alias ALF.Pipeline

  alias ALF.Components.{
    Producer,
    DeadEnd,
    Switch,
    Consumer
  }

  @spec build(atom, pid, boolean) :: {:ok, Pipeline.t()}
  def build(pipeline_module, supervisor_pid, telemetry) do
    pipe_spec = pipeline_module.alf_components()
    producer = start_producer(supervisor_pid, pipeline_module, telemetry)

    {last_stages, final_stages} =
      do_build_pipeline(
        pipeline_module,
        pipe_spec,
        [producer],
        supervisor_pid,
        [],
        telemetry
      )

    consumer =
      start_consumer(
        supervisor_pid,
        last_stages,
        pipeline_module,
        telemetry
      )

    {producer, consumer} =
      set_modules_to_producer_and_consumer({producer, consumer}, pipeline_module)

    pipeline = %Pipeline{producer: producer, consumer: consumer, components: final_stages}
    {:ok, pipeline}
  end

  defp do_build_pipeline(
         pipeline_module,
         pipe_spec,
         producers,
         supervisor_pid,
         final_stages,
         telemetry
       )
       when is_list(pipe_spec) do
    pipe_spec
    |> Enum.reduce({producers, final_stages}, fn stage_spec, {prev_stages, stages} ->
      case stage_spec do
        %Switch{branches: branches, count: count} = switch ->
          set_ref = make_ref()

          {all_last_stages, switches} =
            Enum.reduce(0..(count - 1), {[], []}, fn number, {acc_last_stages, acc_switches} ->
              switch =
                switch
                |> Map.merge(%{
                  pipeline_module: pipeline_module,
                  number: number,
                  set_ref: set_ref,
                  telemetry: telemetry
                })
                |> start_stage(supervisor_pid, prev_stages)

              {last_stages, branches} =
                Enum.reduce(branches, {[], %{}}, fn {key, inner_pipe_spec},
                                                    {all_last_stages, branches} ->
                  {last_stages, final_stages} =
                    do_build_pipeline(
                      pipeline_module,
                      inner_pipe_spec,
                      [{switch, partition: key}],
                      supervisor_pid,
                      [],
                      telemetry
                    )

                  {all_last_stages ++ last_stages, Map.put(branches, key, final_stages)}
                end)

              switch = %{switch | branches: branches}
              {acc_last_stages ++ last_stages, acc_switches ++ [switch]}
            end)

          {all_last_stages, stages ++ switches}

        %DeadEnd{} = dead_end ->
          dead_ends =
            build_component_set(dead_end, supervisor_pid, prev_stages, pipeline_module, telemetry)

          {[], stages ++ dead_ends}

        component ->
          new_components =
            build_component_set(
              component,
              supervisor_pid,
              prev_stages,
              pipeline_module,
              telemetry
            )

          {new_components, stages ++ new_components}
      end
    end)
  end

  defp build_component_set(component, supervisor_pid, prev_stages, pipeline_module, telemetry) do
    set_ref = make_ref()

    Enum.map(0..(component.count - 1), fn number ->
      component
      |> Map.merge(%{
        pipeline_module: pipeline_module,
        set_ref: set_ref,
        number: number,
        telemetry: telemetry
      })
      |> start_stage(supervisor_pid, prev_stages)
    end)
  end

  @spec build_sync(atom, boolean) :: [map]
  def build_sync(pipeline_module, telemetry) do
    pipe_spec = pipeline_module.alf_components()
    producer = Producer.init_sync(%Producer{pipeline_module: pipeline_module}, telemetry)
    {components, last_stage_refs} = do_build_sync(pipe_spec, [producer.pid], telemetry)
    consumer = Consumer.init_sync(%Consumer{pipeline_module: pipeline_module}, telemetry)
    subscribed_to = Enum.map(last_stage_refs, &{&1, :sync})
    consumer = %{consumer | subscribed_to: subscribed_to}
    [producer | components] ++ [consumer]
  end

  defp do_build_sync(pipe_spec, stage_refs, telemetry) when is_list(pipe_spec) do
    Enum.reduce(pipe_spec, {[], stage_refs}, fn comp, {stages, last_stage_refs} ->
      subscribed_to = Enum.map(last_stage_refs, &{&1, :sync})

      case comp do
        %Switch{branches: branches} = switch ->
          switch = switch.__struct__.init_sync(switch, telemetry)

          branches =
            Enum.reduce(branches, %{}, fn {key, inner_pipe_spec}, branch_pipes ->
              {branch_stages, _last_ref} = do_build_sync(inner_pipe_spec, [switch.pid], telemetry)

              Map.put(branch_pipes, key, branch_stages)
            end)

          switch = %{switch | branches: branches, subscribed_to: subscribed_to}

          last_stage_refs =
            Enum.map(branches, fn {_key, stages} ->
              case List.last(stages) do
                nil -> nil
                stage -> stage.pid
              end
            end)

          {stages ++ [switch], last_stage_refs}

        component ->
          component = component.__struct__.init_sync(component, telemetry)
          component = %{component | subscribed_to: subscribed_to}
          {stages ++ [component], [component.pid]}
      end
    end)
  end

  defp start_producer(supervisor_pid, pipeline_module, telemetry) do
    producer = %Producer{
      pipeline_module: pipeline_module,
      set_ref: make_ref(),
      telemetry: telemetry
    }

    {:ok, producer_pid} = DynamicSupervisor.start_child(supervisor_pid, {Producer, producer})
    %{producer | pid: producer_pid}
  end

  defp start_consumer(
         supervisor_pid,
         last_stages,
         pipeline_module,
         telemetry
       ) do
    consumer = %Consumer{
      pipeline_module: pipeline_module,
      telemetry: telemetry
    }

    {:ok, consumer_pid} = DynamicSupervisor.start_child(supervisor_pid, {Consumer, consumer})
    subscribe(consumer_pid, last_stages)

    %{consumer | pid: consumer_pid}
  end

  defp set_modules_to_producer_and_consumer({producer, consumer}, pipeline_module) do
    producer = %{producer | pipeline_module: pipeline_module}
    consumer = %{consumer | pipeline_module: pipeline_module}

    {producer, consumer}
  end

  defp start_stage(stage, supervisor_pid, prev_stages) do
    {:ok, stage_pid} = DynamicSupervisor.start_child(supervisor_pid, {stage.__struct__, stage})
    subscribe(stage_pid, prev_stages)
    %{stage | pid: stage_pid}
  end

  defp subscribe(stage_pid, stages) do
    stages
    |> Enum.each(fn stage ->
      case stage do
        {stage, partition: key} ->
          GenStage.async_subscribe(stage_pid,
            to: stage.pid,
            max_demand: 1,
            cancel: :temporary,
            partition: key
          )

        stage ->
          GenStage.async_subscribe(stage_pid, to: stage.pid, max_demand: 1, cancel: :temporary)
      end
    end)
  end
end