# lib/pubsub/subscription.ex

defmodule Google.Pubsub.Subscription do
  @moduledoc """
  Helpers for creating, fetching, deleting, pulling from and acknowledging on
  Pub/Sub subscriptions.

  Every remote operation is delegated to the module stored under the `:client`
  key of the `:google_grpc_pubsub` application environment. When that key is
  not set, `Google.Pubsub.Client` is used.
  """

  alias Google.Pubsub.{Client, Topic, Message}

  alias Google.Pubsub.V1.{Subscription, PullResponse}

  @type t :: Subscription.t()

  @type opts :: [project: String.t(), subscription: String.t()]

  @doc """
  Creates a subscription.

  The topic identifier is built by `Google.Pubsub.Topic.id/1` and the
  subscription identifier by `id/1`, both from the same `opts`. The result of
  the client's `create_subscription/2` is returned unchanged.

  `:project` and `:subscription` are required by `id/1`; `:topic` is presumably
  read by `Google.Pubsub.Topic.id/1` (not visible from this module).
  """
  @spec create(project: String.t(), subscription: String.t(), topic: String.t()) ::
          {:ok, t()} | {:error, any()}
  def create(opts) do
    topic_id = Topic.id(opts)
    subscription_id = id(opts)

    client().create_subscription(topic_id, subscription_id)
  end

  @doc """
  Fetches the subscription identified by `opts`.

  The identifier is built with `id/1` and handed to the client's
  `get_subscription/1`, whose result is returned unchanged.
  """
  @spec get(opts()) ::
          {:ok, t()} | {:error, any()}
  def get(opts) do
    opts |> id() |> client().get_subscription()
  end

  @doc """
  Deletes the given subscription.

  Returns `:ok` when the client replies with an empty protobuf message, or
  `{:error, reason}` when the client reports an error.
  """
  @spec delete(t()) :: :ok | {:error, any()}
  def delete(%Subscription{name: name}) do
    case client().delete_subscription(name) do
      {:ok, %Google.Protobuf.Empty{}} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  @doc """
  Builds the subscription identifier
  `projects/<project>/subscriptions/<subscription>` from `opts`.

  Raises `KeyError` when `:project` or `:subscription` is missing.
  """
  @spec id(opts()) :: String.t()
  def id(opts) do
    Path.join([
      "projects",
      Keyword.fetch!(opts, :project),
      "subscriptions",
      Keyword.fetch!(opts, :subscription)
    ])
  end

  @doc """
  Pulls messages from the subscription.

  ## Options

    * `:max_messages` - value passed to the client's `pull/2` as the message
      limit. Defaults to `10`.

  On success every received message is converted with
  `Google.Pubsub.Message.new!/1` and the list is returned as `{:ok, messages}`.
  Client errors are returned as `{:error, reason}`.
  """
  @spec pull(t(), max_messages: number()) :: {:ok, [Message.t()]} | {:error, any()}
  def pull(%Subscription{name: name}, opts \\ []) do
    max_messages = Keyword.get(opts, :max_messages, 10)

    case client().pull(name, max_messages) do
      {:ok, %PullResponse{received_messages: received_messages}} ->
        {:ok, Enum.map(received_messages, &Message.new!/1)}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc """
  Acknowledges one message or a list of messages on the subscription.

  The `ack_id` of every message is collected and sent to the client's
  `acknowledge/2`. Returns `:ok` when the client replies with an empty protobuf
  message, or `{:error, reason}` when the client reports an error.

  Raises `FunctionClauseError` when the first argument is not a subscription
  struct.
  """
  @spec acknowledge(t(), Message.t() | [Message.t()]) ::
          :ok | {:error, any()}
  def acknowledge(%Subscription{name: name}, messages) when is_list(messages) do
    ack_ids = Enum.map(messages, & &1.ack_id)

    case client().acknowledge(name, ack_ids) do
      {:ok, %Google.Protobuf.Empty{}} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  # The struct match is required here. With an unconstrained first argument, a
  # non-subscription value would skip the list clause above, land here, get
  # wrapped in another list and recurse forever instead of failing fast.
  def acknowledge(%Subscription{} = subscription, message),
    do: acknowledge(subscription, [message])

  # Resolved at runtime so the client module can be swapped through the
  # application environment.
  defp client, do: Application.get_env(:google_grpc_pubsub, :client, Client)
end