defmodule DustEcto.Repo do
@moduledoc """
Ecto-shaped facade over the active `DustEcto.Transport`. Functions
mirror the parts of `Ecto.Repo` that map cleanly onto Dust's flat KV
model: `all/1`, `get/2` + `get!/2`, `stream/1`, `exists?/2`, plus
`insert/1` + `insert!/1`, `update/2`, `delete/1` + `delete/2` +
`delete/3`, `delete_all/1`, and the multi-record `batch_write/1`.
Change notifications are exposed through `subscribe/2`,
`subscribe_raw/2`, and `unsubscribe/1`.
No `where`, no `from`, no `preload`, no `insert_all`, no
`transaction` (yet — pending the upstream `entries.batch_write`
primitive that's now shipped, plus a corresponding SDK transport
implementation).
## Honest contract
Dust writes are upserts. There's no atomic insert-or-fail or
read-modify-write at the wire level (capver 2 has leaf-only CAS).
`insert/1` and `update/1` are both **validated upserts**: they run
the changeset, then write. If you need INSERT-or-fail semantics, do
a `Repo.exists?/2` check first and accept that another writer can
race you.
"""
alias Dust.Protocol.Path, as: DustPath
alias DustEcto.{Error, Transport}
require Logger
# --- Path-building helpers --------------------------------------
# Schemas declare their prefix as a segment list
# (e.g. `prefix: ["reading", "links"]`). These helpers compose
# record / field / pattern paths in canonical slash-rendered form
# via the segment-first `Dust.Protocol.Path` API; nothing in this
# module should build paths via string interpolation.
# Renders the canonical path of the record `slug` under `prefix`.
# Both `DustPath` calls are matched assertively, so anything other than
# `{:ok, _}` crashes with a `MatchError`.
defp record_path!(prefix, slug) when is_list(prefix) and is_binary(slug) do
  {:ok, record_segments} = DustPath.child(prefix, slug)
  {:ok, rendered} = DustPath.render(record_segments)
  rendered
end
# Renders the canonical path of one field of the record `slug` under
# `prefix`. `field` is stringified with `to_string/1`, so atoms are fine.
# Crashes with a `MatchError` if any `DustPath` call is not `{:ok, _}`.
defp field_path!(prefix, slug, field) when is_list(prefix) and is_binary(slug) do
  {:ok, record_segments} = DustPath.child(prefix, slug)
  {:ok, field_segments} = DustPath.child(record_segments, to_string(field))
  {:ok, rendered} = DustPath.render(field_segments)
  rendered
end
# Renders the `<prefix>/**` pattern used to list or subscribe to every
# path below a schema prefix.
defp prefix_pattern!(prefix) when is_list(prefix) do
  wildcard_segments = prefix ++ ["**"]
  {:ok, rendered} = DustPath.render(wildcard_segments)
  rendered
end
# Renders the schema prefix itself (no slug, no wildcard).
defp prefix_path!(prefix) when is_list(prefix) do
  {:ok, rendered} = DustPath.render(prefix)
  rendered
end
# ------------------------------------------------------------------
# Reads
# ------------------------------------------------------------------
@doc """
Loads every record of `schema`.

All pages of the underlying listing are fetched before records are
rebuilt, so the result is never silently truncated.

Records that fail the required-fields guard
(`schema.__dust_required_fields__/0`) are left out of the result; a
`Logger.warning` records the slug, the missing fields, and any
unrecognized fields.

Returns `{:ok, [struct]}`, or whatever error the listing produced.
"""
@spec all(module()) :: {:ok, [struct()]} | {:error, Error.t()}
def all(schema) when is_atom(schema) do
  prefix = schema.__dust_prefix__()

  # A non-`{:ok, _}` listing result falls straight through `with` unchanged.
  with {:ok, leaves} <- prefix |> prefix_pattern!() |> stream_all_items() do
    {:ok, rebuild_records(leaves, schema, prefix)}
  end
