# lib/elsa/wrapper.ex

defmodule Elsa.Wrapper do
  @moduledoc """
  Provides a supervisable wrapper for the Elsa supervision
  tree to manage brod producers and consumers. Provides
  convenience functions for starting producer and consumer
  processes directly without the default supervisors brod
  interposes between them and the application.

  By taking over supervision of the processes directly interacting
  with Kafka by way of a brod client, these processes can be
  registered independently from the client process so in the event
  of the client dropping its connection, the record of what
  processes should be producing or consuming from what topics and
  partitions can be immediately reconstructed instead of being dropped.
  """
  use GenServer
  require Logger

  @default_delay 5_000
  @default_kill_timeout 5_000

  @doc """
  Starts a wrapper process linked to the caller.

  ## Options

    * `:mfa` (required) - a `{module, function, args}` tuple that is applied
      to start the wrapped process. It must return `{:ok, pid}` or
      `{:error, reason}`.
    * `:delay` - minimum time in milliseconds between the wrapper starting
      and the wrapper stopping because the wrapped process failed to start
      or exited. Defaults to `#{@default_delay}`.
    * `:kill_timeout` - time in milliseconds to wait for the wrapped process
      to exit during termination before it is sent `:kill`. Defaults to
      `#{@default_kill_timeout}`.
    * `:register` - when given, the wrapped pid is passed together with this
      value to `Elsa.ElsaRegistry.register_name/2`. Defaults to
      `:no_register`, which skips registration.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  @impl true
  def init(args) do
    # Trap exits so that the death of the linked, wrapped process is delivered
    # as an `{:EXIT, pid, reason}` message rather than taking this process
    # down immediately.
    Process.flag(:trap_exit, true)

    state = %{
      mfa: Keyword.fetch!(args, :mfa),
      # Monotonic time is used because this value only serves to measure the
      # elapsed lifetime; wall-clock time may jump and distort the back-off.
      started: System.monotonic_time(:millisecond),
      delay: Keyword.get(args, :delay, @default_delay),
      kill_timeout: Keyword.get(args, :kill_timeout, @default_kill_timeout),
      register: Keyword.get(args, :register, :no_register)
    }

    case start(state) do
      {:ok, pid} ->
        if state.register != :no_register do
          Elsa.ElsaRegistry.register_name(state.register, pid)
        end

        {:ok, Map.put(state, :pid, pid)}

      {:error, reason} ->
        Logger.error(
          "#{__MODULE__}:#{inspect(self())} : wrapped process #{inspect(state.mfa)} failed to init for reason #{inspect(reason)}"
        )

        # Wait before reporting the failure; presumably this throttles how
        # quickly whoever started the wrapper can retry.
        Process.sleep(state.delay)
        {:stop, reason}
    end
  end

  # The wrapped process exited: stop with the same reason, but not before the
  # wrapper has been alive for at least `delay` milliseconds.
  @impl true
  def handle_info({:EXIT, pid, reason}, %{pid: pid, delay: delay, started: started} = state) do
    lifetime = System.monotonic_time(:millisecond) - started
    remaining = max(delay - lifetime, 0)

    Process.sleep(remaining)

    {:stop, reason, state}
  end

  # Any other message is unexpected; log it and carry on.
  def handle_info(message, state) do
    Logger.info(
      "#{__MODULE__}:#{inspect(self())} : received invalid message #{inspect(message)}, current state: #{inspect(state)}"
    )

    {:noreply, state}
  end

  # Makes sure the wrapped process does not outlive the wrapper.
  @impl true
  def terminate(reason, %{pid: pid} = state) do
    if Process.alive?(pid), do: kill(pid, reason, state.kill_timeout)

    reason
  end

  # No wrapped pid was ever stored in the state, so there is nothing to stop.
  def terminate(reason, _state), do: reason

  # Sends `reason` as an exit signal to `pid` and waits up to `timeout`
  # milliseconds for the resulting `:EXIT` message (this process traps exits)
  # before forcing termination with `:kill`.
  #
  # Note: per `Process.exit/2`, a `:normal` reason does not terminate a process
  # that is not trapping exits, so in that case the `:kill` after the timeout
  # is what actually stops it.
  defp kill(pid, reason, timeout) do
    Process.exit(pid, reason)

    receive do
      {:EXIT, ^pid, _reason} -> :ok
    after
      timeout -> Process.exit(pid, :kill)
    end
  end

  # Starts the wrapped process by applying the configured MFA.
  defp start(%{mfa: {module, function, args}}) do
    apply(module, function, args)
  end
end