lib/elsa/consumer/worker/initializer.ex

defmodule Elsa.Consumer.Worker.Initializer do
  @moduledoc """
  Builds the supervisor child specifications for `Elsa.Consumer.Worker`,
  one specification per topic partition.
  """

  @type init_opts :: [
          connection: atom(),
          registry: atom(),
          topics: [Elsa.topic() | {Elsa.topic(), Elsa.partition()}]
        ]

  @doc """
  Returns a flat list of child specifications for the configured topics.

  `init_arg` must contain:

    * `:registry` - the registry under which each worker is named and under
      which the `:brod_client` entry is looked up.
    * `:topics` - a list whose entries are either a bare topic or a
      `{topic, partition}` tuple. A tuple yields exactly one worker for that
      partition; a bare topic yields one worker per partition, using the count
      reported by `Elsa.Util.partition_count/2`.

  Every other option in `init_arg` is forwarded untouched to each worker.

  A bare topic that reports zero partitions contributes no child
  specifications.

  Raises `KeyError` when `:registry` or `:topics` is missing.
  """
  @spec init(init_opts()) :: [Supervisor.child_spec()]
  def init(init_arg) do
    registry = Keyword.fetch!(init_arg, :registry)
    topics = Keyword.fetch!(init_arg, :topics)

    brod_client = Elsa.Registry.whereis_name({registry, :brod_client})

    Enum.flat_map(topics, &configure_topic(&1, registry, brod_client, init_arg))
  end

  # An explicit {topic, partition} entry needs no partition lookup, so the
  # client is ignored and a single specification is produced.
  defp configure_topic({topic, partition}, registry, _brod_client, init_arg) do
    [child_spec(registry, topic, partition, init_arg)]
  end

  # A bare topic expands to one specification per partition.
  defp configure_topic(topic, registry, brod_client, init_arg) do
    brod_client
    |> Elsa.Util.partition_count(topic)
    |> to_child_specs(registry, topic, init_arg)
  end

  # With a count of zero, `0..(partitions - 1)` would be `0..-1`, which Elixir
  # treats as a decreasing range yielding 0 and -1. Returning early keeps us
  # from creating workers for partitions that do not exist.
  defp to_child_specs(0, _registry, _topic, _init_arg), do: []

  defp to_child_specs(partitions, registry, topic, init_arg)
       when is_integer(partitions) and partitions > 0 do
    for partition <- 0..(partitions - 1) do
      child_spec(registry, topic, partition, init_arg)
    end
  end

  # Builds the specification for one worker. The generated name is used both
  # as the child id and as the key registered under `registry`.
  defp child_spec(registry, topic, partition, init_arg) do
    name = :"topic_consumer_worker_#{topic}_#{partition}"

    # Keyword.put/3 replaces any existing entry, so caller-supplied :name,
    # :topic or :partition values are overridden here.
    worker_args =
      init_arg
      |> Keyword.put(:name, {:via, Elsa.Registry, {registry, name}})
      |> Keyword.put(:topic, topic)
      |> Keyword.put(:partition, partition)

    Supervisor.child_spec({Elsa.Consumer.Worker, worker_args}, id: name)
  end
end