defmodule Kdb.Bucket do
# Struct fields:
#   name   - bucket name atom (derived via Kdb.Utils.to_bucket_name/1)
#   dbname - registered name of the owning Kdb database
#   handle - RocksDB column-family handle, or nil
#   module - the `use Kdb.Bucket` module implementing this bucket
#   batch  - optional default batch used by fetch/2 and get_and_update/3
#   ttl    - cache time-to-live in milliseconds
defstruct [:name, :dbname, :handle, :module, :batch, :ttl]

@type t :: %__MODULE__{
        name: atom(),
        dbname: atom(),
        handle: reference() | nil,
        module: module(),
        batch: Kdb.Batch.t() | nil,
        ttl: integer()
      }
# Injects a complete bucket implementation into the calling module.
#
# Options (evaluated once at compile time via bind_quoted):
#   :name        - required bucket name; raises ArgumentError when missing
#   :cache       - cache module (default: Kdb.Cache)
#   :ttl         - cache TTL in ms (default: 300_000); @cacheable is set
#                  only when this is an integer
#   :unique      - list of {field, module} unique-index definitions
#   :secondary   - list of fields indexed through the external indexer
#   :decoder     - binary -> term decoder (default: Kdb.Utils.binary_to_term/1)
#   :encoder     - term -> binary encoder (default: Kdb.Utils.term_to_binary/1)
#   :stats       - bucket module used for key-count stats, or false to disable
#   :match_count - list of {name, regex, fun} per-pattern key counters
defmacro __using__(opts) do
  bucket = Keyword.get(opts, :name) || raise ArgumentError, "missing :name option"
  cache = Keyword.get(opts, :cache, Kdb.Cache)
  ttl = Keyword.get(opts, :ttl, 300_000)
  unique = Keyword.get(opts, :unique, [])
  secondary = Keyword.get(opts, :secondary, [])
  decoder = Keyword.get(opts, :decoder, &Kdb.Utils.binary_to_term/1)
  encoder = Keyword.get(opts, :encoder, &Kdb.Utils.term_to_binary/1)
  stats = Keyword.get(opts, :stats, Kdb.DefaultBucket)
  match_count = Keyword.get(opts, :match_count, [])

  # bind_quoted exposes each option as a plain variable inside the quote
  # and guarantees single evaluation.
  quote bind_quoted: [
          bucket: bucket,
          cache: cache,
          ttl: ttl,
          unique: unique,
          secondary: secondary,
          decoder: decoder,
          encoder: encoder,
          stats: stats,
          match_count: match_count
        ] do
    @bucket bucket |> Kdb.Utils.to_bucket_name()
    @cache cache
    @ttl ttl
    @decoder decoder
    @encoder encoder
    # caching is meaningful only with a numeric TTL
    @cacheable is_integer(ttl)
    @unique unique
    @secondary secondary
    @has_unique length(unique) > 0
    @has_secondary length(secondary) > 0
    @has_index @has_unique or @has_secondary
    # field -> unique-index module lookup used by get_unique/3 and exists?/3
    @unique_map Enum.into(@unique, %{}, & &1)
    @stats stats
    @has_stats stats != false
    # stats key holding this bucket's total key count
    @info_keys "#{@bucket}:keys"
    @has_match_count length(match_count) > 0
    # precompute the stats key for each match_count pattern
    @match_count Enum.map(match_count, fn {name, regex, fun} ->
                   {"#{@bucket}:#{name}:keys", regex, fun}
                 end)
    @compile {:inline,
              transform: 5, put_new: 3, get: 3, incr: 4, incr_if: 6, encoder: 1, decoder: 1}
    @behaviour Kdb.Bucket
    alias Kdb.Batch
    alias Kdb.Cache
    alias Kdb.Indexer
# Builds the runtime Kdb.Bucket struct for this generated module.
# Requires :dbname and :handle in `opts`; :batch is optional.
@impl true
def new(opts) do
  %Kdb.Bucket{
    dbname: Keyword.fetch!(opts, :dbname),
    name: @bucket,
    handle: Keyword.fetch!(opts, :handle),
    module: __MODULE__,
    batch: Keyword.get(opts, :batch),
    ttl: @ttl
  }
end
# Compile-time configuration accessors.
@impl true
def name, do: @bucket

@impl true
def ttl, do: @ttl

# All indexed fields: unique-index fields followed by secondary fields.
@impl true
def indexes, do: Enum.map(@unique, &elem(&1, 0)) ++ @secondary

