# core/async_job/queue.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule AntikytheraCore.AsyncJob.Queue do
  @moduledoc """
  A queue-like data structure, replicated using `RaftedValue`, to store `AntikytheraCore.AsyncJob` structs.
  """

  alias Croma.Result, as: R
  alias RaftedValue.Data, as: RVData
  alias Antikythera.{Time, Cron, MilliSecondsSinceEpoch}
  alias Antikythera.AsyncJob.{Id, MaxDuration, StateLabel, Status}
  alias AntikytheraCore.AsyncJob
  alias AntikytheraCore.AsyncJob.{RateLimit, RaftedValueConfigMaker}
  alias AntikytheraCore.ExecutorPool.AsyncJobBroker, as: Broker

  # Upper bound on the number of jobs (in any state) held in one queue;
  # `insert/3` rejects further additions with `{:error, :full}`.
  @max_jobs 1000
  # Exposed just for documentation purposes.
  def max_jobs(), do: @max_jobs

  defmodule JobsMap do
    @moduledoc """
    Map from job ID to a `{job, time_in_milliseconds, state}` triplet.
    The time is the timestamp part of the job's key in the secondary index for its
    current state (scheduled start time while waiting, lock time while running).
    """

    defmodule Triplet do
      use Croma.SubtypeOfTuple, elem_modules: [AsyncJob, MilliSecondsSinceEpoch, StateLabel]
    end

    use Croma.SubtypeOfMap, key_module: Id, value_module: Triplet
  end

  defmodule JobKey do
    @moduledoc """
    Sort key used in the secondary indices: `{time_in_milliseconds, job_id}`.
    Appending the job ID makes keys unique even when two jobs share a timestamp.
    """

    use Croma.SubtypeOfTuple, elem_modules: [MilliSecondsSinceEpoch, Id]
  end

  defmodule SecondaryIndex do
    @moduledoc """
    A `:gb_sets` set of `JobKey.t`, ordering the jobs of one state
    (waiting / runnable / running) by timestamp.
    """

    @type t :: :gb_sets.set(JobKey.t())

    # Validation predicate (Croma convention for modules used as struct field types).
    defun valid?(s :: any) :: boolean, do: :gb_sets.is_set(s)
  end

  defmodule IdJobPair do
    @moduledoc """
    `{job_id, job}` pair recorded in `abandoned_jobs` for the leader hook to log.
    """

    use Croma.SubtypeOfTuple, elem_modules: [Id, AsyncJob]
  end

  use Croma.Struct,
    recursive_new?: true,
    fields: [
      # all jobs keyed by job ID, each with its index timestamp and current state
      jobs: JobsMap,
      # jobs whose start time has not yet come (including retry waits), sorted by start time
      index_waiting: SecondaryIndex,
      # jobs ready to be fetched by a broker
      index_runnable: SecondaryIndex,
      # jobs currently locked by a broker, sorted by lock time
      index_running: SecondaryIndex,
      # brokers that found no job on fetch and wait to be notified (at most one broker per node)
      brokers_waiting: Croma.TypeGen.list_of(Croma.Pid),
      # to propagate information to leader hook
      brokers_to_notify: Croma.TypeGen.list_of(Croma.Pid),
      # to propagate information to leader hook
      abandoned_jobs: Croma.TypeGen.list_of(IdJobPair)
    ]

  # The queue state is replicated by `RaftedValue`; this module implements its data behaviour.
  @behaviour RVData

  @impl true
  # `RaftedValue.Data` callback: the initial (empty) queue state.
  defun new() :: t do
    empty_index = :gb_sets.empty()

    %__MODULE__{
      jobs: %{},
      index_waiting: empty_index,
      index_runnable: empty_index,
      index_running: empty_index,
      brokers_waiting: [],
      brokers_to_notify: [],
      abandoned_jobs: []
    }
  end

  @impl true
  # `RaftedValue.Data` callback: applies a replicated command to the queue.
  # Every command is wrapped as `{cmd, now_millis}` by the submitting side, so that
  # all replicas evaluate time-dependent logic with the same clock value.
  # After each recognized command the queue invariants are re-established
  # (over-running jobs released, due waiting jobs made runnable).
  defun command(q1 :: v[t], cmd :: RVData.command_arg()) :: {RVData.command_ret(), t} do
    case cmd do
      {{:add, job_key, job}, now_millis} ->
        insert(q1, job_key, job) |> maintain_invariants_and_return(now_millis)

      {{:fetch, broker}, now_millis} ->
        fetch(q1, broker, now_millis) |> maintain_invariants_and_return(now_millis)

      {{:remove_locked, job_key}, now_millis} ->
        {:ok, remove_locked(q1, job_key, now_millis)}
        |> maintain_invariants_and_return(now_millis)

      {{:unlock_for_retry, job_key}, now_millis} ->
        {:ok, unlock_for_retry(q1, job_key, now_millis)}
        |> maintain_invariants_and_return(now_millis)

      {{:remove_broker_from_waiting_list, pid}, now_millis} ->
        {:ok, remove_broker(q1, pid)} |> maintain_invariants_and_return(now_millis)

      {{:cancel, job_id}, now_millis} ->
        cancel_job(q1, job_id) |> maintain_invariants_and_return(now_millis)

      # `:get_metrics` is a command (not a query) so that it also runs the invariant
      # maintenance above, triggering jobs that have become due (see `start_jobs_and_get_metrics/1`).
      {:get_metrics, now_millis} ->
        {metrics(q1), q1} |> maintain_invariants_and_return(now_millis)

      # failsafe: not to crash on unexpected command
      _ ->
        {:ok, q1}
    end
  end

  # Re-establishes the queue invariants after a command while preserving the
  # command's return value: jobs running beyond the maximum duration are released,
  # and waiting jobs whose start time has arrived become runnable.
  defp maintain_invariants_and_return({ret, queue}, now_millis) do
    {ret, maintain_invariants(queue, now_millis)}
  end

  defp maintain_invariants(queue, now_millis) do
    queue
    |> release_locks_of_jobs_running_too_long(now_millis)
    |> move_now_runnable_jobs(now_millis)
  end

  # Clears `abandoned_jobs` (it only conveys per-command information to the leader
  # hook, so it must not accumulate across commands) and then releases every job
  # that has been running longer than the maximum allowed duration.
  defunp release_locks_of_jobs_running_too_long(q :: v[t], now_millis :: v[pos_integer]) :: t do
    move_jobs_running_too_long(%__MODULE__{q | abandoned_jobs: []}, now_millis)
  end

  # Releases jobs that have stayed in `:running` state longer than `MaxDuration.max()`.
  # A job with attempts remaining is put back into the waiting queue (consuming one
  # attempt, keeping its original key time so it is immediately due); a job on its
  # last attempt is dropped and recorded in `abandoned_jobs` so the leader hook can
  # log it, and re-registered if it is a recurring cron job. Recurses until no
  # over-running job remains.
  defp move_jobs_running_too_long(
         %__MODULE__{
           jobs: jobs,
           index_waiting: index_waiting,
           index_running: index_running,
           abandoned_jobs: abandoned_jobs
         } = q,
         now_millis
       ) do
    # Jobs locked at or before this timestamp have exceeded the maximum duration.
    threshold_time = now_millis - MaxDuration.max()

    case take_smallest_with_earlier_timestamp(index_running, threshold_time) do
      nil ->
        q

      {{_, job_id} = job_key, index_running2} ->
        {j1, t, :running} = Map.fetch!(jobs, job_id)

        case j1.remaining_attempts do
          1 ->
            # Last attempt exhausted: abandon the job (the leader hook logs it).
            jobs2 = Map.delete(jobs, job_id)
            abandoned_jobs2 = [{job_id, j1} | abandoned_jobs]

            %__MODULE__{
              q
              | jobs: jobs2,
                index_running: index_running2,
                abandoned_jobs: abandoned_jobs2
            }
            |> requeue_if_recurring(j1, job_id, now_millis)

          remaining ->
            # Attempts remain: back to waiting, reusing the lock-time key.
            j2 = %AsyncJob{j1 | remaining_attempts: remaining - 1}
            jobs2 = Map.put(jobs, job_id, {j2, t, :waiting})
            index_waiting2 = :gb_sets.add(job_key, index_waiting)

            %__MODULE__{
              q
              | jobs: jobs2,
                index_waiting: index_waiting2,
                index_running: index_running2
            }
        end
        # Recurse with `now_millis` (NOT `threshold_time`): the release threshold must
        # stay fixed at `now_millis - MaxDuration.max()` for every job handled in this
        # pass, and cron requeueing must be computed from the actual current time.
        |> move_jobs_running_too_long(now_millis)
    end
  end

  # Pops the smallest key from `set` when its timestamp component is not later than
  # `time`. Returns `{key, rest_of_set}` on success, or `nil` when the set is empty
  # or its earliest key is still in the future.
  defp take_smallest_with_earlier_timestamp(set, time) do
    case safe_take_smallest(set) do
      {{t, _id}, _rest} = taken -> if t <= time, do: taken, else: nil
      nil -> nil
    end
  end

  # `:gb_sets.take_smallest/1` raises on an empty set; this variant returns `nil` instead.
  defp safe_take_smallest(set) do
    if :gb_sets.is_empty(set) do
      nil
    else
      :gb_sets.take_smallest(set)
    end
  end

  # Clears `brokers_to_notify` (per-command information for the leader hook) and
  # then moves every waiting job whose start time has arrived into the runnable index.
  defunp move_now_runnable_jobs(q :: v[t], now_millis :: v[pos_integer]) :: t do
    move_now_runnable_jobs_impl(%__MODULE__{q | brokers_to_notify: []}, now_millis)
  end

  # Repeatedly moves the earliest due job (key time <= `now_millis`) from
  # `index_waiting` to `index_runnable`. For each job moved, one waiting broker
  # (if any) is shifted from `brokers_waiting` to `brokers_to_notify`, so that the
  # leader hook can wake it up once the command is committed.
  defp move_now_runnable_jobs_impl(
         %__MODULE__{
           jobs: jobs,
           index_waiting: index_waiting,
           index_runnable: index_runnable,
           brokers_waiting: brokers_waiting,
           brokers_to_notify: brokers_to_notify
         } = q,
         now_millis
       ) do
    case take_smallest_with_earlier_timestamp(index_waiting, now_millis) do
      nil ->
        # No waiting job is due yet; done.
        q

      {{_, job_id} = job_key, index_waiting2} ->
        jobs2 = Map.update!(jobs, job_id, fn {j, t, :waiting} -> {j, t, :runnable} end)
        index_runnable2 = :gb_sets.add(job_key, index_runnable)

        q2 = %__MODULE__{
          q
          | jobs: jobs2,
            index_waiting: index_waiting2,
            index_runnable: index_runnable2
        }

        case brokers_waiting do
          [] ->
            q2

          [b | bs] ->
            # Pair the newly runnable job with one waiting broker.
            %__MODULE__{q2 | brokers_waiting: bs, brokers_to_notify: [b | brokers_to_notify]}
        end
        |> move_now_runnable_jobs_impl(now_millis)
    end
  end

  # Registers a new waiting job under `job_key = {start_time, job_id}`.
  # Fails with `:full` when the queue already holds `@max_jobs` jobs, and with
  # `:existing_id` when `job_id` is already taken.
  defp insert(
         %__MODULE__{jobs: jobs, index_waiting: index_waiting} = q,
         {start_time, job_id} = job_key,
         job
       ) do
    cond do
      map_size(jobs) >= @max_jobs ->
        {{:error, :full}, q}

      Map.has_key?(jobs, job_id) ->
        {{:error, :existing_id}, q}

      true ->
        new_q = %__MODULE__{
          q
          | jobs: Map.put(jobs, job_id, {job, start_time, :waiting}),
            index_waiting: :gb_sets.add(job_key, index_waiting)
        }

        {:ok, new_q}
    end
  end

  # Handles a broker's `:fetch` command: takes the job with the smallest key from
  # `index_runnable`, falling back to a due job in `index_waiting`, locks it and
  # returns `{locked_job_key, job}`. When no job is available, returns `nil` and
  # records the broker in `brokers_waiting` so it can be notified later.
  defp fetch(
         %__MODULE__{
           index_waiting: index_waiting,
           index_runnable: index_runnable,
           brokers_waiting: bs_waiting
         } = q,
         broker,
         now_millis
       ) do
    # to avoid duplication, first remove the fetching broker's pid
    bs_waiting2 = remove_brokers_by_node(bs_waiting, broker)

    case take_smallest_with_earlier_timestamp(index_runnable, now_millis) do
      nil ->
        # Nothing runnable; a due waiting job (if any) can be handed out directly.
        case take_smallest_with_earlier_timestamp(index_waiting, now_millis) do
          nil ->
            {nil, %__MODULE__{q | brokers_waiting: [broker | bs_waiting2]}}

          {{_, job_id}, index_waiting2} ->
            %__MODULE__{q | index_waiting: index_waiting2, brokers_waiting: bs_waiting2}
            |> lock_and_return_job(job_id, now_millis)
        end

      {{_, job_id}, index_runnable2} ->
        %__MODULE__{q | index_runnable: index_runnable2, brokers_waiting: bs_waiting2}
        |> lock_and_return_job(job_id, now_millis)
    end
  end

  # From `brokers` drop every pid that lives on `target_broker`'s node, i.e. both
  # `target_broker` itself and any stale broker pid from before a restart (since
  # exactly one broker exists per node, a same-node pid must already be dead).
  defp remove_brokers_by_node(brokers, target_broker) do
    target_node = node(target_broker)
    Enum.filter(brokers, fn broker -> node(broker) != target_node end)
  end

  # Locks the job identified by `job_id`: marks it `:running` and indexes it under
  # `{now_millis, job_id}` (the lock time records when the run started and keeps
  # the key unique). Returns `{{locked_job_key, job}, updated_queue}`.
  defp lock_and_return_job(
         %__MODULE__{jobs: jobs, index_running: index_running} = q,
         job_id,
         now_millis
       ) do
    locked_job_key = {now_millis, job_id}
    {job, _time, _state} = Map.fetch!(jobs, job_id)
    updated_jobs = Map.put(jobs, job_id, {job, now_millis, :running})
    updated_index = :gb_sets.add(locked_job_key, index_running)

    {{locked_job_key, job}, %__MODULE__{q | jobs: updated_jobs, index_running: updated_index}}
  end

  # Removes a locked (`:running`) job identified by `job_key`; a no-op when the
  # key is not in the running index (e.g. the job was already removed or canceled).
  # Recurring (cron) jobs are immediately re-registered for their next run.
  defp remove_locked(
         %__MODULE__{jobs: jobs, index_running: index_running} = q,
         {_, job_id} = job_key,
         now_millis
       ) do
    if :gb_sets.is_member(job_key, index_running) do
      {job, _, :running} = Map.fetch!(jobs, job_id)

      %__MODULE__{
        q
        | jobs: Map.delete(jobs, job_id),
          index_running: :gb_sets.delete(job_key, index_running)
      }
      |> requeue_if_recurring(job, job_id, now_millis)
    else
      q
    end
  end

  # For a cron-scheduled job, registers its next run (computed from `now_millis`)
  # as a fresh waiting job with its attempts reset. One-shot jobs stay removed.
  defp requeue_if_recurring(
         %__MODULE__{jobs: jobs, index_waiting: index_waiting} = q,
         j,
         job_id,
         now_millis
       ) do
    case j.schedule do
      {:cron, cron} ->
        next_time = Cron.next_in_epoch_milliseconds(cron, now_millis)
        # A new run gets its full number of attempts back.
        refreshed_job = %AsyncJob{j | remaining_attempts: j.attempts}

        %__MODULE__{
          q
          | jobs: Map.put(jobs, job_id, {refreshed_job, next_time, :waiting}),
            index_waiting: :gb_sets.add({next_time, job_id}, index_waiting)
        }

      {:once, _} ->
        q
    end
  end

  # Moves a locked (`:running`) job back to the waiting queue for a retry:
  # one attempt is consumed and the next start is delayed by the job's retry
  # interval. A no-op when `job_key` is not in the running index.
  defp unlock_for_retry(
         %__MODULE__{jobs: jobs, index_waiting: index_waiting, index_running: index_running} = q,
         {_, job_id} = job_key,
         now_millis
       ) do
    if :gb_sets.is_member(job_key, index_running) do
      {job, _, :running} = Map.fetch!(jobs, job_id)
      retry_start = now_millis + AsyncJob.compute_retry_interval(job)
      retried_job = %AsyncJob{job | remaining_attempts: job.remaining_attempts - 1}

      %__MODULE__{
        q
        | jobs: Map.put(jobs, job_id, {retried_job, retry_start, :waiting}),
          index_waiting: :gb_sets.add({retry_start, job_id}, index_waiting),
          index_running: :gb_sets.delete(job_key, index_running)
      }
    else
      q
    end
  end

  # Drops `pid` (and any stale broker pid on the same node) from the waiting-broker list.
  defp remove_broker(%__MODULE__{brokers_waiting: waiting} = q, pid) do
    %__MODULE__{q | brokers_waiting: remove_brokers_by_node(waiting, pid)}
  end

  # Removes the job with `job_id` from the jobs map and from the secondary index
  # matching its current state. Returns `{:error, :not_found}` for unknown IDs.
  defp cancel_job(
         %__MODULE__{
           jobs: jobs,
           index_waiting: index_waiting,
           index_runnable: index_runnable,
           index_running: index_running
         } = q,
         job_id
       ) do
    case Map.pop(jobs, job_id) do
      {nil, _} ->
        {{:error, :not_found}, q}

      {{_job, time, state}, remaining_jobs} ->
        key = {time, job_id}
        q2 = %__MODULE__{q | jobs: remaining_jobs}

        q3 =
          case state do
            :waiting -> %__MODULE__{q2 | index_waiting: :gb_sets.delete(key, index_waiting)}
            :runnable -> %__MODULE__{q2 | index_runnable: :gb_sets.delete(key, index_runnable)}
            :running -> %__MODULE__{q2 | index_running: :gb_sets.delete(key, index_running)}
          end

        {:ok, q3}
    end
  end

  # Returns `{waiting_count, runnable_count, running_count, waiting_broker_count}`.
  defp metrics(%__MODULE__{
         index_waiting: index_waiting,
         index_runnable: index_runnable,
         index_running: index_running,
         brokers_waiting: brokers
       }) do
    waiting_count = :gb_sets.size(index_waiting)
    runnable_count = :gb_sets.size(index_runnable)
    running_count = :gb_sets.size(index_running)

    {waiting_count, runnable_count, running_count, length(brokers)}
  end

  @impl true
  # `RaftedValue.Data` callback: read-only queries against the queue state.
  defun query(q :: v[t], arg :: RVData.query_arg()) :: RVData.query_ret() do
    case arg do
      :list -> list_jobs(q)
      {:status, job_id} -> get_status(q, job_id)
      # failsafe and for testing: unknown queries return the whole state
      _ -> q
    end
  end

  # Looks up the `{job, time, state}` triplet stored for `job_id`.
  defp get_status(%__MODULE__{jobs: jobs}, job_id) do
    case Map.fetch(jobs, job_id) do
      {:ok, triplet} -> {:ok, triplet}
      :error -> {:error, :not_found}
    end
  end

  # Returns the three secondary indices as raw gb_sets;
  # converting them to lists is left to the caller (see `list/1`).
  defp list_jobs(%__MODULE__{
         index_waiting: waiting_index,
         index_runnable: runnable_index,
         index_running: running_index
       }) do
    {running_index, runnable_index, waiting_index}
  end

  defmodule Hook do
    @moduledoc """
    `RaftedValue.LeaderHook` implementation for the async job queue.

    Runs side effects on the leader after each committed command, based on the
    `brokers_to_notify` and `abandoned_jobs` fields of the committed queue state:
    waiting brokers are notified of newly runnable jobs, and abandoned jobs are logged.
    """

    alias Antikythera.ContextId
    alias AntikytheraCore.AsyncJob.Queue
    alias AntikytheraCore.GearLog.Writer
    alias AntikytheraCore.GearModule
    require AntikytheraCore.Logger, as: L

    @behaviour RaftedValue.LeaderHook

    @impl true
    def on_command_committed(_, _, _, %Queue{
          brokers_to_notify: bs,
          abandoned_jobs: abandoned_jobs
        }) do
      Enum.each(bs, &Broker.notify_job_registered/1)
      Enum.each(abandoned_jobs, &log_abandoned_job/1)
    end

    @impl true
    def on_query_answered(_, _, _), do: nil
    @impl true
    def on_follower_added(_, _), do: nil
    @impl true
    def on_follower_removed(_, _), do: nil
    @impl true
    def on_elected(_), do: nil
    @impl true
    def on_restored_from_files(_), do: nil

    # Logs an abandoned job both to the core log and to the owning gear's log.
    defp log_abandoned_job({id, %AsyncJob{gear_name: gear_name}}) do
      message_common = "abandoned a job that has been running too long: id=#{id}"
      L.error("#{message_common} gear_name=#{gear_name}")
      logger = GearModule.logger(gear_name)

      Writer.error(
        logger,
        Time.now(),
        ContextId.system_context(),
        "<async_job> #{message_common}"
      )
    end
  end

  #
  # Public API
  #
  @default_rv_config_options Keyword.put(
                               RaftedValueConfigMaker.options(),
                               :leader_hook_module,
                               Hook
                             )

  # Builds the `RaftedValue.Config.t` used for async job queue consensus groups.
  # Whatever `opts` are given, `:leader_hook_module` is forced to `Hook`.
  defun make_rv_config(opts :: Keyword.t() \\ @default_rv_config_options) ::
          RaftedValue.Config.t() do
    # :leader_hook_module must not be modified
    opts = Keyword.put(opts, :leader_hook_module, Hook)
    RaftedValue.make_config(__MODULE__, opts)
  end

  # Registers a new async job in the queue named `queue_name`, to be started at
  # `start_time_millis`. Subject to the per-queue rate limit, the queue size limit
  # (`@max_jobs`) and uniqueness of `job_id`.
  defun add_job(
          queue_name :: v[atom],
          job_id :: v[Id.t()],
          job :: v[AsyncJob.t()],
          start_time_millis :: v[pos_integer],
          now_millis :: v[pos_integer]
        ) :: :ok | {:error, :full | :existing_id | {:rate_limit_reached, pos_integer}} do
    run_command_with_rate_limit_check!(
      queue_name,
      {:add, {start_time_millis, job_id}, job},
      now_millis
    )
  end

  # Cancels the job with `job_id` regardless of its current state
  # (waiting/runnable/running). Subject to the per-queue rate limit.
  defun cancel(queue_name :: v[atom], job_id :: v[Id.t()]) ::
          :ok | {:error, :not_found | {:rate_limit_reached, pos_integer}} do
    run_command_with_rate_limit_check!(
      queue_name,
      {:cancel, job_id},
      System.system_time(:millisecond)
    )
  end

  # Applies the per-queue rate limit before submitting `cmd` to the consensus group;
  # returns how long to wait when the limit is currently exceeded.
  defp run_command_with_rate_limit_check!(queue_name, cmd, now_millis) do
    case RateLimit.check_for_command(queue_name) do
      :ok -> run_command!(queue_name, cmd, now_millis)
      {:error, millis_to_wait} -> {:error, {:rate_limit_reached, millis_to_wait}}
    end
  end

  # Fetches (and locks) the next available job for the calling broker.
  # Returns `nil` when no job is currently available; in that case the caller is
  # registered in the queue's waiting-broker list to be notified later.
  defun fetch_job(queue_name :: v[atom]) :: nil | {JobKey.t(), AsyncJob.t()} do
    run_command!(queue_name, {:fetch, self()})
  end

  # Removes a job the caller has locked (a `:running` job identified by `job_key`);
  # recurring (cron) jobs are re-registered for their next run by the queue itself.
  defun remove_locked_job(queue_name :: v[atom], job_key :: v[JobKey.t()]) :: :ok do
    :ok = run_command!(queue_name, {:remove_locked, job_key})
  end

  # Puts a locked (`:running`) job back into the waiting queue for a retry,
  # consuming one attempt and delaying it by the job's retry interval.
  defun unlock_job_for_retry(queue_name :: v[atom], job_key :: v[JobKey.t()]) :: :ok do
    :ok = run_command!(queue_name, {:unlock_for_retry, job_key})
  end

  defun remove_broker_from_waiting_list(queue_name :: v[atom]) :: :ok do
    case run_command(queue_name, {:remove_broker_from_waiting_list, self()}) do
      {:ok, :ok} -> :ok
      # We don't care if this operation succeeds or not, as this node is being terminated anyway.
      {:error, :no_leader} -> :ok
    end
  end

  # Submits `cmd` to the queue's consensus group, raising on failure.
  defp run_command!(queue_name, cmd, now_millis \\ System.system_time(:millisecond)) do
    run_command(queue_name, cmd, now_millis) |> R.get!()
  end

  # Every command is paired with the current time so that replicas apply
  # time-dependent logic deterministically (see `command/2`).
  defp run_command(queue_name, cmd, now_millis \\ System.system_time(:millisecond)) do
    RaftFleet.command(queue_name, {cmd, now_millis})
  end

  # Returns `{waiting, runnable, running, waiting_brokers}` counts for the local
  # queue member `pid`, or `nil` when the command fails. As a side effect, due
  # jobs are made runnable (the `:get_metrics` command runs invariant maintenance).
  defun start_jobs_and_get_metrics(pid :: v[pid]) :: nil | tuple do
    # Note that:
    # - we call `RaftedValue.command` instead of `RaftFleet.command` in order not to send message to remote node
    # - we use command instead of query so that it can trigger some of the stored jobs
    now_millis = System.system_time(:millisecond)

    case RaftedValue.command(pid, {:get_metrics, now_millis}) do
      {:ok, metrics_data} -> metrics_data
      {:error, _} -> nil
    end
  end

  # Queries the current status of the job with `job_id` and converts the stored
  # triplet into an `Antikythera.AsyncJob.Status` struct (done on the caller side).
  # The query itself is rate-limited with retries.
  defun status(queue_name :: v[atom], job_id :: v[Id.t()]) :: R.t(Status.t()) do
    {:ok, result} =
      RateLimit.check_with_retry_for_query(queue_name, fn ->
        RaftFleet.query(queue_name, {:status, job_id})
      end)

    R.map(result, fn {job, start_time_millis, state} ->
      job_to_status(job, job_id, start_time_millis, state)
    end)
  end

  # Builds an externally visible `Status` struct from a stored job and its
  # queue bookkeeping data (`start_time_millis` and state label).
  defp job_to_status(%AsyncJob{} = job, job_id, start_time_millis, state) do
    # The payload may be stored as an ETF-encoded binary; decode it for the caller.
    payload =
      if is_binary(job.payload) do
        :erlang.binary_to_term(job.payload)
      else
        job.payload
      end

    %Status{
      id: job_id,
      start_time: Time.from_epoch_milliseconds(start_time_millis),
      state: state,
      gear_name: job.gear_name,
      module: job.module,
      payload: payload,
      schedule: job.schedule,
      max_duration: job.max_duration,
      attempts: job.attempts,
      remaining_attempts: job.remaining_attempts,
      retry_interval: job.retry_interval
    }
  end

  # Lists all jobs in the queue as `{start_time, id, state}` tuples,
  # running jobs first, then runnable, then waiting.
  defun list(queue_name :: v[atom]) :: [{Time.t(), Id.t(), StateLabel.t()}] do
    # data conversions should be done at caller side (raft leader should not do CPU-intensive works)
    {:ok, {running, runnable, waiting}} =
      RateLimit.check_with_retry_for_query(queue_name, fn ->
        RaftFleet.query(queue_name, :list)
      end)

    for {index, state} <- [{running, :running}, {runnable, :runnable}, {waiting, :waiting}],
        {millis, id} <- :gb_sets.to_list(index) do
      {Time.from_epoch_milliseconds(millis), id, state}
    end
  end
end