lib/myspace_pubsub.ex

defmodule MyspacePubsub do
  @moduledoc """
  MyspacePubsub is where the Pubsub commands of the IPFS API reside.
  """

  alias MyspacePubsub.Topic
  alias MyspacePubsub.Api
  require Logger

  @doc """
  Lists the topics that the node is subscribed to.
  """
  @spec ls() :: {:ok, list(binary)} | {:error, any | :invalid_response}
  def ls() do
    case Api.get("/topics") do
      {:ok, %Tesla.Env{body: body}} when is_map(body) ->
        %{"topics" => topics} = body
        {:ok, topics}

      {:ok, _} ->
        {:error, :invalid_response}

      {:error, err} ->
        {:error, err}
    end
  end

  @doc """
  Lists the topics that the node is subscribed to.
  """
  @spec ls!() :: list(binary)
  def ls!() do
    {:ok, %Tesla.Env{body: body}} = Api.get("/topics")
    %{"topics" => topics} = body
    topics
  end

  @doc """
  Checks if a topic exists in the list of topics that the node is subscribed to.
  ## Parameters
    `topic` - The topic to check for.
  """
  @spec exists?(binary) :: boolean
  def exists?(topic) do
    {:ok, topics} = ls()
    topic in topics
  end

  @doc """
    Subscribe to messages on a topic and listen for them.
    https://docs.ipfs.io/reference/http/api/#api-v0-pubsub-sub
    Messages are sent to the process as a tuple of `{:myspace_pubsub_topic_message, message}`.
    This should make it easy to pattern match on the messages in a receive do loop.

    ## Parameters
      `topic` - The topic to subscribe to.
      `pid`   - The process to send the messages to.

    ## Usage
    MyspacePubsub.sub("mytopic")
    MyspacePubsub.sub("mytopic", self()) # Same as above
    MyspacePubsub.sub("mytopic", pid) # Send messages to a specific process

  Returns {:ok, pid} where pid is the pid of the GenServer that is listening for messages.
  Messages will be sent to the provided as a parameter to the function.
  """
  #  @spec sub(binary, pid) :: {:ok, pid} | {:error, any}
  @spec sub(binary, pid) :: any
  def sub(topic, pid \\ self()) when is_binary(topic) do
    topic = Topic.new!(topic, pid)

    case MyspacePubsub.Supervisor.start_topic(topic) do
      {:ok, pid} ->
        Logger.info("Started topic: #{topic.topic}")
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        Logger.info("Subscribing to topic: #{topic.topic}")
        MyspacePubsub.Topic.subscribe(pid, topic.topic)
        {:ok, pid}
    end
  end

  @doc """
  Get next message from the Pubsub Topic in your inbox or wait for one to arrive.
  This is probably just useful for testing and is just here for the sweetness of it.
  https://www.youtube.com/watch?v=6jAVHBLvo2c
  It's just not good for your health, but OK for your soul.
  """
  @spec get_pubsub_topic_message :: any
  def get_pubsub_topic_message() do
    receive do
      {:myspace_pubsub_message, message} -> message
    end
  end

  @doc """
  Lists the peers that are participating in the given topic.
  ## Parameters
    `topic` - The topic to list peers for.
  ## Returns
  A tuple `{:ok, peers}` on success, where peers is a list of peer identifiers.
  Returns `{:error, reason}` on failure.
  """
  @spec peers(binary) :: {:ok, list(binary)} | {:error, any | :invalid_response}
  def peers(topic) when is_binary(topic) do
    case Api.get("/topics/" <> topic <> "/peers") do
      {:ok, %Tesla.Env{body: body}} when is_map(body) ->
        %{"peers" => peers} = body
        {:ok, peers}

      {:ok, _} ->
        {:error, :invalid_response}

      {:error, err} ->
        {:error, err}
    end
  end

  @doc """
  Lists the peers that are participating in the given topic.
  ## Parameters
    `topic` - The topic to list peers for.
  ## Returns
  A list of peer identifiers.
  """
  @spec peers!(binary) :: list(binary)
  def peers!(topic) when is_binary(topic) do
    {:ok, %Tesla.Env{body: body}} = Api.get("/topics/" <> topic <> "/peers")
    %{"peers" => peers} = body
    peers
  end
end