@impl true
def decoder(x), do: @decoder.(x)

@impl true
def encoder(x), do: @encoder.(x)
# Key-count bookkeeping, compiled in only when a stats backend is set.
if @has_stats do
  # Total key count recorded for this bucket.
  @impl true
  def count_keys(batch) do
    @stats.get(batch, @info_keys, 0)
  end

  # Key count for one named :match_count pattern.
  def count_keys(batch, name) do
    @stats.get(batch, "#{@bucket}:#{name}:keys", 0)
  end

  if @has_match_count do
    # Bumps the counter of the first pattern matching `key` (at most one,
    # via reduce_while/halt), then always bumps the bucket-wide counter.
    defp incr_count(batch, key, amount) do
      Enum.reduce_while(@match_count, :ok, fn {prefix, regex, fun}, acc ->
        if fun.(key, regex) do
          @stats.incr(batch, prefix, amount)
          {:halt, acc}
        else
          {:cont, acc}
        end
      end)

      @stats.incr(batch, @info_keys, amount)
    end
  else
    # No per-pattern counters: only the bucket-wide counter is maintained.
    defp incr_count(batch, _key, amount) do
      @stats.incr(batch, @info_keys, amount)
    end
  end
else
  # Stats disabled: report zero and make counting a no-op.
  @impl true
  def count_keys(_batch), do: 0
  defp incr_count(_batch, _keys, _amount), do: true
end
if @has_index do
  # put/3 for buckets with unique and/or secondary indexes: unique
  # constraints are checked first; on success the encoded value goes into
  # the RocksDB write batch, secondary-index jobs are queued, and the
  # in-memory cache is updated. Returns false on a unique-constraint hit.
  # Fixed: dropped the unused `db =` binding (compiler warning).
  @impl true
  def put(
        batch = %Kdb.Batch{
          db: %Kdb{
            buckets: %{@bucket => %Kdb.Bucket{name: bucket_name, handle: handle}}
          },
          cache: cache,
          store: store,
          indexer: indexer
        },
        key,
        value
      ) do
    if put_unique(batch, key, value) do
      :rocksdb.batch_put(store.batch, handle, key, @encoder.(value))
      @has_secondary and put_secondary(indexer, key, value)
      @cache.put(cache, bucket_name, key, value, @ttl)
    else
      false
    end
  end

  if @has_unique do
    # Returns false (writing nothing) when any unique field's value is
    # already claimed by another key; otherwise registers `key` under
    # every unique field value and returns true.
    defp put_unique(batch, key, value) do
      lookups =
        Enum.map(@unique, fn {field, module} ->
          val = Map.get(value, field)
          {module.get(batch, val), module, val}
        end)

      if Enum.any?(lookups, fn {existing, _mod, _val} -> existing != nil end) do
        false
      else
        Enum.each(lookups, fn {_existing, module, val} -> module.put(batch, val, key) end)
        true
      end
    end
  else
    # Fixed: this clause was `def`, making put_unique public only for
    # buckets without unique indexes; `defp` matches the other branch.
    defp put_unique(_batch, _key, _value), do: true
  end

  # Queues a :create_index job for every non-nil secondary field value.
  defp put_secondary(indexer, key, value) do
    Enum.each(@secondary, fn field ->
      field_value = Map.get(value, field)

      field_value != nil and
        Indexer.Batch.add(indexer, :create_index, [@bucket, field, key, field_value])
    end)
  end
else
  # put/3 for index-less buckets: write straight to batch and cache.
  # Fixed: dropped the unused `db =` and `indexer:` bindings (warnings).
  @impl true
  def put(
        %Kdb.Batch{
          db: %Kdb{
            buckets: %{@bucket => %Kdb.Bucket{name: bucket_name, handle: handle}}
          },
          cache: cache,
          store: store
        },
        key,
        value
      ) do
    :rocksdb.batch_put(store.batch, handle, key, @encoder.(value))
    @cache.put(cache, bucket_name, key, value, @ttl)
  end
end
# Inserts `value` only when `key` is absent; bumps the key counter on
# success. Returns false when the key already exists.
def put_new(batch, key, value) do
  if is_nil(get(batch, key)) do
    put(batch, key, value) and incr_count(batch, key, 1)
  else
    false
  end
