lib/schema/schema_store.ex

defmodule Polyn.SchemaStore do
  @moduledoc """
  A SchemaStore for loading and accessing schemas from the NATS server that were
  created via Polyn CLI.

  You will need this running, likely in your application supervision tree, in order for
  Polyn to access schemas

  ## Examples

      ```elixir
      children = [
        {Polyn.SchemaStore, connection_name: :connection_name_or_pid}
      ]

      opts = [strategy: :one_for_one, name: MySupervisor]
      Supervisor.start_link(children, opts)
      ```
  """

  use GenServer

  alias Jetstream.API.KV

  @store_name "POLYN_SCHEMAS"

  @type option :: {:connection_name, Gnat.t()} | GenServer.option()

  @doc """
  Start a new SchemaStore process

  ## Examples

      iex>Polyn.SchemaStore.start_link(connection_name: :gnat)
      :ok
  """
  @spec start_link(opts :: [option()]) :: GenServer.on_start()
  def start_link(opts) do
    {store_args, server_opts} = Keyword.split(opts, [:schemas, :store_name, :connection_name])
    # For applications and application testing there should only be one SchemaStore running.
    # For testing the library there could be multiple
    process_name = Keyword.get(store_args, :store_name) |> process_name()
    server_opts = Keyword.put_new(server_opts, :name, process_name)
    GenServer.start_link(__MODULE__, store_args, server_opts)
  end

  # Get a process name for a given store name
  @doc false
  def process_name(nil), do: __MODULE__
  def process_name(store_name) when is_binary(store_name), do: String.to_atom(store_name)
  def process_name(store_name) when is_atom(store_name), do: store_name

  @doc false
  @spec get_schemas(pid()) :: map()
  def get_schemas(pid) do
    GenServer.call(pid, :get_schemas)
  end

  # Persist a schema. In prod/dev schemas should have already been persisted via
  # the Polyn CLI.
  @doc false
  @spec save(pid :: pid(), type :: binary(), schema :: map()) :: :ok
  def save(pid, type, schema) when is_map(schema) do
    is_json_schema?(schema)
    GenServer.call(pid, {:save, type, encode(schema)})
  end

  defp is_json_schema?(schema) do
    ExJsonSchema.Schema.resolve(schema)
  rescue
    ExJsonSchema.Schema.InvalidSchemaError ->
      reraise Polyn.SchemaException,
              [message: "Schemas must be valid JSONSchema documents, got #{inspect(schema)}"],
              __STACKTRACE__
  end

  defp encode(schema) do
    case Jason.encode(schema) do
      {:ok, encoded} -> encoded
      {:error, reason} -> raise Polyn.SchemaException, inspect(reason)
    end
  end

  # Remove a schema
  @doc false
  @spec delete(pid :: pid(), type :: binary()) :: :ok
  def delete(pid, type) do
    GenServer.call(pid, {:delete, type})
  end

  # Get the schema for an event
  @doc false
  @spec get(pid :: pid(), type :: binary()) :: nil | map()
  def get(pid, type) do
    case GenServer.call(pid, {:get, type}) do
      nil ->
        nil

      schema ->
        Jason.decode!(schema)
    end
  end

  # Create the schema store if it doesn't exist already. In prod/dev the the store
  # creation should have already been done via the Polyn CLI
  @doc false
  @spec create_store(conn :: Gnat.t()) :: :ok
  @spec create_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
  def create_store(conn, opts \\ []) do
    result =
      KV.create_bucket(conn, store_name(opts),
        description: "Contains Schemas for all events on the server"
      )

    case result do
      {:ok, _info} -> :ok
      # If some other client created the store first, with a slightly different
      # description or config we'll just use the existing one
      {:error, %{"err_code" => 10_058}} -> :ok
      {:error, reason} -> raise Polyn.SchemaException, inspect(reason)
    end
  end

  # Delete the schema store. Useful for test
  @doc false
  @spec delete_store(conn :: Gnat.t()) :: :ok
  @spec delete_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
  def delete_store(conn, opts \\ []) do
    KV.delete_bucket(conn, store_name(opts))
  end

  # Get a configured store name or the default
  @doc false
  def store_name(opts \\ []) do
    Keyword.get(opts, :name, @store_name)
  end

  @impl GenServer
  def init(init_args) do
    store_name = Keyword.get(init_args, :store_name, @store_name)
    conn = Keyword.fetch!(init_args, :connection_name)
    preloaded_schemas = Keyword.get(init_args, :schemas)

    schemas = preloaded_schemas || load_schemas(conn, store_name)

    {:ok, %{conn: conn, store_name: store_name, schemas: schemas}}
  end

  defp load_schemas(conn, store_name) do
    case KV.contents(conn, store_name) do
      {:ok, schemas} ->
        schemas

      {:error, reason} ->
        raise Polyn.SchemaException, inspect(reason)
    end
  end

  @impl GenServer
  def handle_call(:get_schemas, _from, state) do
    {:reply, state.schemas, state}
  end

  def handle_call({:save, type, schema}, _from, state) do
    schemas = Map.put(state.schemas, type, schema)
    {:reply, :ok, %{state | schemas: schemas}}
  end

  def handle_call({:get, type}, _from, state) do
    {:reply, Map.get(state.schemas, type), state}
  end

  def handle_call({:delete, type}, _from, state) do
    schemas = Map.delete(state.schemas, type)
    {:reply, :ok, %{state | schemas: schemas}}
  end
end