# Copyright 2020 Kevin Bader
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
defmodule Cloudevents do
@external_resource "README.md"
@moduledoc File.read!("README.md")
alias Cloudevents.Format
alias Cloudevents.HttpBinding
alias Cloudevents.KafkaBinding
@typedoc "A Cloudevent struct: one of the `V_1_0`, `V_0_2` or `V_0_1` event formats."
@type t :: Format.V_1_0.Event.t() | Format.V_0_2.Event.t() | Format.V_0_1.Event.t()
@typedoc "Configuration parameters for encoding and decoding of data, as accepted by `start_link/1`."
@type options :: [option]
@typedoc "A single configuration parameter, given as a `{key, value}` pair."
@type option ::
{:confluent_schema_registry_url, confluent_schema_registry_url}
| {:avro_schemas_path, avro_schemas_path}
| {:avro_cache_ttl, avro_cache_ttl}
| {:avro_event_schema_name, avro_event_schema_name}
@typedoc """
Confluent Schema Registry URL for resolving Avro schemas by ID.

If the value starts with neither `http://` nor `https://`, `http://` is prepended.
"""
@type confluent_schema_registry_url :: String.t()
@typedoc "Base path for locally stored schema files (default `./priv/schemas`, expanded to an absolute path)"
@type avro_schemas_path :: String.t()
@typedoc "Time in ms to cache Avro schemas in memory (default `300_000`, i.e. five minutes)"
@type avro_cache_ttl :: non_neg_integer()
@typedoc "Name of the Avro-schema used to encode events"
@type avro_event_schema_name :: String.t()
@typedoc "HTTP body, as a binary"
@type http_body :: binary()
@typedoc "HTTP headers, as a list of two-element string tuples"
@type http_headers :: [{String.t(), String.t()}]
@typedoc "Kafka body, as a binary"
@type kafka_body :: binary()
@typedoc "Kafka headers, as a list of two-element string tuples"
@type kafka_headers :: [{String.t(), String.t()}]
use Supervisor
@doc """
Starts the `cloudevents` supervisor, registered under this module's name.

The supervisor is needed for Avro support and its schema caching. If the `Avrora`
module cannot be loaded, nothing is started and an error tuple is returned instead.
"""
@spec start_link(options) :: {:ok, pid} | {:error, any}
def start_link(opts) do
  case Code.ensure_loaded?(Avrora) do
    true -> Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
    false -> {:error, "Avrora dependency is required for Avro support"}
  end
end
@doc """
Stops the `cloudevents` supervisor.

Calls `Supervisor.stop/1` on the process registered under this module's name,
so the call exits if no such process is running.
"""
@spec stop() :: :ok
def stop do
  Supervisor.stop(__MODULE__)
end
@impl Supervisor
def init(opts) do
  # Copy the Avro-related options into the `:avrora` application environment
  # before the children are declared. Each entry is written persistently.
  avrora_env = [
    # e.g. http://localhost:8081, default to http://, so it's more user friendly
    registry_url:
      opts
      |> Keyword.get(:confluent_schema_registry_url)
      |> registry_url_with_http_schema(),
    schemas_path: Keyword.get(opts, :avro_schemas_path, Path.expand("./priv/schemas")),
    names_cache_ttl: Keyword.get(opts, :avro_cache_ttl, :timer.minutes(5))
  ]

  Enum.each(avrora_env, fn {key, value} ->
    Application.put_env(:avrora, key, value, persistent: true)
  end)

  # Both children are restarted together if either one terminates.
  Supervisor.init([Avrora, {Cloudevents.Config, opts}], strategy: :one_for_all)
end
# Replies with `{:ok, schema_name}` when the `:avro_event_schema_name` option is
# present in `state`, and with `{:error, :unset}` when it is missing or `nil`.
#
# NOTE(review): this module is a `Supervisor`, whose behaviour defines no
# `handle_call/3` callback, so nothing visible in this file dispatches to this
# clause. It is kept for backward compatibility, but now returns a well-formed
# `{:reply, reply, state}` tuple; the previous `{:ok, reply}` is not a valid
# `handle_call/3` result.
def handle_call(:avro_event_schema_name, _from, state) do
  reply =
    case Keyword.get(state, :avro_event_schema_name) do
      nil -> {:error, :unset}
      schema -> {:ok, schema}
    end

  {:reply, reply, state}
end
# ---
# Ensures a registry URL carries an explicit scheme: values already starting with
# `http://` or `https://` pass through unchanged, any other binary gets `http://`
# prepended, and `nil` (no registry configured) stays `nil`.
defp registry_url_with_http_schema(nil), do: nil

defp registry_url_with_http_schema(registry_url) when is_binary(registry_url) do
  if String.starts_with?(registry_url, ["http://", "https://"]) do
    registry_url
  else
    "http://" <> registry_url
  end