end
# Cache-first read: a cached :delete tombstone yields `default`, a cache
# miss falls through to RocksDB, anything else is the cached value.
@impl true
def get(
      %Kdb.Batch{
        db: %Kdb{buckets: %{@bucket => %Kdb.Bucket{name: bucket_name, handle: handle}}},
        cache: cache
      } = batch,
      key,
      default \\ nil
    ) do
  case @cache.get(cache, bucket_name, key, @ttl) do
    :delete -> default
    nil -> get_from_disk(batch, handle, key, default)
    cached -> cached
  end
end
# True when `key` exists, consulting the cache first and falling back to
# a disk read (which also warms the cache) on a cache miss.
# Fixed: removed the unused `= bucket` binding (compiler warning).
@impl true
def has_key?(
      %Kdb.Batch{
        db: %Kdb{buckets: %{@bucket => %Kdb.Bucket{name: bucket_name, handle: handle}}},
        cache: cache
      } = batch,
      key
    ) do
  case @cache.has_key?(cache, bucket_name, key) do
    nil ->
      # cache has no answer; resolve against RocksDB
      get_from_disk(batch, handle, key) != nil

    result ->
      result
  end
end
# Reads `key` straight from RocksDB; on a hit the value is decoded and
# written through to the in-memory cache, on a miss `default` is returned.
defp get_from_disk(
       %Kdb.Batch{db: db, cache: cache},
       handle,
       key,
       default \\ nil
     ) do
  case :rocksdb.get(db.store, handle, key, []) do
    {:ok, raw} ->
      decoded = @decoder.(raw)
      # warm the cache so the next read skips the disk
      @cache.put(cache, @bucket, key, decoded, @ttl)
      decoded

    :not_found ->
      default
  end
end
# Increments the integer stored at `key` by `amount`, treating a missing
# key as `initial`. The cache holds the authoritative counter; the new
# value is mirrored into the RocksDB write batch. Returns the new value.
@impl true
def incr(
      batch = %Kdb.Batch{
        db: %Kdb{buckets: %{@bucket => %Kdb.Bucket{handle: handle}}},
        cache: cache,
        store: store
      },
      key,
      amount,
      initial \\ 0
    )
    when is_integer(amount) do
  current = get(batch, key, initial)
  updated = @cache.update_counter(cache, @bucket, key, amount, current, @ttl)
  :rocksdb.batch_put(store.batch, handle, key, @encoder.(updated))
  updated
end
# Like incr/4, but the increment is applied only when
# `comparator.(current, threshold)` is truthy; otherwise returns
# `{:error, current}` with the unchanged value.
@spec incr_if(
        batch :: Kdb.Batch.t(),
        key :: binary(),
        amount :: integer(),
        initial :: integer(),
        threshold :: integer(),
        comparator :: (integer(), integer() -> boolean())
      ) :: integer() | {:error, integer()}
def incr_if(
      batch = %Kdb.Batch{
        db: %Kdb{
          buckets: %{@bucket => %Kdb.Bucket{handle: handle}}
        },
        cache: cache,
        store: store
      },
      key,
      amount,
      initial \\ 0,
      threshold \\ 0,
      comparator \\ &>=/2
    )
    when is_integer(amount) do
  old_result = get(batch, key, initial)

  if comparator.(old_result, threshold) do
    result = @cache.update_counter(cache, @bucket, key, amount, old_result, @ttl)
    :rocksdb.batch_put(store.batch, handle, key, @encoder.(result))
    # Fixed: return the new counter value. The branch previously ended
    # with :rocksdb.batch_put/4, leaking its :ok and violating the @spec
    # (incr/4 correctly returns the value).
    result
  else
    {:error, old_result}
  end
end
# Appends `new_item` to the list stored at `key` (empty list when absent).
@impl true
def append(batch, key, new_item) do
  transform(batch, key, new_item, [], fn list, item -> list ++ [item] end)
end
# Removes every element of `items` from the list stored at `key`
# (empty list when absent), using list subtraction semantics.
@impl true
def remove(batch, key, items) when is_list(items) do
  transform(batch, key, items, [], fn list, to_remove -> list -- to_remove end)
end
# True when the value at `key` is a list containing `item`; false for a
# missing key or any non-list value.
@impl true
def includes?(batch, key, item) do
  stored = get(batch, key)
  is_list(stored) and item in stored
