defmodule Svadilfari do
@moduledoc ~S"""
A logger backend that logs messages to Grafana Loki.
## Options
* `:level` - the level to be logged by this backend.
Note that messages are filtered by the general
`:level` configuration for the `:logger` application first.
* `:format` - the format message used to print logs.
Defaults to: `"\n$time $metadata[$level] $message\n"`.
It may also be a `{module, function}` tuple that is invoked
with the log level, the message, the current timestamp and
the metadata and must return `t:IO.chardata/0`. See
`Logger.Formatter`.
* `:metadata` - the metadata to be printed by `$metadata`.
Defaults to an empty list (no metadata).
Setting `:metadata` to `:all` prints all metadata. See
the "Metadata" section for more information.
* `:max_buffer` - maximum events to buffer while waiting
for the client to successfully send the logs to Grafana Loki.
Once the buffer is full, the backend will block until
a confirmation is received.
* `:labels` - A list of {String.t, String.t} tuples that represents Grafana Loki labels.
* `:client` - a keyword list of the following options:
* `url` - The URL to which logs should be pushed. The `loki/api/v1/push` path is inferred
and does not need to be specified.
* `opts` - Svadilfari uses Sleipnir's `Sleipnir.Client.Tesla` client under the hood.
Opts can be passed to it here.
Here's an example of how to configure the `Svadilfari` backend in a
`config/config.exs` file:
config :logger, :backends, [:console, Svadilfari]
config :logger, :svadilfari,
format: "\n$time $metadata[$level] $message\n",
metadata: [:user_id]
"""
@behaviour :gen_event
alias Sleipnir.Client.Tesla, as: TeslaClient
@default_level nil
@default_format Logger.Formatter.compile(nil)
@default_max_buffer 100
@default_metadata []
@default_url "http://localhost:3100"
@type t :: %__MODULE__{
buffer: list(Sleipnir.entry()),
buffer_size: non_neg_integer(),
format: term(),
level: Logger.level(),
max_buffer: non_neg_integer(),
metadata: Keyword.t(),
labels: list({String.t(), String.t()}),
client: struct(),
ref: reference(),
output: term()
}
@enforce_keys [
:labels,
:client
]
defstruct buffer: [],
buffer_size: 0,
format: @default_format,
level: @default_level,
max_buffer: @default_max_buffer,
metadata: @default_metadata,
labels: nil,
client: nil,
ref: nil,
output: nil
@impl true
def init(__MODULE__) do
config = Application.get_env(:logger, :svadilfari)
{:ok, do_init(config)}
end
def init({__MODULE__, opts}) when is_list(opts) do
config = configure_merge(Application.get_env(:logger, :svadilfari), opts)
{:ok, do_init(config)}
end
@impl true
def handle_call({:configure, options}, state) do
{:ok, :ok, configure(options, state)}
end
@impl true
def handle_event({level, _gl, {Logger, msg, ts, md}}, state) do
%{level: log_level, buffer_size: buffer_size, max_buffer: max_buffer} = state
{:erl_level, level} = List.keyfind(md, :erl_level, 0, {:erl_level, level})
cond do
not meet_level?(level, log_level) ->
{:ok, state}
is_nil(state.ref) ->
{:ok, log_event(level, msg, ts, md, state)}
buffer_size < max_buffer ->
{:ok, buffer_event(level, msg, ts, md, state)}
buffer_size === max_buffer ->
state = buffer_event(level, msg, ts, md, state)
{:ok, await_io(state)}
end
end
def handle_event(:flush, state) do
{:ok, flush(state)}
end
def handle_event(_, state) do
{:ok, state}
end
@impl true
def handle_info({:io_reply, ref, msg}, %{ref: ref} = state) do
{:ok, handle_io_reply(msg, state)}
end
def handle_info(_, state) do
{:ok, state}
end
@impl true
def code_change(_old_vsn, state, _extra) do
{:ok, state}
end
@impl true
def terminate(_reason, _state) do
:ok
end
## Helpers
defp meet_level?(_lvl, nil), do: true
defp meet_level?(lvl, min) do
Logger.compare_levels(lvl, min) != :lt
end
defp configure(opts, state) do
config = configure_merge(Application.get_env(:logger, :svadilfari), opts)
Application.put_env(:logger, :svadilfari, config)
do_init(config, state)
end
defp to_config(opts) do
level = Keyword.get(opts, :level, @default_level)
format = Logger.Formatter.compile(Keyword.get(opts, :format))
metadata = Keyword.get(opts, :metadata, @default_metadata) |> configure_metadata()
max_buffer = Keyword.get(opts, :max_buffer, @default_max_buffer)
labels = Keyword.fetch!(opts, :labels)
client =
Keyword.fetch!(opts, :client)
|> case do
client when is_struct(client) ->
client
client_opts when is_list(client_opts) ->
opts = Keyword.get(client_opts, :opts, [])
client_opts
|> Keyword.get(:url, @default_url)
|> TeslaClient.new(opts)
end
[
level: level,
format: format,
metadata: metadata,
max_buffer: max_buffer,
labels: labels,
client: client
]
end
defp do_init(opts, state \\ nil) do
config = to_config(opts)
case state do
nil -> Kernel.struct!(__MODULE__, config)
term -> Kernel.struct!(term, config)
end
end
defp configure_metadata(:all), do: :all
defp configure_metadata(metadata), do: Enum.reverse(metadata)
defp configure_merge(env, options) do
Keyword.merge(env, options, fn
_, _v1, v2 -> v2
end)
end
defp log_event(level, msg, ts, md, state) do
output = [format_entry(level, msg, ts, md, state)]
%{state | ref: async_io(state.client, output, state.labels), output: output}
end
defp buffer_event(level, msg, ts, md, state) do
%{buffer: buffer, buffer_size: buffer_size} = state
buffer = [format_entry(level, msg, ts, md, state) | buffer]
%{state | buffer: buffer, buffer_size: buffer_size + 1}
end
defp async_io(client, output, labels) do
ref = make_ref()
Svadilfari.Async.send(client, self(), ref, output, labels)
ref
end
defp await_io(%{ref: nil} = state), do: state
defp await_io(%{ref: ref} = state) do
receive do
{:io_reply, ^ref, :ok} -> handle_io_reply(:ok, state)
{:io_reply, ^ref, error} -> handle_io_reply(error, state) |> await_io()
end
end
defp format_entry(level, msg, ts, md, state) do
timestamp = Sleipnir.Timestamp.from(ts)
level
|> format_event(msg, ts, md, state)
|> Sleipnir.entry(timestamp)
end
defp format_event(level, msg, ts, md, %__MODULE__{format: format, metadata: keys}) do
format
|> Logger.Formatter.format(level, msg, ts, take_metadata(md, keys))
|> IO.iodata_to_binary()
end
defp take_metadata(metadata, :all) do
metadata
end
defp take_metadata(metadata, keys) do
Enum.reduce(keys, [], fn key, acc ->
case Keyword.fetch(metadata, key) do
{:ok, val} -> [{key, val} | acc]
:error -> acc
end
end)
end
defp log_buffer(%{buffer_size: 0, buffer: []} = state), do: state
defp log_buffer(state) do
%{
state
| ref: async_io(state.client, state.buffer, state.labels),
buffer: [],
buffer_size: 0,
output: state.buffer
}
end
defp handle_io_reply(:ok, state) do
log_buffer(%{state | ref: nil, output: nil})
end
defp handle_io_reply(reason, _) do
raise "failure while logging to Loki: " <> inspect(reason)
end
defp flush(%{ref: nil} = state), do: state
defp flush(state) do
state
|> await_io()
|> flush()
end
end