Skip to main content

lib/ex_sql/value.ex

defmodule ExSQL.Value do
  @moduledoc """
  SQLite's value semantics: storage classes, type affinity, comparison
  ordering, arithmetic, and three-valued logic.

  Values map onto Elixir terms directly:

  | storage class | Elixir term       |
  |---------------|-------------------|
  | `NULL`        | `nil`             |
  | `INTEGER`     | `integer()`       |
  | `REAL`        | `float()`         |
  | `TEXT`        | `binary()`        |
  | `BLOB`        | `{:blob, binary}` |

  Internally, JSON-producing functions may temporarily use `{:json, binary}`
  to model SQLite's JSON subtype. It behaves as TEXT for ordinary SQL
  semantics and is stripped before public result/storage boundaries.

  Comparison follows SQLite's cross-class ordering: NULL < numeric values
  (INTEGER and REAL compare numerically with each other) < TEXT < BLOB.
  Boolean logic is three-valued — `nil` means "unknown" and propagates per
  the SQL standard truth tables.
  """

  @type t :: nil | integer() | float() | binary() | {:blob, binary()} | {:json, binary()}

  # -- storage classes -----------------------------------------------------

  @doc """
  Reports the storage class of `value`, as SQL `typeof()` would.

  The internal `{:json, _}` subtype is reported as `:text`.
  """
  @spec type_of(t()) :: :null | :integer | :real | :text | :blob
  def type_of({:blob, _bytes}), do: :blob
  def type_of({:json, _text}), do: :text
  def type_of(nil), do: :null
  def type_of(text) when is_binary(text), do: :text
  def type_of(real) when is_float(real), do: :real
  def type_of(int) when is_integer(int), do: :integer

  # -- type affinity ----------------------------------------------------------

  @doc """
  Applies a column's type affinity to `value` before it is stored
  (https://sqlite.org/datatype3.html §3).

  Affinity is a preference rather than a constraint: when a value cannot be
  converted losslessly it is returned unchanged.

    * `nil` and BLOBs are never converted.
    * `{:json, text}` is unwrapped and treated as its text.
    * `:any` leaves every value as-is.
    * `:text` renders numbers as text.
    * `:integer` / `:numeric` convert numeric text to a number and store
      integral REALs as INTEGER.
    * `:real` converts numeric text to a number and widens INTEGER to REAL.
  """
  @spec apply_affinity(t(), ExSQL.AST.ColumnDef.affinity()) :: t()
  def apply_affinity({:json, text}, affinity), do: apply_affinity(text, affinity)
  def apply_affinity(nil, _affinity), do: nil
  def apply_affinity({:blob, _bytes} = value, _affinity), do: value

  def apply_affinity(value, affinity) do
    case affinity do
      :any ->
        value

      :text when is_integer(value) or is_float(value) ->
        to_text(value)

      numeric when numeric in [:integer, :numeric] ->
        # Non-float results (integers, unconverted text) fall through `with` as-is.
        with f when is_float(f) <- to_number(value), do: demote_to_integer(f)

      :real ->
        # Only an INTEGER result is widened; floats and unconverted text pass through.
        with i when is_integer(i) <- to_number(value), do: i * 1.0

      _other ->
        value
    end
  end

  # A REAL that is exactly representable as an integer is stored as INTEGER
  # under INTEGER/NUMERIC affinity, but only when that integer fits in a
  # signed 64-bit value. Elixir integers are unbounded, so without the range
  # check a value such as 1.0e300 would be "demoted" to a 301-digit bignum
  # that no SQLite INTEGER can hold. The bounds are exclusive, mirroring
  # SQLite's integer-affinity check, so the extreme endpoints stay REAL.
  defp demote_to_integer(f) do
    truncated = trunc(f)

    if truncated > -9_223_372_036_854_775_808 and
         truncated < 9_223_372_036_854_775_807 and
         truncated * 1.0 == f do
      truncated
    else
      f
    end
  end

  # Text→number conversion used by affinity: text that is entirely a number
  # (surrounding whitespace ignored) becomes that number; anything else is
  # returned unchanged. Booleans map to 0/1 and numbers pass through.
  #
  # Integer-looking text that does not fit in a signed 64-bit integer is
  # converted to REAL instead of an Elixir bignum, as SQLite does for
  # well-formed integer literals that are too large
  # (https://sqlite.org/datatype3.html §3).
  defp to_number(false), do: 0
  defp to_number(true), do: 1

  defp to_number(value) when is_integer(value) or is_float(value), do: value

  defp to_number(value) when is_binary(value) do
    trimmed = String.trim(value)

    case Integer.parse(trimmed) do
      {i, ""} when i >= -9_223_372_036_854_775_808 and i <= 9_223_372_036_854_775_807 ->
        i

      _ ->
        # Covers real literals and out-of-range integer text alike.
        case Float.parse(trimmed) do
          {f, ""} -> f
          _ -> value
        end
    end
  end

  # -- CAST --------------------------------------------------------------------

  @doc """
  The `CAST` operator (https://sqlite.org/lang_expr.html#castexpr). Unlike
  affinity this is a forced conversion: non-numeric text becomes 0, and
  numeric casts of text take the longest numeric prefix (`'123abc'` → 123).

    * `nil` casts to `nil` for every target.
    * `{:json, text}` is cast as its text.
    * `:blob` — a BLOB is returned unchanged; anything else is rendered as
      text and wrapped in `{:blob, _}`.
    * Any other cast of a BLOB first reinterprets its bytes as text.
    * `:text` — renders the value with `to_text/1`.
    * `:integer` — text contributes its longest *integer* prefix
      (`'1e5'` → 1, `'123.9'` → 123); REAL is truncated toward zero. The
      result always saturates at the signed 64-bit range.
    * `:real` — the numeric prefix as a float.
    * `:numeric` — the numeric prefix, as INTEGER only when that is lossless.
  """
  @spec cast(t(), ExSQL.AST.ColumnDef.affinity()) :: t()
  def cast(nil, _affinity), do: nil
  def cast({:json, text}, affinity), do: cast(text, affinity)
  def cast({:blob, _} = blob, :blob), do: blob
  def cast(value, :blob), do: {:blob, to_text(value)}
  # Any other cast of a BLOB first interprets its bytes as text.
  def cast({:blob, b}, affinity), do: cast(b, affinity)
  def cast(value, :text), do: to_text(value)

  # TEXT → INTEGER reads decimal integers only: the exponent or fraction of
  # a longer numeric prefix is ignored, and an oversized prefix saturates
  # rather than producing a bignum.
  def cast(value, :integer) when is_binary(value) do
    case value |> String.trim() |> Integer.parse() do
      {i, _rest} -> clamp_int64(i)
      :error -> 0
    end
  end

  def cast(value, :integer) do
    case cast_numeric(value) do
      f when is_float(f) -> f |> trunc() |> clamp_int64()
      i -> clamp_int64(i)
    end
  end

  def cast(value, :real), do: cast_numeric(value) * 1.0

  # NUMERIC converts to INTEGER only when lossless: 123.456 stays REAL.
  def cast(value, :numeric) do
    case cast_numeric(value) do
      f when is_float(f) -> demote_to_integer(f)
      i -> i
    end
  end

  # Saturates `n` at the signed 64-bit bounds, the way SQLite's CAST to
  # INTEGER does: values above the maximum become the maximum, values below
  # the minimum become the minimum, everything else is returned unchanged.
  defp clamp_int64(n) do
    n
    |> min(9_223_372_036_854_775_807)
    |> max(-9_223_372_036_854_775_808)
  end

  # Forced numeric conversion used by CAST: numbers pass through unchanged.
  defp cast_numeric(number) when is_number(number), do: number

  # For text, take the longest numeric prefix of the trimmed input. Both an
  # integer parse and a float parse are attempted; the float wins only when
  # it consumes strictly more input than the integer parse
  # ('123abc' → 123, '123.5abc' → 123.5). Text with no numeric prefix is 0.
  defp cast_numeric(text) when is_binary(text) do
    input = String.trim(text)
    as_float = Float.parse(input)

    case Integer.parse(input) do
      {int, int_rest} ->
        case as_float do
          {float, float_rest} when byte_size(float_rest) < byte_size(int_rest) -> float
          _shorter_or_error -> int
        end

      :error ->
        case as_float do
          {float, _rest} -> float
          :error -> 0
        end
    end
  end

  # -- bitwise operators ---------------------------------------------------------

  @doc """
  Binary bitwise operators over 64-bit integers.

  `op` is one of `:bitand` (`&`), `:bitor` (`|`), `:shl` (`<<`) or `:shr`
  (`>>`). If either operand is `nil` the result is `nil`. Otherwise both
  operands are converted with `cast(_, :integer)`. A right shift is a left
  shift by the negated count, so a negative count shifts the other way;
  `&` and `|` results are wrapped to signed 64 bits.
  """
  @spec bitwise(:bitand | :bitor | :shl | :shr, t(), t()) :: t()
  def bitwise(_op, a, b) when is_nil(a) or is_nil(b), do: nil

  def bitwise(op, a, b) do
    {left, right} = {cast(a, :integer), cast(b, :integer)}

    case op do
      :shl -> shift_left(left, right)
      :shr -> shift_left(left, -right)
      :bitand -> left |> Bitwise.band(right) |> wrap64()
      :bitor -> left |> Bitwise.bor(right) |> wrap64()
    end
  end

  @doc """
  Bitwise NOT (`~`) over 64 bits.

  `nil` yields `nil`; any other value is converted with `cast(_, :integer)`,
  complemented, and wrapped to a signed 64-bit integer.
  """
  @spec bitnot(t()) :: t()
  def bitnot(nil), do: nil

  def bitnot(v) do
    as_integer = cast(v, :integer)
    wrap64(Bitwise.bnot(as_integer))
  end

  # Shifts `a` left by `s` bits; a negative `s` shifts right instead.
  #   * s >= 64       → every bit is shifted out: 0
  #   * 0 <= s < 64   → left shift, wrapped to signed 64 bits
  #   * -64 < s < 0   → arithmetic right shift by -s
  #   * s <= -64      → only the sign remains: -1 for negative `a`, else 0
  defp shift_left(a, s) do
    cond do
      s >= 64 -> 0
      s >= 0 -> wrap64(Bitwise.bsl(a, s))
      s > -64 -> Bitwise.bsr(a, -s)
      a < 0 -> -1
      true -> 0
    end
  end

  # Reduces an arbitrary-precision integer to its signed 64-bit two's
  # complement value: keep the low 64 bits, then reinterpret the top bit as
  # the sign.
  defp wrap64(n) do
    low_bits = Bitwise.band(n, 0xFFFF_FFFF_FFFF_FFFF)

    if low_bits >= 0x8000_0000_0000_0000 do
      low_bits - 0x1_0000_0000_0000_0000
    else
      low_bits
    end
  end

  # -- comparison -----------------------------------------------------------------

  @doc """
  Total ordering across storage classes: NULL < numeric < TEXT < BLOB.

  Used by `ORDER BY` and the comparison operators (which additionally return
  NULL when either operand is NULL — see `compare_op/3`).

  `collation` only affects TEXT-vs-TEXT comparisons. It defaults to
  `:binary`; `:nocase` and `:rtrim` are recognised, and
  `{:custom, name, comparator}` delegates to `comparator.(a, b)`, which is
  expected to return `:lt`, `:eq` or `:gt`. Any other collation compares as
  `:binary`.
  """
  @spec compare(
          t(),
          t(),
          atom() | {:custom, term(), (binary(), binary() -> :lt | :eq | :gt)}
        ) :: :lt | :eq | :gt
  def compare(a, b, collation \\ :binary)

  # Fast path for two numbers (same storage-class rank), skipping the rank
  # dispatch. Used heavily by ORDER BY sorting on numeric keys.
  def compare(a, b, _collation) when is_number(a) and is_number(b) do
    cond do
      a < b -> :lt
      a > b -> :gt
      true -> :eq
    end
  end

  def compare(a, b, collation) do
    case {rank(a), rank(b)} do
      # Same storage class: compare the values themselves.
      {r, r} -> compare_same_class(a, b, collation)
      # Different classes: the class rank alone decides.
      {ra, rb} when ra < rb -> :lt
      _ -> :gt
    end
  end

  # Position of a value's storage class in the cross-class ordering:
  # NULL (0) < numeric (1) < TEXT, including the JSON subtype (2) < BLOB (3).
  defp rank({:blob, _bytes}), do: 3
  defp rank({:json, _text}), do: 2
  defp rank(text) when is_binary(text), do: 2
  defp rank(number) when is_number(number), do: 1
  defp rank(nil), do: 0

  # Compares two values of the same storage class. NULLs are equal; BLOBs
  # compare by their bytes; TEXT (plain or JSON-tagged, in any combination)
  # compares under the collation; anything else uses term ordering.
  defp compare_same_class(nil, nil, _collation), do: :eq

  defp compare_same_class({:blob, left}, {:blob, right}, _collation),
    do: compare_terms(left, right)

  defp compare_same_class(left, right, collation) do
    case {comparable_text(left), comparable_text(right)} do
      {{:ok, left_text}, {:ok, right_text}} -> compare_text(left_text, right_text, collation)
      _not_both_text -> compare_terms(left, right)
    end
  end

  # Extracts the text payload of a TEXT value, unwrapping the JSON subtype.
  defp comparable_text({:json, text}), do: {:ok, text}
  defp comparable_text(text) when is_binary(text), do: {:ok, text}
  defp comparable_text(_other), do: :error

  # Orders two TEXT values under a collation:
  #   * {:custom, _, fun} — whatever `fun.(a, b)` returns
  #   * :nocase           — bytewise after ASCII down-casing both sides
  #   * :rtrim            — bytewise after stripping trailing spaces
  #   * anything else     — plain bytewise comparison
  defp compare_text(a, b, collation) do
    case collation do
      {:custom, _name, comparator} ->
        comparator.(a, b)

      :nocase ->
        compare_terms(ascii_downcase(a), ascii_downcase(b))

      :rtrim ->
        compare_terms(String.trim_trailing(a, " "), String.trim_trailing(b, " "))

      _binary ->
        compare_terms(a, b)
    end
  end

  # Lower-cases the bytes ?A..?Z only; every other byte, including the bytes
  # of multi-byte UTF-8 characters, is copied through unchanged.
  defp ascii_downcase(text) do
    text
    |> :binary.bin_to_list()
    |> Enum.map(fn
      byte when byte >= ?A and byte <= ?Z -> byte + 32
      byte -> byte
    end)
    |> :erlang.list_to_binary()
  end

  # Three-way comparison using Erlang term ordering.
  defp compare_terms(a, b) when a < b, do: :lt
  defp compare_terms(a, b) when a > b, do: :gt
  defp compare_terms(_a, _b), do: :eq

  @doc """
  SQL comparison operator: returns `nil` (unknown) if either side is NULL,
  otherwise a boolean derived from `compare/3`.

  `compare_op/3` uses the `:binary` collation; `compare_op/4` takes the
  collation explicitly (see `compare/3` for the accepted values).
  """
  @spec compare_op(:eq | :ne | :lt | :le | :gt | :ge, t(), t()) :: boolean() | nil
  def compare_op(op, a, b), do: compare_op(op, a, b, :binary)

  @spec compare_op(
          :eq | :ne | :lt | :le | :gt | :ge,
          t(),
          t(),
          atom() | {:custom, term(), (binary(), binary() -> :lt | :eq | :gt)}
        ) :: boolean() | nil
  def compare_op(_op, nil, _b, _collation), do: nil
  def compare_op(_op, _a, nil, _collation), do: nil

  # Fast path: two numbers compare numerically, the same result
  # `compare/3` (rank → compare_same_class → compare_terms) would give, but
  # without the storage-class dispatch. This is the bulk of numeric scans.
  def compare_op(op, a, b, _collation) when is_number(a) and is_number(b) do
    case op do
      :eq -> a == b
      :ne -> a != b
      :lt -> a < b
      :le -> a <= b
      :gt -> a > b
      :ge -> a >= b
    end
  end

  def compare_op(op, a, b, collation) do
    ordering = compare(a, b, collation)

    case op do
      :eq -> ordering == :eq
      :ne -> ordering != :eq
      :lt -> ordering == :lt
      :le -> ordering != :gt
      :gt -> ordering == :gt
      :ge -> ordering != :lt
    end
  end

  @doc """
  Applies comparison affinity to two operands ahead of a comparison, per
  https://sqlite.org/datatype3.html §4.2.

  Rules, first match wins:

    1. exactly one side has a numeric affinity (`:integer`, `:real`,
       `:numeric`) — NUMERIC affinity is applied to the other side;
    2. exactly one side has `:text` affinity — TEXT affinity is applied to
       the other side;
    3. otherwise both operands are returned untouched.

  BLOB affinity counts as "no affinity", and literals compared to literals
  are left alone (ticket #805). Returns the `{a, b}` pair after coercion.
  """
  @spec comparison_coerce(t(), atom(), t(), atom()) :: {t(), t()}
  def comparison_coerce(a, affinity_a, b, affinity_b) do
    numeric_a? = numeric_affinity?(affinity_a)
    numeric_b? = numeric_affinity?(affinity_b)
    text_a? = affinity_a == :text
    text_b? = affinity_b == :text

    case {numeric_a?, numeric_b?, text_a?, text_b?} do
      {true, false, _, _} -> {a, apply_affinity(b, :numeric)}
      {false, true, _, _} -> {apply_affinity(a, :numeric), b}
      {_, _, true, false} -> {a, apply_affinity(b, :text)}
      {_, _, false, true} -> {apply_affinity(a, :text), b}
      _no_coercion -> {a, b}
    end
  end

  # True for the affinities that count as numeric in comparisons.
  defp numeric_affinity?(:integer), do: true
  defp numeric_affinity?(:real), do: true
  defp numeric_affinity?(:numeric), do: true
  defp numeric_affinity?(_other), do: false

  # -- three-valued logic -------------------------------------------------------------

  @doc """
  Converts a value to SQL boolean: `nil`, `true`, or `false`.

  `nil` stays `nil` and Elixir booleans pass through. Every other value is
  first cast to NUMERIC the way `CAST` would (SQLite's rule for boolean
  contexts, https://sqlite.org/lang_expr.html#booleanexpr) and is true
  unless the result is zero. So `'english'` and `'0'` are false while
  `'1english'` is true, and BLOB or JSON-tagged values are judged by their
  text instead of raising.
  """
  @spec truthy(t()) :: boolean() | nil
  def truthy(nil), do: nil
  def truthy(false), do: false
  def truthy(true), do: true
  # `cast/2` returns 0 for text with no numeric prefix, so that case is false.
  def truthy(value), do: cast(value, :numeric) != 0

  @doc """
  Three-valued AND: false if either side is false, true only when both sides
  are true, otherwise `nil` (unknown).
  """
  @spec sql_and(boolean() | nil, boolean() | nil) :: boolean() | nil
  def sql_and(left, right) do
    cond do
      left === false or right === false -> false
      left === true and right === true -> true
      true -> nil
    end
  end

  @doc """
  Three-valued OR: true if either side is true, false only when both sides
  are false, otherwise `nil` (unknown).
  """
  @spec sql_or(boolean() | nil, boolean() | nil) :: boolean() | nil
  def sql_or(left, right) do
    cond do
      left === true or right === true -> true
      left === false and right === false -> false
      true -> nil
    end
  end

  @doc "Three-valued NOT: negates a boolean; `nil` (unknown) stays `nil`."
  @spec sql_not(boolean() | nil) :: boolean() | nil
  def sql_not(b) do
    if is_nil(b), do: nil, else: not b
  end

  # -- arithmetic -------------------------------------------------------------------

  @doc """
  Arithmetic with SQLite semantics.

    * A `nil` operand makes the result `nil`.
    * Other operands are converted to numbers first (non-numeric text
      becomes 0).
    * Division and modulo by zero yield `nil`.
    * An integer result outside the signed 64-bit range is recomputed in
      floating point rather than wrapping or raising (`:mod` results are
      returned as computed).
  """
  @spec arithmetic(:add | :sub | :mul | :div | :mod, t(), t()) :: t()
  def arithmetic(_op, nil, _b), do: nil
  def arithmetic(_op, _a, nil), do: nil

  def arithmetic(op, a, b) do
    lhs = to_operand(a)
    rhs = to_operand(b)

    exact =
      case op do
        :add -> lhs + rhs
        :sub -> lhs - rhs
        :mul -> lhs * rhs
        :div -> divide(lhs, rhs)
        :mod -> modulo(lhs, rhs)
      end

    # Integer overflow flips the operation to floating point, as in SQLite
    # (expr.test), instead of producing an out-of-range integer.
    if is_integer(exact) and out_of_int64_range?(exact) and op != :mod do
      real_lhs = lhs * 1.0

      case op do
        :add -> real_lhs + rhs
        :sub -> real_lhs - rhs
        :mul -> real_lhs * rhs
        :div -> real_lhs / rhs
      end
    else
      exact
    end
  end

  @int64_min -9_223_372_036_854_775_808
  @int64_max 9_223_372_036_854_775_807

  @doc """
  Returns `true` when `n` lies outside the signed 64-bit integer range, and
  `false` when it is between the minimum and maximum inclusive.
  """
  @spec out_of_int64_range?(integer()) :: boolean()
  def out_of_int64_range?(n), do: not (n >= @int64_min and n <= @int64_max)

  # Converts an arithmetic operand to a number. SQLite casts operands of the
  # mathematical operators to NUMERIC even when the cast is lossy
  # (https://sqlite.org/datatype3.html, "Operators"), so:
  #   * BLOB and JSON-tagged values are treated as their text, not rejected;
  #   * text contributes its longest numeric prefix ('12abc' → 12), and text
  #     with no numeric prefix is 0;
  #   * numbers and booleans go through `to_number/1` unchanged.
  defp to_operand({:blob, bytes}), do: to_operand(bytes)
  defp to_operand({:json, text}), do: to_operand(text)
  defp to_operand(text) when is_binary(text), do: cast_numeric(text)
  defp to_operand(other), do: to_number(other)

  # SQL division: a zero divisor (0 or 0.0) gives nil, two integers use
  # truncating integer division, and any float operand gives float division.
  defp divide(a, b) do
    cond do
      b == 0 -> nil
      is_integer(a) and is_integer(b) -> div(a, b)
      true -> a / b
    end
  end

  # SQL `%`: both operands are truncated to integers before the remainder is
  # taken. The zero check must look at the *truncated* divisor: a divisor
  # such as 0.5 is non-zero but truncates to 0, and `rem/2` would raise
  # ArithmeticError. Any zero divisor yields nil (NULL).
  #
  # When either operand is a float the result is returned as a float,
  # matching SQLite, where `5.5 % 2` evaluates to 1.0.
  defp modulo(a, b) do
    dividend = trunc(a)
    divisor = trunc(b)

    cond do
      divisor == 0 -> nil
      is_float(a) or is_float(b) -> rem(dividend, divisor) * 1.0
      true -> rem(dividend, divisor)
    end
  end

  # -- text ------------------------------------------------------------------------

  @doc """
  String concatenation (`||`). Returns `nil` if either operand is `nil`;
  otherwise both operands are rendered with `to_text/1` and joined.
  """
  @spec concat(t(), t()) :: t()
  def concat(a, b) when is_nil(a) or is_nil(b), do: nil
  def concat(a, b), do: IO.iodata_to_binary([to_text(a), to_text(b)])

  @doc """
  Renders a value as TEXT, the way `CAST(x AS TEXT)` would.

  Text is returned unchanged, BLOB and JSON-tagged values yield their
  payload, integers are printed in decimal, and floats use SQLite's REAL
  formatting.
  """
  @spec to_text(t()) :: binary()
  def to_text({:blob, bytes}), do: bytes
  def to_text({:json, text}), do: text
  def to_text(text) when is_binary(text), do: text
  def to_text(int) when is_integer(int), do: Integer.to_string(int)
  def to_text(real) when is_float(real), do: float_to_sql_text(real)

  # Renders a REAL the way SQLite's `%!.15g` does: 15 significant digits,
  # trailing zeros trimmed, always with a decimal point, and exponent form
  # when the decimal exponent is < -4 or >= 15 (the C `%g` rule).
  #
  # The digits come from Erlang's scientific formatting with 14 decimals
  # (1 leading digit + 14 = 15 significant digits), which avoids computing
  # the exponent with log10 and its rounding hazards.
  defp float_to_sql_text(f) when f == 0.0, do: "0.0"

  defp float_to_sql_text(f) do
    rendered = :erlang.float_to_binary(abs(f), [{:scientific, 14}])
    [mantissa, exponent_text] = :binary.split(rendered, "e")
    exponent = String.to_integer(exponent_text)
    # "d.dddddddddddddd" → the bare 15-digit string.
    digits = :binary.replace(mantissa, ".", "", [:global])

    unsigned =
      if exponent in -4..14 do
        fixed_float_text(digits, exponent)
      else
        sci_float_text(digits, exponent)
      end

    case f < 0 do
      true -> "-" <> unsigned
      false -> unsigned
    end
  end

  # Places the decimal point into `digits` for fixed notation, where `exp`
  # is the decimal exponent of the first digit, then trims trailing zeros.
  defp fixed_float_text(digits, exp) do
    point = exp + 1
    len = byte_size(digits)

    placed =
      cond do
        # Value below 1: pad with leading zeros after "0.".
        point <= 0 ->
          "0." <> String.duplicate("0", -point) <> digits

        # All digits are integral: pad with zeros up to the point.
        point >= len ->
          digits <> String.duplicate("0", point - len) <> ".0"

        # The point falls inside the digit string.
        true ->
          binary_part(digits, 0, point) <> "." <> binary_part(digits, point, len - point)
      end

    trim_float_zeros(placed)
  end

  # Exponent notation: first digit, point, remaining digits (zero-trimmed),
  # then "e", an explicit sign, and an exponent of at least two digits.
  defp sci_float_text(<<first::binary-size(1), others::binary>>, exp) do
    {sign, magnitude} = if exp < 0, do: {"-", -exp}, else: {"+", exp}
    mantissa = trim_float_zeros(first <> "." <> others)
    exponent = magnitude |> Integer.to_string() |> String.pad_leading(2, "0")

    mantissa <> "e" <> sign <> exponent
  end

  # Normalises the fractional part: with a decimal point present, trailing
  # zeros are dropped but at least one digit is kept after the point;
  # without one, ".0" is appended.
  defp trim_float_zeros(text) do
    case :binary.match(text, ".") do
      :nomatch ->
        text <> ".0"

      _found ->
        stripped = String.trim_trailing(text, "0")

        case :binary.last(stripped) do
          ?. -> stripped <> "0"
          _digit -> stripped
        end
    end
  end

  # -- pattern matching ---------------------------------------------------------------

  @doc """
  The `LIKE` operator: `%` matches any run, `_` any single character;
  ASCII case-insensitive, as in SQLite's default (ICU-less) build.

  Call shapes:

    * `like(value, pattern)` — case-insensitive, no escape character.
    * `like(value, pattern, case_sensitive?)` — a boolean third argument
      selects case sensitivity.
    * `like(value, pattern, escape)` — any other third argument is the
      `ESCAPE` character (`nil` means none).
    * `like(value, pattern, escape, case_sensitive?)` — both.

  Every shape returns `nil` when `value` or `pattern` is `nil`; otherwise
  both are rendered with `to_text/1` and the result is a boolean.
  """
  @spec like(t(), t()) :: boolean() | nil
  def like(value, pattern), do: like(value, pattern, nil, false)

  @spec like(t(), t(), boolean() | binary() | nil) :: boolean() | nil
  def like(value, pattern, case_sensitive?) when is_boolean(case_sensitive?),
    do: like(value, pattern, nil, case_sensitive?)

  def like(value, pattern, escape), do: like(value, pattern, escape, false)

  @spec like(t(), t(), binary() | nil, boolean()) :: boolean() | nil
  # NULL propagation lives here so that every arity shares it; previously
  # the boolean and four-argument forms passed `nil` on to `to_text/1`.
  def like(nil, _pattern, _escape, case_sensitive?) when is_boolean(case_sensitive?), do: nil
  def like(_value, nil, _escape, case_sensitive?) when is_boolean(case_sensitive?), do: nil

  def like(value, pattern, escape, case_sensitive?) when is_boolean(case_sensitive?) do
    regex = pattern_to_regex(to_text(pattern), "%", "_", escape, like_options(case_sensitive?))
    Regex.match?(regex, to_text(value))
  end

  # Regex options for LIKE: case-sensitive matching needs none,
  # case-insensitive matching adds `:caseless`.
  defp like_options(case_sensitive?) do
    case case_sensitive? do
      true -> []
      false -> [:caseless]
    end
  end

  @doc """
  The `GLOB` operator: `*`/`?` wildcards, case-sensitive.

  Returns `nil` if either argument is `nil`; otherwise both are rendered
  with `to_text/1` and the result is a boolean.
  """
  @spec glob(t(), t()) :: boolean() | nil
  def glob(value, pattern) when is_nil(value) or is_nil(pattern), do: nil

  def glob(value, pattern) do
    compiled = pattern |> to_text() |> glob_to_regex()
    Regex.match?(compiled, to_text(value))
  end

  # Compiles a GLOB pattern into an anchored regex. Besides `*` and `?`,
  # GLOB understands `[...]` character classes (with `^` negation and `a-c`
  # ranges), which LIKE treats as literal text. An unterminated `[` is a
  # literal bracket, matching SQLite.
  defp glob_to_regex(pattern) do
    body = pattern |> String.to_charlist() |> glob_segments()
    source = IO.iodata_to_binary(["\\A", body, "\\z"])

    Regex.compile!(source, [:dotall])
  end

  # Translates a GLOB pattern (a charlist of codepoints) into regex source
  # fragments: `*` → any run, `?` → one character, `[...]` → a character
  # class when terminated (otherwise a literal `[`), anything else → itself,
  # regex-escaped.
  defp glob_segments([]), do: []
  defp glob_segments([?* | rest]), do: [".*" | glob_segments(rest)]

  # The regex is compiled without `:unicode`, so `.` would match one *byte*
  # and `?` could never match a multi-byte character such as "é". Instead,
  # match one whole UTF-8 sequence: an ASCII byte, or a lead byte with all of
  # its continuation bytes (atomic and possessive, so backtracking cannot
  # split a character). A continuation byte that does not follow another
  # high byte is accepted on its own so stray bytes still match.
  defp glob_segments([?? | rest]) do
    [
      "(?>[\\x00-\\x7F]|[\\xC0-\\xFF][\\x80-\\xBF]*+|(?<![\\x80-\\xFF])[\\x80-\\xBF])"
      | glob_segments(rest)
    ]
  end

  defp glob_segments([?[ | rest]) do
    case glob_bracket(rest) do
      {:ok, class, rest} -> [class | glob_segments(rest)]
      # No closing `]`: the bracket is literal and scanning resumes after it.
      :error -> [Regex.escape("[") | glob_segments(rest)]
    end
  end

  defp glob_segments([char | rest]), do: [Regex.escape(<<char::utf8>>) | glob_segments(rest)]

  # Parses the inside of a GLOB `[...]` class (everything after the opening
  # `[`). Returns `{:ok, regex_class, remaining_chars}`, or `:error` when
  # the class is never closed.
  defp glob_bracket(chars) do
    {negation, after_negation} =
      case chars do
        [?^ | tail] -> {"^", tail}
        other -> {"", other}
      end

    # A `]` right after `[`/`[^` is a literal member, not the terminator.
    {leading, members} =
      case after_negation do
        [?] | tail] -> {"\\]", tail}
        other -> {"", other}
      end

    with {:ok, body, rest} <- glob_class_body(members, []) do
      {:ok, "[" <> negation <> leading <> body <> "]", rest}
    end
  end

  # Collects the members of a GLOB character class up to the closing `]`.
  # Returns `{:ok, body, rest}` with the regex-ready body and the characters
  # after the `]`, or `:error` if no `]` is found. Backslashes are doubled
  # so the regex treats them literally; `acc` holds already-collected parts
  # in reverse order.
  defp glob_class_body(chars, acc) do
    case Enum.split_while(chars, &(&1 != ?])) do
      {_members, []} ->
        :error

      {members, [?] | rest]} ->
        escaped =
          Enum.map(members, fn
            ?\\ -> "\\\\"
            char -> <<char::utf8>>
          end)

        {:ok, IO.iodata_to_binary([Enum.reverse(acc), escaped]), rest}
    end
  end

  # Compiles a LIKE-style pattern into an anchored regex. `many` and `one`
  # are the wildcard graphemes, `escape` the optional escape grapheme, and
  # `options` extra regex options added to `:dotall`.
  defp pattern_to_regex(pattern, many, one, escape, options) do
    parts = escaped_pattern_parts(String.graphemes(pattern), many, one, escape)
    source = IO.iodata_to_binary(["\\A", parts, "\\z"])

    Regex.compile!(source, [:dotall | options])
  end

  # Converts the graphemes of a LIKE-style pattern into regex source
  # fragments. `many` becomes "any run", `one` becomes "exactly one
  # character", a grapheme preceded by `escape` is taken literally, and
  # everything else is regex-escaped.
  defp escaped_pattern_parts([], _many, _one, _escape), do: []

  # The escape grapheme makes the following grapheme literal, wildcards
  # included.
  defp escaped_pattern_parts([escape, char | rest], many, one, escape) when is_binary(escape) do
    [Regex.escape(char) | escaped_pattern_parts(rest, many, one, escape)]
  end

  defp escaped_pattern_parts([char | rest], many, one, escape) do
    part =
      cond do
        char == many ->
          ".*"

        char == one ->
          # The caller compiles the regex without `:unicode`, so `.` would
          # match a single byte and `_` could never match a multi-byte
          # character such as "é". Match one whole UTF-8 sequence instead:
          # an ASCII byte, or a lead byte with all of its continuation bytes
          # (atomic and possessive, so backtracking cannot split a
          # character). A continuation byte that does not follow another
          # high byte is accepted on its own so stray bytes still match.
          "(?>[\\x00-\\x7F]|[\\xC0-\\xFF][\\x80-\\xBF]*+|(?<![\\x80-\\xFF])[\\x80-\\xBF])"

        true ->
          Regex.escape(char)
      end

    [part | escaped_pattern_parts(rest, many, one, escape)]
  end
end