Skip to main content

lib/ex_sql/parser.ex

defmodule ExSQL.Parser do
  @moduledoc """
  Recursive-descent parser producing `ExSQL.AST` structs.

  SQLite generates its parser from a Lemon LALR grammar (`parse.y`); a
  hand-written recursive descent with precedence climbing is the idiomatic
  Elixir equivalent and keeps the grammar readable as plain functions, one
  per production.

  ## Expression representation

  Expressions are tagged tuples:

    * `{:literal, value}` — `nil`, integer, float, UTF-8 binary, `{:blob, bin}`,
      or boolean (`TRUE`/`FALSE` literals)
    * `{:column, table | nil, name}` — column reference
    * `{:binary, op, left, right}` — `op` in `:or | :and | :eq | :ne | :lt |
      :le | :gt | :ge | :add | :sub | :mul | :div | :mod | :concat`
    * `{:not, expr}` / `{:negate, expr}`
    * `{:is, expr, expr}` / `{:is_not, expr, expr}` — NULL-safe comparison
    * `{:in, expr, [expr] | {:select, query}, negated?}`
    * `{:between, expr, low, high, negated?}`
    * `{:like, expr, pattern, negated?}` / `{:glob, expr, pattern, negated?}`
    * `{:function, name, :star | [expr]}` — e.g. `count(*)`
    * `{:case, operand | nil, [{when_expr, then_expr}], else_expr | nil}`
    * `{:subquery, %Select{}}` — scalar subquery
    * `{:exists, %Select{}}` — `EXISTS (SELECT ...)`
  """

  alias ExSQL.AST.{
    AlterTable,
    ColumnDef,
    Compound,
    CreateIndex,
    CreateTable,
    CreateTrigger,
    CreateView,
    Delete,
    DropIndex,
    DropTable,
    DropView,
    Insert,
    Pragma,
    Select,
    Update,
    Values,
    With
  }

  alias ExSQL.{Error, Tokenizer}

  @type expr :: tuple()
  @type statement ::
          Select.t()
          | Compound.t()
          | Values.t()
          | With.t()
          | CreateTable.t()
          | CreateIndex.t()
          | CreateView.t()
          | AlterTable.t()
          | Insert.t()
          | Pragma.t()
          | Update.t()
          | Delete.t()
          | DropTable.t()
          | DropIndex.t()
          | DropView.t()
          | {:attach, expr(), String.t()}
          | {:detach, String.t()}
          | {:explain, statement()}
          | {:explain_query_plan, statement()}

  @doc """
  Parses `sql` into a list of statements.

  Accepts multiple statements separated by `;`. Returns `{:ok, statements}`
  or `{:error, %ExSQL.Error{}}`.
  """
  @spec parse(String.t()) :: {:ok, [statement()]} | {:error, Error.t()}
  def parse(sql) when is_binary(sql) do
    with {:ok, tokens} <- tokenize(sql) do
      {:ok, statements(tokens, [])}
    end
  rescue
    e in Error -> {:error, e}
  end

  defp tokenize(sql) do
    case Tokenizer.tokenize(sql) do
      {:ok, tokens} ->
        {:ok, tokens}

      {:error, reason} ->
        {:error,
         %Error{
           message: "tokenizer error: #{inspect(reason)}",
           line: elem(reason, tuple_size(reason) - 1)
         }}
    end
  end

  defp statements([], acc), do: Enum.reverse(acc)
  defp statements([{:semicolon, _, _} | rest], acc), do: statements(rest, acc)

  defp statements(tokens, acc) do
    {stmt, rest} = statement(tokens)

    case rest do
      [] -> statements(rest, [stmt | acc])
      [{:semicolon, _, _} | rest] -> statements(rest, [stmt | acc])
      [{_, value, line} | _] -> fail(~s(near "#{token_text(value)}": syntax error), line)
    end
  end

  # -- statement dispatch ---------------------------------------------------

  defp statement([{:keyword, :select, _} | _] = tokens), do: select(tokens)
  defp statement([{:keyword, :values, _} | _] = tokens), do: select(tokens)
  defp statement([{:keyword, :with, _} | _] = tokens), do: with_statement(tokens)
  defp statement([{:keyword, :create, _} | _] = tokens), do: create_stmt(tokens)
  defp statement([{:keyword, :insert, _} | _] = tokens), do: insert(tokens)
  defp statement([{:keyword, :replace, _} | _] = tokens), do: insert(tokens)
  defp statement([{:keyword, :update, _} | _] = tokens), do: update(tokens)
  defp statement([{:keyword, :delete, _} | _] = tokens), do: delete(tokens)
  defp statement([{:keyword, :drop, _} | _] = tokens), do: drop_stmt(tokens)
  defp statement([{:keyword, :alter, _} | _] = tokens), do: alter_table(tokens)
  defp statement([{:keyword, :pragma, _} | _] = tokens), do: pragma(tokens)
  defp statement([{:keyword, :explain, line} | rest]), do: explain_statement(rest, line)
  defp statement([{:keyword, :begin, _} | rest]), do: begin_statement(rest)
  defp statement([{:keyword, :commit, _} | rest]), do: commit_statement(rest)
  defp statement([{:keyword, :end, _} | rest]), do: commit_statement(rest)
  defp statement([{:keyword, :rollback, _} | rest]), do: rollback_statement(rest)
  defp statement([{:keyword, :savepoint, _} | rest]), do: savepoint_statement(rest)
  defp statement([{:keyword, :release, _} | rest]), do: release_statement(rest)
  defp statement([{:keyword, :analyze, _} | rest]), do: operational_statement(:analyze, rest)
  defp statement([{:keyword, :vacuum, _} | rest]), do: operational_statement(:vacuum, rest)
  defp statement([{:keyword, :reindex, _} | rest]), do: operational_statement(:reindex, rest)
  defp statement([{:keyword, :attach, line} | rest]), do: attach_statement(rest, line)
  defp statement([{:keyword, :detach, line} | rest]), do: detach_statement(rest, line)

  defp statement([{_, value, line} | _]),
    do: fail("expected a statement, got #{inspect(value)}", line)

  defp operational_statement(kind, tokens) do
    {name, rest} = optional_qualified_identifier(tokens)
    {{kind, name}, rest}
  end

  defp attach_statement(tokens, line) do
    tokens = skip_optional_database_keyword(tokens)
    {filename_expr, rest} = expression(tokens)
    rest = expect_keyword(rest, :as, line)
    {name, rest} = identifier(rest)
    {{:attach, filename_expr, name}, rest}
  end

  defp detach_statement(tokens, _line) do
    tokens = skip_optional_database_keyword(tokens)
    {name, rest} = identifier(tokens)
    {{:detach, name}, rest}
  end

  defp skip_optional_database_keyword([{:keyword, :database, _} | rest]), do: rest
  defp skip_optional_database_keyword(tokens), do: tokens

  defp optional_qualified_identifier([]), do: {nil, []}
  defp optional_qualified_identifier([{:semicolon, _, _} | _] = tokens), do: {nil, tokens}

  defp optional_qualified_identifier(tokens) do
    case tokens do
      [{:keyword, kw, _} | _] when kw in [:where, :order, :limit, :returning] ->
        {nil, tokens}

      _ ->
        {schema, name, rest} = qualified_schema_identifier(tokens)
        {if(schema, do: schema <> "." <> name, else: name), rest}
    end
  end

  defp explain_statement(
         [{:keyword, :query, _}, {:keyword, :plan, _} | rest],
         _line
       ) do
    {stmt, rest} = statement(rest)
    {{:explain_query_plan, stmt}, rest}
  end

  defp explain_statement(tokens, _line) do
    {stmt, rest} = statement(tokens)
    {{:explain, stmt}, rest}
  end

  defp pragma([{:keyword, :pragma, line} | rest]) do
    {schema, name, rest} = pragma_name(rest, line)
    name = String.downcase(name)

    {stmt, rest} =
      case name do
        "database_list" ->
          {arg, rest} = optional_pragma_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "collation_list" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "compile_options" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "function_list" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "module_list" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "pragma_list" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "encoding" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "foreign_keys" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "defer_foreign_keys" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "recursive_triggers" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "ignore_check_constraints" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        name
        when name in [
               "count_changes",
               "case_sensitive_like",
               "short_column_names",
               "full_column_names",
               "reverse_unordered_selects",
               "read_uncommitted",
               "query_only",
               "cell_size_check",
               "checkpoint_fullfsync",
               "empty_result_callbacks",
               "fullfsync",
               "trusted_schema"
             ] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "automatic_index" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "auto_vacuum" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "incremental_vacuum" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "shrink_memory" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "mmap_size" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        name
        when name in [
               "analysis_limit",
               "busy_timeout",
               "hard_heap_limit",
               "soft_heap_limit",
               "threads"
             ] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        name
        when name in [
               "cache_spill",
               "default_cache_size",
               "journal_size_limit",
               "max_page_count",
               "secure_delete",
               "wal_autocheckpoint"
             ] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "wal_checkpoint" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "optimize" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "stats" ->
          {arg, rest} = optional_pragma_arg(rest)
          {%Pragma{name: name, arg: arg}, rest}

        "data_version" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "page_count" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        name when name in ["page_size", "cache_size"] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        name when name in ["journal_mode", "locking_mode", "synchronous", "temp_store"] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        name when name in ["schema_version", "user_version", "application_id"] ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "table_list" ->
          {arg, rest} = optional_pragma_identifier_arg(rest, line, name)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "freelist_count" ->
          {arg, rest} = optional_pragma_setting_arg(rest)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        "foreign_key_check" ->
          {arg, rest} = optional_pragma_identifier_arg(rest, line, name)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        name when name in ["integrity_check", "quick_check"] ->
          {arg, rest} = optional_pragma_integrity_arg(rest, line, name)
          {%Pragma{name: name, arg: arg}, rest}

        name
        when name in [
               "table_info",
               "table_xinfo",
               "foreign_key_list",
               "index_list",
               "index_info",
               "index_xinfo"
             ] ->
          {arg, rest} = optional_pragma_identifier_arg(rest, line, name)
          arg = pragma_schema_arg(schema, arg)
          {%Pragma{name: name, arg: arg}, rest}

        _ ->
          fail("unsupported PRAGMA: #{name}", line)
      end

    {schema_pragma(stmt, schema), rest}
  end

  defp pragma_name(tokens, _line) do
    {first, rest} = identifier(tokens)

    case rest do
      [{:dot, _, _} | rest] ->
        schema = String.downcase(first)

        {name, rest} = identifier(rest)
        {schema, name, rest}

      rest ->
        {nil, first, rest}
    end
  end

  defp pragma_schema_arg(nil, arg), do: arg
  defp pragma_schema_arg(schema, arg), do: {:schema, schema, arg}

  defp schema_pragma(stmt, nil), do: stmt
  defp schema_pragma(%Pragma{arg: {:schema, _schema, _arg}} = stmt, _outer_schema), do: stmt
  defp schema_pragma(%Pragma{arg: arg} = stmt, schema), do: %{stmt | arg: {:schema, schema, arg}}

  defp pragma_arg([{:lparen, _, _}, {:string, name, _}, {:rparen, _, _} | rest], _line, _name),
    do: {name, rest}

  defp pragma_arg([{:lparen, _, _} | rest], _line, _name) do
    {name, rest} = identifier(rest)
    {name, expect(rest, :rparen)}
  end

  defp pragma_arg([{:eq, _, _}, {:string, name, _} | rest], _line, _name), do: {name, rest}

  defp pragma_arg([{:eq, _, _} | rest], _line, _name), do: identifier(rest)

  defp pragma_arg(tokens, line, name),
    do: fail("PRAGMA #{name} requires an argument", line_of(tokens) || line)

  defp optional_pragma_arg([{:lparen, _, _} | rest]), do: {nil, skip_until_rparen(rest)}
  defp optional_pragma_arg([{:eq, _, _} | rest]), do: {nil, elem(expression(rest), 1)}
  defp optional_pragma_arg(tokens), do: {nil, tokens}

  defp optional_pragma_identifier_arg([{:lparen, _, _} | _] = tokens, line, name),
    do: pragma_arg(tokens, line, name)

  defp optional_pragma_identifier_arg([{:eq, _, _} | _] = tokens, line, name),
    do: pragma_arg(tokens, line, name)

  defp optional_pragma_identifier_arg(tokens, _line, _name), do: {nil, tokens}

  defp optional_pragma_integrity_arg(
         [{:lparen, _, _}, {:int, value, _}, {:rparen, _, _} | rest],
         _line,
         _name
       ),
       do: {value, rest}

  defp optional_pragma_integrity_arg(
         [{:lparen, _, _}, {:minus, _, _}, {:int, value, _}, {:rparen, _, _} | rest],
         _line,
         _name
       ),
       do: {-value, rest}

  defp optional_pragma_integrity_arg(
         [{:lparen, _, _}, {:string, name, _}, {:rparen, _, _} | rest],
         _line,
         _name
       ),
       do: {name, rest}

  defp optional_pragma_integrity_arg([{:lparen, _, _} | _] = tokens, line, name),
    do: pragma_arg(tokens, line, name)

  defp optional_pragma_integrity_arg([{:eq, _, _} | rest], _line, _name),
    do: pragma_setting_value(rest)

  defp optional_pragma_integrity_arg(tokens, _line, _name), do: {nil, tokens}

  defp optional_pragma_setting_arg([{:lparen, _, _} | rest]) do
    {value, rest} = pragma_setting_value(rest)
    {value, expect(rest, :rparen)}
  end

  defp optional_pragma_setting_arg([{:eq, _, _} | rest]), do: pragma_setting_value(rest)
  defp optional_pragma_setting_arg(tokens), do: {nil, tokens}

  defp pragma_setting_value([{:int, value, _} | rest]), do: {value, rest}
  defp pragma_setting_value([{:minus, _, _}, {:int, value, _} | rest]), do: {-value, rest}
  defp pragma_setting_value([{:string, value, _} | rest]), do: {value, rest}
  defp pragma_setting_value([{:id, value, _} | rest]), do: {value, rest}
  defp pragma_setting_value([{:keyword, value, _} | rest]), do: {Atom.to_string(value), rest}

  defp pragma_setting_value(tokens) do
    {expr, rest} = expression(tokens)
    {expr, rest}
  end

  # -- transactions ------------------------------------------------------------
  #
  # BEGIN [DEFERRED|IMMEDIATE|EXCLUSIVE] [TRANSACTION [name]] — the locking
  # mode and transaction name are accepted and ignored, as in SQLite.

  defp begin_statement(tokens) do
    tokens =
      case tokens do
        [{:id, mode, _} | rest] ->
          if String.downcase(mode) in ~w(deferred immediate exclusive), do: rest, else: tokens

        tokens ->
          tokens
      end

    {{:begin}, tokens |> skip_transaction_keyword() |> skip_transaction_name()}
  end

  defp commit_statement(tokens),
    do: {{:commit}, tokens |> skip_transaction_keyword() |> skip_transaction_name()}

  defp rollback_statement(tokens) do
    tokens = skip_transaction_keyword(tokens)

    case tokens do
      [{:keyword, :to, _} | rest] ->
        rest =
          case rest do
            [{:keyword, :savepoint, _} | rest] -> rest
            rest -> rest
          end

        {name, rest} = savepoint_name(rest)
        {{:rollback_to, name}, rest}

      tokens ->
        {{:rollback}, skip_transaction_name(tokens)}
    end
  end

  defp savepoint_statement(tokens) do
    {name, rest} = savepoint_name(tokens)
    {{:savepoint, name}, rest}
  end

  defp release_statement(tokens) do
    tokens =
      case tokens do
        [{:keyword, :savepoint, _} | rest] -> rest
        tokens -> tokens
      end

    {name, rest} = savepoint_name(tokens)
    {{:release, name}, rest}
  end

  defp skip_transaction_keyword([{:keyword, :transaction, _} | rest]), do: rest
  defp skip_transaction_keyword(tokens), do: tokens

  defp skip_transaction_name([{:id, _, _} | rest]), do: rest
  defp skip_transaction_name([{:string, _, _} | rest]), do: rest
  defp skip_transaction_name(tokens), do: tokens

  defp savepoint_name([{:id, name, _} | rest]), do: {name, rest}
  defp savepoint_name([{:string, name, _} | rest]), do: {name, rest}

  defp savepoint_name([{_, value, line} | _]),
    do: fail(~s(near "#{token_text(value)}": syntax error), line)

  defp savepoint_name([]), do: fail("expected a savepoint name, got end of input", nil)

  # -- WITH (CTEs) -----------------------------------------------------------
  #
  # WITH [RECURSIVE] name [(cols)] AS (query) [, ...] <select | insert | update | delete>

  defp with_statement([{:keyword, :with, _} | rest]) do
    {recursive, rest} =
      case rest do
        [{:keyword, :recursive, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {ctes, rest} = cte_defs(rest, [])

    # The body can be SELECT/VALUES/WITH or DML
    {query, rest} =
      case rest do
        [{:keyword, kw, _} | _] when kw in [:select, :values] -> select(rest)
        [{:keyword, :with, _} | _] -> with_statement(rest)
        [{:keyword, kw, _} | _] when kw in [:insert, :replace] -> insert(rest)
        [{:keyword, :update, _} | _] -> update(rest)
        [{:keyword, :delete, _} | _] -> delete(rest)
        [{_, value, line} | _] -> fail(~s(near "#{token_text(value)}": syntax error), line)
        [] -> fail("unexpected end of input after WITH clause", nil)
      end

    {%With{recursive: recursive, ctes: ctes, query: query}, rest}
  end

  defp cte_defs(tokens, acc) do
    {name, rest} = identifier(tokens)

    {columns, rest} =
      case rest do
        [{:lparen, _, _} | rest] ->
          {names, rest} = identifier_list(rest, [])
          {names, rest}

        rest ->
          {nil, rest}
      end

    rest = expect_keyword(rest, :as, line_of(rest))
    rest = expect(rest, :lparen)
    # CTE body can be SELECT, VALUES, or a nested WITH
    {query, rest} =
      case rest do
        [{:keyword, :with, _} | _] -> with_statement(rest)
        rest -> select(rest)
      end

    rest = expect(rest, :rparen)

    cte = %{name: name, columns: columns, query: query}

    case rest do
      [{:comma, _, _} | [{:keyword, value, line} | _]]
      when value in [:select, :values, :with, :insert, :update, :delete] ->
        # Trailing comma before the main query body is a syntax error
        fail(~s(near "#{token_text(value)}": syntax error), line)

      [{:comma, _, _} | rest] ->
        cte_defs(rest, [cte | acc])

      rest ->
        {Enum.reverse([cte | acc]), rest}
    end
  end

  # -- SELECT ----------------------------------------------------------------
  #
  # A select is one or more cores joined by compound operators; ORDER BY and
  # LIMIT come after the last core and apply to the whole compound.

  defp select(tokens) do
    {core, rest} = select_core(tokens)
    {stmt, rest} = compound_chain(core, rest)

    {order_by, rest} =
      case rest do
        [{:keyword, :order, _}, {:keyword, :by, _} | rest] -> order_terms(rest, [])
        rest -> {[], rest}
      end

    {limit, offset, rest} =
      case rest do
        [{:keyword, :limit, _} | rest] ->
          {limit, rest} = expression(rest)

          case rest do
            [{:keyword, :offset, _} | rest] ->
              {offset, rest} = expression(rest)
              {limit, offset, rest}

            # LIMIT x, y is SQLite shorthand for LIMIT y OFFSET x
            [{:comma, _, _} | rest] ->
              {real_limit, rest} = expression(rest)
              {real_limit, limit, rest}

            rest ->
              {limit, nil, rest}
          end

        rest ->
          {nil, nil, rest}
      end

    # Only the last component of a compound may carry ORDER BY / LIMIT.
    case compound_op(rest) do
      {op, _} when order_by != [] ->
        fail("ORDER BY clause should come after #{compound_name(op)} not before", line_of(rest))

      {op, _} when limit != nil ->
        fail("LIMIT clause should come after #{compound_name(op)} not before", line_of(rest))

      _ ->
        {%{stmt | order_by: order_by, limit: limit, offset: offset}, rest}
    end
  end

  defp compound_chain(left, tokens) do
    case compound_op(tokens) do
      {op, rest} ->
        {right, rest} = select_core(rest)
        compound_chain(%Compound{op: op, left: left, right: right}, rest)

      nil ->
        {left, tokens}
    end
  end

  defp compound_op([{:keyword, :union, _}, {:keyword, :all, _} | rest]), do: {:union_all, rest}
  defp compound_op([{:keyword, :union, _} | rest]), do: {:union, rest}
  defp compound_op([{:keyword, :intersect, _} | rest]), do: {:intersect, rest}
  defp compound_op([{:keyword, :except, _} | rest]), do: {:except, rest}
  defp compound_op(_tokens), do: nil

  defp compound_name(:union_all), do: "UNION ALL"
  defp compound_name(:union), do: "UNION"
  defp compound_name(:intersect), do: "INTERSECT"
  defp compound_name(:except), do: "EXCEPT"

  defp select_core([{:keyword, :values, _} | rest]) do
    {rows, rest} = value_rows(rest, [])
    {%Values{rows: rows}, rest}
  end

  defp select_core([{:keyword, :select, _} | rest]) do
    {distinct, rest} =
      case rest do
        [{:keyword, :distinct, _} | rest] -> {true, rest}
        [{:keyword, :all, _} | rest] -> {false, rest}
        rest -> {false, rest}
      end

    {columns, rest} = result_columns(rest, [])

    {from, rest} =
      case rest do
        [{:keyword, :from, _} | rest] -> from_clause(rest)
        rest -> {nil, rest}
      end

    {where, rest} = optional_where(rest)

    {group_by, rest} =
      case rest do
        [{:keyword, :group, line} | rest] ->
          rest = expect_keyword(rest, :by, line)
          expression_list(rest, [])

        rest ->
          {[], rest}
      end

    {having, rest} =
      case rest do
        [{:keyword, :having, _} | rest] -> expression(rest)
        rest -> {nil, rest}
      end

    {windows, rest} = optional_window_clause(rest)

    select = %Select{
      columns: columns,
      from: from,
      where: where,
      group_by: group_by,
      having: having,
      windows: windows,
      distinct: distinct
    }

    {select, rest}
  end

  defp optional_window_clause([{:keyword, :window, _} | rest]), do: window_defs(rest, %{})
  defp optional_window_clause(tokens), do: {%{}, tokens}

  defp window_defs(tokens, acc) do
    {name, rest} = identifier(tokens)
    rest = expect_keyword(rest, :as, line_of(rest))
    rest = expect(rest, :lparen)
    {spec, rest} = window_spec(rest)
    acc = Map.put(acc, String.downcase(name), spec)

    case rest do
      [{:comma, _, _} | rest] -> window_defs(rest, acc)
      rest -> {acc, rest}
    end
  end

  defp result_columns(tokens, acc) do
    {column, rest} =
      case tokens do
        [{:star, _, _} | rest] ->
          {:star, rest}

        [{:id, table, _}, {:dot, _, _}, {:star, _, _} | rest] ->
          {{:qualified_star, table}, rest}

        tokens ->
          {expr, rest} = expression(tokens)

          case rest do
            [{:keyword, :as, _}, {:string, alias_name, _} | rest] ->
              {{expr, alias_name}, rest}

            [{:keyword, :as, _} | rest] ->
              {alias_name, rest} = identifier(rest)
              {{expr, alias_name}, rest}

            [{:id, alias_name, _} | rest] ->
              {{expr, alias_name}, rest}

            rest ->
              {{expr, nil}, rest}
          end
      end

    case rest do
      [{:comma, _, _} | rest] -> result_columns(rest, [column | acc])
      rest -> {Enum.reverse([column | acc]), rest}
    end
  end

  defp order_terms(tokens, acc) do
    {expr, rest} = expression(tokens)

    {direction, rest} =
      case rest do
        [{:keyword, :asc, _} | rest] -> {:asc, rest}
        [{:keyword, :desc, _} | rest] -> {:desc, rest}
        rest -> {:asc, rest}
      end

    case rest do
      [{:comma, _, _} | rest] -> order_terms(rest, [{expr, direction} | acc])
      rest -> {Enum.reverse([{expr, direction} | acc]), rest}
    end
  end

  # -- FROM clause -------------------------------------------------------------
  #
  # from_clause := source (join source [ON expr | USING (cols)])*
  # A comma is a plain join; SQLite even allows ON/USING after it (legacy).

  @join_keywords [:join, :natural, :left, :right, :full, :outer, :inner, :cross]

  defguardp is_join(source) when elem(source, 0) == :join

  defp from_clause(tokens) do
    {source, rest} = single_source(tokens)
    join_chain(source, rest)
  end

  defp join_chain(left, tokens) do
    case tokens do
      [{:comma, _, _} | rest] ->
        type = %{natural: false, left: false, right: false}
        {right, rest} = single_source(rest)
        {constraint, rest} = join_constraint(rest, type)
        join_chain({:join, type, left, right, constraint}, rest)

      [{:keyword, kw, _} | _] when kw in @join_keywords ->
        {type, rest} = join_type(tokens, [])
        {right, rest} = single_source(rest)
        {constraint, rest} = join_constraint(rest, type)
        join_chain({:join, type, left, right, constraint}, rest)

      # A USING/ON with no preceding JOIN gets SQLite's dedicated message; one
      # following an already-constrained join is a plain syntax error.
      [{:keyword, :using, line} | _] when not is_join(left) ->
        fail("a JOIN clause is required before USING", line)

      [{:keyword, :on, line} | _] when not is_join(left) ->
        fail("a JOIN clause is required before ON", line)

      tokens ->
        {left, tokens}
    end
  end

  defp join_type([{:keyword, :join, line} | rest], words),
    do: {validate_join_type(Enum.reverse(words), line), rest}

  defp join_type([{:keyword, kw, _} | rest], words) when kw in @join_keywords,
    do: join_type(rest, [kw | words])

  defp join_type([{_, value, line} | _], _words),
    do: fail(~s(near "#{token_text(value)}": syntax error), line)

  defp join_type([], _words), do: fail("unexpected end of input in JOIN", nil)

  # Mirrors sqlite3JoinType (select.c): each keyword contributes flags and
  # contradictory combinations are rejected with the original message.
  defp validate_join_type(words, line) do
    flags = Enum.flat_map(words, &join_word_flags/1)
    natural = :natural in flags
    outer = :outer in flags
    sided = :left in flags or :right in flags

    cond do
      (:inner in flags and outer) or (outer and not sided) ->
        unknown = words |> Enum.map_join(" ", &(&1 |> Atom.to_string() |> String.upcase()))
        fail("unknown join type: #{unknown}", line)

      true ->
        %{natural: natural, left: :left in flags, right: :right in flags}
    end
  end

  defp join_word_flags(:natural), do: [:natural]
  defp join_word_flags(:left), do: [:left, :outer]
  defp join_word_flags(:right), do: [:right, :outer]
  defp join_word_flags(:full), do: [:left, :right, :outer]
  defp join_word_flags(:outer), do: [:outer]
  defp join_word_flags(:inner), do: [:inner]
  defp join_word_flags(:cross), do: [:inner]

  defp join_constraint(tokens, type) do
    case tokens do
      [{:keyword, :on, line} | rest] ->
        if type.natural, do: fail("a NATURAL join may not have an ON or USING clause", line)
        {expr, rest} = expression(rest)
        {{:on, expr}, rest}

      [{:keyword, :using, line} | rest] ->
        if type.natural, do: fail("a NATURAL join may not have an ON or USING clause", line)
        rest = expect(rest, :lparen)
        {names, rest} = identifier_list(rest, [])
        {{:using, names}, rest}

      tokens ->
        {nil, tokens}
    end
  end

  defp single_source([{:lparen, _, _} | [{:keyword, kw, _} | _] = rest])
       when kw in [:select, :values, :with] do
    {query, rest} = statement(rest)
    rest = expect(rest, :rparen)
    {alias_name, rest} = optional_alias(rest)
    {{:subquery, query, alias_name}, rest}
  end

  defp single_source([{:lparen, _, _} | rest]) do
    {source, rest} = from_clause(rest)
    rest = expect(rest, :rparen)
    {alias_name, rest} = optional_alias(rest)
    {{:grouped, source, alias_name}, rest}
  end

  # A table-valued function: `json_each(expr [, path])` and friends.
  defp single_source([{:id, name, _}, {:lparen, _, _} | rest]) do
    {args, rest} =
      case rest do
        [{:rparen, _, _} | rest] ->
          {[], rest}

        rest ->
          {args, rest} = expression_list(rest, [])
          {args, expect(rest, :rparen)}
      end

    {alias_name, rest} = optional_alias(rest)
    {{:table_function, String.downcase(name), args, alias_name}, rest}
  end

  defp single_source(tokens) do
    {schema, name, rest} = qualified_schema_identifier(tokens)
    {alias_name, rest} = optional_alias(rest)
    table_name = if(schema, do: {:schema, schema, name}, else: name)
    {{:table, table_name, alias_name}, rest}
  end

  defp optional_alias([{:keyword, :as, _}, {:id, name, _} | rest]), do: {name, rest}
  defp optional_alias([{:keyword, :as, _}, {:string, name, _} | rest]), do: {name, rest}

  defp optional_alias([{:keyword, :as, _}, {:keyword, name, _} | rest]),
    do: {Atom.to_string(name), rest}

  defp optional_alias([{:keyword, :as, _}, {_, value, line} | _]),
    do: fail(~s(near "#{token_text(value)}": syntax error), line)

  defp optional_alias([{:id, name, _} | rest]), do: {name, rest}
  defp optional_alias(tokens), do: {nil, tokens}

  # -- CREATE / DROP dispatch ---------------------------------------------------

  defp create_stmt([{:keyword, :create, line} | rest] = tokens) do
    case rest do
      [{:keyword, :unique, _} | _] ->
        create_index(tokens)

      [{:keyword, :index, _} | _] ->
        create_index(tokens)

      [{:keyword, :view, _} | rest] ->
        create_view_body(rest, line)

      [{:id, word, _} | rest2] ->
        case String.downcase(word) do
          "trigger" ->
            create_trigger_body(rest2, line)

          temp when temp in ["temp", "temporary"] ->
            case rest2 do
              [{:keyword, :table, _} | rest3] ->
                create_table_body(rest3, line, "temp")

              [{:keyword, :view, _} | rest3] ->
                create_view_body(rest3, line, "temp")

              [{:id, w, _} | rest3] ->
                if String.downcase(w) == "trigger" do
                  create_trigger_body(rest3, line, true)
                else
                  fail("expected TABLE, VIEW, or TRIGGER after CREATE TEMP", line)
                end

              _ ->
                fail("expected TABLE, VIEW, or TRIGGER after CREATE TEMP", line)
            end

          _ ->
            rest = expect_keyword(rest, :table, line)
            create_table_body(rest, line)
        end

      rest ->
        rest = expect_keyword(rest, :table, line)
        create_table_body(rest, line)
    end
  end

  defp drop_stmt([{:keyword, :drop, line} | rest] = tokens) do
    case rest do
      [{:keyword, :index, _} | _] ->
        drop_index(tokens)

      [{:keyword, :view, _} | rest] ->
        drop_view_body(rest)

      [{:id, word, _} | rest2] ->
        if String.downcase(word) == "trigger" do
          drop_trigger_body(rest2)
        else
          rest = expect_keyword(rest, :table, line)
          drop_table_body(rest)
        end

      rest ->
        rest = expect_keyword(rest, :table, line)
        drop_table_body(rest)
    end
  end

  # -- CREATE TRIGGER / DROP TRIGGER -----------------------------------------------

  defp create_trigger_body(tokens, line, temp? \\ false) do
    {if_not_exists, rest} =
      case tokens do
        [{:keyword, :if, _}, {:keyword, :not, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)

    cond do
      temp? and schema != nil ->
        fail("temporary trigger may not have qualified name", line)

      true ->
        :ok
    end

    {timing, rest} =
      case rest do
        [{:id, word, _} | rest2] ->
          case String.downcase(word) do
            "before" ->
              {:before, rest2}

            "after" ->
              {:after, rest2}

            "instead" ->
              case rest2 do
                [{:id, of, _} | rest3] ->
                  if String.downcase(of) == "of" do
                    {:instead_of, rest3}
                  else
                    fail("expected OF after INSTEAD", line)
                  end

                _ ->
                  fail("expected OF after INSTEAD", line)
              end

            _ ->
              {:before, rest}
          end

        rest ->
          {:before, rest}
      end

    {event, update_columns, rest} =
      case rest do
        [{:keyword, :delete, _} | rest] ->
          {:delete, nil, rest}

        [{:keyword, :insert, _} | rest] ->
          {:insert, nil, rest}

        [{:keyword, :update, _}, {:id, of, _} | rest2] = tokens2 ->
          if String.downcase(of) == "of" do
            {names, rest3} = identifier_names(rest2, [])
            {:update, Enum.map(names, &String.downcase/1), rest3}
          else
            {:update, nil, tl(tokens2)}
          end

        [{:keyword, :update, _} | rest] ->
          {:update, nil, rest}

        _ ->
          fail("expected DELETE, INSERT, or UPDATE in CREATE TRIGGER", line)
      end

    rest = expect_keyword(rest, :on, line)
    {table_schema, table, rest} = qualified_schema_identifier(rest)

    rest = skip_for_each_row(rest)

    {when_expr, rest} =
      case rest do
        [{:keyword, :when, _} | rest] -> expression(rest)
        rest -> {nil, rest}
      end

    rest = expect_keyword(rest, :begin, line)
    {body, rest} = trigger_statements(rest, [])

    {%CreateTrigger{
       name: name,
       schema: schema || if(temp?, do: "temp", else: nil),
       if_not_exists: if_not_exists,
       timing: timing,
       event: event,
       update_columns: update_columns,
       table: table,
       table_schema: table_schema,
       when: when_expr,
       body: body
     }, rest}
  end

  defp qualified_schema_identifier(tokens) do
    {first, rest} = identifier(tokens)

    case rest do
      [{:dot, _, _} | rest] ->
        {name, rest} = identifier(rest)
        {String.downcase(first), name, rest}

      rest ->
        {nil, first, rest}
    end
  end

  # A bare comma-separated identifier list with no closing paren.
  defp identifier_names(tokens, acc) do
    case tokens do
      [{:id, name, _} | rest] ->
        identifier_names_rest(rest, [name | acc])

      [{:keyword, kw, _} | rest]
      when kw in [:add, :alter, :check, :column, :constraint, :rename] ->
        identifier_names_rest(rest, [Atom.to_string(kw) | acc])

      [{_, value, line} | _] ->
        fail(~s(near "#{token_text(value)}": syntax error), line)

      [] ->
        fail("unexpected end of input in identifier list", nil)
    end
  end

  defp identifier_names_rest(tokens, acc) do
    case tokens do
      [{:comma, _, _} | rest] -> identifier_names(rest, acc)
      rest -> {Enum.reverse(acc), rest}
    end
  end

  defp skip_for_each_row([for_token, each_token, row_token | rest] = tokens) do
    cond do
      token_word?(for_token, "for") and token_word?(each_token, "each") and
          token_word?(row_token, "row") ->
        rest

      token_word?(for_token, "for") and token_word?(each_token, "each") ->
        {_, value, line} = row_token
        fail(~s(near "#{token_text(value)}": syntax error), line)

      token_word?(for_token, "for") ->
        {_, value, line} = each_token
        fail(~s(near "#{token_text(value)}": syntax error), line)

      true ->
        tokens
    end
  end

  defp skip_for_each_row([for_token, each_token] = tokens) do
    cond do
      token_word?(for_token, "for") and token_word?(each_token, "each") ->
        fail(~s(near "BEGIN": syntax error), elem(each_token, 2))

      token_word?(for_token, "for") ->
        {_, value, line} = each_token
        fail(~s(near "#{token_text(value)}": syntax error), line)

      true ->
        tokens
    end
  end

  defp skip_for_each_row([for_token] = tokens) do
    if token_word?(for_token, "for") do
      fail(~s(near "END": syntax error), elem(for_token, 2))
    else
      tokens
    end
  end

  defp skip_for_each_row(tokens), do: tokens

  defp token_word?({:keyword, word, _line}, text), do: Atom.to_string(word) == text
  defp token_word?({:id, word, _line}, text), do: String.downcase(word) == text
  defp token_word?(_token, _text), do: false

  defp trigger_statements([{:keyword, :end, _} | rest], acc), do: {Enum.reverse(acc), rest}

  defp trigger_statements([], _acc),
    do: fail("unexpected end of input in trigger body", nil)

  defp trigger_statements(tokens, acc) do
    {stmt, rest} = statement(tokens)
    rest = expect(rest, :semicolon)
    trigger_statements(rest, [stmt | acc])
  end

  defp drop_trigger_body(tokens) do
    {if_exists, rest} =
      case tokens do
        [{:keyword, :if, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)

    {{:drop_trigger, schema, name, if_exists}, rest}
  end

  # -- CREATE TABLE / CREATE VIEW ------------------------------------------------

  defp create_table_body(rest, line, temp_schema \\ nil) do
    {if_not_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :not, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    schema = create_temp_schema!(schema, temp_schema, line)

    case rest do
      [{:keyword, :as, _} | rest] ->
        {query, rest} = query_statement(rest)

        {%CreateTable{
           name: name,
           schema: schema,
           if_not_exists: if_not_exists,
           query: query
         }, rest}

      rest ->
        rest = expect(rest, :lparen)
        {columns, table_constraints, rest} = column_defs(rest, [])
        {without_rowid, strict, rest} = table_options(rest, false, false)

        {%CreateTable{
           name: name,
           schema: schema,
           columns: columns,
           if_not_exists: if_not_exists,
           constraints: table_constraints,
           without_rowid: without_rowid,
           strict: strict
         }, rest}
    end
  end

  defp query_statement([{:keyword, :with, _} | _] = tokens), do: with_statement(tokens)

  defp query_statement([{:keyword, keyword, _} | _] = tokens)
       when keyword in [:select, :values],
       do: select(tokens)

  defp query_statement([{_, value, line} | _]),
    do: fail("expected SELECT, VALUES, or WITH after AS, got #{inspect(value)}", line)

  defp query_statement([]), do: fail("expected SELECT, VALUES, or WITH after AS", nil)

  defp table_options(
         [{:keyword, :without, _line}, {:keyword, :rowid, _} | rest],
         _without,
         strict
       ) do
    rest = skip_option_comma(rest)
    table_options(rest, true, strict)
  end

  defp table_options([{:keyword, :without, _line}, {:id, rowid, _} | rest], _without, strict)
       when rowid in ["ROWID", "rowid"] do
    rest = skip_option_comma(rest)
    table_options(rest, true, strict)
  end

  defp table_options([{:keyword, :strict, _} | rest], without, _strict) do
    rest = skip_option_comma(rest)
    table_options(rest, without, true)
  end

  defp table_options(tokens, without, strict), do: {without, strict, tokens}

  defp skip_option_comma([{:comma, _, _} | rest]), do: rest
  defp skip_option_comma(tokens), do: tokens

  defp create_view_body(rest, line, temp_schema \\ nil) do
    {if_not_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :not, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    schema = create_temp_schema!(schema, temp_schema, line)

    {columns, rest} =
      case rest do
        [{:lparen, _, _} | rest] ->
          {names, rest} = identifier_list(rest, [])
          {names, rest}

        rest ->
          {nil, rest}
      end

    rest = expect_keyword(rest, :as, line_of(rest))

    {query, rest} =
      case rest do
        [{:keyword, :with, _} | _] -> with_statement(rest)
        rest -> select(rest)
      end

    {%CreateView{
       name: name,
       schema: schema,
       columns: columns,
       query: query,
       if_not_exists: if_not_exists
     }, rest}
  end

  defp create_temp_schema!(schema, nil, _line), do: schema
  defp create_temp_schema!(schema, "temp", _line) when schema in [nil, "temp"], do: "temp"

  defp create_temp_schema!(_schema, "temp", line),
    do: fail("temporary table name must be unqualified", line)

  defp column_defs(tokens, col_acc, constraint_acc \\ []) do
    # Check if this entry is a table-level constraint
    case table_constraint_start?(tokens) do
      true ->
        {constraint, rest} = table_constraint(tokens)

        case rest do
          [{:comma, _, _} | rest] ->
            column_defs(rest, col_acc, [constraint | constraint_acc])

          [{:rparen, _, _} | rest] ->
            {Enum.reverse(col_acc), Enum.reverse([constraint | constraint_acc]), rest}

          [{_, value, line} | _] ->
            fail("expected ',' or ')' after constraint, got #{inspect(value)}", line)

          [] ->
            fail("unexpected end of input in column list", nil)
        end

      false ->
        {name, rest} = identifier(tokens)
        {declared_type, rest} = type_name(rest, [])

        {column, rest} =
          column_constraints(%ColumnDef{name: name} |> put_type(declared_type), rest)

        case rest do
          [{:comma, _, _} | rest] ->
            column_defs(rest, [column | col_acc], constraint_acc)

          [{:rparen, _, _} | rest] ->
            {Enum.reverse([column | col_acc]), Enum.reverse(constraint_acc), rest}

          [{_, value, line} | _] ->
            fail("expected ',' or ')' in column list, got #{inspect(value)}", line)

          [] ->
            fail("unexpected end of input in column list", nil)
        end
    end
  end

  # Returns true if the token stream starts a table-level constraint
  # (i.e. CONSTRAINT, PRIMARY KEY, UNIQUE, CHECK, FOREIGN KEY at the table level)
  defp table_constraint_start?([{:keyword, :constraint, _} | _]), do: true
  defp table_constraint_start?([{:keyword, :primary, _} | _]), do: true
  defp table_constraint_start?([{:keyword, :unique, _} | _]), do: true
  defp table_constraint_start?([{:keyword, :check, _} | _]), do: true
  defp table_constraint_start?([{:keyword, :foreign, _} | _]), do: true
  defp table_constraint_start?(_), do: false

  # Parses a single table-level constraint
  defp table_constraint(tokens) do
    # Optional CONSTRAINT name prefix
    {cname, tokens} =
      case tokens do
        [{:keyword, :constraint, _} | rest] ->
          {name, rest} = identifier(rest)
          {name, rest}

        tokens ->
          {nil, tokens}
      end

    case tokens do
      [{:keyword, :primary, line} | rest] ->
        rest = expect_keyword(rest, :key, line)
        rest = expect(rest, :lparen)
        {col_names, rest} = identifier_list(rest, [])
        col_keys = Enum.map(col_names, &String.downcase/1)
        rest = skip_trailing_constraint_name(rest)
        {{:primary_key, cname, col_keys}, rest}

      [{:keyword, :unique, _} | rest] ->
        rest = expect(rest, :lparen)
        {col_names, rest} = identifier_list(rest, [])
        col_keys = Enum.map(col_names, &String.downcase/1)
        rest = skip_trailing_constraint_name(rest)
        {{:unique, cname, col_keys}, rest}

      [{:keyword, :check, _} | rest] ->
        rest = expect(rest, :lparen)
        {expr, rest} = expression(rest)
        rest = expect(rest, :rparen)
        rest = skip_trailing_constraint_name(rest)
        {{:check, cname, expr}, rest}

      [{:keyword, :foreign, line} | rest] ->
        rest = expect_keyword(rest, :key, line)
        rest = expect(rest, :lparen)
        {col_names, rest} = identifier_list(rest, [])
        rest = expect_keyword(rest, :references, line)
        {{ref_table, ref_columns, actions}, rest} = references_clause(rest)
        col_keys = Enum.map(col_names, &String.downcase/1)
        ref_keys = Enum.map(ref_columns, &String.downcase/1)
        rest = skip_trailing_constraint_name(rest)
        {{:foreign_key, cname, col_keys, ref_table, ref_keys, actions}, rest}

      [{_, value, line} | _] ->
        fail(
          "expected PRIMARY KEY, UNIQUE, CHECK, or FOREIGN KEY in table constraint, got #{inspect(value)}",
          line
        )

      [] ->
        fail("unexpected end of input in table constraint", nil)
    end
  end

  # SQLite's undocumented behavior: a CONSTRAINT name clause may follow a
  # constraint definition. It is silently ignored.
  defp skip_trailing_constraint_name([{:keyword, :constraint, _} | rest]) do
    {_name, rest} = identifier(rest)
    skip_trailing_constraint_name(rest)
  end

  defp skip_trailing_constraint_name(tokens), do: tokens

  # A type name is any run of identifiers/type keywords, e.g. `VARCHAR (255)`
  # or `UNSIGNED BIG INT`. SQLite accepts nearly anything here and derives an
  # affinity from the text.
  defp type_name([{:id, word, _} | rest], acc), do: type_name(rest, [word | acc])

  # Extend keyword list for type names: many keywords (except constraint starters)
  # can appear in type declarations like "UNSIGNED BIG INT".
  @type_keywords [:integer, :text, :real, :blob, :null]
  defp type_name([{:keyword, kw, _} | rest], acc) when kw in @type_keywords,
    do: type_name(rest, [Atom.to_string(kw) | acc])

  defp type_name([{:lparen, _, _} | rest] = tokens, acc) do
    case acc do
      # `(255)` size suffix only valid after a type word
      [] ->
        {nil, tokens}

      [prev | tail] ->
        # Preserve the size suffix on the type, e.g. `VARCHAR(5)`/`DECIMAL(10,2)`,
        # exactly as SQLite stores it in PRAGMA table_info.
        {suffix, rest} = type_size_suffix(rest, ["("])
        type_name(rest, [prev <> suffix | tail])
    end
  end

  defp type_name(tokens, []), do: {nil, tokens}
  defp type_name(tokens, acc), do: {acc |> Enum.reverse() |> Enum.join(" "), tokens}

  defp type_size_suffix([{:rparen, _, _} | rest], acc),
    do: {acc |> Enum.reverse([")"]) |> Enum.join(), rest}

  defp type_size_suffix([{:int, n, _} | rest], acc),
    do: type_size_suffix(rest, [Integer.to_string(n) | acc])

  defp type_size_suffix([{:comma, _, _} | rest], acc), do: type_size_suffix(rest, ["," | acc])
  defp type_size_suffix([{:plus, _, _} | rest], acc), do: type_size_suffix(rest, ["+" | acc])
  defp type_size_suffix([{:minus, _, _} | rest], acc), do: type_size_suffix(rest, ["-" | acc])
  defp type_size_suffix([_other | rest], acc), do: type_size_suffix(rest, acc)
  defp type_size_suffix([], _acc), do: fail("unterminated type declaration", nil)

  defp skip_until_rparen([{:rparen, _, _} | rest]), do: rest
  defp skip_until_rparen([_ | rest]), do: skip_until_rparen(rest)
  defp skip_until_rparen([]), do: fail("unterminated type declaration", nil)

  defp put_type(column, nil), do: %{column | declared_type: nil, affinity: :blob}

  defp put_type(column, declared_type),
    do: %{column | declared_type: declared_type, affinity: affinity(declared_type)}

  # SQLite's affinity rules, in rule order (https://sqlite.org/datatype3.html §3.1)
  defp affinity(declared_type) do
    upper = String.upcase(declared_type)

    cond do
      String.contains?(upper, "INT") -> :integer
      String.contains?(upper, ["CHAR", "CLOB", "TEXT"]) -> :text
      String.contains?(upper, "BLOB") -> :blob
      String.contains?(upper, ["REAL", "FLOA", "DOUB"]) -> :real
      true -> :numeric
    end
  end

  defp column_constraints(column, tokens) do
    case tokens do
      [{:keyword, :constraint, _} | rest] ->
        # CONSTRAINT name — parse the name, then the actual constraint
        {cname, rest} = identifier(rest)
        column_constraint_with_name(column, cname, rest)

      [{:keyword, :primary, line} | rest] ->
        rest = expect_keyword(rest, :key, line)
        column_constraints(%{column | primary_key: true}, rest)

      [{:keyword, :autoincrement, _} | rest] ->
        column_constraints(%{column | autoincrement: true}, rest)

      [{:keyword, :not, line} | rest] ->
        rest = expect_keyword(rest, :null, line)
        column_constraints(%{column | not_null: true}, rest)

      [{:keyword, :unique, _} | rest] ->
        column_constraints(%{column | unique: true}, rest)

      [{:keyword, :default, _} | rest] ->
        {expr, rest} = default_expression(rest)
        column_constraints(%{column | default: expr}, rest)

      [{:keyword, :collate, _} | rest] ->
        {name, rest} = identifier(rest)
        column_constraints(%{column | collate: name}, rest)

      [{:keyword, :references, _} | rest] ->
        {references, rest} = references_clause(rest)
        column_constraints(%{column | references: references}, rest)

      [{:keyword, :generated, _}, {:keyword, :always, _}, {:keyword, :as, _} | rest] ->
        {generated, rest} = generated_clause(rest)
        column_constraints(%{column | generated: generated}, rest)

      [{:keyword, :as, _} | rest] ->
        {generated, rest} = generated_clause(rest)
        column_constraints(%{column | generated: generated}, rest)

      [{:keyword, :check, _} | rest] ->
        rest = expect(rest, :lparen)
        {expr, rest} = expression(rest)
        rest = expect(rest, :rparen)
        column_constraints(%{column | check: expr}, rest)

      tokens ->
        {column, tokens}
    end
  end

  # After seeing CONSTRAINT name, parse the actual constraint keyword
  defp column_constraint_with_name(column, cname, tokens) do
    case tokens do
      [{:keyword, :check, _} | rest] ->
        rest = expect(rest, :lparen)
        {expr, rest} = expression(rest)
        rest = expect(rest, :rparen)
        column_constraints(%{column | check: expr, check_name: cname}, rest)

      [{:keyword, :primary, line} | rest] ->
        rest = expect_keyword(rest, :key, line)
        column_constraints(%{column | primary_key: true}, rest)

      [{:keyword, :autoincrement, _} | rest] ->
        column_constraints(%{column | autoincrement: true}, rest)

      [{:keyword, :not, line} | rest] ->
        rest = expect_keyword(rest, :null, line)
        column_constraints(%{column | not_null: true}, rest)

      [{:keyword, :unique, _} | rest] ->
        column_constraints(%{column | unique: true}, rest)

      [{:keyword, :default, _} | rest] ->
        {expr, rest} = default_expression(rest)
        column_constraints(%{column | default: expr}, rest)

      [{:keyword, :references, _} | rest] ->
        {references, rest} = references_clause(rest)
        column_constraints(%{column | references: references}, rest)

      [{:keyword, :generated, _}, {:keyword, :always, _}, {:keyword, :as, _} | rest] ->
        {generated, rest} = generated_clause(rest)
        column_constraints(%{column | generated: generated}, rest)

      [{:keyword, :as, _} | rest] ->
        {generated, rest} = generated_clause(rest)
        column_constraints(%{column | generated: generated}, rest)

      # CONSTRAINT name with no following keyword — trailing constraint name (SQLite quirk)
      tokens ->
        column_constraints(column, tokens)
    end
  end

  @fk_default_actions %{on_delete: :no_action, on_update: :no_action, deferred: false}

  defp references_clause(tokens) do
    {table, rest} = identifier(tokens)

    {columns, rest} =
      case rest do
        [{:lparen, _, _} | rest] -> identifier_list(rest, [])
        rest -> {[], rest}
      end

    {actions, rest} = fk_clause_tail(rest, @fk_default_actions)
    {{table, columns, actions}, rest}
  end

  # ON DELETE/UPDATE actions, MATCH (parsed and ignored, as in SQLite), and
  # the deferrable clause. Only DEFERRABLE INITIALLY DEFERRED makes the
  # constraint deferred; every other combination is immediate.
  defp fk_clause_tail(tokens, actions) do
    case tokens do
      [{:keyword, :on, line}, {:keyword, :delete, _} | rest] ->
        {action, rest} = fk_action(rest, line)
        fk_clause_tail(rest, %{actions | on_delete: action})

      [{:keyword, :on, line}, {:keyword, :update, _} | rest] ->
        {action, rest} = fk_action(rest, line)
        fk_clause_tail(rest, %{actions | on_update: action})

      [{:keyword, :match, _} | rest] ->
        {_name, rest} = identifier(rest)
        fk_clause_tail(rest, actions)

      [{:keyword, :not, _}, {:id, word, _} | rest] ->
        if String.downcase(word) == "deferrable" do
          fk_clause_tail(skip_initially_clause(rest), %{actions | deferred: false})
        else
          {actions, tokens}
        end

      [{:id, word, _} | rest] ->
        if String.downcase(word) == "deferrable" do
          case rest do
            [{:id, initially, _}, {:id, mode, _} | rest2] ->
              if String.downcase(initially) == "initially" and
                   String.downcase(mode) == "deferred" do
                fk_clause_tail(rest2, %{actions | deferred: true})
              else
                fk_clause_tail(skip_initially_clause(rest), %{actions | deferred: false})
              end

            rest ->
              fk_clause_tail(rest, %{actions | deferred: false})
          end
        else
          {actions, tokens}
        end

      tokens ->
        {actions, tokens}
    end
  end

  defp skip_initially_clause([{:id, initially, _}, {:id, _mode, _} | rest] = tokens) do
    if String.downcase(initially) == "initially", do: rest, else: tokens
  end

  defp skip_initially_clause(tokens), do: tokens

  defp fk_action(tokens, line) do
    case tokens do
      [{:keyword, :set, _}, {:keyword, :null, _} | rest] ->
        {:set_null, rest}

      [{:keyword, :set, _}, {:keyword, :default, _} | rest] ->
        {:set_default, rest}

      [{:id, word, _} | rest] ->
        case String.downcase(word) do
          "cascade" ->
            {:cascade, rest}

          "restrict" ->
            {:restrict, rest}

          "no" ->
            case rest do
              [{:id, action, _} | rest2] ->
                if String.downcase(action) == "action" do
                  {:no_action, rest2}
                else
                  fail("unknown foreign key action: NO #{action}", line)
                end

              _ ->
                fail("unknown foreign key action: NO", line)
            end

          other ->
            fail("unknown foreign key action: #{other}", line)
        end

      _ ->
        fail("expected a foreign key action after ON DELETE/UPDATE", line)
    end
  end

  defp generated_clause(tokens) do
    rest = expect(tokens, :lparen)
    {expr, rest} = expression(rest)
    rest = expect(rest, :rparen)

    case rest do
      [{:keyword, :stored, _} | rest] -> {{:stored, expr}, rest}
      [{:keyword, :virtual, _} | rest] -> {{:virtual, expr}, rest}
      rest -> {{:virtual, expr}, rest}
    end
  end

  # -- INSERT -------------------------------------------------------------------

  @conflict_actions [:replace, :ignore, :abort, :fail, :rollback]

  defp insert([{:keyword, :replace, line} | rest]) do
    rest = expect_keyword(rest, :into, line)
    insert_body(rest, :replace)
  end

  defp insert([{:keyword, :insert, line} | rest]) do
    {or_conflict, rest} = or_conflict(rest)
    rest = expect_keyword(rest, :into, line)
    insert_body(rest, or_conflict)
  end

  defp or_conflict([{:keyword, :or, _}, {:keyword, action, _} | rest])
       when action in @conflict_actions,
       do: {action, rest}

  defp or_conflict(tokens), do: {nil, tokens}

  defp insert_body(tokens, or_conflict) do
    {schema, table, target_qualified, rest} = dml_target(tokens)

    {columns, rest} =
      case rest do
        [{:lparen, _, _} | rest] ->
          {names, rest} = identifier_list(rest, [])
          {names, rest}

        rest ->
          {nil, rest}
      end

    {source, rest} =
      case rest do
        [{:keyword, :default, dline} | rest] ->
          {:default_values, expect_keyword(rest, :values, dline)}

        [{:keyword, :values, _} | rest] ->
          {rows, rest} = value_rows(rest, [])
          {{:values, rows}, rest}

        [{:keyword, :select, _} | _] = rest ->
          {query, rest} = select(rest)
          {{:select, query}, rest}

        [{_, value, line} | _] ->
          fail(~s(near "#{token_text(value)}": syntax error), line)

        [] ->
          fail("unexpected end of input in INSERT", nil)
      end

    {upsert, rest} = optional_upserts(rest, [])
    {returning, rest} = optional_returning(rest)

    {%Insert{
       table: table,
       schema: schema,
       target_qualified: target_qualified,
       columns: columns,
       source: source,
       or_conflict: or_conflict,
       upsert: upsert,
       returning: returning
     }, rest}
  end

  # ON CONFLICT clauses chain; a clause without a conflict target swallows
  # every conflict, so it may only appear last (a syntax error otherwise,
  # as in SQLite's grammar).
  defp optional_upserts(tokens, acc) do
    case optional_upsert(tokens) do
      {nil, rest} ->
        {Enum.reverse(acc), rest}

      {clause, rest} ->
        catch_all? = elem(clause, 1) == nil

        case rest do
          [{:keyword, :on, line}, {:keyword, :conflict, _} | _] when catch_all? ->
            fail(~s(near "ON": syntax error), line)

          rest ->
            optional_upserts(rest, [clause | acc])
        end
    end
  end

  defp optional_upsert([{:keyword, :on, line}, {:keyword, :conflict, _} | rest]) do
    # Conflict target: `(columns) [WHERE predicate]`. The predicate selects a
    # partial unique index, as in upsert-clause syntax.
    {target, rest} =
      case rest do
        [{:lparen, _, _} | rest] ->
          {names, rest} = identifier_list(rest, [])
          {target_where, rest} = optional_where(rest)
          {{Enum.map(names, &String.downcase/1), target_where}, rest}

        rest ->
          {nil, rest}
      end

    rest = expect_keyword(rest, :do, line)

    case rest do
      [{:keyword, :nothing, _} | rest] ->
        {{:nothing, target}, rest}

      [{:keyword, :update, _update_line}, {:keyword, :set, _} | rest] ->
        {assignments, rest} = assignments(rest, [])
        {where, rest} = optional_where(rest)
        {{:update, target, assignments, where}, rest}

      [{_, value, update_line} | _] ->
        fail(~s(near "#{token_text(value)}": syntax error), update_line)

      [] ->
        fail("unexpected end of input in UPSERT", line)
    end
  end

  defp optional_upsert(tokens), do: {nil, tokens}

  defp identifier_list(tokens, acc) do
    {name, rest} = identifier(tokens)

    case rest do
      [{:comma, _, _} | rest] -> identifier_list(rest, [name | acc])
      [{:rparen, _, _} | rest] -> {Enum.reverse([name | acc]), rest}
      [{_, value, line} | _] -> fail("expected ',' or ')', got #{inspect(value)}", line)
      [] -> fail("unexpected end of input in identifier list", nil)
    end
  end

  defp value_rows(tokens, acc) do
    rest = expect(tokens, :lparen)
    {exprs, rest} = expression_list(rest, [])
    rest = expect(rest, :rparen)

    case rest do
      [{:comma, _, _} | rest] -> value_rows(rest, [exprs | acc])
      rest -> {Enum.reverse([exprs | acc]), rest}
    end
  end

  defp expression_list(tokens, acc) do
    {expr, rest} = expression(tokens)

    case rest do
      [{:comma, _, _} | rest] -> expression_list(rest, [expr | acc])
      rest -> {Enum.reverse([expr | acc]), rest}
    end
  end

  # -- UPDATE / DELETE / DROP ------------------------------------------------------

  defp update([{:keyword, :update, line} | rest]) do
    {or_conflict, rest} = or_conflict(rest)
    {schema, table, target_qualified, rest} = dml_target(rest)
    {index_hint, rest} = optional_dml_index_hint(rest)
    rest = expect_keyword(rest, :set, line)
    {assignments, rest} = assignments(rest, [])
    {from, rest} = optional_update_from(rest)
    {where, rest} = optional_where(rest)
    {returning, rest} = optional_returning(rest)
    {order_by, limit, offset, rest} = optional_dml_order_limit(rest)

    {%Update{
       table: table,
       schema: schema,
       target_qualified: target_qualified,
       index_hint: index_hint,
       assignments: assignments,
       from: from,
       where: where,
       or_conflict: or_conflict,
       returning: returning,
       order_by: order_by,
       limit: limit,
       offset: offset
     }, rest}
  end

  defp optional_update_from([{:keyword, :from, _} | rest]), do: from_clause(rest)
  defp optional_update_from(tokens), do: {nil, tokens}

  defp assignments(tokens, acc) do
    {name, rest} = identifier(tokens)
    rest = expect(rest, :eq)
    {expr, rest} = expression(rest)

    case rest do
      [{:comma, _, _} | rest] -> assignments(rest, [{name, expr} | acc])
      rest -> {Enum.reverse([{name, expr} | acc]), rest}
    end
  end

  defp delete([{:keyword, :delete, line} | rest]) do
    rest = expect_keyword(rest, :from, line)
    {schema, table, target_qualified, rest} = dml_target(rest)
    {index_hint, rest} = optional_dml_index_hint(rest)
    {where, rest} = optional_where(rest)
    {returning, rest} = optional_returning(rest)
    {order_by, limit, offset, rest} = optional_dml_order_limit(rest)

    {%Delete{
       table: table,
       schema: schema,
       target_qualified: target_qualified,
       index_hint: index_hint,
       where: where,
       returning: returning,
       order_by: order_by,
       limit: limit,
       offset: offset
     }, rest}
  end

  defp dml_target(tokens) do
    {first, rest} = identifier(tokens)

    case rest do
      [{:dot, _, _} | rest] ->
        {table, rest} = identifier(rest)
        {first, table, true, rest}

      rest ->
        {nil, first, false, rest}
    end
  end

  defp optional_dml_index_hint([{:keyword, :not, not_line}, {:id, indexed, indexed_line} | rest]) do
    if String.downcase(indexed) == "indexed" do
      {:not_indexed, rest}
    else
      {nil, [{:keyword, :not, not_line}, {:id, indexed, indexed_line} | rest]}
    end
  end

  defp optional_dml_index_hint([{:id, indexed, line}, {:keyword, :by, _} | rest]) do
    if String.downcase(indexed) == "indexed" do
      {name, rest} = identifier(rest)
      {{:indexed_by, name}, rest}
    else
      fail(~s(near "#{token_text(indexed)}": syntax error), line)
    end
  end

  defp optional_dml_index_hint(tokens), do: {nil, tokens}

  defp optional_returning([{:keyword, :returning, _} | rest]), do: result_columns(rest, [])
  defp optional_returning(tokens), do: {[], tokens}

  defp optional_dml_order_limit(tokens) do
    {order_by, rest} =
      case tokens do
        [{:keyword, :order, _}, {:keyword, :by, _} | rest] -> order_terms(rest, [])
        rest -> {[], rest}
      end

    {limit, offset, rest} = optional_limit(rest)
    {order_by, limit, offset, rest}
  end

  defp optional_limit([{:keyword, :limit, _} | rest]) do
    {limit, rest} = expression(rest)

    case rest do
      [{:keyword, :offset, _} | rest] ->
        {offset, rest} = expression(rest)
        {limit, offset, rest}

      [{:comma, _, _} | rest] ->
        {real_limit, rest} = expression(rest)
        {real_limit, limit, rest}

      rest ->
        {limit, nil, rest}
    end
  end

  defp optional_limit(tokens), do: {nil, nil, tokens}

  defp drop_table_body(rest) do
    {if_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    {%DropTable{name: name, schema: schema, if_exists: if_exists}, rest}
  end

  # -- CREATE INDEX ------------------------------------------------------------
  #
  # CREATE [UNIQUE] INDEX [IF NOT EXISTS] name ON table(col [ASC|DESC], ...)
  # Expression indexes (any non-column expression) are not supported.

  defp create_index([{:keyword, :create, line} | rest]) do
    {unique, rest} =
      case rest do
        [{:keyword, :unique, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    rest = expect_keyword(rest, :index, line)

    {if_not_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :not, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    rest = expect_keyword(rest, :on, line)
    {table, rest} = identifier(rest)
    rest = expect(rest, :lparen)
    {columns, rest} = index_columns(rest, [])
    {where, rest} = optional_index_where(rest)

    {%CreateIndex{
       name: name,
       schema: schema,
       table: table,
       columns: columns,
       unique: unique,
       if_not_exists: if_not_exists,
       where: where
     }, rest}
  end

  defp optional_index_where([{:keyword, :where, _} | rest]), do: expression(rest)
  defp optional_index_where(tokens), do: {nil, tokens}

  # An indexed column is an arbitrary expression; a bare (possibly collated)
  # column reference stays a plain column member while preserving the
  # member-level collation for uniqueness checks.
  defp index_columns(tokens, acc) do
    {expr, rest} = expression(tokens)

    {direction, rest} =
      case rest do
        [{:keyword, :asc, _} | rest] -> {:asc, rest}
        [{:keyword, :desc, _} | rest] -> {:desc, rest}
        rest -> {:asc, rest}
      end

    column =
      case expr do
        {:column, nil, name} ->
          %{name: name, expr: nil, direction: direction, collate: nil}

        {:collate, {:column, nil, name}, collation} ->
          %{name: name, expr: nil, direction: direction, collate: collation}

        expr ->
          %{name: nil, expr: expr, direction: direction, collate: expression_collation(expr)}
      end

    case rest do
      [{:comma, _, _} | rest] ->
        index_columns(rest, [column | acc])

      [{:rparen, _, _} | rest] ->
        {Enum.reverse([column | acc]), rest}

      [{_, value, line} | _] ->
        fail("expected ',' or ')' in index columns, got #{inspect(value)}", line)

      [] ->
        fail("unexpected end of input in index columns", nil)
    end
  end

  defp expression_collation({:collate, _expr, collation}), do: collation
  defp expression_collation(_expr), do: nil

  # -- DROP INDEX --------------------------------------------------------------

  defp drop_index([{:keyword, :drop, line} | rest]) do
    rest = expect_keyword(rest, :index, line)

    {if_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    {%DropIndex{name: name, schema: schema, if_exists: if_exists}, rest}
  end

  # -- DROP VIEW ---------------------------------------------------------------

  defp drop_view_body(rest) do
    {if_exists, rest} =
      case rest do
        [{:keyword, :if, _}, {:keyword, :exists, _} | rest] -> {true, rest}
        rest -> {false, rest}
      end

    {schema, name, rest} = qualified_schema_identifier(rest)
    {%DropView{name: name, schema: schema, if_exists: if_exists}, rest}
  end

  # -- ALTER TABLE ----------------------------------------------------------------
  #
  # ALTER TABLE name RENAME TO new_name
  # ALTER TABLE name RENAME [COLUMN] col_name TO new_col_name
  # ALTER TABLE name ADD [COLUMN] column-def
  # ALTER TABLE name DROP [COLUMN] col_name

  defp alter_table([{:keyword, :alter, line} | rest]) do
    rest = expect_keyword(rest, :table, line)
    {schema, name, rest} = qualified_schema_identifier(rest)

    case rest do
      [{:keyword, :rename, _} | rest] ->
        alter_rename(schema, name, rest)

      [{:keyword, :add, _} | rest] ->
        # Skip optional COLUMN keyword
        rest =
          case rest do
            [{:keyword, :column, _} | rest] -> rest
            rest -> rest
          end

        {col_name, rest} = identifier(rest)
        {declared_type, rest} = type_name(rest, [])

        {column, rest} =
          column_constraints(%ColumnDef{name: col_name} |> put_type(declared_type), rest)

        {%AlterTable{name: name, schema: schema, op: {:add_column, column}}, rest}

      [{:keyword, :drop, _} | rest] ->
        # Skip optional COLUMN keyword
        rest =
          case rest do
            [{:keyword, :column, _} | rest] -> rest
            rest -> rest
          end

        {col_name, rest} = identifier(rest)
        {%AlterTable{name: name, schema: schema, op: {:drop_column, col_name}}, rest}

      [{_, value, line} | _] ->
        fail("expected RENAME, ADD, or DROP in ALTER TABLE, got #{inspect(value)}", line)

      [] ->
        fail("unexpected end of input in ALTER TABLE", nil)
    end
  end

  defp alter_rename(schema, table_name, tokens) do
    case tokens do
      [{:keyword, :to, _} | rest] ->
        {new_name, rest} = identifier(rest)
        {%AlterTable{name: table_name, schema: schema, op: {:rename_table, new_name}}, rest}

      [{:keyword, :column, _} | rest] ->
        {old_name, rest} = identifier(rest)
        rest = expect_keyword(rest, :to, nil)
        {new_name, rest} = identifier(rest)

        {%AlterTable{name: table_name, schema: schema, op: {:rename_column, old_name, new_name}},
         rest}

      tokens ->
        # RENAME col_name TO new_name (without COLUMN keyword)
        {old_name, rest} = identifier(tokens)
        rest = expect_keyword(rest, :to, nil)
        {new_name, rest} = identifier(rest)

        {%AlterTable{name: table_name, schema: schema, op: {:rename_column, old_name, new_name}},
         rest}
    end
  end

  defp optional_where([{:keyword, :where, _} | rest]), do: expression(rest)
  defp optional_where(tokens), do: {nil, tokens}

  # A DEFAULT value is a literal, a signed number, or a bare word.
  defp default_expression([{:minus, _, _} | rest]) do
    {expr, rest} = primary_expression(rest)
    {{:negate, expr}, rest}
  end

  defp default_expression([{:plus, _, _} | rest]), do: primary_expression(rest)
  defp default_expression(tokens), do: primary_expression(tokens)

  # -- expressions ---------------------------------------------------------------
  #
  # Precedence climbing, lowest first: OR < AND < NOT < comparison/IS/IN/LIKE/
  # BETWEEN < relational < additive < multiplicative < || < COLLATE < unary < primary.

  @doc false
  def expression(tokens), do: or_expr(tokens)

  defp or_expr(tokens) do
    {left, rest} = and_expr(tokens)
    or_expr_rest(left, rest)
  end

  defp or_expr_rest(left, [{:keyword, :or, _} | rest]) do
    {right, rest} = and_expr(rest)
    or_expr_rest({:binary, :or, left, right}, rest)
  end

  defp or_expr_rest(left, tokens), do: {left, tokens}

  defp and_expr(tokens) do
    {left, rest} = not_expr(tokens)
    and_expr_rest(left, rest)
  end

  defp and_expr_rest(left, [{:keyword, :and, _} | rest]) do
    {right, rest} = not_expr(rest)
    and_expr_rest({:binary, :and, left, right}, rest)
  end

  defp and_expr_rest(left, tokens), do: {left, tokens}

  defp not_expr([{:keyword, :not, _} | rest]) do
    {expr, rest} = not_expr(rest)
    {{:not, expr}, rest}
  end

  defp not_expr(tokens), do: comparison(tokens)

  defp comparison(tokens) do
    {left, rest} = relational(tokens)
    comparison_rest(left, rest)
  end

  defp comparison_rest(left, tokens) do
    case tokens do
      [{:eq, _, _} | rest] ->
        {right, rest} = relational(rest)
        comparison_rest({:binary, :eq, left, right}, rest)

      [{:ne, _, _} | rest] ->
        {right, rest} = relational(rest)
        comparison_rest({:binary, :ne, left, right}, rest)

      [
        {:keyword, :is, _},
        {:keyword, :not, _},
        {:keyword, :distinct, _},
        {:keyword, :from, _} | rest
      ] ->
        {right, rest} = relational(rest)
        comparison_rest({:is, left, right}, rest)

      [{:keyword, :is, _}, {:keyword, :distinct, _}, {:keyword, :from, _} | rest] ->
        {right, rest} = relational(rest)
        comparison_rest({:is_not, left, right}, rest)

      [{:keyword, :is, _}, {:keyword, :not, _} | rest] ->
        {right, rest} = relational(rest)
        comparison_rest({:is_not, left, right}, rest)

      [{:keyword, :is, _} | rest] ->
        {right, rest} = relational(rest)
        comparison_rest({:is, left, right}, rest)

      [{:keyword, :not, _}, {:keyword, :null, _} | rest] ->
        comparison_rest({:is_not, left, {:literal, nil}}, rest)

      [{:id, word, _} | rest] ->
        case String.downcase(word) do
          "notnull" -> comparison_rest({:is_not, left, {:literal, nil}}, rest)
          "isnull" -> comparison_rest({:is, left, {:literal, nil}}, rest)
          _other -> {left, tokens}
        end

      [{:keyword, :in, _} | rest] ->
        in_expr(left, false, rest)

      [{:keyword, :not, _}, {:keyword, :in, _} | rest] ->
        in_expr(left, true, rest)

      [{:keyword, :like, _} | rest] ->
        {pattern, rest} = relational(rest)
        {escape, rest} = optional_escape(rest)
        comparison_rest({:like, left, pattern, escape, false}, rest)

      [{:keyword, :not, _}, {:keyword, :like, _} | rest] ->
        {pattern, rest} = relational(rest)
        {escape, rest} = optional_escape(rest)
        comparison_rest({:like, left, pattern, escape, true}, rest)

      [{:keyword, :glob, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:glob, left, pattern, false}, rest)

      [{:keyword, :not, _}, {:keyword, :glob, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:glob, left, pattern, true}, rest)

      [{:keyword, :regexp, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:regexp, left, pattern, false}, rest)

      [{:keyword, :not, _}, {:keyword, :regexp, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:regexp, left, pattern, true}, rest)

      [{:keyword, :match, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:match, left, pattern, false}, rest)

      [{:keyword, :not, _}, {:keyword, :match, _} | rest] ->
        {pattern, rest} = relational(rest)
        comparison_rest({:match, left, pattern, true}, rest)

      [{:keyword, :between, _} | rest] ->
        between(left, false, rest)

      [{:keyword, :not, _}, {:keyword, :between, _} | rest] ->
        between(left, true, rest)

      tokens ->
        {left, tokens}
    end
  end

  defp optional_escape([{:keyword, :escape, _} | rest]), do: relational(rest)
  defp optional_escape(tokens), do: {nil, tokens}

  defp in_expr(left, negated, [{:lparen, _, _} | rest]) do
    case rest do
      [{:keyword, kw, _} | _] when kw in [:select, :values, :with] ->
        {query, rest} = statement(rest)
        comparison_rest({:in, left, {:select, query}, negated}, expect(rest, :rparen))

      [{:rparen, _, _} | rest] ->
        comparison_rest({:in, left, [], negated}, rest)

      rest ->
        {exprs, rest} = expression_list(rest, [])
        comparison_rest({:in, left, exprs, negated}, expect(rest, :rparen))
    end
  end

  defp in_expr(left, negated, tokens) do
    {name, rest} = identifier(tokens)
    query = %Select{columns: [:star], from: {:table, name, nil}}
    comparison_rest({:in, left, {:select, query}, negated}, rest)
  end

  defp between(left, negated, tokens) do
    {low, rest} = relational(tokens)

    case rest do
      [{:keyword, :and, _} | rest] ->
        {high, rest} = relational(rest)
        comparison_rest({:between, left, low, high, negated}, rest)

      [{_, value, line} | _] ->
        fail("expected AND in BETWEEN, got #{inspect(value)}", line)

      [] ->
        fail("unexpected end of input in BETWEEN", nil)
    end
  end

  defp relational(tokens) do
    {left, rest} = bitwise(tokens)
    relational_rest(left, rest)
  end

  defp relational_rest(left, [{op, _, _} | rest]) when op in [:lt, :le, :gt, :ge] do
    {right, rest} = bitwise(rest)
    relational_rest({:binary, op, left, right}, rest)
  end

  defp relational_rest(left, tokens), do: {left, tokens}

  # << >> & | share one precedence level in SQLite, between + - and < <=.
  defp bitwise(tokens) do
    {left, rest} = additive(tokens)
    bitwise_rest(left, rest)
  end

  defp bitwise_rest(left, [{op, _, _} | rest]) when op in [:shl, :shr, :bitand, :bitor] do
    {right, rest} = additive(rest)
    bitwise_rest({:binary, op, left, right}, rest)
  end

  defp bitwise_rest(left, tokens), do: {left, tokens}

  defp additive(tokens) do
    {left, rest} = multiplicative(tokens)
    additive_rest(left, rest)
  end

  defp additive_rest(left, [{:plus, _, _} | rest]) do
    {right, rest} = multiplicative(rest)
    additive_rest({:binary, :add, left, right}, rest)
  end

  defp additive_rest(left, [{:minus, _, _} | rest]) do
    {right, rest} = multiplicative(rest)
    additive_rest({:binary, :sub, left, right}, rest)
  end

  defp additive_rest(left, tokens), do: {left, tokens}

  defp multiplicative(tokens) do
    {left, rest} = concat_expr(tokens)
    multiplicative_rest(left, rest)
  end

  defp multiplicative_rest(left, [{op_token, _, _} | rest])
       when op_token in [:star, :slash, :percent] do
    op = %{star: :mul, slash: :div, percent: :mod}[op_token]
    {right, rest} = concat_expr(rest)
    multiplicative_rest({:binary, op, left, right}, rest)
  end

  defp multiplicative_rest(left, tokens), do: {left, tokens}

  defp concat_expr(tokens) do
    {left, rest} = collate_expr(tokens)
    concat_rest(left, rest)
  end

  defp concat_rest(left, [{:concat, _, _} | rest]) do
    {right, rest} = collate_expr(rest)
    concat_rest({:binary, :concat, left, right}, rest)
  end

  # The JSON extraction operators bind like `||`, left-associative.
  defp concat_rest(left, [{:arrow, _, _} | rest]) do
    {right, rest} = collate_expr(rest)
    concat_rest({:json_arrow, left, right}, rest)
  end

  defp concat_rest(left, [{:arrow_text, _, _} | rest]) do
    {right, rest} = collate_expr(rest)
    concat_rest({:json_arrow_text, left, right}, rest)
  end

  defp concat_rest(left, tokens), do: {left, tokens}

  defp collate_expr(tokens) do
    {expr, rest} = unary(tokens)
    collate_expr_rest(expr, rest)
  end

  defp collate_expr_rest(expr, [{:keyword, :collate, _} | rest]) do
    {name, rest} = identifier(rest)
    collate_expr_rest({:collate, expr, name}, rest)
  end

  defp collate_expr_rest(expr, rest), do: {expr, rest}

  # A minus sign directly on the out-of-range constant 9223372036854775808
  # yields the smallest 64-bit integer, as in SQLite's grammar; the bare
  # constant alone becomes a REAL (see primary_expression).
  defp unary([{:minus, _, _}, {:int, 9_223_372_036_854_775_808, _} | rest]) do
    {{:literal, -9_223_372_036_854_775_808}, rest}
  end

  defp unary([{:minus, _, _} | rest]) do
    {expr, rest} = unary(rest)
    {{:negate, expr}, rest}
  end

  defp unary([{:tilde, _, _} | rest]) do
    {expr, rest} = unary(rest)
    {{:bitnot, expr}, rest}
  end

  defp unary([{:plus, _, _} | rest]), do: unary(rest)
  defp unary(tokens), do: primary_expression(rest_or_fail(tokens))

  defp rest_or_fail([]), do: fail("unexpected end of input in expression", nil)
  defp rest_or_fail(tokens), do: tokens

  # Bind parameter forms: `?` (anonymous), `?NNN` (numbered, 1..32766, as in
  # SQLITE_MAX_VARIABLE_NUMBER), `:name` / `@name` / `$name` (named; the
  # sigil is part of the name, as in sqlite3_bind_parameter_name). Indexes
  # are assigned by the tokenizer in source order.
  defp validate_param!("?" <> digits, line) when digits != "" do
    n = String.to_integer(digits)

    if n < 1 or n > 32_766 do
      fail("variable number must be between ?1 and ?32766", line)
    end

    :ok
  end

  defp validate_param!(_raw, _line), do: :ok

  # Integer constants beyond the 64-bit range become REAL, as in SQLite.
  defp int_literal(n) when n > 9_223_372_036_854_775_807, do: n * 1.0
  defp int_literal(n), do: n

  defp primary_expression(tokens) do
    case tokens do
      [{:int, n, _} | rest] ->
        {{:literal, int_literal(n)}, rest}

      [{:float, f, _} | rest] ->
        {{:literal, f}, rest}

      [{:string, s, _} | rest] ->
        {{:literal, s}, rest}

      [{:blob, b, _} | rest] ->
        {{:literal, {:blob, b}}, rest}

      [{:placeholder, {index, raw}, line} | rest] ->
        validate_param!(raw, line)
        {{:param, index, raw}, rest}

      [{:keyword, :null, _} | rest] ->
        {{:literal, nil}, rest}

      [{:keyword, true, _} | rest] ->
        {{:literal, 1}, rest}

      [{:keyword, false, _} | rest] ->
        {{:literal, 0}, rest}

      [{:keyword, :case, _} | rest] ->
        case_expr(rest)

      [{:keyword, :cast, line}, {:lparen, _, _} | rest] ->
        {expr, rest} = expression(rest)
        rest = expect_keyword(rest, :as, line)
        {type, rest} = type_name(rest, [])
        if type == nil, do: fail("expected a type name in CAST", line_of(rest))
        {{:cast, expr, affinity(type)}, expect(rest, :rparen)}

      [{:keyword, :exists, _}, {:lparen, _, _} | [{:keyword, kw, _} | _] = rest]
      when kw in [:select, :values, :with] ->
        {query, rest} = statement(rest)
        {{:exists, query}, expect(rest, :rparen)}

      [{:id, name, line}, {:lparen, _, _} | rest] ->
        if String.downcase(name) == "raise" do
          raise_expression(rest, line)
        else
          function_call(name, rest)
        end

      # Some keywords double as function names.
      [{:keyword, kw, _}, {:lparen, _, _} | rest]
      when kw in [:like, :glob, :regexp, :match, :replace] ->
        function_call(Atom.to_string(kw), rest)

      [{:keyword, table, _}, {:dot, _, _}, {:id, column, _} | rest] ->
        {{:column, Atom.to_string(table), column}, rest}

      # KEY is one of SQLite's non-reserved keywords: usable as a bare
      # column reference (json_each exposes a `key` column).
      [{:keyword, :key, _} | rest] ->
        {{:column, nil, "key"}, rest}

      [{:id, table, _}, {:dot, _, _}, {:keyword, :key, _} | rest] ->
        {{:column, table, "key"}, rest}

      [{:id, table, _}, {:dot, _, _}, {:id, column, _} | rest] ->
        {{:column, table, column}, rest}

      [{:id, name, _} | rest] ->
        {{:column, nil, name}, rest}

      [{:lparen, _, _} | [{:keyword, kw, _} | _] = rest] when kw in [:select, :values, :with] ->
        {query, rest} = statement(rest)
        {{:subquery, query}, expect(rest, :rparen)}

      [{:lparen, _, _} | rest] ->
        {expr, rest} = expression(rest)
        {expr, expect(rest, :rparen)}

      [{_, value, line} | _] ->
        fail(~s(near "#{token_text(value)}": syntax error), line)

      [] ->
        fail("unexpected end of input in expression", nil)
    end
  end

  # RAISE(IGNORE) or RAISE(ABORT|ROLLBACK|FAIL, message) — only meaningful
  # inside trigger programs.
  defp raise_expression(tokens, line) do
    case tokens do
      [{:keyword, :ignore, _}, {:rparen, _, _} | rest] ->
        {{:raise, :ignore}, rest}

      [{:keyword, kw, _}, {:comma, _, _} | rest] when kw in [:abort, :rollback, :fail] ->
        {message, rest} = expression(rest)
        {{:raise, kw, message}, expect(rest, :rparen)}

      _ ->
        fail("expected IGNORE, ABORT, ROLLBACK, or FAIL in RAISE()", line)
    end
  end

  defp function_call(name, tokens) do
    case tokens do
      [{:rparen, _, _} | rest] ->
        function_or_window(name, [], rest)

      [{:star, _, _}, {:rparen, _, _} | rest] ->
        function_or_window(name, :star, rest)

      [{:keyword, :distinct, _} | rest] ->
        {args, rest} = expression_list(rest, [])
        rest = expect(rest, :rparen)
        function_or_window(name, {:distinct, args}, rest)

      # `ALL` is the default aggregate quantifier (the opposite of DISTINCT), so
      # `AVG(ALL x)` is equivalent to `AVG(x)`. Accept and discard it.
      [{:keyword, :all, _} | rest] ->
        {args, rest} = expression_list(rest, [])
        rest = expect(rest, :rparen)
        function_or_window(name, args, rest)

      tokens ->
        {args, rest} = expression_list(tokens, [])
        rest = expect(rest, :rparen)
        function_or_window(name, args, rest)
    end
  end

  defp function_or_window(name, args, tokens) do
    {filter, rest} = optional_filter(tokens)
    function_or_window(name, args, filter, rest)
  end

  defp function_or_window(name, args, filter, [{:keyword, :over, _}, {:lparen, _, _} | rest]) do
    {spec, rest} = window_spec(rest)
    {{:window, String.downcase(name), args, spec, filter}, rest}
  end

  defp function_or_window(name, args, filter, [{:keyword, :over, _} | rest]) do
    {window_name, rest} = identifier(rest)
    {{:window, String.downcase(name), args, {:ref, String.downcase(window_name)}, filter}, rest}
  end

  defp function_or_window(name, args, nil, rest),
    do: {{:function, String.downcase(name), args}, rest}

  defp function_or_window(name, args, filter, rest),
    do: {{:filter_function, String.downcase(name), args, filter}, rest}

  defp optional_filter([{:keyword, :filter, line}, {:lparen, _, _} | rest]) do
    rest = expect_keyword(rest, :where, line)
    {expr, rest} = expression(rest)
    {expr, expect(rest, :rparen)}
  end

  defp optional_filter(tokens), do: {nil, tokens}

  defp window_spec(tokens) do
    {partition_by, rest} =
      case tokens do
        [{:keyword, :partition, _line}, {:keyword, :by, _} | rest] ->
          {exprs, rest} = expression_list(rest, [])
          {exprs, rest}

        [{:keyword, :partition, line} | _] ->
          fail(~s(near "#{token_text(:partition)}": syntax error), line)

        rest ->
          {[], rest}
      end

    {order_by, rest} =
      case rest do
        [{:keyword, :order, _}, {:keyword, :by, _} | rest] -> order_terms(rest, [])
        rest -> {[], rest}
      end

    {frame, rest} = window_frame(rest)
    {%{partition_by: partition_by, order_by: order_by, frame: frame}, expect(rest, :rparen)}
  end

  defp window_frame([{:keyword, kw, _} | rest]) when kw in [:range, :rows] do
    {bounds, rest} = frame_bounds(rest)
    {exclude, rest} = frame_exclude(rest)
    {%{unit: kw, start: elem(bounds, 0), finish: elem(bounds, 1), exclude: exclude}, rest}
  end

  # GROUPS is not a reserved word in SQLite, so it arrives as an identifier.
  defp window_frame([{:id, word, _} | rest] = tokens) do
    if String.downcase(word) == "groups" do
      {bounds, rest} = frame_bounds(rest)
      {exclude, rest} = frame_exclude(rest)
      {%{unit: :groups, start: elem(bounds, 0), finish: elem(bounds, 1), exclude: exclude}, rest}
    else
      {nil, tokens}
    end
  end

  defp window_frame(tokens), do: {nil, tokens}

  defp frame_exclude([{:id, word, line} | rest] = tokens) do
    if String.downcase(word) == "exclude" do
      case rest do
        [{:keyword, :no, _}, {:id, others, _} | rest] ->
          if String.downcase(others) == "others" do
            {:no_others, rest}
          else
            fail(~s(near "#{others}": syntax error), line)
          end

        [{:id, no, _}, {:id, others, _} | rest2] ->
          if String.downcase(no) == "no" and String.downcase(others) == "others" do
            {:no_others, rest2}
          else
            fail(~s(near "#{no}": syntax error), line)
          end

        [{:keyword, :current, _}, {:keyword, :row, _} | rest] ->
          {:current_row, rest}

        [{:keyword, :group, _} | rest] ->
          {:group, rest}

        [{:id, ties, _} | rest2] ->
          if String.downcase(ties) == "ties" do
            {:ties, rest2}
          else
            fail(~s(near "#{ties}": syntax error), line)
          end

        _ ->
          fail("unexpected tokens after EXCLUDE", line)
      end
    else
      {:no_others, tokens}
    end
  end

  defp frame_exclude(tokens), do: {:no_others, tokens}

  defp frame_bounds([{:keyword, :between, _} | rest]) do
    {start, rest} = frame_boundary(rest)
    rest = expect_keyword(rest, :and, line_of(rest))
    {finish, rest} = frame_boundary(rest)
    {{start, finish}, rest}
  end

  defp frame_bounds(tokens) do
    {start, rest} = frame_boundary(tokens)
    {{start, :current_row}, rest}
  end

  defp frame_boundary([{:keyword, :unbounded, line} | rest]) do
    case rest do
      [{:keyword, :preceding, _} | rest] -> {:unbounded_preceding, rest}
      [{:keyword, :following, _} | rest] -> {:unbounded_following, rest}
      _ -> expect_keyword(rest, :preceding, line)
    end
  end

  defp frame_boundary([{:keyword, :current, line} | rest]) do
    {:current_row, expect_keyword(rest, :row, line)}
  end

  defp frame_boundary(tokens) do
    {expr, rest} = expression(tokens)

    case rest do
      [{:keyword, :preceding, _} | rest] -> {{:preceding, expr}, rest}
      [{:keyword, :following, _} | rest] -> {{:following, expr}, rest}
      [{_, value, line} | _] -> fail(~s(near "#{token_text(value)}": syntax error), line)
      [] -> fail("unexpected end of input in window frame", nil)
    end
  end

  defp case_expr(tokens) do
    {operand, rest} =
      case tokens do
        [{:keyword, :when, _} | _] -> {nil, tokens}
        tokens -> expression(tokens)
      end

    {branches, rest} = case_branches(rest, [])

    {else_expr, rest} =
      case rest do
        [{:keyword, :else, _} | rest] -> expression(rest)
        rest -> {nil, rest}
      end

    case rest do
      [{:keyword, :end, _} | rest] -> {{:case, operand, branches, else_expr}, rest}
      [{_, value, line} | _] -> fail("expected END in CASE, got #{inspect(value)}", line)
      [] -> fail("unexpected end of input in CASE", nil)
    end
  end

  defp case_branches([{:keyword, :when, _} | rest], acc) do
    {when_expr, rest} = expression(rest)

    case rest do
      [{:keyword, :then, _} | rest] ->
        {then_expr, rest} = expression(rest)
        case_branches(rest, [{when_expr, then_expr} | acc])

      [{_, value, line} | _] ->
        fail("expected THEN in CASE, got #{inspect(value)}", line)

      [] ->
        fail("unexpected end of input in CASE", nil)
    end
  end

  defp case_branches(tokens, []), do: fail("CASE requires at least one WHEN", line_of(tokens))
  defp case_branches(tokens, acc), do: {Enum.reverse(acc), tokens}

  # -- token helpers -----------------------------------------------------------------

  defp identifier([{:id, name, _} | rest]), do: {name, rest}
  defp identifier([{:string, name, _} | rest]), do: {name, rest}

  # Some keywords are "soft" — they can also be used as column/table names.
  # This is important for ALTER TABLE where newly-added keywords like ADD,
  # COLUMN, RENAME, ALTER, CHECK, CONSTRAINT must not break existing schemas
  # whose tables use them as identifiers.
  @soft_keywords ~w(add alter check column constraint key rename)a
  defp identifier([{:keyword, kw, _} | rest]) when kw in @soft_keywords,
    do: {Atom.to_string(kw), rest}

  defp identifier([{_, value, line} | _]),
    do: fail("expected an identifier, got #{inspect(value)}", line)

  defp identifier([]), do: fail("expected an identifier, got end of input", nil)

  defp expect([{type, _, _} | rest], type), do: rest

  defp expect([{_, value, line} | _], type),
    do: fail("expected #{expected_name(type)}, got #{inspect(value)}", line)

  defp expect([], type), do: fail("expected #{expected_name(type)}, got end of input", nil)

  defp expect_keyword([{:keyword, keyword, _} | rest], keyword, _line), do: rest

  defp expect_keyword([{_, value, line} | _], keyword, _line),
    do:
      fail(
        "expected #{keyword |> Atom.to_string() |> String.upcase()}, got #{inspect(value)}",
        line
      )

  defp expect_keyword([], keyword, line),
    do: fail("expected #{keyword |> Atom.to_string() |> String.upcase()}, got end of input", line)

  defp expected_name(:lparen), do: "'('"
  defp expected_name(:rparen), do: "')'"
  defp expected_name(:eq), do: "'='"
  defp expected_name(type), do: "'#{type}'"

  defp line_of([{_, _, line} | _]), do: line
  defp line_of([]), do: nil

  # Renders a token's value the way SQLite's `near "X": syntax error` does.
  defp token_text(value) when is_atom(value), do: value |> Atom.to_string() |> String.upcase()
  defp token_text(value) when is_binary(value), do: value
  defp token_text(value), do: to_string(value)

  @spec fail(String.t(), pos_integer() | nil) :: no_return()
  defp fail(message, line), do: raise(Error, message: message, line: line)
end