end
@doc """
Returns a `Stream` over the records of `schema` that fetches one page of
leaf entries from the transport at a time. Useful when the prefix could
match more than a few hundred records.

Records failing the required-fields guard are skipped with the same
warning behaviour as `all/1`.

A transport error raises a `RuntimeError` while the stream is being
consumed, rather than ending the stream early.
"""
@spec stream(module()) :: Enumerable.t()
def stream(schema) when is_atom(schema) do
  prefix = schema.__dust_prefix__()
  pattern = prefix_pattern!(prefix)
  {transport, _} = Transport.pick()
  store = Transport.store!()

  base_opts = [select: :entries, limit: 100]

  start_fun = fn -> %{cursor: :start, buffer: nil} end

  # The listing is expected to arrive in lexicographic path order, so all
  # leaves of one slug are contiguous. The slug still being assembled is
  # carried in `buffer` across page boundaries and only emitted once a leaf
  # of a different slug shows up, or the listing ends.
  next_fun = fn
    :done ->
      {:halt, :done}

    %{cursor: cursor, buffer: pending} ->
      page_opts =
        case cursor do
          :start -> base_opts
          token -> [{:after, token} | base_opts]
        end

      case transport.list(store, pattern, page_opts) do
        {:error, reason} ->
          # Raising is deliberate: ending the stream here would look like a
          # complete (but truncated) result to `Enum.to_list/1` callers.
          raise "DustEcto.Repo.stream/1 transport error: #{inspect(reason)}"

        {:ok, %{items: items, next_cursor: next}} ->
          {closed, open} = group_items_by_slug(items, prefix, pending)
          records = Enum.flat_map(closed, &emit_or_skip(&1, schema))

          case {next, open} do
            {nil, nil} ->
              {records, :done}

            {nil, last} ->
              # Last page: flush the slug that was still being assembled.
              {records ++ emit_or_skip(last, schema), :done}

            {token, _} ->
              {records, %{cursor: token, buffer: open}}
          end
      end
  end

  Stream.resource(start_fun, next_fun, fn _ -> :ok end)
end
# Folds one page of leaf items into per-slug field maps.
#
# Returns `{closed, open}`: `closed` lists, in arrival order, the
# `{slug, fields}` tuples that were finished because a leaf of another slug
# followed them; `open` is the tuple still being assembled (or `nil`) and is
# meant to be passed back in as `initial_buffer` for the next page. Items
# whose path `parse_path/2` rejects are ignored.
defp group_items_by_slug(items, prefix, initial_buffer) do
  {closed_reversed, open} =
    Enum.reduce(items, {[], initial_buffer}, fn item, {done, current} = unchanged ->
      case parse_path(item.path, prefix) do
        :error ->
          unchanged

        {:ok, slug, field_segments} ->
          {done, fields} =
            case current do
              nil -> {done, %{}}
              {^slug, fields_so_far} -> {done, fields_so_far}
              finished -> {[finished | done], %{}}
            end

          {done, {slug, put_nested(fields, field_segments, item.value)}}
      end
    end)

  {Enum.reverse(closed_reversed), open}
end
# Turns a buffered `{slug, fields}` tuple into a zero-or-one element list so
# callers can `flat_map`/`++` the result. `nil` (no buffer) yields `[]`.
defp emit_or_skip(nil, _schema), do: []

defp emit_or_skip({slug, fields}, schema) do
  case load_record(schema, slug, fields) do
    :missing_required -> []
    {:ok, record} -> [record]
  end
end
@doc """
Fetches a single record by slug.

Returns `{:ok, struct}` on a hit and `{:error, :not_found}` on a miss —
including when the stored value does not satisfy the required-fields
guard. Any other `{:error, _}` from the transport is returned unchanged.
"""
@spec get(module(), String.t()) ::
        {:ok, struct()} | {:error, :not_found | Error.t()}
def get(schema, slug) when is_atom(schema) and is_binary(slug) do
  prefix = schema.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  path = record_path!(prefix, slug)

  case transport.get(store, path) do
    {:error, _reason} = failure ->
      # Covers both `:not_found` and transport errors.
      failure

    {:ok, %{value: body}} when is_map(body) ->
      case load_record(schema, slug, body) do
        {:ok, record} -> {:ok, record}
        :missing_required -> {:error, :not_found}
      end

    {:ok, %{value: scalar}} ->
      # A non-map value sits directly at the record path. Load with an empty
      # body and let the required-fields guard decide whether that is a
      # valid record.
      case load_record(schema, slug, %{}) do
        {:ok, record} ->
          {:ok, record}

        :missing_required ->
          log_skip(schema, slug, %{}, scalar: scalar)
          {:error, :not_found}
      end
  end
end
@doc """
Like `get/2`, but returns the struct directly and raises a `RuntimeError`
when the record is missing or the transport reported a `DustEcto.Error`.
"""
@spec get!(module(), String.t()) :: struct()
def get!(schema, slug) do
  case get(schema, slug) do
    {:error, :not_found} ->
      raise "DustEcto.Repo.get!/2: no #{inspect(schema)} found for slug #{inspect(slug)}"

    {:error, %Error{} = failure} ->
      raise "DustEcto.Repo.get!/2: transport error: #{inspect(failure)}"

    {:ok, record} ->
      record
  end
end
@doc """
Cheap existence probe for the record path of `slug`.

Delegates to the active transport's `exists?/2`. SDK mode: in-process
cache lookup. HTTP mode: one HEAD round-trip (no body).
"""
@spec exists?(module(), String.t()) :: {:ok, boolean()} | {:error, Error.t()}
def exists?(schema, slug) when is_atom(schema) and is_binary(slug) do
  prefix = schema.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  path = record_path!(prefix, slug)

  transport.exists?(store, path)
