defmodule Gnat.Jetstream.API.KV do
@moduledoc """
API for interacting with the Key/Value store functionality in Nats Jetstream.
Learn about the Key/Value store: https://docs.nats.io/nats-concepts/jetstream/key-value-store
"""
alias Gnat.Jetstream.API.{Consumer, Stream, Util}
@stream_prefix "KV_"
@subject_prefix "$KV."
@two_minutes_in_nanoseconds 1_200_000_000
@type bucket_options ::
{:history, non_neg_integer()}
| {:ttl, non_neg_integer()}
| {:max_bucket_size, non_neg_integer()}
| {:max_value_size, non_neg_integer()}
| {:description, binary()}
| {:replicas, non_neg_integer()}
| {:storage, :file | :memory}
| {:placement, Stream.placement()}
@doc """
Create a new Key/Value bucket. Can include the following options
* `:history` - How many historic values to keep per key (defaults to 1, max of 64)
* `:ttl` - How long to keep values for (in nanoseconds)
* `:max_bucket_size` - The max number of bytes the bucket can hold
* `:max_value_size` - The max number of bytes a value may be
* `:description` - A description for the bucket
* `:replicas` - How many replicas of the data to store
* `:storage` - Storage backend to use (:file, :memory)
* `:placement` - A map with :cluster (required) and :tags (optional)
## Examples
iex>{:ok, info} = Jetstream.API.KV.create_bucket(:gnat, "my_bucket")
"""
@spec create_bucket(conn :: Gnat.t(), bucket_name :: binary(), params :: [bucket_options()]) ::
{:ok, Stream.info()} | {:error, any()}
def create_bucket(conn, bucket_name, params \\ []) do
# The primary NATS docs don't provide information about how to interact
# with Key-Value functionality over the wire. Turns out the KV store is
# just a Stream under-the-hood
# Discovered these settings from looking at the `nats-server -js -DV` logs
# as well as the GoLang implementation https://github.com/nats-io/nats.go/blob/dd91b86bc4f7fa0f061fefe11506aaee413bfafd/kv.go#L339
# If the settings aren't correct, NATS will not consider it a valid KV store
stream = %Stream{
name: stream_name(bucket_name),
subjects: stream_subjects(bucket_name),
description: Keyword.get(params, :description),
max_msgs_per_subject: Keyword.get(params, :history, 1),
discard: :new,
deny_delete: true,
allow_rollup_hdrs: true,
max_age: Keyword.get(params, :ttl, 0),
max_bytes: Keyword.get(params, :max_bucket_size, -1),
max_msg_size: Keyword.get(params, :max_value_size, -1),
num_replicas: Keyword.get(params, :replicas, 1),
storage: Keyword.get(params, :storage, :file),
placement: Keyword.get(params, :placement),
duplicate_window: adjust_duplicate_window(Keyword.get(params, :ttl, 0))
}
Stream.create(conn, stream)
end
# The `duplicate_window` can't be greater than the `max_age`. The default `duplicate_window`
# is 2 minutes. We'll keep the 2 minute window UNLESS the ttl is less than 2 minutes
defp adjust_duplicate_window(ttl) when ttl > 0 and ttl < @two_minutes_in_nanoseconds, do: ttl
defp adjust_duplicate_window(_ttl), do: @two_minutes_in_nanoseconds
@doc """
Delete a Key/Value bucket
## Examples
iex>:ok = Jetstream.API.KV.delete_bucket(:gnat, "my_bucket")
"""
@spec delete_bucket(conn :: Gnat.t(), bucket_name :: binary()) :: :ok | {:error, any()}
def delete_bucket(conn, bucket_name) do
Stream.delete(conn, stream_name(bucket_name))
end
@doc """
Create a Key in a Key/Value Bucket
## Examples
iex>:ok = Jetstream.API.KV.create_key(:gnat, "my_bucket", "my_key", "my_value")
"""
@spec create_key(conn :: Gnat.t(), bucket_name :: binary(), key :: binary(), value :: binary()) ::
:ok | {:error, any()}
def create_key(conn, bucket_name, key, value, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5_000)
reply = Gnat.request(conn, key_name(bucket_name, key), value, receive_timeout: timeout)
case reply do
{:ok, _} -> :ok
error -> error
end
end
@doc """
Delete a Key from a K/V Bucket
## Examples
iex>:ok = Jetstream.API.KV.delete_key(:gnat, "my_bucket", "my_key")
"""
@spec delete_key(conn :: Gnat.t(), bucket_name :: binary(), key :: binary()) ::
:ok | {:error, any()}
def delete_key(conn, bucket_name, key, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5_000)
reply =
Gnat.request(conn, key_name(bucket_name, key), "",
headers: [{"KV-Operation", "DEL"}],
receive_timeout: timeout
)
case reply do
{:ok, _} -> :ok
error -> error
end
end
@doc """
Purge a Key from a K/V bucket. This will remove any revision history the key had
## Examples
iex>:ok = Jetstream.API.KV.purge_key(:gnat, "my_bucket", "my_key")
"""
@spec purge_key(conn :: Gnat.t(), bucket_name :: binary(), key :: binary()) ::
:ok | {:error, any()}
def purge_key(conn, bucket_name, key, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5_000)
reply =
Gnat.request(conn, key_name(bucket_name, key), "",
headers: [{"KV-Operation", "PURGE"}, {"Nats-Rollup", "sub"}],
receive_timeout: timeout
)
case reply do
{:ok, _} -> :ok
error -> error
end
end
@doc """
Put a value into a Key in a K/V Bucket
## Examples
iex>:ok = Jetstream.API.KV.put_value(:gnat, "my_bucket", "my_key", "my_value")
"""
@spec put_value(conn :: Gnat.t(), bucket_name :: binary(), key :: binary(), value :: binary()) ::
:ok | {:error, any()}
def put_value(conn, bucket_name, key, value, opts \\ []) do
timeout = Keyword.get(opts, :timeout, 5_000)
reply = Gnat.request(conn, key_name(bucket_name, key), value, receive_timeout: timeout)
case reply do
{:ok, _} -> :ok
error -> error
end
end
@doc """
Get the value for a key in a particular K/V bucket
## Examples
iex>"my_value" = Jetstream.API.KV.get_value(:gnat, "my_bucket", "my_key")
"""
@spec get_value(conn :: Gnat.t(), bucket_name :: binary(), key :: binary()) ::
binary() | {:error, any()} | nil
def get_value(conn, bucket_name, key) do
case Stream.get_message(conn, stream_name(bucket_name), %{
last_by_subj: key_name(bucket_name, key)
}) do
{:ok, message} -> message.data
error -> error
end
end
@doc """
Get all the non-deleted key-value pairs for a Bucket
## Examples
iex>{:ok, %{"key1" => "value1"}} = Jetstream.API.KV.contents(:gnat, "my_bucket")
"""
@spec contents(conn :: Gnat.t(), bucket_name :: binary(), domain :: nil | binary()) ::
{:ok, map()} | {:error, binary()}
def contents(conn, bucket_name, domain \\ nil) do
stream = stream_name(bucket_name)
inbox = Util.reply_inbox()
consumer_name = "all_key_values_consumer_#{Util.nuid()}"
with {:ok, sub} <- Gnat.sub(conn, self(), inbox),
{:ok, _consumer} <-
Consumer.create(conn, %Consumer{
durable_name: consumer_name,
deliver_subject: inbox,
stream_name: stream,
domain: domain,
ack_policy: :none,
max_ack_pending: -1,
max_deliver: 1
}) do
keys = receive_keys(bucket_name)
:ok = Gnat.unsub(conn, sub)
:ok = Consumer.delete(conn, stream, consumer_name, domain)
{:ok, keys}
end
end
@doc ~S"""
Starts a monitor for key changes in a given bucket. Supply a handler that will receive
key change notifications.
## Examples
iex>{:ok, _pid} = Jetstream.API.KV.watch(:gnat, "my_bucket", fn action, key, value ->
IO.puts("#{action} taken on #{key}")
end)
"""
def watch(conn, bucket_name, handler) do
Gnat.Jetstream.API.KV.Watcher.start_link(conn: conn, bucket_name: bucket_name, handler: handler)
end
@doc ~S"""
Stops a previously running monitor. This will unsubscribe from the key changes and remove the
ephemeral consumer
## Examples
iex>:ok = Jetstream.API.KV.unwatch(pid)
"""
def unwatch(pid) do
Gnat.Jetstream.API.KV.Watcher.stop(pid)
end
defp receive_keys(keys \\ %{}, bucket_name) do
receive do
{:msg, %{topic: key, body: body, headers: headers}} ->
if {"kv-operation", "DEL"} in headers do
receive_keys(keys, bucket_name)
else
Map.put(keys, subject_to_key(key, bucket_name), body) |> receive_keys(bucket_name)
end
{:msg, %{topic: key, body: body}} ->
Map.put(keys, subject_to_key(key, bucket_name), body) |> receive_keys(bucket_name)
after
100 ->
keys
end
end
@doc """
Returns true if the provided stream is a KV bucket, false otherwise
## Parameters
* `stream_name` - the stream name to test
"""
@spec is_kv_bucket_stream?(stream_name :: binary()) :: boolean()
def is_kv_bucket_stream?(stream_name) do
String.starts_with?(stream_name, "KV_")
end
@doc """
Returns a list of all the buckets in the KV
"""
@spec list_buckets(conn :: Gnat.t()) :: {:error, term()} | {:ok, list(String.t())}
def list_buckets(conn) do
with {:ok, %{streams: streams}} <- Stream.list(conn) do
stream_names =
streams
|> Enum.flat_map(fn bucket ->
if is_kv_bucket_stream?(bucket) do
[bucket |> String.trim_leading(@stream_prefix)]
else
[]
end
end)
{:ok, stream_names}
else
{:error, reason} ->
{:error, reason}
end
end
@doc false
def stream_name(bucket_name) do
"#{@stream_prefix}#{bucket_name}"
end
defp stream_subjects(bucket_name) do
["#{@subject_prefix}#{bucket_name}.>"]
end
defp key_name(bucket_name, key) do
"#{@subject_prefix}#{bucket_name}.#{key}"
end
@doc false
def subject_to_key(subject, bucket_name) do
String.replace(subject, "#{@subject_prefix}#{bucket_name}.", "")
end
end