# lib/emit.ex

defmodule Emit do
  @moduledoc """
  Public entry points for registering keys with metadata and for publishing
  messages to whatever a query resolves to.

  Storage and querying are delegated to `Emit.DB`; the task supervisor used
  when publishing is obtained from `Emit.Cluster`.
  """

  alias Emit.{Cluster, DB}
  alias Lethe.Query

  @doc """
  Stores `metadata` under `key` in `table` via `Emit.DB.set/3`.

  `key` defaults to the calling process (`self()`) and `table` to
  `Emit.DB.default_table/0`. `metadata` must be a map, otherwise no clause
  matches. Returns whatever `Emit.DB.set/3` returns.
  """
  def sub(key \\ key(), metadata, table \\ DB.default_table()) when is_map(metadata) do
    DB.set(key, metadata, table)
  end

  @doc """
  Removes `key` from `table` via `Emit.DB.del/2`.

  `key` defaults to the calling process (`self()`) and `table` to
  `Emit.DB.default_table/0`. Returns whatever `Emit.DB.del/2` returns.
  """
  def unsub(key \\ key(), table \\ DB.default_table()) do
    DB.del(key, table)
  end

  @doc """
  Arranges for `key` to be removed from `table` once the process identified by
  `key` goes down.

  A separate, unlinked process is spawned; it monitors `key` and calls
  `Emit.DB.del/2` when the matching `:DOWN` message arrives. Because the
  defaults are evaluated before spawning, the default `key` is the caller's
  pid, not the watcher's. Returns the pid of the spawned watcher process.
  """
  def unsub_auto(key \\ key(), table \\ DB.default_table()) do
    spawn(fn ->
      ref = Process.monitor(key)

      receive do
        # Pin the monitor reference so only the notification for this exact
        # monitor triggers the delete; any other message sent to the watcher
        # (its pid is returned to the caller) is ignored.
        {:DOWN, ^ref, :process, _pid, _reason} ->
          DB.del(key, table)
      end
    end)
  end

  @doc """
  Publishes `msg` on the local node and on every node in `Node.list/0`.

  For each node a task is started under that node's `Emit.Cluster`
  task supervisor. The task runs `Emit.DB.query/1` with `query` and hands the
  result, together with `msg`, to `Manifold.send/2`. All tasks are started
  first and then awaited one by one with `Task.await/1` (default timeout), so
  a task that fails or times out takes the caller down with it.

  Returns `:ok`.
  """
  def pub(msg, %Query{} = query) do
    tasks =
      Enum.map([Node.self() | Node.list()], fn node ->
        Task.Supervisor.async(supervisor_on(node), fn ->
          query
          |> DB.query()
          |> Manifold.send(msg)
        end)
      end)

    Enum.each(tasks, &Task.await/1)
  end

  @doc """
  Returns a new query for `table` (default `Emit.DB.default_table/0`) by
  calling `Emit.DB.new_query/1`.
  """
  def query(table \\ DB.default_table()), do: DB.new_query(table)

  # Addresses the task supervisor directly on the local node and as a
  # `{supervisor, node}` tuple on any other node.
  defp supervisor_on(node) do
    if node == Node.self() do
      Cluster.task_supervisor()
    else
      {Cluster.task_supervisor(), node}
    end
  end

  # Default subscription key: the calling process.
  defp key, do: self()
end