lib/off_broadway_amqp10/producer.ex

defmodule OffBroadwayAmqp10.Producer do
  @moduledoc """
  An AMQP 1.0 producer for [Broadway](https://hexdocs.pm/broadway).

  ## Features
    * Automatically acknowledges/rejects messages.


  ## Options
  #{NimbleOptions.docs(OffBroadwayAmqp10.Producer.Params.__opts_schema__())}



  ## Example

      Broadway.start_link(MyBroadway,
        name: MyBroadway,
        producer: [
          module:
            {OffBroadwayAmqp10.Producer,
            queue: "my_queue",
            connection: [
              host: "my-service.servicebus.windows.net"
              sasl: [mechanism: :plain, username: "foo", password: "bar"],
              tls_opts: [],
              transfer_limit_margin: 100
            ],
            session: [
              name: to_string(node())
            ]
          concurrency: 1
        ],
        processors: [
          default: [
            concurrency: 2,
          ]
        ]
      )

  ## Message

  The messages which you receive at the Broadway's processors is like:


      message = %Broadway.Message{
        data: "raw message body",
        metadata: %{
          headers: %{delivery_count: 0, durable: false, ... }
          properties: %{creation_time: 1_656_945_422_583, ...},
          application_properties: %{"foo" => "bar", ...},
          annotations: %{"x-opt-sequence-number" => 6068, .. }
        }
      }

  ## Acking

  Successful messages are acked and failed messages are rejected

  """
  use GenStage
  require Logger

  alias Broadway.Message
  alias OffBroadwayAmqp10.Producer.State

  @impl GenStage
  def handle_demand(incoming_demand, state) do
    log_event("incomming demand(#{incoming_demand})")

    new_state = State.increase_demand(state, incoming_demand)

    send(self(), :maybe_ask_credits)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def init(opts) do
    state = State.new(opts)

    send(self(), :open_connection)

    {:producer, state, []}
  end

  @impl GenStage
  def handle_info(:open_connection, state) do
    log_command("open connection")

    {:ok, connection} = state.amqp.client_module.open_connection(state.amqp)

    new_state = State.set_connection(state, connection)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info({:amqp10_event, {:connection, _, :opened}}, state) do
    log_event("connection opened")

    new_state = State.set_connection_status(state, :open)

    send(self(), :begin_session)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info(
        {:amqp10_event, {:connection, _, {:closed, {:forced, error_message}}}},
        state
      ) do
    Logger.error("off_broadway_amqp10 connection closed forcefully, message: [#{error_message}]")

    {:stop, :connection_closed_forcefully, state}
  end

  @impl GenStage
  def handle_info(:begin_session, state) do
    log_command("begin session")

    {:ok, session} = state.amqp.client_module.begin_session(state.amqp)

    new_state = State.set_session(state, session)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info({:amqp10_event, {:session, _, :begun}}, state) do
    log_event("session begun")

    new_state = State.set_session_status(state, :begun)

    send(self(), :attach_receiver)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info(:attach_receiver, state) do
    log_command("attach")

    {:ok, session} = state.amqp.client_module.attach(state.amqp)

    new_state = State.set_receiver(state, session)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info({:amqp10_event, {:link, {:link_ref, :receiver, _, _}, :attached}}, state) do
    log_event("attached")

    new_state = State.set_receiver_status(state, :attached)

    send(self(), :maybe_ask_credits)

    {:noreply, [], new_state}
  end

  @impl GenStage
  def handle_info(
        {:amqp10_event,
         {:link, {:link_ref, :receiver, _, _},
          {:detached, {:"v1_0.error", {:symbol, error_title}, {:utf8, error_message}, _}}}},
        state
      ) do
    Logger.error(
      "off_broadway_amqp10 session detached with error [#{error_title}], message: [#{error_message}]"
    )

    {:stop, error_title, state}
  end

  @impl GenStage
  def handle_info(:maybe_ask_credits, state) do
    if State.has_demand?(state) && State.receiver_attached?(state) do
      credits = State.credits_within_limits(state)
      log_command("ask for credits(#{credits})")

      :ok = state.amqp.client_module.flow_link_credit(state.amqp, credits)
    end

    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info(
        {:amqp10_event, {:link, {:link_ref, :receiver, _, _}, :credit_exhausted}},
        state
      ) do
    log_event("credit exhausted")

    send(self(), :maybe_ask_credits)

    {:noreply, [], state}
  end

  @impl GenStage
  def handle_info({:amqp10_msg, {:link_ref, :receiver, _, _}, msg}, state) do
    client_module = state.amqp.client_module
    payload = client_module.body(msg)

    metadata = %{
      headers: client_module.headers(msg),
      properties: client_module.properties(msg),
      application_properties: client_module.application_properties(msg),
      annotations: client_module.annotations(msg)
    }

    acknowledger = {OffBroadwayAmqp10.Acknowledger, state.amqp, msg}

    message = %Message{
      data: payload,
      metadata: metadata,
      acknowledger: acknowledger
    }

    new_state = State.decrease_demand(state, 1)
    {:noreply, [message], new_state}
  end

  defp log_event(msg) do
    Logger.debug("#{__MODULE__}:#{inspect(self())} Event: #{msg}")
  end

  defp log_command(msg) do
    Logger.debug("#{__MODULE__}:#{inspect(self())} Command: #{msg}")
  end
end