lib/actors/actor/entity/invocation.ex

defmodule Actors.Actor.Entity.Invocation do
  @moduledoc """
  Handles Invocation functions for Actor Entity
  All the public functions here assumes they are executing inside a GenServer
  """
  require Logger

  alias Actors.Actor.Entity.EntityState
  alias Actors.Registry.HostActor

  alias Eigr.Functions.Protocol.Actors.{
    Actor,
    ActorId,
    ActorSystem,
    Command,
    FixedTimerCommand
  }

  alias Eigr.Functions.Protocol.{
    ActorInvocation,
    ActorInvocationResponse,
    Broadcast,
    Context,
    Forward,
    InvocationRequest,
    Pipe,
    SideEffect,
    Workflow,
    Noop
  }

  alias Phoenix.PubSub

  import Actors.Registry.ActorRegistry, only: [lookup: 2]

  @default_host_interface Actors.Actor.Interface.Http

  @default_methods [
    "get",
    "Get",
    "get_state",
    "getState",
    "GetState"
  ]

  def timer_invoke(
        %FixedTimerCommand{command: %Command{name: cmd} = _command} = timer,
        %EntityState{
          system: _actor_system,
          actor: %Actor{id: caller_actor_id} = actor
        } = state
      ) do
    invocation = %InvocationRequest{
      actor: actor,
      command_name: cmd,
      payload: {:noop, Noop.new()},
      async: true,
      caller: caller_actor_id
    }

    invoke_result = invoke({invocation, []}, state)

    :ok = handle_timers([timer])

    case invoke_result do
      {:reply, _res, state} -> {:noreply, state}
      {:reply, _res, state, opts} -> {:noreply, state, opts}
    end
  end

  def handle_timers(timers) when is_list(timers) do
    if length(timers) > 0 do
      timers
      |> Stream.map(fn %FixedTimerCommand{seconds: delay} = timer_command ->
        Process.send_after(self(), {:invoke_timer_command, timer_command}, delay)
      end)
      |> Stream.run()
    end

    :ok
  catch
    error -> Logger.error("Error on handle timers #{inspect(error)}")
  end

  def handle_timers(nil), do: :ok

  def handle_timers([]), do: :ok

  def broadcast_invoke(
        command,
        payload,
        %ActorInvocation{actor_name: caller_actor_name, actor_system: actor_system},
        %EntityState{
          system: _actor_system,
          actor: %Actor{id: %ActorId{name: actor_name} = _id} = actor
        } = state
      ) do
    Logger.debug(
      "Actor [#{actor_name}] Received Broadcast Event [#{inspect(payload)}] to perform Action [#{command}]"
    )

    invocation = %InvocationRequest{
      actor: actor,
      command_name: command,
      payload: payload,
      async: true,
      caller: ActorId.new(name: caller_actor_name, system: actor_system)
    }

    case invoke({invocation, []}, state) do
      {:reply, _res, state} -> {:noreply, state}
      {:reply, _res, state, opts} -> {:noreply, state, opts}
    end
  end

  def broadcast_invoke(
        payload,
        %EntityState{
          system: _actor_system,
          actor: %Actor{id: %ActorId{name: actor_name} = _id} = _actor
        } = state
      ) do
    Logger.debug(
      "Actor [#{actor_name}] Received Broadcast Event [#{inspect(payload)}] without command. Just ignoring"
    )

    {:noreply, state}
  end

  @doc """
  Invoke function, receives a request and calls invoke host with the response
  """
  def invoke(
        {%InvocationRequest{
           actor:
             %Actor{
               id: %ActorId{name: actor_name} = _id
             } = _actor,
           metadata: metadata,
           command_name: command,
           payload: payload,
           caller: caller
         }, opts},
        %EntityState{
          system: actor_system,
          actor: %Actor{state: actor_state, commands: commands, timer_commands: timers}
        } = state
      ) do
    if length(commands) <= 0 do
      Logger.warning("Actor [#{actor_name}] has not registered any Actions")
    end

    all_commands =
      commands ++ Enum.map(timers, fn %FixedTimerCommand{command: cmd} = _timer_cmd -> cmd end)

    case Enum.member?(@default_methods, command) or
           Enum.any?(all_commands, fn cmd -> cmd.name == command end) do
      true ->
        interface = get_interface(actor_system, actor_name, opts)

        metadata = if is_nil(metadata), do: %{}, else: metadata
        current_state = Map.get(actor_state || %{}, :state)

        request =
          ActorInvocation.new(
            actor_name: actor_name,
            actor_system: actor_system,
            command_name: command,
            payload: payload,
            current_context:
              Context.new(
                metadata: metadata,
                caller: caller,
                self: ActorId.new(name: actor_name, system: actor_system),
                state: current_state
              ),
            caller: caller
          )

        interface.invoke_host(request, state, @default_methods)
        |> case do
          {:ok, response, state} -> {:reply, {:ok, do_response(request, response, state)}, state}
          {:error, reason, state} -> {:reply, {:error, reason}, state, :hibernate}
        end

      false ->
        {:reply, {:error, "Command [#{command}] not found for Actor [#{actor_name}]"}, state,
         :hibernate}
    end
  end

  defp do_response(_request, %ActorInvocationResponse{workflow: workflow} = response, _state)
       when is_nil(workflow) or workflow == %{} do
    response
  end

  defp do_response(request, response, state) do
    do_run_workflow(request, response, state)
  end

  defp do_run_workflow(_request, %ActorInvocationResponse{workflow: workflow} = response, _state)
       when is_nil(workflow) or workflow == %{} do
    response
  end

  defp do_run_workflow(
         request,
         %ActorInvocationResponse{
           workflow: %Workflow{broadcast: broadcast, effects: effects} = _workflow
         } = response,
         _state
       ) do
    do_side_effects(effects)
    do_broadcast(request, broadcast)
    do_handle_routing(request, response)
  end

  defp do_handle_routing(
         _request,
         %ActorInvocationResponse{
           workflow: %Workflow{routing: routing} = _workflow
         } = response
       )
       when is_nil(routing),
       do: response

  defp do_handle_routing(
         %ActorInvocation{
           actor_name: caller_actor_name,
           actor_system: system_name
         },
         %ActorInvocationResponse{
           payload: payload,
           workflow:
             %Workflow{
               routing: {:pipe, %Pipe{actor: actor_name, command_name: cmd} = _pipe} = _workflow
             } = response
         }
       ) do
    invocation = %InvocationRequest{
      system: %ActorSystem{name: system_name},
      actor: %Actor{id: ActorId.new(name: actor_name, system: system_name)},
      command_name: cmd,
      payload: payload,
      caller: ActorId.new(name: caller_actor_name, system: system_name)
    }

    try do
      case lookup(system_name, actor_name) do
        {:ok, %HostActor{opts: opts}} ->
          case Actors.invoke(invocation, opts) do
            {:ok, response} -> response
            error -> error
          end

        _ ->
          response
      end
    catch
      error ->
        Logger.warning(
          "Error during Pipe request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
        )

        response
    end
  end

  defp do_handle_routing(
         %ActorInvocation{
           actor_system: system_name,
           payload: payload,
           actor_name: caller_actor_name
         } = _request,
         %ActorInvocationResponse{
           workflow:
             %Workflow{
               routing:
                 {:forward, %Forward{actor: actor_name, command_name: cmd} = _pipe} = _workflow
             } = response
         }
       ) do
    invocation = %InvocationRequest{
      system: %ActorSystem{name: system_name},
      actor: %Actor{id: ActorId.new(name: actor_name, system: system_name)},
      command_name: cmd,
      payload: payload,
      caller: ActorId.new(name: caller_actor_name, system: system_name)
    }

    try do
      case lookup(system_name, actor_name) do
        {:ok, %HostActor{opts: opts}} ->
          case Actors.invoke(invocation, opts) do
            {:ok, response} -> response
            error -> error
          end

        _ ->
          response
      end
    catch
      error ->
        Logger.warning(
          "Error during Forward request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
        )

        response
    end
  end

  def do_broadcast(_request, broadcast) when is_nil(broadcast) or broadcast == %{} do
    :ok
  end

  def do_broadcast(
        request,
        %Broadcast{channel_group: channel, command_name: command, payload: payload} = _broadcast
      ) do
    publish(channel, command, payload, request)
  end

  defp publish(channel, command, payload, _request) when is_nil(command) do
    PubSub.broadcast(
      :actor_channel,
      channel,
      {:receive, payload}
    )
  end

  defp publish(channel, command, payload, request) do
    PubSub.broadcast(
      :actor_channel,
      channel,
      {:receive, command, payload, request}
    )
  end

  def do_side_effects(effects) when is_list(effects) and effects == [] do
    :ok
  end

  def do_side_effects(effects) when is_list(effects) do
    spawn(fn ->
      effects
      |> Flow.from_enumerable(min_demand: 1, max_demand: System.schedulers_online())
      |> Flow.map(fn %SideEffect{
                       request:
                         %InvocationRequest{
                           actor: %Actor{id: %ActorId{name: actor_name} = _id} = _actor,
                           system: %ActorSystem{name: system_name}
                         } = invocation
                     } ->
        try do
          case lookup(system_name, actor_name) do
            {:ok, %HostActor{opts: opts}} ->
              Actors.invoke(invocation, opts)

            _ ->
              :ok
          end
        catch
          error ->
            Logger.warning(
              "Error during Side Effect request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
            )

            :ok
        end
      end)
      |> Flow.run()
    end)
  catch
    error ->
      Logger.warning("Error during Side Effect request. Error: #{inspect(error)}")
      :ok
  end

  defp get_interface(system_name, actor_name, opts),
    do:
      Keyword.get(
        opts,
        :host_interface,
        get_interface_by_actor_or_default(system_name, actor_name)
      )

  defp get_interface_by_actor_or_default(system_name, actor_name) do
    case lookup(system_name, actor_name) do
      {:ok, %HostActor{opts: opts}} ->
        Keyword.get(opts, :host_interface, @default_host_interface)

      _ ->
        @default_host_interface
    end
  end
end