core/async_job/async_job.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule AntikytheraCore.AsyncJob do
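  # Core-side struct representing a registered async job, together with the registration
  # logic: a job is normally added to its executor pool's job queue, or, when the
  # `:bypass_job_queue` option is given, run immediately on a worker from the pool.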
  alias Croma.Result, as: R
  alias Antikythera.{Time, Cron, GearName, Context, MilliSecondsSinceEpoch}
  alias Antikythera.AsyncJob.{Id, Schedule, MaxDuration, Attempts, RetryInterval}
  alias Antikythera.ExecutorPool.Id, as: EPoolId
  alias AntikytheraCore.ExecutorPool.Id, as: CoreEPoolId
  alias AntikytheraCore.ExecutorPool.RegisteredName, as: RegName
  alias AntikytheraCore.ExecutorPool.AsyncJobRunner
  alias AntikytheraCore.AsyncJob.Queue

  # Upper bound on how far in the future a `{:once, time}` schedule may start: 50 days in milliseconds.
  @max_start_time_from_now 50 * 24 * 60 * 60_000
  # Public accessor, kept just for documentation purposes.
  def max_start_time_from_now(), do: @max_start_time_from_now

  use Croma.Struct,
    recursive_new?: true,
    fields: [
      gear_name: GearName,
      module: Croma.Atom,
      schedule: Schedule,
      max_duration: MaxDuration,
      attempts: Attempts,
      remaining_attempts: Attempts,
      retry_interval: RetryInterval,
      # opaque data given and used by the gear
      payload: Croma.TypeGen.union([Croma.Map, Croma.Binary])
    ]

  @typep option :: Antikythera.AsyncJob.option()

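  # Entry point for registering an async job. Validations are chained with `Croma.Result.m`,
  # so the first step that returns `{:error, _}` short-circuits the whole registration.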
  defun register(
          gear_name :: v[GearName.t()],
          module :: v[module],
          payload :: v[map],
          context_or_epool_id :: v[EPoolId.t() | Context.t()],
          options :: v[[option]]
        ) :: R.t(Id.t()) do
    now_millis = System.system_time(:millisecond)

    R.m do
      epool_id <- find_executor_pool(gear_name, context_or_epool_id)
      job_id <- extract_job_id(options)
      bypass? <- bypass_job_queue?(options)
      {schedule, start_time_millis} <- validate_schedule(now_millis, options)
      job <- make_job(gear_name, module, payload, schedule, bypass?, options)
      do_register(epool_id, job_id, job, bypass?, start_time_millis, now_millis)
    end
  end

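  # Resolves the executor pool: a `Context` carries its own pool ID, whereas an explicitly
  # given pool ID is checked for association with the calling gear.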
  defunp find_executor_pool(
           gear_name :: v[GearName.t()],
           context_or_epool_id :: v[EPoolId.t() | Context.t()]
         ) :: R.t(EPoolId.t()) do
    case context_or_epool_id do
      %Context{executor_pool_id: id} -> {:ok, id}
      epool_id -> CoreEPoolId.validate_association(epool_id, gear_name)
    end
  end

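  # Uses the caller-supplied `:id` option if present (validated as an `Id`), otherwise
  # generates a fresh job ID.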
  defunp extract_job_id(options :: v[[option]]) :: R.t(Id.t()) do
    case options[:id] do
      nil -> {:ok, Id.generate()}
      id -> R.wrap_if_valid(id, Id)
    end
  end

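  # A job registered with `:bypass_job_queue` skips the queue and runs immediately, so the
  # queue-oriented options `:schedule`, `:attempts` and `:retry_interval` are rejected.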
  defunp bypass_job_queue?(options :: v[[option]]) :: R.t(boolean) do
    case options[:bypass_job_queue] do
      true ->
        [:schedule, :attempts, :retry_interval]
        |> Enum.map(fn disallowed ->
          if Keyword.has_key?(options, disallowed) do
            {:error, {:invalid_key_combination, :bypass_job_queue, disallowed}}
          else
            {:ok, nil}
          end
        end)
        |> R.sequence()
        |> R.map(fn _ -> true end)

      _ ->
        {:ok, false}
    end
  end

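  # With no `:schedule` option the job runs once, immediately. A `{:once, t}` schedule is
  # range-checked below; for `{:cron, c}` the first run is the next cron match after now.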
  defunp validate_schedule(now_millis :: v[MilliSecondsSinceEpoch.t()], options :: v[[option]]) ::
           R.t({Schedule.t(), MilliSecondsSinceEpoch.t()}) do
    case options[:schedule] do
      nil -> {:ok, {{:once, Time.from_epoch_milliseconds(now_millis)}, now_millis}}
      {:once, t} -> validate_schedule_once(t, now_millis)
      {:cron, c} -> {:ok, {{:cron, c}, Cron.next_in_epoch_milliseconds(c, now_millis)}}
      _otherwise -> {:error, {:invalid_value, :schedule}}
    end
  end

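  # A `{:once, time}` schedule must not be in the past and must start within
  # `@max_start_time_from_now` (50 days) of now.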
  defunp validate_schedule_once(time :: v[Time.t()], now_millis :: v[MilliSecondsSinceEpoch.t()]) ::
           R.t({Schedule.t(), MilliSecondsSinceEpoch.t()}) do
    time_millis = Time.to_epoch_milliseconds(time)
    diff = time_millis - now_millis

    if 0 <= diff and diff <= @max_start_time_from_now do
      {:ok, {{:once, time}, time_millis}}
    else
      {:error, {:invalid_value, :schedule}}
    end
  end

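  # Builds the job struct. The payload of a queued job is compressed with
  # `:erlang.term_to_binary/2` (presumably to keep queue entries small); a bypassed job
  # keeps the raw map since it is handed straight to a worker.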
  defunpt make_job(
            gear_name :: v[GearName.t()],
            module :: v[module],
            payload :: v[map],
            schedule :: v[Schedule.t()],
            bypass? :: v[boolean],
            options :: v[[option]]
          ) :: R.t(t) do
    attempts = Keyword.get(options, :attempts, Attempts.default())

    encoded_payload =
      if bypass?, do: payload, else: :erlang.term_to_binary(payload, [:compressed])

    new(
      gear_name: gear_name,
      module: module,
      schedule: schedule,
      max_duration: Keyword.get(options, :max_duration, MaxDuration.default()),
      attempts: attempts,
      remaining_attempts: attempts,
      retry_interval: Keyword.get(options, :retry_interval, RetryInterval.default()),
      payload: encoded_payload
    )
  end

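  # Either hands the job directly to a worker (bypass) or enqueues it in the executor
  # pool's job queue together with its computed start time.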
  defunp do_register(
           epool_id :: v[EPoolId.t()],
           job_id :: v[Id.t()],
           job :: v[t],
           bypass? :: v[boolean],
           start_time_millis :: v[MilliSecondsSinceEpoch.t()],
           now_millis :: v[MilliSecondsSinceEpoch.t()]
         ) :: R.t(Id.t()) do
    if bypass? do
      run_immediately_bypassing_job_queue(epool_id, job_id, job, now_millis)
    else
      queue_name = RegName.async_job_queue(epool_id)

      case Queue.add_job(queue_name, job_id, job, start_time_millis, now_millis) do
        :ok -> {:ok, job_id}
        error -> error
      end
    end
  end

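  # Checks out a worker from the executor pool's runner pool without blocking;
  # `nil` means no worker is currently available, which surfaces as `:no_available_workers`.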
  defunp run_immediately_bypassing_job_queue(
           epool_id :: v[EPoolId.t()],
           job_id :: v[Id.t()],
           job :: v[t],
           now_millis :: v[MilliSecondsSinceEpoch.t()]
         ) :: R.t(Id.t()) do
    pool_name = RegName.async_job_runner_pool(epool_id)

    case PoolSup.checkout_nonblocking(pool_name) do
      nil ->
        {:error, :no_available_workers}

      pid ->
        AsyncJobRunner.run(pid, nil, {now_millis, job_id}, job)
        {:ok, job_id}
    end
  end

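  # `attempts - remaining_attempts` is the number of attempts already consumed, which
  # `RetryInterval.interval/2` uses to pick the delay before the next retry.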
  defun compute_retry_interval(%__MODULE__{
          retry_interval: retry_interval,
          attempts: attempts,
          remaining_attempts: remaining_attempts
        }) :: non_neg_integer do
    RetryInterval.interval(retry_interval, attempts - remaining_attempts)
  end
end