%% @doc: Registers stored in one large binary
%%
%% This backend uses one plain Erlang binary to store registers. The
%% cost of rebuilding the binary is amortized by keeping a buffer of
%% inserts to perform in the future.
-module(hyper_binary).
-behaviour(hyper_register).
%%-compile(native).
-export([
new/1,
set/3,
compact/1,
max_merge/1, max_merge/2,
reduce_precision/2,
bytes/1,
register_sum/1,
register_histogram/1,
zero_count/1,
encode_registers/1,
decode_registers/2,
empty_binary/1,
max_registers/1,
precision/1
]).
%% Width, in bits, of one register inside the packed binary held in
%% #dense.b.
-define(VALUE_SIZE, 6).
%% Fraction of the register count that new_dense/1 uses to size the pending
%% insert buffer of a dense store (with a floor of 16 entries).
-define(MERGE_THRESHOLD, 0.05).
-type precision() :: 4..16.
%% List-backed representation: `buf' holds {Index, Value} pairs and
%% `buf_size' counts them. Once `buf_size' reaches `convert_threshold' the
%% buffer is converted to a #dense{} record.
-record(buffer, {buf, buf_size, p :: precision(), convert_threshold}).
%% Binary-backed representation: `b' is the packed register binary, `buf' a
%% list of pending {Index, Value} updates and `buf_size' their count. Once
%% `buf_size' reaches `merge_threshold' the pending updates are merged into
%% `b'.
-record(dense, {b, buf, buf_size, p :: precision(), merge_threshold}).
%% Create an empty register store for precision `P'. A fresh store always
%% starts out in the list-backed #buffer{} representation.
new(P) ->
new_buffer(P).
%% Build an empty #buffer{} for precision `P'. The conversion threshold is
%% the register count reported by hyper_utils:m/1 divided by the assumed
%% cost of one buffered entry.
new_buffer(P) ->
    RegisterCount = hyper_utils:m(P),
    % 5 words for each entry (presumably 8 bytes per word)
    EntryCost = 5 * 8,
    #buffer{
        buf = [],
        buf_size = 0,
        p = P,
        convert_threshold = RegisterCount div EntryCost
    }.
%% Build an empty #dense{} for precision `P': an all-zero register binary
%% and an empty pending list. The merge threshold is ?MERGE_THRESHOLD times
%% the register count, truncated, but never less than 16.
new_dense(P) ->
    RegisterCount = hyper_utils:m(P),
    Threshold =
        case trunc(RegisterCount * ?MERGE_THRESHOLD) of
            Scaled when Scaled > 16 -> Scaled;
            _TooSmall -> 16
        end,
    #dense{
        b = empty_binary(RegisterCount),
        buf = [],
        buf_size = 0,
        p = P,
        merge_threshold = Threshold
    }.
