Skip to main content

src/aion/workflow/run.gleam

//// workflow.run (recorded activity dispatch) + now + random (determinism bindings)

import aion/activity.{type Activity}
import aion/duration
import aion/error
import aion/internal/ffi
import aion/internal/pump
import gleam/float
import gleam/int
import gleam/json
import gleam/option.{None, Some}
import gleam/string

/// A timestamp supplied by AD's determinism context.
///
/// The inner value is the timestamp in milliseconds, held as an `Int`; this
/// module builds it by parsing the millisecond string returned by the engine.
/// Workflow code should treat it as recorded deterministic data, not as
/// a wall-clock reading.
pub opaque type Timestamp {
  Timestamp(milliseconds: Int)
}

/// Unwrap a deterministic `Timestamp` into its millisecond count.
pub fn timestamp_to_milliseconds(timestamp: Timestamp) -> Int {
  let Timestamp(milliseconds: value) = timestamp
  value
}

/// Dispatch an activity through the single recorded side-effect boundary.
///
/// Plain Gleam workflow code is re-run on replay, so the SDK exposes `run`
/// (and later concurrency combinators over activities) as the only recorded
/// side-effectful path; there is intentionally no generic `side_effect(fn)`
/// escape hatch.
///
/// Steps performed here:
/// 1. The activity input is encoded with the activity's input `Codec`.
/// 2. The activity name, encoded input and JSON configuration are handed to
///    the engine, which returns a correlation id.
/// 3. The result for that correlation id is awaited under the query pump.
/// 4. The returned payload is decoded with the activity's output `Codec`.
///
/// A payload that fails to decode yields `error.ActivityDecodeFailed`. Raw
/// error strings from dispatching or awaiting are converted by
/// `activity_error`.
pub fn run(activity_value: Activity(i, o)) -> Result(o, error.ActivityError) {
  let encoder = activity.input_codec(activity_value)
  let decoder = activity.output_codec(activity_value)
  let request_payload = encoder.encode(activity.input(activity_value))

  let dispatched =
    ffi.dispatch_activity(
      activity.name(activity_value),
      request_payload,
      activity_config(activity_value),
    )

  case dispatched {
    Error(reason) -> Error(activity_error(reason))
    Ok(correlation_id) -> {
      // The await is a yield point: pending workflow queries are serviced
      // by the query pump before the activity result resolves.
      let awaited =
        pump.run(fn() { pump.shield(ffi.await_activity_result(correlation_id)) })

      case awaited {
        Error(reason) -> Error(activity_error(reason))
        Ok(response_payload) ->
          case decoder.decode(response_payload) {
            Error(decode_error) ->
              Error(error.ActivityDecodeFailed(decode_error))
            Ok(output) -> Ok(output)
          }
      }
    }
  }
}

/// Read the deterministic timestamp recorded by AD.
///
/// This is the only time source exposed to workflow code. Workflow logic must
/// not call wall-clock APIs such as Gleam/Erlang clocks, because ambient time
/// would desynchronise replay.
///
/// Returns `error.EngineFailure` when the engine call fails or when the value
/// it returns cannot be parsed as a millisecond integer.
pub fn now() -> Result(Timestamp, error.EngineError) {
  let reading = ffi.now()
  case reading {
    Error(reason) -> Error(error.EngineFailure(message: reason))
    Ok(raw_milliseconds) -> parse_timestamp(raw_milliseconds)
  }
}

/// Draw a deterministic floating-point value from AD's seeded RNG.
///
/// The engine keys the RNG seed on WorkflowId + RunId so that replay observes
/// the same sequence. Workflow authors must not call ambient entropy sources.
///
/// Returns `error.EngineFailure` when the engine call fails or when the value
/// it returns cannot be parsed as a float.
pub fn random() -> Result(Float, error.EngineError) {
  let drawn = ffi.random()
  case drawn {
    Error(reason) -> Error(error.EngineFailure(message: reason))
    Ok(raw_value) -> parse_float(raw_value, "random")
  }
}

/// Draw a deterministic integer from the range bounded by `min` and `max`.
///
/// The bounds are passed to the engine as decimal strings and the draw comes
/// from AD's seeded RNG through the FFI boundary; no wall-clock or ambient
/// entropy binding is exposed by the SDK. Whether the bounds are inclusive is
/// decided by the engine (the SDK documents the range as inclusive).
///
/// Returns `error.EngineFailure` without calling the engine when `min` is
/// greater than `max`, and also when the engine call fails or returns a value
/// that is not an integer.
pub fn random_int(min: Int, max: Int) -> Result(Int, error.EngineError) {
  case min <= max {
    False ->
      Error(error.EngineFailure(
        message: "Invalid deterministic random_int range: min is greater than max",
      ))
    True -> {
      let drawn = ffi.random_int(int.to_string(min), int.to_string(max))
      case drawn {
        Error(reason) -> Error(error.EngineFailure(message: reason))
        Ok(raw_value) -> parse_int(raw_value, "random_int")
      }
    }
  }
}

/// Parse the engine's millisecond string into a `Timestamp`.
///
/// Delegates the integer parsing to `parse_int` with the label "timestamp",
/// which produces the "Invalid deterministic timestamp: <raw>" failure message
/// when `raw` is not an integer.
fn parse_timestamp(raw: String) -> Result(Timestamp, error.EngineError) {
  case parse_int(raw, "timestamp") {
    Error(failure) -> Error(failure)
    Ok(milliseconds) -> Ok(Timestamp(milliseconds: milliseconds))
  }
}

