lib/elsa/elsa_supervisor.ex

defmodule Elsa.ElsaSupervisor do
  @moduledoc """
  Top-level supervisor that orchestrates all other components
  of the Elsa library. Allows for a single point of integration
  into your application supervision tree and configuration by way
  of a series of nested keyword lists

  Components not needed by a running application (if your application
  _only_ consumes messages from Kafka and never producers back to it)
  can be safely omitted from the configuration.
  """
  use Supervisor

  alias Elsa.ElsaRegistry

  @doc """
  Defines a connection for locating the Elsa Registry process.
  """
  @spec registry(String.t() | atom()) :: atom()
  def registry(connection) do
    :"elsa_registry_#{connection}"
  end

  def via_name(registry, name) do
    {:via, ElsaRegistry, {registry, name}}
  end

  def dynamic_supervisor(registry) do
    via_name(registry, DynamicSupervisor)
  end

  @doc """
  Starts the top-level Elsa supervisor and links it to the current process.
  Starts a brod client and a custom process registry by default
  and then conditionally starts and takes supervision of any
  brod group-based consumers or producer processes defined.

  ## Options

  * `:endpoints` - Required. Keyword list of kafka brokers. ex. `[localhost: 9092]`

  * `:connection` - Required. Atom used to track kafka connection.

  * `:config` - Optional. Client configuration options passed to brod.

  * `:producer` - Optional. Can be a single producer configuration of multiples in a list.

  * `:group_consumer` - Optional. Group consumer configuration.

  * `:consumer` - Optional. Simple topic consumer configuration.


  ## Producer Config

  * `:topic` - Required. Producer will be started for configured topic.

  * `:poll` - Optional. If set to a number in milliseconds, will poll for new partitions and startup producers on the fly.

  * `:config` - Optional. Producer configuration options passed to `brod_producer`.

  * `:metadata_request_config` - Optional. See Metadata Request Config


  ## Group Consumer Config

  * `:group` - Required. Name of consumer group.

  * `:topics` - Required. List of topics to subscribe to.

  * `:handler` - Required. Module that implements Elsa.Consumer.MessageHandler behaviour.

  * `:handler_init_args` - Optional. Any args to be passed to init function in handler module.

  * `:assignment_received_handler` - Optional. Arity 4 Function that will be called with any partition assignments.
     Return `:ok` to for assignment to be subscribed to.  Return `{:error, reason}` to stop subscription.
     Arguments are group, topic, partition, generation_id.

  * `:assignments_revoked_handler` - Optional. Zero arity function that will be called when assignments are revoked.
    All workers will be shutdown before callback is invoked and must return `:ok`.

  * `:worker_supervisor_max_restarts` - Optional. max_restarts option passed to the WorkerSupervisor.  Default 30.

  # `:worker_supervisor_max_seconds` - Optional. max_seconds option passed to the WorkerSupervisor.  Default 5.

  * `:config` - Optional. Consumer configuration options passed to `brod_consumer`.

  * `:metadata_request_config` - Optional. See Metadata Request Config


  ## Consumer Config

  * `:topic` - Required. Topic to subscribe to.

  * `:begin_offset` - Required. Where to begin consuming from. Must be either `:earliest`, `:latest`, or a valid offset integer.

  * `:handler` - Required. Module that implements `Elsa.Consumer.MessageHandler` behaviour.

  * `:partition` - Optional. Topic partition to subscribe to. If `nil`, will default to all partitions.

  * `:handler_init_args` - Optional. Any args to be passed to init function in handler module.

  * `:poll` - Optional. If set to number of milliseconds, will poll for new partitions and startup consumers on the fly.

  * `:metadata_request_config` - Optional. See Metadata Request Config


  ## Metadata Request Config

  * `:metadata_request_tries` - Optional, default 5.  The number of tries allowed when querying topic metadata.
    Typically these retries are necessary when creating topics on the fly and immediately attempting to connect
    a producer or consumer to them.  That's because topic creation in kafka is asynchronous -- even though a
    call to Elsa.create_topic may return success, that doesn't mean that the metadata for that new topic has
    propagated to all the brokers.

    The defaults for `metadata_request_tries` and `metadata_request_dwell_ms` work well for testing with kafka in a
    local docker container. These values may need to be increased if creating topics on the fly with remote kafka
    brokers.

  * `:metadata_request_dwell_ms` - Optional, default 100.  The amount of time to wait between tries when querying topic metadata.


  ## Example

  ```
    Elsa.ElsaSupervisor.start_link([
      endpoints: [localhost: 9092],
      connection: :conn,
      producer: [topic: "topic1"],
      consumer: [
        topic: "topic2",
        partition: 0,
        begin_offset: :earliest,
        handler: ExampleHandler
      ],
      group_consumer: [
        group: "example-group",
        topics: ["topic1"],
        handler: ExampleHandler,
        config: [
          begin_offset: :earliest,
          offset_reset_policy: :reset_to_earliest
        ]
      ]
    ])
  ```

  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) do
    opts = Keyword.take(args, [:name])
    Supervisor.start_link(__MODULE__, args, opts)
  end

  @doc """
  Starts producer processes under Elsa's `DynamicSupervisor` for the specified connection.

  Polling cannot be configured for producers at runtime. Configuration at `Elsa.ElsaSupervisor` start
  is how polling will behave for all producers on that connection. Other than polling, producer
  configuration is the same as `Elsa.ElsaSupervisor.start_link/1`.

  ## Producer Config

  * `:topic` - Required. Producer will be started for configured topic.

  * `:config` - Optional. Producer configuration options passed to `brod_producer`.
  """
  @spec start_producer(String.t() | atom, keyword) :: [DynamicSupervisor.on_start_child()]
  def start_producer(connection, args) do
    registry = registry(connection)
    process_manager = via_name(registry, :producer_process_manager)

    Elsa.Producer.Initializer.init(registry, args)
    |> Enum.map(&Elsa.DynamicProcessManager.start_child(process_manager, &1))
  end

  def init(args) do
    connection = Keyword.fetch!(args, :connection)
    registry = registry(connection)

    children =
      [
        {ElsaRegistry, name: registry},
        {DynamicSupervisor, strategy: :one_for_one, name: dynamic_supervisor(registry)},
        start_client(args),
        producer_spec(registry, Keyword.get(args, :producer)),
        start_group_consumer(connection, registry, Keyword.get(args, :group_consumer)),
        start_consumer(connection, registry, Keyword.get(args, :consumer))
      ]
      |> List.flatten()

    Supervisor.init(children, strategy: :rest_for_one)
  end

  defp start_client(args) do
    connection = Keyword.fetch!(args, :connection)
    endpoints = Keyword.fetch!(args, :endpoints)
    config = Keyword.get(args, :config, [])

    {Elsa.Wrapper,
     mfa: {:brod_client, :start_link, [endpoints, connection, config]}, register: {registry(connection), :brod_client}}
  end

  defp start_group_consumer(_connection, _registry, nil), do: []

  defp start_group_consumer(connection, registry, args) do
    group_consumer_args =
      args
      |> Keyword.put(:registry, registry)
      |> Keyword.put(:connection, connection)
      |> Keyword.put(:name, via_name(registry, Elsa.Group.GroupSupervisor))

    {Elsa.Group.GroupSupervisor, group_consumer_args}
  end

  defp start_consumer(_connection, _registry, nil), do: []

  defp start_consumer(connection, registry, args) do
    topics =
      case Keyword.has_key?(args, :partition) do
        true -> [{Keyword.fetch!(args, :topic), Keyword.fetch!(args, :partition)}]
        false -> [Keyword.fetch!(args, :topic)]
      end

    consumer_args =
      args
      |> Keyword.put(:registry, registry)
      |> Keyword.put(:connection, connection)
      |> Keyword.put(:topics, topics)
      |> Keyword.put_new(:config, [])

    {Elsa.DynamicProcessManager,
     id: :worker_process_manager,
     dynamic_supervisor: dynamic_supervisor(registry),
     poll: Keyword.get(args, :poll, false),
     initializer: {Elsa.Consumer.Worker.Initializer, :init, [consumer_args]}}
  end

  defp producer_spec(registry, nil) do
    [
      {
        Elsa.DynamicProcessManager,
        id: :producer_process_manager,
        dynamic_supervisor: dynamic_supervisor(registry),
        initializer: nil,
        poll: false,
        name: via_name(registry, :producer_process_manager)
      }
    ]
  end

  defp producer_spec(registry, args) do
    [
      {
        Elsa.DynamicProcessManager,
        id: :producer_process_manager,
        dynamic_supervisor: dynamic_supervisor(registry),
        initializer: {Elsa.Producer.Initializer, :init, [registry, args]},
        poll: Keyword.get(args, :poll, false),
        name: via_name(registry, :producer_process_manager)
      }
    ]
  end
end