lib/elsa/producer/initializer.ex

defmodule Elsa.Producer.Initializer do
  @spec init(registry :: atom(), producer_configs :: list(keyword)) :: [Supervisor.child_spec()]
  def init(registry, producer_configs) do
    brod_client = Elsa.Registry.whereis_name({registry, :brod_client})

    case Keyword.keyword?(producer_configs) do
      true ->
        child_spec(registry, brod_client, producer_configs)

      false ->
        Enum.map(producer_configs, fn pc ->
          child_spec(registry, brod_client, pc)
        end)
        |> List.flatten()
    end
  end

  defp child_spec(registry, brod_client, producer_config) do
    topic = Keyword.fetch!(producer_config, :topic)
    config = Keyword.get(producer_config, :config, [])

    partitions = Elsa.Util.partition_count(brod_client, topic)

    0..(partitions - 1)
    |> Enum.map(fn partition ->
      child_spec(registry, brod_client, topic, partition, config)
    end)
  end

  defp child_spec(registry, brod_client, topic, partition, config) do
    name = :"producer_#{topic}_#{partition}"

    wrapper_args = [
      mfa: {:brod_producer, :start_link, [brod_client, topic, partition, config]},
      register: {registry, name}
    ]

    %{
      id: name,
      start: {Elsa.Wrapper, :start_link, [wrapper_args]}
    }
  end
end