end
# ------------------------------------------------------------------
# Writes
# ------------------------------------------------------------------
@doc """
Validated upsert.

Given a changeset, applies it with the `:insert` action and, on success,
dumps the resulting struct and writes all of its fields. Given a bare
struct, skips validation and writes it directly.

Returns `{:ok, struct}`, `{:error, %Ecto.Changeset{}}` on validation
failure, or `{:error, %DustEcto.Error{}}` on transport failure.
"""
@spec insert(Ecto.Changeset.t() | struct()) ::
        {:ok, struct()} | {:error, Ecto.Changeset.t() | Error.t()}
def insert(%Ecto.Changeset{} = changeset) do
  # An `{:error, changeset}` result falls through `with` unchanged.
  with {:ok, record} <- Ecto.Changeset.apply_action(changeset, :insert) do
    write_struct(record, :all_fields, [])
  end
end

def insert(record) when is_struct(record) do
  write_struct(record, :all_fields, [])
end
@doc """
Like `insert/1`, but returns the struct directly. Raises
`Ecto.InvalidChangesetError` for an invalid changeset and a `RuntimeError`
for a `DustEcto.Error`.
"""
@spec insert!(Ecto.Changeset.t() | struct()) :: struct()
def insert!(input) do
  case insert(input) do
    {:error, %Ecto.Changeset{} = invalid} ->
      raise Ecto.InvalidChangesetError, changeset: invalid

    {:error, %Error{} = failure} ->
      raise "DustEcto.Repo.insert!/1: #{inspect(failure)}"

    {:ok, record} ->
      record
  end
end
@doc """
Validated upsert for an existing record.

Applies the changeset with the `:update` action. On success a `:map`-mode
schema writes the whole dumped record, while a `:flat`-mode schema writes
only the fields present in `changeset.changes` (never `:slug`). A
`:flat`-mode changeset without any such change returns `{:ok, struct}`
without calling the transport.

Returns the same shapes as `insert/1`.

## Options

  * `:if_match` — optimistic-concurrency revision. Only supported on
    `:map`-mode schemas (single leaf write at `<prefix>.<slug>`). In
    `:flat` mode the update is N PUTs, none of which have a meaningful
    whole-record revision — pass `:if_match` and the call raises
    `ArgumentError`. `batch_write/1` commits the per-field writes of a
    `:flat` record atomically, but it also rejects `:if_match` for
    `:flat` schemas.
"""
@spec update(Ecto.Changeset.t(), keyword()) ::
        {:ok, struct()} | {:error, Ecto.Changeset.t() | Error.t()}
def update(%Ecto.Changeset{} = cs, opts \\ []) do
  case Ecto.Changeset.apply_action(cs, :update) do
    {:ok, struct} ->
      mode = struct.__struct__.__dust_mode__()

      if Keyword.has_key?(opts, :if_match) and mode == :flat do
        # The message used to point at a non-existent `batch_write/2` and
        # promised flat-mode CAS that `batch_write/1` rejects as well.
        raise ArgumentError,
              ":if_match is only supported on :map-mode schemas. " <>
                "Repo.batch_write/1 writes a :flat record atomically, " <>
                "but does not accept :if_match for :flat schemas either."
      end

      case mode do
        :map ->
          write_struct(struct, :all_fields, opts)

        :flat ->
          # The slug is the key encoded in the path, never a written field.
          changed = cs.changes |> Map.keys() |> Enum.reject(&(&1 == :slug))

          case changed do
            [] -> {:ok, struct}
            fields -> write_struct(struct, fields, opts)
          end
      end

    {:error, _} = err ->
      err
  end
end
@doc """
Removes a record. Accepts a struct or changeset; `delete/2` also
accepts a `(schema, slug)` shape. Every form ends in a single transport
`delete` against `<prefix>.<slug>` — server clears the leaf and every
descendant.

## Options

  * `:if_match` — optimistic-concurrency revision. Server enforces
    leaf-only CAS in capver 2; meaningful for `:map`-mode records
    where the slug path itself is a leaf. On a subtree delete the
    server may ignore or reject — surface as `:conflict`.
"""
@spec delete(struct() | Ecto.Changeset.t()) ::
        {:ok, %{store_seq: integer()}} | {:error, Error.t()}
def delete(struct_or_cs) do
  delete(struct_or_cs, [])
end

@spec delete(struct() | Ecto.Changeset.t() | module(), keyword() | String.t()) ::
        {:ok, %{store_seq: integer()}} | {:error, Error.t()}
