Skip to main content

src/aion/child.gleam

//// Typed child-workflow handles and await wrappers.

import aion/codec.{type Codec}
import aion/error
import aion/internal/ffi
import aion/internal/pump
import gleam/json
import gleam/string

/// A typed handle for a linked child-workflow execution.
///
/// `output` and `workflow_error` are the child workflow's statically-known
/// result and error types. The handle carries the engine correlation id plus the
/// codecs required to decode the recorded child completion or failure payload
/// returned by AT/AD.
pub opaque type ChildHandle(output, workflow_error) {
  ChildHandle(
    child_id: String,
    output_codec: Codec(output),
    error_codec: Codec(workflow_error),
  )
}

/// Start a linked child workflow and return its typed handle.
///
/// The `workflow_fn` is accepted as a type anchor for the child workflow's
/// `fn(input) -> Result(output, workflow_error)` contract. The SDK does not call
/// it here; lifecycle, linking, recording, and replay/no-respawn behavior are
/// owned by AT/AD behind the FFI boundary.
pub fn spawn(
  name: String,
  workflow_fn: fn(input) -> Result(output, workflow_error),
  input: input,
  input_codec: Codec(input),
  output_codec: Codec(output),
  error_codec: Codec(workflow_error),
) -> Result(ChildHandle(output, workflow_error), error.EngineError) {
  let _workflow_fn = workflow_fn
  let encoded_input = input_codec.encode(input)

  case ffi.spawn_child(name, encoded_input, spawn_config()) {
    Ok(raw_child_id) ->
      Ok(ChildHandle(
        child_id: raw_child_id,
        output_codec: output_codec,
        error_codec: error_codec,
      ))
    Error(raw_error) -> Error(error.EngineFailure(message: raw_error))
  }
}

/// Await a child workflow's recorded completion or failure.
///
/// AT/AD own blocking, replay resolution, and event recording. This wrapper
/// decodes the raw recorded envelope with the codecs carried on the handle and
/// returns decode/engine failures as typed data.
///
/// The await is a yield point: pending workflow queries are serviced by the
/// query pump before the child terminal resolves, exactly as activity awaits,
/// signal receives, and timers do. Without the pump, a query arriving while
/// the workflow is parked here would surface its sentinel as a bogus child
/// failure and leave the engine refusing every later await in the run.
pub fn await(
  handle: ChildHandle(output, workflow_error),
) -> Result(output, error.ChildError(workflow_error)) {
  // The engine reserves `{error, _}` from `await_child` for engine faults
  // (`await_child:`-prefixed messages) and the `with_timeout` scope-expiry
  // sentinel that the enclosing scope consumes. Child failure — including
  // engine-side cancellation/timeout terminals — arrives as `{ok, "error:"}`
  // data and is decoded by `decode_child_result` below.
  //
  // The child id is precomputed so the pump thunk's body is exactly one
  // shielded FFI call on a captured value — the re-execution-safety contract
  // for suspending awaits (see `aion/internal/pump`).
  let awaited_child_id = child_id(handle)
  case pump.run(fn() { pump.shield(ffi.await_child(awaited_child_id)) }) {
    Ok(raw_result) -> decode_child_result(raw_result, handle)
    Error(raw_error) -> Error(error.ChildEngineFailure(message: raw_error))
  }
}

/// Start a linked child workflow and await its recorded result.
///
/// This is the spawn-then-await convenience kept in the child logic module so
/// `aion/workflow` can remain a forwarding authoring surface.
pub fn spawn_and_wait(
  name: String,
  workflow_fn: fn(input) -> Result(output, workflow_error),
  input: input,
  input_codec: Codec(input),
  output_codec: Codec(output),
  error_codec: Codec(workflow_error),
) -> Result(output, error.ChildError(workflow_error)) {
  case spawn(name, workflow_fn, input, input_codec, output_codec, error_codec) {
    Ok(handle) -> await(handle)
    Error(error.EngineFailure(message: message)) ->
      Error(error.ChildEngineFailure(message: message))
  }
}

/// Return the engine child/correlation id carried by this handle.
pub fn child_id(handle: ChildHandle(output, workflow_error)) -> String {
  handle.child_id
}

/// Return the output codec carried by this child handle.
pub fn output_codec(
  handle: ChildHandle(output, workflow_error),
) -> Codec(output) {
  handle.output_codec
}

/// Return the workflow-error codec carried by this child handle.
pub fn error_codec(
  handle: ChildHandle(output, workflow_error),
) -> Codec(workflow_error) {
  handle.error_codec
}

fn decode_child_result(
  raw_result: String,
  handle: ChildHandle(output, workflow_error),
) -> Result(output, error.ChildError(workflow_error)) {
  case string.starts_with(raw_result, "ok:") {
    True -> decode_output(string.drop_start(raw_result, 3), handle)
    False ->
      case string.starts_with(raw_result, "error:") {
        True -> decode_error_payload(string.drop_start(raw_result, 6), handle)
        False -> Error(error.ChildEngineFailure(message: raw_result))
      }
  }
}

fn decode_output(
  payload: String,
  handle: ChildHandle(output, workflow_error),
) -> Result(output, error.ChildError(workflow_error)) {
  let codec = output_codec(handle)
  case codec.decode(payload) {
    Ok(output) -> Ok(output)
    Error(decode_error) -> Error(error.ChildOutputDecodeFailed(decode_error))
  }
}

fn decode_error_payload(
  payload: String,
  handle: ChildHandle(output, workflow_error),
) -> Result(output, error.ChildError(workflow_error)) {
  let codec = error_codec(handle)
  case codec.decode(payload) {
    Ok(workflow_error) -> Error(error.ChildWorkflowFailed(workflow_error))
    Error(decode_error) -> Error(error.ChildErrorDecodeFailed(decode_error))
  }
}

fn spawn_config() -> String {
  json.object([]) |> json.to_string
}