lib/elsa/consumer/message_handler.ex

defmodule Elsa.Consumer.MessageHandler do
  @moduledoc """
  Define the behaviour and default implementations of functions
  for creating message handlers that will be called by Elsa worker
  processes.
  """
  @callback init(term()) :: {:ok, term()}

  @callback handle_messages(term(), term()) ::
              {:acknowledge, term()}
              | {:acknowledge, term(), term()}
              | {:ack, term()}
              | {:ack, term(), term()}
              | {:no_ack, term()}
              | {:noop, term()}
              | {:continue, term()}

  @callback handle_messages(term()) ::
              :ack | :acknowledge | {:ack, term()} | {:acknowledge, term()} | :no_ack | :noop | :continue

  @doc """
  Defines the macro for implementing the message handler behaviour
  in an application. Default implementations allow injecting of
  configuration into the worker process, persisting state between runs
  of the message handler function, or alternatively, basic processing
  and acknowlegement of messages.
  """
  defmacro __using__(_opts) do
    quote do
      @behaviour Elsa.Consumer.MessageHandler

      @impl Elsa.Consumer.MessageHandler
      def init(args) do
        {:ok, args}
      end

      @impl Elsa.Consumer.MessageHandler
      def handle_messages(messages, state) do
        case handle_messages(messages) do
          :ack -> {:ack, state}
          :acknowledge -> {:acknowledge, state}
          {:ack, offset} -> {:ack, offset, state}
          {:acknowledge, offset} -> {:acknowledge, offset, state}
          :no_ack -> {:no_ack, state}
          :noop -> {:noop, state}
          :continue -> {:continue, state}
        end
      end

      @impl Elsa.Consumer.MessageHandler
      def handle_messages(messages) do
        :ack
      end

      def topic() do
        Process.get(:elsa_topic)
      end

      def partition() do
        Process.get(:elsa_partition)
      end

      def generation_id() do
        Process.get(:elsa_generation_id)
      end

      def connection() do
        Process.get(:elsa_connection)
      end

      defoverridable Elsa.Consumer.MessageHandler
    end
  end
end