def delete(%Ecto.Changeset{data: record}, opts) when is_list(opts) do
  # The slug is taken from the changeset's underlying data.
  delete(record, opts)
end

def delete(%schema{slug: slug}, opts) when is_atom(schema) and is_list(opts) do
  delete(schema, slug, opts)
end

def delete(schema, slug) when is_atom(schema) and is_binary(slug) do
  delete(schema, slug, [])
end

@doc """
Three-arg convenience form accepting `(schema, slug, opts)`. Equivalent
to `delete(%schema{slug: slug}, opts)`. Only `:if_match` is forwarded to
the transport.
"""
@spec delete(module(), String.t(), keyword()) ::
        {:ok, %{store_seq: integer()}} | {:error, Error.t()}
def delete(schema, slug, opts) when is_atom(schema) and is_binary(slug) and is_list(opts) do
  prefix = schema.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  path = record_path!(prefix, slug)
  cas_opts = Keyword.take(opts, [:if_match])

  transport.delete(store, path, cas_opts)
end
@doc """
Atomic multi-record write. Accepts a list of operation tuples:

    Repo.batch_write([
      {:insert, Link.changeset(%Link{}, attrs1)},
      {:insert, Link.changeset(%Link{}, attrs2)},
      {:update, existing_cs, if_match: 7},
      {:delete, Link, "stale-slug"},
      {:delete, Link, "old", if_match: 4}
    ])

Every tuple is translated to wire ops first; the transport is only called
once all of them translated successfully.

Returns `{:ok, %{store_seq:, ops: [...]}}` on a server-side commit,
`{:error, %Ecto.Changeset{}}` if any changeset fails validation
(short-circuits before the batch is sent), or `{:error,
%DustEcto.Error{}}` on transport failure.

## Mode interaction

  * `:map`-mode schemas produce one wire op per record (PUT at
    `<prefix>.<slug>`). `:if_match` applies to that single op.
  * `:flat`-mode schemas produce N wire ops per record (one PUT per
    field present in the dumped record). `:if_match` in `:flat` mode
    raises `ArgumentError` — per-field CAS requires per-field
    revisions, which this v1 API doesn't surface.

All ops in a single `batch_write/1` commit atomically server-side —
either every op lands or none of them does.
"""
@spec batch_write([tuple()]) ::
        {:ok, %{store_seq: integer(), ops: list()}}
        | {:error, Ecto.Changeset.t() | Error.t()}
def batch_write(ops) when is_list(ops) do
  # A failed translation (`{:error, _}`) falls through `with` unchanged.
  with {:ok, wire_ops} <- prepare_batch_ops(ops, []) do
    {transport, _} = Transport.pick()
    store = Transport.store!()
    transport.batch_write(store, wire_ops, [])
  end
end
# Translates every user-level op into wire ops, preserving order and
# stopping at the first translation error. `acc` holds already-translated
# wire ops in reverse order.
defp prepare_batch_ops(ops, acc) do
  translated =
    Enum.reduce_while(ops, {:ok, acc}, fn op, {:ok, collected} ->
      case batch_op_to_wire(op) do
        {:ok, wire_ops} -> {:cont, {:ok, Enum.reverse(wire_ops, collected)}}
        {:error, _} = failure -> {:halt, failure}
      end
    end)

  case translated do
    {:ok, collected} -> {:ok, Enum.reverse(collected)}
    failure -> failure
  end
end
# Translates one `batch_write/1` operation tuple into `{:ok, [wire_op]}`.
#
# Returns `{:error, %Ecto.Changeset{}}` when a changeset fails validation
# and `{:error, %DustEcto.Error{}}` (`:invalid_params`) for a tuple of
# unknown shape, a non-list `opts`, or a non-integer `:if_match` on a
# delete. The last two previously crashed instead of returning an error.
defp batch_op_to_wire({:insert, %Ecto.Changeset{} = cs}),
  do: batch_op_to_wire({:insert, cs, []})

defp batch_op_to_wire({:insert, %Ecto.Changeset{} = cs, opts}) when is_list(opts) do
  case Ecto.Changeset.apply_action(cs, :insert) do
    {:ok, struct} -> struct_to_wire_ops(struct, :set, opts)
    {:error, cs} -> {:error, cs}
  end
end

defp batch_op_to_wire({:update, %Ecto.Changeset{} = cs}),
  do: batch_op_to_wire({:update, cs, []})

defp batch_op_to_wire({:update, %Ecto.Changeset{} = cs, opts}) when is_list(opts) do
  case Ecto.Changeset.apply_action(cs, :update) do
    {:ok, struct} -> struct_to_wire_ops(struct, :set, opts)
    {:error, cs} -> {:error, cs}
  end
end

defp batch_op_to_wire({:delete, %_{} = struct}), do: batch_op_to_wire({:delete, struct, []})

