Skip to main content

lib/dust/cache/memory.ex

defmodule Dust.Cache.Memory do
  use GenServer
  @behaviour Dust.Cache

  def start_link(opts) do
    GenServer.start_link(__MODULE__, %{}, opts)
  end

  # Dust.Cache implementation — pid is prepended to all callback args.

  @impl Dust.Cache
  def read(pid, store, path) do
    GenServer.call(pid, {:read, store, path})
  end

  @impl Dust.Cache
  def read_entry(pid, store, path) do
    GenServer.call(pid, {:read_entry, store, path})
  end

  @impl Dust.Cache
  def read_all(pid, store, pattern) do
    GenServer.call(pid, {:read_all, store, pattern})
  end

  @impl Dust.Cache
  def read_many(pid, store, paths) do
    GenServer.call(pid, {:read_many, store, paths})
  end

  @impl Dust.Cache
  def write(pid, store, path, value, type, seq) do
    GenServer.call(pid, {:write, store, path, value, type, seq})
  end

  @impl Dust.Cache
  def write_batch(pid, store, entries) do
    GenServer.call(pid, {:write_batch, store, entries})
  end

  @impl Dust.Cache
  def delete(pid, store, path) do
    GenServer.call(pid, {:delete, store, path})
  end

  @impl Dust.Cache
  def delete_subtree(pid, store, path) do
    GenServer.call(pid, {:delete_subtree, store, path})
  end

  @impl Dust.Cache
  def read_subtree(pid, store, path) do
    GenServer.call(pid, {:read_subtree, store, path})
  end

  @impl Dust.Cache
  def last_seq(pid, store) do
    GenServer.call(pid, {:last_seq, store})
  end

  @impl Dust.Cache
  def count(pid, store) do
    GenServer.call(pid, {:count, store})
  end

  @impl Dust.Cache
  def browse(pid, store, opts) do
    GenServer.call(pid, {:browse, store, opts})
  end

  # Server

  @impl true
  def init(_) do
    # `entries`: {store, path} => {value, type, seq}
    # `synced`:  {store, path} => unix epoch ms when this mirror last wrote
    #            the row. Kept parallel to `entries` so the browse/range/
    #            read_all paths stay on the existing 3-tuple shape; only
    #            read_entry surfaces synced_at. Both maps are purged together
    #            on delete/delete_subtree.
    {:ok, %{entries: %{}, seqs: %{}, synced: %{}}}
  end

  @impl true
  def handle_call({:read, store, path}, _from, state) do
    key = {store, path}

    case Map.get(state.entries, key) do
      nil -> {:reply, :miss, state}
      {value, _type, _seq} -> {:reply, {:ok, value}, state}
    end
  end

  @impl true
  def handle_call({:read_entry, store, path}, _from, state) do
    case Map.get(state.entries, {store, path}) do
      nil ->
        {:reply, :miss, state}

      {value, type, seq} ->
        synced_at = Map.get(state.synced, {store, path})
        {:reply, {:ok, {value, type, seq, synced_at}}, state}
    end
  end

  @impl true
  def handle_call({:read_all, store, pattern}, _from, state) do
    {:ok, compiled} = Dust.Protocol.Glob.compile(pattern)

    results =
      state.entries
      |> Enum.filter(fn {{s, path}, _} ->
        s == store and path_matches?(path, compiled)
      end)
      |> Enum.map(fn {{_s, path}, {value, _type, _seq}} -> {path, value} end)

    {:reply, results, state}
  end

  @impl true
  def handle_call({:read_many, store, paths}, _from, state) do
    result =
      paths
      |> Enum.uniq()
      |> Enum.reduce(%{}, fn path, acc ->
        case Map.get(state.entries, {store, path}) do
          nil -> acc
          tuple -> Map.put(acc, path, tuple)
        end
      end)

    {:reply, result, state}
  end

  @impl true
  def handle_call({:write, store, path, value, type, seq}, _from, state) do
    now = System.system_time(:millisecond)
    state = put_in(state.entries[{store, path}], {value, type, seq})
    state = put_in(state.synced[{store, path}], now)
    current = Map.get(state.seqs, store, 0)
    state = put_in(state.seqs[store], max(current, seq))
    {:reply, :ok, state}
  end

  @impl true
  def handle_call({:write_batch, store, entries}, _from, state) do
    now = System.system_time(:millisecond)

    state =
      Enum.reduce(entries, state, fn {path, value, type, seq}, acc ->
        acc = put_in(acc.entries[{store, path}], {value, type, seq})
        acc = put_in(acc.synced[{store, path}], now)
        current = Map.get(acc.seqs, store, 0)
        put_in(acc.seqs[store], max(current, seq))
      end)

    {:reply, :ok, state}
  end

  @impl true
  def handle_call({:delete, store, path}, _from, state) do
    state = update_in(state.entries, &Map.delete(&1, {store, path}))
    state = update_in(state.synced, &Map.delete(&1, {store, path}))
    {:reply, :ok, state}
  end

  @impl true
  def handle_call({:delete_subtree, store, path}, _from, state) do
    {:ok, segments} = Dust.Protocol.Path.parse_rendered(path)
    {:ok, prefix} = Dust.Protocol.Path.render_descendant_prefix(segments)

    {removed, kept} =
      Enum.split_with(state.entries, fn {{s, p}, _} ->
        s == store and (p == path or String.starts_with?(p, prefix))
      end)

    removed_keys = Enum.map(removed, fn {key, _} -> key end)

    state = %{
      state
      | entries: Map.new(kept),
        synced: Map.drop(state.synced, removed_keys)
    }

    {:reply, length(removed), state}
  end

  @impl true
  def handle_call({:read_subtree, store, path}, _from, state) do
    {:ok, segments} = Dust.Protocol.Path.parse_rendered(path)
    {:ok, prefix} = Dust.Protocol.Path.render_descendant_prefix(segments)

    rows =
      state.entries
      |> Enum.filter(fn {{s, p}, _} ->
        s == store and (p == path or String.starts_with?(p, prefix))
      end)
      |> Enum.map(fn {{_s, p}, {value, type, seq}} -> {p, value, type, seq} end)
      |> Enum.sort_by(fn {p, _, _, _} -> p end)

    {:reply, rows, state}
  end

  @impl true
  def handle_call({:last_seq, store}, _from, state) do
    {:reply, Map.get(state.seqs, store, 0), state}
  end

  @impl true
  def handle_call({:count, store}, _from, state) do
    count =
      state.entries
      |> Enum.count(fn {{s, _path}, _} -> s == store end)

    {:reply, count, state}
  end

  @impl true
  def handle_call({:browse, store, opts}, _from, state) do
    pattern = Keyword.get(opts, :pattern, "**")
    cursor = Keyword.get(opts, :cursor)
    limit = Keyword.get(opts, :limit, 50)
    order = Keyword.get(opts, :order, :asc)
    select = Keyword.get(opts, :select, :entries)
    from = Keyword.get(opts, :from)
    to = Keyword.get(opts, :to)

    {:ok, compiled} = Dust.Protocol.Glob.compile(pattern)

    entries =
      state.entries
      |> Enum.filter(fn {{s, path}, _} ->
        s == store and matches_filter?(path, pattern, compiled, from, to)
      end)
      |> Enum.map(fn {{_s, path}, {value, type, seq}} -> {path, value, type, seq} end)
      |> Enum.sort_by(fn {path, _, _, _} -> path end, sort_direction(order))

    entries = apply_cursor(entries, cursor, order)

    # Apply limit
    page = Enum.take(entries, limit)

    next_cursor =
      if length(page) < limit or length(page) == 0 do
        nil
      else
        {last_path, _, _, _} = List.last(page)
        last_path
      end

    projected = project_page(page, select, pattern)

    {:reply, {projected, next_cursor}, state}
  end

  defp project_page(page, :entries, _pattern), do: page
  defp project_page(page, :keys, _pattern), do: Enum.map(page, fn {p, _, _, _} -> p end)
  defp project_page(page, :prefixes, pattern), do: prefixes_of(page, pattern)

  defp prefixes_of(page, pattern) do
    literal_prefix = literal_prefix_of(pattern)

    page
    |> Enum.map(fn {p, _, _, _} -> extract_prefix(p, literal_prefix) end)
    |> Enum.reject(&is_nil/1)
    |> Enum.uniq()
    |> Enum.sort()
  end

  defp literal_prefix_of("**"), do: ""

  defp literal_prefix_of(pattern) do
    case String.split(pattern, "/**", parts: 2) do
      [prefix, ""] ->
        prefix

      _ ->
        raise ArgumentError,
              "select: :prefixes requires pattern ending in /** or ** (got #{inspect(pattern)})"
    end
  end

  defp extract_prefix(path, "") do
    case String.split(path, "/", parts: 2) do
      [seg | _] -> seg
      [] -> nil
    end
  end

  defp extract_prefix(path, literal) do
    prefix_with_slash = literal <> "/"

    if String.starts_with?(path, prefix_with_slash) do
      rest = String.replace_prefix(path, prefix_with_slash, "")
      [next_seg | _] = String.split(rest, "/", parts: 2)
      literal <> "/" <> next_seg
    end
  end

  defp matches_filter?(path, _pattern, _compiled, from, to)
       when is_binary(from) and is_binary(to) do
    path >= from and path < to
  end

  defp matches_filter?(path, _pattern, compiled, _from, _to) do
    path_matches?(path, compiled)
  end

  defp sort_direction(:asc), do: :asc
  defp sort_direction(:desc), do: :desc

  defp apply_cursor(entries, nil, _order), do: entries

  defp apply_cursor(entries, cursor, :asc),
    do: Enum.drop_while(entries, fn {p, _, _, _} -> p <= cursor end)

  defp apply_cursor(entries, cursor, :desc),
    do: Enum.drop_while(entries, fn {p, _, _, _} -> p >= cursor end)

  defp path_matches?(path, compiled) do
    case Dust.Protocol.Path.parse_rendered(path) do
      {:ok, segs} -> Dust.Protocol.Glob.match?(compiled, segs)
      _ -> false
    end
  end
end