lib/pubsub/subscription.ex

defmodule Google.Pubsub.Subscription do
  alias Google.Pubsub.{Client, Topic, Message}

  alias Google.Pubsub.V1.{
    Subscriber.Stub,
    Subscription,
    GetSubscriptionRequest,
    PullRequest,
    PullResponse,
    AcknowledgeRequest
  }

  # A subscription as represented by the `Google.Pubsub.V1.Subscription` struct
  # (aliased as `Subscription` in this module).
  @type t :: Subscription.t()

  # Keyword options that identify a subscription: the owning project and the
  # subscription's short name. `id/1` turns these into the full resource name.
  @type opts :: [project: String.t(), subscription: String.t()]

  @doc """
  Creates a subscription described by `opts`.

  A `Google.Pubsub.V1.Subscription` is built whose `name` is computed by `id/1`
  and whose `topic` is computed by `Topic.id/1`, both from the same `opts`. It
  is then sent through the configured client with `Stub.create_subscription/3`.

  ## Options

    * `:project` - required by `id/1`.
    * `:subscription` - required by `id/1`.
    * `:topic` - listed in the spec; consumed by `Topic.id/1`.

  Returns whatever the client's `send_request/2` returns, which the spec
  declares as `{:ok, subscription}` or `{:error, reason}`. Raises `KeyError`
  when `:project` or `:subscription` is missing.
  """
  @spec create(project: String.t(), subscription: String.t(), topic: String.t()) ::
          {:ok, t()} | {:error, any()}
  def create(opts) do
    new_subscription =
      [name: id(opts), topic: Topic.id(opts)]
      |> Subscription.new()

    client().send_request(new_subscription, &Stub.create_subscription/3)
  end

  @doc """
  Fetches the subscription identified by `opts`.

  The full resource name is computed with `id/1`, wrapped in a
  `GetSubscriptionRequest`, and sent through the configured client with
  `Stub.get_subscription/3`.

  Returns the client's result, declared by the spec as `{:ok, subscription}` or
  `{:error, reason}`. Raises `KeyError` when `:project` or `:subscription` is
  missing from `opts`.
  """
  @spec get(opts()) ::
          {:ok, t()} | {:error, any()}
  def get(opts) do
    subscription_name = id(opts)
    lookup = GetSubscriptionRequest.new(subscription: subscription_name)

    client().send_request(lookup, &Stub.get_subscription/3)
  end

  @doc """
  Deletes the given subscription.

  The subscription's `name` is placed in a
  `Google.Pubsub.V1.DeleteSubscriptionRequest` and sent through the configured
  client with `Stub.delete_subscription/3`.

  Returns `:ok` when the client replies with `{:ok, %Google.Protobuf.Empty{}}`,
  and passes `{:error, reason}` through unchanged. Any other reply raises
  `CaseClauseError`.
  """
  @spec delete(t()) :: :ok | {:error, any()}
  def delete(%Subscription{name: name}) do
    removal = Google.Pubsub.V1.DeleteSubscriptionRequest.new(subscription: name)
    reply = client().send_request(removal, &Stub.delete_subscription/3)

    case reply do
      {:error, _reason} = failure -> failure
      {:ok, %Google.Protobuf.Empty{}} -> :ok
    end
  end

  @doc """
  Builds the full resource name of a subscription.

  The result has the form `projects/<project>/subscriptions/<subscription>`,
  with the segments joined by `Path.join/1`.

  Raises `KeyError` when `:project` or `:subscription` is missing from `opts`.

  ## Examples

      id(project: "my-project", subscription: "my-sub")
      #=> "projects/my-project/subscriptions/my-sub"

  """
  @spec id(opts()) :: String.t()
  def id(opts) do
    project = Keyword.fetch!(opts, :project)
    subscription = Keyword.fetch!(opts, :subscription)

    Path.join(["projects", project, "subscriptions", subscription])
  end

  @doc """
  Pulls messages from the given subscription.

  A `PullRequest` for the subscription's `name` is sent through the configured
  client with `Stub.pull/3`.

  ## Options

    * `:max_messages` - value placed in the request's `max_messages` field.
      Defaults to `10`.

  On success every entry of the response's `received_messages` is converted
  with `Message.new!/1` and the list is returned as `{:ok, messages}`. An
  `{:error, reason}` reply is passed through unchanged; any other reply raises
  `CaseClauseError`.
  """
  @spec pull(t(), max_messages: number()) :: {:ok, [Message.t()]} | {:error, any()}
  def pull(%Subscription{name: name}, opts \\ []) do
    limit = Keyword.get(opts, :max_messages, 10)
    request = PullRequest.new(subscription: name, max_messages: limit)

    case client().send_request(request, &Stub.pull/3) do
      {:error, _reason} = failure ->
        failure

      {:ok, %PullResponse{received_messages: received}} ->
        {:ok, Enum.map(received, fn raw -> Message.new!(raw) end)}
    end
  end

  @doc """
  Acknowledges one message, or a list of messages, on `subscription`.

  The `ack_id` of every message is collected into an `AcknowledgeRequest` for
  `subscription.name`, which is sent through the configured client with
  `Stub.acknowledge/3`. A single message is wrapped in a list first.

  An empty list returns `:ok` immediately without contacting the server: there
  is nothing to acknowledge, and the Pub/Sub API documents `ack_ids` as
  "must not be empty". This makes it safe to pass the result of a `pull/2`
  that returned no messages.

  Returns `:ok` when the client replies with `{:ok, %Google.Protobuf.Empty{}}`,
  and passes `{:error, reason}` through unchanged. Any other reply raises
  `CaseClauseError`.
  """
  @spec acknowledge(t(), Message.t() | [Message.t()]) ::
          :ok | {:error, any()}
  # No messages means no ack ids; skip the request instead of sending an
  # invalid (empty) AcknowledgeRequest.
  def acknowledge(_subscription, []), do: :ok

  def acknowledge(subscription, messages) when is_list(messages) do
    ack_ids = Enum.map(messages, & &1.ack_id)

    request = AcknowledgeRequest.new(ack_ids: ack_ids, subscription: subscription.name)

    case client().send_request(request, &Stub.acknowledge/3) do
      {:ok, %Google.Protobuf.Empty{}} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  def acknowledge(subscription, message), do: acknowledge(subscription, [message])

  # Looks up the client module on every call from the `:google_grpc_pubsub`
  # application environment (key `:client`), falling back to `Client` when the
  # key is not set.
  defp client do
    Application.get_env(:google_grpc_pubsub, :client, Client)
  end
end