end
# Read-modify-write helper behind append/3 and remove/3: loads the
# current value (or `initial` when nil), applies `fun`, then writes the
# result to both the RocksDB batch and the cache. The commented-out code
# is a former merge-operator implementation kept for reference.
defp transform(
       batch = %Kdb.Batch{
         db: %Kdb{
           buckets: %{@bucket => %Kdb.Bucket{handle: handle}}
         },
         cache: cache,
         store: %{batch: raw_batch}
       },
       key,
       new_item,
       # operation,
       initial,
       fun
     ) do
  old_result = get(batch, key) || initial
  # if initial == old_result do
  #   :rocksdb.batch_put(raw_batch, handle, key, @encoder.(initial))
  # end
  # :rocksdb.batch_merge(raw_batch, handle, key, @encoder.({operation, new_item}))
  new_value = fun.(old_result, new_item)
  :rocksdb.batch_put(raw_batch, handle, key, @encoder.(new_value))
  # NOTE(review): new_value is passed twice to @cache.update/6 —
  # presumably (value, default); confirm against the cache module's API.
  @cache.update(cache, @bucket, key, new_value, new_value, @ttl)
end
# Removes `key` from the column family, its index entries, the cache and
# the key counters. Statement order matters: delete_unique/2 re-reads the
# value via get/2, so it must run before @cache.delete/3 removes the
# cached copy.
@impl true
def delete(
      batch = %Kdb.Batch{
        db: %Kdb{
          buckets: %{@bucket => %Kdb.Bucket{handle: handle}}
        },
        indexer: indexer,
        cache: cache,
        store: store
      },
      key
    ) do
  :rocksdb.batch_delete(store.batch, handle, key)
  @has_unique and delete_unique(batch, key)
  @has_secondary and delete_secondary(indexer, key)
  @cache.delete(cache, @bucket, key)
  # NOTE(review): this decrements even when the key was absent — confirm
  # callers only delete existing keys, or the count can drift.
  @has_stats and incr_count(batch, key, -1)
end
# Drops the unique-index entries that point at `key`.
# Fixed: the record was re-fetched with get/2 once per unique field
# inside the loop; it is loaded once and reused for every field.
defp delete_unique(batch, key) do
  case get(batch, key) do
    nil ->
      nil

    map ->
      Enum.each(@unique, fn {field, module} ->
        module.delete(batch, Map.get(map, field))
      end)
  end
end
# Queues a job to drop all secondary-index entries for `key`.
# Fixed: the `t: t` destructuring was unused and produced a compiler
# warning; the struct match alone keeps the type assertion.
defp delete_secondary(%Indexer.Batch{} = indexer, key) do
  Indexer.Batch.add(indexer, :delete_index, [@bucket, key])
end
# Index query helpers, compiled in only when any index is configured.
if @has_index do
  # Looks up `val` in the unique index for `field`; nil when `field` has
  # no unique index or no entry exists.
  def get_unique(batch, field, val) do
    case @unique_map[field] do
      nil ->
        nil

      module ->
        module.get(batch, val)
    end
  end

  # True when `val` is present in the unique index for `field`; false
  # when `field` has no unique index.
  def exists?(batch, field, val) do
    case @unique_map[field] do
      nil ->
        false

      module ->
        module.has_key?(batch, val)
    end
  end

  # Searches `attr` for `text` through the external indexer and lazily
  # resolves each hit back to its stored record.
  def find(%Kdb.Batch{indexer: %{conn: conn}} = batch, attr, text, opts \\ []) do
    Kdb.Indexer.find(conn, @bucket, attr, text, opts)
    # NOTE(review): assumes every indexer row is a one-element list
    # ([key]); rows with a different shape would raise — confirm.
    |> Stream.map(fn [result] ->
      get(batch, result)
    end)
  end
end
# Drops this bucket's RocksDB column family from its database.
def drop(%Kdb.Bucket{dbname: dbname, handle: handle}) do
  dbname
  |> Kdb.get()
  |> :rocksdb.drop_column_family(handle)
end
## Multi API — wrappers that perform the operation and hand the batch
## back, so calls can be chained in a pipeline.

def multi_put(batch, key, value), do: tap(batch, &put(&1, key, value))

def multi_delete(batch, key), do: tap(batch, &delete(&1, key))

def multi_append(batch, key, new_item), do: tap(batch, &append(&1, key, new_item))

def multi_remove(batch, key, items), do: tap(batch, &remove(&1, key, items))

# Key/value streaming lives in Kdb.Bucket.Stream.
defdelegate stream(batch, opts), to: Kdb.Bucket.Stream
defdelegate keys(batch, opts), to: Kdb.Bucket.Stream
# Core operations the using module may override (super stays available).
defoverridable decoder: 1,
               encoder: 1,
               get: 3,
               put: 3,
               delete: 2,
               has_key?: 2
end
end

