Skip to main content

src/webql/runner/run_step.gleam

import gleam/dynamic
import gleam/result
import webql/assembler/plan
import webql/engine
import webql/memory
import webql/runner/diagnostic
import webql/runner/run

/// Runs a step in a batch.
pub fn run(
  step: plan.Step(task),
  edges: List(plan.Edge),
  engine: engine.Engine(task, memory.Memory(storage), error),
  memory: memory.Memory(storage),
  run_plan,
) -> task {
  engine.handle_start_step(fn() {
    use inputs <- result.try(run.get_inputs(memory, step.name, edges))
    run_step(step, inputs, engine, memory, run_plan)
  })
}

// PRIVATE FUNCTIONS
// =================
fn run_step(
  step: plan.Step(task),
  inputs: dynamic.Dynamic,
  engine: engine.Engine(task, memory.Memory(storage), error),
  memory: memory.Memory(storage),
  run_plan,
) {
  use results <- result.try(case step.node {
    plan.Node(resolver:) -> Ok(resolver.resolver(inputs))

    plan.Supernode(plan:) -> run_inline(inputs, plan, engine, memory, run_plan)
  })

  Ok(
    engine.handle_finish_step(results, fn(result) {
      case result {
        Ok(outputs) -> run.add_outputs(memory, step.name, outputs)

        Error(message) ->
          Error(
            diagnostic.Diagnostic(kind: diagnostic.RuntimeError(
              step: step.name,
              message:,
            )),
          )
      }
    }),
  )
}

fn run_inline(
  inputs: dynamic.Dynamic,
  plan: plan.Plan(task),
  engine: engine.Engine(task, memory.Memory(storage), error),
  memory: memory.Memory(storage),
  run_plan,
) {
  let results = run_plan(plan, memory.new(), engine, inputs)

  Ok(
    engine.handle_finish_plan(results, fn(memory) {
      case run.get_returns(memory, plan.edges) {
        Ok(returns) -> Ok(returns)
        Error(message) ->
          Error(diagnostic.Diagnostic(kind: diagnostic.MissingReturn(message:)))
      }
    }),
  )
}