lib/pubsub/subscriber.ex

defmodule Google.Pubsub.Subscriber do
  alias Google.Pubsub.{Message, Subscription, Client}

  alias Google.Pubsub.V1.{
    StreamingPullRequest,
    StreamingPullResponse
  }

  @type t :: %__MODULE__{
          subscription: Subscription.t(),
          request_opts: Keyword.t()
        }

  defstruct subscription: nil, request_opts: []

  @callback handle_messages([Message.t()]) :: [Message.t()]

  @unavailable GRPC.Status.unavailable()
  @unknown GRPC.Status.unknown()

  defmacro __using__(_opts) do
    quote do
      use GenServer

      require Logger

      alias Google.Pubsub.{Subscriber, Subscription}

      @behaviour Subscriber

      def child_spec(init_arg) do
        %{
          id: {Subscriber, init_arg[:project], init_arg[:subscription]},
          start: {__MODULE__, :start_link, [init_arg]}
        }
      end

      def start_link(init_arg) do
        GenServer.start_link(__MODULE__, init_arg)
      end

      @impl true
      def init(init_arg) do
        schedule_listen()

        {subscription_opts, request_opts} = Keyword.split(init_arg, [:subscription, :project])

        case Subscription.get(
               project: subscription_opts[:project],
               subscription: subscription_opts[:subscription]
             ) do
          {:ok, subscription} ->
            {:ok, %Subscriber{subscription: subscription, request_opts: request_opts}}

          {:error, error} ->
            {:error, error}
        end
      end

      @impl true
      def handle_info(
            :listen,
            %Subscriber{subscription: subscription, request_opts: request_opts} = state
          ) do
        subscription
        |> Subscriber.create_stream(request_opts)
        |> Subscriber.receive_messages(&handle_messages/1)
        |> Subscriber.close_stream(subscription)

        schedule_listen()

        {:noreply, state}
      end

      @impl true
      def handle_info({:gun_error, _, _, {:stream_error, :no_error, _}}, struct) do
        {:stop, :shutdown, struct}
      end

      @impl true
      def handle_info({:gun_error, _, _, {:badstate, _}}, struct) do
        {:stop, :shutdown, struct}
      end

      @impl true
      def handle_info(type, struct) do
        Logger.debug("Google.Pubsub.Subscriber: handle_info: #{inspect(type)}")
        {:stop, :unknown, struct}
      end

      defp schedule_listen() do
        Process.send_after(self(), :listen, 100)
      end
    end
  end

  @spec create_stream(Subscription.t(), Keyword.t()) :: GRPC.Client.Stream.t()
  def create_stream(subscription, request_opts) do
    request =
      StreamingPullRequest.new(
        subscription: subscription.name,
        stream_ack_deadline_seconds: Keyword.get(request_opts, :stream_ack_deadline_seconds, 10)
      )

    client().streaming_pull()
    |> GRPC.Stub.send_request(request)
  end

  @spec close_stream(GRPC.Client.Stream.t(), Subscription.t()) :: GRPC.Client.Stream.t()
  def close_stream(stream, subscription) do
    request = StreamingPullRequest.new(subscription: subscription.name)

    GRPC.Stub.send_request(stream, request, end_stream: true)
  end

  @spec receive_messages(GRPC.Client.Stream.t(), function()) :: GRPC.Client.Stream.t()
  def receive_messages(stream, handle_messages) do
    case GRPC.Stub.recv(stream, timeout: :infinity) do
      {:ok, recv} ->
        process_recv(recv, stream, handle_messages)

      {:error, %GRPC.RPCError{status: @unknown, message: message} = e} ->
        if expected_error?(message), do: [], else: raise(e)

      {:error, error} ->
        raise error
    end
  end

  @spec process_recv(Enumerable.t(), GRPC.Client.Stream.t(), function()) :: GRPC.Client.Stream.t()
  defp process_recv(recv, stream, handle_messages) do
    Enum.reduce(recv, stream, fn
      {:ok, %StreamingPullResponse{received_messages: received_messages}}, stream ->
        ack_ids =
          received_messages
          |> Enum.map(&Message.new!/1)
          |> handle_messages.()
          |> Enum.map(fn %Message{ack_id: ack_id} -> ack_id end)

        ack(stream, ack_ids)

      {:error, %GRPC.RPCError{status: @unavailable}}, stream ->
        stream

      {:error, %GRPC.RPCError{status: @unknown, message: message} = error}, stream ->
        if expected_error?(message), do: stream, else: raise(error)

      {:error, error}, _stream ->
        raise error
    end)
  end

  @spec ack(GRPC.Client.Stream.t(), [String.t()]) :: GRPC.Client.Stream.t()
  defp ack(stream, ack_ids) do
    request = StreamingPullRequest.new(ack_ids: ack_ids)

    GRPC.Stub.send_request(stream, request)
  end

  defp client(), do: Application.get_env(:google_grpc_pubsub, :client, Client)

  defp expected_error?(message) do
    Regex.match?(~r/goaway.*max_age/, message) or
      Regex.match?(~r/stream_error.*Stream reset by server/, message)
  end
end