defmodule Polyn.SchemaStore do
@moduledoc """
A SchemaStore for loading and accessing schemas from the NATS server that were
created via Polyn CLI.
You will need this running, likely in your application supervision tree, in order for
Polyn to access schemas.
## Examples
```elixir
children = [
{Polyn.SchemaStore, connection_name: :connection_name_or_pid}
]
opts = [strategy: :one_for_one, name: MySupervisor]
Supervisor.start_link(children, opts)
```
"""
use GenServer
alias Jetstream.API.KV
@store_name "POLYN_SCHEMAS"
@already_in_use_code 10_058
@default_retry_timeout 5_000
@default_retry_interval 1_000
@default_retries 5
# Options accepted by `start_link/1`. The first four are consumed by the store itself
# (they are the keys `start_link/1` splits out of the option list); anything else is
# handed to `GenServer.start_link/3`.
@type option ::
        {:connection_name, Gnat.t()}
        | {:retry_interval, pos_integer()}
        | {:store_name, binary() | atom()}
        | {:schemas, map()}
        | GenServer.option()
@doc """
Start a new SchemaStore process.

## Options

  * `:connection_name` - the NATS connection used to load schemas (required)
  * `:store_name` - the schema store to read from; also used to derive the default
    process name
  * `:schemas` - a preloaded map of schemas; when given, nothing is loaded from the server
  * `:retry_interval` - milliseconds to wait between schema load attempts

Any other options are passed to `GenServer.start_link/3`. Unless a `:name` is given
the process is registered under the name derived from `:store_name`.

## Examples

    {:ok, pid} = Polyn.SchemaStore.start_link(connection_name: :gnat)
"""
@spec start_link(opts :: [option()]) :: GenServer.on_start()
def start_link(opts) do
  {store_args, server_opts} =
    Keyword.split(opts, [:schemas, :store_name, :connection_name, :retry_interval])

  # For applications and application testing there should only be one SchemaStore running.
  # For testing the library there could be multiple
  default_name = store_args |> Keyword.get(:store_name) |> process_name()

  # `put_new` keeps an explicitly supplied `:name` untouched
  server_opts = Keyword.put_new(server_opts, :name, default_name)
  GenServer.start_link(__MODULE__, store_args, server_opts)
end
# Derive the registered process name for a store name: `nil` maps to this module,
# binaries are converted to atoms and any other atom is used unchanged.
@doc false
def process_name(store_name) when is_atom(store_name) do
  if is_nil(store_name), do: __MODULE__, else: store_name
end

def process_name(store_name) when is_binary(store_name) do
  String.to_atom(store_name)
end
# Ask the store process for its complete schema map
@doc false
@spec get_schemas(pid()) :: map()
def get_schemas(pid), do: GenServer.call(pid, :get_schemas)
# Persist a schema. In prod/dev schemas should have already been persisted via
# the Polyn CLI.
#
# The schema is validated, JSON-encoded and then handed to the store process.
# Raises `Polyn.SchemaException` when the schema is not a valid JSON Schema document
# or cannot be encoded.
@doc false
@spec save(pid :: pid(), type :: binary(), schema :: map()) :: :ok
def save(pid, type, schema) when is_map(schema) do
  validate_json_schema!(schema)
  GenServer.call(pid, {:save, type, encode(schema)})
end

# Raise `Polyn.SchemaException` unless `schema` resolves as a JSON Schema.
# Only the raising side effect matters; the resolved schema is discarded.
defp validate_json_schema!(schema) do
  ExJsonSchema.Schema.resolve(schema)
  :ok
rescue
  ExJsonSchema.Schema.InvalidSchemaError ->
    # Keep the original stacktrace so the failure still points at the resolver
    reraise Polyn.SchemaException,
            [message: "Schemas must be valid JSONSchema documents, got #{inspect(schema)}"],
            __STACKTRACE__
end

# JSON-encode a schema, turning an encoding failure into a `Polyn.SchemaException`
defp encode(schema) do
  case Jason.encode(schema) do
    {:ok, encoded} -> encoded
    {:error, reason} -> raise Polyn.SchemaException, inspect(reason)
  end
end
# Remove the schema stored under `type` from the store process
@doc false
@spec delete(pid :: pid(), type :: binary()) :: :ok
def delete(pid, type) do
  request = {:delete, type}
  GenServer.call(pid, request)
end
# Look up the schema for an event type. The store holds schemas as encoded JSON,
# so a hit is decoded before being returned; a miss returns `nil`.
@doc false
@spec get(pid :: pid(), type :: binary()) :: nil | map()
def get(pid, type) do
  encoded = GenServer.call(pid, {:get, type})

  if is_nil(encoded) do
    nil
  else
    Jason.decode!(encoded)
  end
end
# Create the schema store if it doesn't exist already. In prod/dev the store
# creation should have already been done via the Polyn CLI.
# Raises `Polyn.SchemaException` for any error other than "already in use".
@doc false
@spec create_store(conn :: Gnat.t()) :: :ok
@spec create_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
def create_store(conn, opts \\ []) do
  bucket = store_name(opts)
  bucket_opts = [description: "Contains Schemas for all events on the server"]

  case KV.create_bucket(conn, bucket, bucket_opts) do
    {:ok, _info} ->
      :ok

    # If some other client created the store first, with a slightly different
    # description or config we'll just use the existing one
    {:error, %{"err_code" => @already_in_use_code}} ->
      :ok

    {:error, reason} ->
      raise Polyn.SchemaException, inspect(reason)
  end