/// Parse a float received from the engine as a string.
///
/// `label` names the value in the failure message
/// ("Invalid deterministic <label>: <raw>").
///
/// `float.parse` only accepts text written with a decimal point, so a
/// whole-number encoding such as "0" or "1" would otherwise be reported as an
/// engine failure. Those are accepted by falling back to `int.parse` and
/// converting the result to a float.
fn parse_float(raw: String, label: String) -> Result(Float, error.EngineError) {
  case float.parse(raw) {
    Ok(value) -> Ok(value)
    Error(_) ->
      // Not float syntax: accept an integer-formatted value before giving up.
      case int.parse(raw) {
        Ok(whole) -> Ok(int.to_float(whole))
        Error(_) ->
          Error(error.EngineFailure(
            message: "Invalid deterministic " <> label <> ": " <> raw,
          ))
      }
  }
}

/// Parse an integer received from the engine as a string.
///
/// `label` names the value in the failure message
/// ("Invalid deterministic <label>: <raw>") returned when `raw` is not an
/// integer.
fn parse_int(raw: String, label: String) -> Result(Int, error.EngineError) {
  let parsed = int.parse(raw)
  case parsed {
    Error(_) -> {
      let message = "Invalid deterministic " <> label <> ": " <> raw
      Error(error.EngineFailure(message: message))
    }
    Ok(number) -> Ok(number)
  }
}

/// Convert a raw engine error string into a typed `error.ActivityError`.
///
/// The string is classified by its prefix and the text after the prefix
/// becomes the message of the resulting error:
///
/// - `retryable:`       -> `error.Retryable` (empty `details`)
/// - `terminal:`        -> `error.Terminal` (empty `details`)
/// - `timeout:`         -> `error.ActivityTimedOut`
/// - `cancelled:`       -> `error.ActivityCancelled`
/// - `non_determinism:` -> `error.ActivityNonDeterministic`
///
/// Anything else is returned unchanged inside `error.ActivityEngineFailure`.
///
/// Prefix patterns are used so the remainder is bound directly by the match;
/// there are no hand-counted prefix lengths to keep in sync with the prefix
/// text, and every byte after the colon is preserved.
fn activity_error(raw: String) -> error.ActivityError {
  case raw {
    "retryable:" <> message -> error.Retryable(message: message, details: "")
    "terminal:" <> message -> error.Terminal(message: message, details: "")
    "timeout:" <> message -> error.ActivityTimedOut(error.TimedOut(message))
    "cancelled:" <> message ->
      error.ActivityCancelled(error.Cancelled(message))
    "non_determinism:" <> message ->
      error.ActivityNonDeterministic(error.NonDeterminismViolation(message))
    _ -> error.ActivityEngineFailure(message: raw)
  }
}

/// Serialise an activity's dispatch options to a JSON string.
///
/// The object has three keys, in this order: `retry` (the retry policy),
/// `timeout_ms` and `heartbeat_ms`. Each is `null` when the activity does not
/// set the corresponding option.
fn activity_config(activity_value: Activity(i, o)) -> String {
  let retry = retry_config(activity.retry_policy(activity_value))
  let timeout = optional_duration(activity.timeout_duration(activity_value))
  let heartbeat =
    optional_duration(activity.heartbeat_interval(activity_value))

  json.to_string(
    json.object([
      #("retry", retry),
      #("timeout_ms", timeout),
      #("heartbeat_ms", heartbeat),
    ]),
  )
}

/// Encode an optional retry policy as JSON.
///
/// `None` becomes `null`; a policy becomes an object with `max_attempts` and
/// `backoff` keys.
fn retry_config(policy) -> json.Json {
  json.nullable(policy, fn(retry_policy) {
    let activity.RetryPolicy(max_attempts: attempts, backoff: backoff) =
      retry_policy
    json.object([
      #("max_attempts", json.int(attempts)),
      #("backoff", backoff_config(backoff)),
    ])
  })
}

/// Encode a backoff strategy as a JSON object.
///
/// The object always starts with a `kind` key ("exponential", "linear" or
/// "fixed") followed by that strategy's parameters. Durations are written as
/// millisecond integers under `*_ms` keys; the exponential `multiplier` is
/// written as a float.
fn backoff_config(backoff: activity.Backoff) -> json.Json {
  let #(kind, parameters) = case backoff {
    activity.Fixed(delay: delay) -> #("fixed", [
      #("delay_ms", duration_json(delay)),
    ])
    activity.Linear(initial: initial, increment: increment, max: max) -> #(
      "linear",
      [
        #("initial_ms", duration_json(initial)),
        #("increment_ms", duration_json(increment)),
        #("max_ms", duration_json(max)),
      ],
    )
    activity.Exponential(initial: initial, multiplier: multiplier, max: max) -> #(
      "exponential",
      [
        #("initial_ms", duration_json(initial)),
        #("multiplier", json.float(multiplier)),
        #("max_ms", duration_json(max)),
      ],
    )
  }

  json.object([#("kind", json.string(kind)), ..parameters])
}

/// Encode an optional duration as JSON: `null` for `None`, otherwise the
/// duration's millisecond count.
fn optional_duration(value) -> json.Json {
  json.nullable(value, duration_json)
}

/// Encode a duration as a JSON integer holding its length in milliseconds.
fn duration_json(value: duration.Duration) -> json.Json {
  let milliseconds = duration.to_milliseconds(value)
  json.int(milliseconds)
}