defp batch_op_to_wire({:delete, %schema{slug: slug}, opts}) when is_atom(schema),
  do: batch_op_to_wire({:delete, schema, slug, opts})

defp batch_op_to_wire({:delete, schema, slug}) when is_atom(schema) and is_binary(slug),
  do: batch_op_to_wire({:delete, schema, slug, []})

defp batch_op_to_wire({:delete, schema, slug, opts})
     when is_atom(schema) and is_binary(slug) and is_list(opts) do
  prefix = schema.__dust_prefix__()
  op = %{op: :delete, path: record_path!(prefix, slug)}

  case Keyword.fetch(opts, :if_match) do
    :error ->
      {:ok, [op]}

    {:ok, revision} when is_integer(revision) ->
      {:ok, [Map.put(op, :if_match, revision)]}

    {:ok, invalid} ->
      {:error,
       Error.new(
         :invalid_params,
         ":if_match must be an integer revision, got: #{inspect(invalid)}",
         retryable?: false
       )}
  end
end

# Catch-all: anything that did not match a known op shape, including
# deletes whose struct has no binary slug.
defp batch_op_to_wire(other) do
  {:error,
   Error.new(
     :invalid_params,
     "unrecognised batch_write op: #{inspect(other)}",
     retryable?: false
   )}
end
# Builds the wire ops that write `struct` as part of a batch.
#
#   * `:map` mode  — one op at the record path carrying the whole dumped
#     body, optionally tagged with an integer `:if_match`.
#   * `:flat` mode — one op per schema field (except `:slug`) that is
#     present in the dumped body.
#
# `set_atom` is used verbatim as the `:op` of every emitted wire op.
#
# Returns `{:ok, [op]}` or `{:error, %DustEcto.Error{}}` for a missing
# slug, a non-integer `:if_match` (previously a `CaseClauseError`), or a
# flat record with nothing to write. Raises `ArgumentError` when
# `:if_match` is given for a `:flat`-mode schema.
defp struct_to_wire_ops(struct, set_atom, opts) do
  schema = struct.__struct__
  prefix = schema.__dust_prefix__()
  slug = struct.slug
  mode = schema.__dust_mode__()

  cond do
    is_nil(slug) or slug == "" ->
      {:error, Error.new(:invalid_params, "missing slug", retryable?: false)}

    Keyword.has_key?(opts, :if_match) and mode == :flat ->
      raise ArgumentError,
            ":if_match is not supported on :flat-mode schemas in batch_write. " <>
              "Per-field CAS would require per-field revisions; that API isn't exposed yet."

    mode == :map ->
      op = %{op: set_atom, path: record_path!(prefix, slug), value: dump_for_wire(struct)}

      case Keyword.fetch(opts, :if_match) do
        :error ->
          {:ok, [op]}

        {:ok, revision} when is_integer(revision) ->
          {:ok, [Map.put(op, :if_match, revision)]}

        {:ok, invalid} ->
          {:error,
           Error.new(
             :invalid_params,
             ":if_match must be an integer revision, got: #{inspect(invalid)}",
             retryable?: false
           )}
      end

    mode == :flat ->
      body = dump_for_wire(struct)

      field_ops =
        for field <- schema.__dust_field_names__(),
            field != :slug,
            # Fields absent from the dumped body produce no op.
            {:ok, value} <- [Map.fetch(body, field)] do
          %{op: set_atom, path: field_path!(prefix, slug, field), value: value}
        end

      case field_ops do
        [] ->
          {:error, Error.new(:nothing_to_write, "no fields to write", retryable?: false)}

        _ ->
          {:ok, field_ops}
      end
  end
end
@doc """
Removes every record of `schema` by issuing one transport `delete`
against the schema prefix itself.

Returns `{:ok, %{store_seq: n}}` — note the server's DELETE doesn't
report a row count.
"""
@spec delete_all(module()) :: {:ok, %{store_seq: integer()}} | {:error, Error.t()}
def delete_all(schema) when is_atom(schema) do
  prefix = schema.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  root = prefix_path!(prefix)

  transport.delete(store, root, [])
end
# ------------------------------------------------------------------
# Subscribe
# ------------------------------------------------------------------
@doc """
Subscribes `callback` to record-level changes for `schema`.

Each raw transport event below the schema prefix is translated into
`{:upserted, struct}` or `{:deleted, slug}` before `callback` is invoked;
events that cannot be translated are dropped. Events are delivered in the
dust SDK's `:committed` mode — exactly one delivery per write, including
for the writer's own changes, with `store_seq` durably attached.

HTTP mode: returns `{:error, %DustEcto.Error{kind: :not_supported}}`.

The returned ref can be passed to `unsubscribe/1`.
"""
@spec subscribe(module(), (term() -> any())) ::
        {:ok, reference()} | {:error, Error.t()}
