lib/schema/schema_store.ex

defmodule Polyn.SchemaStore do
  @moduledoc """
  A SchemaStore for loading and accessing schemas from the NATS server that were
  created via Polyn CLI.

  You will need this running, likely in your application supervision tree, in order for
  Polyn to access schemas

  ## Examples

      ```elixir
      children = [
        {Polyn.SchemaStore, connection_name: :connection_name_or_pid}
      ]

      opts = [strategy: :one_for_one, name: MySupervisor]
      Supervisor.start_link(children, opts)
      ```
  """

  use GenServer

  alias Jetstream.API.KV

  @store_name "POLYN_SCHEMAS"
  @already_in_use_code 10_058
  @default_retry_timeout 5_000
  @default_retry_interval 1_000
  @default_retries 5

  @type option ::
          {:connection_name, Gnat.t()} | {:retry_interval, pos_integer()} | GenServer.option()

  @doc """
  Start a new SchemaStore process

  ## Examples

      iex>Polyn.SchemaStore.start_link(connection_name: :gnat)
      :ok
  """
  @spec start_link(opts :: [option()]) :: GenServer.on_start()
  def start_link(opts) do
    {store_args, server_opts} =
      Keyword.split(opts, [:schemas, :store_name, :connection_name, :retry_interval])

    # For applications and application testing there should only be one SchemaStore running.
    # For testing the library there could be multiple
    process_name = Keyword.get(store_args, :store_name) |> process_name()
    server_opts = Keyword.put_new(server_opts, :name, process_name)
    GenServer.start_link(__MODULE__, store_args, server_opts)
  end

  # Get a process name for a given store name
  @doc false
  def process_name(nil), do: __MODULE__
  def process_name(store_name) when is_binary(store_name), do: String.to_atom(store_name)
  def process_name(store_name) when is_atom(store_name), do: store_name

  @doc false
  @spec get_schemas(pid()) :: map()
  def get_schemas(pid) do
    GenServer.call(pid, :get_schemas)
  end

  # Persist a schema. In prod/dev schemas should have already been persisted via
  # the Polyn CLI.
  @doc false
  @spec save(pid :: pid(), type :: binary(), schema :: map()) :: :ok
  def save(pid, type, schema) when is_map(schema) do
    is_json_schema?(schema)
    GenServer.call(pid, {:save, type, encode(schema)})
  end

  defp is_json_schema?(schema) do
    ExJsonSchema.Schema.resolve(schema)
  rescue
    ExJsonSchema.Schema.InvalidSchemaError ->
      reraise Polyn.SchemaException,
              [message: "Schemas must be valid JSONSchema documents, got #{inspect(schema)}"],
              __STACKTRACE__
  end

  defp encode(schema) do
    case Jason.encode(schema) do
      {:ok, encoded} -> encoded
      {:error, reason} -> raise Polyn.SchemaException, inspect(reason)
    end
  end

  # Remove a schema
  @doc false
  @spec delete(pid :: pid(), type :: binary()) :: :ok
  def delete(pid, type) do
    GenServer.call(pid, {:delete, type})
  end

  # Get the schema for an event
  @doc false
  @spec get(pid :: pid(), type :: binary()) :: nil | map()
  def get(pid, type) do
    case GenServer.call(pid, {:get, type}) do
      nil ->
        nil

      schema ->
        Jason.decode!(schema)
    end
  end

  # Create the schema store if it doesn't exist already. In prod/dev the the store
  # creation should have already been done via the Polyn CLI
  @doc false
  @spec create_store(conn :: Gnat.t()) :: :ok
  @spec create_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
  def create_store(conn, opts \\ []) do
    result =
      KV.create_bucket(conn, store_name(opts),
        description: "Contains Schemas for all events on the server"
      )

    case result do
      {:ok, _info} -> :ok
      # If some other client created the store first, with a slightly different
      # description or config we'll just use the existing one
      {:error, %{"err_code" => @already_in_use_code}} -> :ok
      {:error, reason} -> raise Polyn.SchemaException, inspect(reason)
    end
  end

  # Delete the schema store. Useful for test
  @doc false
  @spec delete_store(conn :: Gnat.t()) :: :ok
  @spec delete_store(conn :: Gnat.t(), opts :: keyword()) :: :ok
  def delete_store(conn, opts \\ []) do
    KV.delete_bucket(conn, store_name(opts))
  end

  # Get a configured store name or the default
  @doc false
  def store_name(opts \\ []) do
    Keyword.get(opts, :name, @store_name)
  end

  @impl GenServer
  def init(init_args) do
    store_name = Keyword.get(init_args, :store_name, @store_name)
    conn = Keyword.fetch!(init_args, :connection_name)
    preloaded_schemas = Keyword.get(init_args, :schemas)
    retry_interval = Keyword.get(init_args, :retry_interval, @default_retry_interval)

    schemas = preloaded_schemas || start_load_schemas(conn, store_name, retry_interval)

    {:ok, %{conn: conn, store_name: store_name, schemas: schemas}}
  end

  # The `Gnat.ConnectionSupervisor` doesn't block for a connection so it's possible for
  # the `SchemaStore` process to `init` without the connection being established
  defp start_load_schemas(conn, store_name, retry_interval, retries_left \\ @default_retries) do
    task =
      Task.async(fn ->
        load_schemas(%{
          conn: conn,
          store_name: store_name,
          retries_left: retries_left,
          retry_interval: retry_interval
        })
      end)

    case Task.yield(task, @default_retry_timeout) do
      {:ok, schemas} ->
        schemas

      nil ->
        schema_load_failed(%{
          conn: conn,
          store_name: store_name,
          failed_reason: "Connection timeout after #{@default_retry_timeout}"
        })
    end
  end

  defp load_schemas(%{retries_left: 0} = args) do
    schema_load_failed(args)
  end

  defp load_schemas(%{conn: conn} = args) do
    with true <- connection_alive?(conn),
         {:ok, schemas} <- KV.contents(conn, args.store_name) do
      schemas
    else
      {:error, reason} ->
        args =
          Map.put(args, :failed_reason, reason)
          |> Map.put(:retries_left, args.retries_left - 1)

        :timer.sleep(args.retry_interval)
        load_schemas(args)
    end
  end

  defp schema_load_failed(args) do
    raise Polyn.SchemaException,
          "Could not connect to Schema Store #{args.store_name} with connection #{inspect(args.conn)}, #{inspect(args.failed_reason)}"
  end

  defp connection_alive?(conn) when is_pid(conn) do
    Process.alive?(conn)
  end

  defp connection_alive?(conn) do
    case Process.whereis(conn) do
      nil -> {:error, "NATS server #{inspect(conn)} not alive"}
      _pid -> true
    end
  end

  @impl GenServer
  def handle_call(:get_schemas, _from, state) do
    {:reply, state.schemas, state}
  end

  def handle_call({:save, type, schema}, _from, state) do
    schemas = Map.put(state.schemas, type, schema)
    {:reply, :ok, %{state | schemas: schemas}}
  end

  def handle_call({:get, type}, _from, state) do
    {:reply, Map.get(state.schemas, type), state}
  end

  def handle_call({:delete, type}, _from, state) do
    schemas = Map.delete(state.schemas, type)
    {:reply, :ok, %{state | schemas: schemas}}
  end
end