Skip to main content

lib/ex_sql/vdbe.ex

defmodule ExSQL.Vdbe do
  @moduledoc """
  A small register virtual machine, in the spirit of SQLite's VDBE
  (`vdbe.c`). `ExSQL.Executor` walks the AST directly; for the common
  single-table scan/filter/project shape it instead compiles the statement to
  a flat opcode program (the `compile_vdbe` codegen in `ExSQL.Executor`) and runs it here,
  avoiding the per-row environment construction and recursive expression
  descent of the tree walker.

  A program is a tuple of opcodes addressed by program counter. State threads
  through a single tail-recursive loop:

    * one cursor over the table's rows (`Rewind`/`Next` advance it),
    * an integer-keyed register file the opcodes read and write,
    * the emitted output rows plus LIMIT/OFFSET counters.

  Opcodes (each a tuple tagged by its name):

    * `{:rewind, end_pc}` — position the cursor on the first row, or jump to
      `end_pc` when the table is empty.
    * `{:next, top_pc}` — advance the cursor and jump to `top_pc`, or fall
      through when exhausted.
    * `{:column, col_key, reg}` — load the cursor row's column into `reg`.
    * `{:rowid, reg}` — load the cursor row's rowid into `reg`.
    * `{:value, value, reg}` — load a constant into `reg`.
    * `{:eval, fun, reg}` — load `fun.(row, rowid)` into `reg`. The closure is
      built by the compiler over the tree walker's evaluator, so arbitrary
      projection/sort expressions reuse exact SQL semantics.
    * `{:cmp, op, reg_a, aff_a, reg_b, aff_b, collation, fail_pc}` — apply
      comparison affinity then compare; jump to `fail_pc` unless the result is
      true (NULL and false both fail, as in a WHERE clause).
    * `{:filter, fun, fail_pc}` — jump to `fail_pc` unless `fun.(row, rowid)`
      is true; the general WHERE form when the `:cmp` chain doesn't apply.
    * `{:result_row, proj_regs, key_regs}` — emit a row from `proj_regs`. With
      no ORDER BY (`key_regs == []`) rows stream out honoring OFFSET/LIMIT; with
      ORDER BY every row is collected with its `key_regs` sort key, then sorted
      and clamped after the scan.
    * `{:halt}` — stop.

  The comparison opcodes delegate to `ExSQL.Value`, the same module the tree
  walker uses, so affinity, collation, and NULL semantics match exactly.
  """

  alias ExSQL.Value

  @typedoc "An opcode program addressed by program counter."
  @type program :: tuple()

  @doc """
  Runs `program` over `rows` (a `[{rowid, row_map}]` cursor, in scan order).

  `order`, when non-nil, is a list of `:asc | :desc` directions aligned with the
  `key_regs` emitted by `result_row` — the rows are collected, sorted by their
  keys (NULL sorts low, as in SQLite), then OFFSET/LIMIT applied. Rows whose
  sort keys compare equal keep their scan order. With `order == nil` rows
  stream out and LIMIT can stop the scan early.

  A `limit` of `nil` means "no limit" and an `offset` of `nil` means zero. A
  `limit` of `0` returns `[]` without running the program at all.

  Returns the projected rows, each a list of values in `proj_regs` order.
  """
  @spec run(
          [{integer(), map()}],
          program(),
          non_neg_integer() | nil,
          non_neg_integer() | nil,
          [:asc | :desc] | nil
        ) :: [[ExSQL.Value.t()]]
  def run(rows, program, limit, offset, order \\ nil)

  # LIMIT 0 can never yield a row. Without this clause the streaming path
  # would emit one row before checking the limit, since the check runs only
  # after a row has already been appended.
  def run(_rows, _program, 0, _offset, _order), do: []

  def run(rows, program, limit, offset, order) do
    state = %{
      rows: rows,
      rowid: nil,
      row: nil,
      regs: %{},
      out: [],
      emitted: 0,
      skipped: 0,
      limit: limit,
      offset: offset || 0,
      order: order
    }

    # `exec/3` accumulates by prepending, so its result is in reverse scan
    # order in both modes; restore scan order before anything else.
    in_scan_order = program |> exec(0, state) |> Enum.reverse()

    if order == nil do
      in_scan_order
    else
      # Sorting the scan-ordered list (not the reversed accumulator) is what
      # keeps ties in scan order: the sort below is stable.
      in_scan_order
      |> sort_collected(order)
      |> drop_offset(state.offset)
      |> take_limit(limit)
    end
  end

  # The interpreter loop: fetch the opcode at `pc`, apply it to `state`, and
  # tail-call with the next program counter. Returns the accumulated `out`
  # list (newest first) once `{:halt}` is reached or LIMIT is satisfied.
  defp exec(program, pc, state) do
    case elem(program, pc) do
      {:rewind, end_pc} ->
        case state.rows do
          [] ->
            exec(program, end_pc, state)

          [{rid, row} | rest] ->
            exec(program, pc + 1, %{state | rows: rest, rowid: rid, row: row})
        end

      {:next, top_pc} ->
        case state.rows do
          [] ->
            exec(program, pc + 1, state)

          [{rid, row} | rest] ->
            exec(program, top_pc, %{state | rows: rest, rowid: rid, row: row})
        end

      {:column, key, reg} ->
        exec(program, pc + 1, put_reg(state, reg, Map.get(state.row, key)))

      {:rowid, reg} ->
        exec(program, pc + 1, put_reg(state, reg, state.rowid))

      {:value, value, reg} ->
        exec(program, pc + 1, put_reg(state, reg, value))

      {:eval, fun, reg} ->
        exec(program, pc + 1, put_reg(state, reg, fun.(state.row, state.rowid)))

      {:cmp, op, ra, aff_a, rb, aff_b, collation, fail_pc} ->
        {a, b} =
          Value.comparison_coerce(Map.get(state.regs, ra), aff_a, Map.get(state.regs, rb), aff_b)

        # Only a literal `true` passes; NULL and false both take the fail jump.
        if Value.compare_op(op, a, b, collation) == true do
          exec(program, pc + 1, state)
        else
          exec(program, fail_pc, state)
        end

      {:filter, fun, fail_pc} ->
        # Same rule as `:cmp`: anything other than `true` rejects the row.
        if fun.(state.row, state.rowid) == true do
          exec(program, pc + 1, state)
        else
          exec(program, fail_pc, state)
        end

      {:result_row, proj_regs, key_regs} ->
        emit_row(program, pc, state, proj_regs, key_regs)

      {:halt} ->
        state.out
    end
  end

  # Writes `value` into register `reg`, returning the updated state.
  defp put_reg(state, reg, value), do: %{state | regs: Map.put(state.regs, reg, value)}

  # ORDER BY present: collect each row with its sort key, sort/clamp later.
  defp emit_row(program, pc, %{order: order} = state, proj_regs, key_regs) when order != nil do
    proj = Enum.map(proj_regs, fn r -> sql_value(Map.get(state.regs, r)) end)
    keys = Enum.map(key_regs, fn r -> Map.get(state.regs, r) end)
    exec(program, pc + 1, %{state | out: [{keys, proj} | state.out]})
  end

  # No ORDER BY: stream out honoring OFFSET, then LIMIT (which can stop early).
  defp emit_row(program, pc, state, proj_regs, _key_regs) do
    if state.skipped < state.offset do
      exec(program, pc + 1, %{state | skipped: state.skipped + 1})
    else
      row = Enum.map(proj_regs, fn r -> sql_value(Map.get(state.regs, r)) end)
      emitted = state.emitted + 1
      state = %{state | out: [row | state.out], emitted: emitted}

      # Returning here instead of recursing ends the scan as soon as the
      # limit is met. `run/5` handles `limit == 0` before the scan starts,
      # so reaching this point always means at least one row is wanted.
      if state.limit != nil and emitted >= state.limit do
        state.out
      else
        exec(program, pc + 1, state)
      end
    end
  end

  # Sorts `{keys, projection}` pairs by key and strips the keys. The comparator
  # returns true for equal keys, which keeps `Enum.sort/2` stable, so the
  # caller's input order decides how ties are broken.
  defp sort_collected(collected, order) do
    collected
    |> Enum.sort(fn {ka, _pa}, {kb, _pb} -> order_before_or_equal?(ka, kb, order) end)
    |> Enum.map(fn {_keys, proj} -> proj end)
  end

  # Mirrors `ExSQL.Executor.order/4`: compare keys left to right; NULL sorts low
  # via `Value.compare`, and direction flips the `:lt`/`:gt` outcome.
  defp order_before_or_equal?(ka, kb, order) do
    [ka, kb, order]
    |> Enum.zip()
    |> Enum.reduce_while(true, fn {a, b, direction}, _acc ->
      case Value.compare(a, b, :binary) do
        :eq -> {:cont, true}
        :lt -> {:halt, direction == :asc}
        :gt -> {:halt, direction == :desc}
      end
    end)
  end

  # Skips the first `offset` rows of an already-sorted result.
  defp drop_offset(rows, 0), do: rows
  defp drop_offset(rows, offset), do: Enum.drop(rows, offset)

  # Keeps at most `limit` rows; `nil` means unlimited.
  defp take_limit(rows, nil), do: rows
  defp take_limit(rows, limit), do: Enum.take(rows, limit)

  # Unwraps a `{:json, text}` register value to its text for output rows;
  # every other value passes through unchanged.
  defp sql_value({:json, text}), do: text
  defp sql_value(value), do: value
end