lib/pubsub/topic.ex

defmodule Google.Pubsub.Topic do
  alias Google.Pubsub.{Client, Message}

  alias Google.Pubsub.V1.{
    Topic,
    PubsubMessage,
    PublishResponse
  }

  @type t :: Topic.t()

  @type opts :: [project: String.t(), topic: String.t()]

  @spec create(opts()) :: {:ok, t()} | {:error, any()}
  def create(opts) do
    opts |> id() |> client().create_topic()
  end

  @spec get(opts()) :: {:ok, t()} | {:error, any()}
  def get(opts) do
    opts |> id() |> client().get_topic()
  end

  @spec id(opts()) :: String.t()
  def id(opts) do
    Path.join(["projects", Keyword.fetch!(opts, :project), "topics", Keyword.fetch!(opts, :topic)])
  end

  @spec publish(t(), [Message.t()]) :: :ok | {:error, any()}
  def publish(topic, message) when not is_list(message),
    do: publish(topic, [message])

  def publish(topic, messages) when is_list(messages) do
    messages =
      messages
      |> Enum.map(fn %Message{data: data} when is_binary(data) ->
        PubsubMessage.new(data: data)
      end)

    case client().publish(topic.name, messages) do
      {:ok, %PublishResponse{}} -> :ok
      {:error, error} -> {:error, error}
    end
  end

  defp client(), do: Application.get_env(:google_grpc_pubsub, :client, Client)
end