end
@doc """
Converts an Elixir map into a Cloudevent.

The map keys may be given as strings or as atoms. Returns `{:ok, event}` on
success and `{:error, %Cloudevents.Format.ParseError{}}` otherwise.

## Examples

    iex> Cloudevents.from_map(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"})
    {:ok,
    %Cloudevents.Format.V_1_0.Event{
    data: nil,
    datacontenttype: nil,
    dataschema: nil,
    extensions: %{},
    id: "1",
    source: "test",
    specversion: "1.0",
    subject: nil,
    time: nil,
    type: "test"
    }}

    iex> Cloudevents.from_map(%{specversion: "1.0", type: "test", source: "test", id: "1"})
    {:ok,
    %Cloudevents.Format.V_1_0.Event{
    data: nil,
    datacontenttype: nil,
    dataschema: nil,
    extensions: %{},
    id: "1",
    source: "test",
    specversion: "1.0",
    subject: nil,
    time: nil,
    type: "test"
    }}

"""
@spec from_map(map :: %{required(atom) => any} | %{required(String.t()) => any}) ::
{:ok, t()} | {:error, %Cloudevents.Format.ParseError{}}
defdelegate from_map(map), to: Format.Decoder.Map, as: :decode
@doc """
Converts an Elixir map into a Cloudevent, raising if the conversion fails.

Works like `from_map/1` but returns the bare event. Any result other than
`{:ok, event}` raises a `MatchError`.
"""
@spec from_map!(map :: %{required(atom) => any} | %{required(String.t()) => any}) ::
        t()
def from_map!(map) do
  # Assertive match: a failed conversion crashes the caller right here.
  {:ok, cloudevent} = from_map(map)
  cloudevent
end
# ---
@doc """
Converts a Cloudevent into an Elixir map with atom keys. See also `Cloudevents.from_map/1`.

In the example below, attributes that were never set are absent from the resulting map.

## Examples

    iex> Cloudevents.to_map(Cloudevents.from_map!(%{"specversion" => "1.0", "type" => "test", "source" => "test", "id" => "1"}))
    %{specversion: "1.0", type: "test", source: "test", id: "1"}

"""
@spec to_map(t()) :: %{required(atom) => any}
defdelegate to_map(cloudevent), to: Format.Encoder.Map, as: :convert
# ---
@doc """
Decodes a JSON-encoded Cloudevent.

Returns `{:ok, event}` on success and
`{:error, %Cloudevents.Format.Decoder.DecodeError{}}` otherwise.
"""
@spec from_json(json :: binary()) ::
{:ok, t()} | {:error, %Cloudevents.Format.Decoder.DecodeError{}}
defdelegate from_json(json), to: Format.Decoder.JSON, as: :decode
@doc """
Decodes a JSON-encoded Cloudevent, raising if decoding fails.

Works like `from_json/1` but returns the bare event. Any result other than
`{:ok, event}` raises a `MatchError`.
"""
@spec from_json!(json :: binary()) :: t()
def from_json!(json) do
  # Assertive match: a failed decode crashes the caller right here.
  {:ok, cloudevent} = from_json(json)
  cloudevent
end
# ---
@doc "Encodes a Cloudevent using JSON format and returns the encoded binary."
@spec to_json(t()) :: binary()
defdelegate to_json(cloudevent), to: Format.Encoder.JSON, as: :encode
# ---
# TODO: tests/examples
# NOTE(review): the meaning of `ctx_attrs` is defined by `Format.Decoder.Avro.decode/2`,
# which is not visible here - document it once confirmed.
@doc """
Decodes an Avro-encoded Cloudevent (requires `Cloudevents.start_link/1`).

Returns `{:ok, event}` on success and
`{:error, %Cloudevents.Format.Decoder.DecodeError{}}` otherwise.
"""
@spec from_avro(avro :: binary(), ctx_attrs :: map) ::
{:ok, t()} | {:error, %Cloudevents.Format.Decoder.DecodeError{}}
defdelegate from_avro(avro, ctx_attrs), to: Format.Decoder.Avro, as: :decode
# ---
# TODO: tests/examples
@doc """
Encodes a Cloudevent using Avro binary encoding (requires `Cloudevents.start_link/1`).

Returns `{:ok, binary}` on success and `{:error, reason}` otherwise.
"""
@spec to_avro(t()) :: {:ok, binary()} | {:error, term()}
# The argument is the event to encode (not Avro data), hence the name `cloudevent`,
# matching `to_json/1` and `to_map/1`.
defdelegate to_avro(cloudevent), to: Format.Encoder.Avro, as: :encode
# ---
@doc ~S"""
Parses an HTTP request as one or more Cloudevents.

Note that the HTTP request may contain more than one event (called a "batch"). Because of this, the function always returns a _list_ of Cloudevents. Use pattern matching if you expect single events only:

    with {:ok, [the_event]} <- from_http_message(body, headers) do
      "do something with the_event"
    else
      {:ok, events} -> "oops got a batch of events"
      {:error, error} -> "failed to parse HTTP request: #{inspect(error)}"
    end

"""
@spec from_http_message(http_body, http_headers) ::
{:ok, [t()]} | {:error, any}
defdelegate from_http_message(http_body, http_headers), to: HttpBinding.Decoder
# ---
@doc """
Serialize an event in HTTP binary content mode.

Binary mode basically means: the payload is in the body and the metadata is in the header.
As the example shows, the metadata is carried in `ce-`-prefixed headers next to `content-type`.

## Examples

    iex> event = Cloudevents.from_map!(%{
    ...> specversion: "1.0",
    ...> type: "some-type",
    ...> source: "some-source",
    ...> id: "1",
    ...> data: %{"foo" => "bar"}})
    iex> {_body, _headers} = Cloudevents.to_http_binary_message(event)
    {
    "{\\"foo\\":\\"bar\\"}",
    [
    {"content-type", "application/json"},
    {"ce-specversion", "1.0"},
    {"ce-type", "some-type"},
    {"ce-source", "some-source"},
    {"ce-id", "1"}
    ]
    }

"""
@spec to_http_binary_message(t()) :: {http_body, http_headers}
defdelegate to_http_binary_message(event),
to: HttpBinding.V_1_0.Encoder,
as: :to_binary_content_mode
# ---
@doc """
Serialize an event in HTTP structured content mode.

Structured mode basically means: the full event - payload and metadata - is in the body.

The `event_format` argument selects the body encoding, `:json` or `:avro_binary`.
Avro binary encoding requires a preceding call to `Cloudevents.start_link/1`
(see `to_avro/1`).
"""
# NOTE(review): the `{:error, term}` branch mirrors `to_kafka_structured_message/2`,
# which delegates to an encoder function of the same name - confirm against
# `HttpBinding.V_1_0.Encoder.to_structured_content_mode/2`.
@spec to_http_structured_message(t(), event_format :: :json | :avro_binary) ::
        {:ok, {http_body, http_headers}} | {:error, term}