end
# Delete the schema store bucket. Useful for tests.
# The result of `KV.delete_bucket/2` is returned as-is.
@doc false
@spec delete_store(conn :: Gnat.t()) :: :ok
@spec delete_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
def delete_store(conn, opts \\ []) do
  bucket = store_name(opts)
  KV.delete_bucket(conn, bucket)
end
# Resolve the store name: the `:name` option when present, the default otherwise
@doc false
def store_name(opts \\ []) do
  case Keyword.fetch(opts, :name) do
    {:ok, configured} -> configured
    :error -> @store_name
  end
end
# Build the initial state. `:connection_name` is mandatory. Schemas passed in via
# `:schemas` are used directly; otherwise they are loaded from the store, which
# blocks `init/1` until loading succeeds or fails.
@impl GenServer
def init(init_args) do
  store_name = Keyword.get(init_args, :store_name, @store_name)
  conn = Keyword.fetch!(init_args, :connection_name)
  preloaded = Keyword.get(init_args, :schemas)
  retry_interval = Keyword.get(init_args, :retry_interval, @default_retry_interval)

  schemas =
    if preloaded do
      preloaded
    else
      start_load_schemas(conn, store_name, retry_interval)
    end

  state = %{conn: conn, store_name: store_name, schemas: schemas}
  {:ok, state}
end
# The `Gnat.ConnectionSupervisor` doesn't block for a connection so it's possible for
# the `SchemaStore` process to `init` without the connection being established.
# Loading therefore runs in a task that retries on its own, while the caller waits at
# most `@default_retry_timeout` milliseconds for a result.
#
# Returns the loaded schemas, or raises `Polyn.SchemaException` when the task times
# out or exits.
defp start_load_schemas(conn, store_name, retry_interval, retries_left \\ @default_retries) do
  task =
    Task.async(fn ->
      load_schemas(%{
        conn: conn,
        store_name: store_name,
        retries_left: retries_left,
        retry_interval: retry_interval
      })
    end)

  # `Task.yield/2` leaves the task running when it times out, so shut it down
  # explicitly. `Task.shutdown/1` still hands back `{:ok, result}` if the task
  # managed to finish in the meantime.
  case Task.yield(task, @default_retry_timeout) || Task.shutdown(task) do
    {:ok, schemas} ->
      schemas

    # Only seen when the calling process traps exits; otherwise the link
    # propagates the task's crash directly.
    {:exit, reason} ->
      schema_load_failed(%{conn: conn, store_name: store_name, failed_reason: reason})

    nil ->
      schema_load_failed(%{
        conn: conn,
        store_name: store_name,
        failed_reason: "Connection timeout after #{@default_retry_timeout}"
      })
  end
end
# Load the store contents, retrying after `retry_interval` on every `{:error, reason}`
# until `retries_left` reaches zero, at which point loading fails for good.
defp load_schemas(%{retries_left: 0} = args), do: schema_load_failed(args)

defp load_schemas(%{conn: conn} = args) do
  with true <- connection_alive?(conn),
       {:ok, schemas} <- KV.contents(conn, args.store_name) do
    schemas
  else
    {:error, reason} ->
      # Remember why this attempt failed so the final error can report it
      next_attempt =
        Map.merge(args, %{failed_reason: reason, retries_left: args.retries_left - 1})

      :timer.sleep(next_attempt.retry_interval)
      load_schemas(next_attempt)
  end
end
# Raise a `Polyn.SchemaException` describing which store and connection could not be
# reached, including the last recorded failure reason.
defp schema_load_failed(args) do
  message =
    "Could not connect to Schema Store #{args.store_name} with connection #{inspect(args.conn)}, #{inspect(args.failed_reason)}"

  raise Polyn.SchemaException, message
end
# Check that the NATS connection process is running.
#
# Returns `true` when it is, `{:error, message}` otherwise. Both the pid form and the
# registered-name form return the same shapes so the result can be used in a `with`
# chain whose `else` handles `{:error, reason}`; a bare `false` would not match there.
defp connection_alive?(conn) when is_pid(conn) do
  if Process.alive?(conn), do: true, else: connection_not_alive(conn)
end

defp connection_alive?(conn) do
  case Process.whereis(conn) do
    nil -> connection_not_alive(conn)
    _pid -> true
  end
end

# Error tuple shared by both `connection_alive?/1` clauses
defp connection_not_alive(conn) do
  {:error, "NATS server #{inspect(conn)} not alive"}
end
# Synchronous requests from the client functions. Every clause reads or rewrites only
# the `schemas` map held in the process state.
@impl GenServer
def handle_call(:get_schemas, _from, state) do
  {:reply, Map.fetch!(state, :schemas), state}
end

def handle_call({:save, type, schema}, _from, state) do
  new_state = Map.update!(state, :schemas, &Map.put(&1, type, schema))
  {:reply, :ok, new_state}
end

def handle_call({:get, type}, _from, state) do
  found = state |> Map.fetch!(:schemas) |> Map.get(type)
  {:reply, found, state}
end

def handle_call({:delete, type}, _from, state) do
  new_state = Map.update!(state, :schemas, &Map.delete(&1, type))
  {:reply, :ok, new_state}
end
end