defmodule DetsPlus do
@moduledoc """
DetsPlus persistent tuple/struct/map storage.
[DetsPlus](https://github.com/dominicletz/dets_plus) has a similar API to `:dets`, but without
the 2 GB file storage limit. Writes are buffered in an
internal ETS table and synced every `auto_save` period
to persistent storage.
While `sync()` or `auto_save` are in progress the database
remains readable and writeable.
There is no commit log, so writes that have not been synced yet are lost on a crash.
Lookups are possible by key, and non-matches are accelerated
using a bloom filter. The persistent file format follows
D. J. Bernstein's CDB database format (https://cr.yp.to/cdb.html),
but uses an Elixir-encoded header. When syncing,
a new CDB file is created and atomically replaces
the old one using `File.rename!/2`, so incomplete updates
cannot corrupt the database.
Limits are:
- Total file size: 18_446 petabytes
- Maximum entry size: 4 gigabytes
- Maximum entry count: `:infinity`
Example:
```elixir
{:ok, dets} = DetsPlus.open_file(:example)
DetsPlus.insert(dets, {1, 1, 1})
[{1, 1, 1}] = DetsPlus.lookup(dets, 1)
:ok = DetsPlus.close(dets)
```
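Maps and structs can be stored as well by passing an atom field name as `keypos`
(a minimal sketch; the `:users` table name and the `:id` field are illustrative):
```elixir
{:ok, dets} = DetsPlus.open_file(:users, keypos: :id)
DetsPlus.insert(dets, %{id: 1, name: "Alice"})
[%{id: 1, name: "Alice"}] = DetsPlus.lookup(dets, 1)
:ok = DetsPlus.close(dets)
```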
"""
# This limits the total database size to 18_446 PB
@slot_size 8
@slot_size_bits @slot_size * 8
# This limits the biggest entry size to 4 GB
@entry_size_size 4
@entry_size_size_bits @entry_size_size * 8
# The hash source is SHA-256 truncated to 8 bytes - collisions should practically never occur
@hash_size 8
@hash_size_bits @hash_size * 8
# Tuples for use in the hash tables
@null_tuple {<<0::unsigned-size(64)>>, 0}
@null_binary <<0::unsigned-size(128)>>
@version 3
alias DetsPlus.{Bloom, EntryWriter, FileReader, FileWriter}
use GenServer
require Logger
defstruct [:pid, :ets, :keyfun, :keyhashfun, :hashfun, :serializer]
@type t :: %__MODULE__{pid: pid()}
defmodule State do
@moduledoc false
@enforce_keys [:version]
defstruct [
:auto_save_memory,
:auto_save,
:bloom_size,
:bloom,
:creation_stats,
:ets_memory,
:ets,
:file_entries,
:file_size,
:filename,
:fp,
:hashfun,
:header_size,
:keyfun,
:keyhashfun,
:keypos,
:mode,
:name,
:page_cache_memory,
:serializer,
:slot_counts,
:sync_fallback_memory,
:sync_fallback,
:sync_waiters,
:sync,
:table_offsets,
:type,
:version
]
end
@doc """
Opens an existing table or creates a new table. If no
`file` argument is provided the table name is used as the file name.
DetsPlus registers a process under the provided name, which can
be used instead of the pid when calling the API.
Arguments:
- `file` - An optional path + filename for the database file.
- `auto_save` - The autosave interval. If the interval is an integer `time`, the table is flushed to disk every `time` milliseconds (when there are unsynced writes). If the interval is the atom `:infinity`, autosave is disabled. Defaults to `180_000` (3 minutes).
- `auto_save_memory` - The autosave memory threshold. When the internal ETS table grows beyond this size in bytes, the table is flushed to disk. Defaults to `1_000_000_000` (1 GB).
- `page_cache_memory` - The amount of memory to use for file system caching. Defaults to `1_000_000_000` (1 GB).
- `keypos` - The position of the element of each object to be used as key. Can also be an atom field name when storing maps or structs. Defaults to 1. Explicitly stating the key position is most useful when storing Erlang records, where the first element is the record tag.
- `compressed` - Indicates whether the terms stored on disk should be compressed. Possible values are `true`, `false` or a compression level `0..9`. Defaults to `false`.
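Example (a minimal sketch; the table name and option values are illustrative):
```elixir
{:ok, dets} = DetsPlus.open_file(:my_table, auto_save: 60_000)
:ok = DetsPlus.insert(dets, {:hello, "world"})
[{:hello, "world"}] = DetsPlus.lookup(dets, :hello)
:ok = DetsPlus.close(dets)
```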
"""
def open_file(name, args \\ []) when is_atom(name) do
case start_link([{:name, name} | args]) do
{:ok, pid} -> {:ok, get_handle(pid)}
err -> err
end
end
@doc """
Same as `open_file/2` but for embedding in a supervision tree.
Opens an existing table or creates a new table. If no
`file` argument is provided the table name is used as the file name.
DetsPlus registers a process under the provided name, which can
be used instead of the pid when calling the API.
Arguments:
- `name` - The registered name for the DetsPlus process.
- `file` - An optional path + filename for the database file.
- `auto_save` - The autosave interval. If the interval is an integer `time`, the table is flushed to disk every `time` milliseconds (when there are unsynced writes). If the interval is the atom `:infinity`, autosave is disabled. Defaults to `180_000` (3 minutes).
- `auto_save_memory` - The autosave memory threshold. When the internal ETS table grows beyond this size in bytes, the table is flushed to disk. Defaults to `1_000_000_000` (1 GB).
- `page_cache_memory` - The amount of memory to use for file system caching. Defaults to `1_000_000_000` (1 GB).
- `keypos` - The position of the element of each object to be used as key. Can also be an atom field name when storing maps or structs. Defaults to 1. Explicitly stating the key position is most useful when storing Erlang records, where the first element is the record tag.
- `compressed` - Indicates whether the terms stored on disk should be compressed. Possible values are `true`, `false` or a compression level `0..9`. Defaults to `false`.
Example:
```elixir
defmodule MyApp.Application do
use Application
def start(_type, _args) do
children = [
{DetsPlus, name: :example}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
end
```
"""
def start_link(args) when is_list(args) do
{name, args} = Keyword.pop!(args, :name)
{filename, args} = Keyword.pop(args, :file, name)
filename = do_string(filename)
# amount of memory to use max before a flush is enforced
{auto_save_memory, args} = Keyword.pop(args, :auto_save_memory, 1_000_000_000)
{auto_save, args} = Keyword.pop(args, :auto_save, 180_000)
{page_cache_memory, args} = Keyword.pop(args, :page_cache_memory, 1_000_000_000)
{mode, args} = Keyword.pop(args, :access, :read_write)
{keypos, args} = Keyword.pop(args, :keypos, 1)
{type, args} = Keyword.pop(args, :type, :set)
{compressed, args} = Keyword.pop(args, :compressed, false)
# unused options from dets:
# - ram_file
# - max_no_slots
# - min_no_slots
# - repair
args = Keyword.drop(args, [:ram_file, :max_no_slots, :min_no_slots, :repair])
for {key, _value} <- args do
raise ArgumentError, "Unknown option: #{key}"
end
state =
with true <- File.exists?(filename),
{:ok, %File.Stat{size: file_size}} when file_size > 0 <- File.stat(filename) do
load_state(filename, file_size, page_cache_memory)
else
_ ->
%State{
version: @version,
bloom: "",
bloom_size: 0,
filename: filename,
fp: nil,
name: name,
mode: mode,
auto_save: auto_save,
keypos: keypos,
type: type,
file_entries: 0,
slot_counts: %{},
file_size: 0,
sync: nil,
sync_waiters: [],
sync_fallback: %{},
sync_fallback_memory: 0
}
end
|> init_hashfuns()
serializer =
case compressed do
true -> &:erlang.term_to_binary(&1, [:compressed])
false -> &:erlang.term_to_binary/1
level when level in 0..9 -> fn term -> :erlang.term_to_binary(term, [{:compressed, level}]) end
end
# These properties should override what is stored on disk
state = %State{
state
| auto_save_memory: auto_save_memory,
auto_save: auto_save,
mode: mode,
page_cache_memory: page_cache_memory,
serializer: serializer
}
GenServer.start_link(__MODULE__, state, hibernate_after: 5_000, name: name)
end
defp init_hashfuns(state = %State{keypos: keypos}) when is_integer(keypos) do
%State{
state
| keyfun: fn tuple -> elem(tuple, keypos - 1) end,
keyhashfun: &default_hash/1,
hashfun: fn tuple -> default_hash(elem(tuple, keypos - 1)) end
}
end
defp init_hashfuns(state = %State{keypos: keypos}) when is_atom(keypos) do
%State{
state
| keyfun: fn map -> Map.get(map, keypos) end,
keyhashfun: &default_hash/1,
hashfun: fn map -> default_hash(Map.get(map, keypos)) end
}
end
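# On-disk layout: "DET+" magic, data entries, hash tables, bloom filter, the
# term-encoded header and finally an 8-byte offset pointing at the header.
# load_state/3 reads that trailing offset, decodes the header and then loads
# the bloom filter bytes it references.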
defp load_state(filename, file_size, page_cache_memory) do
fp = file_open(filename, page_cache_memory)
{:ok, <<header_offset::unsigned-size(@slot_size_bits)>>} =
PagedFile.pread(fp, file_size - @slot_size, @slot_size)
{:ok, header} = PagedFile.pread(fp, header_offset, file_size - header_offset - @slot_size)
%State{version: version, bloom: bloom} = state = :erlang.binary_to_term(header)
if version != @version do
raise("incompatible dets+ version #{version}")
end
state = %State{
state
| fp: fp,
file_size: file_size,
header_size: byte_size(header),
filename: filename
}
{:ok, bloom} = PagedFile.pread(fp, bloom, header_offset - bloom)
%State{state | bloom: bloom}
|> Map.put(:ets_memory, 0)
|> Map.put(:sync_fallback_memory, 0)
|> Map.put(:auto_save_memory, nil)
|> Map.put(:page_cache_memory, nil)
|> Map.put(:serializer, nil)
end
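# store_state/1 appends the bloom filter, the term-encoded header (with the bloom
# replaced by its file offset) and the trailing 8-byte header offset, mirroring
# what load_state/3 reads back.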
@wfile PagedFile
defp store_state(state = %State{fp: fp, bloom: bloom}) do
bloom_offset = @wfile.size(fp)
@wfile.pwrite(fp, bloom_offset, bloom)
header_offset = @wfile.size(fp)
bin =
:erlang.term_to_binary(
%State{
state
| version: @version,
bloom: bloom_offset,
fp: nil,
sync: nil,
sync_waiters: [],
ets: nil
},
[:compressed]
)
@wfile.pwrite(fp, header_offset, bin <> <<header_offset::unsigned-size(@slot_size_bits)>>)
%State{state | file_size: @wfile.size(fp), header_size: byte_size(bin)}
end
@impl true
def init(state = %State{auto_save: auto_save}) do
if is_integer(auto_save) do
:timer.send_interval(auto_save, :auto_save)
end
{:ok, init_ets(state)}
end
defp init_ets(state = %State{name: name, type: type}) do
%State{state | ets: :ets.new(name, [type]), ets_memory: 0}
end
defp init_table_offsets(state = %State{slot_counts: slot_counts}, start_offset) do
table_offsets =
Enum.reduce(1..256, %{0 => start_offset}, fn table_idx, table_offsets ->
offset =
Map.get(table_offsets, table_idx - 1) +
Map.get(slot_counts, table_idx - 1, 0) * (@slot_size + @hash_size)
Map.put(table_offsets, table_idx, offset)
end)
%State{state | table_offsets: table_offsets}
end
@doc """
Syncs pending writes to the persistent file and closes the table.
"""
@spec close(DetsPlus.t() | pid() | atom()) :: :ok
def close(%__MODULE__{pid: pid}), do: close(pid)
def close(pid) when is_pid(pid) or is_atom(pid) do
call(pid, :sync)
GenServer.stop(pid)
end
@doc """
Deletes all objects from a table in almost constant time.
"""
@spec delete_all_objects(DetsPlus.t() | pid() | atom()) :: :ok | {:error, atom()}
def delete_all_objects(%__MODULE__{pid: pid}), do: delete_all_objects(pid)
def delete_all_objects(pid) when is_pid(pid) or is_atom(pid) do
call(pid, :delete_all_objects)
end
@doc """
Deletes all objects with the same key as the given object from the table.
"""
@spec delete_object(DetsPlus.t() | pid() | atom(), tuple() | map()) :: :ok | {:error, atom()}
def delete_object(pid, object) when is_pid(pid) or is_atom(pid) do
delete_object(get_handle(pid), object)
end
def delete_object(dets = %__MODULE__{keyfun: keyfun}, object) do
delete(dets, keyfun.(object))
end
@doc """
Deletes all objects with key `key` from the table.
"""
@spec delete(DetsPlus.t() | pid() | atom(), any()) :: :ok | {:error, atom()}
def delete(%__MODULE__{pid: pid}, key), do: delete(pid, key)
def delete(pid, key) when is_pid(pid) or is_atom(pid) do
call(pid, {:insert, [{key, :delete}]})
end
@doc """
Inserts one or more objects into the table. If there already exists an object with a key matching the key of some of the given objects, the old object will be replaced.
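Example (a sketch; assumes `dets` is a handle returned by `open_file/2`):
```elixir
:ok = DetsPlus.insert(dets, {:a, 1})
:ok = DetsPlus.insert(dets, [{:b, 2}, {:c, 3}])
[{:a, 1}] = DetsPlus.lookup(dets, :a)
```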
"""
@spec insert(DetsPlus.t() | pid() | atom(), tuple() | map() | [tuple() | map()]) ::
:ok | {:error, atom()}
def insert(pid, objects) when is_pid(pid) or is_atom(pid) do
insert(get_handle(pid), objects)
end
def insert(%__MODULE__{pid: pid, keyfun: keyfun}, objects) do
objects =
List.wrap(objects)
|> Enum.map(fn object -> {keyfun.(object), object} end)
call(pid, {:insert, objects})
end
@doc """
Asynchronous version of `insert/2`: inserts one or more objects into the table, replacing existing objects with matching keys. If a sync is currently running and the write buffer is already full, the insert is silently dropped instead of blocking.
"""
@spec insert_async_if_not_busy(
DetsPlus.t() | pid() | atom(),
tuple() | map() | [tuple() | map()]
) :: :ok
def insert_async_if_not_busy(pid, objects) when is_pid(pid) or is_atom(pid) do
insert_async_if_not_busy(get_handle(pid), objects)
end
def insert_async_if_not_busy(%__MODULE__{pid: pid, keyfun: keyfun}, objects) do
objects =
List.wrap(objects)
|> Enum.map(fn object -> {keyfun.(object), object} end)
GenServer.cast(pid, {:insert_async_if_not_busy, objects})
end
@doc """
Inserts one or more objects into the table. If an object with a matching key already exists, no objects are inserted and `false` is returned.
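Example (a sketch; assumes `dets` is an open handle and the key `:a` is not yet present):
```elixir
true = DetsPlus.insert_new(dets, {:a, 1})
false = DetsPlus.insert_new(dets, {:a, 2})
[{:a, 1}] = DetsPlus.lookup(dets, :a)
```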
"""
@spec insert_new(DetsPlus.t() | pid() | atom(), tuple() | map() | [tuple() | map()]) ::
true | false
def insert_new(pid, object) when is_pid(pid) or is_atom(pid) do
insert_new(get_handle(pid), object)
end
def insert_new(%__MODULE__{pid: pid, hashfun: hashfun, keyfun: keyfun}, object) do
objects =
List.wrap(object)
|> Enum.map(fn object -> {keyfun.(object), hashfun.(object), object} end)
case call(pid, {:insert_new, objects}) do
:ok -> true
false -> false
end
end
@doc """
Returns the number of objects in the table. This is an estimate and the same as `info(dets, :size)`.
"""
@spec count(DetsPlus.t() | pid() | atom()) :: integer()
def count(pid) when is_pid(pid) or is_atom(pid), do: count(get_handle(pid))
def count(dets = %__MODULE__{}) do
info(dets, :size)
end
@doc """
Reducer function following the `Enum` protocol.
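Example (a sketch; counts the stored objects, assuming `dets` is an open handle):
```elixir
count = DetsPlus.reduce(dets, 0, fn _object, acc -> acc + 1 end)
```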
"""
@spec reduce(DetsPlus.t() | pid() | atom(), any(), fun()) :: any()
def reduce(pid, acc, fun) when is_pid(pid) or is_atom(pid),
do: reduce(get_handle(pid), acc, fun)
def reduce(dets = %__MODULE__{}, acc, fun) do
Enum.reduce(dets, acc, fun)
end
@doc """
Returns a list of all objects with key `key` stored in the table.
Example:
```elixir
{:ok, dets} = DetsPlus.open_file(:abc)
:ok = DetsPlus.insert(dets, {1, 2, 3})
:ok = DetsPlus.insert(dets, {1, 3, 4})
DetsPlus.lookup(dets, 1)
# => [{1, 3, 4}]
```
If the table type is `:set`, the function returns either the empty list or a list with one object, as there cannot be more than one object with a given key. If the table type is `:bag` or `:duplicate_bag`, the function returns a list of arbitrary length.
Notice that the order of objects returned is unspecified. In particular, the order in which objects were inserted is not reflected.
"""
@spec lookup(DetsPlus.t() | pid() | atom(), any) :: [tuple() | map()] | {:error, atom()}
def lookup(pid, key) when is_pid(pid) or is_atom(pid), do: lookup(get_handle(pid), key)
def lookup(%__MODULE__{pid: pid, keyhashfun: keyhashfun}, key) do
case call(pid, {:lookup, key, keyhashfun.(key)}) do
{m, f, a} -> apply(m, f, a)
other -> other
end
end
@doc """
Works like `lookup/2`, but does not return the objects. Returns `true` if one or more objects in the table have the key `key`, otherwise `false`.
"""
@spec member?(DetsPlus.t() | pid() | atom(), any) :: false | true | {:error, atom}
def member?(dets, key) do
case lookup(dets, key) do
[] -> false
list when is_list(list) -> true
error -> error
end
end
@doc """
Same as `member?/2`
"""
@spec member(DetsPlus.t() | pid() | atom(), any) :: false | true | {:error, atom}
def member(dets, key) do
member?(dets, key)
end
@doc """
Ensures that all updates made to the table are written to disk. While the sync is running the
table can still be used for reads and writes, but writes issued after the `sync/1` call
will not be part of the persistent file. These new changes will only be included in the
next sync call.
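Example (a sketch; assumes `dets` is an open handle):
```elixir
:ok = DetsPlus.insert(dets, {:key, "value"})
:ok = DetsPlus.sync(dets)
```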
"""
@spec sync(DetsPlus.t() | pid() | atom()) :: :ok
def sync(%__MODULE__{pid: pid}), do: sync(pid)
def sync(pid) when is_pid(pid) or is_atom(pid) do
call(pid, :sync, :infinity)
end
@doc """
Starts a sync of all changes to the disk. Same as `sync/1` but does not block.
"""
@spec start_sync(DetsPlus.t() | pid() | atom()) :: :ok
def start_sync(%__MODULE__{pid: pid}), do: start_sync(pid)
def start_sync(pid) when is_pid(pid) or is_atom(pid) do
call(pid, :start_sync, :infinity)
end
@doc """
Returns information about the table as a list of `{item, value}` tuples:
- `{:file_size, integer() >= 0}` - The file size, in bytes.
- `{:filename, file:name()}` - The name of the file where objects are stored.
- `{:keypos, keypos()}` - The key position.
- `{:size, integer() >= 0}` - The estimated number of objects in the table.
- `{:type, type()}` - The table type.
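Example (a sketch; assumes `dets` is an open handle):
```elixir
info = DetsPlus.info(dets)
info[:size]      # estimated number of objects
info[:file_size] # size of the database file in bytes
```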
"""
@spec info(DetsPlus.t() | pid() | atom()) :: Keyword.t() | nil
def info(%__MODULE__{pid: pid}), do: info(pid)
def info(pid) when is_pid(pid) or is_atom(pid) do
call(pid, :info)
end
@doc """
Returns the information associated with `item` for the table. The following items are allowed:
- `{:file_size, integer() >= 0}` - The file size, in bytes.
- `{:header_size, integer() >= 0}` - The size of the Erlang-term-encoded header, in bytes.
- `{:bloom_bytes, integer() >= 0}` - The size of the in-memory and on-disk bloom filter, in bytes.
- `{:hashtable_bytes, integer() >= 0}` - The size of the on-disk lookup hashtable, in bytes.
- `{:filename, file:name()}` - The name of the file where objects are stored.
- `{:keypos, keypos()}` - The key position.
- `{:size, integer() >= 0}` - The estimated number of objects in the table.
- `{:type, type()}` - The table type.
"""
@spec info(
DetsPlus.t(),
:file_size
| :header_size
| :filename
| :keypos
| :size
| :type
| :creation_stats
| :bloom_bytes
| :hashtable_bytes
) ::
any()
def info(dets, item)
when item == :file_size or item == :header_size or item == :bloom_bytes or
item == :hashtable_bytes or item == :filename or item == :keypos or
item == :size or item == :type or item == :creation_stats do
case info(dets) do
nil -> nil
list -> Keyword.get(list, item)
end
end
@doc """
Returns the contents of the table as list. Same as calling `Enum.to_list()`
"""
def to_list(pid) when is_pid(pid) or is_atom(pid), do: to_list(get_handle(pid))
def to_list(dets), do: Enum.to_list(dets)
def get_handle(pid) when is_pid(pid) or is_atom(pid), do: call(pid, :get_handle)
def get_handle(dets), do: dets
defp call(pid, cmd, timeout \\ :infinity) do
GenServer.call(pid, cmd, timeout)
end
@impl true
def handle_call(
:get_handle,
_from,
state = %State{
ets: ets,
keyfun: keyfun,
keyhashfun: keyhashfun,
hashfun: hashfun,
serializer: serializer
}
) do
{:reply,
%DetsPlus{
pid: self(),
ets: ets,
keyfun: keyfun,
keyhashfun: keyhashfun,
hashfun: hashfun,
serializer: serializer
}, state}
end
def handle_call(
:delete_all_objects,
_from,
state = %State{fp: fp, ets: ets, sync: sync, sync_waiters: waiters, filename: filename}
) do
:ets.delete_all_objects(ets)
if is_pid(sync) do
Process.unlink(sync)
Process.exit(sync, :kill)
end
for w <- waiters do
:ok = GenServer.reply(w, :ok)
end
if fp != nil do
PagedFile.close(fp)
end
PagedFile.delete(filename)
{:reply, :ok,
%State{
state
| sync: nil,
sync_waiters: [],
sync_fallback: %{},
fp: nil,
bloom: "",
bloom_size: 0,
file_entries: 0,
slot_counts: %{},
file_size: 0
}}
end
def handle_call(
:get_reduce_state,
_from,
state = %State{sync_fallback: fallback, filename: filename}
) do
{:reply, {fallback, filename}, state}
end
# this needs to be configured...
def handle_call(
{:insert, objects},
from,
state = %State{
ets: ets,
ets_memory: ets_memory,
sync: sync,
sync_fallback: fallback,
sync_fallback_memory: fallback_memory,
sync_waiters: sync_waiters,
auto_save_memory: auto_save_memory
}
) do
if sync == nil do
:ets.insert(ets, objects)
ets_memory = ets_memory + estimate_size(objects)
sync =
sync ||
if ets_memory > auto_save_memory do
spawn_sync_worker(state)
end
state = %State{state | ets_memory: ets_memory, sync: sync}
{:reply, :ok, state}
else
fallback =
Enum.reduce(objects, fallback, fn {key, object}, fallback ->
Map.put(fallback, key, object)
end)
fallback_memory = fallback_memory + estimate_size(objects)
state = %State{state | sync_fallback: fallback, sync_fallback_memory: fallback_memory}
if fallback_memory > auto_save_memory do
# this pause exists to protect from out_of_memory situations when the writer can't
# finish in time
Logger.warning(
"State flush slower than new inserts - pausing writes until flush is complete"
)
{:noreply, %State{state | sync_waiters: [from | sync_waiters]}}
else
{:reply, :ok, state}
end
end
end
def handle_call({:insert_new, objects}, from, state) do
exists =
Enum.any?(objects, fn {key, hash, _object} -> lookup(key, hash, state) != [] end)
if exists do
{:reply, false, state}
else
objects = Enum.map(objects, fn {key, _hash, object} -> {key, object} end)
handle_call({:insert, objects}, from, state)
end
end
def handle_call({:lookup, key, hash}, _from, state = %State{}) do
{:reply, do_lookup(key, hash, state), state}
end
def handle_call(
:info,
_from,
state = %State{
bloom: bloom,
filename: filename,
keypos: keypos,
type: type,
ets: ets,
file_size: file_size,
header_size: header_size,
file_entries: file_entries,
creation_stats: creation_stats,
slot_counts: slot_counts
}
) do
size = :ets.info(ets, :size) + file_entries
table_size = Enum.sum(Map.values(slot_counts)) * (@slot_size + @hash_size)
info = [
file_size: file_size,
header_size: header_size,
bloom_bytes: byte_size(bloom),
hashtable_bytes: table_size,
filename: filename,
keypos: keypos,
size: size,
type: type,
creation_stats: creation_stats
]
{:reply, info, state}
end
def handle_call(:start_sync, _from, state = %State{sync: sync, ets: ets}) do
sync =
if sync == nil and :ets.info(ets, :size) > 0 do
spawn_sync_worker(state)
else
sync
end
{:reply, :ok, %State{state | sync: sync}}
end
def handle_call(:sync, from, state = %State{sync: nil, ets: ets}) do
if :ets.info(ets, :size) == 0 do
{:reply, :ok, state}
else
sync = spawn_sync_worker(state)
{:noreply, %State{state | sync: sync, sync_waiters: [from]}}
end
end
def handle_call(:sync, from, state = %State{sync: sync, sync_waiters: sync_waiters})
when is_pid(sync) do
{:noreply, %State{state | sync_waiters: [from | sync_waiters]}}
end
defp lookup(key, hash, state) do
case do_lookup(key, hash, state) do
{m, f, a} -> apply(m, f, a)
other -> other
end
end
defp do_lookup(key, hash, state = %State{ets: ets, sync_fallback: fallback}) do
case Map.get(fallback, key) do
:delete ->
[]
nil ->
case :ets.lookup(ets, key) do
[] -> {__MODULE__, :file_lookup, [state, key, hash]}
[{_key, :delete}] -> []
[{_key, object}] -> [object]
end
value ->
[value]
end
end
@impl true
def handle_cast(
{:insert_async_if_not_busy, objects},
state = %State{
ets: ets,
ets_memory: ets_memory,
sync: sync,
sync_fallback: fallback,
sync_fallback_memory: fallback_memory,
auto_save_memory: auto_save_memory
}
) do
if sync == nil do
:ets.insert(ets, objects)
ets_memory = ets_memory + estimate_size(objects)
sync =
sync ||
if ets_memory > auto_save_memory do
spawn_sync_worker(state)
end
state = %State{state | ets_memory: ets_memory, sync: sync}
{:noreply, state}
else
if fallback_memory > auto_save_memory do
# drop the write instead of buffering further, to protect against out-of-memory
# situations when the flush worker cannot keep up
{:noreply, state}
else
fallback =
Enum.reduce(objects, fallback, fn {key, object}, fallback ->
Map.put(fallback, key, object)
end)
fallback_memory = fallback_memory + estimate_size(objects)
state = %State{state | sync_fallback: fallback, sync_fallback_memory: fallback_memory}
{:noreply, state}
end
end
end
def handle_cast(
{:sync_complete, sync_pid, new_filename, new_state = %State{}},
%State{
fp: fp,
filename: filename,
ets: ets,
sync: sync_pid,
sync_waiters: waiters,
sync_fallback: fallback,
page_cache_memory: page_cache_memory
}
) do
if fp != nil do
:ok = PagedFile.close(fp)
end
File.rename!(new_filename, filename)
fp = file_open(filename, page_cache_memory)
:ets.delete_all_objects(ets)
:ets.insert(ets, Map.to_list(fallback))
for w <- waiters do
:ok = GenServer.reply(w, :ok)
end
{:noreply,
%State{
new_state
| fp: fp,
sync: nil,
sync_fallback: %{},
sync_fallback_memory: 0,
sync_waiters: [],
ets_memory: estimate_size(fallback)
}}
end
# A sync finished after delete_all_objects/1 already reset the state; ignore the stale result.
def handle_cast({:sync_complete, _sync_pid, _new_filename, _new_state}, state = %State{}) do
{:noreply, state}
end
@impl true
def handle_info(:auto_save, state = %State{sync: sync, ets: ets}) do
sync =
if sync == nil and :ets.info(ets, :size) > 0 do
spawn_sync_worker(state)
else
sync
end
{:noreply, %State{state | sync: sync}}
end
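# Rough in-memory size estimate (in bytes) used to compare against the
# auto_save_memory threshold.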
defp estimate_size(term, depth \\ 0) do
case term do
bin when is_binary(bin) ->
byte_size(bin)
num when is_number(num) ->
8
struct when is_struct(struct) ->
estimate_size(Map.from_struct(struct), depth)
enum when is_map(enum) or is_list(enum) ->
Enum.reduce(enum, 0, fn term, size -> size + estimate_size(term, depth + 1) end)
tuple when is_tuple(tuple) ->
estimate_size(Tuple.to_list(tuple), depth)
atom when is_atom(atom) ->
8
end
end
defp add_stats({prev, stats}, label) do
now = :erlang.timestamp()
elapsed = div(:timer.now_diff(now, prev), 1000)
# IO.puts("#{label} #{elapsed}ms")
{now, [{label, elapsed} | stats]}
end
defp spawn_sync_worker(
state = %State{
ets: ets,
fp: fp,
filename: filename,
file_entries: file_entries,
keyhashfun: keyhashfun
}
) do
# assumptions here
# 1. ets data set is small enough to fit into memory
# 2. fp entries are sorted by hash
dets = self()
spawn_link(fn ->
Process.flag(:priority, :low)
register_name()
stats = {:erlang.timestamp(), []}
new_dataset = :ets.tab2list(ets)
stats = add_stats(stats, :ets_flush)
# Ensuring hash function sort order
new_dataset = parallel_hash(keyhashfun, new_dataset)
stats = add_stats(stats, :ets_hash)
old_file =
if fp != nil do
FileReader.new(fp, byte_size("DET+"), module: PagedFile, buffer_size: 100_000)
else
nil
end
new_filename = "#{filename}.tmp"
@wfile.delete(new_filename)
# ~5mb for this write buffer, it's mostly append only, higher values
# didn't make an impact.
# (the only non-append workflow on this fp is the hash overflow handler, but that is usually small)
opts = [page_size: 512_000, max_pages: 10, priority: :low]
{:ok, new_file} = @wfile.open(new_filename, opts)
state = %State{state | fp: new_file}
stats = add_stats(stats, :fopen)
new_dataset_length = length(new_dataset)
# setting the bloom size based on a size estimate
future_bloom = task_async(fn -> write_bloom(file_entries + new_dataset_length) end)
future_entries = task_async(fn -> write_entries(state, new_dataset_length) end)
future_state = task_async(fn -> write_data(state) end)
# This kicks off the producer, which merges new_dataset with the old file and streams entries to the future_* workers above
pids = [future_bloom.pid, future_entries.pid, future_state.pid]
async_iterate_produce(state, new_dataset, old_file, pids)
state = Task.await(future_state, :infinity)
{bloom, bloom_size} = Task.await(future_bloom, :infinity)
state = %State{state | bloom: bloom, bloom_size: bloom_size}
entries = Task.await(future_entries, :infinity)
stats = add_stats(stats, :write_entries)
state = write_hashtable(state, entries)
stats = add_stats(stats, :write_hashtable)
state = store_state(state)
stats = add_stats(stats, :header_store)
@wfile.close(new_file)
{_, stats} = add_stats(stats, :file_close)
state = %State{state | creation_stats: stats}
GenServer.cast(dets, {:sync_complete, self(), new_filename, state})
end)
# Profiler.fprof(worker)
end
@max 1
defp register_name(n \\ 0) do
n =
if n == @max do
0
else
n + 1
end
try do
Process.register(self(), String.to_atom("DetsPlus_Flush_#{n}"))
rescue
_e ->
Process.sleep(100)
register_name(n)
end
end
@min_chunk_size 10_000
@max_tasks 4
@doc false
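# Hashes the new dataset and sorts it by {hash, key}, splitting the work into up
# to @max_tasks parallel tasks and merging the sorted halves.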
def parallel_hash(keyhashfun, new_dataset, tasks \\ 1) do
len = length(new_dataset)
if len > @min_chunk_size and tasks < @max_tasks do
{a, b} = Enum.split(new_dataset, div(len, 2))
task = task_async(fn -> parallel_hash(keyhashfun, a, tasks * 2) end)
result = parallel_hash(keyhashfun, b, tasks * 2)
:lists.merge(Task.await(task, :infinity), result)
else
Enum.map(new_dataset, fn {key, object} ->
{{keyhashfun.(key), key}, object}
end)
|> :lists.sort()
end
end
defp task_async(fun) do
Task.async(fn ->
Process.flag(:priority, :low)
fun.()
end)
end
defp write_bloom(estimated_file_entries) do
bloom_size = estimated_file_entries * 10
Bloom.create(bloom_size)
|> async_iterate_consume(fn bloom, entry_hash, _entry, _ ->
{:cont, Bloom.add(bloom, entry_hash)}
end)
|> Bloom.finalize()
end
defp write_entries(
%State{file_entries: old_file_entries, filename: filename},
new_dataset_length
) do
estimated_entry_count = new_dataset_length + old_file_entries
entries = EntryWriter.new(filename, estimated_entry_count)
{entries, _offset} =
async_iterate_consume({entries, 4}, fn {entries, offset},
entry_hash,
entry_bin,
_entry_term ->
size = byte_size(entry_bin)
table_idx = table_idx(entry_hash)
entries =
EntryWriter.insert(
entries,
{table_idx, entry_hash, offset + @hash_size}
)
offset = offset + @hash_size + @entry_size_size + size
{:cont, {entries, offset}}
end)
entries
end
defp write_data(state = %State{fp: fp}) do
state = %State{state | file_entries: 0, slot_counts: %{}}
writer = FileWriter.new(fp, 0, module: @wfile)
writer = FileWriter.write(writer, "DET+")
{state, writer} =
async_iterate_consume(
{state, writer},
fn {state = %State{file_entries: file_entries, slot_counts: slot_counts}, writer},
entry_hash,
entry_bin,
_entry_term ->
table_idx = table_idx(entry_hash)
slot_counts = Map.update(slot_counts, table_idx, 1, fn count -> count + 1 end)
state = %State{state | file_entries: file_entries + 1, slot_counts: slot_counts}
size = byte_size(entry_bin)
writer =
FileWriter.write(
writer,
<<entry_hash::binary-size(@hash_size), size::unsigned-size(@entry_size_size_bits),
entry_bin::binary>>
)
{:cont, {state, writer}}
end
)
# final zero offset after all data entries
FileWriter.write(
writer,
<<0::unsigned-size(@hash_size_bits), 0::unsigned-size(@entry_size_size_bits)>>
)
|> FileWriter.sync()
state
end
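# Generates slot_idx(slot_count, hash) clauses for slot counts 2, 4, ..., 2^56:
# the first hash byte selects the table (see table_idx/1), the following bits
# select the slot within that table.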
_ =
Enum.reduce(1..56, {[], 2}, fn bits, {code, size} ->
next =
defp slot_idx(unquote(size), <<_, slot::unsigned-size(unquote(bits)), _::bitstring>>) do
slot
end
{[next | code], size * 2}
end)
|> elem(0)
defp write_hashtable(state = %State{slot_counts: slot_counts, fp: fp}, entries) do
new_slot_counts =
Enum.map(slot_counts, fn {key, value} ->
{key, next_power_of_two(trunc(value * 1.3) + 1)}
end)
|> Map.new()
state =
%State{state | slot_counts: new_slot_counts}
|> init_table_offsets(@wfile.size(fp))
Enum.chunk_every(0..255, 64)
|> Enum.map(fn range ->
start_offset = table_offset(state, hd(range))
end_offset = table_offset(state, List.last(range) + 1)
writer = FileWriter.new(fp, start_offset, module: @wfile, limit: end_offset)
task_async(fn -> write_hashtable_range(range, writer, entries, new_slot_counts) end)
end)
|> Task.await_many(:infinity)
EntryWriter.close(entries)
state
end
defp write_hashtable_range(range, writer, entries, slot_counts) do
Enum.reduce(range, writer, fn table_idx, writer ->
slot_count = Map.get(slot_counts, table_idx, 0)
entries = EntryWriter.lookup(entries, table_idx)
start_offset = FileWriter.offset(writer)
{writer, overflow} = reduce_entries(entries, writer, slot_count)
if overflow == [] do
writer
else
writer = FileWriter.sync(writer)
reduce_overflow(
Enum.reverse(overflow),
FileReader.new(writer.fp, start_offset, module: @wfile)
)
writer
end
end)
|> FileWriter.sync()
end
defp next_power_of_two(n), do: next_power_of_two(n, 2)
defp next_power_of_two(n, x) when n < x, do: x
defp next_power_of_two(n, x), do: next_power_of_two(n, x * 2)
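# Writes the slots of one hash table using open addressing: each entry is placed
# at or after its ideal slot, empty slots are filled with @null_binary, and
# entries that do not fit before the table ends are returned as overflow to be
# written into earlier empty slots by reduce_overflow/2.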
defp reduce_entries(entries, writer, slot_count),
do: reduce_entries(entries, writer, slot_count, -1, [])
defp reduce_entries([], writer, slot_count, last_slot, overflow)
when last_slot + 1 == slot_count,
do: {writer, overflow}
defp reduce_entries([], writer, slot_count, last_slot, overflow) do
writer = FileWriter.write(writer, @null_binary)
reduce_entries([], writer, slot_count, last_slot + 1, overflow)
end
defp reduce_entries(
[
<<entry_hash::binary-size(8), _offset::unsigned-size(@slot_size_bits)>> = entry
| entries
],
writer,
slot_count,
last_slot,
overflow
) do
slot_idx = slot_idx(slot_count, entry_hash)
cond do
last_slot + 1 == slot_count ->
reduce_entries(entries, writer, slot_count, last_slot, [entry | overflow])
last_slot + 1 >= slot_idx ->
writer = FileWriter.write(writer, entry)
reduce_entries(entries, writer, slot_count, last_slot + 1, overflow)
true ->
writer = FileWriter.write(writer, @null_binary)
reduce_entries([entry | entries], writer, slot_count, last_slot + 1, overflow)
end
end
defp reduce_overflow([], _reader), do: :ok
defp reduce_overflow([entry | overflow], reader) do
curr = FileReader.offset(reader)
{reader, next} = FileReader.read(reader, @slot_size + @hash_size)
case next do
@null_binary ->
@wfile.pwrite(reader.fp, curr, entry)
reduce_overflow(overflow, reader)
_ ->
reduce_overflow([entry | overflow], reader)
end
end
@async_send_buffer_size 10_000_000
@async_send_buffer_trigger div(@async_send_buffer_size, 2)
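# Producer/consumer back-pressure: the producer batches entries to the target
# pids and, for every @async_send_buffer_size bytes sent, waits for a :continue
# message from each consumer (see await_processing/2 and confirm_processing/2).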
defp async_iterate_produce(
state,
new_dataset,
file_reader,
targets
) do
spawn_link(fn ->
init_acc = {[], 0, targets, -@async_send_buffer_trigger}
{:done, {items, _item_count, _targets, _byte_count}} =
iterate(state, {:cont, init_acc}, new_dataset, file_reader, &async_iterator/4)
for pid <- targets do
send(pid, {:entries, self(), Enum.reverse(items)})
send(pid, :done)
end
end)
end
defp async_iterator(state, _entry_hash, _binary_entry, :delete) do
{:cont, state}
end
defp async_iterator(
{items, item_count, targets, byte_count},
entry_hash,
entry_bin,
entry_term
) do
items = [{entry_hash, entry_bin, entry_term} | items]
item_count = item_count + 1
byte_count = byte_count + byte_size(entry_bin)
if item_count > 128 or byte_count >= @async_send_buffer_size do
for pid <- targets do
send(pid, {:entries, self(), Enum.reverse(items)})
end
{:cont, {[], 0, targets, await_processing(byte_count, targets)}}
else
{:cont, {items, item_count, targets, byte_count}}
end
end
defp await_processing(byte_count, targets) do
if byte_count >= @async_send_buffer_size do
for pid <- targets do
receive do
{^pid, :continue} -> :ok
end
end
await_processing(byte_count - @async_send_buffer_size, targets)
else
byte_count
end
end
defp async_iterate_consume(acc, fun, byte_count0 \\ 0) do
receive do
{:entries, producer, entries} ->
{acc, byte_count} =
Enum.reduce(entries, {acc, byte_count0}, fn {entry_hash, entry_bin, entry_term},
{acc, byte_count} ->
byte_count = byte_count + byte_size(entry_bin)
{:cont, acc} = fun.(acc, entry_hash, entry_bin, entry_term)
{acc, byte_count}
end)
async_iterate_consume(acc, fun, confirm_processing(byte_count, producer))
:done ->
acc
end
end
defp confirm_processing(byte_count, producer) do
if byte_count >= @async_send_buffer_size do
send(producer, {self(), :continue})
confirm_processing(byte_count - @async_send_buffer_size, producer)
else
byte_count
end
end
defp read_next_entry(nil), do: nil
defp read_next_entry(file_reader) do
case FileReader.read(file_reader, @hash_size + @entry_size_size) do
{_new_file_reader,
<<0::unsigned-size(@hash_size_bits), 0::unsigned-size(@entry_size_size_bits)>>} ->
nil
{new_file_reader,
<<entry_hash::binary-size(@hash_size), old_size::unsigned-size(@entry_size_size_bits)>>} ->
{file_reader, entry_bin} = FileReader.read(new_file_reader, old_size)
{file_reader, entry_hash, entry_bin}
end
end
# Merges `new_dataset` with the entries of the old file, calling
# `fun.(acc, entry_hash, entry_bin, entry_term)` for every entry, and returns the final acc.
@doc false
def iterate(state, {:cont, acc}, new_dataset, file_reader, fun) do
do_iterate(state, {:cont, acc}, new_dataset, read_next_entry(file_reader), fun)
end
defp do_iterate(_state, {:halt, acc}, _new_dataset, _old_dataset, _fun) do
{:halted, acc}
end
# Nothing left
defp do_iterate(_state, {:cont, acc}, [], nil, _fun) do
{:done, acc}
end
# Only old entries left
defp do_iterate(state, {:cont, acc}, [], {fr, entry_hash, entry_bin}, fun) do
acc = fun.(acc, entry_hash, entry_bin, nil)
do_iterate(state, acc, [], read_next_entry(fr), fun)
end
# Only new entries left
defp do_iterate(state, {:cont, acc}, new_dataset, nil, fun) do
{{entry_hash, _entry_key}, entry_term} = hd(new_dataset)
entry_bin = state.serializer.(entry_term)
acc = fun.(acc, entry_hash, entry_bin, entry_term)
do_iterate(state, acc, tl(new_dataset), nil, fun)
end
# Both types are still there
defp do_iterate(state, {:cont, acc}, new_dataset, {fr, old_entry_hash, old_entry_bin}, fun) do
# Reading a new entry from the top of the dataset
{{new_entry_hash, new_entry_key}, new_entry_term} = hd(new_dataset)
# Compare it with the entry already read from the old file; the smaller hash/key
# is emitted first, and on equal keys the new entry replaces the old one.
case compare(state.keyfun, {new_entry_hash, new_entry_key}, {old_entry_hash, old_entry_bin}) do
:equal ->
entry_bin = state.serializer.(new_entry_term)
acc = fun.(acc, new_entry_hash, entry_bin, new_entry_term)
do_iterate(state, acc, tl(new_dataset), read_next_entry(fr), fun)
:new ->
entry_bin = state.serializer.(new_entry_term)
acc = fun.(acc, new_entry_hash, entry_bin, new_entry_term)
do_iterate(state, acc, tl(new_dataset), {fr, old_entry_hash, old_entry_bin}, fun)
:old ->
acc = fun.(acc, old_entry_hash, old_entry_bin, nil)
do_iterate(state, acc, new_dataset, read_next_entry(fr), fun)
end
end
defp compare(
keyfun,
{new_entry_hash, new_entry_key},
{old_entry_hash, old_entry_bin}
) do
case {new_entry_hash, old_entry_hash} do
{same, same} ->
# hash collisions should be very rare; otherwise this becomes expensive
old_entry_key = keyfun.(:erlang.binary_to_term(old_entry_bin))
case {new_entry_key, old_entry_key} do
{same, same} -> :equal
{new, old} when new < old -> :new
{new, old} when new > old -> :old
end
{new, old} when new < old ->
:new
{new, old} when new > old ->
:old
end
end
defp table_offset(%State{table_offsets: nil}, _table_idx) do
nil
end
defp table_offset(%State{table_offsets: table_offsets}, table_idx) do
Map.get(table_offsets, table_idx)
end
def file_lookup(%State{file_entries: 0}, _key, _hash), do: []
def file_lookup(state = %State{slot_counts: slot_counts}, key, hash) do
table_idx = table_idx(hash)
slot_count = Map.get(slot_counts, table_idx, 0)
if Bloom.lookup(state, hash) and slot_count > 0 do
slot = slot_idx(slot_count, hash)
{ret, _n} =
file_lookup_slot_loop(state, key, hash, table_offset(state, table_idx), slot, slot_count)
ret
else
[]
end
end
defp batch_read(fp, point, count) do
{:ok, data} = PagedFile.pread(fp, point, (@slot_size + @hash_size) * count)
for <<hash::binary-size(@hash_size), offset::unsigned-size(@slot_size_bits) <- data>> do
{hash, offset}
end ++ [@null_tuple]
end
@batch_size 32
defp file_lookup_slot_loop(
state = %State{fp: fp, keyfun: keyfun},
key,
hash,
base_offset,
slot,
slot_count,
n \\ 0
) do
slot = rem(slot, slot_count)
point = base_offset + slot * (@slot_size + @hash_size)
batch_size = min(@batch_size, slot_count - slot)
hash_offsets = batch_read(fp, point, batch_size)
# a zero offset is an indication of the end
# a hash bigger than the searched hash means
# we reached the next entry or an overflow entry
hash_offsets =
Enum.take_while(hash_offsets, fn {<<khash::binary-size(@hash_size)>>, offset} ->
offset != 0 and khash <= hash
end)
len = length(hash_offsets)
offsets =
Enum.filter(hash_offsets, fn {rhash, _offset} ->
rhash == hash
end)
|> Enum.map(fn {_hash, offset} -> offset end)
{:ok, sizes} =
PagedFile.pread(fp, Enum.zip(offsets, List.duplicate(@entry_size_size, length(offsets))))
sizes = Enum.map(sizes, fn <<size::unsigned-size(@entry_size_size_bits)>> -> size end)
offsets = Enum.map(offsets, fn offset -> offset + @entry_size_size end)
{:ok, entries} = PagedFile.pread(fp, Enum.zip(offsets, sizes))
Enum.find_value(entries, fn entry ->
entry = :erlang.binary_to_term(entry)
if keyfun.(entry) == key do
entry
end
end)
|> case do
nil ->
if len < batch_size do
{[], n}
else
file_lookup_slot_loop(
state,
key,
hash,
base_offset,
slot + batch_size,
slot_count,
n + 1
)
end
entry ->
{[entry], n}
end
end
defp default_hash(key) do
<<hash::binary-size(@hash_size), _::binary>> =
:crypto.hash(:sha256, :erlang.term_to_binary(key))
hash
end
# get a table idx from the hash value
defp table_idx(<<idx, _::binary>>) do
idx
end
defp do_string(atom) when is_atom(atom), do: Atom.to_string(atom)
defp do_string(list) when is_list(list), do: List.to_string(list)
defp do_string(string) when is_binary(string), do: string
defp file_open(filename, page_cache_memory) do
# If more than 10 MB of cache memory is requested, use 1 MB pages with
# max_pages = page_cache_memory / 1 MB. Otherwise use 100 KB pages,
# capped at 10 pages (so at most ~1 MB of cache is used).
opts =
if page_cache_memory > 10_000_000 do
max_pages = div(page_cache_memory, 1_000_000)
[page_size: 1_000_000, max_pages: max_pages]
else
max_pages = div(page_cache_memory, 100_000) |> min(10)
[page_size: 100_000, max_pages: max_pages]
end
{:ok, fp} = PagedFile.open(filename, opts)
fp
end
@impl true
def terminate(_reason, %State{fp: fp}) do
if fp != nil do
:ok = PagedFile.close(fp)
end
end
end
defimpl Enumerable, for: DetsPlus do
alias DetsPlus.FileReader
def count(_pid), do: {:error, __MODULE__}
def member?(_pid, _key), do: {:error, __MODULE__}
def slice(_pid), do: {:error, __MODULE__}
def reduce(state = %DetsPlus{pid: pid, ets: ets, keyhashfun: keyhashfun}, acc, fun) do
# Calling to ensure all other async casts have been processed
DetsPlus.info(state)
new_data =
:ets.tab2list(ets)
|> Enum.reduce(%{}, fn {key, object}, map -> Map.put(map, key, object) end)
{new_data2, filename} = GenServer.call(pid, :get_reduce_state)
new_data = Map.merge(new_data, new_data2)
opts = [page_size: 1_000_000, max_pages: 1000]
{fp, old_file} =
with true <- File.exists?(filename),
{:ok, fp} <- PagedFile.open(filename, opts) do
old_file = FileReader.new(fp, byte_size("DET+"), module: PagedFile, buffer_size: 512_000)
{fp, old_file}
else
_ -> {nil, nil}
end
# Ensuring hash function sort order
new_dataset = DetsPlus.parallel_hash(keyhashfun, Map.to_list(new_data))
ret =
DetsPlus.iterate(state, acc, new_dataset, old_file, fn acc,
_entry_hash,
entry_blob,
entry ->
if entry != :delete do
fun.(entry || :erlang.binary_to_term(entry_blob), acc)
else
{:cont, acc}
end
end)
if fp != nil, do: PagedFile.close(fp)
ret
end
end