lib/kdb.ex

defmodule Kdb do
  @type tname :: atom() | binary()

  @type t :: %__MODULE__{
          name: tname(),
          # rocksdb database handle

          store: reference() | nil,
          # sqlite connection

          indexer: reference() | nil,
          folder: charlist(),
          buckets: %{tname() => Kdb.Bucket.t()}
        }

  defstruct [:name, :store, :indexer, :folder, :buckets]
  @default_cfs ~c"default"

  @open_options [
    create_if_missing: true,
    merge_operator: :erlang_merge_operator
  ]

  @compile {:inline, get: 1, get_bucket: 2}

  alias __MODULE__

  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  def start_link(opts) do
    Kdb.Supervisor.start_link(opts)
  end

  @doc """
  Open a new database.

  ## Options
    * `:folder` - The folder to store the database.
    * `:buckets` - A list of buckets to create.
  """
  @spec new(otps :: keyword()) :: t()
  def new(opts) do
    dbname = Keyword.fetch!(opts, :name)
    root = Keyword.fetch!(opts, :folder)
    File.mkdir(root)
    modules = Keyword.get(opts, :buckets, [])
    folder = Path.join(root, "data") |> to_charlist()
    conn = Kdb.Indexer.new(opts)

    # open database and load/create column families

    {db, cfs, default_cf} =
      if File.exists?(folder) do
        # check column families

        {:ok, column_families} = :rocksdb.list_column_families(folder, [])

        column_families_mod = Enum.map(modules, &to_charlist(&1.name()))

        columns_to_create =
          column_families_mod -- column_families

        column_families_to_load = column_families_mod -- columns_to_create

        cfs_opts = [
          {@default_cfs, []} | Enum.map(column_families_to_load, &{&1, []})
        ]

        # open database with column families

        {:ok, db, [default_cf | cfs]} =
          :rocksdb.open(folder, @open_options, cfs_opts)

        # create column families

        cfsh =
          for name <- columns_to_create do
            {:ok, handle} = :rocksdb.create_column_family(db, name, [])
            handle
          end

        {db, cfs ++ cfsh, default_cf}
      else
        try do
          {:ok, db, [default_cf | _cfs]} =
            :rocksdb.open(folder, @open_options, [{@default_cfs, []}])

          cfs =
            Enum.map(modules, fn mod ->
              name = mod.name() |> to_charlist()
              {:ok, handle} = :rocksdb.create_column_family(db, name, [])
              handle
            end)

          {db, cfs, default_cf}
        rescue
          e ->
            File.rm_rf!(folder)
            reraise e, __STACKTRACE__
        end
      end

    # load default bucket

    default_bucket_opts = [
      dbname: dbname,
      handle: default_cf
    ]

    default_bucket = Kdb.DefaultBucket.new(default_bucket_opts)

    # load buckets

    buckets =
      Enum.zip(modules, cfs)
      |> Enum.map(fn {mod, handle} ->
        bucket_opts = [
          dbname: dbname,
          handle: handle
        ]

        bucket = mod.new(bucket_opts)
        {bucket.name, bucket}
      end)
      |> Map.new()
      |> Map.put(default_bucket.name, default_bucket)

    kdb = %__MODULE__{name: dbname, store: db, indexer: conn, folder: root, buckets: buckets}
    # register kdb

    Kdb.Registry.register(kdb)
    kdb
  end

  def get(name) do
    Kdb.Registry.get_db(name)
  end

  def get_bucket(%Kdb{buckets: buckets}, name) do
    Map.get(buckets, name)
  end

  @spec transaction(t(), (Kdb.Batch.t() -> any())) :: :ok | {:error, term()}
  def transaction(kdb, fun) do
    batch = Kdb.Batch.new(name: make_ref(), db: kdb)

    result =
      try do
        fun.(batch)
        :ok = Kdb.Batch.commit(batch)
        :ok = Kdb.Batch.release(batch)
      catch
        _exit, reason ->
          :ok = Kdb.Batch.release(batch)
          {:error, reason}
      end

    result
  end

  def close(%Kdb{store: db, indexer: indexer} = kdb) do
    Kdb.Registry.unregister(kdb)

    try do
      :ok = :rocksdb.close(db)
      :ok = Kdb.Indexer.close(indexer)
      :ok
    rescue
      e ->
        {:error, e}
    end
  end

  def destroy(%Kdb{folder: folder} = kdb) do
    Kdb.Registry.unregister(kdb)

    try do
      close(kdb)
      {:ok, _} = File.rm_rf(folder)
      :ok
    rescue
      e ->
        {:error, e}
    end
  end

  @spec backup(t(), Path.t()) :: :ok | {:error, term()}
  def backup(%Kdb{store: db, indexer: conn}, target) do
    cond do
      File.exists?(target) ->
        {:error, :target_exists}

      true ->
        # Create target directory

        target = to_charlist(target)
        File.mkdir_p(target)
        File.mkdir(target)
        data_folder = Path.join(target, "backup_data") |> to_charlist()
        indexer_folder = Path.join(target, "indexer.db") |> to_charlist()

        # Create a backup of the indexer

        :ok = Kdb.Indexer.backup(conn, indexer_folder)
        # Create a backup of the database

        {:ok, ref} = :rocksdb.open_backup_engine(data_folder)

        try do
          :ok = :rocksdb.create_new_backup(ref, db)
          zip_file = [target, ".zip"] |> Enum.join("") |> to_charlist()
          ZipUtil.compress_folder(target, zip_file)
          {:ok, _} = File.rm_rf(target)
          :rocksdb.close_backup_engine(ref)
        rescue
          e ->
            IO.inspect(e, label: "Backup error")
            :rocksdb.close_backup_engine(ref)
            {:error, e}
        end
    end
  end

  @spec restore(Path.t(), Path.t()) :: :ok | {:error, term()}
  def restore(source_file, folder_destiny) do
    cond do
      File.exists?(folder_destiny) ->
        {:error, :folder_destiny_exists}

      not File.exists?(source_file) ->
        {:error, :source_file_not_found}

      true ->
        source_file = to_charlist(source_file)
        folder_destiny = to_charlist(folder_destiny)
        source_folder = Path.rootname(source_file) |> to_charlist()
        File.mkdir_p(folder_destiny)

        case ZipUtil.extract(source_file, ~c"") do
          {:ok, _} ->
            data_folder_temp = Path.join(source_folder, "backup_data") |> to_charlist()
            data_folder_destiny = Path.join(source_folder, "data") |> to_charlist()
            {:ok, ref} = :rocksdb.open_backup_engine(data_folder_temp)

            try do
              {:ok, backups} = :rocksdb.get_backup_info(ref)

              if backups == [] do
                {:error, :no_backups}
              else
                backup = List.first(backups)
                :ok = :rocksdb.restore_db_from_backup(ref, backup.backup_id, data_folder_destiny)
                File.rm_rf(data_folder_temp)
                :ok = File.rename(source_folder, folder_destiny)
              end
            rescue
              e ->
                IO.inspect(e, label: "Restore error")
                {:error, e}
            after
              :rocksdb.close_backup_engine(ref)
            end

          {:error, reason} ->
            {:error, reason}
        end
    end
  end
end