lib/confluent_schema_registry.ex

defmodule ConfluentSchemaRegistry do
  @moduledoc """
  Elixir client for [Confluent Schema Registry](https://docs.confluent.io/current/schema-registry/)
  [API](https://docs.confluent.io/current/schema-registry/develop/api.html).
  """

  # Argument types
  @type id :: pos_integer
  @type schema :: binary
  @type subject :: binary
  @type code :: non_neg_integer
  @type reason :: any
  @type version :: pos_integer | binary
  @type level :: binary

  @doc ~S"""
  Create client to talk to Schema Registry.

  Options are:

  * base_url: URL of schema registry (optional, default "http://localhost:8081")
  * username: username for BasicAuth (optional)
  * password: password for BasicAuth (optional)
  * adapter: Tesla adapter config
  * middleware: List of additional middleware module config (optional)

  ## Examples

      iex> client = ConfluentSchemaRegistry.client()
      %Tesla.Client{
        adapter: nil,
        fun: nil,
        post: [],
        pre: [
          {Tesla.Middleware.BaseUrl, :call, ["http://localhost:8081"]},
          {Tesla.Middleware.Headers, :call,
           [[{"content-type", "application/vnd.schemaregistry.v1+json"}]]},
          {Tesla.Middleware.JSON, :call,
           [[decode_content_types: ["application/vnd.schemaregistry.v1+json"]]]}
        ]
      }
  """
  @spec client(Keyword.t) :: Tesla.Client.t
  def client(opts \\ []) do
    base_url = opts[:base_url] || "http://localhost:8081"
    opts_middleware = opts[:middleware] || []
    adapter = opts[:adapter]

    middleware = opts_middleware ++ [
      {Tesla.Middleware.BaseUrl, base_url},
      {Tesla.Middleware.Headers, [
        {"content-type", "application/vnd.schemaregistry.v1+json"},
        {"accept", "application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json"}
      ]},
      {Tesla.Middleware.JSON, decode_content_types: [
        "application/vnd.schemaregistry.v1+json",
        "application/vnd.schemaregistry+json"
      ]},
    ] ++ basic_auth(opts)

    Tesla.client(middleware, adapter)
  end

  # Configure Tesla.Middleware.BasicAuth
  @spec basic_auth(Keyword.t) :: [{Tesla.Middleware.BasicAuth, map}]
  defp basic_auth(opts) do
    if opts[:username] do
      auth_opts = opts
                  |> Keyword.take([:username, :password])
                  |> Map.new()
      [{Tesla.Middleware.BasicAuth, auth_opts}]
    else
      []
    end
  end


  @doc ~S"""
  Get the schema string identified by the input ID.

  https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id

  Returns binary schema.

  ## Examples

      iex> ConfluentSchemaRegistry.get_schema(client, 21)
      {:ok, "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}"}

  """
  @spec get_schema(Tesla.Client.t, id) :: {:ok, schema} | {:error, code, reason}
  def get_schema(client, id) when is_integer(id) do
    case do_get(client, "/schemas/ids/#{id}") do
      {:ok, %{"schema" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: " <> value}
      error -> error
    end
  end

  @doc ~S"""
  Get a list of registered subjects.

  https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects

  Returns list of subject name strings.

  ## Examples

      iex> ConfluentSchemaRegistry.get_subjects(client)
      {:ok, ["test"]}

  """
  @spec get_subjects(Tesla.Client.t) :: {:ok, list(subject)} | {:error, code, reason}
  def get_subjects(client) do
    case do_get(client, "/subjects") do
      {:ok, _} = result -> result
      error -> error
    end
  end

  @doc ~S"""
  Get a list of versions registered under the specified subject.

  https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions

  ```
  {:ok, [1, 2, 3, 4]} = ConfluentSchemaRegistry.get_versions(client, "test")
  ```

  Returns list of integer ids.
  """
  @spec get_versions(Tesla.Client.t, subject) :: {:ok, list(id)} | {:error, code, reason}
  def get_versions(client, subject) do
    case do_get(client, "/subjects/#{subject}/versions") do
      {:ok, _} = result -> result
      error -> error
    end
  end

  @doc ~S"""
  Deletes the specified subject and its associated compatibility level if registered.
  It is recommended to use this API only when a topic needs to be recycled or
  in development environment.

  https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)

  Returns list of integer ids.

  ## Examples

      iex> ConfluentSchemaRegistry.delete_subject(client, "test")
      {:ok, [1]}

  """
  @spec delete_subject(Tesla.Client.t, subject) :: {:ok, list(id)} | {:error, code, reason}
  def delete_subject(client, subject) do
    case do_delete(client, "/subjects/#{subject}") do
      {:ok, _} = result -> result
      error -> error
    end
  end

  @doc ~S"""
  Get a specific version of the schema registered under this subject

  https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)

  Returns a map with the following keys:

  * subject (string) -- Name of the subject that this schema is registered under
  * id (int) -- Globally unique identifier of the schema
  * version (int) -- Version of the returned schema
  * schema (string) -- The Avro schema string

  ```
  case ConfluentSchemaRegistry.get_schema(client, "test", "latest") do
    {:ok, reg} ->
      # Already registered
      schema = reg["schema"]
      schema_id = reg["id"]
    {:error, 404, %{"error_code" => 40401}} ->
      # Subject not found
    {:error, 404, %{"error_code" => 40402}} ->
      # Version not found
    {:error, 422, reason} ->
      # Unprocessable Entity, Invalid Avro version
    {:error, code, reason} ->
      # Other error
  end
  ```

  ## Examples

      iex> ConfluentSchemaRegistry.get_schema(client, "test")
      {:ok,
       %{
         "id" => 21,
         "schema" => "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}",
         "subject" => "test",
         "version" => 13
       }}

      iex> ConfluentSchemaRegistry.get_schema(client, "test", 13)
      {:ok,
       %{
         "id" => 21,
         "schema" => "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}",
         "subject" => "test",
         "version" => 13
       }}

  """
  @spec get_schema(Tesla.Client.t, subject, version) ::
    {:ok, map} | {:error, code, reason}
  def get_schema(client, subject, version \\ "latest") do
    case do_get(client, "/subjects/#{subject}/versions/#{version}") do
      {:ok, _} = result -> result
      error -> error
    end
  end

  # NOTE: /subjects/#{subject}/versions/#{version}/schema not implemented, as
  # it's redundant with get_schema/3

  @doc ~S"""
  Register a new schema under the specified subject. If successfully
  registered, this returns the unique identifier of this schema in the registry.

  https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions

  Returns the integer id.

  ```
  case ConfluentSchemaRegistry.register_schema(client, "test", schema) do
    {:ok, schema_id} ->
      # Already registered
    {:error, 409, reason} ->
      # Conflict -- Incompatible Avro schema
    {:error, 422, reason} ->
      # Unprocessable Entity, Invalid Avro schema
    {:error, code, reason} ->
      # Other error
  end
  ```

  ## Examples

      iex> ConfluentSchemaRegistry.register_schema(client, "test", schema)
      {:ok, 21}

  """
  @spec register_schema(Tesla.Client.t, subject, schema) :: {:ok, id} | {:error, code, reason}
  def register_schema(client, subject, schema) do
    case do_post(client, "/subjects/#{subject}/versions", %{schema: schema}) do
      {:ok, %{"id" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: #{inspect value}"}
      error -> error
    end
  end

  @doc ~S"""
  Check if a schema has already been registered under the specified subject. If
  so, this returns the schema string along with its globally unique identifier,
  its version under this subject and the subject name.

  https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)

  Returns map with the following keys:

  * subject (string) -- Name of the subject that this schema is registered under
  * id (int) -- Globally unique identifier of the schema
  * version (int) -- Version of the returned schema
  * schema (string) -- The Avro schema string

  ```
  schema = "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}"
  case ConfluentSchemaRegistry.is_registered(client, "test", schema) do
    {:ok, reg} ->
      # Found
      schema = reg["schema"]
      schema_id = reg["id"]
    {:error, 404, %{"error_code" => 40401}} ->
      # Subject not found
    {:error, 404, %{"error_code" => 40403}} ->
      # Schema not found
    {:error, code, reason} ->
      # Other error
  end
  ```

  ## Examples

      iex> ConfluentSchemaRegistry.is_registered(client, "test", schema)
      {:ok,
       %{
         "id" => 21,
         "schema" => "{\"type\":\"record\",\"name\":\"test\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}",
         "subject" => "test",
         "version" => 1
       }}

      iex> ConfluentSchemaRegistry.is_registered(client, "test2", schema)
      {:error, 404, %{"error_code" => 40401, "message" => "Subject not found. ..."}}

  """
  @spec is_registered(Tesla.Client.t, subject, schema) :: {:ok, map} | {:error, code, reason}
  def is_registered(client, subject, schema) do
    case do_post(client, "/subjects/#{subject}", %{schema: schema}) do
      {:ok, _} = result -> result
      error -> error
    end
  end

  @doc ~S"""
  Deletes a specific version of the schema registered under this subject. This
  only deletes the version and the schema ID remains intact making it still
  possible to decode data using the schema ID.

  https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)-versions-(versionId-%20version)

  ```
  {:ok, 1} = ConfluentSchemaRegistry.delete_version(client, "test") # latest
  {:ok, 1} = ConfluentSchemaRegistry.delete_version(client, "test", 1)
  ```

  Returns integer id of deleted version.
  """
  @spec delete_version(Tesla.Client.t, subject, version) :: {:ok, id} | {:error, code, reason}
  def delete_version(client, subject, version \\ "latest") do
    case do_delete(client, "/subjects/#{subject}/versions/#{version}") do
      {:ok, value} when is_integer(value) -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: #{inspect value}"}
      error -> error
    end
  end

  @doc ~S"""
  Test input schema against a particular version of a subject's schema for
  compatibility. Note that the compatibility level applied for the check is the
  configured compatibility level for the subject (`get_compatibility/2`).
  If this subject's compatibility level was never changed, then the
  global compatibility level applies (`get_compatibility/1`).

  https://docs.confluent.io/current/schema-registry/develop/api.html#post--compatibility-subjects-(string-%20subject)-versions-(versionId-%20version)

  Returns boolean.

  ## Examples

      iex> ConfluentSchemaRegistry.is_compatible(client, "test", schema)
      {:ok, true}

      iex> ConfluentSchemaRegistry.is_compatible(client, "test", schema, "latest")
      {:ok, true}

      iex> ConfluentSchemaRegistry.is_compatible(client, "test", schema, 1)
      {:ok, true}

  """
  @spec is_compatible(Tesla.Client.t, subject, schema, version) :: {:ok, boolean} | {:error, code, reason}
  def is_compatible(client, subject, schema, version \\ "latest") do
    case do_post(client, "/compatibility/subjects/#{subject}/versions/#{version}", %{schema: schema}) do
      {:ok, %{"is_compatible" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: " <> value}
      error -> error
    end
  end

  @doc ~S"""
  Update global compatibility level.

  Level is a string which must be one of BACKWARD, BACKWARD_TRANSITIVE, FORWARD,
  FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE

  https://docs.confluent.io/current/schema-registry/develop/api.html#put--config

  Returns string.

  ## Examples

      iex> ConfluentSchemaRegistry.update_compatibility(client, "test", "FULL")
      {:ok, "FULL"}

  """
  @spec update_compatibility(Tesla.Client.t, level) :: {:ok, level} | {:error, code, reason}
    when level: binary, code: non_neg_integer, reason: any
  def update_compatibility(client, level) do
    case do_put(client, "/config", %{compatibility: level}) do
      {:ok, %{"compatibility" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: " <> value}
      error -> error
    end
  end

  @doc ~S"""
  Get global compatibility level.

  Level is a string which will be one of BACKWARD, BACKWARD_TRANSITIVE, FORWARD,
  FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE

  https://docs.confluent.io/current/schema-registry/develop/api.html#put--config

  Returns string.

  ## Examples

      iex> ConfluentSchemaRegistry.get_compatibility(client)
      {:ok, "BACKWARD"}

  """
  @spec get_compatibility(Tesla.Client.t) :: {:ok, level} | {:error, code, reason}
    when level: binary, code: non_neg_integer, reason: any
  def get_compatibility(client) do
    case do_get(client, "/config") do
      {:ok, %{"compatibilityLevel" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: #{inspect value}"}
      error -> error
    end
  end

  @doc ~S"""
  Update compatibility level for the specified subject.

  Leve is a string which must be one of BACKWARD, BACKWARD_TRANSITIVE, FORWARD,
  FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE

  https://docs.confluent.io/current/schema-registry/develop/api.html#put--config

  Returns string.

  ## Examples

      iex> ConfluentSchemaRegistry.update_compatibility(client, "test", "FULL")
      {:ok, "FULL"}

  """
  @spec update_compatibility(Tesla.Client.t, subject, level) ::
    {:ok, level} | {:error, code, reason}
  def update_compatibility(client, subject, level) do
    case do_put(client, "/config/#{subject}", %{compatibility: level}) do
      {:ok, %{"compatibility" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: #{inspect value}"}
      error -> error
    end
  end

  @doc ~S"""
  Get compatibility level for a subject.

  Level is a string which will be one of BACKWARD, BACKWARD_TRANSITIVE, FORWARD,
  FORWARD_TRANSITIVE, FULL, FULL_TRANSITIVE, NONE

  https://docs.confluent.io/current/schema-registry/develop/api.html#put--config

  Returns string.

  ## Examples

      iex> ConfluentSchemaRegistry.get_compatibility(client, "test")
      {:ok, "FULL"}

  """
  @spec get_compatibility(Tesla.Client.t, subject) ::
    {:ok, level} | {:error, code, reason}
  def get_compatibility(client, subject) do
    case do_get(client, "/config/#{subject}") do
      {:ok, %{"compatibilityLevel" => value}} -> {:ok, value}
      {:ok, value} -> {:error, 1, "Unexpected response: " <> value}
      error -> error
    end
  end


  # Internal utility functions

  @spec do_get(Tesla.Client.t, binary) :: {:ok, any} | {:error, code, reason}
  defp do_get(client, url) do
    tesla_response(Tesla.get(client, url))
  end

  @spec do_delete(Tesla.Client.t, binary) :: {:ok, any} | {:error, code, reason}
  defp do_delete(client, url) do
    tesla_response(Tesla.delete(client, url))
  end

  @spec do_post(Tesla.Client.t, binary, any) :: {:ok, any} | {:error, code, reason}
  defp do_post(client, url, data) when is_binary(data) do
    tesla_response(Tesla.post(client, url, data))
  end
  defp do_post(client, url, data) do
    case Jason.encode(data) do
      {:ok, encoded} ->
        do_post(client, url, encoded)
      {:error, reason} ->
        {:error, 0, reason}
    end
  end

  @spec do_put(Tesla.Client.t, binary, any) :: {:ok, any} | {:error, code, reason}
  defp do_put(client, url, data) when is_binary(data) do
    tesla_response(Tesla.put(client, url, data))
  end
  defp do_put(client, url, data) do
    case Jason.encode(data) do
      {:ok, encoded} ->
        do_put(client, url, encoded)
      {:error, reason} ->
        {:error, 0, reason}
    end
  end

  defp tesla_response({:ok, %{status: 200, body: body}}), do: {:ok, body}
  defp tesla_response({:ok, %{status: status, body: body}}), do: {:error, status, body}
  defp tesla_response({:error, reason}), do: {:error, 0, reason}

end