# lib/dust_ecto/transport/sdk.ex

defmodule DustEcto.Transport.SDK do
  @moduledoc """
  WebSocket-backed transport — delegates every call to the configured
  Dust facade module (default `Dust`). Uses the SDK pre-work landed in
  the same monorepo: sync write semantics on every op, subtree-aware
  reads, and `:committed` subscription mode for exactly-once delivery.

  Returns:

    * `{:ok, ...}` shapes match the Repo contract — `%{store_seq:}` for
      writes, full entry maps for reads.
    * `{:error, %DustEcto.Error{}}` for everything that's not a 2xx
      analogue, except `get/2`, which returns a bare `{:error, :not_found}`
      for a missing path. Network-shaped failures from the SDK (e.g.
      `:timeout`) get translated to the right `kind`.

  All callbacks are documented on `DustEcto.Transport`.
  """

  @behaviour DustEcto.Transport

  alias DustEcto.Error

  # Lists entries matching `pattern` in `store` through the facade's `enum/3`.
  #
  # `opts` may carry `:config` (used to resolve the facade and stripped before
  # the SDK call). Every other option is forwarded as-is, with
  # `select: :entries` added unless the caller already chose a `:select`.
  #
  # Returns `{:ok, %{items: [...], next_cursor: cursor | nil}}` on success and
  # `{:error, %DustEcto.Error{}}` on failure.
  @impl DustEcto.Transport
  def list(store, pattern, opts) do
    facade = opts |> pop_config() |> facade!()

    page_opts =
      opts
      |> Keyword.drop([:config])
      |> Keyword.put_new(:select, :entries)

    case safe_call(fn -> facade.enum(store, pattern, page_opts) end) do
      %Dust.Page{items: items, next_cursor: cursor} ->
        {:ok, %{items: Enum.map(items, &render_item/1), next_cursor: cursor}}

      items when is_list(items) ->
        # The 2-arg `enum/2` legacy shape returns [{path, value}, ...].
        # Repo always passes opts so this branch is rare, but we handle it.
        {:ok, %{items: Enum.map(items, &render_item/1), next_cursor: nil}}

      {:error, %Error{}} = err ->
        err

      {:error, reason} ->
        # Raw SDK reasons (e.g. `:timeout`) are translated the same way the
        # write path does, instead of crashing with a CaseClauseError.
        {:error, sdk_error_to_dust(reason)}
    end
  end

  # Fetches the single entry at `path` in `store` through the facade's
  # `entry/2`.
  #
  # Returns:
  #   * `{:ok, %{path:, value:, type:, revision:}}` when the entry exists;
  #   * `{:error, :not_found}` (a bare atom, relied on by `exists?/2`) when
  #     the facade reports the path as missing;
  #   * `{:error, %DustEcto.Error{}}` for every other failure.
  @impl DustEcto.Transport
  def get(store, path) do
    facade = facade!(pop_config([]))

    case safe_call(fn -> facade.entry(store, path) end) do
      {:ok, %Dust.Entry{path: p, value: v, type: t, revision: r}} ->
        {:ok, %{path: p, value: v, type: t, revision: r}}

      {:error, :not_found} ->
        {:error, :not_found}

      {:error, %Error{}} = err ->
        err

      {:error, reason} ->
        # Any other raw SDK reason (e.g. `:timeout`, `:unauthorized`) is
        # translated like on the write path rather than raising a
        # CaseClauseError.
        {:error, sdk_error_to_dust(reason)}
    end
  end

  # Reports whether an entry exists at `path`, built on top of `get/2`.
  #
  # A successful read becomes `{:ok, true}`, a `:not_found` read becomes
  # `{:ok, false}`, and any other result of `get/2` is returned unchanged.
  @impl DustEcto.Transport
  def exists?(store, path) do
    with {:ok, _entry} <- get(store, path) do
      {:ok, true}
    else
      {:error, :not_found} -> {:ok, false}
      other -> other
    end
  end

  # Writes `value` at `path` in `store` through the facade's `put/4`.
  #
  # `:config` in `opts` selects the facade and is stripped before the
  # remaining options are handed to the SDK. The raw SDK result is
  # normalised by `translate_write_result/1`.
  @impl DustEcto.Transport
  def put(store, path, value, opts) do
    facade = opts |> pop_config() |> facade!()
    forwarded = Keyword.drop(opts, [:config])

    write = fn -> facade.put(store, path, value, forwarded) end

    write
    |> safe_call()
    |> translate_write_result()
  end

  # Deletes the entry at `path` in `store` through the facade's `delete/3`.
  #
  # `:config` in `opts` selects the facade and is stripped before the
  # remaining options are handed to the SDK. The raw SDK result is
  # normalised by `translate_write_result/1`.
  @impl DustEcto.Transport
  def delete(store, path, opts) do
    facade = opts |> pop_config() |> facade!()
    forwarded = Keyword.drop(opts, [:config])

    removal = fn -> facade.delete(store, path, forwarded) end

    removal
    |> safe_call()
    |> translate_write_result()
  end

  # Normalises the result of an SDK write (`put`/`delete`) into the shape
  # this transport returns:
  #
  #   * `{:ok, integer}`           -> `{:ok, %{store_seq: integer}}`
  #   * `{:ok, %{store_seq: _}}`   -> returned as-is
  #   * `{:error, %Error{}}`       -> returned as-is (already translated,
  #                                   e.g. by `safe_call/1`)
  #   * `{:error, reason}`         -> `{:error, sdk_error_to_dust(reason)}`
  #
  # The last clause accepts a reason of any shape. Atoms and binaries map
  # exactly as before; other terms (tuples, maps, ...) reach the catch-all of
  # `sdk_error_to_dust/1` instead of raising a FunctionClauseError.
  defp translate_write_result({:ok, store_seq}) when is_integer(store_seq),
    do: {:ok, %{store_seq: store_seq}}

  defp translate_write_result({:ok, %{store_seq: _} = ok}),
    do: {:ok, ok}

  defp translate_write_result({:error, %Error{}} = err), do: err

  defp translate_write_result({:error, reason}),
    do: {:error, sdk_error_to_dust(reason)}

  # Always refuses: atomic batches are HTTP-only for now. The SDK protocol
  # does not yet expose a batch_write op, so callers needing transactional
  # multi-key writes in SDK mode must issue individual ops or fall back to
  # HTTP. Tracked as future work.
  @impl DustEcto.Transport
  def batch_write(_store, _ops, _opts) do
    message =
      "batch_write is not available over the SDK transport — use HTTP mode for atomic multi-key writes"

    {:error, Error.new(:not_supported, message, retryable?: false)}
  end

  # Registers `callback` for changes matching `pattern` in `store` via the
  # facade's `on/4`, always in `:committed` mode.
  #
  # Returns `{:ok, ref}` when the facade hands back a reference, or
  # `{:error, %DustEcto.Error{}}` on failure.
  @impl DustEcto.Transport
  def subscribe(store, pattern, callback) do
    facade = facade!(pop_config([]))

    case safe_call(fn -> facade.on(store, pattern, callback, mode: :committed) end) do
      ref when is_reference(ref) ->
        {:ok, ref}

      {:error, %Error{}} = err ->
        err

      {:error, reason} ->
        # Raw SDK reasons are translated like on the write path rather than
        # raising a CaseClauseError.
        {:error, sdk_error_to_dust(reason)}
    end
  end

  # Cancels the subscription identified by `ref` via the facade's `off/2`.
  #
  # The outcome of the SDK call is deliberately discarded: this always
  # returns `:ok`, even when `safe_call/1` turned an exit into an error.
  @impl DustEcto.Transport
  def unsubscribe(store, ref) do
    facade = facade!(pop_config([]))
    _ignored = safe_call(fn -> facade.off(store, ref) end)

    :ok
  end

  # --- internals ---

  # Resolves the transport config for a call.
  #
  # A truthy `:config` entry in `opts` wins. Otherwise the config comes from
  # `DustEcto.Transport.pick/0` when it selects this transport, and falls
  # back to `%{facade: Dust}` for any other pick. Despite the name, `opts`
  # is only read here; callers strip `:config` themselves.
  defp pop_config(opts) do
    explicit = Keyword.get(opts, :config)

    if explicit do
      explicit
    else
      case DustEcto.Transport.pick() do
        {DustEcto.Transport.SDK, picked} -> picked
        _other -> %{facade: Dust}
      end
    end
  end

  # Resolves the facade module from a transport config.
  #
  # Returns the `:facade` value when it is a non-nil atom and `Dust`
  # otherwise. `nil` is itself an atom, so it is excluded explicitly: a
  # config with `facade: nil` gets the default rather than handing `nil`
  # back to callers that go on to invoke functions on it.
  defp facade!(%{facade: f}) when is_atom(f) and not is_nil(f), do: f
  defp facade!(_), do: Dust

  # Runs `fun` (an SDK call) and shields the caller from process exits.
  #
  # When `fun` completes, its result is handed back untouched so each caller
  # can match on whatever shape the SDK produced. When it exits:
  #
  #   * `{:timeout | :noproc | :normal, _}` is translated through
  #     `sdk_error_to_dust/1`;
  #   * any other exit reason becomes a retryable `:network` error carrying
  #     `{:sdk_exit, reason}`.
  #
  # Only exits are caught; raised exceptions and throws propagate.
  defp safe_call(fun) do
    fun.()
  catch
    :exit, exit_reason ->
      failure =
        case exit_reason do
          {tag, _detail} when tag in [:timeout, :noproc, :normal] ->
            sdk_error_to_dust(tag)

          _other ->
            Error.new(:network, {:sdk_exit, exit_reason}, retryable?: true)
        end

      {:error, failure}
  end

  # Translates a raw SDK failure reason into a `DustEcto.Error`.
  #
  # `conflict`, `rate_limited` and `unauthorized` are recognised in both atom
  # and string form. `:timeout`, `:noproc` and `:normal` are recognised as
  # atoms only. Anything else becomes a non-retryable `:invalid_params`
  # error carrying the original reason.
  #
  # NOTE(review): the string "timeout" is not special-cased and therefore
  # lands in the `:invalid_params` fallback — confirm that is intended.
  defp sdk_error_to_dust(reason) do
    case reason do
      r when r in [:conflict, "conflict"] ->
        Error.new(:conflict, nil)

      r when r in [:rate_limited, "rate_limited"] ->
        Error.new(:rate_limited, nil)

      r when r in [:unauthorized, "unauthorized"] ->
        Error.new(:unauthorized, nil)

      :timeout ->
        Error.new(:timeout, "no ack within window")

      :noproc ->
        Error.new(:network, "Dust SDK process not running", retryable?: true)

      :normal ->
        Error.new(:network, "Dust SDK process exited", retryable?: true)

      unknown ->
        Error.new(:invalid_params, unknown, retryable?: false)
    end
  end

  # Converts one item from an SDK listing into the shape returned by
  # `list/3`:
  #
  #   * a bare binary path is passed through unchanged;
  #   * a `{path, value}` tuple becomes an entry map with `type` and
  #     `revision` set to `nil`;
  #   * a `%Dust.Entry{}` becomes a plain map of its path, value, type and
  #     revision.
  defp render_item(path) when is_binary(path) do
    path
  end

  defp render_item({path, value}) when is_binary(path) do
    %{path: path, value: value, type: nil, revision: nil}
  end

  defp render_item(%Dust.Entry{path: path, value: value, type: type, revision: revision}) do
    %{path: path, value: value, type: type, revision: revision}
  end
end