Skip to main content

lib/ex_sql/codegen.ex

defmodule ExSQL.Codegen do
  @moduledoc """
  Runtime query codegen — the JIT path.

  Two shapes compile today:

    * single-table scan / filter / project / sort, and
    * two-table `LEFT JOIN` on an equi-key — hash-joined, with NULL-filled
      non-matches, residual `ON` filters pushed into the index build, and
      2-arg `COALESCE(col, literal)` projections.

  For either, this compiles a parsed `%ExSQL.AST.Select{}` into a native BEAM
  function (via `Module.create/3`, which the compiler/JIT turns into machine
  code) rather than walking the AST per row. Columns are read positionally from
  the stored row tuple (`:erlang.element/2`), and SQL semantics are preserved by
  delegating comparisons/booleans/affinity to `ExSQL.Value` — the exact functions
  the tree walker uses.

  Literals are lifted to a `params` tuple so a query *shape* compiles once and is
  reused for any literal values (`WHERE x > 30` and `WHERE x > 40` share a module;
  the value is a runtime argument). Compilation costs a few ms and only pays off
  for repeated/prepared queries, so the caller gates on that and falls back to the
  tree walker for anything `compilable?/2` rejects.

  The generated function takes the table's positional rows (`[{rowid, tuple}]`,
  scan order) plus a `params` tuple and returns `[[value, ...]]` — the same shape
  as a tree-walked result's rows. The scan form is `run(rows, params)`; the join
  form is `run(left_rows, right_rows, params)`, which indexes the right side by
  the (affinity-canonicalized) join key and probes it once per left row, then
  re-verifies each candidate with an exact comparison before emitting.
  """

  alias ExSQL.{Database, Table, Value}

  @cmp_ops [:gt, :lt, :ge, :le, :eq, :ne]

  # ---- shape detection -------------------------------------------------------

  @doc """
  Returns `{:ok, spec}` if `stmt` is a supported shape, else `:no`.

  Two `FROM` shapes are dispatched: a single `{:table, name, alias}` (spec
  `kind: :scan`) and a non-natural `LEFT JOIN ... ON` between two tables (spec
  `kind: :left_join`). The statement must also pass `simple_shape?/1` and the
  database must pass `default_pragmas?/1`.

  The spec carries column positions, a param-lifted WHERE/ORDER/LIMIT IR, and
  the lifted literal values. Any `throw(:uncompilable)` raised while lifting is
  caught here and reported as `:no`; a non-`Select` statement is also `:no`.
  """
  @spec compilable?(ExSQL.AST.Select.t(), Database.t()) :: {:ok, map()} | :no
  def compilable?(%ExSQL.AST.Select{} = stmt, db) do
    with true <- simple_shape?(stmt),
         true <- default_pragmas?(db) do
      case stmt.from do
        {:table, name, _alias} ->
          compile_scan(stmt, db, name)

        {:join, %{left: true, right: false, natural: false}, {:table, lt, la}, {:table, rt, ra},
         {:on, on}} ->
          compile_left_join(stmt, db, {lt, la}, {rt, ra}, on)

        _ ->
          :no
      end
    else
      _ -> :no
    end
  catch
    # The lifters signal "shape not supported" by throwing; map it to `:no`.
    :uncompilable -> :no
  end

  def compilable?(_stmt, _db), do: :no

  # Builds the `:scan` spec for a single-table `SELECT`, or returns `:no`.
  #
  # `name` must be a binary that `fetch_plain_table/2` resolves to a table with
  # `without_rowid: false`; anything else yields `:no`. Unsupported projection /
  # WHERE / ORDER / LIMIT / OFFSET shapes — and column qualifiers that do not
  # name this scan — `throw(:uncompilable)` (caught by `compilable?/2`).
  #
  # The literal accumulator is threaded through WHERE → ORDER → LIMIT → OFFSET;
  # each lifted param's index is the accumulator length at push time, so the
  # order of these calls is significant and the list is reversed once at the end.
  defp compile_scan(stmt, db, name) do
    with true <- is_binary(name),
         {:ok, %Table{without_rowid: false} = table} <- fetch_plain_table(db, name) do
      # The lifters below resolve columns by bare name and ignore the qualifier,
      # so a reference such as `other.col` would silently compile as `col`.
      # Reject it up front, the same way `resolve_col/3` does on the join path.
      ensure_scan_qualifiers!(stmt, scan_qualifier(stmt.from, name))

      index = Table.column_index(table)
      meta = column_meta(table)
      projection = plain_projection!(stmt.columns, index)
      {acc, where_ir} = lift_where(stmt.where, index, meta, [])
      {acc, order_ir} = lift_order(stmt.order_by, projection, index, meta, acc)
      {acc, limit_ir} = lift_int(stmt.limit, acc)
      {params, offset_ir} = lift_int(stmt.offset, acc)

      {:ok,
       %{
         kind: :scan,
         table: name,
         columns: Enum.map(projection, &elem(&1, 0)),
         positions: Enum.map(projection, &elem(&1, 1)),
         names: Enum.map(projection, &elem(&1, 2)),
         affinities:
           Enum.map(projection, fn {_k, pos, _n} -> meta |> Map.fetch!(pos) |> elem(0) end),
         where: where_ir,
         order: order_ir,
         limit: limit_ir,
         offset: offset_ir,
         params: params |> Enum.reverse() |> List.to_tuple()
       }}
    else
      _ -> :no
    end
  end

  # The only qualifier a scan's columns may carry: the alias when one is given,
  # otherwise the table name (same rule as `la || lt` on the join path).
  defp scan_qualifier({:table, _name, alias_name}, name), do: alias_name || name
  defp scan_qualifier(_from, name), do: name

  # Throws `:uncompilable` if any column in the projection, WHERE or ORDER BY
  # carries a qualifier other than `qual`. Items of an unexpected shape are
  # skipped here; the lifters reject them on their own.
  defp ensure_scan_qualifiers!(stmt, qual) do
    projected = for {expr, _alias} <- stmt.columns, do: expr
    ordered = for {expr, _dir} <- stmt.order_by, do: expr

    Enum.each([stmt.where | projected ++ ordered], &ensure_scan_qualifier!(&1, qual))
  end

  defp ensure_scan_qualifier!({:column, q, _name}, qual) when q != nil and q != qual,
    do: throw(:uncompilable)

  defp ensure_scan_qualifier!({:binary, _op, lhs, rhs}, qual) do
    ensure_scan_qualifier!(lhs, qual)
    ensure_scan_qualifier!(rhs, qual)
  end

  defp ensure_scan_qualifier!(_expr, _qual), do: :ok

  # ---- left-join shape -------------------------------------------------------
  #
  # `L LEFT JOIN R ON <equi-key> [AND <right-only filters>]`, two base tables,
  # WHERE over left columns only, projection of qualified columns and 2-arg
  # COALESCE(col, literal). Compiled to a hash join: the right side is indexed by
  # the (affinity-canonicalized) join key with the residual ON filters applied,
  # then each left row probes it; unmatched left rows are NULL-filled.
  # Builds the `:left_join` spec, or returns `:no` when either table name is not
  # a binary or does not resolve to a `%Table{without_rowid: false}`.
  # `{lt, la}` / `{rt, ra}` are the `{table_name, alias}` pairs of the two sides;
  # `on` is the raw ON expression. Unsupported shapes throw `:uncompilable`.
  defp compile_left_join(stmt, db, {lt, la}, {rt, ra}, on) do
    with true <- is_binary(lt),
         true <- is_binary(rt),
         {:ok, %Table{without_rowid: false} = ltab} <- fetch_plain_table(db, lt),
         {:ok, %Table{without_rowid: false} = rtab} <- fetch_plain_table(db, rt) do
      # A column's qualifier is the alias when present, else the table name.
      lqual = la || lt
      rqual = ra || rt
      # Identical qualifiers (e.g. an un-aliased self-join) cannot be told apart.
      if lqual == rqual, do: throw(:uncompilable)

      ctx = %{
        lqual: lqual,
        rqual: rqual,
        lindex: Table.column_index(ltab),
        lmeta: column_meta(ltab),
        rindex: Table.column_index(rtab),
        rmeta: column_meta(rtab)
      }

      # The literal accumulator is threaded through every lift below; a param's
      # index is the accumulator length when it is pushed, so the call order
      # (ON → WHERE → projection → ORDER → LIMIT → OFFSET) must not change.
      {join_key, residual_ir, acc} = lift_on(on, ctx, [])
      {acc, where_ir} = lift_cmp_tree(stmt.where, ctx, :left, acc)
      {acc, projection} = lift_join_projection(stmt.columns, ctx, acc)
      {acc, order_ir} = lift_join_order(stmt.order_by, projection, ctx, acc)
      {acc, limit_ir} = lift_int(stmt.limit, acc)
      {params, offset_ir} = lift_int(stmt.offset, acc)

      {:ok,
       %{
         kind: :left_join,
         left_table: lt,
         right_table: rt,
         join_key: join_key,
         residual: residual_ir,
         where: where_ir,
         projection: Enum.map(projection, &elem(&1, 0)),
         names: Enum.map(projection, &elem(&1, 1)),
         affinities: Enum.map(projection, &elem(&1, 2)),
         order: order_ir,
         limit: limit_ir,
         offset: offset_ir,
         # Literals were prepended, so reverse to restore index order.
         params: params |> Enum.reverse() |> List.to_tuple()
       }}
    else
      _ -> :no
    end
  end

  # Maps a `(qualifier, name)` column reference to
  # `{:left | :right, position, affinity, collation}`. A qualifier must equal one
  # of the two join qualifiers; a missing qualifier is resolved by name and must
  # be unambiguous. Everything else throws `:uncompilable`.
  defp resolve_col(qual, name, ctx) do
    key = Table.key(name)
    %{lqual: left_q, rqual: right_q} = ctx

    case qual do
      ^left_q -> side_col(:left, key, ctx)
      ^right_q -> side_col(:right, key, ctx)
      nil -> unqualified_col(key, ctx)
      _unknown -> throw(:uncompilable)
    end
  end

  # Resolves a bare column key: it must exist on exactly one side of the join.
  # Present on both sides (ambiguous) or on neither throws `:uncompilable`.
  defp unqualified_col(key, ctx) do
    in_left? = Map.has_key?(ctx.lindex, key)
    in_right? = Map.has_key?(ctx.rindex, key)

    cond do
      in_left? and not in_right? -> side_col(:left, key, ctx)
      in_right? and not in_left? -> side_col(:right, key, ctx)
      true -> throw(:uncompilable)
    end
  end

  # Looks `key` up on the given side and returns
  # `{side, position, affinity, collation}`; throws `:uncompilable` when that
  # side has no such column.
  defp side_col(side, key, ctx) when side in [:left, :right] do
    {index, meta} =
      case side do
        :left -> {ctx.lindex, ctx.lmeta}
        :right -> {ctx.rindex, ctx.rmeta}
      end

    case Map.get(index, key) do
      missing when missing in [nil, false] ->
        throw(:uncompilable)

      pos ->
        {affinity, collation} = Map.fetch!(meta, pos)
        {side, pos, affinity, collation}
    end
  end

  # Lifts the ON clause to `{join_key, residual_ir, acc}`. The AND-flattened
  # leaves must contain exactly one cross-side equi-key; every other leaf is
  # lifted as a right-side-only comparison and AND-ed into the residual filter
  # (`nil` when there are none).
  defp lift_on(on, ctx, acc) do
    {equi_leaves, filter_leaves} =
      on
      |> flatten_and()
      |> Enum.split_with(&equi_key?(&1, ctx))

    case equi_leaves do
      [equi_leaf] ->
        join_key = build_join_key(equi_leaf, ctx)

        {params, residual} =
          Enum.reduce(filter_leaves, {acc, nil}, fn filter, {params, so_far} ->
            {params, filter_ir} = lift_cmp_leaf(filter, ctx, :right, params)
            {params, and_ir(so_far, filter_ir)}
          end)

        {join_key, residual, params}

      _none_or_many ->
        throw(:uncompilable)
    end
  end

  # Flattens a tree of ANDs into its comparison leaves, left to right. Anything
  # that is neither an AND nor one of `@cmp_ops` throws `:uncompilable`.
  defp flatten_and(expr) do
    case expr do
      {:binary, :and, lhs, rhs} -> flatten_and(lhs) ++ flatten_and(rhs)
      {:binary, op, _a, _b} when op in @cmp_ops -> [expr]
      _other -> throw(:uncompilable)
    end
  end

  # True when `leaf` is `col = col` and the two columns resolve to opposite
  # sides of the join. A column that cannot be resolved makes the leaf a
  # non-equi-key (`false`) instead of aborting compilation here.
  defp equi_key?(leaf, ctx) do
    case leaf do
      {:binary, :eq, {:column, qa, na}, {:column, qb, nb}} ->
        {side_a, _, _, _} = resolve_col(qa, na, ctx)
        {side_b, _, _, _} = resolve_col(qb, nb, ctx)
        side_a != side_b

      _not_col_eq_col ->
        false
    end
  catch
    :uncompilable -> false
  end

  # Builds the equi-key descriptor `{lpos, rpos, class, laff, raff}` from a
  # `col = col` leaf already accepted by `equi_key?/2`. Positions and affinities
  # are normalised to (left, right) regardless of how the operands were written.
  #
  # Both key columns must use binary collation, whatever the comparison class:
  # the generated probe re-verifies with `cmp/5`, which always compares with
  # `:binary`, and every other column comparison in this module
  # (`lift_join_operand/4`, `lift_operand/4`) rejects non-binary collations
  # unconditionally. Checking only the `:text` class let a non-binary-collated
  # key column through whenever the declared affinities gave another class.
  # Throws `:uncompilable` on a non-binary collation.
  defp build_join_key({:binary, :eq, {:column, qa, na}, {:column, qb, nb}}, ctx) do
    {{:left, lpos, laff, lcoll}, {:right, rpos, raff, rcoll}} =
      order_sides(resolve_col(qa, na, ctx), resolve_col(qb, nb, ctx))

    binary_collation!(lcoll)
    binary_collation!(rcoll)

    {lpos, rpos, key_class(laff, raff), laff, raff}
  end

  # Returns the two resolved columns as `{left, right}`, whichever order they
  # were given in. The columns must be on opposite sides.
  defp order_sides(first, second) do
    case {first, second} do
      {{:left, _, _, _}, {:right, _, _, _}} -> {first, second}
      {{:right, _, _, _}, {:left, _, _, _}} -> {second, first}
    end
  end

  # Comparison-affinity class of the equi-key (mirrors `comparison_coerce`):
  # `:numeric` when either side has a numeric affinity, otherwise `:text` when
  # either side is text, otherwise `:none`.
  defp key_class(a, b) do
    numeric_affinities = [:integer, :real, :numeric]
    sides = [a, b]

    cond do
      Enum.any?(sides, &(&1 in numeric_affinities)) -> :numeric
      :text in sides -> :text
      true -> :none
    end
  end

  # Lifts a condition tree whose columns must all belong to `side`. The result
  # is `{acc, ir}` where `ir` is `nil` (no condition), `{:and, l, r}` or a
  # `{:cmp, ...}` leaf over `{:lcol | :rcol, pos, affinity}` and
  # `{:param, i, :blob}` operands. Any other node throws `:uncompilable`.
  defp lift_cmp_tree(tree, ctx, side, acc) do
    case tree do
      nil ->
        {acc, nil}

      {:binary, :and, lhs, rhs} ->
        {acc, lhs_ir} = lift_cmp_tree(lhs, ctx, side, acc)
        {acc, rhs_ir} = lift_cmp_tree(rhs, ctx, side, acc)
        {acc, {:and, lhs_ir, rhs_ir}}

      {:binary, op, _a, _b} when op in @cmp_ops ->
        lift_cmp_leaf(tree, ctx, side, acc)

      _unsupported ->
        throw(:uncompilable)
    end
  end

  # Lifts one comparison leaf to `{acc, {:cmp, op, left_ir, right_ir}}`,
  # lifting the left operand before the right so param indices follow source
  # order.
  defp lift_cmp_leaf({:binary, op, a, b}, ctx, side, acc) do
    {[a_ir, b_ir], params} =
      Enum.map_reduce([a, b], acc, fn operand, params ->
        {params, operand_ir} = lift_join_operand(operand, ctx, side, params)
        {operand_ir, params}
      end)

    {params, {:cmp, op, a_ir, b_ir}}
  end

  # Lifts one comparison operand for the join path.
  #
  #   * a column must resolve to `side` and use binary collation; it becomes
  #     `{:lcol | :rcol, pos, affinity}`;
  #   * a literal is pushed onto `acc` and becomes `{:param, index, :blob}`;
  #   * anything else throws `:uncompilable`.
  defp lift_join_operand(operand, ctx, side, acc) do
    case operand do
      {:column, q, name} ->
        case resolve_col(q, name, ctx) do
          {^side, pos, affinity, collation} ->
            binary_collation!(collation)
            {acc, {col_tag(side), pos, affinity}}

          _other_side ->
            throw(:uncompilable)
        end

      {:literal, value} ->
        {[value | acc], {:param, length(acc), :blob}}

      _unsupported ->
        throw(:uncompilable)
    end
  end

  # IR tag for a column read on the given join side.
  defp col_tag(side) when side in [:left, :right] do
    if side == :left, do: :lcol, else: :rcol
  end

  # AND-combines two condition IRs; a `nil` left side means "no condition yet",
  # so the right side is returned unchanged.
  defp and_ir(l, r) do
    if is_nil(l), do: r, else: {:and, l, r}
  end

  # Lifts the projection list to `{acc, [{read_ir, name, affinity}]}` in source
  # order. Each item is handled by `lift_proj_item/3` (qualified columns and
  # 2-arg `COALESCE(col, literal)`), which may push a literal onto `acc`.
  defp lift_join_projection(columns, ctx, acc) do
    {items, params} =
      Enum.map_reduce(columns, acc, fn column, params ->
        {params, item} = lift_proj_item(column, ctx, params)
        {item, params}
      end)

    {params, items}
  end

  # Lifts one projection item to `{acc, {read_ir, output_name, affinity}}`.
  #
  #   * a column reads `{:lcol | :rcol, pos}` and keeps the column's affinity;
  #   * `coalesce(col, literal)` pushes the literal as a param, reads
  #     `{:coalesce, colref, param_index}` and has affinity `:blob`;
  #   * any other item throws `:uncompilable`.
  #
  # Referenced columns must use binary collation.
  defp lift_proj_item({expr, alias_name}, ctx, acc) do
    case expr do
      {:column, q, name} ->
        {side, pos, affinity, collation} = resolve_col(q, name, ctx)
        binary_collation!(collation)
        {acc, {{col_tag(side), pos}, alias_name || name, affinity}}

      {:function, "coalesce", [{:column, q, name}, {:literal, dflt}]} ->
        {side, pos, _affinity, collation} = resolve_col(q, name, ctx)
        binary_collation!(collation)
        output_name = alias_name || "coalesce(#{q_prefix(q)}#{name}, ?)"
        {[dflt | acc], {{:coalesce, {col_tag(side), pos}, length(acc)}, output_name, :blob}}

      _unsupported ->
        throw(:uncompilable)
    end
  end

  defp lift_proj_item(_col, _ctx, _acc), do: throw(:uncompilable)

  # Renders a qualifier as a `"qual."` prefix; no qualifier renders as `""`.
  defp q_prefix(q) do
    if is_nil(q), do: "", else: "#{q}."
  end

  # ORDER BY for a join → `{acc, nil | [{:key, read_ir, dir}]}`.
  #
  # Each term is either a 1-based position into the projection (the key reuses
  # that item's read IR) or a column reference (must use binary collation). Any
  # other term throws `:uncompilable`. `acc` is returned unchanged.
  #
  # An unqualified name is resolved against the tables' columns. If a projected
  # item carries the same output name but reads something else
  # (`SELECT l.a AS b, l.b AS a ... ORDER BY a`), sorting by the table column
  # could disagree with alias-first ORDER BY resolution, so that case falls
  # back to the tree walker instead of guessing.
  defp lift_join_order([], _projection, _ctx, acc), do: {acc, nil}

  defp lift_join_order(order_by, projection, ctx, acc) do
    # Hoisted: the guard below would otherwise walk the list once per term.
    width = length(projection)

    keys =
      Enum.map(order_by, fn
        {{:literal, pos}, dir} when is_integer(pos) and pos >= 1 and pos <= width ->
          {read_ir, _name, _aff} = Enum.at(projection, pos - 1)
          {:key, read_ir, dir}

        {{:column, q, name}, dir} ->
          {side, pos, _aff, collation} = resolve_col(q, name, ctx)
          binary_collation!(collation)
          read_ir = {col_tag(side), pos}
          if q == nil, do: ensure_no_join_alias_shadow!(name, read_ir, projection)
          {:key, read_ir, dir}

        _ ->
          throw(:uncompilable)
      end)

    {acc, keys}
  end

  # Throws `:uncompilable` when some projected item is named like `name` but
  # does not read `read_ir` (names are compared through `Table.key/1`).
  defp ensure_no_join_alias_shadow!(name, read_ir, projection) do
    key = Table.key(name)

    shadowed? =
      Enum.any?(projection, fn {item_ir, out_name, _aff} ->
        item_ir != read_ir and Table.key(out_name) == key
      end)

    if shadowed?, do: throw(:uncompilable), else: :ok
  end

  # True for a plain SELECT: no DISTINCT, no GROUP BY, no HAVING, no windows.
  defp simple_shape?(%ExSQL.AST.Select{} = select) do
    plain_rows? = not select.distinct and select.group_by == []
    plain_rows? and select.having == nil and map_size(select.windows) == 0
  end

  # The generated code does not model pragmas that change SELECT output (row
  # order or column naming), so compilation is only allowed while
  # `reverse_unordered_selects` and `full_column_names` are off and
  # `short_column_names` is on; otherwise the caller falls back.
  defp default_pragmas?(db) do
    order_and_names_default? = not db.reverse_unordered_selects and not db.full_column_names
    order_and_names_default? and db.short_column_names
  end

  # Builds `%{position => {affinity, collation}}` from the table's frame
  # columns, numbering them from 0 in the order `Table.frame_columns/1` returns.
  defp column_meta(table) do
    frame = Table.frame_columns(table)

    frame
    |> Enum.with_index()
    |> Enum.into(%{}, fn {column, position} ->
      {_key, _name, affinity, collation} = column
      {position, {affinity, collation}}
    end)
  end

  # Looks the table up through `Database.lookup_table/2` so codegen resolves
  # the name the same way the tree walker does. Only a `%Table{}` hit is passed
  # through as `{:ok, table}`; every other lookup result becomes `:no`.
  defp fetch_plain_table(db, name) do
    with {:ok, %Table{}} = found <- Database.lookup_table(db, name) do
      found
    else
      _ -> :no
    end
  end

  # Turns a projection of plain columns into `[{key, position, output_name}]`,
  # where the output name is the alias when present, else the column name.
  # A non-column item or an unknown column throws `:uncompilable`.
  defp plain_projection!(columns, index) do
    for item <- columns do
      case item do
        {{:column, _q, cname}, alias_name} ->
          {Table.key(cname), column_pos!(cname, index), alias_name || cname}

        _not_a_column ->
          throw(:uncompilable)
      end
    end
  end

  # Position of column `name` in `index` (keyed by `Table.key/1`); throws
  # `:uncompilable` when the table has no such column.
  defp column_pos!(name, index) do
    case Map.fetch(index, Table.key(name)) do
      {:ok, pos} -> pos
      :error -> throw(:uncompilable)
    end
  end

  # Lifts a scan's WHERE to `{acc, ir}` with `ir` one of `nil`, `{:and, l, r}`
  # or `{:cmp, op, a, b}`. Operands carry their affinity —
  # `{:col, pos, affinity}` or `{:param, i, affinity}` — so the generated code
  # can apply comparison affinity via `comparison_coerce`, like the tree walker.
  # Columns with a non-binary collation make the query fall back (a global
  # compiled module can't see connection-local collations), as does any node
  # that is neither an AND nor one of `@cmp_ops`.
  defp lift_where(expr, index, meta, acc) do
    case expr do
      nil ->
        {acc, nil}

      {:binary, :and, lhs, rhs} ->
        {acc, lhs_ir} = lift_where(lhs, index, meta, acc)
        {acc, rhs_ir} = lift_where(rhs, index, meta, acc)
        {acc, {:and, lhs_ir, rhs_ir}}

      {:binary, op, lhs, rhs} when op in @cmp_ops ->
        {acc, lhs_ir} = lift_operand(lhs, index, meta, acc)
        {acc, rhs_ir} = lift_operand(rhs, index, meta, acc)
        {acc, {:cmp, op, lhs_ir, rhs_ir}}

      _unsupported ->
        throw(:uncompilable)
    end
  end

  # Lifts one WHERE operand for the scan path.
  #
  #   * a column (binary collation required) becomes `{:col, pos, affinity}`;
  #   * a literal is pushed onto `acc` and becomes `{:param, index, :blob}` — a
  #     literal has affinity NONE, and `comparison_coerce` then applies the
  #     column's affinity to it (SQLite §4.2);
  #   * anything else throws `:uncompilable`.
  defp lift_operand(operand, index, meta, acc) do
    case operand do
      {:column, _q, name} ->
        pos = column_pos!(name, index)
        {affinity, collation} = Map.fetch!(meta, pos)
        binary_collation!(collation)
        {acc, {:col, pos, affinity}}

      {:literal, value} ->
        {[value | acc], {:param, length(acc), :blob}}

      _unsupported ->
        throw(:uncompilable)
    end
  end

  # Returns `:ok` for the binary collation (`nil` or `:binary`); any other
  # collation throws `:uncompilable`.
  defp binary_collation!(coll) do
    if coll in [nil, :binary], do: :ok, else: throw(:uncompilable)
  end

  # ORDER BY → `{acc, nil | [{:read, pos, dir}]}` where `pos` is a position in
  # the stored row. A term is either a 1-based index into the projection or a
  # column name. The sort uses binary collation, so a key column with any other
  # collation falls back, as does any other kind of term. `acc` is returned
  # unchanged.
  #
  # An unqualified name is resolved against the table's columns. If a projected
  # item has the same output name but reads a different column
  # (`SELECT a AS b, b AS a FROM t ORDER BY a`), sorting by the table column
  # could disagree with alias-first ORDER BY resolution, so that case falls
  # back to the tree walker instead of guessing.
  defp lift_order([], _projection, _index, _meta, acc), do: {acc, nil}

  defp lift_order(order_by, projection, index, meta, acc) do
    positions = Enum.map(projection, &elem(&1, 1))
    # Hoisted: the guard below would otherwise walk the list once per term.
    width = length(positions)

    keys =
      Enum.map(order_by, fn
        {{:literal, pos}, dir} when is_integer(pos) and pos >= 1 and pos <= width ->
          read_pos = Enum.at(positions, pos - 1)
          order_key!(read_pos, dir, meta)

        {{:column, q, name}, dir} ->
          pos = column_pos!(name, index)
          if q == nil, do: ensure_no_alias_shadow!(name, pos, projection)
          order_key!(pos, dir, meta)

        _ ->
          throw(:uncompilable)
      end)

    {acc, keys}
  end

  # Throws `:uncompilable` when some projected item is named like `name` but
  # reads a position other than `pos` (names are compared through `Table.key/1`).
  defp ensure_no_alias_shadow!(name, pos, projection) do
    key = Table.key(name)

    shadowed? =
      Enum.any?(projection, fn {_col_key, item_pos, out_name} ->
        item_pos != pos and Table.key(out_name) == key
      end)

    if shadowed?, do: throw(:uncompilable), else: :ok
  end

  # Builds the sort key `{:read, pos, dir}` for the column at `pos`, after
  # checking that the column uses binary collation (else `:uncompilable`).
  defp order_key!(pos, dir, meta) do
    {_affinity, collation} = Map.fetch!(meta, pos)

    case binary_collation!(collation) do
      :ok -> {:read, pos, dir}
    end
  end

  # Lifts a LIMIT/OFFSET expression. `nil` (clause absent) leaves `acc` alone
  # and yields no IR; a non-negative integer literal is pushed onto `acc` and
  # referenced as `{:param, index}`; anything else throws `:uncompilable`.
  defp lift_int(expr, acc) do
    case expr do
      nil ->
        {acc, nil}

      {:literal, n} when is_integer(n) and n >= 0 ->
        {[n | acc], {:param, length(acc)}}

      _unsupported ->
        throw(:uncompilable)
    end
  end

  # ---- top-level entry (cache + gate + run) ----------------------------------

  @doc """
  Tries to answer `stmt` with a compiled native function.

  Returns `{:ok, %ExSQL.Result{}}` when codegen is enabled (`EXSQL_CODEGEN=1`),
  the statement has a supported shape, and the shape has been seen often enough
  to be compiled. In every other case it returns `:fallback`, meaning the caller
  should use the tree walker.
  """
  @spec run_select(Database.t(), ExSQL.AST.Select.t()) :: {:ok, ExSQL.Result.t()} | :fallback
  def run_select(db, stmt) do
    with true <- enabled?(),
         {:ok, spec} <- compilable?(stmt, db) do
      dispatch(db, spec)
    else
      _ -> :fallback
    end
  end

  # Codegen is opt-in: on only while the `EXSQL_CODEGEN` env var is exactly "1".
  defp enabled? do
    match?("1", System.get_env("EXSQL_CODEGEN"))
  end

  # Consults the shape cache: run the cached module on a hit, compile + store +
  # run when the cache asks for compilation, and fall back to the tree walker
  # while the cache answers `:too_few`.
  defp dispatch(db, spec) do
    key = shape_key(spec)

    compiled =
      case ExSQL.Codegen.Cache.fetch(key) do
        {:hit, cached} ->
          cached

        {:compile, _n} ->
          {:ok, %{module: fresh}} = build(spec)
          ExSQL.Codegen.Cache.store(key, fresh)
          fresh

        :too_few ->
          nil
      end

    if compiled, do: run_compiled(db, spec, compiled), else: :fallback
  end

  # Cache key for a spec's *shape*: a tagged tuple of every spec field except
  # the lifted literal values (`:params`), so queries that differ only in their
  # literals share one compiled module.
  defp shape_key(spec) do
    {tag, fields} =
      case spec do
        %{kind: :left_join} ->
          {:left_join,
           [
             :left_table,
             :right_table,
             :join_key,
             :residual,
             :where,
             :projection,
             :names,
             :order,
             :limit,
             :offset
           ]}

        _scan ->
          {:scan, [:table, :positions, :names, :where, :order, :limit, :offset]}
      end

    List.to_tuple([tag | Enum.map(fields, &Map.fetch!(spec, &1))])
  end

  # Runs a compiled module against the current table contents and wraps the
  # rows in a result. The tables are looked up again at run time; if a lookup
  # no longer succeeds the caller is told to fall back. The join form passes the
  # left and right rows separately, the scan form a single row list.
  defp run_compiled(db, %{kind: :left_join} = spec, module) do
    case Database.lookup_table(db, spec.left_table) do
      {:ok, left} ->
        case Database.lookup_table(db, spec.right_table) do
          {:ok, right} ->
            left_rows = Table.scan_positional(left)
            right_rows = Table.scan_positional(right)
            select_result(spec, module.run(left_rows, right_rows, spec.params))

          _ ->
            :fallback
        end

      _ ->
        :fallback
    end
  end

  defp run_compiled(db, spec, module) do
    with {:ok, table} <- Database.lookup_table(db, spec.table) do
      rows = module.run(Table.scan_positional(table), spec.params)
      select_result(spec, rows)
    else
      _ -> :fallback
    end
  end

  # Wraps compiled output rows in `{:ok, %ExSQL.Result{}}` for a SELECT, taking
  # column names and affinities from the spec.
  defp select_result(spec, rows) do
    result =
      struct!(ExSQL.Result,
        command: :select,
        columns: spec.names,
        rows: rows,
        rows_affected: 0,
        affinities: spec.affinities
      )

    {:ok, result}
  end

  # ---- compilation -----------------------------------------------------------

  @doc """
  Compiles a `spec` (from `compilable?/2`) into a freshly named module that
  exports `run/2` (scan specs) or `run/3` (left-join specs).

  Returns `{:ok, %{module: module, params: params}}`, where `params` is the
  spec's lifted-literal tuple.
  """
  @spec build(map()) :: {:ok, map()}
  def build(spec) do
    # A globally-unique module name (not a hash of the shape) — the ETS cache,
    # keyed by the full shape, is the source of truth for reuse, so two distinct
    # shapes can never collide onto the same module.
    module = :"Elixir.ExSQL.Codegen.Q#{System.unique_integer([:positive, :monotonic])}"
    # The spec kind selects which `run` function body is generated.
    ast = if spec.kind == :left_join, do: function_ast_join(spec), else: function_ast(spec)
    Module.create(module, ast, Macro.Env.location(__ENV__))
    {:ok, %{module: module, params: spec.params}}
  end

  # Quoted `def run(rows, params)` for a scan spec. The body is a comprehension
  # over `{rowid, row}` pairs, filtered by the WHERE guard when there is one,
  # that projects the selected positions; the result is then clamped by
  # OFFSET/LIMIT. With ORDER BY, each row is first paired with its sort keys,
  # sorted with `order_le?/3`, and stripped of the keys before clamping.
  # The generated code refers to the variables `row`, `rows` and `params` by
  # name, so the quoted fragments built by the helpers must use the same names.
  defp function_ast(spec) do
    row_match = quote(do: {_rowid, row})
    where_guard = emit_where(spec.where)
    proj = quote(do: [unquote_splicing(Enum.map(spec.positions, &read_pos/1))])

    body =
      case spec.order do
        # Unordered: filter + project in scan order.
        nil ->
          collected =
            if where_guard do
              quote(do: for(unquote(row_match) <- rows, unquote(where_guard), do: unquote(proj)))
            else
              quote(do: for(unquote(row_match) <- rows, do: unquote(proj)))
            end

          quote do
            unquote(collected) |> unquote(emit_clamp(spec))
          end

        # Ordered: collect `{sort_keys, projected_row}` pairs, sort, drop keys.
        keys ->
          key_reads =
            quote(
              do: {unquote(Enum.map(keys, fn {:read, p, _} -> read_pos(p) end)), unquote(proj)}
            )

          dirs = Enum.map(keys, fn {:read, _, dir} -> dir end)

          collected =
            if where_guard do
              quote(
                do: for(unquote(row_match) <- rows, unquote(where_guard), do: unquote(key_reads))
              )
            else
              quote(do: for(unquote(row_match) <- rows, do: unquote(key_reads)))
            end

          quote do
            unquote(collected)
            |> Enum.sort(&ExSQL.Codegen.order_le?(elem(&1, 0), elem(&2, 0), unquote(dirs)))
            |> Enum.map(&elem(&1, 1))
            |> unquote(emit_clamp(spec))
          end
      end

    quote do
      def run(rows, params) do
        # Touch `params` so a shape without any lifted literal still uses it.
        _ = params
        unquote(body)
      end
    end
  end

  # OFFSET then LIMIT, applied to the (already projected) list.
  # Returns a quoted two-step pipeline of captured functions; callers splice it
  # as the right-hand side of a `|>`, so the row list flows into the OFFSET step
  # and then into the LIMIT step. An absent clause becomes the identity capture.
  # The OFFSET/LIMIT values are read from `params` at run time.
  defp emit_clamp(spec) do
    dropped =
      case spec.offset do
        nil -> quote(do: & &1)
        {:param, i} -> quote(do: &Enum.drop(&1, elem(params, unquote(i))))
      end

    taken =
      case spec.limit do
        nil -> quote(do: & &1)
        {:param, i} -> quote(do: &Enum.take(&1, elem(params, unquote(i))))
      end

    quote(do: unquote(dropped).() |> unquote(taken).())
  end

  # A WHERE node → a boolean expression over `row`/`params`. compare_op/sql_and
  # match the tree walker exactly (true passes; nil/false fail the comprehension).
  # Returns `nil` when there is no WHERE, which callers use to omit the filter.
  defp emit_where(nil), do: nil

  # AND node: both sides are emitted and combined with `ExSQL.Value.sql_and/2`.
  defp emit_where({:and, l, r}) do
    quote(do: ExSQL.Value.sql_and(unquote(emit_where(l)), unquote(emit_where(r))))
  end

  # Comparison leaf: a call to `cmp/5` with each operand's affinity baked in.
  defp emit_where({:cmp, op, a, b}) do
    {av, aaff} = emit_operand(a)
    {bv, baff} = emit_operand(b)

    quote(
      do: ExSQL.Codegen.cmp(unquote(op), unquote(av), unquote(aaff), unquote(bv), unquote(baff))
    )
  end

  # Scan-path operand IR → `{quoted_value, affinity}`: a column is read from
  # `row` by position, a param is read from the `params` tuple by index.
  defp emit_operand({:col, pos, aff}), do: {read_pos(pos), aff}
  defp emit_operand({:param, i, aff}), do: {quote(do: elem(params, unquote(i))), aff}

  # Positional read of a stored tuple cell (element/2 is 1-based).
  # `pos` is the 0-based column position, hence the `+ 1`; the quoted code
  # reads from the variable `row` bound by the generated comprehension.
  defp read_pos(pos), do: quote(do: :erlang.element(unquote(pos + 1), row))

  # ---- left-join emission ----------------------------------------------------
  #
  # run(left_rows, right_rows, params): index the right side by the (canonical)
  # join key with the residual ON filters applied, scan the left side (filtered by
  # WHERE), probe + re-verify each match exactly, and NULL-fill unmatched lefts.
  defp function_ast_join(spec) do
    {lpos, rpos, class, laff, raff} = spec.join_key

    # Predicate applied to each right row while building the index: the
    # residual ON filters, or always-true when there are none.
    keep_fun =
      case spec.residual do
        nil -> quote(do: fn _srow -> true end)
        ir -> quote(do: fn srow -> unquote(emit_cond(ir)) end)
      end

    # Canonical hash key of a right row's join column.
    key_fun =
      quote do
        fn srow ->
          ExSQL.Codegen.join_canon(:erlang.element(unquote(rpos + 1), srow), unquote(class))
        end
      end

    # Per-left-row body: look up candidates by canonical key, keep those whose
    # exact `cmp(:eq, ...)` is `true`, then emit one row per match or a single
    # NULL-filled row when nothing matched.
    probe =
      quote do
        lkey = :erlang.element(unquote(lpos + 1), lrow)

        matches =
          case ExSQL.Codegen.join_canon(lkey, unquote(class)) do
            # A NULL left key matches nothing.
            :__exsql_null__ ->
              []

            k ->
              rindex
              |> Map.get(k, [])
              |> Enum.filter(fn srow ->
                ExSQL.Codegen.cmp(
                  :eq,
                  lkey,
                  unquote(laff),
                  :erlang.element(unquote(rpos + 1), srow),
                  unquote(raff)
                ) == true
              end)
          end

        case matches do
          [] -> [unquote(join_row(spec, :null))]
          ms -> for srow <- ms, do: unquote(join_row(spec, :matched))
        end
      end

    # Scan of the left side; the WHERE guard (left columns only) must be
    # exactly `true` for a row to be probed.
    loop =
      case emit_cond(spec.where) do
        nil ->
          quote(do: for({_lrid, lrow} <- left_rows, do: unquote(probe)))

        guard ->
          quote(do: for({_lrid, lrow} <- left_rows, unquote(guard) == true, do: unquote(probe)))
      end

    quote do
      def run(left_rows, right_rows, params) do
        # Touch `params` so a shape without any lifted literal still uses it.
        _ = params
        rindex = ExSQL.Codegen.build_index(right_rows, unquote(keep_fun), unquote(key_fun))
        # Each left row yields a list of output rows; flatten, then sort/clamp.
        unquote(loop) |> Enum.concat() |> unquote(join_post(spec))
      end
    end
  end

  # Sort (when ordered) then OFFSET/LIMIT, over the concatenated output rows.
  # Returns a quoted pipeline fragment that the caller splices as the right-hand
  # side of a `|>`. Without ORDER BY it is just the clamp.
  defp join_post(%{order: nil} = spec), do: emit_clamp(spec)

  # Ordered: rows arrive as `{sort_keys, projected_row}` pairs (see
  # `join_row/2`); sort on the keys, drop them, then clamp.
  defp join_post(%{order: keys} = spec) do
    dirs = Enum.map(keys, fn {:key, _ir, dir} -> dir end)

    quote do
      Enum.sort(&ExSQL.Codegen.order_le?(elem(&1, 0), elem(&2, 0), unquote(dirs)))
      |> Enum.map(&elem(&1, 1))
      |> unquote(emit_clamp(spec))
    end
  end

  # One output row for a (lrow, srow) pair. `:matched` reads the right tuple;
  # `:null` fills the right side with NULL. Ordered queries pair it with sort keys.
  # Unordered specs emit just the projected list.
  defp join_row(%{order: nil} = spec, branch), do: emit_proj(spec.projection, branch)

  # Ordered specs emit `{[sort_key, ...], projected_row}`; the keys are read
  # with the same branch so unmatched rows sort on NULL right-side values.
  defp join_row(%{order: keys} = spec, branch) do
    key_reads = Enum.map(keys, fn {:key, ir, _dir} -> emit_proj_read(ir, branch) end)
    proj = emit_proj(spec.projection, branch)
    quote(do: {[unquote_splicing(key_reads)], unquote(proj)})
  end

  # Quoted list literal holding one read expression per projection item, in
  # projection order, for the given branch (`:matched` or `:null`).
  defp emit_proj(projection, branch) do
    quote(do: [unquote_splicing(Enum.map(projection, &emit_proj_read(&1, branch)))])
  end

  # Read expression for one projection item.
  # A `{:coalesce, colref, pi}` item reads the column and substitutes the
  # param at index `pi` when the value is `nil` (NULL).
  defp emit_proj_read({:coalesce, colref, pi}, branch) do
    quote do
      case unquote(emit_col_read(colref, branch)) do
        nil -> elem(params, unquote(pi))
        v -> v
      end
    end
  end

  # Any other item is a plain column reference.
  defp emit_proj_read(colref, branch), do: emit_col_read(colref, branch)

  # Quoted read of one column for a join output row (positions are 0-based,
  # `element/2` is 1-based). Left columns always come from `lrow`.
  defp emit_col_read({:lcol, pos}, _branch),
    do: quote(do: :erlang.element(unquote(pos + 1), lrow))

  # Right columns come from the matched right tuple `srow` ...
  defp emit_col_read({:rcol, pos}, :matched),
    do: quote(do: :erlang.element(unquote(pos + 1), srow))

  # ... or are the literal `nil` (NULL) when the left row had no match.
  defp emit_col_read({:rcol, _pos}, :null), do: nil

  # A condition IR ({:and,…} | {:cmp,…}) → a boolean expression over lrow/srow/params.
  # Each operand reads from its own side, which is the variable in scope.
  # Returns `nil` for an absent condition so callers can omit the filter.
  defp emit_cond(nil), do: nil

  # AND node: both sides are emitted and combined with `ExSQL.Value.sql_and/2`.
  defp emit_cond({:and, l, r}) do
    quote(do: ExSQL.Value.sql_and(unquote(emit_cond(l)), unquote(emit_cond(r))))
  end

  # Comparison leaf: a call to `cmp/5` with each operand's affinity baked in.
  defp emit_cond({:cmp, op, a, b}) do
    {av, aaff} = emit_cond_operand(a)
    {bv, baff} = emit_cond_operand(b)

    quote(
      do: ExSQL.Codegen.cmp(unquote(op), unquote(av), unquote(aaff), unquote(bv), unquote(baff))
    )
  end

  # Join-path operand IR → `{quoted_value, affinity}`. Left columns are read
  # from `lrow`, right columns from `srow` (0-based position, 1-based
  # `element/2`), and params from the `params` tuple by index.
  defp emit_cond_operand({:lcol, pos, aff}),
    do: {quote(do: :erlang.element(unquote(pos + 1), lrow)), aff}

  defp emit_cond_operand({:rcol, pos, aff}),
    do: {quote(do: :erlang.element(unquote(pos + 1), srow)), aff}

  defp emit_cond_operand({:param, i, aff}), do: {quote(do: elem(params, unquote(i))), aff}

  # ---- runtime helpers (called from generated code) -------------------------

  @doc false
  # Comparison with SQLite affinity and binary collation: the operands are
  # coerced by `Value.comparison_coerce/4` using the affinities baked into the
  # generated call, then compared by `Value.compare_op/4` — the same two steps
  # the tree walker performs (`comparison_operands` + `compare_op`).
  def cmp(op, a, a_aff, b, b_aff) do
    a
    |> Value.comparison_coerce(a_aff, b, b_aff)
    |> then(fn {lhs, rhs} -> Value.compare_op(op, lhs, rhs, :binary) end)
  end

  @doc false
  # Sort predicate for ORDER BY, mirroring the tree walker: walks the two key
  # lists and the direction list in lock-step, using `Value.compare/3` (NULL
  # sorts low). The first non-equal key decides, flipped for `:desc`; all keys
  # equal means `ka <= kb` holds. Recursive, so no per-comparison `Enum.zip`
  # allocation.
  def order_le?([], [], []), do: true

  def order_le?([a | rest_a], [b | rest_b], [dir | rest_dirs]) do
    ordering = Value.compare(a, b, :binary)

    cond do
      ordering == :eq -> order_le?(rest_a, rest_b, rest_dirs)
      ordering == :lt -> dir == :asc
      ordering == :gt -> dir == :desc
    end
  end

  @doc false
  # Right-side hash index for a left join: `canonical_key => [srow, ...]`, each
  # bucket in scan order. A row is indexed only if `keep_fun` (the residual ON
  # filters) is truthy for it and `key_fun` does not return `:__exsql_null__`
  # (a NULL join key never matches, as `NULL = x` is never true).
  def build_index(rows, keep_fun, key_fun) do
    rows
    |> Enum.flat_map(fn {_rid, srow} ->
      if keep_fun.(srow) do
        case key_fun.(srow) do
          :__exsql_null__ -> []
          key -> [{key, srow}]
        end
      else
        []
      end
    end)
    |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
  end

  @doc false
  # Canonical hash key of an equi-join value under the comparison-affinity
  # `class` (see `key_class/2`). Values that compare equal must share a key; the
  # generated code re-verifies candidates with `cmp/5`, which rejects any
  # over-grouping. NULL maps to `:__exsql_null__` and never matches. Numeric
  # keys are normalised so that `5.0` and `5` land in the same bucket.
  def join_canon(value, class) do
    case {value, class} do
      {nil, _any} -> :__exsql_null__
      {_, :numeric} -> norm_num(Value.apply_affinity(value, :numeric))
      {_, :text} -> Value.apply_affinity(value, :text)
      {_, :none} -> norm_num(value)
    end
  end

  # Collapses a float with no fractional part to the equal integer; every other
  # value is returned unchanged.
  defp norm_num(value) do
    if is_float(value) do
      whole = trunc(value)
      if whole == value, do: whole, else: value
    else
      value
    end
  end
end