def subscribe(schema, callback) when is_atom(schema) and is_function(callback, 1) do
  prefix = schema.__dust_prefix__()
  pattern = prefix_pattern!(prefix)
  {transport, _} = Transport.pick()
  store = Transport.store!()

  translate_and_forward = fn raw_event ->
    case ecto_event(schema, prefix, raw_event, transport, store) do
      # `nil` means "no record-level meaning": swallow the event.
      nil -> :ok
      record_event -> callback.(record_event)
    end
  end

  transport.subscribe(store, pattern, translate_and_forward)
end
@doc """
Subscribes to the raw underlying op events for `schema` — `callback` is
handed to the transport as-is, with no reassembly into structs. It
receives the SDK's event map `%{op:, path:, value:, store_seq:, ...}`
exactly.

Useful for users who need per-leaf provenance or want to run their
own assembly. HTTP mode: same `:not_supported` as `subscribe/2`.
"""
@spec subscribe_raw(module(), (map() -> any())) ::
        {:ok, reference()} | {:error, Error.t()}
def subscribe_raw(schema, callback) when is_atom(schema) and is_function(callback, 1) do
  pattern = prefix_pattern!(schema.__dust_prefix__())
  {transport, _} = Transport.pick()
  store = Transport.store!()

  transport.subscribe(store, pattern, callback)
end
@doc """
Removes a subscription previously registered via `subscribe/2` or
`subscribe_raw/2`, identified by the ref those functions returned.
"""
@spec unsubscribe(reference()) :: :ok
def unsubscribe(ref) when is_reference(ref) do
  {transport, _} = Transport.pick()
  store = Transport.store!()

  transport.unsubscribe(store, ref)
end
# Translates a raw transport event into a record-level event, or `nil`
# when the event has no record-level meaning (path outside the prefix,
# failed re-read, record that does not load).
defp ecto_event(schema, prefix, %{op: :delete, path: path}, transport, store) do
  case slug_from_path(path, prefix) do
    :error ->
      nil

    {:ok, slug, []} ->
      # Whole-record delete: the path is exactly the record path.
      {:deleted, slug}

    {:ok, slug, _field_segments} ->
      reload_after_field_delete(schema, prefix, slug, transport, store)
  end
end

defp ecto_event(schema, prefix, %{path: path} = _event, transport, store) do
  # Any non-delete event: re-read the whole record and report it as an
  # upsert. Every failure along the way drops the event.
  case slug_from_path(path, prefix) do
    {:ok, slug, _field_segments} ->
      case transport.get(store, record_path!(prefix, slug)) do
        {:ok, %{value: value}} ->
          case load_record_for_event(schema, slug, value) do
            {:ok, record} -> {:upserted, record}
            _ -> nil
          end

        _ ->
          nil
      end

    _ ->
      nil
  end
end

defp ecto_event(_schema, _prefix, _event, _t, _s), do: nil

# A single field was deleted, so the record may still exist with its
# remaining fields. Re-read it: `:upserted` if it still loads, `:deleted`
# if it is gone or no longer loads, `nil` on any other transport result.
defp reload_after_field_delete(schema, prefix, slug, transport, store) do
  case transport.get(store, record_path!(prefix, slug)) do
    {:ok, %{value: value}} ->
      case load_record_for_event(schema, slug, value) do
        {:ok, record} -> {:upserted, record}
        :error -> {:deleted, slug}
      end

    {:error, :not_found} ->
      {:deleted, slug}

    _ ->
      nil
  end
end
# Splits a rendered `path` into `{:ok, slug, field_segments}` relative to
# the schema's `prefix` segment list.
#
# The first `length(prefix)` segments must equal `prefix`; the next segment
# is the slug (must not be ""); whatever follows is the field-segment tail,
# which is `[]` when the path targets the record itself. Anything else —
# including a path `DustPath.parse_rendered/1` rejects — yields `:error`.
defp slug_from_path(path, prefix) when is_binary(path) and is_list(prefix) do
  with {:ok, segments} <- DustPath.parse_rendered(path) do
    {head, tail} = Enum.split(segments, length(prefix))

    case {head == prefix, tail} do
      {true, [slug | field_segments]} when slug != "" -> {:ok, slug, field_segments}
      _ -> :error
    end
  else
    _ -> :error
  end
end
# Event-side wrapper around `load_record/3`: only map values can become a
# record; everything else, and any record missing required fields, is
# reported as `:error`.
defp load_record_for_event(schema, slug, value) do
  if is_map(value) do
    case load_record(schema, slug, value) do
      :missing_required -> :error
      {:ok, _record} = loaded -> loaded
    end
  else
    :error
  end
