lib/elsa/producer.ex

defmodule Elsa.Producer do
  require Logger

  @moduledoc """
  Defines functions to write messages to topics based on either a list of endpoints or a named client.
  All produce functions support the following options:
    * An existing named client process to handle the request can be specified by the keyword option `connection:`.
    * If no partition is supplied, the first (zero) partition is chosen.
    * Value may be a single message or a list of messages.
    * If a list of messages is supplied as the value, the key is defaulted to an empty string binary.
    * Partition can be specified by the keyword option `partition:` and an integer corresponding to a specific
      partition, or the keyword option `partitioner:` and the atoms `:md5` or `:random`. The atoms
      correspond to partitioner functions that will assign an integer based on an md5 hash of the messages
      or uniformly select a random partition from the total available topic partitions, respectively.
  """

  @typedoc """
  Elsa messages can take a number of different forms, including a single binary, a key/value tuple, a map
  including `:key` and `:value` keys, or a list of iolists. Because Elsa supports both single messages and
  lists of messages and because an iolist is indistinguishable from a list of other message types from the
  perspective of the compiler, even single-message iolists must be wrapped in an additional list in order to
  be produced. Internally, all messages are converted to a map before being encoded and produced.
  """
  @type message :: {iodata(), iodata()} | binary() | %{key: iodata(), value: iodata()}

  alias Elsa.ElsaRegistry
  alias Elsa.ElsaSupervisor
  alias Elsa.Util

  @doc """
  Write the supplied message(s) to the desired topic/partition via an endpoint list and optional named client.
  If no client is supplied, the default named client is chosen.

  The first argument is either a list of endpoints or the name of a connection:

    * With an endpoint list, the connection name is read from the `connection:` option (defaulting to
      `Elsa.default_client/0`). If a registry process for that connection is already registered, the
      messages are produced through it; otherwise a supervisor is started just for this call.
    * With a connection name, the messages are produced through that connection. A list is treated
      as a list of messages, any other value as a single message.

  The result of whichever path handled the request is returned as-is rather than being replaced by an
  unconditional `:ok`.
  """
  @spec produce(
          Elsa.endpoints() | Elsa.connection(),
          Elsa.topic(),
          message() | [message()] | [iolist()],
          keyword()
        ) :: :ok | {:error, term} | {:error, String.t(), [Elsa.Message.elsa_message()]}
  def produce(endpoints_or_connection, topic, messages, opts \\ [])

  def produce(endpoints, topic, messages, opts) when is_list(endpoints) do
    connection = Keyword.get_lazy(opts, :connection, &Elsa.default_client/0)
    registry = ElsaSupervisor.registry(connection)

    # Return the outcome of the chosen path so callers can observe failures.
    case Process.whereis(registry) do
      nil -> ad_hoc_produce(endpoints, connection, topic, messages, opts)
      _pid -> produce(connection, topic, messages, opts)
    end
  end

  # A list is always interpreted as a list of messages, so a single iolist message
  # has to be wrapped in an outer list by the caller.
  def produce(connection, topic, messages, opts) when is_atom(connection) and is_list(messages) do
    transformed_messages = Enum.map(messages, &transform_message/1)
    do_produce_sync(connection, topic, transformed_messages, opts)
  end

  # Any non-list value is a single message and is sent as a one-element batch.
  def produce(connection, topic, message, opts) when is_atom(connection) do
    do_produce_sync(connection, topic, [transform_message(message)], opts)
  end

  @doc """
  Asks the producer process manager of `connection` whether it is ready.

  The manager is addressed through `ElsaSupervisor.via_name/2` under the `:producer_process_manager`
  name in the connection's registry, and the result of `Elsa.DynamicProcessManager.ready?/1` is
  returned unchanged.
  """
  def ready?(connection) do
    connection
    |> ElsaSupervisor.registry()
    |> ElsaSupervisor.via_name(:producer_process_manager)
    |> Elsa.DynamicProcessManager.ready?()
  end

  # Starts a supervisor for `connection` with a producer for `topic`, produces the messages
  # through it and then shuts the supervisor down again.
  #
  # Returns the result of the produce call. If the supervisor cannot be started, the
  # non-`{:ok, pid}` value from `ElsaSupervisor.start_link/1` is returned instead.
  defp ad_hoc_produce(endpoints, connection, topic, messages, opts) do
    with {:ok, pid} <-
           ElsaSupervisor.start_link(endpoints: endpoints, connection: connection, producer: [topic: topic]) do
      # The return value is ignored; presumably this call blocks until the producer
      # processes are up - confirm against Elsa.DynamicProcessManager.
      ready?(connection)

      # Keep the outcome so it survives the teardown below.
      result = produce(connection, topic, messages, opts)

      # Unlink first so stopping the supervisor cannot take the calling process down with it.
      Process.unlink(pid)
      _ = Supervisor.stop(pid)

      result
    end
  end

  # Normalises every accepted message shape into a map whose `:key` and `:value` are binaries.
  #
  #   * a map with `:key` and `:value` keeps any additional keys it carries
  #   * a `{key, value}` tuple becomes a two-key map
  #   * anything else is treated as the value of a message with an empty key
  defp transform_message(%{key: key, value: value} = message) do
    %{message | key: IO.iodata_to_binary(key), value: IO.iodata_to_binary(value)}
  end

  defp transform_message({key, value}) do
    %{key: IO.iodata_to_binary(key), value: IO.iodata_to_binary(value)}
  end

  defp transform_message(payload) do
    %{key: "", value: IO.iodata_to_binary(payload)}
  end

  # Resolves the partitioner for `topic`, groups the messages into per-partition chunks and
  # sends the chunks one after another.
  #
  # Returns `:ok` when every chunk was sent. A partial failure is turned into the
  # three-element error tuple built by `failure_message/3`; any other non-success
  # value is handed back untouched.
  defp do_produce_sync(connection, topic, messages, opts) do
    Util.with_registry(connection, fn registry ->
      outcome =
        with {:ok, partitioner} <- get_partitioner(registry, topic, opts) do
          chunks = create_message_chunks(partitioner, messages)
          produce_sync_while_successful(registry, topic, chunks)
        end

      case outcome do
        {:ok, _sent} -> :ok
        {:error, reason, sent, unsent} -> failure_message(reason, sent, unsent)
        other -> other
      end
    end)
  end

  # Sends the `{partition, chunk}` pairs in order and stops at the first chunk that fails.
  #
  # Returns `{:ok, count}` with the total number of messages sent, or
  # `{:error, reason, count_sent_before_failure, unsent_messages}`.
  defp produce_sync_while_successful(registry, topic, message_chunks) do
    Enum.reduce_while(message_chunks, {:ok, 0}, fn {partition, batch}, {:ok, sent_so_far} ->
      Logger.debug(fn -> "#{__MODULE__} Sending #{length(batch)} messages to #{topic}:#{partition}" end)

      case brod_produce(registry, topic, partition, batch) do
        :ok ->
          {:cont, {:ok, sent_so_far + length(batch)}}

        {:error, reason} ->
          # Everything from the failing chunk onwards counts as not delivered.
          unsent = failed_messages(message_chunks, sent_so_far)
          {:halt, {:error, reason, sent_so_far, unsent}}
      end
    end)
  end

  # Groups the messages by the partition the partitioner assigns to them, splits each group
  # with `Util.chunk_by_byte_size/1` and returns a flat list of `{partition, chunk}` pairs.
  defp create_message_chunks(partitioner, messages) do
    for {partition, grouped} <- Enum.group_by(messages, partitioner),
        chunk <- Util.chunk_by_byte_size(grouped) do
      {partition, chunk}
    end
  end

  # Builds the error tuple reported for a partially failed produce: a human readable
  # description of how far the producer got, plus the messages that were not sent.
  defp failure_message(reason, messages_sent, failed_messages) do
    {
      :error,
      "#{messages_sent} messages succeeded before elsa producer failed midway through due to #{inspect(reason)}",
      failed_messages
    }
  end

  # Flattens the chunks back into one ordered message list and skips the first
  # `messages_sent` entries, leaving only the messages that were not delivered.
  defp failed_messages(message_chunks, messages_sent) do
    message_chunks
    |> Enum.flat_map(fn {_partition, batch} -> batch end)
    |> Enum.drop(messages_sent)
  end

  # Builds the function that maps a message to its target partition.
  #
  # An explicit `partition:` option wins and yields a function that returns that value for
  # every message. Otherwise the topic's partition count is fetched (without retries) and the
  # module given by `partitioner:` (default `Elsa.Partitioner.Default`) picks a partition
  # from the message key.
  defp get_partitioner(registry, topic, opts) do
    Util.with_client(registry, fn client ->
      fixed_partition = Keyword.get(opts, :partition)

      if is_nil(fixed_partition) do
        partition_count = Util.partition_count!(client, topic, Elsa.RetryConfig.no_retry())

        module =
          opts
          |> Keyword.get(:partitioner, Elsa.Partitioner.Default)
          |> remap_deprecated()

        {:ok, fn %{key: key} -> module.partition(partition_count, key) end}
      else
        {:ok, fn _message -> fixed_partition end}
      end
    end)
  end

  # Deprecated atom shorthands and the partitioner modules that replace them.
  @partitioners %{
    default: Elsa.Partitioner.Default,
    md5: Elsa.Partitioner.Md5,
    random: Elsa.Partitioner.Random
  }

  # Translates a deprecated shorthand atom into its module, logging a deprecation warning.
  # Any other value is assumed to already be a partitioner module and is returned unchanged.
  defp remap_deprecated(shorthand) when shorthand in [:default, :md5, :random] do
    replacement = Map.fetch!(@partitioners, shorthand)
    Logger.warn(fn -> ":#{shorthand} partitioner is deprecated. Use #{replacement} instead." end)
    replacement
  end

  defp remap_deprecated(partitioner), do: partitioner

  # Looks up the producer process registered for `topic`/`partition` and hands the messages
  # to it. Returns an error tuple when no such producer is registered.
  defp brod_produce(registry, topic, partition, messages) do
    producer_pid = ElsaRegistry.whereis_name({registry, :"producer_#{topic}_#{partition}"})

    if producer_pid == :undefined do
      {:error, "Elsa Producer for #{topic}:#{partition} not found"}
    else
      call_brod_producer(producer_pid, messages)
    end
  end

  # Submits the batch to the brod producer process with an empty key and waits, without a
  # timeout, for the produce request to be acknowledged. Returns `:ok` on success; any
  # non-`{:ok, _}` value from either brod call is returned unchanged.
  defp call_brod_producer(pid, messages) do
    case :brod_producer.produce(pid, "", messages) do
      {:ok, call_ref} ->
        case :brod_producer.sync_produce_request(call_ref, :infinity) do
          {:ok, _ack} -> :ok
          failure -> failure
        end

      failure ->
        failure
    end
  end
end