lib/actors/actor/invocation_scheduler.ex

defmodule Actors.Actor.InvocationScheduler do
  @moduledoc """
  `InvocationScheduler` is a global process for the cluster that controls
  all Actions of type Schedule.
  This process is global to allow that even after restarts of a process or restart
  of an application we will still be able to perform invocations to actors,
  without the need for persistent storage such as a database.
  """
  use GenServer, restart: :transient
  use Retry

  require Logger

  alias Spawn.Cluster.StateHandoff.InvocationSchedulerState
  alias Eigr.Functions.Protocol.InvocationRequest

  @hibernate_delay 20_000
  @hibernate_jitter 30_000

  @impl true
  def init(_arg) do
    Process.flag(:trap_exit, true)
    Process.flag(:message_queue_data, :off_heap)
    {:ok, %{}, {:continue, :init_invocations}}
  end

  @impl true
  def handle_continue(:init_invocations, state) do
    schedule_hibernate()

    InvocationSchedulerState.all()
    |> Enum.each(&call_invoke/1)

    {:noreply, state}
  end

  @impl true
  def terminate(reason, _state) do
    Logger.debug("InvocationScheduler down with reason (#{inspect(reason)})")
  end

  @impl true
  def handle_info({:invoke, decoded_request, scheduled_to, repeat_in}, state) do
    if is_nil(repeat_in) do
      InvocationSchedulerState.remove(InvocationRequest.encode(decoded_request))
    else
      scheduled_to = DateTime.add(scheduled_to, repeat_in, :millisecond)

      GenServer.cast(self(), {:schedule, decoded_request, scheduled_to, repeat_in})
    end

    request_to_invoke = %InvocationRequest{decoded_request | scheduled_to: 0, async: true}
    Actors.invoke(request_to_invoke)

    {:noreply, state}
  end

  def handle_info(:hibernate, state) do
    schedule_hibernate()
    {:noreply, state, :hibernate}
  end

  @impl true
  def handle_cast({:schedule, request, scheduled_to, repeat_in}, state) do
    encoded_request =
      InvocationRequest.encode(%InvocationRequest{request | scheduled_to: 0, async: true})

    InvocationSchedulerState.put(encoded_request, scheduled_to, repeat_in)

    call_invoke({request, {scheduled_to, repeat_in}})

    {:noreply, state}
  end

  @impl true
  def handle_cast({:schedule_fixed, requests}, state) do
    requests =
      Enum.reduce(requests, [], fn {request, scheduled_to, repeat_in}, acc ->
        encoded_request = InvocationRequest.encode(request)

        if is_nil(InvocationSchedulerState.get(encoded_request)) do
          call_invoke({request, {scheduled_to, repeat_in}})

          acc ++ [{encoded_request, scheduled_to, repeat_in}]
        else
          acc
        end
      end)

    InvocationSchedulerState.put_many(requests)

    {:noreply, state}
  end

  defp call_invoke({encoded_request, {scheduled_to, repeat_in}})
       when is_binary(encoded_request) do
    decoded = encoded_request |> InvocationRequest.decode()

    call_invoke({decoded, {scheduled_to, repeat_in}})
  end

  defp call_invoke({%InvocationRequest{} = decoded_request, {scheduled_to, repeat_in}}) do
    scheduled_to = get_scheduled_to_datetime(scheduled_to)
    delay_in_ms = DateTime.diff(scheduled_to, DateTime.utc_now(), :millisecond)

    if delay_in_ms <= 0 do
      Logger.warning(
        "Received negative delayed invocation request (#{delay_in_ms}), invoking now"
      )

      Process.send(self(), {:invoke, decoded_request, scheduled_to, repeat_in}, [])
    else
      Process.send_after(self(), {:invoke, decoded_request, scheduled_to, repeat_in}, delay_in_ms)
    end
  end

  defp schedule_hibernate() do
    Process.send_after(self(), :hibernate, next_hibernate_delay())
  end

  defp get_scheduled_to_datetime(scheduled_to) when is_number(scheduled_to) do
    scheduled_to
    |> DateTime.from_unix!(:millisecond)
  end

  defp get_scheduled_to_datetime(scheduled_to), do: scheduled_to

  def next_hibernate_delay(), do: @hibernate_delay + :rand.uniform(@hibernate_jitter)

  # Client

  def schedule_fixed_invocations(requests) do
    GenServer.cast({:global, __MODULE__}, {:schedule_fixed, requests})
  end

  def schedule_invoke(%InvocationRequest{} = invocation_request, repeat_in \\ nil) do
    GenServer.cast(
      {:global, __MODULE__},
      {:schedule, invocation_request, invocation_request.scheduled_to, repeat_in}
    )
  end

  def child_spec do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      restart: :transient
    }
  end

  def start_link() do
    GenServer.start_link(__MODULE__, [], name: {:global, __MODULE__})
  end
end