%% Record `Value' for register `Index'. The stored number is whatever
%% hyper_utils:run_of_zeroes/1 returns for `Value'.
%%
%% Buffer, same index as the most recent entry: the head entry is replaced
%% by the larger of the two values and the size is left untouched.
set(Index, Value, #buffer{buf = [{Index, Previous} | Tail]} = Buffer) ->
    Candidate = hyper_utils:run_of_zeroes(Value),
    Buffer#buffer{buf = [{Index, max(Candidate, Previous)} | Tail]};
%% Buffer, any other case: prepend the entry and convert to the dense
%% representation once the size reaches the conversion threshold.
set(
    Index,
    Value,
    #buffer{buf = Entries, buf_size = Count, convert_threshold = Limit} = Buffer
) ->
    Candidate = hyper_utils:run_of_zeroes(Value),
    NewCount = Count + 1,
    Grown = Buffer#buffer{buf = [{Index, Candidate} | Entries], buf_size = NewCount},
    case NewCount < Limit of
        true -> Grown;
        false -> buffer2dense(Grown)
    end;
%% Dense: compare against the register currently stored in the binary and
%% only queue the update when it is strictly larger. The pending list is
%% merged into the binary once it reaches the merge threshold.
set(
    Index,
    Value,
    #dense{b = Registers, buf = Pending, buf_size = Count, merge_threshold = Limit} = Dense
) ->
    Skip = Index * ?VALUE_SIZE,
    <<_:Skip/bitstring, Current:?VALUE_SIZE/integer, _/bitstring>> = Registers,
    Candidate = hyper_utils:run_of_zeroes(Value),
    case Current < Candidate of
        false ->
            Dense;
        true ->
            NewCount = Count + 1,
            Queued = Dense#dense{buf = [{Index, Candidate} | Pending], buf_size = NewCount},
            case NewCount < Limit of
                true -> Queued;
                false -> compact(Queued)
            end
    end.
%% Bring a store into its compact form.
%%
%% A buffer below its conversion threshold is returned unchanged; at or
%% above the threshold it is converted to the dense representation.
compact(#buffer{buf_size = Count, convert_threshold = Limit} = Buffer) when Count < Limit ->
    Buffer;
compact(#buffer{} = Buffer) ->
    buffer2dense(Buffer);
%% A dense store gets all pending updates merged into the register binary
%% and its pending list emptied.
compact(#dense{b = Registers, buf = Pending} = Dense) ->
    Updates = max_registers(Pending),
    Dense#dense{
        b = merge_buf(Registers, Updates),
        buf = [],
        buf_size = 0
    }.
%% Merge a non-empty list of stores into one by folding max_merge/2 over
%% it from the left, starting with the first element as the accumulator.
max_merge([First | Rest]) ->
    lists:foldl(
        fun(Registers, Merged) -> max_merge(Registers, Merged) end,
        First,
        Rest
    ).
%% Merge two stores, keeping the larger value for every register. The
%% result is always based on the second argument (its precision and
%% thresholds are retained).
%%
%% dense + dense: the register binaries are merged pairwise. The pending
%% lists are concatenated while their combined size stays below the merge
%% threshold; otherwise they are flushed into the second binary first.
max_merge(
    #dense{b = LeftBits, buf = LeftPending, buf_size = LeftCount},
    #dense{
        b = RightBits,
        buf = RightPending,
        buf_size = RightCount,
        merge_threshold = Limit
    } = Target
) ->
    Count = LeftCount + RightCount,
    if
        Count < Limit ->
            Bits = do_merge(LeftBits, RightBits, <<>>),
            Target#dense{
                b = Bits,
                buf = LeftPending ++ RightPending,
                buf_size = Count
            };
        true ->
            Flushed = merge_buf(RightBits, max_registers(LeftPending ++ RightPending)),
            Target#dense{
                b = do_merge(LeftBits, Flushed, <<>>),
                buf = [],
                buf_size = 0
            }
    end;
%% buffer + dense: the buffer entries join the dense pending list. When the
%% combined size reaches the merge threshold the list is de-duplicated via
%% max_registers/1 but stays pending (it is not written to the binary here).
max_merge(
    #buffer{buf = Entries, buf_size = EntryCount},
    #dense{buf = Pending, buf_size = PendingCount, merge_threshold = Limit} = Target
) ->
    Count = EntryCount + PendingCount,
    if
        Count < Limit ->
            Target#dense{buf = Entries ++ Pending, buf_size = Count};
        true ->
            Deduped = max_registers(Pending ++ Entries),
            Target#dense{buf = Deduped, buf_size = length(Deduped)}
    end;
%% dense + buffer: same as buffer + dense with the arguments swapped.
max_merge(#dense{} = Dense, #buffer{} = Buffer) ->
    max_merge(Buffer, Dense);
%% buffer + buffer: concatenate while below the conversion threshold;
%% otherwise de-duplicate and, if the result is still at or above the
%% threshold, convert to dense (compact/1 performs exactly that check).
max_merge(
    #buffer{buf = LeftEntries, buf_size = LeftCount},
    #buffer{buf = RightEntries, buf_size = RightCount, convert_threshold = Limit} = Target
) ->
    Count = LeftCount + RightCount,
    if
        Count < Limit ->
            Target#buffer{buf = LeftEntries ++ RightEntries, buf_size = Count};
        true ->
            Deduped = max_registers(LeftEntries ++ RightEntries),
            compact(Target#buffer{buf = Deduped, buf_size = length(Deduped)})
    end.
%% Produce a dense store of precision `NewP' from a store of a higher
%% precision. The registers are folded with register_fold/2, placed in the
%% pending list of a fresh dense store and merged in by compact/1.
reduce_precision(NewP, #dense{p = OldP} = Dense) ->
    do_reduce_precision(NewP, OldP, Dense);
reduce_precision(NewP, #buffer{p = OldP} = Buffer) ->
    do_reduce_precision(NewP, OldP, Buffer).

%% Shared body of reduce_precision/2 for both representations.
do_reduce_precision(NewP, OldP, Registers) ->
    Folded = register_fold(OldP - NewP, Registers),
    Target = new_dense(NewP),
    compact(Target#dense{buf = Folded}).
%% fold an HLL to a lower number of registers `NewM` by projecting the values
%% onto their new index, see
%% http://research.neustar.biz/2012/09/12/set-operations-on-hlls-of-different-sizes/
%% NOTE: this function does not perform the max_registers step
%%
%% Registers are visited in ascending index order and collected into runs of
%% `ChangeM' consecutive registers. Every run is collapsed into one value by
%% hyper_utils:changeV/2 and stored under the index of the run's first
%% register shifted right by `ChangeP'. Returns the {NewIndex, Value} pairs
%% in descending index order.
register_fold(ChangeP, B) ->
    ChangeM = hyper_utils:m(ChangeP),
    % Collapse one finished run (values are collected newest-first) and
    % prepend it to the result under the run's own start index.
    Emit = fun(RunStart, RunValues, Acc) ->
        [
            {RunStart bsr ChangeP, hyper_utils:changeV(lists:reverse(RunValues), ChangeP)}
            | Acc
        ]
    end,
    {LastStart, LastValues, Folded} =
        fold(
            fun
                (I, V, {_RunStart, [], Acc}) ->
                    {I, [V], Acc};
                (I, V, {RunStart, RunValues, Acc}) when I - RunStart < ChangeM ->
                    {RunStart, [V | RunValues], Acc};
                (I, V, {RunStart, RunValues, Acc}) ->
                    % `I' already belongs to the next run, so the finished
                    % run must be keyed by `RunStart', not by `I'.
                    {I, [V], Emit(RunStart, RunValues, Acc)}
            end,
            {0, [], []},
            B
        ),
    % The fold only emits a run when the following one starts, so the
    % trailing run still has to be emitted here.
    case LastValues of
        [] -> Folded;
        _ -> Emit(LastStart, LastValues, Folded)
    end.
%% Sum 2^(-Value) over all registers of the store. The accumulator starts
%% at the integer 0 and every register adds a float.
register_sum(B) ->
    fold(fun(_Index, Value, Sum) -> Sum + register_weight(Value) end, 0, B).

%% 2^(-Value) for one register; the values 0..9 are spelled out as literals
%% and everything else goes through math:pow/2.
register_weight(0) -> 1.0;
register_weight(1) -> 0.5;
register_weight(2) -> 0.25;
register_weight(3) -> 0.125;
register_weight(4) -> 0.0625;
register_weight(5) -> 0.03125;
register_weight(6) -> 0.015625;
register_weight(7) -> 0.0078125;
register_weight(8) -> 0.00390625;
register_weight(9) -> 0.001953125;
register_weight(Value) -> math:pow(2, -Value).
%% Count how many registers hold each value. Returns a map from register
%% value to number of occurrences; the keys 0..(65 - P) are always present,
%% with a count of 0 when no register holds that value.
register_histogram(#buffer{p = P} = B) ->
    register_histogram(B, P);
register_histogram(#dense{p = P} = B) ->
    register_histogram(B, P).

register_histogram(B, P) ->
    % todo use from_keys once we are at otp 26
    Seed = maps:from_list([{I, 0} || I <- lists:seq(0, 65 - P)]),
    fold(
        fun(_, Value, Acc) ->
            % Init is 1: a value missing from the seeded range has been
            % seen once when it is first inserted, not zero times.
            maps:update_with(Value, fun(V) -> V + 1 end, 1, Acc)
        end,
        Seed,
        B
    ).
%% Number of registers in the store whose value is 0.
zero_count(B) ->
    CountZero = fun
        (_Index, Value, Zeroes) when Value =:= 0 -> Zeroes + 1;
        (_Index, _Value, Zeroes) -> Zeroes
    end,
    fold(CountZero, 0, B).
%% Serialise the register binary with one full byte per register. A buffer
%% is converted to the dense representation first. Pending updates in the
%% dense `buf' list are not included; only `b' is encoded.
encode_registers(#dense{b = Registers}) ->
    <<<<Register:8/integer>> || <<Register:?VALUE_SIZE/integer>> <= Registers>>;
encode_registers(#buffer{} = Buffer) ->
    encode_registers(buffer2dense(Buffer)).
%% Rebuild a dense store of precision `P' from the one-byte-per-register
%% encoding. The input must be exactly hyper_utils:m(P) bytes, optionally
%% followed by a single zero byte; anything else fails with case_clause.
decode_registers(AllBytes, P) ->
    RegisterCount = hyper_utils:m(P),
    Payload =
        case AllBytes of
            <<Exact:RegisterCount/binary>> ->
                Exact;
            <<Padded:RegisterCount/binary, 0>> ->
                Padded
        end,
    Template = new_dense(P),
    % repack every byte into a ?VALUE_SIZE-bit register
    Packed = <<<<Register:?VALUE_SIZE/integer>> || <<Register:8>> <= Payload>>,
    Template#dense{b = Packed}.
%% Approximate memory footprint of the store in bytes. For a buffer this is
%% the flat term size in words times 8 (i.e. assuming 8-byte words); for a
%% dense store only the register binary is counted, not the pending list.
bytes(#buffer{} = Buffer) ->
    Words = erts_debug:flat_size(Buffer),
    Words * 8;
bytes(#dense{b = Registers}) ->
    byte_size(Registers).
%% Precision `p' the store was created with, for either representation.
precision(#buffer{p = Precision}) ->
    Precision;
precision(#dense{p = Precision}) ->
    Precision.
%%
%% INTERNALS
%%
%% Bitstring of `M' registers, each ?VALUE_SIZE bits wide and set to zero.
%% Built as a single zero integer of M * ?VALUE_SIZE bits, which avoids
%% allocating an M-element list of tiny bitstrings. M = 0 yields <<>>.
empty_binary(M) ->
    <<0:(M * ?VALUE_SIZE)>>.
%% Collapse a list of {Index, Value} pairs so that every index appears once
%% with its largest value, and return the pairs sorted by index.
max_registers(Buf) ->
    KeepLargest = fun({Index, Value}, Seen) ->
        % max/2 returns its first argument on a tie, so an already stored
        % value is kept unless the new one is strictly larger
        maps:update_with(Index, fun(Known) -> max(Known, Value) end, Value, Seen)
    end,
    Largest = lists:foldl(KeepLargest, #{}, Buf),
    lists:keysort(1, maps:to_list(Largest)).
%% Convert a buffer into a dense store of the same precision by merging the
%% de-duplicated buffer entries into an all-zero register binary. The
%% resulting store has an empty pending list.
buffer2dense(#buffer{buf = Entries, p = P}) ->
    #dense{b = Zeroed} = Empty = new_dense(P),
    Updates = max_registers(Entries),
    Empty#dense{b = merge_buf(Zeroed, Updates)}.
%% Walk two register binaries in lock-step and append the larger register
%% of each pair to `Merged'. Both inputs must run out at the same time;
%% otherwise no clause matches.
do_merge(<<>>, <<>>, Merged) ->
    Merged;
do_merge(
    <<A:?VALUE_SIZE/integer, RestA/bitstring>>,
    <<B:?VALUE_SIZE/integer, RestB/bitstring>>,
    Merged
) ->
    Largest = max(A, B),
    do_merge(RestA, RestB, <<Merged/bits, Largest:?VALUE_SIZE>>).
%% Fold `F(Index, Value, Acc)' over every register of the store, starting
%% at index 0. Pending updates of a dense store are merged into a temporary
%% copy of the binary first, so `F' sees the up-to-date values.
fold(F, Acc, #dense{b = Registers, buf = Pending}) ->
    Current = merge_buf(Registers, max_registers(Pending)),
    do_fold(F, Acc, Current, 0);
fold(F, Acc, #buffer{} = Buffer) ->
    % TODO fix this to not need to move to a dense, it is not technically needed
    % We can just fold on the list i think
    fold(F, Acc, buffer2dense(Buffer)).
%% Apply `F(Index, Value, Acc)' to each ?VALUE_SIZE-bit register of the
%% bitstring in order, counting the index up from the given start.
do_fold(_F, Result, <<>>, _Index) ->
    Result;
do_fold(F, Acc0, <<Register:?VALUE_SIZE/integer, Remaining/bitstring>>, Index) ->
    Acc1 = F(Index, Register, Acc0),
    do_fold(F, Acc1, Remaining, Index + 1).
%% Apply a list of {Index, Value} updates to the register binary `B',
%% keeping the larger of the stored and the new value for each index. The
%% list is consumed front to back with each index measured relative to the
%% previous one, so it has to be in strictly ascending index order (which
%% is what max_registers/1 returns).
merge_buf(B, L) ->
    merge_buf(B, L, -1, <<>>).

%% `Remaining' is the part of the binary after register `PrevIndex';
%% `Done' is the already rebuilt prefix.
merge_buf(Remaining, [], _PrevIndex, Done) ->
    <<Done/bitstring, Remaining/bitstring>>;
merge_buf(Remaining, [{Index, Value} | Updates], PrevIndex, Done) ->
    % number of bits between the previously updated register and this one
    Gap = (Index - PrevIndex - 1) * ?VALUE_SIZE,
    case Remaining of
        <<Untouched:Gap/bitstring, Current:?VALUE_SIZE/integer, After/bitstring>> ->
            % on a tie max/2 yields `Current', i.e. the stored value wins
            Kept = max(Current, Value),
            Rebuilt = <<Done/bitstring, Untouched/bitstring, Kept:?VALUE_SIZE/integer>>,
            merge_buf(After, Updates, Index, Rebuilt);
        <<Untouched:Gap/bitstring>> ->
            % the update addresses the slot right after the last register:
            % append it and stop, ignoring any further updates
            <<Done/bitstring, Untouched/bitstring, Value:?VALUE_SIZE/integer>>
    end.