# Name of the fallback batch created on demand by fetch/2 and
# get_and_update/3 when the bucket struct carries no batch.
@default_batch :default
# Access-style lookup through the bucket struct: returns {:ok, value} or
# {:error, :not_found}. Creates a default batch when none is attached.
def fetch(%Kdb.Bucket{dbname: dbname, module: module, batch: batch}, key) do
  active_batch = batch || Kdb.Batch.new(name: @default_batch, db: Kdb.get(dbname))

  case module.get(active_batch, key) do
    nil -> {:error, :not_found}
    value -> {:ok, value}
  end
end
# Reads the value at `key`, applies `fun` to it and stores the result.
# Returns {:ok, old, new}; when the key is absent, `fun` is not invoked
# and {:ok, nil, nil} is returned.
def get_and_update(
      %Kdb.Bucket{dbname: dbname, module: module, batch: batch},
      key,
      fun
    ) do
  active_batch = batch || Kdb.Batch.new(name: @default_batch, db: Kdb.get(dbname))

  case module.get(active_batch, key) do
    nil ->
      {:ok, nil, nil}

    old_value ->
      new_value = fun.(old_value)
      module.put(active_batch, key, new_value)
      {:ok, old_value, new_value}
  end
end
# Attaches a batch to the bucket struct. An atom or reference is treated
# as a batch name and a fresh Kdb.Batch is created for the bucket's db;
# anything else is stored as-is.
def put_batch(bucket, batch) when is_atom(batch) or is_reference(batch) do
  new_batch = Kdb.Batch.new(name: batch, db: Kdb.get(bucket.dbname))
  %{bucket | batch: new_batch}
end

def put_batch(bucket, batch), do: %{bucket | batch: batch}
@doc """
Creates a new bucket module with the given name and options.
The module will use `Kdb.Bucket` and the options provided.
"""
def make_bucket(mod_name, opts) when is_atom(mod_name) and is_list(opts) do
quoted =
quote do
use Kdb.Bucket, unquote_splicing(opts)
end
Module.create(mod_name, quoted, Macro.Env.location(__ENV__))
end
# Behaviour contract implemented by every `use Kdb.Bucket` module.
@callback new(Keyword.t()) :: Kdb.Bucket.t()
@callback name() :: atom
@callback ttl() :: non_neg_integer
@callback indexes() :: list()
@callback decoder(binary()) :: term()
@callback encoder(term()) :: binary()
@callback has_key?(Kdb.Batch.t(), String.t()) :: boolean
@callback count_keys(Kdb.Batch.t()) :: non_neg_integer
@callback get(Kdb.Batch.t(), String.t(), any()) :: any()
@callback put(Kdb.Batch.t(), String.t(), term()) :: any()
@callback incr(Kdb.Batch.t(), String.t(), integer()) :: integer()
@callback append(Kdb.Batch.t(), String.t(), term()) :: any()
@callback remove(Kdb.Batch.t(), String.t(), term()) :: any()
@callback includes?(Kdb.Batch.t(), String.t(), term()) :: boolean
@callback delete(Kdb.Batch.t(), String.t()) :: any()
# The list operations and has_key?/2 need not be implemented by modules
# that write the behaviour by hand (the generated code defines them all).
@optional_callbacks [append: 3, remove: 3, includes?: 3, has_key?: 2]
end
defimpl Inspect, for: Kdb.Bucket do
  # Compact one-line rendering; hides handles and internal state.
  def inspect(%Kdb.Bucket{name: name}, _opts) do
    "#Kdb.Bucket<name: #{name}>"
  end
end
defimpl Enumerable, for: Kdb.Bucket do
  # Enumerates the bucket through its key/value stream.
  def reduce(bucket, acc, fun) do
    stream = Kdb.Bucket.Stream.stream(bucket)
    Enumerable.reduce(stream, acc, fun)
  end

  # Fixed: per the Enumerable protocol, {:error, module} must name the
  # module whose reduce/3 implements the linear fallback — this impl
  # module — not the user's bucket module, which has no reduce/3 with
  # that contract and would crash Enum.count/member?/slice.
  def count(_bucket), do: {:error, __MODULE__}

  def member?(_bucket, _element), do: {:error, __MODULE__}

  def slice(_bucket), do: {:error, __MODULE__}
end
defmodule Kdb.DefaultBucket do
  # Default bucket; also the default :stats backend in Kdb.Bucket's
  # __using__. Its own stats are disabled, avoiding self-referential
  # key counting.
  use Kdb.Bucket, name: :default, stats: false
end