lib/elsa/consumer/worker.ex

defmodule Elsa.Consumer.Worker do
  @moduledoc """
  Defines the worker GenServer that is managed by the DynamicSupervisor.
  Workers are instantiated and assigned to a specific topic/partition
  and process messages according to the specified message handler module
  passed in from the manager before calling the ack function to
  notify the cluster the messages have been successfully processed.
  """
  use GenServer, restart: :temporary, shutdown: 10_000

  import Elsa.ElsaSupervisor, only: [registry: 1]
  import Record, only: [defrecord: 2, extract: 2]

  alias Elsa.ElsaRegistry
  alias Elsa.Group.Acknowledger

  require Logger

  defrecord :kafka_message_set, extract(:kafka_message_set, from_lib: "brod/include/brod.hrl")

  @subscribe_delay 200
  @subscribe_retries 20
  @start_failure_delay 5_000

  defmodule State do
    @moduledoc """
    The running state of the worker process.
    """
    defstruct [
      :connection,
      :topic,
      :partition,
      :generation_id,
      :offset,
      :handler,
      :handler_init_args,
      :handler_state,
      :config,
      :consumer_pid
    ]
  end

  @type init_opts :: [
          connection: Elsa.connection(),
          topic: Elsa.topic(),
          partition: Elsa.partition(),
          generation_id: non_neg_integer,
          begin_offset: non_neg_integer,
          handler: module,
          handler_init_args: term,
          config: :brod.consumer_config()
        ]

  @doc """
  Start the worker process and init the state with the given config.
  """
  @spec start_link(init_opts) :: GenServer.on_start()
  def start_link(init_args) do
    GenServer.start_link(__MODULE__, init_args)
  end

  def init(init_args) do
    Process.flag(:trap_exit, true)

    state = %State{
      connection: Keyword.fetch!(init_args, :connection),
      topic: Keyword.fetch!(init_args, :topic),
      partition: Keyword.fetch!(init_args, :partition),
      generation_id: Keyword.get(init_args, :generation_id),
      offset: Keyword.fetch!(init_args, :begin_offset),
      handler: Keyword.fetch!(init_args, :handler),
      handler_init_args: Keyword.get(init_args, :handler_init_args, []),
      config: Keyword.get(init_args, :config, [])
    }

    Process.put(:elsa_connection, state.connection)
    Process.put(:elsa_topic, state.topic)
    Process.put(:elsa_partition, state.partition)
    Process.put(:elsa_generation_id, state.generation_id)

    ElsaRegistry.register_name({registry(state.connection), :"worker_#{state.topic}_#{state.partition}"}, self())

    {:ok, state, {:continue, :subscribe}}
  end

  def handle_continue(:subscribe, state) do
    registry = registry(state.connection)

    with {:ok, consumer_pid} <- start_consumer(state.connection, state.topic, state.partition, state.config),
         :yes <- ElsaRegistry.register_name({registry, :"consumer_#{state.topic}_#{state.partition}"}, consumer_pid),
         :ok <- subscribe(consumer_pid, state) do
      {:ok, handler_state} = state.handler.init(state.handler_init_args)

      {:noreply, %{state | consumer_pid: consumer_pid, handler_state: handler_state}}
    else
      {:error, reason} ->
        Logger.warn(
          "Unable to subscribe to topic/partition/offset(#{state.topic}/#{state.partition}/#{state.offset}), reason #{inspect(reason)}"
        )

        Process.sleep(@start_failure_delay)
        {:stop, reason, state}
    end
  end

  def handle_info({_consumer_pid, kafka_message_set(topic: topic, partition: partition, messages: messages)}, state) do
    transformed_messages = transform_messages(topic, partition, messages, state)

    case send_messages_to_handler(transformed_messages, state) do
      {ack, new_handler_state} when ack in [:ack, :acknowledge] ->
        offset = transformed_messages |> List.last() |> Map.get(:offset)
        ack_messages(topic, partition, offset, state)
        {:noreply, %{state | offset: offset, handler_state: new_handler_state}}

      {ack, offset, new_handler_state} when ack in [:ack, :acknowledge] ->
        ack_messages(topic, partition, offset, state)
        {:noreply, %{state | offset: offset, handler_state: new_handler_state}}

      {no_ack, new_handler_state} when no_ack in [:no_ack, :noop] ->
        {:noreply, %{state | handler_state: new_handler_state}}

      {:continue, new_handler_state} ->
        offset = transformed_messages |> List.last() |> Map.get(:offset)
        :ok = :brod_consumer.ack(state.consumer_pid, offset)
        {:noreply, %{state | handler_state: new_handler_state}}
    end
  end

  def handle_info({:EXIT, _pid, reason}, state) do
    {:stop, reason, state}
  end

  def terminate(_reason, %{consumer_pid: nil} = state) do
    state
  end

  def terminate(reason, state) do
    _ = :brod_consumer.unsubscribe(state.consumer_pid, self())
    Process.exit(state.consumer_pid, reason)
    state
  end

  defp transform_messages(topic, partition, messages, state) do
    Enum.map(messages, &Elsa.Message.new(&1, topic: topic, partition: partition, generation_id: state.generation_id))
  end

  defp send_messages_to_handler(messages, state) do
    state.handler.handle_messages(messages, state.handler_state)
  end

  defp ack_messages(_topic, _partition, offset, %{generation_id: nil} = state) do
    :brod_consumer.ack(state.consumer_pid, offset)
  end

  defp ack_messages(topic, partition, offset, state) do
    Acknowledger.ack(state.connection, topic, partition, state.generation_id, offset)
  end

  defp start_consumer(connection, topic, partition, config) do
    registry = registry(connection)
    brod_client = ElsaRegistry.whereis_name({registry, :brod_client})
    :brod_consumer.start_link(brod_client, topic, partition, config)
  end

  defp subscribe(consumer_pid, state, retries \\ @subscribe_retries)

  defp subscribe(_consumer_pid, _state, 0) do
    {:error, :failed_subscription}
  end

  defp subscribe(consumer_pid, state, retries) do
    opts = determine_subscriber_opts(state)

    case :brod_consumer.subscribe(consumer_pid, self(), opts) do
      {:error, reason} ->
        Logger.warn(
          "Retrying to subscribe to topic #{state.topic} parition #{state.partition} offset #{state.offset} reason #{inspect(reason)}"
        )

        Process.sleep(@subscribe_delay)
        subscribe(consumer_pid, state, retries - 1)

      :ok ->
        Logger.info("Subscribing to topic #{state.topic} partition #{state.partition} offset #{state.offset}")
        :ok
    end
  end

  defp determine_subscriber_opts(state) do
    begin_offset =
      case state.offset do
        :undefined ->
          Keyword.get(state.config, :begin_offset, :latest)

        offset ->
          offset
      end

    Keyword.put(state.config, :begin_offset, begin_offset)
  end
end