# lib/off_broadway/mns/producer.ex

defmodule OffBroadway.MNS.Producer do
  @moduledoc """
  An Aliyun MNS producer for Broadway.
  """
  use GenStage
  require Logger
  alias Broadway.{Message, Acknowledger, Producer}
  alias ExAliyun.MNS

  @behaviour Acknowledger
  @behaviour Producer
  @max_batch_size 16
  @queue_url_prefix "queues/"
  @default_retry_receive_message_interval 5_000

  # GenStage callback: builds the initial producer state from the given options.
  #
  # Options read here:
  #   * `:queue` (required) - queue name; stored with the "queues/" prefix prepended.
  #   * `:retry_receive_message_interval` - delay (passed to `Process.send_after/3`)
  #     before polling again after a receive that returned no messages.
  #   * `:receive_opts` - options forwarded to `ExAliyun.MNS.receive_message/2`;
  #     defaults to `[wait_time_seconds: 30]`.
  #   * `:buffer_size` / `:buffer_keep` - handed back as the GenStage producer options.
  #
  # Raises `KeyError` when `:queue` is missing.
  @impl true
  def init(opts) do
    {stage_opts, producer_opts} = Keyword.split(opts, [:buffer_size, :buffer_keep])

    {retry_interval, producer_opts} =
      Keyword.pop(
        producer_opts,
        :retry_receive_message_interval,
        @default_retry_receive_message_interval
      )

    # Raises KeyError (with the remaining options as the term) when :queue is absent.
    queue_name = Keyword.fetch!(producer_opts, :queue)

    initial_state = %{
      queue: @queue_url_prefix <> queue_name,
      receive_opts: Keyword.get(producer_opts, :receive_opts, wait_time_seconds: 30),
      receive_interval: retry_interval,
      # No poll is scheduled until demand arrives.
      receive_timer: nil,
      demand: 0
    }

    {:producer, initial_state, stage_opts}
  end

  # GenStage callback: adds the newly requested demand to the outstanding demand
  # and then attempts to receive messages for the accumulated total.
  @impl true
  def handle_demand(incoming_demand, %{demand: pending} = state) do
    total_demand = pending + incoming_demand
    handle_receive_messages(%{state | demand: total_demand})
  end

  # GenStage callback for plain process messages.
  #
  # `:receive_messages` is the tick sent by the polling timer. If the state no
  # longer holds a timer (it was cleared, e.g. by `prepare_for_draining/1`), the
  # tick is stale and is ignored; otherwise the timer is cleared and a receive
  # is attempted. Any other message is logged and dropped.
  @impl true
  def handle_info(:receive_messages, state) do
    case state do
      %{receive_timer: nil} ->
        {:noreply, [], state}

      _ ->
        handle_receive_messages(%{state | receive_timer: nil})
    end
  end

  def handle_info(info, state) do
    Logger.warn("unsupported message: " <> inspect(info))
    {:noreply, [], state}
  end

  # Broadway.Producer callback: cancels the pending polling timer, if any, and
  # clears it from the state so that an already-delivered tick is ignored.
  @impl Producer
  def prepare_for_draining(%{receive_timer: pending_timer} = state) do
    if pending_timer, do: Process.cancel_timer(pending_timer)

    drained_state = %{state | receive_timer: nil}
    {:noreply, [], drained_state}
  end

  # Receives messages when there is outstanding demand and no poll is already
  # scheduled, then decides when to poll next:
  #   * nothing received      -> retry after the configured receive interval
  #   * demand fully satisfied -> no timer; wait for the next demand
  #   * demand still pending   -> poll again immediately (interval 0)
  defp handle_receive_messages(%{receive_timer: nil, demand: pending} = state)
       when pending > 0 do
    fetched = receive_messages_from_mns(pending, state.queue, state.receive_opts)
    remaining = pending - length(fetched)

    next_timer =
      cond do
        fetched == [] -> schedule_receive_messages(state.receive_interval)
        remaining <= 0 -> nil
        true -> schedule_receive_messages(0)
      end

    {:noreply, fetched, %{state | demand: remaining, receive_timer: next_timer}}
  end

  # A poll is already scheduled or there is no demand: emit nothing.
  defp handle_receive_messages(state), do: {:noreply, [], state}

  # Requests up to `min(demand, @max_batch_size)` messages from `queue` and
  # returns them as a list of Broadway messages.
  #
  # A "MessageNotExist" error yields an empty list silently; any other
  # unexpected result is logged and also yields an empty list.
  defp receive_messages_from_mns(demand, queue, opts) do
    batch_size = min(demand, @max_batch_size)
    request_opts = [{:number, batch_size} | opts]

    case MNS.receive_message(queue, request_opts) do
      {:ok, %{body: %{"Messages" => raw_messages}}} ->
        for raw <- raw_messages, do: transform(raw, queue)

      {:error, %{body: %{"Error" => %{"Code" => "MessageNotExist"}}}} ->
        []

      unexpected ->
        Logger.warn("receive message error:" <> inspect(unexpected))
        []
    end
  end

  # Turns one raw MNS message map into a `Broadway.Message`: the "MessageBody"
  # value becomes `data`, the remaining keys become `metadata`, and the queue is
  # carried as the acknowledger reference.
  #
  # Raises `KeyError` when the message has no "MessageBody" key.
  @compile {:inline, transform: 2}
  defp transform(message, queue) do
    body = Map.fetch!(message, "MessageBody")

    %Message{
      data: body,
      metadata: Map.delete(message, "MessageBody"),
      acknowledger: {__MODULE__, queue, []}
    }
  end

  # Arms the polling timer: sends `:receive_messages` to this process after
  # `interval` and returns the timer reference.
  defp schedule_receive_messages(interval),
    do: Process.send_after(self(), :receive_messages, interval)

  # Broadway.Acknowledger callback.
  #
  # Deletes the successfully processed messages from `queue` (the ack ref) in
  # batches of at most `@max_batch_size` receipt handles. Failed messages are
  # deliberately left untouched: per the original author's note they are
  # retried by MNS itself.
  #
  # A failed delete is logged rather than silently discarded, since an
  # undeleted message will be delivered again. Always returns `:ok`.
  @impl Acknowledger
  def ack(_ack_ref = queue, successful, _failed) do
    successful
    |> Stream.map(& &1.metadata["ReceiptHandle"])
    |> Stream.chunk_every(@max_batch_size)
    |> Enum.each(&delete_batch(queue, &1))

    :ok
  end

  # Deletes one batch of receipt handles, logging (not raising) on failure so
  # that the remaining batches are still attempted.
  defp delete_batch(queue, receipt_handles) do
    case MNS.batch_delete_message(queue, receipt_handles) do
      {:error, reason} ->
        Logger.warn("delete message error:" <> inspect(reason))

      _ ->
        :ok
    end
  end
end