lib/kvasir_kafka/subscriber.ex

defmodule Kvasir.Kafka.Subscriber do
  @moduledoc """
  Callbacks that decode incoming Kafka messages and forward the resulting
  events to a user-supplied subscriber module.

  NOTE(review): the `init/2` / `handle_message/2` pair and the
  `{:ok, :commit, state}` replies look like a brod group-subscriber (v2)
  callback module — confirm against the code that starts this subscriber.
  """

  require Logger

  @doc """
  Builds the per-partition handler state.

  The first argument is a map that must contain a `:partition` key. The second
  is a `{topic, offset, decoder, callback_module, state}` tuple.

  `callback_module.init(topic, partition, state)` is invoked first and must
  return `{:ok, subscriber_state}`; any other return value raises a
  `MatchError`. The starting offset for this partition is then looked up with
  `Kvasir.Offset.get(offset, partition)`.

  Returns `{:ok, handler_state}` where `handler_state` is a map with the keys
  `:topic`, `:partition`, `:offset`, `:decoder`, `:subscriber` and `:state`.
  """
  @spec init(map(), {term(), term(), term(), module(), term()}) :: {:ok, map()}
  def init(%{partition: partition}, {topic, offset, decoder, callback_module, state}) do
    {:ok, subscriber_state} = callback_module.init(topic, partition, state)

    {:ok,
     %{
       topic: topic,
       partition: partition,
       offset: Kvasir.Offset.get(offset, partition),
       decoder: decoder,
       subscriber: callback_module,
       state: subscriber_state
     }}
  end

  @doc """
  Handles one `:kafka_message` tuple for the partition described by `state`.

  The second element of the message tuple is treated as the message offset.

    * If `Kvasir.Offset.compare(message_offset, state.offset)` is `:lt`, the
      message is committed without being decoded or delivered.
    * Otherwise the message is decoded with `Kvasir.Kafka.decode?/4`:
        * `{:ok, event}` — the event is passed to the subscriber's `event/2`
          together with the current subscriber state. `:ok` commits with the
          state unchanged, `{:ok, new_state}` commits with the subscriber
          state replaced, and any other return value is handed back to the
          caller untouched (and is not logged here).
        * `:ok` — nothing to deliver; the message is committed.
        * anything else — logged as an error and returned as-is.

  Successful paths return `{:ok, :commit, state}`.
  """
  @spec handle_message(tuple(), map()) :: {:ok, :commit, map()} | term()
  def handle_message(
        {:kafka_message, message_offset, _, _, _, _, _} = message,
        %{
          offset: start_offset,
          partition: partition,
          decoder: decoder,
          topic: topic,
          # Only asserted to be present here; consumed by handle_decoded/2.
          state: _,
          subscriber: _
        } = state
      ) do
    case Kvasir.Offset.compare(message_offset, start_offset) do
      :lt ->
        {:ok, :commit, state}

      _at_or_after_start ->
        decoder
        |> Kvasir.Kafka.decode?(message, topic, partition)
        |> handle_decoded(state)
    end
  end

  # Turns the result of decoding a message into the handler's reply.
  defp handle_decoded({:ok, event}, %{subscriber: subscriber, state: subscriber_state} = state) do
    case subscriber.event(event, subscriber_state) do
      :ok -> {:ok, :commit, state}
      {:ok, new_subscriber_state} -> {:ok, :commit, %{state | state: new_subscriber_state}}
      # Anything else is the subscriber's own reply; pass it through unchanged.
      other -> other
    end
  end

  # A bare `:ok` from the decoder means there is no event to deliver.
  defp handle_decoded(:ok, state), do: {:ok, :commit, state}

  defp handle_decoded(err, _state) do
    Logger.error("Kvasir Kafka: Subscriber Error: #{inspect(err)}", error: err)
    err
  end
end