defmodule Kdb.Bucket do
defstruct [:db, :name, :dbname, :handle, :t, :batch, :module, :ttl]
@type t :: %__MODULE__{
db: reference(),
name: atom(),
dbname: atom(),
handle: reference() | nil,
t: :ets.tid() | nil,
batch: reference() | nil,
module: module(),
ttl: boolean()
}
defmacro __using__(opts) do
bucket = Keyword.get(opts, :name) || raise ArgumentError, "missing :name option"
cache = Keyword.get(opts, :cache, Kdb.Cache)
ttl = Keyword.get(opts, :ttl, true)
decoder = Keyword.get(opts, :decoder, &Kdb.binary_to_term/1)
encoder = Keyword.get(opts, :encoder, &Kdb.term_to_binary/1)
quote bind_quoted: [
bucket: bucket,
cache: cache,
ttl: ttl,
decoder: decoder,
encoder: encoder
] do
@bucket bucket
@cache cache
@ttl ttl
@decoder decoder
@encoder encoder
@compile {:inline,
put: 3, get: 2, has_key?: 2, delete: 2, incr: 3, batch: 2, decoder: 1, encoder: 1}
def new(dbname, db, handle) do
%Kdb.Bucket{
dbname: dbname,
db: db,
name: @bucket,
handle: handle,
t: new_table(),
module: __MODULE__,
ttl: @ttl
}
end
def new_table do
:ets.new(__MODULE__, [
:set,
:public,
read_concurrency: true,
write_concurrency: true
])
end
def name, do: @bucket
def ttl, do: @ttl
def decoder(x), do: @decoder.(x)
def encoder(x), do: @encoder.(x)
def batch(%Kdb.Bucket{dbname: dbname} = bucket, name) do
batch = Kdb.batch(name)
%{bucket | batch: batch}
end
def put(%Kdb.Bucket{batch: batch, handle: handle, t: t, ttl: ttl}, key, value) do
:ets.insert(t, {key, value})
:rocksdb.batch_put(batch, handle, key, @encoder.(value))
ttl and @cache.put(@bucket, key)
end
defp put_in_memory(%Kdb.Bucket{t: t, ttl: ttl}, key, value) do
:ets.insert(t, {key, value})
ttl and @cache.put(@bucket, key)
end
def get(bucket, key) do
case :ets.lookup(bucket.t, key) do
[{^key, value}] ->
value
[] ->
get_from_disk(bucket, key)
end
end
def has_key?(bucket, key) do
case :ets.member(bucket.t, key) do
true ->
true
false ->
case get_from_disk(bucket, key) do
nil -> false
_value -> true
end
end
end
def get_from_disk(bucket, key) do
case :rocksdb.get(bucket.db, bucket.handle, key, []) do
{:ok, value} ->
result = @decoder.(value)
put_in_memory(bucket, key, result)
result
:not_found ->
nil
end
end
def incr(bucket, key, amount) when is_integer(amount) do
result = get(bucket, key) || 0
result = :ets.update_counter(bucket.t, key, {2, amount}, {key, result})
:rocksdb.batch_put(bucket.batch, bucket.handle, key, @encoder.(result))
result
end
def delete(bucket, key) do
:ets.delete(bucket.t, key)
:rocksdb.batch_delete(bucket.batch, bucket.handle, key)
@ttl and @cache.delete(@bucket, key)
end
end
end
defimpl Enumerable do
def reduce(bucket, acc, fun) do
# ← puedes parametrizar el nombre de tabla
stream = Kdb.Stream.stream(bucket, decoder: &bucket.module.decoder/1)
Enumerable.reduce(stream, acc, fun)
end
def count(bucket) do
# No se puede contar sin recorrer todo
{:error, bucket.module}
end
def member?(bucket, _element) do
# No implementado de forma eficiente
{:error, bucket.module}
end
def slice(bucket) do
{:error, bucket.module}
end
end
def fetch(bucket = %Kdb.Bucket{module: module}, key) do
case :ets.lookup(bucket.t, key) do
[{^key, value}] ->
{:ok, value}
[] ->
case module.get_from_disk(bucket, key) do
nil -> :error
value -> {:ok, value}
end
end
end
def get_and_update(bucket = %Kdb.Bucket{module: module}, key, fun) do
case module.get(bucket, key) do
nil ->
{:ok, nil, nil}
value ->
result = fun.(value)
module.put(bucket, key, result)
{:ok, value, result}
end
end
defimpl Inspect do
def inspect(bucket, _opts) do
"#Kdb.Bucket<name: #{bucket.name}>"
end
end
end
defmodule Kdb.Stream do
def stream(%Kdb.Bucket{db: db, handle: handle, module: module}, opts \\ []) do
# <<>> or <<0>> or :last
initial_seek = Keyword.get(opts, :seek, <<>>)
# :next or :prev
direction = Keyword.get(opts, :direction, :next)
decoder_fun = Keyword.get(opts, :decoder, &module.decoder/1)
Stream.resource(
# Start: open iterator and seek
fn ->
{:ok, iter} = :rocksdb.iterator(db, handle, [])
state =
case :rocksdb.iterator_move(iter, initial_seek) do
{:ok, key, value} -> {:ok, iter, key, value}
_ -> {:done, iter}
end
state
end,
# Next: return {k, v} and move iterator
fn
{:done, iter} ->
{:halt, iter}
{:ok, iter, key, value} ->
item = {key, decoder_fun.(value)}
next =
case :rocksdb.iterator_move(iter, direction) do
{:ok, next_key, next_val} -> {:ok, iter, next_key, next_val}
_ -> {:done, iter}
end
{[item], next}
end,
# After: close iterator
fn iter ->
:ok = :rocksdb.iterator_close(iter)
end
)
end
end
defmodule DefaultBucket do
use Kdb.Bucket, name: :default, ttl: true
end