lib/pubsub/testing/client.ex

defmodule Google.Pubsub.Testing.Client do
  import ExUnit.Assertions

  alias Google.Pubsub.Message

  alias Google.Pubsub.V1.{
    Topic,
    Subscription,
    ReceivedMessage,
    PubsubMessage,
    PublishResponse,
    PullResponse
  }

  def create_topic(id) do
    send_message({:topic_created, id})
    {:ok, Topic.new(name: id)}
  end

  def get_topic(id) do
    {:ok, Topic.new(name: id)}
  end

  def publish(topic_id, messages) do
    send_message({:messages_published, topic_id, messages})
    {:ok, PublishResponse.new()}
  end

  def create_subscription(topic_id, subscription_id) do
    send_message({:create_subscription, topic_id, subscription_id})

    {:ok,
     Subscription.new(
       topic: topic_id,
       name: subscription_id
     )}
  end

  def get_subscription(subscription_id) do
    {:ok, Subscription.new(name: subscription_id)}
  end

  def delete_subscription(_subscription_id) do
    {:ok, Google.Protobuf.Empty.new()}
  end

  def pull(subscription_id, max_messages \\ 10) do
    assert_receive({:messages_published, ^subscription_id, messages})

    received_messages =
      messages
      |> Enum.take(max_messages)
      |> Enum.map(fn %Message{
                       ack_id: ack_id,
                       data: data,
                       attributes: attributes,
                       delivery_attempt: delivery_attempt,
                       publish_time: publish_time
                     } ->
        ReceivedMessage.new(
          ack_id: ack_id || to_string(:rand.uniform()),
          delivery_attempt: delivery_attempt,
          message: %PubsubMessage{
            data: data,
            attributes: attributes,
            publish_time: %Google.Protobuf.Timestamp{
              seconds:
                if(publish_time, do: publish_time, else: DateTime.utc_now()) |> DateTime.to_unix(),
              nanos: 0
            }
          }
        )
      end)

    {:ok, PullResponse.new(received_messages: received_messages)}
  end

  def acknowledge(subscription_id, ack_ids) do
    send_message({:acknowledged_messages, subscription_id, ack_ids})
    {:ok, Google.Protobuf.Empty.new()}
  end

  defp send_message(data) do
    pid = Application.get_env(:google_grpc_pubsub, :shared_test_process, self())

    send(pid, data)
  end
end