end
# ------------------------------------------------------------------
# Internals
# ------------------------------------------------------------------
# Resolves the active transport and store, then collects every page of
# entries matching `pattern`, starting without a cursor.
defp stream_all_items(pattern) do
  {transport, _} = Transport.pick()
  store = Transport.store!()

  do_stream_all(transport, store, pattern, nil, [])
end
# Fetches pages of up to 200 entries until the transport reports
# `next_cursor: nil`, returning `{:ok, items}` in listing order or the
# first `{:error, _}` encountered.
#
# `acc` is the reverse-ordered list of items seen so far (start with `[]`).
# Each page is reversed onto it and the whole list is reversed once at the
# end, so total work is linear in the number of items instead of
# re-copying the accumulator for every page as `acc ++ items` did.
defp do_stream_all(transport, store, pattern, cursor, acc) do
  opts = [select: :entries, limit: 200]
  opts = if cursor, do: Keyword.put(opts, :after, cursor), else: opts

  case transport.list(store, pattern, opts) do
    {:ok, %{items: items, next_cursor: nil}} ->
      {:ok, items |> Enum.reverse(acc) |> Enum.reverse()}

    {:ok, %{items: items, next_cursor: next}} ->
      do_stream_all(transport, store, pattern, next, Enum.reverse(items, acc))

    {:error, _} = err ->
      err
  end
end
# Reassembles the flat leaf entries of a LIST response into structs.
#
# Step 1 groups leaves by slug: `parse_path/2` strips the schema prefix
# (however many segments it has) and `put_nested/3` folds multi-segment
# field paths into nested maps, so a map-typed field that arrives as
# several leaves is rebuilt as one map before `load_record/3` sees it.
# Leaves whose path does not parse are ignored.
#
# Step 2 loads each group and drops the ones missing required fields.
defp rebuild_records(items, schema, prefix) do
  fields_by_slug =
    Enum.reduce(items, %{}, fn item, grouped ->
      case parse_path(item.path, prefix) do
        :error ->
          grouped

        {:ok, slug, field_segments} ->
          fields =
            grouped
            |> Map.get(slug, %{})
            |> put_nested(field_segments, item.value)

          Map.put(grouped, slug, fields)
      end
    end)

  Enum.flat_map(fields_by_slug, fn {slug, fields} ->
    case load_record(schema, slug, fields) do
      {:ok, record} -> [record]
      :missing_required -> []
    end
  end)
end
# Stricter sibling of `slug_from_path/2` for list output: succeeds with
# `{:ok, slug, field_segments}` only when at least one field segment
# follows the slug, i.e. the path names a leaf inside a record rather than
# the record itself. Used to assemble record bodies from leaf entries.
defp parse_path(path, prefix) when is_binary(path) and is_list(prefix) do
  case slug_from_path(path, prefix) do
    {:ok, _slug, [_ | _]} = leaf -> leaf
    _ -> :error
  end
end
# Stores `value` in `map` under the key path given as a non-empty list,
# creating intermediate maps as needed.
#
# If an intermediate key already holds a non-map value (a scalar leaf at
# `a` followed by a nested leaf at `a/b`, or an explicit nil), that value
# is replaced by a fresh map rather than crashing with `BadMapError`, so
# one conflicting record cannot abort the whole listing.
defp put_nested(map, [key], value), do: Map.put(map, key, value)

defp put_nested(map, [key | rest], value) do
  child =
    case Map.get(map, key) do
      existing when is_map(existing) -> existing
      _missing_or_scalar -> %{}
    end

  Map.put(map, key, put_nested(child, rest, value))
end
# Builds a `schema` struct from `fields` — the map assembled from a LIST
# response or the subtree returned by a GET — after injecting the slug
# under the "slug" key.
#
# Returns `{:ok, struct}`, or `:missing_required` when a required field is
# blank (logged via `log_skip/4`) or when anything in the body raises
# (logged here with the exception message).
defp load_record(schema, slug, fields) when is_map(fields) do
  record = Ecto.embedded_load(schema, Map.put(fields, "slug", slug), :json)
  missing = missing_required_fields(schema, record)

  if missing == [] do
    {:ok, record}
  else
    log_skip(schema, slug, fields, missing: missing)
    :missing_required
  end
rescue
  error ->
    reason = Exception.message(error)

    Logger.warning(
      "DustEcto.Repo: failed to load #{inspect(schema)} slug=#{inspect(slug)}: " <> reason
    )

    :missing_required
end
# Lists the fields from `schema.__dust_required_fields__/0` whose value in
# `struct` is `nil` or the empty string.
defp missing_required_fields(schema, struct) do
  Enum.filter(schema.__dust_required_fields__(), fn field ->
    Map.get(struct, field) in [nil, ""]
  end)
