lib/async_job/async_job.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule Antikythera.AsyncJob do
  alias Croma.Result, as: R
  alias Antikythera.{Time, Context, GearName}
  alias Antikythera.ExecutorPool.Id, as: EPoolId
  alias Antikythera.ExecutorPool.BadIdReason
  alias Antikythera.AsyncJob.Id
  alias Antikythera.AsyncJob.Schedule
  alias Antikythera.AsyncJob.MaxDuration
  alias Antikythera.AsyncJob.Attempts
  alias Antikythera.AsyncJob.RetryInterval
  alias Antikythera.AsyncJob.Metadata
  alias Antikythera.AsyncJob.Status
  alias Antikythera.AsyncJob.StateLabel
  alias AntikytheraCore.AsyncJob.{Queue, RateLimit}
  alias AntikytheraCore.ExecutorPool.RegisteredName, as: RegName
  alias AntikytheraCore.ExecutorPool.Id, as: CoreEPoolId
  alias AntikytheraCore.ExecutorPool.AsyncJobRunner

  # NOTE: @moduledoc must come after the aliases above because it interpolates
  # values from several of the aliased modules.
  @moduledoc """
  Antikythera's "async job" functionality allows gears to run their code in the background.

  This module is the interface for gear implementations to define and register their own async jobs.

  ## Usage

  ### Preparing your async job module

  Each gear can define multiple async job modules.
  Those modules must `use` this module as in the following example:

      defmodule YourGear.SomeAsyncJob do
        use Antikythera.AsyncJob

        # run/3 callback is required
        @impl true
        def run(_payload, _metadata, _context) do
          # do something here
        end

        # abandon/3 callback is optional; you can omit it
        @impl true
        def abandon(_payload, _metadata, _context) do
          # cleanup code when all retries failed
        end

        # inspect_payload/1 callback is optional; you can omit it
        @impl true
        def inspect_payload(payload) do
          # convert a payload to a short string that can be used to identify a job in logs
        end
      end

  Implementations of `run/3` can return any value; it is simply ignored.
  If execution of `run/3` terminates abnormally (exception, timeout, etc.), the job is regarded as failed.
  Failed jobs are automatically retried up to a specified number of attempts (see below).

  The optional `abandon/3` callback is called when a job is abandoned after all attempts failed.
  You can put your cleanup logic in this callback, e.g. when you use an external storage system to store job information.
  Note that `abandon/3` must finish within `#{div(AsyncJobRunner.abandon_callback_max_duration(), 1_000)}` seconds;
  when it takes longer, antikythera stops the execution of `abandon/3` in the middle.

  The optional `inspect_payload/1` callback is solely for logging purposes.
  By providing a concrete callback implementation you can additionally include
  a summary of each job's payload in logs.
  As an example, suppose your `payload` contains an "id" field; then it's natural to define
  the following `inspect_payload/1`:

      def inspect_payload(%{"id" => id}) do
        id
      end

  Your gear can have multiple modules that `use Antikythera.AsyncJob`.

  ### Registering jobs

  With the above module defined, you can call `register/3` to register a new job:

      YourGear.SomeAsyncJob.register(%{"arbitrary" => "map"}, context, options)

  Here the first argument is an arbitrary map that is passed to the `run/3` callback implementation
  (note that structs are maps and thus usable as payloads).
  `context` is a value of `t:Antikythera.Context.t/0` and is used to obtain
  the executor pool to which this job is registered.
  When you need to register a job to an executor pool that is not the current one,
  you can pass a `t:Antikythera.ExecutorPool.Id.t/0` instead of `t:Antikythera.Context.t/0`.
  `options` must be a `t:Keyword.t/0` which can include the following values:

  - `id`: An ID of the job.
    If given, it must match the regex pattern `~r/#{Id.pattern().source}/` and must be
    unique in the job queue specified by the second argument.
    If not given, antikythera automatically generates one for you.
  - `schedule`: When to run the job, as a 2-tuple.
    If not given, it defaults to `{:once, Antikythera.Time.now()}`, i.e.,
    the job will be executed as soon as an available worker process is found in the specified executor pool.
    Allowed value formats are:
      - `{:once, Antikythera.Time.t}`
          - The job is executed at the given time.
            After the job is either successfully completed or abandoned by failure(s), the job is removed from the job queue.
            The time must be a future time and within #{div(AntikytheraCore.AsyncJob.max_start_time_from_now(), 24 * 60 * 60_000)} days from now.
      - `{:cron, Antikythera.Cron.t}`
          - The job is repeatedly executed at the given cron schedule.
            After the job is either successfully completed or abandoned by failure(s),
            the job is requeued to the job queue.
            Note that the next start time is computed from the time of requeuing.
            For example, if a job is scheduled to run every 10 minutes ("*/10 * * * *")
            and its execution takes 15 minutes to complete,
            then the job will in effect run every 20 minutes.
            The schedule repeats indefinitely; when you are done with the job
            you can remove it by `cancel/3`.
  - `max_duration`: Maximum execution time (in milliseconds) of the job.
    A job which has been running for more than `max_duration` is brutally killed and,
    if it has remaining attempts, it will be retried.
    Defaults to `#{MaxDuration.default()}` (`#{div(MaxDuration.default(), 60_000)}` minutes).
    If explicitly given, it cannot exceed `#{MaxDuration.max()}` (`#{div(MaxDuration.max(), 60_000)}` minutes).
  - `attempts`: A positive integer within `#{Attempts.min()}..#{Attempts.max()}`, up to which antikythera tries to run the job.
    Defaults to `#{Attempts.default()}`.
  - `retry_interval`: A 2-tuple of an integer and a float used to calculate the time interval between retries.
    Defaults to `#{inspect(RetryInterval.default())}`.
    The first element is the time interval (in milliseconds) between the 1st and 2nd attempts
    and must be within `#{RetryInterval.Factor.min()}..#{RetryInterval.Factor.max()}`.
    The second element is a multiplying factor for exponential backoff
    and must be within `#{RetryInterval.Base.min()}..#{RetryInterval.Base.max()}`.
    For example:
      - When `retry_interval: {10_000, 2.0}` is given,
          - the 2nd attempt (1st retry) after failure of the 1st attempt is started in `10`s,
          - the 3rd attempt (2nd retry) after failure of the 2nd attempt is started in `20`s,
          - the 4th attempt (3rd retry) after failure of the 3rd attempt is started in `40`s,
          - and so on.
      - If you want a constant interval, specify `1.0` as the second element.
  - `bypass_job_queue`: Whether or not to run the job immediately, skipping addition to the job queue.
    Defaults to `false`.
      - When this option is `true`, `register/3` tries to pick a worker process in the executor pool in the same node
        and then immediately starts the job using that worker process.
        If all processes in the local executor pool are busy, `register/3` returns `{:error, :no_available_workers}`.
        In that case you have 2 options:
          - give up running the job, or
          - call `register/3` again without the `bypass_job_queue` option so that the job
            will be executed afterward, possibly in another node.
      - Execution with the `bypass_job_queue` option takes advantage of being free from rate limiting (see below)
        and having no overhead of pushing to the distributed job queue.
        Please note that if you bypass the job queue,
          - you cannot specify the start time of the job (it's started immediately and only once), and
          - the job is not retried when its execution results in a failure.
      - Therefore this option cannot be used with `schedule`, `attempts` or `retry_interval`.
        Note also that jobs running with `bypass_job_queue: true` cannot be inspected
        by `Antikythera.AsyncJob.list/1` or `Antikythera.AsyncJob.status/2`.

  `register/3` returns a tuple of `{:ok, Antikythera.AsyncJob.Id.t}` on success.
  You can register up to #{Queue.max_jobs()} jobs in each job queue.
  When #{Queue.max_jobs()} jobs remain unfinished in the job queue, trying to register a new job results in `{:error, :full}`.

  The example call to `register/3` above will eventually invoke `YourGear.SomeAsyncJob.run/3`
  with the given payload (together with the job's metadata and a context).

  ### Rate limiting of accesses to each job queue

  Antikythera's async job queues are distributed, fault-tolerant, persistent data structures.
  These features come at an extra cost: interacting with a job queue is a relatively expensive operation.
  In order not to overwhelm a job queue, accesses to a job queue are rate-limited by
  the [token bucket algorithm](https://en.wikipedia.org/wiki/Token_bucket).
  Rate limiting is imposed on a per-node basis; in each node, there exists a bucket per job queue.

  Each bucket contains up to #{RateLimit.max_tokens()} tokens and
  one token is generated every #{RateLimit.milliseconds_per_token()} milliseconds.
  The following operations take tokens from the corresponding bucket:

  - #{RateLimit.tokens_per_command()} tokens per `register/3`
  - #{RateLimit.tokens_per_command()} tokens per `cancel/3`
  - 1 token per `status/2`
  - 1 token per `list/1`

  When there are not enough tokens in the bucket, `register/3` and `cancel/3` return
  `{:error, {:rate_limit_reached, milliseconds_to_wait}}`.
  On the other hand, `status/2` and `list/1` automatically wait and retry several times
  when hitting the rate limit.

  During testing, the rate limiting feature may disrupt smooth test executions.
  To bypass rate limiting in your tests you can use `Antikythera.Test.AsyncJobHelper.reset_rate_limit_status/1`.

  ### Payload

  Payloads are registered by calling `register/3` and used in `run/3`, `abandon/3` or `inspect_payload/1`.
  Payloads are compressed as binary when `register/3` is called and kept in memory
  until job execution, so large payloads could degrade overall system performance.
  Please avoid including large data in payloads; instead, put the ID of the data in the payload
  and fetch the whole data in `run/3`.
  """

  @doc """
  Executes the job with the given payload, metadata and context.

  The return value is ignored. If execution terminates abnormally
  (exception, timeout, etc.), the attempt is regarded as failed and may be retried.
  """
  @callback run(map, Metadata.t(), Context.t()) :: any

  @doc """
  Called when a job is abandoned after all attempts failed; intended for cleanup.

  Must finish within `#{div(AsyncJobRunner.abandon_callback_max_duration(), 1_000)}` seconds.
  The default implementation injected by `use Antikythera.AsyncJob` does nothing and returns `:ok`.
  """
  @callback abandon(map, Metadata.t(), Context.t()) :: any

  @doc """
  Converts a payload into a short string used to identify the job in logs.

  The default implementation injected by `use Antikythera.AsyncJob` returns `""`.
  """
  @callback inspect_payload(map) :: String.t()

  @typedoc "An option accepted by `register/3` of modules that `use Antikythera.AsyncJob`."
  @type option ::
          {:id, Id.t()}
          | {:schedule, Schedule.t()}
          | {:max_duration, MaxDuration.t()}
          | {:attempts, Attempts.t()}
          | {:retry_interval, RetryInterval.t()}
          | {:bypass_job_queue, boolean}

  defmacro __using__(_) do
    # Resolved at compile time of the `use`-ing module: the `:app` of the Mix project
    # being compiled (i.e. the gear defining the job module). It is baked into the
    # generated `register/3` below.
    gear_name = Mix.Project.config()[:app]

    quote bind_quoted: [gear_name: gear_name] do
      @gear_name gear_name

      @behaviour Antikythera.AsyncJob

      # Default no-op implementation of the optional `abandon/3` callback.
      @impl true
      defun abandon(
              _payload :: map,
              _metadata :: Antikythera.AsyncJob.Metadata.t(),
              _context :: Antikythera.Context.t()
            ) :: any do
        :ok
      end

      # Default implementation of the optional `inspect_payload/1` callback:
      # contributes nothing to log lines.
      @impl true
      defun inspect_payload(_payload :: map) :: String.t() do
        ""
      end

      defoverridable abandon: 3, inspect_payload: 1

      # Registers a job whose `run/3` is this module's callback; see the
      # "Registering jobs" section of `Antikythera.AsyncJob` for the options.
      defun register(
              payload :: v[map],
              context_or_epool_id :: v[Antikythera.Context.t() | Antikythera.ExecutorPool.Id.t()],
              options :: v[[Antikythera.AsyncJob.option()]] \\ []
            ) :: R.t(Id.t()) do
        AntikytheraCore.AsyncJob.register(
          @gear_name,
          __MODULE__,
          payload,
          context_or_epool_id,
          options
        )
      end
    end
  end

  @doc """
  Cancels an async job registered with the job queue specified by `context_or_epool_id`.

  Note that currently-running job executions cannot be cancelled.
  However, calling `cancel/3` with a currently-running job's `job_id` prevents retries of the job when it fails.

  Returns `:ok` on success, or `{:error, reason}` when the executor pool cannot be used by
  `gear_name`, the job is not found, or the rate limit of the job queue is reached.
  """
  defun cancel(
          gear_name :: v[GearName.t()],
          context_or_epool_id :: v[Context.t() | EPoolId.t()],
          job_id :: v[Id.t()]
        ) :: :ok | {:error, :not_found | BadIdReason.t() | {:rate_limit_reached, pos_integer}} do
    # Any non-`{:ok, _}` result of the association check is returned unchanged.
    with {:ok, validated_epool_id} <-
           context_or_epool_id
           |> exec_pool_id()
           |> CoreEPoolId.validate_association(gear_name) do
      Queue.cancel(RegName.async_job_queue(validated_epool_id), job_id)
    end
  end

  @doc """
  Retrieves detailed information of an async job registered with the job queue specified by `context_or_epool_id`.

  Returns `{:ok, status}` on success and `{:error, reason}` otherwise.
  """
  defun status(context_or_epool_id :: v[Context.t() | EPoolId.t()], job_id :: v[Id.t()]) ::
          R.t(Status.t()) do
    Queue.status(queue_name(context_or_epool_id), job_id)
  end

  @doc """
  Retrieves the list of async jobs registered with the job queue specified by `context_or_epool_id`.

  Each element of the returned list is a 3-tuple: scheduled execution time, job ID and current status.
  The returned list is already sorted.
  """
  defun list(context_or_epool_id :: v[Context.t() | EPoolId.t()]) :: [
          {Time.t(), Id.t(), StateLabel.t()}
        ] do
    Queue.list(queue_name(context_or_epool_id))
  end

  # Extracts the executor pool ID: from the context when a `Context` is given,
  # otherwise the argument itself is already an executor pool ID.
  defunp exec_pool_id(context_or_epool_id :: v[Context.t() | EPoolId.t()]) :: EPoolId.t() do
    case context_or_epool_id do
      %Context{executor_pool_id: id} -> id
      epool_id -> epool_id
    end
  end

  # Registered name of the async job queue process belonging to the executor pool.
  defunp queue_name(context_or_epool_id :: v[Context.t() | EPoolId.t()]) :: atom do
    exec_pool_id(context_or_epool_id) |> RegName.async_job_queue()
  end
end