defdelegate to_http_structured_message(event, event_format),
  to: HttpBinding.V_1_0.Encoder,
  as: :to_structured_content_mode
# ---
# @doc "Serialize one or more events in HTTP batched content mode."
# @spec to_batched_http_message([t()]) :: {http_body, http_headers}
# defdelegate to_batched_http_message(events), to: HttpBinding.V_1_0.Encoder
# ---
@doc """
Parses a Kafka message as a Cloudevent.

Unlike `from_http_message/2`, which returns a list of events, this returns a single
event: `{:ok, event}` on success and `{:error, reason}` otherwise.
"""
@spec from_kafka_message(kafka_body, kafka_headers) ::
{:ok, t()} | {:error, any}
defdelegate from_kafka_message(kafka_body, kafka_headers),
to: KafkaBinding.Decoder
# ---
@doc """
Serialize an event in Kafka binary content mode.

Binary mode basically means: the payload is in the body and the metadata is in the header.
As the example shows, the metadata is carried in `ce_`-prefixed headers next to `content-type`.

## Examples

    iex> event = Cloudevents.from_map!(%{
    ...> specversion: "1.0",
    ...> type: "some-type",
    ...> source: "some-source",
    ...> id: "1",
    ...> data: %{"foo" => "bar"}})
    iex> {_body, _headers} = Cloudevents.to_kafka_binary_message(event)
    {
    "{\\"foo\\":\\"bar\\"}",
    [
    {"content-type", "application/json"},
    {"ce_specversion", "1.0"},
    {"ce_type", "some-type"},
    {"ce_source", "some-source"},
    {"ce_id", "1"}
    ]
    }

"""
@spec to_kafka_binary_message(t()) :: {kafka_body, kafka_headers}
defdelegate to_kafka_binary_message(event),
to: KafkaBinding.V_1_0.Encoder,
as: :to_binary_content_mode
# ---
@doc """
Serialize an event in Kafka structured content mode.

Structured mode basically means: the full event - payload and metadata - is in the body.

The `event_format` argument selects the body encoding, `:json` or `:avro_binary`.
Returns `{:ok, {body, headers}}` on success and `{:error, reason}` otherwise.

## Examples

    iex> event = Cloudevents.from_map!(%{
    ...> specversion: "1.0",
    ...> type: "some-type",
    ...> source: "some-source",
    ...> id: "1",
    ...> data: %{"foo" => "bar"}})
    iex> {:ok, {body, headers}} = Cloudevents.to_kafka_structured_message(event, :json)
    iex> {body, headers}
    {
    "{\\"data\\":{\\"foo\\":\\"bar\\"},\\"datacontenttype\\":\\"application/json\\",\\"id\\":\\"1\\",\\"source\\":\\"some-source\\",\\"specversion\\":\\"1.0\\",\\"type\\":\\"some-type\\"}",
    [{"content-type", "application/cloudevents+json"}]
    }

Note that Avro encoding requires a preceding call to `Cloudevents.start_link/1`.
"""
@spec to_kafka_structured_message(t(), event_format :: :json | :avro_binary) ::
{:ok, {kafka_body, kafka_headers}} | {:error, term}
defdelegate to_kafka_structured_message(event, event_format),
to: KafkaBinding.V_1_0.Encoder,
as: :to_structured_content_mode
end