lib/emit.ex

defmodule Emit do
  alias Emit.{Cluster, DB}
  alias Lethe.Query

  def sub(metadata) when is_map(metadata) do
    DB.set key(), metadata
  end

  def unsub do
    DB.del key()
  end

  def unsub_auto do
    pid = self()
    spawn fn ->
      Process.monitor pid
      receive do
        {:DOWN, _ref, :process, _pid, _reason} ->
          DB.del pid
      end
    end
  end

  def pub(msg, %Query{} = query) do
    [Node.self() | Node.list()]
    |> Enum.map(fn node ->
      target =
        if node == Node.self() do
          Cluster.task_supervisor()
        else
          {Cluster.task_supervisor(), node}
        end

      Task.Supervisor.async target, fn ->
        query
        |> DB.query
        |> Manifold.send(msg)
      end
    end)
    |> Enum.each(&Task.await/1)
  end

  def query, do: DB.new_query()

  defp key, do: self()
end