defmodule Pi.Eval.Snapshot do
@moduledoc "Sidecar representation for durable eval session state snapshots."
@version 2
@default_max_bytes 10 * 1_024 * 1_024
@type t :: %{binding: Code.binding(), env: Macro.Env.t(), metadata: map()}
@spec load(Path.t() | nil) :: {:ok, t()} | :error
def load(nil), do: :error
def load(path) when is_binary(path) do
with true <- File.regular?(path),
{:ok, binary} <- File.read(path),
{:ok, decoded} <- decode(binary) do
{:ok, decoded}
else
_ -> :error
end
end
@spec store(Path.t() | nil, Code.binding(), Macro.Env.t(), keyword()) ::
{:ok, map()} | {:error, term()}
def store(nil, _binding, %Macro.Env{}, _opts), do: {:ok, %{persisted?: false}}
def store(path, binding, %Macro.Env{} = env, opts) when is_binary(path) and is_list(binding) do
max_bytes = Keyword.get(opts, :max_bytes, max_bytes())
{persisted_binding, dropped} = shrink_binding(binding, env, max_bytes)
payload = payload(persisted_binding, env, dropped)
binary = :erlang.term_to_binary(payload)
with :ok <- File.mkdir_p(Path.dirname(path)),
:ok <- atomic_write(path, binary),
:ok <- write_meta(path, payload, byte_size(binary)) do
{:ok,
%{
persisted?: true,
path: path,
bytes: byte_size(binary),
binding_count: length(persisted_binding),
dropped_bindings: dropped
}}
end
rescue
exception in [ArgumentError, ErlangError, File.Error, Jason.EncodeError] ->
{:error, exception}
end
@spec binding_info(Code.binding()) :: [map()]
def binding_info(binding) when is_list(binding) do
Enum.map(binding, fn {name, value} ->
%{
name: name,
type: value_type(value),
bytes: value_bytes(value),
preview: inspect(value, charlists: :as_lists, limit: 20, pretty: true)
}
end)
end
@spec serializable_binding(Code.binding()) :: Code.binding()
def serializable_binding(binding) when is_list(binding) do
Enum.filter(binding, fn {_name, value} -> serializable_term?(value) end)
end
defp decode(binary) when is_binary(binary) do
case :erlang.binary_to_term(binary, [:safe]) do
%{version: @version, binding: binding, env: env_snapshot} = payload
when is_map(env_snapshot) ->
{:ok,
%{
binding: restore_binding(binding),
env: restore_env(env_snapshot),
metadata: Map.get(payload, :metadata, %{})
}}
_other ->
:error
end
rescue
_exception in [ArgumentError] -> :error
end
defp payload(binding, env, dropped) do
%{
version: @version,
binding: snapshot_binding(binding),
env: snapshot_env(env),
metadata: %{
bridge_version: bridge_version(),
elixir_version: System.version(),
otp_release: System.otp_release(),
updated_at: DateTime.utc_now(),
dropped_bindings: dropped
}
}
end
defp shrink_binding(binding, env, max_bytes) do
binding = serializable_binding(binding)
binary = :erlang.term_to_binary(payload(binding, env, []))
if byte_size(binary) <= max_bytes do
{binding, []}
else
drop_until_fits(binding, env, max_bytes, [])
end
end
defp drop_until_fits([], _env, _max_bytes, dropped), do: {[], Enum.reverse(dropped)}
defp drop_until_fits(binding, env, max_bytes, dropped) do
{name, _value} = largest_binding(binding)
kept = Keyword.delete(binding, name)
dropped = [name | dropped]
binary = :erlang.term_to_binary(payload(kept, env, dropped))
if byte_size(binary) <= max_bytes do
{kept, Enum.reverse(dropped)}
else
drop_until_fits(kept, env, max_bytes, dropped)
end
end
defp largest_binding(binding),
do: Enum.max_by(binding, fn {_name, value} -> value_bytes(value) end)
defp atomic_write(path, binary) do
tmp = path <> ".tmp-" <> Integer.to_string(System.unique_integer([:positive]))
try do
case File.write(tmp, binary) do
:ok -> File.rename(tmp, path)
{:error, reason} -> {:error, reason}
end
after
File.rm(tmp)
end
end
defp write_meta(path, payload, bytes) do
metadata = %{
version: @version,
bytes: bytes,
bindingCount: length(payload.binding),
bindings:
Enum.map(payload.binding, fn {name, value} ->
%{name: to_string(name), type: to_string(value_type(value))}
end),
droppedBindings: Enum.map(payload.metadata.dropped_bindings, &to_string/1),
updatedAt: DateTime.to_iso8601(payload.metadata.updated_at),
bridgeVersion: payload.metadata.bridge_version,
elixirVersion: payload.metadata.elixir_version,
otpRelease: payload.metadata.otp_release
}
File.write(path <> ".meta.json", Jason.encode!(metadata))
end
defp max_bytes do
Application.get_env(:pi_bridge, :eval_state_max_bytes, @default_max_bytes)
end
defp snapshot_binding(binding) do
Enum.map(binding, fn {name, value} -> {Atom.to_string(name), value} end)
end
defp restore_binding(binding) do
binding
|> Enum.map(fn {name, value} when is_binary(name) -> {existing_atom(name), value} end)
|> Enum.reject(fn {name, _value} -> is_nil(name) end)
end
defp snapshot_env(%Macro.Env{} = env) do
%{
aliases: snapshot_aliases(env.aliases),
macro_aliases: snapshot_aliases(env.macro_aliases)
}
end
defp restore_env(snapshot) when is_map(snapshot) do
struct(Code.env_for_eval([]), %{
aliases: restore_aliases(Map.get(snapshot, :aliases, [])),
macro_aliases: restore_aliases(Map.get(snapshot, :macro_aliases, []))
})
end
defp snapshot_aliases(aliases) do
Enum.map(aliases, fn {alias_name, module} -> {Atom.to_string(alias_name), module} end)
end
defp restore_aliases(aliases) do
aliases
|> Enum.map(fn {alias_name, module} when is_binary(alias_name) ->
{existing_atom(alias_name), module}
end)
|> Enum.reject(fn {alias_name, _module} -> is_nil(alias_name) end)
end
defp existing_atom(name) when is_binary(name) do
String.to_existing_atom(name)
rescue
_exception in [ArgumentError] -> nil
end
defp bridge_version do
case Application.spec(:pi_bridge, :vsn) do
nil -> "unknown"
version -> to_string(version)
end
end
defp value_type(%module{}), do: module
defp value_type(value) when is_binary(value), do: :binary
defp value_type(value) when is_boolean(value), do: :boolean
defp value_type(value) when is_atom(value), do: :atom
defp value_type(value) when is_integer(value), do: :integer
defp value_type(value) when is_float(value), do: :float
defp value_type(value) when is_list(value), do: :list
defp value_type(value) when is_tuple(value), do: :tuple
defp value_type(value) when is_map(value), do: :map
defp value_type(value) when is_function(value), do: :function
defp value_type(value) when is_pid(value), do: :pid
defp value_type(value) when is_port(value), do: :port
defp value_type(value) when is_reference(value), do: :reference
defp value_type(_value), do: :term
defp value_bytes(value) do
value |> :erlang.term_to_binary() |> byte_size()
rescue
_exception in [ArgumentError] -> byte_size(inspect(value, limit: 20))
end
defp serializable_term?(term)
when is_pid(term) or is_port(term) or is_reference(term) or is_function(term),
do: false
defp serializable_term?(term) when is_list(term), do: Enum.all?(term, &serializable_term?/1)
defp serializable_term?(term) when is_tuple(term) do
term |> Tuple.to_list() |> Enum.all?(&serializable_term?/1)
end
defp serializable_term?(%_module{} = term),
do: term |> Map.from_struct() |> serializable_term?()
defp serializable_term?(term) when is_map(term) do
Enum.all?(term, fn {key, value} -> serializable_term?(key) and serializable_term?(value) end)
end
defp serializable_term?(_term), do: true
end