defmodule ExSQL.Vdbe do
  @moduledoc """
  A small register virtual machine, in the spirit of SQLite's VDBE
  (`vdbe.c`).

  `ExSQL.Executor` walks the AST directly; for the common single-table
  scan/filter/project shape it instead compiles the statement to a flat opcode
  program (the `compile_vdbe` codegen in `ExSQL.Executor`) and runs it here,
  avoiding the per-row environment construction and recursive expression
  descent of the tree walker.

  A program is a tuple of opcodes addressed by program counter. State threads
  through a single tail-recursive loop:

    * one cursor over the table's rows (`Rewind`/`Next` advance it),
    * an integer-keyed register file the opcodes read and write,
    * the emitted output rows plus LIMIT/OFFSET counters.

  ## Opcodes

  Each opcode is a tuple tagged by its name:

    * `{:rewind, end_pc}` — position the cursor on the first row, or jump to
      `end_pc` when the table is empty.
    * `{:next, top_pc}` — advance the cursor and jump to `top_pc`, or fall
      through when exhausted.
    * `{:column, col_key, reg}` — load the cursor row's column into `reg`.
    * `{:rowid, reg}` — load the cursor row's rowid into `reg`.
    * `{:value, value, reg}` — load a constant into `reg`.
    * `{:eval, fun, reg}` — load `fun.(row, rowid)` into `reg`. The closure is
      built by the compiler over the tree walker's evaluator, so arbitrary
      projection/sort expressions reuse exact SQL semantics.
    * `{:cmp, op, reg_a, aff_a, reg_b, aff_b, collation, fail_pc}` — apply
      comparison affinity then compare; jump to `fail_pc` unless the result is
      true (NULL and false both fail, as in a WHERE clause).
    * `{:filter, fun, fail_pc}` — jump to `fail_pc` unless `fun.(row, rowid)`
      is true; the general WHERE form when the `:cmp` chain doesn't apply.
    * `{:result_row, proj_regs, key_regs}` — emit a row from `proj_regs`. With
      no ORDER BY (`key_regs == []`) rows stream out honoring OFFSET/LIMIT;
      with ORDER BY every row is collected with its `key_regs` sort key, then
      sorted and clamped after the scan.
    * `{:halt}` — stop.

  The comparison opcodes delegate to `ExSQL.Value`, the same module the tree
  walker uses, so affinity, collation, and NULL semantics match exactly.
  """

  alias ExSQL.Value

  @typedoc "An opcode program addressed by program counter."
  @type program :: tuple()

  @doc """
  Runs `program` over `rows` (a `[{rowid, row_map}]` cursor, in scan order).

  ## Parameters

    * `rows` — the table's `{rowid, row_map}` pairs, in scan order.
    * `program` — the opcode tuple; execution starts at program counter 0.
    * `limit` — maximum number of rows to return, or `nil` for no limit.
      A limit of `0` yields no rows.
    * `offset` — number of leading result rows to skip; `nil` means `0`.
    * `order` — when non-nil, a list of `:asc | :desc` directions aligned with
      the `key_regs` emitted by `result_row`.

  With `order` set, the rows are collected, sorted by their keys (NULL sorts
  low, as in SQLite), then OFFSET/LIMIT applied. The sort is stable, so rows
  whose keys compare equal keep their scan order. With `order == nil` rows
  stream out and LIMIT can stop the scan early.

  Returns the projected rows, each a list of values.
  """
  @spec run(
          [{integer(), map()}],
          program(),
          non_neg_integer() | nil,
          non_neg_integer() | nil,
          [:asc | :desc] | nil
        ) :: [[ExSQL.Value.t()]]
  def run(rows, program, limit, offset, order \\ nil) do
    state = %{
      rows: rows,
      rowid: nil,
      row: nil,
      regs: %{},
      out: [],
      emitted: 0,
      skipped: 0,
      limit: limit,
      offset: offset || 0,
      order: order
    }

    # `exec/3` accumulates by prepending, so a single reverse restores scan
    # order. It must happen before the sort: the sort is stable, and rows with
    # equal keys keep whatever relative order they are handed in.
    collected = program |> exec(0, state) |> Enum.reverse()

    case order do
      nil ->
        collected

      directions ->
        collected
        |> sort_collected(directions)
        |> drop_offset(state.offset)
        |> take_limit(limit)
    end
  end

  # The interpreter loop: decode the opcode at `pc`, apply it, and tail-call
  # with the next program counter. Returns the accumulated output (newest
  # first) once `{:halt}` is reached or a streaming LIMIT is satisfied.
  defp exec(program, pc, state) do
    case elem(program, pc) do
      {:rewind, end_pc} ->
        case next_row(state) do
          {:ok, state} -> exec(program, pc + 1, state)
          :done -> exec(program, end_pc, state)
        end

      {:next, top_pc} ->
        case next_row(state) do
          {:ok, state} -> exec(program, top_pc, state)
          :done -> exec(program, pc + 1, state)
        end

      {:column, key, reg} ->
        exec(program, pc + 1, put_reg(state, reg, Map.get(state.row, key)))

      {:rowid, reg} ->
        exec(program, pc + 1, put_reg(state, reg, state.rowid))

      {:value, value, reg} ->
        exec(program, pc + 1, put_reg(state, reg, value))

      {:eval, fun, reg} ->
        exec(program, pc + 1, put_reg(state, reg, fun.(state.row, state.rowid)))

      {:cmp, op, reg_a, aff_a, reg_b, aff_b, collation, fail_pc} ->
        left = Map.get(state.regs, reg_a)
        right = Map.get(state.regs, reg_b)
        {a, b} = Value.comparison_coerce(left, aff_a, right, aff_b)

        # Only a literal `true` passes; NULL and false both take the fail jump.
        next_pc = if Value.compare_op(op, a, b, collation) == true, do: pc + 1, else: fail_pc
        exec(program, next_pc, state)

      {:filter, fun, fail_pc} ->
        next_pc = if fun.(state.row, state.rowid) == true, do: pc + 1, else: fail_pc
        exec(program, next_pc, state)

      {:result_row, proj_regs, key_regs} ->
        emit_row(program, pc, state, proj_regs, key_regs)

      {:halt} ->
        state.out
    end
  end

  # Moves the cursor onto the next `{rowid, row}` pair, or reports `:done`
  # when the scan is exhausted (the state is left untouched in that case).
  defp next_row(%{rows: []}), do: :done

  defp next_row(%{rows: [{rowid, row} | rest]} = state) do
    {:ok, %{state | rows: rest, rowid: rowid, row: row}}
  end

  # Writes `value` into register `reg`.
  defp put_reg(state, reg, value), do: %{state | regs: Map.put(state.regs, reg, value)}

  # Reads the projection registers into an output row.
  defp project(regs, proj_regs) do
    Enum.map(proj_regs, fn reg -> sql_value(Map.get(regs, reg)) end)
  end

  # ORDER BY present: collect each row with its sort key, sort/clamp later.
  defp emit_row(program, pc, %{order: order} = state, proj_regs, key_regs) when order != nil do
    keys = Enum.map(key_regs, fn reg -> Map.get(state.regs, reg) end)
    entry = {keys, project(state.regs, proj_regs)}
    exec(program, pc + 1, %{state | out: [entry | state.out]})
  end

  # No ORDER BY and LIMIT 0: nothing may be emitted, so stop the scan before
  # producing the first row (the post-emit check below would be one row late).
  defp emit_row(_program, _pc, %{limit: 0} = state, _proj_regs, _key_regs), do: state.out

  # No ORDER BY: stream out honoring OFFSET, then LIMIT (which can stop early).
  defp emit_row(program, pc, state, proj_regs, _key_regs) do
    if state.skipped < state.offset do
      exec(program, pc + 1, %{state | skipped: state.skipped + 1})
    else
      emitted = state.emitted + 1
      state = %{state | out: [project(state.regs, proj_regs) | state.out], emitted: emitted}

      if state.limit != nil and emitted >= state.limit do
        # LIMIT satisfied: return immediately instead of scanning the rest.
        state.out
      else
        exec(program, pc + 1, state)
      end
    end
  end

  # Sorts `{keys, projection}` pairs by key and strips the keys. The comparator
  # answers `true` for equal keys, which keeps `Enum.sort/2` stable.
  defp sort_collected(collected, order) do
    collected
    |> Enum.sort(fn {ka, _pa}, {kb, _pb} -> order_before_or_equal?(ka, kb, order) end)
    |> Enum.map(fn {_keys, proj} -> proj end)
  end

  # Mirrors `ExSQL.Executor.order/4`: compare keys left to right; NULL sorts low
  # via `Value.compare`, and direction flips the `:lt`/`:gt` outcome.
  defp order_before_or_equal?(ka, kb, order) do
    [ka, kb, order]
    |> Enum.zip()
    |> Enum.reduce_while(true, fn {a, b, direction}, _acc ->
      case Value.compare(a, b, :binary) do
        :eq -> {:cont, true}
        :lt -> {:halt, direction == :asc}
        :gt -> {:halt, direction == :desc}
      end
    end)
  end

  # Skips the first `offset` rows; an offset of 0 returns the list untouched.
  defp drop_offset(rows, 0), do: rows
  defp drop_offset(rows, offset), do: Enum.drop(rows, offset)

  # Keeps at most `limit` rows; `nil` means no limit.
  defp take_limit(rows, nil), do: rows
  defp take_limit(rows, limit), do: Enum.take(rows, limit)

  # Unwraps a tagged JSON value to its text for output; other values pass through.
  defp sql_value({:json, text}), do: text
  defp sql_value(value), do: value
end