lib/actors/actor/entity/entity.ex

defmodule Actors.Actor.Entity do
  use GenServer, restart: :transient
  require Logger

  alias Actors.Actor.{Entity.EntityState, StateManager}
  alias Actors.Registry.HostActor

  alias Eigr.Functions.Protocol.Actors.{
    Actor,
    ActorId,
    ActorDeactivationStrategy,
    ActorSettings,
    ActorState,
    ActorSnapshotStrategy,
    ActorSystem,
    Command,
    FixedTimerCommand,
    Metadata,
    TimeoutStrategy
  }

  alias Eigr.Functions.Protocol.{
    ActorInvocation,
    ActorInvocationResponse,
    Broadcast,
    Context,
    Forward,
    InvocationRequest,
    Pipe,
    SideEffect,
    Workflow
  }

  alias Phoenix.PubSub

  import Actors, only: [invoke: 2]
  import Actors.Registry.ActorRegistry, only: [lookup: 2]

  @default_deactivate_timeout 10_000

  @default_host_interface Actors.Actor.Interface.Http

  @default_methods [
    "get",
    "Get",
    "get_state",
    "getState",
    "GetState"
  ]

  @default_snapshot_timeout 2_000

  @fullsweep_after 10

  @min_snapshot_threshold 500

  @timeout_factor_range 200..9000

  @impl true
  @spec init(EntityState.t()) ::
          {:ok, EntityState.t(), {:continue, :load_state}}
  def init(initial_state) do
    state = EntityState.unpack(initial_state)

    do_init(state)
    |> parse_packed_response()
  end

  defp do_init(
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             metadata: metadata,
             settings:
               %ActorSettings{persistent: false, deactivation_strategy: deactivation_strategy} =
                 _settings,
             timer_commands: timer_commands
           }
         } = state
       )
       when is_nil(deactivation_strategy) or deactivation_strategy == %{} do
    Process.flag(:trap_exit, true)

    Logger.notice(
      "Activating actor #{name} in Node #{inspect(Node.self())}. Persistence disabled."
    )

    :ok = handle_metadata(name, metadata)
    :ok = handle_timers(timer_commands)

    strategy = {:timeout, TimeoutStrategy.new!(timeout: @default_deactivate_timeout)}
    schedule_deactivate(strategy, get_timeout_factor(@timeout_factor_range))
    {:ok, state}
  end

  defp do_init(
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             metadata: metadata,
             settings:
               %ActorSettings{
                 persistent: false,
                 deactivation_strategy:
                   %ActorDeactivationStrategy{strategy: deactivation_strategy} = _dstrategy
               } = _settings,
             timer_commands: timer_commands
           }
         } = state
       ) do
    Process.flag(:trap_exit, true)

    Logger.notice(
      "Activating actor #{name} in Node #{inspect(Node.self())}. Persistence disabled."
    )

    :ok = handle_metadata(name, metadata)
    :ok = handle_timers(timer_commands)

    schedule_deactivate(deactivation_strategy, get_timeout_factor(@timeout_factor_range))
    {:ok, state}
  end

  defp do_init(
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             metadata: metadata,
             settings:
               %ActorSettings{
                 persistent: true,
                 snapshot_strategy: %ActorSnapshotStrategy{} = _snapshot_strategy,
                 deactivation_strategy: deactivation_strategy
               } = _settings,
             timer_commands: timer_commands
           }
         } = state
       )
       when is_nil(deactivation_strategy) or deactivation_strategy == %{} do
    Process.flag(:trap_exit, true)

    Logger.notice(
      "Activating actor #{name} in Node #{inspect(Node.self())}. Persistence enabled."
    )

    :ok = handle_metadata(name, metadata)
    :ok = handle_timers(timer_commands)

    strategy = {:timeout, TimeoutStrategy.new!(timeout: @default_deactivate_timeout)}
    schedule_deactivate(strategy, get_timeout_factor(@timeout_factor_range))

    # Write soon in the first time
    schedule_snapshot_advance(@min_snapshot_threshold + get_timeout_factor(@timeout_factor_range))
    {:ok, state, {:continue, :load_state}}
  end

  defp do_init(
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             metadata: metadata,
             settings:
               %ActorSettings{
                 persistent: true,
                 snapshot_strategy: %ActorSnapshotStrategy{} = _snapshot_strategy,
                 deactivation_strategy: %ActorDeactivationStrategy{
                   strategy: deactivation_strategy
                 }
               } = _settings,
             timer_commands: timer_commands
           }
         } = state
       ) do
    Process.flag(:trap_exit, true)

    Logger.notice(
      "Activating actor #{inspect(name)} in Node #{inspect(Node.self())}. Persistence enabled."
    )

    :ok = handle_metadata(name, metadata)
    :ok = handle_timers(timer_commands)

    schedule_deactivate(deactivation_strategy, get_timeout_factor(@timeout_factor_range))

    # Write soon in the first time
    schedule_snapshot_advance(@min_snapshot_threshold + get_timeout_factor(@timeout_factor_range))
    {:ok, state, {:continue, :load_state}}
  end

  @impl true
  @spec handle_continue(:load_state, EntityState.t()) :: {:noreply, EntityState.t()}
  def handle_continue(action, state) do
    state = EntityState.unpack(state)

    do_handle_continue(action, state)
    |> parse_packed_response()
  end

  defp do_handle_continue(
         :load_state,
         %EntityState{
           actor: %Actor{id: %ActorId{name: name} = _id, state: actor_state} = actor
         } = state
       )
       when is_nil(actor_state) do
    Logger.debug("Initial state is empty... Getting state from state manager.")

    case StateManager.load(name) do
      {:ok, current_state} ->
        {:noreply, %EntityState{state | actor: %Actor{actor | state: current_state}}}

      {:not_found, %{}} ->
        Logger.debug("Not found initial Actor State on statestore for Actor #{name}.")
        {:noreply, state, :hibernate}
    end
  end

  defp do_handle_continue(
         :load_state,
         %EntityState{
           actor:
             %Actor{id: %ActorId{name: name} = _id, state: %ActorState{} = _actor_state} = actor
         } = state
       ) do
    Logger.debug(
      "Initial state is not empty... Trying to reconcile the state with state manager."
    )

    case StateManager.load(name) do
      {:ok, current_state} ->
        # TODO: Merge current with old ?
        {:noreply, %EntityState{state | actor: %Actor{actor | state: current_state}}}

      {:not_found, %{}} ->
        Logger.debug("Not found initial state on statestore for Actor #{name}.")
        {:noreply, state, :hibernate}

      error ->
        Logger.error("Error on load state for Actor #{name}. Error: #{inspect(error)}")
        {:noreply, state, :hibernate}
    end
  end

  @impl true
  def handle_call(action, from, state) do
    state = EntityState.unpack(state)

    do_handle_call(action, from, state)
    |> parse_packed_response()
  end

  defp do_handle_call(
         :get_state,
         _from,
         %EntityState{
           actor: %Actor{state: actor_state} = _actor
         } = state
       )
       when is_nil(actor_state),
       do: {:reply, {:error, :not_found}, state, :hibernate}

  defp do_handle_call(
         :get_state,
         _from,
         %EntityState{
           actor: %Actor{state: %ActorState{} = actor_state} = _actor
         } = state
       ),
       do: {:reply, {:ok, actor_state}, state, :hibernate}

  defp do_handle_call(
         {:invocation_request,
          %InvocationRequest{
            actor:
              %Actor{
                id: %ActorId{name: actor_name} = _id
              } = _actor,
            command_name: command,
            value: payload
          } = _invocation, opts},
         _from,
         %EntityState{
           system: actor_system,
           actor: %Actor{state: actor_state, commands: commands, timer_commands: timers}
         } = state
       ) do
    if length(commands) <= 0 do
      Logger.warning("Actor [#{actor_name}] has not registered any Actions")
    end

    all_commands =
      commands ++ Enum.map(timers, fn %FixedTimerCommand{command: cmd} = _timer_cmd -> cmd end)

    case Enum.member?(@default_methods, command) or
           Enum.any?(all_commands, fn cmd -> cmd.name == command end) do
      true ->
        interface = get_interface(actor_system, actor_name, opts)
        current_state = Map.get(actor_state || %{}, :state)

        request =
          ActorInvocation.new(
            actor_name: actor_name,
            actor_system: actor_system,
            command_name: command,
            value: payload,
            current_context: Context.new(state: current_state)
          )

        interface.invoke_host(request, state, @default_methods)
        |> case do
          {:ok, response, state} -> {:reply, {:ok, do_response(request, response, state)}, state}
          {:error, reason, state} -> {:reply, {:error, reason}, state, :hibernate}
        end

      false ->
        {:reply, {:error, "Command [#{command}] not found for Actor [#{actor_name}]"}, state,
         :hibernate}
    end
  end

  @impl true
  def handle_cast(action, state) do
    state = EntityState.unpack(state)

    do_handle_cast(action, state)
    |> parse_packed_response()
  end

  defp do_handle_cast(
         {:invocation_request,
          %InvocationRequest{
            actor: %Actor{id: %ActorId{name: actor_name} = _id} = _actor,
            command_name: command,
            value: payload
          } = _invocation, opts},
         %EntityState{
           system: actor_system,
           actor: %Actor{state: actor_state, commands: commands, timer_commands: timers}
         } = state
       ) do
    if length(commands) <= 0 do
      Logger.warning("Actor [#{actor_name}] has not registered any Actions")
    end

    all_commands =
      commands ++ Enum.map(timers, fn %FixedTimerCommand{command: cmd} = _timer_cmd -> cmd end)

    case Enum.member?(@default_methods, command) or
           Enum.any?(all_commands, fn cmd -> cmd.name == command end) do
      true ->
        interface = get_interface(actor_system, actor_name, opts)
        current_state = Map.get(actor_state || %{}, :state)

        request =
          ActorInvocation.new(
            actor_name: actor_name,
            actor_system: actor_system,
            command_name: command,
            value: payload,
            current_context: Context.new(state: current_state)
          )

        interface.invoke_host(request, state, @default_methods)
        |> case do
          {:ok, response, state} ->
            do_response(request, response, state)
            {:noreply, state}

          {:error, _reason, state} ->
            {:noreply, state, :hibernate}
        end

      false ->
        {:reply, {:error, "Command [#{command}] not found for Actor [#{actor_name}]"}, state,
         :hibernate}
    end
  end

  @impl true
  def handle_info(action, state) do
    state = EntityState.unpack(state)

    do_handle_info(action, state)
    |> parse_packed_response()
  end

  defp do_handle_info(
         {:invoke_timer_command,
          %FixedTimerCommand{command: %Command{name: cmd} = _command} = timer},
         %EntityState{
           system: _actor_system,
           actor: %Actor{state: actor_state} = actor
         } = state
       ) do
    current_state = Map.get(actor_state || %{}, :state)

    invocation = %InvocationRequest{
      actor: actor,
      command_name: cmd,
      value: current_state,
      async: true
    }

    result = do_handle_cast({:invocation_request, invocation, []}, state)

    :ok = handle_timers([timer])

    result
  end

  defp do_handle_info(
         {:receive, cmd, payload},
         %EntityState{
           system: _actor_system,
           actor: %Actor{id: %ActorId{name: actor_name} = _id} = actor
         } = state
       ) do
    Logger.debug(
      "Actor [#{actor_name}] Received Broadcast Event [#{inspect(payload)}] to perform Action [#{cmd}]"
    )

    invocation = %InvocationRequest{
      actor: actor,
      command_name: cmd,
      value: payload,
      async: true
    }

    do_handle_cast({:invocation_request, invocation, []}, state)
  end

  defp do_handle_info(
         :snapshot,
         %EntityState{
           actor:
             %Actor{
               state: actor_state,
               settings: %ActorSettings{
                 snapshot_strategy: %ActorSnapshotStrategy{
                   strategy: {:timeout, %TimeoutStrategy{timeout: _timeout}} = snapshot_strategy
                 }
               }
             } = _actor
         } = state
       )
       when is_nil(actor_state) or actor_state == %{} do
    schedule_snapshot(snapshot_strategy)
    {:noreply, state, :hibernate}
  end

  defp do_handle_info(
         :snapshot,
         %EntityState{
           state_hash: old_hash,
           actor:
             %Actor{
               id: %ActorId{name: name} = _id,
               state: %ActorState{} = actor_state,
               settings: %ActorSettings{
                 snapshot_strategy: %ActorSnapshotStrategy{
                   strategy: {:timeout, %TimeoutStrategy{timeout: timeout}} = snapshot_strategy
                 }
               }
             } = _actor
         } = state
       ) do
    # Persist State only when necessary
    res =
      if StateManager.is_new?(old_hash, actor_state.state) do
        Logger.debug("Snapshotting actor #{name}")

        # Execute with timeout equals timeout strategy - 1 to avoid mailbox congestions
        case StateManager.save_async(name, actor_state, timeout - 1) do
          {:ok, _, hash} ->
            {:noreply, %{state | state_hash: hash}, :hibernate}

          {:error, _, _, hash} ->
            {:noreply, %{state | state_hash: hash}, :hibernate}

          {:error, :unsuccessfully, hash} ->
            {:noreply, %{state | state_hash: hash}, :hibernate}

          _ ->
            {:noreply, state, :hibernate}
        end
      else
        {:noreply, state, :hibernate}
      end

    schedule_snapshot(snapshot_strategy)
    res
  end

  defp do_handle_info(
         :deactivate,
         %EntityState{
           actor:
             %Actor{
               id: %ActorId{name: name} = _id,
               settings: %ActorSettings{
                 deactivation_strategy:
                   %ActorDeactivationStrategy{strategy: deactivation_strategy} =
                     _actor_deactivation_strategy
               }
             } = _actor
         } = state
       ) do
    case Process.info(self(), :message_queue_len) do
      {:message_queue_len, 0} ->
        Logger.debug("Deactivating actor #{name} for timeout")
        {:stop, :normal, state}

      _ ->
        schedule_deactivate(deactivation_strategy)
        {:noreply, state, :hibernate}
    end
  end

  defp do_handle_info(
         {:EXIT, from, {:name_conflict, {key, value}, registry, pid}},
         %EntityState{
           actor: %Actor{id: %ActorId{} = id}
         } = state
       ) do
    Logger.warning("A conflict has been detected for ActorId #{inspect(id)}. Possible NetSplit!
      Trace Data: [
        from: #{inspect(from)},
        key: #{inspect(key)},
        value: #{inspect(value)},
        registry: #{inspect(registry)},
        pid: #{inspect(pid)}
      ] ")

    {:stop, :normal, state}
  end

  defp do_handle_info(
         {:EXIT, from, reason},
         %EntityState{
           actor: %Actor{id: %ActorId{name: name} = _id}
         } = state
       ) do
    Logger.warning("Received Exit message for Actor #{name} and PID #{inspect(from)}.")

    {:stop, reason, state}
  end

  defp do_handle_info(
         message,
         %EntityState{
           actor: %Actor{id: %ActorId{name: name} = _id, state: actor_state}
         } = state
       )
       when is_nil(actor_state) do
    Logger.warning(
      "No handled internal message for actor #{name}. Message: #{inspect(message)}. Actor state: #{inspect(state)}"
    )

    {:noreply, state, :hibernate}
  end

  defp do_handle_info(
         message,
         %EntityState{
           actor: %Actor{id: %ActorId{name: name} = _id, state: %ActorState{} = actor_state}
         } = state
       ) do
    Logger.warning(
      "No handled internal message for actor #{name}. Message: #{inspect(message)}. Actor state: #{inspect(state)}"
    )

    StateManager.save(name, actor_state)
    {:noreply, state, :hibernate}
  end

  @impl true
  def terminate(action, state) do
    state = EntityState.unpack(state)

    do_terminate(action, state)
  end

  defp do_terminate(
         reason,
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             state: actor_state,
             settings: %ActorSettings{persistent: persistent}
           }
         } = _state
       )
       when is_nil(actor_state) or persistent == false do
    Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
  end

  defp do_terminate(
         reason,
         %EntityState{
           actor: %Actor{
             id: %ActorId{name: name} = _id,
             settings: %ActorSettings{persistent: true},
             state: %ActorState{} = actor_state
           }
         } = _state
       ) do
    StateManager.save(name, actor_state)
    Logger.debug("Terminating actor #{name} with reason #{inspect(reason)}")
  end

  defp do_response(_request, %ActorInvocationResponse{workflow: workflow} = response, _state)
       when is_nil(workflow) or workflow == %{} do
    response
  end

  defp do_response(request, response, state) do
    do_run_workflow(request, response, state)
  end

  defp do_run_workflow(_request, %ActorInvocationResponse{workflow: workflow} = response, _state)
       when is_nil(workflow) or workflow == %{} do
    response
  end

  defp do_run_workflow(
         request,
         %ActorInvocationResponse{
           workflow: %Workflow{broadcast: broadcast, effects: effects} = _workflow
         } = response,
         _state
       ) do
    do_side_effects(effects)
    do_broadcast(broadcast)
    do_handle_routing(request, response)
  end

  defp do_handle_routing(
         _request,
         %ActorInvocationResponse{
           workflow: %Workflow{routing: routing} = _workflow
         } = response
       )
       when is_nil(routing),
       do: response

  defp do_handle_routing(
         _request,
         %ActorInvocationResponse{
           actor_system: system_name,
           value: value,
           workflow:
             %Workflow{
               routing: {:pipe, %Pipe{actor: actor_name, command_name: cmd} = _pipe} = _workflow
             } = response
         }
       ) do
    invocation = %InvocationRequest{
      # TODO check if system is really necessary
      system: %ActorSystem{name: system_name},
      actor: %Actor{id: %ActorId{name: actor_name}},
      command_name: cmd,
      value: value
    }

    try do
      case lookup(system_name, actor_name) do
        {:ok, %HostActor{opts: opts}} ->
          invoke(invocation, opts)

        _ ->
          response
      end
    catch
      error ->
        Logger.warning(
          "Error during Pipe request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
        )

        response
    end
  end

  defp do_handle_routing(
         %ActorInvocation{
           actor_system: system_name,
           value: value
         } = _request,
         %ActorInvocationResponse{
           workflow:
             %Workflow{
               routing:
                 {:forward, %Forward{actor: actor_name, command_name: cmd} = _pipe} = _workflow
             } = response
         }
       ) do
    invocation = %InvocationRequest{
      # TODO check if system is really necessary
      system: %ActorSystem{name: system_name},
      actor: %Actor{id: %ActorId{name: actor_name}},
      command_name: cmd,
      value: value
    }

    try do
      case lookup(system_name, actor_name) do
        {:ok, %HostActor{opts: opts}} ->
          invoke(invocation, opts)

        _ ->
          response
      end
    catch
      error ->
        Logger.warning(
          "Error during Forward request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
        )

        response
    end
  end

  def do_broadcast(broadcast) when is_nil(broadcast) or broadcast == %{} do
    :ok
  end

  def do_broadcast(
        %Broadcast{channel_group: channel, command_name: command, value: payload} = _broadcast
      ) do
    publish(channel, command, payload)
  end

  def do_side_effects(effects) when is_list(effects) and effects == [] do
    :ok
  end

  def do_side_effects(effects) when is_list(effects) do
    spawn(fn ->
      effects
      |> Flow.from_enumerable(min_demand: 1, max_demand: System.schedulers_online())
      |> Flow.map(fn %SideEffect{
                       request:
                         %InvocationRequest{
                           actor: %Actor{id: %ActorId{name: actor_name} = _id} = _actor,
                           system: %ActorSystem{name: system_name}
                         } = invocation
                     } ->
        try do
          case lookup(system_name, actor_name) do
            {:ok, %HostActor{opts: opts}} ->
              invoke(invocation, opts)

            _ ->
              :ok
          end
        catch
          error ->
            Logger.warning(
              "Error during Side Effect request to Actor #{system_name}:#{actor_name}. Error: #{inspect(error)}"
            )

            :ok
        end
      end)
      |> Flow.run()
    end)
  catch
    error ->
      Logger.warning("Error during Side Effect request. Error: #{inspect(error)}")
      :ok
  end

  def start_link(%EntityState{actor: %Actor{id: %ActorId{name: name} = _id}} = state) do
    GenServer.start(__MODULE__, state,
      name: via(name),
      spawn_opt: [fullsweep_after: @fullsweep_after]
    )
  end

  @spec get_state(any) :: {:error, term()} | {:ok, term()}
  def get_state(ref) when is_pid(ref) do
    GenServer.call(ref, :get_state, 20_000)
  end

  def get_state(ref) do
    GenServer.call(via(ref), :get_state, 20_000)
  end

  @spec invoke(any, any, any) :: any
  def invoke(ref, request, opts) when is_pid(ref) do
    GenServer.call(ref, {:invocation_request, request, opts}, 30_000)
  end

  def invoke(ref, request, opts) do
    GenServer.call(via(ref), {:invocation_request, request, opts}, 30_000)
  end

  @spec invoke_async(any, any, any) :: :ok
  def invoke_async(ref, request, opts) when is_pid(ref) do
    GenServer.cast(ref, {:invocation_request, request, opts})
  end

  def invoke_async(ref, request, opts) do
    GenServer.cast(via(ref), {:invocation_request, request, opts})
  end

  defp handle_metadata(_actor, metadata) when is_nil(metadata) or metadata == %{} do
    :ok
  end

  defp handle_metadata(actor, %Metadata{channel_group: channel, tags: _tags} = _metadata) do
    :ok = subscribe(actor, channel)
    :ok
  end

  defp publish(channel, command, payload) do
    PubSub.broadcast(
      :actor_channel,
      channel,
      {:receive, command, payload}
    )
  end

  defp subscribe(_actor, channel) when is_nil(channel), do: :ok

  defp subscribe(actor, channel) do
    Logger.debug("Actor [#{actor}] is subscribing to channel [#{channel}]")
    PubSub.subscribe(:actor_channel, channel)
  end

  defp get_interface(system_name, actor_name, opts),
    do:
      Keyword.get(
        opts,
        :host_interface,
        get_interface_by_actor_or_default(system_name, actor_name)
      )

  defp get_interface_by_actor_or_default(system_name, actor_name) do
    case lookup(system_name, actor_name) do
      {:ok, %HostActor{opts: opts}} ->
        Keyword.get(opts, :host_interface, @default_host_interface)

      _ ->
        @default_host_interface
    end
  end

  defp get_timeout_factor(factor_range) when is_number(factor_range),
    do: Enum.random([factor_range])

  defp get_timeout_factor(factor_range) when is_list(factor_range), do: Enum.random(factor_range)

  defp get_timeout_factor(factor_range), do: Enum.random(factor_range)

  defp schedule_snapshot_advance(timeout),
    do:
      Process.send_after(
        self(),
        :snapshot,
        timeout
      )

  defp schedule_snapshot(snapshot_strategy, timeout_factor \\ 0),
    do:
      Process.send_after(
        self(),
        :snapshot,
        get_snapshot_interval(snapshot_strategy, timeout_factor)
      )

  defp schedule_deactivate(deactivation_strategy, timeout_factor \\ 0),
    do:
      Process.send_after(
        self(),
        :deactivate,
        get_deactivate_interval(deactivation_strategy, timeout_factor)
      )

  defp get_snapshot_interval(timeout_strategy, timeout_factor \\ 0)

  defp get_snapshot_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       )
       when is_nil(timeout),
       do: @default_snapshot_timeout + timeout_factor

  defp get_snapshot_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       ),
       do: timeout + timeout_factor

  defp get_deactivate_interval(timeout_strategy, timeout_factor \\ 0)

  defp get_deactivate_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       )
       when is_nil(timeout),
       do: @default_deactivate_timeout + timeout_factor

  defp get_deactivate_interval(
         {:timeout, %TimeoutStrategy{timeout: timeout}} = _timeout_strategy,
         timeout_factor
       ),
       do: timeout + timeout_factor

  defp handle_timers(timers) when is_list(timers) do
    if length(timers) > 0 do
      timers
      |> Stream.map(fn %FixedTimerCommand{seconds: delay} = timer_command ->
        Process.send_after(self(), {:invoke_timer_command, timer_command}, delay)
      end)
      |> Stream.run()
    end

    :ok
  catch
    error -> Logger.error("Error on handle timers #{inspect(error)}")
  end

  defp handle_timers(nil), do: :ok

  defp handle_timers([]), do: :ok

  defp parse_packed_response(response) do
    case response do
      {:reply, response, state} -> {:reply, response, EntityState.pack(state)}
      {:reply, response, state, opts} -> {:reply, response, EntityState.pack(state), opts}
      {:stop, reason, state, opts} -> {:stop, reason, EntityState.pack(state), opts}
      {:stop, reason, state} -> {:stop, reason, EntityState.pack(state)}
      {:noreply, state} -> {:noreply, EntityState.pack(state)}
      {:noreply, state, opts} -> {:noreply, EntityState.pack(state), opts}
      {:ok, state} -> {:ok, EntityState.pack(state)}
      {:ok, state, opts} -> {:ok, EntityState.pack(state), opts}
    end
  end

  defp via(name) do
    {:via, Horde.Registry, {Spawn.Cluster.Node.Registry, {__MODULE__, name}}}
  end
end