lib/immudb/stream.ex

defmodule Immudb.Stream do
  use Immudb.Grpc, :schema

  alias Immudb.Util
  alias Immudb.Socket

  @spec stream_get(Socket.t(),
          key: String.t(),
          at_tx: integer(),
          since_tx: integer()
        ) ::
          {:error, String.t() | atom()} | {:ok, nil}
  def stream_get(%Socket{channel: %GRPC.Channel{} = channel, token: token},
        key: key,
        at_tx: at_tx
      ) do
    channel
    |> Stub.stream_get(
      Schema.KeyRequest.new(
        key: key,
        atTx: at_tx
      ),
      metadata: token |> Util.metadata()
    )
    |> case do
      {:ok, v} ->
        {:ok, v}

      {:error, %GRPC.RPCError{message: message}} ->
        {:error, message}

      _ ->
        {:error, :unknown}
    end
  end

  def stream_get(%Socket{channel: %GRPC.Channel{} = channel, token: token},
        key: key,
        since_tx: since_tx
      ) do
    channel
    |> Stub.stream_get(
      Schema.KeyRequest.new(
        key: key,
        sinceTx: since_tx
      ),
      metadata: token |> Util.metadata()
    )
    |> case do
      {:ok, v} ->
        {:ok, v}

      {:error, %GRPC.RPCError{message: message}} ->
        {:error, message}

      _ ->
        {:error, :unknown}
    end
  end

  def stream_get(_, _) do
    {:error, :invalid_params}
  end

  def stream_set(_channel, _params) do
  end

  @spec stream_verifiable_get(Socket.t(),
          key: String.t(),
          at_tx: integer(),
          since_tx: integer(),
          prove_since_tx: integer()
        ) ::
          {:error, String.t() | atom()} | {:ok, nil}
  def stream_verifiable_get(%Socket{channel: %GRPC.Channel{} = channel, token: token},
        key: key,
        at_tx: at_tx,
        since_tx: since_tx,
        prove_since_tx: prove_since_tx
      ) do
    channel
    |> Stub.stream_verifiable_get(
      Schema.VerifiableGetRequest.new(
        keyRequest:
          Schema.KeyRequest.new(
            key: key,
            atTx: at_tx,
            sinceTx: since_tx
          ),
        proveSinceTx: prove_since_tx
      ),
      metadata: token |> Util.metadata()
    )
    |> case do
      {:ok, v} ->
        {:ok, v}

      {:error, %GRPC.RPCError{message: message}} ->
        {:error, message}

      _ ->
        {:error, :unknown}
    end
  end

  def stream_verifiable_get(_, _) do
    {:error, :invalid_params}
  end

  def stream_verifiable_set(_channel, _params) do
  end

  @spec stream_scan(Socket.t(),
          seek_key: binary(),
          prefix: binary(),
          desc: binary(),
          limit: integer(),
          since_tx: binary(),
          no_wait: boolean()
        ) ::
          {:error, String.t() | atom()} | {:ok, Entries.t()}
  def stream_scan(%Socket{channel: %GRPC.Channel{} = channel, token: token},
        seek_key: seek_key,
        prefix: prefix,
        desc: desc,
        limit: limit,
        since_tx: since_tx,
        no_wait: no_wait
      ) do
    channel
    |> Stub.stream_scan(
      Schema.ScanRequest.new(
        seekKey: seek_key,
        prefix: prefix,
        desc: desc,
        limit: limit,
        sinceTx: since_tx,
        noWait: no_wait
      ),
      metadata: token |> Util.metadata()
    )
    |> case do
      {:ok, v} ->
        {:ok, v |> Immudb.Schemas.Entries.convert()}

      {:error, %GRPC.RPCError{message: message}} ->
        {:error, message}

      _ ->
        {:error, :unknown}
    end
  end

  def stream_scan(_, _) do
    {:error, :invalid_params}
  end

  def stream_z_scan(_channel, _params) do
  end

  @spec stream_history(Socket.t(), binary(),
          offset: integer(),
          limit: integer(),
          desc: boolean(),
          since_tx: integer()
        ) ::
          {:error, String.t() | atom()} | {:ok, Immudb.Schemas.Entries.t()}
  def stream_history(%Socket{channel: %GRPC.Channel{} = channel, token: token}, key,
        offset: offset,
        limit: limit,
        desc: desc,
        since_tx: since_tx
      ) do
    channel
    |> Stub.stream_history(
      Schema.HistoryRequest.new(
        key: key,
        offset: offset,
        limit: limit,
        desc: desc,
        sinceTx: since_tx
      ),
      metadata: token |> Util.metadata()
    )
    |> case do
      {:ok, v} ->
        {:ok, v |> Immudb.Schemas.Entries.convert()}

      {:error, %GRPC.RPCError{message: message}} ->
        {:error, message}

      _ ->
        {:error, :unknown}
    end
  end

  def stream_history(_, _, _) do
    {:error, :invalid_params}
  end

  def stream_exec_all(_channel, _params) do
  end
end