lib/ex_ipfs_pubsub.ex

defmodule ExIpfsPubsub do
  @moduledoc """
  ExIpfsPubsub is where the Pubsub commands of the IPFS API reside.
  """
  import ExIpfs.Api
  import ExIpfs.Utils
  alias ExIpfs.Multibase
  alias ExIpfsPubsub.Topic

  require Logger

  @spec ls :: {:error, any} | {:ok, list}
  @doc """
  List the topics you are currently subscribed to.

  https://docs.ipfs.io/reference/http/api/#api-v0-pubsub-ls
  """
  # @spec ls :: {:ok, ExIpfs.strings()} | ExIpfs.Api.error_response()
  def ls do
    post_query("/pubsub/ls")
    |> decode_strings()
    |> Map.get("Strings")
    |> okify()
  end

  @doc """
  List the peers you are currently connected to.

  https://docs.ipfs.io/reference/http/api/#api-v0-pubsub-peers

  ## Parameters
    `topic` - The topic to list peers for.
  """
  @spec peers(binary) :: {:ok, any} | ExIpfs.Api.error_response()
  def peers(topic) do
    base64topic = Multibase.encode!(topic, [])

    post_query("/pubsub/peers?arg=#{base64topic}")
    |> Map.get("Strings")
    |> okify()
  end

  @doc """
  Publish a message to a topic.

  https://docs.ipfs.io/reference/http/api/#api-v0-pubsub-pub

  ## Parameters
  ```
    `topic` - The topic to publish to.
    `data` - The data to publish.
  ```

  ## Usage
  ```
  ExIpfsPubsub.sub("mymessage", "mytopic")
  ```

  """
  @spec pub(binary, binary) :: {:ok, any} | ExIpfs.Api.error_response()
  def pub(data, topic) do
    multipart_content(data, "data")
    |> post_multipart("/pubsub/pub?arg=" <> Multibase.encode!(topic, []))
    |> okify()
  end

  @doc """
  Subscribe to messages on a topic and listen for them.

  https://docs.ipfs.io/reference/http/api/#api-v0-pubsub-sub

  Messages are sent to the process as a tuple of `{:ex_ipfs_pubsub_topic_message, message}`.
  This should make it easy to pattern match on the messages in a receive do loop.

  ## Parameters
    `topic` - The topic to subscribe to.
    `pid`   - The process to send the messages to.

  ## Usage
  ```
  ExIpfsPubsub.sub(self(), "mytopic")
  ```

  Returns {:ok, pid} where pid is the pid of the GenServer that is listening for messages.
  Messages will be sent to the provided as a parameter to the function.
  """
  @spec sub(binary, pid) :: {:ok, pid} | ExIpfs.Api.error_response()
  def sub(topic, pid \\ self()) when is_binary(topic) do
    topic = Topic.new!(topic, pid)

    case ExIpfsPubsub.Supervisor.start_topic(topic) do
      {:ok, pid} ->
        Logger.info("Started topic: #{topic.topic}")
        {:ok, pid}

      {:error, {:already_started, handler}} ->
        Logger.info("Subscribing to topic: #{topic.base64url_topic}")
        ExIpfsPubsub.Topic.subscribe(pid, topic.topic)
        {:ok, handler}
    end
  end

  @doc """
  Get next message from the Pubsub Topic in your inbox or wait for one to arrive.

  This is probably just useful for testing and is just here for the sweetness of it.
  https://www.youtube.com/watch?v=6jAVHBLvo2c

  It's just not good for your health, but OK for your soul.
  """
  @spec get_pubsub_topic_message :: any
  def get_pubsub_topic_message() do
    receive do
      {:ex_ipfs_pubsub_topic_message, message} -> message
    end
  end

  @spec decode_strings({:error, any} | map | list) :: {:error, any} | map | list
  defp decode_strings({:error, data}), do: {:error, data}

  defp decode_strings(strings) when is_map(strings) do
    strings = Map.get(strings, "Strings", [])
    decoded_strings = Enum.map(strings, &Multibase.decode!/1)
    %{"Strings" => decoded_strings}
  end

  defp decode_strings(list), do: Enum.map(list, &decode_strings/1)
end