end
# Emits the "skipping record" warning.
#
# `fields` is the body that was received; its keys that are not in
# `schema.__dust_field_names__/0` are reported as `unrecognized`.
# `extras` is a keyword list:
#
#   * `:missing` — required fields that were blank (defaults to `[]`).
#   * `:scalar`  — the non-map value found at the record path. It used to
#     be accepted but never logged; it is now appended as `scalar=...`
#     (size-bounded) so the warning explains why the body was empty.
#
# Without `:scalar` the message is unchanged.
defp log_skip(schema, slug, fields, extras) do
  known = schema.__dust_field_names__()
  received = if is_map(fields), do: Map.keys(fields), else: []
  unrecognized = Enum.map(received, &maybe_to_atom/1) -- known

  scalar_note =
    case Keyword.fetch(extras, :scalar) do
      {:ok, scalar} -> " scalar=#{inspect(scalar, limit: 20, printable_limit: 100)}"
      :error -> ""
    end

  Logger.warning(fn ->
    "DustEcto.Repo: skipping #{inspect(schema)} slug=#{inspect(slug)} " <>
      "missing=#{inspect(Keyword.get(extras, :missing, []))} " <>
      "unrecognized=#{inspect(unrecognized)}" <> scalar_note
  end)
end
# Normalises a received key to an atom without ever creating a new atom:
# strings that do not name an existing atom become `:__unknown__`.
defp maybe_to_atom(key) when is_binary(key) do
  try do
    String.to_existing_atom(key)
  rescue
    ArgumentError -> :__unknown__
  end
end

defp maybe_to_atom(key) when is_atom(key), do: key
# Entry point for single-record writes: rejects a nil/empty slug with an
# `:invalid_params` error, then dispatches on the schema's write mode.
# `fields` is `:all_fields` or a list of field names; only flat mode uses it.
defp write_struct(struct, fields, opts) do
  schema = struct.__struct__

  case struct.slug do
    blank when blank in [nil, ""] ->
      {:error, Error.new(:invalid_params, "missing slug", retryable?: false)}

    slug ->
      case schema.__dust_mode__() do
        :flat -> write_flat_mode(struct, slug, fields, opts)
        :map -> write_map_mode(struct, slug, opts)
      end
  end
end
# Map mode: the whole dumped record is written with one PUT at the record
# path. Only `:if_match` is forwarded from `opts`. An empty dumped body is
# reported as `:nothing_to_write` without calling the transport.
defp write_map_mode(struct, slug, opts) do
  prefix = struct.__struct__.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  body = dump_for_wire(struct)

  if body == %{} do
    {:error, Error.new(:nothing_to_write, "struct dumped to an empty body", retryable?: false)}
  else
    cas_opts = Keyword.take(opts, [:if_match])

    # Anything other than `{:ok, _}` is handed back to the caller as-is.
    with {:ok, _} <- transport.put(store, record_path!(prefix, slug), body, cas_opts) do
      {:ok, struct}
    end
  end
end
# Flat mode: one PUT per field, issued sequentially and stopping at the
# first result that is not `{:ok, _}` (earlier PUTs are not undone).
#
# `:all_fields` expands to every schema field except `:slug`.
defp write_flat_mode(struct, slug, :all_fields, opts) do
  every_field = Enum.reject(struct.__struct__.__dust_field_names__(), &(&1 == :slug))
  write_flat_mode(struct, slug, every_field, opts)
end

defp write_flat_mode(struct, slug, fields, _opts) when is_list(fields) do
  # `opts` deliberately unused — flat-mode :if_match is rejected in
  # update/2 before we get here. Any future per-field opt would land
  # in this signature.
  prefix = struct.__struct__.__dust_prefix__()
  {transport, _} = Transport.pick()
  store = Transport.store!()
  body = dump_for_wire(struct)

  # Requested fields absent from the dumped body are skipped.
  pairs =
    for field <- fields, {:ok, value} <- [Map.fetch(body, field)] do
      {field, value}
    end

  if pairs == [] do
    {:error, Error.new(:nothing_to_write, "no fields to write", retryable?: false)}
  else
    success = {:ok, struct}

    Enum.reduce_while(pairs, success, fn {field, value}, _previous ->
      case transport.put(store, field_path!(prefix, slug, field), value, []) do
        {:ok, _} -> {:cont, success}
        failure -> {:halt, failure}
      end
    end)
  end
end
# Dumps `struct` for the wire and removes the slug under both its atom and
# string key: the slug is the primary key encoded in the path and is never
# serialized as data. Nil-valued fields are left in the body (written as
# JSON null), since nil is a deliberate value in this library's contract.
defp dump_for_wire(struct) do
  dumped = Ecto.embedded_dump(struct, :json)
  Map.drop(dumped, [:slug, "slug"])
end
end