lib/avrora/resolver.ex

defmodule Avrora.Resolver do
  @moduledoc """
  Resolve schema name or global ID to a schema, keeping cache up to date.
  """

  require Logger
  alias Avrora.Config
  alias Avrora.Schema.Name

  @doc """
  Resolve schema, trying multiple identifiers in the given order.

  Each identifier (integer ID or string name) is passed to `resolve/1`. The first
  successful result is returned. If every identifier fails, the result of the last
  attempt is returned. Each failure is logged at debug level.

  ## Examples

      ...> {:ok, schema} = Avrora.Resolver.resolve_any([1, "io.confluent.Payment"])
      ...> schema.full_name
      "io.confluent.Payment"
  """
  @spec resolve_any(nonempty_list(integer() | String.t())) :: {:ok, Avrora.Schema.t()} | {:error, term()}
  def resolve_any(ids) do
    ids = List.wrap(ids)
    total = length(ids)

    ids
    |> Stream.map(&{&1, resolve(&1)})
    |> Stream.with_index(1)
    |> Enum.find_value(fn {{id, {status, result}}, index} ->
      if status == :error, do: Logger.debug("fail to resolve schema by identifier `#{id}`")
      # Stop at the first success, or at the last identifier whatever its outcome.
      if status == :ok || index == total, do: {status, result}
    end)
  end

  @doc """
  Resolve schema by integer ID or by a string name with optional version,
  then update memory storage.

  In case of an integer ID it stores schema in memory with ID key.

  ## Examples

      ...> {:ok, schema} = Avrora.Resolver.resolve(1)
      ...> schema.full_name
      "io.confluent.Payment"

  In case of a string name resolved through the Schema Registry, it stores the schema
  in memory under its ID key and under the key `name:version`. When the name was
  requested without a version, it also stores the schema under the key `name` with
  the names cache TTL. A versioned request never overwrites the `name` key.

  When the schema comes from the local schemas folder instead, it is stored under the
  key `name` only. When auto-registration is enabled and no version was requested,
  the local schema is also added to the Schema Registry (if it's configured).

  A version for the name can be provided by adding `:` with the version number,
  e.g. `io.confluent.Payment:5`. If the Schema Registry is configured (`:registry_url`),
  it will first try to fetch the schema there, then from the local schemas folder
  (`:schemas_path`).

  ## Examples

      ...> {:ok, schema1} = Avrora.Resolver.resolve("io.confluent.Payment")
      ...> {:ok, schema2} = Avrora.Resolver.resolve("io.confluent.Payment:42")
      ...> schema1.version
      42
      ...> schema2.version
      42
      ...> schema1.full_name
      "io.confluent.Payment"
      ...> schema2.full_name
      "io.confluent.Payment"

  """
  @spec resolve(integer() | String.t()) :: {:ok, Avrora.Schema.t()} | {:error, term()}
  def resolve(id) when is_integer(id) do
    # A cache hit (`{:ok, schema}`) or a storage error falls through `with` unchanged.
    with {:ok, nil} <- memory_storage().get(id),
         {:ok, avro} <- registry_storage().get(id) do
      memory_storage().put(id, avro)
    end
  end

  def resolve(name) when is_binary(name) do
    # The memory lookup uses the name exactly as requested (with version, if any).
    with {:ok, schema_name} <- Name.parse(name),
         {:ok, nil} <- memory_storage().get(name) do
      case resolve_with_registry(schema_name) do
        {:ok, schema} ->
          cache_registry_schema(schema_name, schema)

        {:reclaim, schema} ->
          memory_storage().put(schema_name.name, schema)

        {:error, reason} ->
          {:error, reason}
      end
    end
  end

  # Stores a schema obtained from the registry under its ID, under the bare name
  # (unversioned requests only) and under `name:version` when the schema has a version.
  defp cache_registry_schema(schema_name, schema) do
    with {:ok, schema} <- memory_storage().put(schema.id, schema),
         {:ok, schema} <- cache_unversioned_name(schema_name, schema) do
      if is_nil(schema.version),
        do: {:ok, schema},
        else: memory_storage().put("#{schema_name.name}:#{schema.version}", schema)
    end
  end

  # Writes (and sets a TTL on) the bare-name key only for requests without a version.
  # A versioned request such as `name:5` must not overwrite this key. Otherwise later
  # unversioned lookups would be served that specific version from memory.
  defp cache_unversioned_name(%{version: nil} = schema_name, schema) do
    with {:ok, schema} <- memory_storage().put(schema_name.name, schema),
         {:ok, timestamp} <- memory_storage().expire(schema_name.name, names_ttl()) do
      if timestamp == :infinity,
        do: Logger.debug("schema `#{schema_name.name}` will be always resolved from memory")

      {:ok, schema}
    end
  end

  defp cache_unversioned_name(_schema_name, schema), do: {:ok, schema}

  # Returns `{:ok, schema}` when the registry supplied the schema, `{:reclaim, schema}`
  # when it came from the local file storage because the registry URL is unconfigured,
  # or any other error returned by the storages.
  defp resolve_with_registry(schema_name) do
    if Config.self().registry_schemas_autoreg() && is_nil(schema_name.version) do
      with {:ok, schema} <- file_storage().get(schema_name.origin),
           {:error, :unconfigured_registry_url} <- registry_storage().put(schema_name.name, schema.json),
           do: {:reclaim, schema}
    else
      with {:error, :unconfigured_registry_url} <- registry_storage().get(schema_name.origin),
           {:ok, schema} <- file_storage().get(schema_name.origin),
           do: {:reclaim, schema}
    end
  end

  defp file_storage, do: Config.self().file_storage()
  defp memory_storage, do: Config.self().memory_storage()
  defp registry_storage, do: Config.self().registry_storage()
  defp names_ttl, do: Config.self().names_cache_ttl()
end