lib/immudb.ex

defmodule Immudb do
  use GRPC.Server, service: Immudb.Schema.ImmuService.Service

  alias Immudb.Schema
  alias Immudb.Schema.ImmuService.Stub
  alias Google.Protobuf
  alias Immudb.Socket

  defp grpc_uri(host, port) do
    host <> ":" <> to_string(port)
  end

  def new(
        host: host,
        port: port,
        username: username,
        password: password,
        database: database
      ) do
    with {:grpc_connect, {:ok, channel}} <-
           {:grpc_connect,
            grpc_uri(host, port)
            |> GRPC.Stub.connect(interceptors: [GRPC.Logger.Client])},
         {:immudb_login, {:ok, response}} <-
           {:immudb_login,
            channel
            |> login(username, password)},
         socket <- %Socket{channel: channel, token: response.token},
         {:immudb_use_database, {:ok, token}} <-
           {:immudb_use_database,
            socket
            |> use_database(database)} do
      {:ok, %Socket{channel: channel, token: token}}
    else
      {:grpc_connect, {:error, _}} -> {:error, "Cannot connect to immudb"}
      {:immudb_login, {:error, _}} -> {:error, "Cannot login to immudb"}
      {:immudb_use_database, {:error, _}} -> {:error, "Cannot use database in immudb"}
    end
  end

  def new(url: url) do
    with {:parse_uri, {:ok, uri}} <- {:parse_uri, URI.new(url)},
         {:is_immudb_schema, true} <- {:is_immudb_schema, uri.scheme == "immudb"},
         userinfo <- uri.userinfo |> String.split(":", trim: true),
         {:is_userinfo, true} <- {:is_userinfo, userinfo |> length > 0} do
      {username, password} =
        userinfo
        |> case do
          [username] -> {username, ""}
          [username, password] -> {username, password}
          _ -> {"", ""}
        end

      new(
        host: uri.host,
        port: uri.port,
        username: username,
        password: password,
        database: uri.path |> String.slice(1, String.length(uri.path) - 1)
      )
    else
      {:parse_uri, {:error, _}} -> {:error, "Invalid connection string"}
      {:is_immudb_schema, false} -> {:error, "Invalid connection string"}
      {:is_userinfo, false} -> {:error, "Invalid user info"}
    end
  end

  defp metadata(socket) do
    %{authorization: "Bearer #{socket.token}", content_type: "application/grpc"}
  end

  def list_users(socket) do
    with {:ok, response} <-
           socket.channel
           |> Stub.list_users(Protobuf.Empty.new(), metadata: metadata(socket)) do
      {:ok,
       for user <- response.users do
         %{user: user.user}
       end}
    else
      {:error, _} -> "Cannot list users"
    end
  end

  def create_user(channel, params) do
    channel
    |> Stub.create_user(
      Schema.CreateUserRequest.new(
        user: params.user,
        password: params.password,
        permission: params.permission,
        database: params.database
      )
    )
  end

  def change_password(channel, params) do
    channel
    |> Stub.change_password(
      Schema.ChangePasswordRequest.new(
        user: params.user,
        oldPassword: params.old_password,
        newPassword: params.new_password
      )
    )
  end

  def update_auth_config(channel, params) do
    channel
    |> Stub.update_auth_config(Schema.AuthConfig.new(kind: params.kind))
  end

  def update_mtls_confg(channel, params) do
    channel
    |> Stub.update_mtls_config(Schema.MTLSConfig.new(enabled: params.enabled))
  end

  def login(channel, user, password) do
    channel
    |> Stub.login(Schema.LoginRequest.new(user: user, password: password))
  end

  def logout(socket) do
    socket.channel
    |> Stub.logout(Protobuf.Empty.new(), metadata: metadata(socket))
  end

  def set(socket, key, value) do
    with {:ok, _} <-
           socket.channel
           |> Stub.set(
             Schema.SetRequest.new(KVs: [Schema.KeyValue.new(key: key, value: value)]),
             metadata: metadata(socket)
           ) do
      :ok
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def verifiable_set(socket, key, value) do
    with {:ok, _} <-
           socket.channel
           |> Stub.verifiable_set(
             Schema.VerifiableSetRequest.new(
               setRequest:
                 Schema.SetRequest.new(KVs: [Schema.KeyValue.new(key: key, value: value)])
             ),
             metadata: metadata(socket)
           ) do
      :ok
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def get(socket, key) do
    with {:ok, response} <-
           socket.channel
           |> Stub.get(
             Schema.KeyRequest.new(key: key),
             metadata: metadata(socket)
           ) do
      {:ok, response.value}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def verifiable_get(socket, key) do
    with {:ok, response} <-
           socket.channel
           |> Stub.verifiable_get(
             Schema.VerifiableGetRequest.new(keyRequest: Schema.KeyRequest.new(key: key)),
             metadata: metadata(socket)
           ) do
      {:ok, response.entry.value}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def set_all(socket, params) do
    kvs =
      for {key, value} <- params do
        Schema.KeyValue.new(key: key, value: value)
      end

    with {:ok, _} <-
           socket.channel
           |> Stub.set(
             Schema.SetRequest.new(KVs: kvs),
             metadata: metadata(socket)
           ) do
      :ok
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def get_all(socket, keys) do
    with {:ok, response} <-
           socket.channel
           |> Stub.get_all(
             Schema.KeyListRequest.new(keys: keys),
             metadata: metadata(socket)
           ) do
      {:ok,
       for entri <- response.entries do
         %{tx: entri.tx, value: entri.value}
       end}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def exec_all(socket, params) do
    socket.channel
    |> Stub.exec_all(
      Schema.ExecAllRequest.new(
        Operations: Schema.Op.new(nil),
        noWait: params.no_wait
      ),
      metadata: metadata(socket)
    )
  end

  def scan(socket) do
    with {:ok, response} <-
           socket.channel
           |> Stub.scan(
             Schema.ScanRequest.new(
               # seekKey: params.seen_key,
               # prefix: params.prefix,
               # desc: params.desc,
               # limit: params.limit,
               # sinceTx: params.since_tx,
               # noWait: params.no_wait
             ),
             metadata: metadata(socket)
           ) do
      for entri <- response.entries do
        %{tx: entri.tx, value: entri.value, key: entri.key}
      end
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def count(socket, prefix) do
    socket.channel
    |> Stub.count(Schema.KeyPrefix.new(prefix: prefix), metadata: metadata(socket))
  end

  def count_all(socket) do
    socket.channel
    |> Stub.count_all(Protobuf.Empty.new(), metadata: metadata(socket))
  end

  def tx_by_id(socket, params) do
    socket.channel
    |> Stub.tx_by_id(Schema.TxRequest.new(tx: params.tx), metadata: metadata(socket))
  end

  def verifiable_tx_by_id(socket, params) do
    socket.channel
    |> Stub.verifiable_tx_by_id(
      Schema.VerifiableTxRequest.new(
        tx: params.tx,
        proveSinceTx: params.prove_since_tx
      ),
      metadata: metadata(socket)
    )
  end

  def tx_scan(socket, params) do
    socket.channel
    |> Stub.tx_scan(
      Schema.TxScanRequest.new(
        initialTx: params.initial_tx,
        limit: params.limit,
        desc: params.desc
      ),
      metadata: metadata(socket)
    )
  end

  def history(socket, key) do
    with {:ok, response} <-
           socket.channel
           |> Stub.history(
             Schema.HistoryRequest.new(
               key: key
               # offset: params.offset,
               # limit: params.limit,
               # desc: params.desc,
               # sinceTx: params.since_tx
             ),
             metadata: metadata(socket)
           ) do
      {:ok,
       for entri <- response.entries do
         %{tx: entri.tx, value: entri.value}
       end}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def health(socket) do
    socket.channel
    |> Stub.health(Protobuf.Empty.new())
  end

  def current_state(socket) do
    socket.channel
    |> Stub.current_state(Protobuf.Empty.new(), metadata: metadata(socket))
  end

  def set_reference(channel, params) do
    channel
    |> Stub.set_reference(
      Schema.ReferenceRequest.new(
        key: params.key,
        referencedKey: params.referenced_key,
        atTx: params.at_tx,
        boundRef: params.bound_ref,
        noWait: params.no_wait
      )
    )
  end

  def verifiable_set_reference(channel, params) do
    channel
    |> Stub.verifiable_set_reference(
      Schema.VerifiableReferenceRequest.new(
        referenceRequest:
          Schema.ReferenceRequest.new(
            key: params.key,
            referencedKey: params.referenced_key,
            atTx: params.at_tx,
            boundRef: params.bound_ref,
            noWait: params.no_wait
          ),
        proveSinceTx: params.prove_since_tx
      )
    )
  end

  def z_add(channel, params) do
    channel
    |> Stub.z_add(
      Schema.ZAddRequest.new(
        set: params.set,
        score: params.score,
        key: params.key,
        atTx: params.at_tx,
        boundRef: params.bound_ref,
        noWait: params.no_wait
      )
    )
  end

  def verifiable_z_add(channel, params) do
    channel
    |> Stub.verifiable_z_add(
      Schema.VerifiableZAddRequest.new(
        zAddRequest:
          Schema.ZAddRequest.new(
            set: params.set,
            score: params.score,
            key: params.key,
            atTx: params.at_tx,
            boundRef: params.bound_ref,
            noWait: params.no_wait
          ),
        proveSinceTx: params.prove_since_tx
      )
    )
  end

  def z_scan(channel, params) do
    channel
    |> Stub.z_scan(
      Schema.ZScanRequest.new(
        set: params.set,
        seekKey: params.seek_key,
        seekScore: params.seek_score,
        seekAtTx: params.seek_at_tx,
        inclusiveSeek: params.inclusive_seek,
        limit: params.limit,
        desc: params.desc,
        minScore: params.min_score,
        maxScore: params.max_score,
        sinceTx: params.since_tx,
        noWait: params.no_wait
      )
    )
  end

  def create_database(socket, database_name) do
    with {:ok, _} <-
           socket.channel
           |> Stub.create_database(Schema.Database.new(databaseName: database_name),
             metadata: metadata(socket)
           ) do
      :ok
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def list_databases(socket) do
    with {:ok, response} <-
           socket.channel
           |> Stub.database_list(Protobuf.Empty.new(), metadata: metadata(socket)) do
      {:ok,
       for database <- response.databases do
         database.databaseName
       end}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def use_database(socket, database_name) do
    with {:ok, response} <-
           socket.channel
           |> Stub.use_database(Schema.Database.new(databaseName: database_name),
             metadata: metadata(socket)
           ) do
      {:ok, response.token}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def compact_index(channel) do
    channel
    |> Stub.compact_index(Protobuf.Empty.new())
  end

  def change_permission(channel, params) do
    channel
    |> Stub.change_permission(
      Schema.ChangePermissionRequest.new(
        action: params.action,
        username: params.username,
        database: params.database,
        permission: params.permission
      )
    )
  end

  def set_active_user(channel, params) do
    channel
    |> Stub.set_active_user(
      Schema.SetActiveUserRequest.new(
        active: params.active,
        username: params.username
      )
    )
  end

  def stream_get(channel, params) do
    channel
    |> Stub.stream_get(
      Schema.KeyRequest.new(
        key: params.key,
        atTx: params.at_tx,
        sinceTx: params.since_tx
      )
    )
  end

  def stream_set(_channel, _params) do
  end

  def stream_verifiable_get(_channel, _params) do
  end

  def stream_verifiable_set(_channel, _params) do
  end

  def stream_scan(_channel, _params) do
  end

  def stream_z_scan(_channel, _params) do
  end

  def stream_history(_channel, _params) do
  end

  def stream_exec_all(_channel, _params) do
  end

  def use_snapshot(channel, params) do
    channel
    |> Stub.use_snapshot(
      Schema.UseSnapshotRequest.new(
        sinceTx: params.since_tx,
        asBeforeTx: params.as_before_tx
      )
    )
  end

  def sql_exec(socket, sql) do
    socket.channel
    |> Stub.sql_exec(
      Schema.SQLExecRequest.new(sql: sql),
      metadata: metadata(socket)
    )
  end

  defp sql_value(value) when is_nil(value) do
    Schema.SQLValue.new(value: {:null, Protobuf.NullValue})
  end

  defp sql_value(value) when is_number(value) do
    Schema.SQLValue.new(value: {:n, value})
  end

  defp sql_value(value) when is_binary(value) do
    Schema.SQLValue.new(value: {:s, value})
  end

  defp sql_value(value) when is_boolean(value) do
    Schema.SQLValue.new(value: {:b, value})
  end

  defp sql_value(value) when is_bitstring(value) do
    Schema.SQLValue.new(value: {:bs, value})
  end

  def sql_exec(socket, sql, params) do
    with {:ok, _} <-
           socket.channel
           |> Stub.sql_exec(
             Schema.SQLExecRequest.new(
               sql: sql,
               params:
                 for {key, value} <- params do
                   Schema.NamedParam.new(name: Atom.to_string(key), value: sql_value(value))
                 end
             ),
             metadata: metadata(socket)
           ) do
      :ok
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def sql_query(socket, sql, params) do
    with {:ok, response} <-
           socket.channel
           |> Stub.sql_query(
             Schema.SQLQueryRequest.new(
               sql: sql,
               params:
                 for {key, value} <- params do
                   Schema.NamedParam.new(name: Atom.to_string(key), value: sql_value(value))
                 end
             ),
             metadata: metadata(socket)
           ) do
      {:ok, response.rows}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def list_tables(socket) do
    with {:ok, response} <-
           socket.channel
           |> Stub.list_tables(Protobuf.Empty.new(), metadata: metadata(socket)) do
      {:ok, response}
    else
      {:error, %GRPC.RPCError{message: message}} -> {:error, message}
    end
  end

  def describe_table(socket, table_name) do
    socket.channel
    |> Stub.describe_table(Schema.Table.new(tableName: table_name), metadata: metadata(socket))
  end

  def verifiable_sql_get(socket, params) do
    socket.channel
    |> Stub.verifiable_sql_get(
      Schema.VerifiableSQLGetRequest.new(
        sqlGetRequest:
          Schema.SQLGetRequest.new(
            table: params.table,
            pkValue: Schema.SQLValue.new(nil),
            atTx: params.at_tx,
            sinceTx: params.since_tx
          ),
        proveSinceTx: params.prove_since_tx
      ),
      metadata: metadata(socket)
    )
  end
end