lib/elsa/util.ex

defmodule Elsa.Util do
  @moduledoc """
  Provides functions for simplifying first-class interactions (consuming and
  producing) such as connecting to a cluster and establishing a persistent
  client process for interacting with a cluster.
  """

  @default_max_chunk_size 900_000
  @timestamp_size_in_bytes 10

  @doc """
  Wrap establishing a connection to a cluster for performing an operation.
  """
  @spec with_connection(Elsa.endpoints(), atom(), fun()) :: term()
  def with_connection(endpoints, type \\ :any, fun) when is_function(fun) do
    endpoints
    |> reformat_endpoints()
    |> connect(type)
    |> do_with_connection(fun)
  end

  @doc """
  Retrieves the appropriate registry for the given value and validates it exists.
  Executes the function with the registry name if it successfully locates one.
  """
  @spec with_registry(atom() | String.t(), (atom() -> term())) :: term() | {:error, String.t()}
  def with_registry(connection, function) when is_function(function, 1) do
    registry = Elsa.Supervisor.registry(connection)

    case Process.whereis(registry) do
      nil -> {:error, "Elsa with connection #{connection} has not been started correctly"}
      _pid -> function.(registry)
    end
  end

  @doc """
  Retrieves the pid of a brod client process if it exists and executes the
  given function against the client.
  """
  @spec with_client(atom(), (pid() -> term())) :: term() | {:error, String.t()}
  def with_client(registry, function) when is_function(function, 1) do
    case Elsa.Registry.whereis_name({registry, :brod_client}) do
      :undefined -> {:error, "Unable to find brod_client in registry(#{registry})"}
      pid -> function.(pid)
    end
  end

  @doc """
  Convert supplied cluster endpoints from common keyword list format to
  brod-compatible tuple.
  """
  @spec reformat_endpoints(keyword()) :: [{charlist(), integer()}]
  def reformat_endpoints(endpoints) do
    Enum.map(endpoints, fn {key, value} -> {to_charlist(key), value} end)
  end

  @doc """
  Retrieve the api version of the desired operation supported by the
  connected cluster.
  """
  @spec get_api_version(pid(), atom()) :: non_neg_integer()
  def get_api_version(connection, api) do
    {:ok, api_versions} = :kpro.get_api_versions(connection)
    {_, version} = Map.get(api_versions, api)
    version
  end

  @doc """
  Determines if client pid is alive
  """
  @spec client?(pid() | atom()) :: boolean()
  def client?(pid) when is_pid(pid) do
    Process.alive?(pid)
  end

  def client?(client) when is_atom(client) do
    case Process.whereis(client) do
      pid when is_pid(pid) -> client?(pid)
      nil -> false
    end
  end

  @doc """
  Create a named client connection process for managing interactions
  with the connected cluster.
  """
  @spec start_client(keyword(), atom()) :: :ok | {:error, term()}
  def start_client(endpoints, name, config \\ []) do
    :brod.start_client(endpoints, name, config)
  end

  @doc """
  Process messages into chunks of size up to the size specified by the calling function in bytes,
  and determined by the function argument. If no chunk size is specified the default maximum
  size a chunk will be is approximately 1 megabyte. If no sizing function is provided to construct
  the appropriately sized chunks, the internal function based on Kernel.byte_size/1 is used.
  """
  @spec chunk_by_byte_size(term(), integer(), fun()) :: [term()]
  def chunk_by_byte_size(collection, chunk_byte_size \\ @default_max_chunk_size, byte_size_function \\ &get_byte_size/1) do
    collection
    |> Enum.chunk_while({0, []}, &chunk(&1, &2, chunk_byte_size, byte_size_function), &after_chunk/1)
  end

  @doc """
  Return the number of partitions for a given topic. Bypasses the need for a persistent client
  for lighter weight interactions from one-off calls.
  """
  @spec partition_count(keyword | Elsa.connection() | pid, String.t()) :: integer()
  def partition_count(endpoints, topic) when is_list(endpoints) do
    {:ok, metadata} = :brod.get_metadata(reformat_endpoints(endpoints), [topic])

    count_partitions(metadata)
  end

  def partition_count(connection, topic) when is_atom(connection) or is_pid(connection) do
    {:ok, metadata} = :brod_client.get_metadata(connection, topic)

    count_partitions(metadata)
  end

  # Handle brod < 3.16
  defp count_partitions(%{topic_metadata: topic_metadatas}) do
    [count | _] = for %{partition_metadata: metadata} <- topic_metadatas, do: Enum.count(metadata)
    count
  end

  # Handle brod 3.16+
  defp count_partitions(%{topics: topics}) do
    [count | _] = for %{partitions: partitions} <- topics, do: Enum.count(partitions)
    count
  end

  defp connect(endpoints, :controller), do: :kpro.connect_controller(endpoints, [])
  defp connect(endpoints, _type), do: :kpro.connect_any(endpoints, [])

  defp do_with_connection({:ok, connection}, fun) do
    fun.(connection)
  after
    :kpro.close_connection(connection)
  end

  defp do_with_connection({:error, reason}, _fun) do
    raise Elsa.ConnectError, message: format_reason(reason)
  end

  defp format_reason(reason) do
    cond do
      is_binary(reason) -> reason
      Exception.exception?(reason) -> Exception.format(:error, reason)
      true -> inspect(reason)
    end
  end

  defp chunk(item, {current_size, current_batch}, chunk_byte_size, byte_size_function) do
    item_size = byte_size_function.(item) + @timestamp_size_in_bytes
    new_total = current_size + item_size

    case new_total < chunk_byte_size do
      true -> add_item_to_batch(new_total, item, current_batch)
      false -> finish_batch(item_size, item, current_batch)
    end
  end

  defp add_item_to_batch(total, item, batch) do
    {:cont, {total, [item | batch]}}
  end

  defp finish_batch(total, item, batch) do
    {:cont, Enum.reverse(batch), {total, [item]}}
  end

  defp after_chunk({_size, []}) do
    {:cont, {0, []}}
  end

  defp after_chunk({_size, current_batch}) do
    finish_batch(0, nil, current_batch)
  end

  defp get_byte_size(%{key: key, value: value} = msg) do
    header_size =
      Map.get(msg, :headers, [])
      |> Enum.map(fn {key, value} -> byte_size(key) + byte_size(value) end)
      |> Enum.sum()

    byte_size(key) + byte_size(value) + header_size
  end
end