Skip to main content

lib/dust/cache/ecto.ex

if Code.ensure_loaded?(Ecto.Query) do
  defmodule Dust.Cache.Ecto do
    @behaviour Dust.Cache

    import Ecto.Query

    alias Dust.Cache.Ecto.CacheEntry

    # CacheEntry is also wrapped in `if Code.ensure_loaded?(Ecto.Schema)`
    # so static analysis can't always see it from this file. The runtime
    # `Code.ensure_loaded?(Ecto.Query)` guard above is sufficient — both
    # modules ship together.
    @compile {:no_warn_undefined, CacheEntry}

    @seq_sentinel_path "_dust:last_seq"

    # Fetch and JSON-decode the value stored at `path` in `store`.
    # Returns {:ok, decoded_value} when a row exists, :miss otherwise.
    @impl Dust.Cache
    def read(repo, store, path) do
      value_query =
        CacheEntry
        |> where([c], c.store == ^store and c.path == ^path)
        |> select([c], c.value)

      case repo.one(value_query) do
        nil ->
          :miss

        json ->
          decoded = Jason.decode!(json)
          {:ok, decoded}
      end
    end

    # Like read/3, but also returns the row's metadata:
    # {:ok, {decoded_value, type, seq, synced_at}} or :miss.
    @impl Dust.Cache
    def read_entry(repo, store, path) do
      entry_query =
        CacheEntry
        |> where([c], c.store == ^store and c.path == ^path)
        |> select([c], {c.value, c.type, c.seq, c.synced_at})

      case repo.one(entry_query) do
        {json, type, seq, synced_at} ->
          decoded = Jason.decode!(json)
          {:ok, {decoded, type, seq, synced_at}}

        nil ->
          :miss
      end
    end

    # Batch lookup: returns %{path => {decoded_value, type, seq}} for every
    # requested path that has a row. Missing paths are simply absent from the
    # map. Duplicate input paths are collapsed, and an empty request skips the
    # database entirely.
    @impl Dust.Cache
    def read_many(repo, store, paths) do
      case Enum.uniq(paths) do
        [] ->
          %{}

        wanted ->
          CacheEntry
          |> where([c], c.store == ^store and c.path in ^wanted)
          |> select([c], {c.path, c.value, c.type, c.seq})
          |> repo.all()
          |> Map.new(fn {path, json, type, seq} -> {path, {Jason.decode!(json), type, seq}} end)
      end
    end

    # All {path, decoded_value} pairs in `store` whose path matches the glob
    # `pattern`, excluding the internal seq sentinel row. The glob is applied
    # in Elixir after loading every row of the store.
    @impl Dust.Cache
    def read_all(repo, store, pattern) do
      {:ok, glob} = Dust.Protocol.Glob.compile(pattern)

      rows =
        CacheEntry
        |> where([c], c.store == ^store and c.path != ^@seq_sentinel_path)
        |> select([c], {c.path, c.value})
        |> repo.all()

      for {path, json} <- rows, path_matches?(path, glob), do: {path, Jason.decode!(json)}
    end

    # True when `path` parses as a rendered Dust path and its segments satisfy
    # the compiled glob. Paths that fail to parse never match.
    defp path_matches?(path, compiled) do
      path
      |> Dust.Protocol.Path.parse_rendered()
      |> segments_match?(compiled)
    end

    # Applies the compiled glob to a successful parse result; any other parse
    # outcome counts as a non-match.
    defp segments_match?({:ok, segs}, compiled), do: Dust.Protocol.Glob.match?(compiled, segs)
    defp segments_match?(_parse_failure, _compiled), do: false

    # Upsert a single entry, keyed on {store, path], stamping synced_at with the
    # current wall-clock time in milliseconds. Then advance the store's seq
    # sentinel to `seq`; the sentinel only moves forward.
    @impl Dust.Cache
    def write(repo, store, path, value, type, seq) do
      synced_at = System.system_time(:millisecond)
      encoded = Jason.encode!(value)

      row = %{
        store: store,
        path: path,
        value: encoded,
        type: type,
        seq: seq,
        synced_at: synced_at
      }

      repo.insert_all(CacheEntry, [row],
        on_conflict: [set: [value: encoded, type: type, seq: seq, synced_at: synced_at]],
        conflict_target: [:store, :path]
      )

      update_seq_sentinel(repo, store, seq)
      :ok
    end

    # Rows per INSERT statement in write_batch/3. Each row binds 6 parameters,
    # so 100 rows (600 parameters) stays below SQLite's historical default
    # SQLITE_MAX_VARIABLE_NUMBER of 999.
    @write_batch_chunk_size 100

    # Upsert many {path, value, type, seq} entries at once, keyed on
    # {store, path}. All rows share one synced_at timestamp in milliseconds.
    # Afterwards the store's seq sentinel is advanced to the highest seq in
    # the batch, provided it is > 0. Returns :ok.
    #
    # Rows are written with multi-row upserts (one statement per chunk)
    # rather than one statement per entry. The batch is not wrapped in a
    # transaction, so an error part-way leaves earlier chunks written.
    @impl Dust.Cache
    def write_batch(repo, store, entries) do
      now = System.system_time(:millisecond)

      rows =
        Enum.map(entries, fn {path, value, type, seq} ->
          %{
            store: store,
            path: path,
            value: Jason.encode!(value),
            type: type,
            seq: seq,
            synced_at: now
          }
        end)

      # One multi-row upsert may not hit the same conflict key twice
      # (PostgreSQL rejects it). Keep only the last entry per path, which is
      # the last-write-wins result of applying the entries in order.
      rows
      |> Enum.reverse()
      |> Enum.uniq_by(& &1.path)
      |> Enum.reverse()
      |> Enum.chunk_every(@write_batch_chunk_size)
      |> Enum.each(fn chunk ->
        repo.insert_all(CacheEntry, chunk,
          on_conflict: {:replace, [:value, :type, :seq, :synced_at]},
          conflict_target: [:store, :path]
        )
      end)

      # Computed over every entry (duplicates included), starting from 0, so
      # an empty batch leaves the sentinel untouched.
      max_seq = Enum.reduce(rows, 0, fn row, acc -> max(acc, row.seq) end)

      if max_seq > 0, do: update_seq_sentinel(repo, store, max_seq)
      :ok
    end

    # Remove the row at exactly `path` in `store`, if any. Always returns :ok.
    @impl Dust.Cache
    def delete(repo, store, path) do
      CacheEntry
      |> where([c], c.store == ^store and c.path == ^path)
      |> repo.delete_all()

      :ok
    end

    # Delete `path` itself and every descendant (`path/...`) in `store`, never
    # touching the seq sentinel row. Returns the number of rows removed.
    @impl Dust.Cache
    def delete_subtree(repo, store, path) do
      prefix_pattern = like_escape(path) <> "/%"

      # The explicit ESCAPE clause is required. SQLite's LIKE has no escape
      # character by default, so without it the backslashes added by
      # like_escape/1 are matched literally. A path such as "user_profiles"
      # would then never match its children. This is the same form that
      # fetch_chunk/6 uses.
      # NOTE(review): SQLite's LIKE is also ASCII case-insensitive unless
      # PRAGMA case_sensitive_like is on, so "Users" would also match
      # "users/..." — confirm paths are case-normalised.
      query =
        from(c in CacheEntry,
          where:
            c.store == ^store and
              (c.path == ^path or
                 fragment("? LIKE ? ESCAPE '\\'", c.path, ^prefix_pattern)) and
              c.path != ^@seq_sentinel_path
        )

      {removed, _} = repo.delete_all(query)
      removed
    end

    # Return `path` itself plus every descendant (`path/...`) in `store` as
    # {path, decoded_value, type, seq} tuples, ordered by path. The seq
    # sentinel row is excluded.
    @impl Dust.Cache
    def read_subtree(repo, store, path) do
      prefix_pattern = like_escape(path) <> "/%"

      # The explicit ESCAPE clause is required. SQLite's LIKE has no escape
      # character by default, so without it the backslashes added by
      # like_escape/1 are matched literally, and descendants of paths
      # containing `_` or `%` would be missed. This is the same form that
      # fetch_chunk/6 uses.
      # NOTE(review): SQLite's LIKE is also ASCII case-insensitive unless
      # PRAGMA case_sensitive_like is on — confirm paths are case-normalised.
      query =
        from(c in CacheEntry,
          where:
            c.store == ^store and
              (c.path == ^path or
                 fragment("? LIKE ? ESCAPE '\\'", c.path, ^prefix_pattern)) and
              c.path != ^@seq_sentinel_path,
          order_by: c.path,
          select: {c.path, c.value, c.type, c.seq}
        )

      repo.all(query)
      |> Enum.map(fn {p, json, type, seq} -> {p, Jason.decode!(json), type, seq} end)
    end

    # Escape `\`, `%` and `_` so a literal path can be embedded in a LIKE
    # pattern. The escapes only take effect when the query names backslash as
    # the escape character (`LIKE ? ESCAPE '\'`). SQLite's LIKE has no escape
    # character by default, and Ecto's like/2 emits a bare LIKE without an
    # ESCAPE clause.
    # Backslash is replaced first, so the backslashes introduced for `%` and
    # `_` are not themselves doubled.
    defp like_escape(path) do
      path
      |> String.replace("\\", "\\\\")
      |> String.replace("%", "\\%")
      |> String.replace("_", "\\_")
    end

    # Highest seq recorded in the store's sentinel row, or 0 when the store
    # has never been written.
    @impl Dust.Cache
    def last_seq(repo, store) do
      sentinel_seq =
        CacheEntry
        |> where([c], c.store == ^store and c.path == ^@seq_sentinel_path)
        |> select([c], c.seq)
        |> repo.one()

      sentinel_seq || 0
    end

    # Number of real entries in `store`; the internal seq sentinel row is not
    # counted.
    @impl Dust.Cache
    def count(repo, store) do
      CacheEntry
      |> where([c], c.store == ^store and c.path != ^@seq_sentinel_path)
      |> select([c], count())
      |> repo.one()
    end

    # Raw rows fetched per round-trip by fetch_chunk/6 while browsing. When the
    # glob is narrower than its SQL LIKE prefix, several chunks may be walked
    # to fill a single page. Not configurable in Phase 1.
    @browse_chunk_size 500

    # Paginated listing of `store`. Options (defaults in parentheses):
    #   :pattern (`"**"`), :cursor (nil), :limit (50), :order (:asc),
    #   :select (:entries), plus :from and :to.
    # When both :from and :to are strings, they bound the path range
    # [from, to) and :pattern is not consulted. Otherwise rows are filtered
    # by the glob.
    # Returns {projected_page, next_cursor}. next_cursor is the last path on
    # the page when more rows exist, and nil otherwise.
    @impl Dust.Cache
    def browse(repo, store, opts) do
      pattern = Keyword.get(opts, :pattern, "**")
      cursor = Keyword.get(opts, :cursor)
      limit = Keyword.get(opts, :limit, 50)
      order = Keyword.get(opts, :order, :asc)
      projection = Keyword.get(opts, :select, :entries)

      # At most limit + 1 raw {path, json, type, seq} rows. A row beyond
      # `limit` only signals that a further page exists.
      raw_rows =
        case {Keyword.get(opts, :from), Keyword.get(opts, :to)} do
          {from_key, to_key} when is_binary(from_key) and is_binary(to_key) ->
            # SQL bounds are exact: a single query, no glob post-filter and
            # no chunked walk.
            range_query(repo, store, from_key, to_key, cursor, order, limit)

          _no_range ->
            {:ok, compiled} = Dust.Protocol.Glob.compile(pattern)
            like_prefix = literal_like_prefix_of(pattern)

            # Walks raw chunks until limit + 1 matches are found or the
            # source is exhausted.
            collect_matches(
              repo,
              store,
              pattern,
              compiled,
              like_prefix,
              cursor,
              order,
              limit,
              []
            )
        end

      rows =
        if projection == :keys do
          # Values are never returned for :keys, so JSON decoding is skipped.
          Enum.map(raw_rows, fn {path, _json, _type, _seq} -> {path, nil, nil, nil} end)
        else
          Enum.map(raw_rows, fn {path, json, type, seq} ->
            {path, Jason.decode!(json), type, seq}
          end)
        end

      page = Enum.take(rows, limit)

      next_cursor =
        if length(rows) > limit do
          {last_path, _, _, _} = List.last(page)
          last_path
        end

      {project_page(page, projection, pattern), next_cursor}
    end

    # Accumulate glob matches chunk by chunk, starting after `cursor`.
    # Stops when more than `limit` matches have been collected, returning
    # exactly limit + 1 so the caller can detect a next page. Also stops when
    # a short chunk shows the raw rows are exhausted. The next chunk resumes
    # after the last *raw* row, not the last match, so rows rejected by the
    # glob are not re-read. Pattern "**" skips the per-row glob check.
    defp collect_matches(
           repo,
           store,
           pattern,
           compiled,
           literal_like_prefix,
           cursor,
           order,
           limit,
           acc
         ) do
      chunk = fetch_chunk(repo, store, literal_like_prefix, cursor, order, @browse_chunk_size)

      matched =
        case pattern do
          "**" -> chunk
          _ -> Enum.filter(chunk, fn {path, _, _, _} -> path_matches?(path, compiled) end)
        end

      collected = acc ++ matched
      exhausted? = length(chunk) < @browse_chunk_size

      cond do
        length(collected) > limit ->
          Enum.take(collected, limit + 1)

        exhausted? ->
          collected

        true ->
          {resume_after, _, _, _} = List.last(chunk)

          collect_matches(
            repo,
            store,
            pattern,
            compiled,
            literal_like_prefix,
            resume_after,
            order,
            limit,
            collected
          )
      end
    end

    # Range query: one SELECT bounded by [from_key, to_key) that asks for
    # limit + 1 rows, so the caller can tell whether another page exists. No
    # LIKE clause, glob post-filter or chunked walk is needed because the SQL
    # bounds are exact: every returned row is a match.
    defp range_query(repo, store, from_key, to_key, cursor, order, limit) do
      from(c in CacheEntry,
        where:
          c.store == ^store and c.path != ^@seq_sentinel_path and
            c.path >= ^from_key and c.path < ^to_key,
        order_by: [{^order, c.path}],
        limit: ^(limit + 1),
        select: {c.path, c.value, c.type, c.seq}
      )
      |> where_after_cursor(cursor, order)
      |> repo.all()
    end

    # Fetch up to `chunk_size` raw {path, json, type, seq} rows for `store`,
    # excluding the seq sentinel. Rows are ordered by path in `order` and
    # resume after `cursor`. When `literal_like_prefix` is non-empty they are
    # also narrowed by a LIKE prefix. The prefix is only a coarse pre-filter;
    # the caller applies the glob itself.
    defp fetch_chunk(repo, store, literal_like_prefix, cursor, order, chunk_size) do
      from(c in CacheEntry,
        where: c.store == ^store and c.path != ^@seq_sentinel_path,
        order_by: [{^order, c.path}],
        limit: ^chunk_size,
        select: {c.path, c.value, c.type, c.seq}
      )
      |> where_after_cursor(cursor, order)
      |> where_like_prefix(literal_like_prefix)
      |> repo.all()
    end

    # Keep only paths strictly past `cursor` in the walk direction: greater
    # for :asc, smaller for :desc. A nil cursor means "from the start".
    defp where_after_cursor(query, nil, _order), do: query
    defp where_after_cursor(query, cursor, :asc), do: from(c in query, where: c.path > ^cursor)
    defp where_after_cursor(query, cursor, :desc), do: from(c in query, where: c.path < ^cursor)

    # Add `path LIKE '<prefix>%'`; `prefix` must already be LIKE-escaped. The
    # explicit ESCAPE clause makes the backslash escapes effective on SQLite,
    # whose LIKE has no default escape character. "" means no restriction.
    defp where_like_prefix(query, ""), do: query

    defp where_like_prefix(query, prefix) do
      like_pattern = prefix <> "%"
      from(c in query, where: fragment("? LIKE ? ESCAPE '\\'", c.path, ^like_pattern))
    end

    # Return the LIKE-escaped literal prefix of a (canonical, slash-rendered)
    # glob pattern. It is used as a coarse SQL pre-filter while browsing:
    #
    #   "**", "*/x"  -> ""      (leading wildcard: no pre-filter)
    #   "a/b/**"     -> "a/b/"  (literal segments plus a trailing "/")
    #   "a/b"        -> "a/b"   (no wildcard segment at all)
    #
    # Any segment containing `*` counts as a wildcard. When no segment is a
    # wildcard, the trailing "/" is omitted so that `LIKE 'a/b%'` still
    # returns the exact path "a/b". Appending "/" would make an exact-path
    # browse return nothing.
    # NOTE(review): assumes `*` is the only glob metacharacter understood by
    # Dust.Protocol.Glob — confirm. Other metacharacters would make this
    # prefix too narrow.
    defp literal_like_prefix_of(pattern) do
      {literal, rest} =
        pattern
        |> String.split("/")
        |> Enum.split_while(&(not String.contains?(&1, "*")))

      case {literal, rest} do
        {[], _} -> ""
        {segs, []} -> segs |> Enum.join("/") |> like_escape()
        {segs, _wildcards} -> (Enum.join(segs, "/") <> "/") |> like_escape()
      end
    end

    # --- projection helpers (copied verbatim from Dust.Cache.Memory — deliberate
    # duplication per Phase 1 plan; keeps adapters self-contained) ---
    # Code below is intentionally left textually identical to the Memory
    # adapter; only comments were added.

    # Shape a page of {path, value, type, seq} tuples for the `:select` option:
    # `:entries` returns the tuples unchanged, `:keys` only their paths, and
    # `:prefixes` the distinct child prefixes (see prefixes_of/2). There is no
    # catch-all clause, so any other `:select` value raises FunctionClauseError.
    defp project_page(page, :entries, _pattern), do: page
    defp project_page(page, :keys, _pattern), do: Enum.map(page, fn {p, _, _, _} -> p end)
    defp project_page(page, :prefixes, pattern), do: prefixes_of(page, pattern)

    # Distinct, sorted prefixes one segment below the pattern's literal prefix.
    # Paths for which extract_prefix/2 returns nil (not strictly below the
    # prefix) are dropped. Raises ArgumentError (via literal_prefix_of/1) when
    # the pattern does not end in `/**` or `**`.
    defp prefixes_of(page, pattern) do
      literal_prefix = literal_prefix_of(pattern)

      page
      |> Enum.map(fn {p, _, _, _} -> extract_prefix(p, literal_prefix) end)
      |> Enum.reject(&is_nil/1)
      |> Enum.uniq()
      |> Enum.sort()
    end

    # The literal part in front of a trailing `**`: "**" -> "" and
    # "a/b/**" -> "a/b".
    defp literal_prefix_of("**"), do: ""

    # Any other shape raises ArgumentError: no `/**` at all, or anything after
    # the first `/**` (e.g. "a/**/b").
    defp literal_prefix_of(pattern) do
      case String.split(pattern, "/**", parts: 2) do
        [prefix, ""] ->
          prefix

        _ ->
          raise ArgumentError,
                "select: :prefixes requires pattern ending in /** or ** (got #{inspect(pattern)})"
      end
    end

    # With an empty literal prefix, a path's prefix is its first segment.
    # The `[] -> nil` branch is defensive: String.split/3 without
    # `trim: true` always returns at least one element.
    defp extract_prefix(path, "") do
      case String.split(path, "/", parts: 2) do
        [seg | _] -> seg
        [] -> nil
      end
    end

    # "literal/<next segment>" for paths strictly below `literal`. Returns nil
    # (the implicit result of `if` without `else`) for the literal path itself
    # and for unrelated paths.
    defp extract_prefix(path, literal) do
      prefix_with_slash = literal <> "/"

      if String.starts_with?(path, prefix_with_slash) do
        rest = String.replace_prefix(path, prefix_with_slash, "")
        [next_seg | _] = String.split(rest, "/", parts: 2)
        literal <> "/" <> next_seg
      end
    end

    # Upsert the per-store sentinel row (path @seq_sentinel_path) that records
    # the highest seq written. A new row is inserted when none exists.
    # Otherwise the conflict update is guarded by `c.seq < ^seq`, so the
    # stored seq only ever moves forward: an equal or lower seq leaves the row
    # unchanged. Returns the repo.insert_all/3 result.
    # NOTE(review): unlike regular entries, the sentinel is written without
    # synced_at — confirm that column is nullable.
    defp update_seq_sentinel(repo, store, seq) do
      # Ecto.Query is already imported at module level; encode the seq once
      # for both the insert and the conflict update.
      encoded_seq = Jason.encode!(seq)

      sentinel = %{
        store: store,
        path: @seq_sentinel_path,
        value: encoded_seq,
        type: "integer",
        seq: seq
      }

      repo.insert_all(CacheEntry, [sentinel],
        on_conflict:
          from(c in CacheEntry,
            update: [set: [seq: ^seq, value: ^encoded_seq]],
            where: c.seq < ^seq
          ),
        conflict_target: [:store, :path]
      )
    end
  end
end