# lib/actors/actor/pubsub.ex

defmodule Actors.Actor.Pubsub do
  @moduledoc """
  Bridges `Phoenix.PubSub` topics and actor invocations.

  A single, locally named server process owns every topic subscription.
  Each subscription carries metadata identifying the actor, its system and an
  optional action name. When a broadcast reaches this process, the metadata is
  turned into an asynchronous `InvocationRequest` and handed to
  `Actors.invoke/1`.

  The pubsub server name is read at compile time from the `:spawn`
  application's `:pubsub_group` setting and defaults to `:actor_channel`.
  """
  use GenServer

  require Logger

  alias Eigr.Functions.Protocol.InvocationRequest
  alias Eigr.Functions.Protocol.Actors.ActorSystem
  alias Eigr.Functions.Protocol.Actors.Actor
  alias Eigr.Functions.Protocol.Actors.ActorId

  @default_pubsub_group :actor_channel
  @pubsub Application.compile_env(:spawn, :pubsub_group, @default_pubsub_group)

  # Action invoked when a subscription does not name one.
  @default_action "receive"

  @doc """
  Starts the subscription server, registered locally under the module name.

  The argument is ignored.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @doc """
  Broadcasts `payload` on `topic`.

  The message sent is `{:receive, payload, request.actor}` and is dispatched
  through `Actors.Actor.PubsubDispatcher`. Returns whatever
  `Phoenix.PubSub.broadcast/4` returns.
  """
  @spec publish(String.t(), term(), map()) :: :ok | {:error, term()}
  def publish(topic, payload, request) do
    Phoenix.PubSub.broadcast(
      @pubsub,
      topic,
      {:receive, payload, request.actor},
      Actors.Actor.PubsubDispatcher
    )
  end

  @doc """
  Subscribes a specific actor to a topic.

  The request is sent to the server with a cast, so this function always
  returns `:ok` immediately; a failure to subscribe is only logged by the
  server. When `action_handler` is `nil` or `""`, events on the topic invoke
  the `"receive"` action.
  """
  @spec subscribe(String.t(), term(), term(), String.t() | nil) :: :ok
  def subscribe(topic, actor_name, system, action_handler \\ nil) do
    GenServer.cast(__MODULE__, {:subscribe, topic, actor_name, system, action_handler})
  end

  @impl true
  def init(_opts) do
    # State maps `{topic, metadata}` => true for every active subscription.
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:subscribe, topic, actor_name, system, action_handler}, state) do
    metadata = %{actor_name: actor_name, system: system, action: action_handler}

    # The topic is part of the key so the same actor/action pair can listen on
    # several topics. The term itself is used (not a hash of it) so that two
    # different subscriptions can never collide.
    key = {topic, metadata}

    if Map.has_key?(state, key) do
      {:noreply, state}
    else
      case Phoenix.PubSub.subscribe(@pubsub, topic, metadata: metadata) do
        :ok ->
          {:noreply, Map.put(state, key, true)}

        {:error, reason} ->
          # Not recorded, so a later subscribe request for the same key retries.
          Logger.error(
            "Actor [#{inspect(actor_name)}] failed to subscribe to topic [#{inspect(topic)}]: #{inspect(reason)}"
          )

          {:noreply, state}
      end
    end
  end

  @impl true
  def handle_info({{:receive, payload, caller}, metadata}, state) when is_map(metadata) do
    # NOTE(review): the `{message, metadata}` envelope is presumably produced by
    # Actors.Actor.PubsubDispatcher - confirm against that module.
    action = resolve_action(Map.get(metadata, :action))
    actor_name = Map.get(metadata, :actor_name)
    system = Map.get(metadata, :system)

    Logger.debug(
      "Actor [#{actor_name}] Received Broadcast Event to perform Action [#{action}] from caller #{inspect(caller)}"
    )

    invocation = %InvocationRequest{
      system: %ActorSystem{name: system},
      actor: %Actor{
        id: %ActorId{name: actor_name, system: system}
      },
      action_name: action,
      payload: payload,
      caller: caller,
      async: true
    }

    # This process holds the subscriptions of every actor, so a single failed
    # invocation must not crash it: that would drop all of them at once.
    case Actors.invoke(invocation) do
      {:ok, :async} ->
        :ok

      other ->
        Logger.error(
          "Actor [#{actor_name}] failed to handle Broadcast Event for Action [#{action}]: #{inspect(other)}"
        )
    end

    {:noreply, state}
  end

  def handle_info(message, state) do
    # Without this clause an unrelated message would raise FunctionClauseError
    # and take every subscription down with the process.
    Logger.error("#{inspect(__MODULE__)} received unexpected message: #{inspect(message)}")

    {:noreply, state}
  end

  # Picks the action to invoke, falling back to the default when none was given.
  defp resolve_action(nil), do: @default_action
  defp resolve_action(""), do: @default_action
  defp resolve_action(action), do: action
end