lib/kdb.ex

defmodule Kdb do
  @type tname :: atom() | binary()

  @type t :: %__MODULE__{
          db: reference() | nil,
          name: atom(),
          folder: charlist(),
          batch: reference() | nil,
          buckets: %{tname() => Kdb.Bucket.t()},
          global: Kdb.Bucket.t()
        }

  defstruct [:db, :name, :folder, :batch, :buckets, :global]
  @key :kdb
  # @stat_count "$count"
  @default_cfs ~c"default"

  @open_options [
    create_if_missing: true,
    merge_operator: :erlang_merge_operator
  ]

  @compile {:inline,
   [
     # has_key?: 3,
     # get: 3,
     # fetch: 3,
     # put: 4,
     # incr: 4,
     # delete: 3,
     # total: 2,
     binary_to_term: 1,
     term_to_binary: 1
   ]}

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  def start_link(opts) do
    Kdb.Supervisor.start_link(opts)
    # dbname = Keyword.get(opts, :name) || raise(ArgumentError, "`name` is required")
    # new(dbname, opts)

    # children = [
    #   Kdb.Registry,
    #   Kdb.Cache,
    #   {Kdb.Scheduler, [name: dbname]}
    # ]

    # case Supervisor.init(children, strategy: :one_for_one) do
    #   {:ok, pid} ->
    #     {:ok, pid}

    #   {:error, {:already_started, pid}} ->
    #     {:ok, pid}

    #   err ->
    #     err
    # end
  end

  @doc """
  Open a new database.

  ## Options
    * `:folder` - The folder to store the database.
    * `:buckets` - A list of buckets to create.
  """
  @spec new(dbname :: atom(), otps :: keyword()) :: t()
  def new(dbname, opts) do
    folder = Keyword.get(opts, :folder) || raise(ArgumentError, "`folder` is required")
    modules = Keyword.get(opts, :buckets, [])
    folder = to_charlist(folder)

    # open database and load/create column families
    {db, cfs, default_cf} =
      if File.exists?(folder) do
        # check column families
        {:ok, column_families} = :rocksdb.list_column_families(folder, [])

        column_families_mod = Enum.map(modules, &to_charlist(&1.name()))

        columns_to_create =
          column_families_mod -- column_families

        column_families_to_load = column_families_mod -- columns_to_create

        cfs_opts = [
          {@default_cfs, []} | Enum.map(column_families_to_load, &{&1, []})
        ]

        # open database with column families
        {:ok, db, [default_cf | cfs]} =
          :rocksdb.open(folder, @open_options, cfs_opts)

        # create column families
        cfsh =
          for name <- columns_to_create do
            {:ok, handle} = :rocksdb.create_column_family(db, name, [])
            handle
          end

        {db, cfs ++ cfsh, default_cf}
      else
        try do
          {:ok, db, [default_cf | _cfs]} =
            :rocksdb.open(folder, @open_options, [{@default_cfs, []}])

          cfs =
            Enum.map(modules, fn mod ->
              name = mod.name() |> to_charlist()
              {:ok, handle} = :rocksdb.create_column_family(db, name, [])
              handle
            end)

          {db, cfs, default_cf}
        rescue
          e ->
            File.rm_rf!(folder)
            reraise e, __STACKTRACE__
        end
      end

    # load default bucket
    default_bucket = DefaultBucket.new(dbname, db, default_cf)

    # load buckets
    buckets =
      Enum.zip(modules, cfs)
      |> Enum.map(fn {mod, handle} ->
        bucket = mod.new(dbname, db, handle)
        {mod.name(), bucket}
      end)
      |> Map.new()
      |> Map.put(DefaultBucket.name(), default_bucket)

    kdb = %__MODULE__{db: db, name: dbname, folder: folder, buckets: buckets}
    :persistent_term.put({@key, dbname}, kdb)
    kdb
  end

  def get(name) do
    :persistent_term.get({@key, name})
  end

  def get_bucket(%Kdb{buckets: buckets}, name) do
    Map.get(buckets, name)
  end

  def new_batch(kdb) do
    {:ok, batch} = :rocksdb.batch()
    %{kdb | batch: batch}
  end

  @spec batch(name :: atom() | String.t()) :: batch :: reference()
  def batch(name) do
    key = {:batch, name}

    case Kdb.Registry.lookup(key) do
      [{_, batch}] ->
        batch

      _ ->
        {:ok, batch} = :rocksdb.batch()
        Kdb.Registry.register(key, batch)
        batch
    end
  end

  @spec batch(Kdb.t(), name :: atom() | String.t()) :: Kdb.t()
  def batch(kdb, name) do
    batch = batch(name)
    put_batch(kdb, batch)
  end

  @spec transaction(bucket :: Kdb.t(), fun :: (Kdb.t() -> any())) :: :ok | {:error, any()}
  def transaction(%Kdb{db: db, buckets: buckets} = kdb, fun) do
    {:ok, batch} = :rocksdb.batch()

    kdb = %{
      kdb
      | batch: batch,
        buckets:
          Map.new(buckets, fn {name, bucket = %{module: module}} ->
            {name, %{bucket | batch: batch, t: module.new_table(), ttl: false}}
          end)
    }

    result =
      try do
        fun.(kdb)
        :rocksdb.write_batch(db, batch, [])
      catch
        _exit, reason ->
          {:error, reason}
      end

    # delete ets tables
    for {_, bucket} <- kdb.buckets do
      :ets.delete(bucket.t)
    end

    :rocksdb.release_batch(batch)

    result
  end

  def key_merge(keys) do
    Enum.join(keys, ":")
  end

  def key_merge(key1, key2) do
    <<key1::binary, ":", key2::binary>>
  end

  defp put_batch(%Kdb{buckets: buckets} = kdb, batch) do
    %{
      kdb
      | batch: batch,
        buckets: Map.new(buckets, fn {name, bucket} -> {name, %{bucket | batch: batch}} end)
    }
  end

  defp put_batch(kdb, batch) do
    %{kdb | batch: batch}
  end

  # def has_key?(%Kdb{db: db, tables: tables}, name, key) do
  #   %{ets: ets, handle: handle, exp: exp} = Map.get(tables, name)

  #   case :ets.member(ets, key) do
  #     true ->
  #       true

  #     false ->
  #       case :rocksdb.get(db, handle, key, []) do
  #         :not_found ->
  #           false

  #         {:ok, value} ->
  #           result = binary_to_term(value)
  #           :ets.insert(ets, {key, result})
  #           if exp, do: Cache.put(name, key)
  #           true

  #         err ->
  #           err
  #       end
  #   end
  # end

  @doc """
  Usage:
    tr = Kdb.get_tr(:blockchain)
    opts = [
      init: {:seek, "ac_"},
      direction: :next
    ]
    Kdb.foreach(tr, :accounts, fn key, value ->
      # do something with key and value
    end, opts)
  """

  # def foreach(%Kdb{db: db, tables: tables}, name, fun, opts \\ []) do
  #   %{handle: handle} = Map.get(tables, name)
  #   # seek: <<>> | :last | binary()
  #   initial_seek = Keyword.get(opts, :seek, <<>>)
  #   # direction: :next | :prev
  #   direction = Keyword.get(opts, :direction, :next)

  #   {:ok, iter} = :rocksdb.iterator(db, handle, [])

  #   try do
  #     case :rocksdb.iterator_move(iter, initial_seek) do
  #       {:ok, key, value} ->
  #         fun.(key, binary_to_term(value))
  #         do_foreach(iter, fun, direction)

  #       _ ->
  #         :rocksdb.iterator_close(iter)
  #     end
  #   rescue
  #     e ->
  #       :rocksdb.iterator_close(iter)
  #       reraise e, __STACKTRACE__
  #   end
  # end

  # defp do_foreach(iter, fun, direction) do
  #   case :rocksdb.iterator_move(iter, direction) do
  #     {:ok, key, value} ->
  #       fun.(key, binary_to_term(value))
  #       do_foreach(iter, fun, direction)

  #     _ ->
  #       :rocksdb.iterator_close(iter)
  #   end
  # end

  # def while(%Kdb{db: db, tables: tables}, name, acc, fun, opts \\ []) do
  #   %{handle: handle} = Map.get(tables, name)
  #   initial_seek = Keyword.get(opts, :seek, <<>>)
  #   direction = Keyword.get(opts, :direction, :next)

  #   {:ok, iter} = :rocksdb.iterator(db, handle, [])

  #   try do
  #     case :rocksdb.iterator_move(iter, initial_seek) do
  #       {:ok, key, value} ->
  #         {action, result} = fun.({key, binary_to_term(value)}, acc)

  #         if action == :cont do
  #           do_while(iter, acc, fun, direction)
  #         else
  #           :rocksdb.iterator_close(iter)
  #           result
  #         end

  #       _ ->
  #         :rocksdb.iterator_close(iter)
  #     end
  #   rescue
  #     e ->
  #       :rocksdb.iterator_close(iter)
  #       reraise e, __STACKTRACE__
  #   end
  # end

  # defp do_while(iter, acc, fun, direction) do
  #   case :rocksdb.iterator_move(iter, direction) do
  #     {:ok, key, value} ->
  #       {action, result} = fun.({key, binary_to_term(value)}, acc)

  #       if action == :cont do
  #         do_while(iter, acc, fun, direction)
  #       else
  #         :rocksdb.iterator_close(iter)
  #         result
  #       end

  #     _ ->
  #       :rocksdb.iterator_close(iter)
  #   end
  # end

  # def fold(%Kdb{db: db, tables: tables}, name, fun, acc, opts \\ []) do
  #   %{handle: handle} = Map.get(tables, name)
  #   initial_seek = Keyword.get(opts, :seek, <<>>)
  #   direction = Keyword.get(opts, :direction, :next)

  #   {:ok, iter} = :rocksdb.iterator(db, handle, [])

  #   try do
  #     case :rocksdb.iterator_move(iter, initial_seek) do
  #       {:ok, key, value} ->
  #         acc = fun.(key, binary_to_term(value), acc)
  #         do_fold(iter, fun, acc, direction)

  #       _ ->
  #         :rocksdb.iterator_close(iter)
  #         acc
  #     end
  #   rescue
  #     e ->
  #       :rocksdb.iterator_close(iter)
  #       reraise e, __STACKTRACE__
  #   end
  # end

  # defp do_fold(iter, fun, acc, direction) do
  #   case :rocksdb.iterator_move(iter, direction) do
  #     {:ok, key, value} ->
  #       acc = fun.(key, binary_to_term(value), acc)
  #       do_fold(iter, fun, acc, direction)

  #     _ ->
  #       :rocksdb.iterator_close(iter)
  #       acc
  #   end
  # end

  # def put(%Kdb{batch: batch, tables: tables}, name, key, value) do
  #   case Map.get(tables, name) do
  #     %{handle: handle, ets: ets, exp: false} ->
  #       :ets.insert(ets, {key, value})
  #       :rocksdb.batch_put(batch, handle, key, term_to_binary(value))

  #     %{handle: handle, ets: ets} ->
  #       :ets.insert(ets, {key, value})
  #       :rocksdb.batch_put(batch, handle, key, term_to_binary(value))
  #       Cache.put(name, key)
  #   end
  # end

  # def put_db(%Kdb{batch: batch, tables: tables}, name, key, value) do
  #   %{handle: handle} = Map.get(tables, name)
  #   :rocksdb.batch_put(batch, handle, key, term_to_binary(value))
  # end

  # def get(tr = %Kdb{tables: tables}, name, key) do
  #   %{ets: ets} = Map.get(tables, name)

  #   case :ets.lookup(ets, key) do
  #     [{^key, value}] -> value
  #     [] -> get_from_db(tr, name, key)
  #   end
  # end

  # def fetch(tr = %Kdb{tables: tables}, name, key) do
  #   %{ets: ets} = Map.get(tables, name)

  #   case :ets.lookup(ets, key) do
  #     [{^key, value}] -> {:ok, value}
  #     [] -> fetch_from_db(tr, name, key)
  #   end
  # end

  # def slot(%Kdb{tables: tables}, name, position) do
  #   %{ets: ets} = Map.get(tables, name)
  #   :ets.slot(ets, position)
  # end

  # defp load_from_db(tr, ets, name, key) do
  #   if not :ets.member(ets, key) do
  #     case fetch_from_db(tr, name, key) do
  #       {:ok, value} -> :ets.insert(ets, {key, value})
  #       _ -> false
  #     end
  #   end
  # end

  # def incr(tr = %Kdb{batch: batch, tables: tables}, name, key, {elem, amount}) do
  #   %{handle: handle, ets: ets} = Map.get(tables, name)

  #   load_from_db(tr, ets, name, key)

  #   result = :ets.update_counter(ets, key, {elem, amount}, {key, 0})
  #   :rocksdb.batch_put(batch, handle, key, term_to_binary(result))

  #   # :rocksdb.batch_merge(batch, key, term_to_binary({:int_add, amount}), [])
  #   result
  # end

  # def incr_non_zero(tr = %Kdb{batch: batch, tables: tables}, name, key, {elem, neg_amount}) do
  #   %{handle: handle, ets: ets} = Map.get(tables, name)

  #   load_from_db(tr, ets, name, key)

  #   case :ets.update_counter(ets, key, {elem, neg_amount}, {key, 0}) do
  #     result when 0 > result ->
  #       :ets.update_counter(ets, key, {elem, abs(neg_amount)})
  #       {:error, "Insufficient balance"}

  #     result ->
  #       :rocksdb.batch_put(batch, handle, key, term_to_binary(result))
  #       # :rocksdb.batch_merge(batch, handle, key, term_to_binary({:int_add, neg_amount}))
  #       {:ok, result}
  #   end
  # end

  # def incr_limit(tr = %Kdb{batch: batch, tables: tables}, name, key, {elem, amount}, limit) do
  #   %{handle: handle, ets: ets} = Map.get(tables, name)

  #   load_from_db(tr, ets, name, key)

  #   case :ets.update_counter(ets, key, {elem, amount}, {key, 0}) do
  #     result when limit != 0 and result > limit ->
  #       :ets.update_counter(ets, key, {elem, -amount})
  #       {:error, "Limit exceeded"}

  #     result ->
  #       :rocksdb.batch_put(batch, handle, key, term_to_binary(result))
  #       # :rocksdb.batch_merge(batch, handle, key, term_to_binary({:int_add, amount}))
  #       {:ok, result}
  #   end
  # end

  # def total(tr, name) do
  #   case fetch(tr, name, @stat_count) do
  #     {:ok, count} -> count
  #     _ -> 0
  #   end
  # end

  # def count_one(tr, name) do
  #   incr(tr, name, @stat_count, {2, 1})
  # end

  # def discount_one(tr, name) do
  #   incr(tr, name, @stat_count, {2, -1})
  # end

  # def ets_total(%Kdb{tables: tables}, name) do
  #   %{ets: ets} = Map.get(tables, name)
  #   :ets.info(ets, :size)
  # end

  # def delete(%Kdb{batch: batch, tables: tables}, name, key) do
  #   %{handle: handle, ets: ets, exp: exp} = Map.get(tables, name)
  #   :ets.delete(ets, key)
  #   if exp, do: Cache.remove(key)
  #   :rocksdb.batch_delete(batch, handle, key)
  # end

  # def delete_db(%Kdb{batch: batch, tables: tables}, name, key) do
  #   %{handle: handle} = Map.get(tables, name)
  #   :rocksdb.batch_delete(batch, handle, key)
  # end

  # def get_from_db(%Kdb{db: db, tables: tables}, name, key) do
  #   %{handle: handle, ets: ets, exp: exp} = Map.get(tables, name)

  #   case :rocksdb.get(db, handle, key, []) do
  #     {:ok, value} ->
  #       result = binary_to_term(value)
  #       if exp, do: Cache.put(name, key)
  #       :ets.insert(ets, {key, result})
  #       result

  #     _err ->
  #       nil
  #   end
  # end

  # def fetch_from_db(%Kdb{db: db, tables: tables}, name, key) do
  #   %{handle: handle, ets: ets, exp: exp} = Map.get(tables, name)

  #   case :rocksdb.get(db, handle, key, []) do
  #     {:ok, value} ->
  #       result = binary_to_term(value)
  #       if exp, do: Cache.put(name, key)
  #       :ets.insert(ets, {key, result})
  #       {:ok, result}

  #     err ->
  #       err
  #   end
  # end

  def batch_save(%{batch: batch}, filename) do
    binary = :rocksdb.batch_tolist(batch) |> term_to_binary()
    File.write(filename, binary)
  end

  def batch_load(%{buckets: buckets}, dbfile, filename) do
    binary = File.read!(filename)
    operations = binary_to_term(binary)

    if byte_size(operations) == 0 do
      {:ok, batch} = :rocksdb.batch()

      {:ok, cfs} =
        :rocksdb.list_column_families(dbfile, [])

      cfs_indexed =
        Enum.map(cfs, fn x ->
          Map.get(buckets, String.Chars.to_string(x) |> String.to_atom())[:handle]
        end)
        |> Enum.with_index(fn element, index -> {index, element} end)
        |> Enum.into(%{})

      Enum.each(operations, fn
        {:put, cf, key, value} ->
          :rocksdb.batch_put(batch, cfs_indexed[cf], key, value)

        {:delete, cf, key} ->
          :rocksdb.batch_delete(batch, cf, key)

        _ ->
          nil
      end)

      batch
    else
      nil
    end
  end

  def commit(%{db: db}, batch) when is_reference(batch) do
    if :rocksdb.batch_count(batch) > 0 do
      :rocksdb.write_batch(db, batch, [])
    end

    :rocksdb.release_batch(batch)

    :ok
  end

  def commit(%{db: db}, name) do
    key = {:batch, name}

    case Kdb.Registry.lookup(key) do
      {:ok, batch} ->
        if is_reference(batch) do
          if :rocksdb.batch_count(batch) > 0 do
            :rocksdb.write_batch(db, batch, [])
          end

          :rocksdb.release_batch(batch)
        end

      _ ->
        nil
    end

    Kdb.Registry.unregister(key)

    :ok
  end

  # def load_all(tr = %Kdb{tables: tables}, name) do
  #   %{ets: ets} = Map.get(tables, name)

  #   foreach(tr, name, fn key, value ->
  #     :ets.insert(ets, {key, value})
  #   end)
  # end

  # def savepoint(%{batch: batch}) do
  #   :rocksdb.batch_savepoint(batch)
  # end

  @spec snapshot(t()) :: no_return()
  def snapshot(%{db: db} = object) do
    {:ok, snapshot} = :rocksdb.snapshot(db)
    %{object | db: snapshot}
  end

  @spec release_snapshot(t() | reference()) :: no_return()
  def release_snapshot(%__MODULE__{db: db}) do
    :rocksdb.release_snapshot(db)
  end

  def release_snapshot(snapshot) when is_reference(snapshot) do
    :rocksdb.release_snapshot(snapshot)
  end

  @spec restore(charlist(), charlist()) :: :ok | {:error, term()}
  def restore(target, output) do
    zip_file = IO.iodata_to_binary([target, ".zip"]) |> to_charlist()

    case ZipUtil.extract(zip_file, target) do
      {:ok, _} ->
        case :rocksdb.open_backup_engine(target) do
          {:ok, ref} ->
            case :rocksdb.restore_db_from_latest_backup(ref, output) do
              :ok ->
                :rocksdb.close_backup_engine(ref)

              {:error, _reason} = err ->
                err
            end

          {:error, _reason} = err ->
            err
        end

      {:error, _reason} = err ->
        err
    end
  end

  @spec backup(t(), charlist() | binary()) :: :ok | {:error, term()}
  def backup(%Kdb{db: db}, target) do
    case :rocksdb.open_backup_engine(target) do
      {:ok, ref} ->
        case :rocksdb.create_new_backup(ref, db) do
          :ok ->
            :rocksdb.close_backup_engine(ref)
            zip_file = IO.iodata_to_binary([target, ".zip"])

            case ZipUtil.compress_folder(target, zip_file) do
              {:ok, _} ->
                File.rm_rf!(target)
                :ok

              {:error, _reason} = err ->
                err
            end

          {:error, _reason} = err ->
            err
        end

      {:error, _reason} = err ->
        err
    end
  end

  def close(%Kdb{name: name, batch: batch, db: db, buckets: buckets}) do
    if is_reference(batch) do
      :rocksdb.release_batch(batch)
    end

    for {_, %{t: t}} <- buckets do
      :ets.delete(t)
    end

    :persistent_term.erase({@key, name})
    :rocksdb.close(db)
  end

  def destroy(%Kdb{name: name, buckets: buckets, folder: folder}) do
    :rocksdb.destroy(folder, [])

    for {_, %{t: t}} <- buckets do
      try do
        :ets.delete(t)
      catch
        _, _ -> nil
      end
    end

    :persistent_term.erase({@key, name})
  end

  def term_to_binary(term) do
    :erlang.term_to_binary(term)
  end

  def binary_to_term(binary) do
    :erlang.binary_to_term(binary, [:safe])
  end
end