lib/elsa/consumer/message_handler.ex

defmodule Elsa.Consumer.MessageHandler do
  @moduledoc """
  Define the behaviour and default implementations of functions
  for creating message handlers that will be called by Elsa worker
  processes.

  Calling `use Elsa.Consumer.MessageHandler` in a module declares the
  behaviour, injects overridable default implementations of every
  callback, and adds the `topic/0`, `partition/0`, `generation_id/0`
  and `connection/0` helpers, which read values from the process
  dictionary of the calling process.
  """

  @doc """
  Builds the initial handler state from the supplied arguments.

  The default implementation returns the arguments unchanged as the state.
  """
  @callback init(term()) :: {:ok, term()}

  @doc """
  Handles a batch of messages together with the current handler state.

  Returns a tagged tuple whose last element is the (possibly updated)
  state. The three-element `:ack` / `:acknowledge` forms additionally
  carry an offset as their second element.

  The default implementation delegates to `c:handle_messages/1` and
  re-attaches the unchanged state to whatever that callback returns.
  """
  @callback handle_messages(term(), term()) ::
              {:acknowledge, term()}
              | {:acknowledge, term(), term()}
              | {:ack, term()}
              | {:ack, term(), term()}
              | {:no_ack, term()}
              | {:noop, term()}
              | {:continue, term()}

  @doc """
  Handles a batch of messages without access to handler state.

  The default implementation ignores the messages and returns `:ack`.
  """
  @callback handle_messages(term()) ::
              :ack | :acknowledge | {:ack, term()} | {:acknowledge, term()} | :no_ack | :noop | :continue

  @doc """
  Defines the macro for implementing the message handler behaviour
  in an application. Default implementations allow injecting of
  configuration into the worker process, persisting state between runs
  of the message handler function, or alternatively, basic processing
  and acknowledgement of messages.
  """
  defmacro __using__(_opts) do
    quote do
      @behaviour Elsa.Consumer.MessageHandler

      # Default init/1: the supplied arguments become the handler state as-is.
      @impl Elsa.Consumer.MessageHandler
      def init(args) do
        {:ok, args}
      end

      # Default stateful handler: run the stateless handle_messages/1 and
      # append the untouched state to its result. There is intentionally no
      # catch-all clause, so a return value outside the callback contract
      # raises a CaseClauseError instead of being passed on silently.
      @impl Elsa.Consumer.MessageHandler
      def handle_messages(messages, state) do
        case handle_messages(messages) do
          :ack -> {:ack, state}
          :acknowledge -> {:acknowledge, state}
          {:ack, offset} -> {:ack, offset, state}
          {:acknowledge, offset} -> {:acknowledge, offset, state}
          :no_ack -> {:no_ack, state}
          :noop -> {:noop, state}
          :continue -> {:continue, state}
        end
      end

      # Default stateless handler: ignore the batch and acknowledge it.
      @impl Elsa.Consumer.MessageHandler
      def handle_messages(_messages) do
        :ack
      end

      # The helpers below read from the calling process's dictionary and
      # return nil when the key has not been set.
      # NOTE(review): presumably the Elsa worker populates these keys before
      # invoking the handler — confirm against the worker implementation.

      # Value stored under :elsa_topic.
      def topic do
        Process.get(:elsa_topic)
      end

      # Value stored under :elsa_partition.
      def partition do
        Process.get(:elsa_partition)
      end

      # Value stored under :elsa_generation_id.
      def generation_id do
        Process.get(:elsa_generation_id)
      end

      # Value stored under :elsa_connection.
      def connection do
        Process.get(:elsa_connection)
      end

      # Every callback of the behaviour (init/1, handle_messages/1,
      # handle_messages/2) may be redefined by the using module.
      defoverridable Elsa.Consumer.MessageHandler
    end
  end
end