defmodule LoggerAmqpBackend do
@compile if Mix.env == :test, do: :export_all
use AMQP
@behaviour :gen_event
# Initial handler state. `configure/3` overwrites most of these fields from
# the `:logger` application environment merged with the runtime options.
@default_state %{
name: nil,
format: nil,
level: nil,
metadata: nil,
metadata_filter: nil,
exchange: "",
# Not read anywhere in this module.
queue: "logs",
# Set by the `:connect` handler once a connection and channel are open.
amqp_channel: nil,
amqp_conn: nil,
amqp_url: "",
routing_key: "",
declare_queue: true,
durable: true,
queue_args: [],
# Formatted events held while no channel is open; `log_event/5` prepends,
# so the newest entry is at the head until the `:connect` handler reverses it.
buffered: []
}
# Logger.Formatter pattern used when no `:format` option is configured.
@default_format "{\"time\": \"$time $date\", \"level\": \"$level\", \"message\": \"$message\", \"metadata\":\"$metadata\"}"
# Delay passed to `Process.send_after/3` before retrying a failed connection.
@reconnect_interval 10_000
# gen_event callback: builds the initial state for the backend registered as
# `{LoggerAmqpBackend, name}` and immediately runs the `:connect` handler,
# whose result is returned as the init result.
def init({__MODULE__, name}) do
  initial_state = configure(name, [])
  handle_info(:connect, initial_state)
end
# gen_event callback: re-runs configuration for this backend with `opts`
# layered over the current application environment, replying `:ok`.
def handle_call({:configure, opts}, %{name: name} = state) do
  reconfigured = configure(name, opts, state)
  {:ok, :ok, reconfigured}
end
# gen_event callback for Logger events.
#
# Log events are forwarded to `log_event/5` when they meet the configured
# minimum level (a nil level accepts everything) and match the metadata
# filter; anything else is ignored.
def handle_event({level, _gl, {Logger, msg, ts, md}}, %{level: min_level, metadata_filter: metadata_filter} = state) do
  level_ok? = is_nil(min_level) or Logger.compare_levels(level, min_level) != :lt

  if level_ok? and metadata_matches?(md, metadata_filter) do
    log_event(level, msg, ts, md, state)
  else
    {:ok, state}
  end
end

# Flush with no open channel: nothing can be published yet, keep the buffer.
def handle_event(:flush, %{amqp_channel: nil} = state) do
  {:ok, state}
end

# Flush with an open channel: publish every buffered message in list order and
# empty the buffer.
#
# The whole buffer is drained in one pass. A `send(self(), :flush)`
# continuation would be delivered to `handle_info/2`, not to this callback, so
# it cannot be relied on to finish the job.
def handle_event(:flush, %{amqp_channel: chan, exchange: exchange, routing_key: routing_key, buffered: buffered} = state) do
  Enum.each(buffered, fn buffered_msg ->
    send_amqp(chan, exchange, routing_key, buffered_msg)
  end)

  {:ok, %{state | buffered: []}}
end
# gen_event callback for plain messages delivered to the event manager.
#
# Every clause returns `{:ok, state}`; gen_event does not accept `{:stop, ...}`
# from a handler callback.

# No broker URL configured: `log_event/5` drops events in this case, so there
# is nothing to connect to.
def handle_info(:connect, %{amqp_url: nil} = state) do
  {:ok, state}
end

# Already connected: ignore stray `:connect` messages so that a second
# connection is never opened on top of a live one.
def handle_info(:connect, %{amqp_conn: conn} = state) when not is_nil(conn) do
  {:ok, state}
end

# Opens the connection and channel, optionally declares the queue named by the
# routing key, then flushes whatever was buffered while disconnected. On
# failure a retry is scheduled after @reconnect_interval.
def handle_info(:connect, %{amqp_url: amqp_url, routing_key: routing_key, declare_queue: declare, durable: durable, queue_args: queue_args, buffered: buffered} = s) do
  case Connection.open(amqp_url) do
    {:ok, conn} ->
      # Get notifications when the connection goes down
      Process.monitor(conn.pid)
      {:ok, chan} = Channel.open(conn)

      if declare do
        {:ok, _} = Queue.declare(chan, routing_key, durable: durable, arguments: queue_args)
      end

      # The buffer was built by prepending, so reverse it to oldest-first
      # before flushing. Flush directly instead of messaging ourselves.
      connected = %{s | amqp_conn: conn, amqp_channel: chan, buffered: Enum.reverse(buffered)}
      handle_event(:flush, connected)

    {:error, _} ->
      # Retry later
      Process.send_after(self(), :connect, @reconnect_interval)
      {:ok, %{s | amqp_conn: nil}}
  end
end

# A `:flush` message sent to the manager process arrives here rather than in
# `handle_event/2`; forward it so buffered messages are actually published.
def handle_info(:flush, state) do
  handle_event(:flush, state)
end

# Our monitored AMQP connection went down (the pid must match the stored
# connection). Drop the dead connection/channel and try to reconnect; events
# logged in the meantime are buffered by `log_event/5`.
def handle_info({:DOWN, _ref, :process, pid, _reason}, %{amqp_conn: %{pid: pid}, buffered: buffered} = state) do
  # While connected the buffer is oldest-first; restore the newest-first
  # order that the `:connect` handler expects before it reverses it again.
  disconnected = %{state | amqp_conn: nil, amqp_channel: nil, buffered: Enum.reverse(buffered)}
  handle_info(:connect, disconnected)
end

# Anything else (including DOWN messages for processes we do not own) is ignored.
def handle_info(_, state) do
  {:ok, state}
end
# Helpers
# Formats one log event and either publishes it or, when no channel is open,
# prepends it to the in-memory buffer.

# No AMQP URL configured: the event is dropped.
defp log_event(_level, _msg, _ts, _md, %{amqp_url: nil} = state), do: {:ok, state}

# Not connected yet: keep the formatted event for a later flush.
defp log_event(level, msg, ts, md, %{amqp_channel: nil, buffered: pending} = state) do
  entry = format_event(level, msg, ts, md, state)
  {:ok, %{state | buffered: [entry | pending]}}
end

# Connected: publish straight away, state is unchanged.
defp log_event(level, msg, ts, md, %{amqp_channel: chan, exchange: exchange, routing_key: routing_key} = state) do
  payload = format_event(level, msg, ts, md, state)
  send_amqp(chan, exchange, routing_key, payload)
  {:ok, state}
end
# Publishes `output` on `chan`. Chardata lists are flattened to a binary
# first; binaries pass through `IO.chardata_to_string/1` unchanged.
defp send_amqp(chan, exchange, routing_key, output) when is_list(output) or is_binary(output) do
  payload = IO.chardata_to_string(output)
  AMQP.Basic.publish(chan, exchange, routing_key, payload)
end
@doc """
Formats a log event as a JSON object string.

The object has `"level"`, `"message"` and `"timestamp"` keys, plus one entry
per metadata pair in `md`. `ts` is the `{date, time}` tuple supplied by Logger.

Metadata values that Jason cannot encode (pids, references, tuples, ...) are
replaced by their `inspect/1` representation, so one such value does not make
the whole event fail to encode.
"""
def format_json(level, msg, ts, md) do
  {date, time} = ts

  timestamp =
    IO.iodata_to_binary([Logger.Formatter.format_date(date), " ", Logger.Formatter.format_time(time)])

  json_message = %{
    "level" => level,
    # Logger messages are chardata; a list would otherwise be encoded as a
    # JSON array of code points instead of a string.
    "message" => IO.chardata_to_string(msg),
    "timestamp" => timestamp
  }

  md
  |> Enum.into(json_message, fn {k, v} -> {k, json_safe(v)} end)
  |> Jason.encode!()
end

# Returns `value` unchanged when Jason can encode it, otherwise its inspected form.
defp json_safe(value) do
  case Jason.encode(value) do
    {:ok, _} -> value
    {:error, _} -> inspect(value)
  end
end
# Renders a log event with the configured formatter, passing only the
# metadata selected by the `:metadata` setting.
defp format_event(level, msg, ts, md, %{format: format, metadata: keys}) do
  selected_md = take_metadata(md, keys)
  Logger.Formatter.format(format, level, msg, ts, selected_md)
end
@doc false
@spec metadata_matches?(Keyword.t, nil | Keyword.t) :: boolean()
# A nil filter accepts everything.
def metadata_matches?(_md, nil), do: true

# Every {key, value} pair of the filter must be present in `md` with exactly
# that value; an empty filter therefore matches. Stops at the first mismatch.
def metadata_matches?(md, filter) when is_list(filter) do
  Enum.all?(filter, fn {key, val} ->
    match?({:ok, ^val}, Keyword.fetch(md, key))
  end)
end
# Selects the metadata to hand to the formatter.
#
# `:all` keeps everything. Otherwise the result holds one pair per requested
# key that is present, in the order of `keys`; missing keys are skipped.
defp take_metadata(metadata, :all), do: metadata

defp take_metadata(metadata, keys) do
  Enum.flat_map(keys, fn key ->
    case Keyword.fetch(metadata, key) do
      {:ok, val} -> [{key, val}]
      :error -> []
    end
  end)
end
# Builds a fresh state for backend `name`, starting from @default_state.
defp configure(name, opts), do: configure(name, opts, @default_state)

# Merges `opts` over the `:logger` application environment stored under
# `name`, writes the merged options back, and returns `state` updated with the
# resulting settings. Connection-related fields and the buffer are untouched.
defp configure(name, opts, state) do
  stored = Application.get_env(:logger, name, [])
  merged = Keyword.merge(stored, opts)
  Application.put_env(:logger, name, merged)

  formatter =
    merged
    |> Keyword.get(:format, @default_format)
    |> compile_format()

  %{
    state
    | name: name,
      amqp_url: Keyword.get(merged, :amqp_url),
      format: formatter,
      level: Keyword.get(merged, :level, :info),
      metadata: Keyword.get(merged, :metadata, []),
      metadata_filter: Keyword.get(merged, :metadata_filter, nil),
      exchange: Keyword.get(merged, :exchange, ""),
      # The routing key defaults to the backend name.
      routing_key: Keyword.get(merged, :routing_key, Atom.to_string(name)),
      durable: Keyword.get(merged, :durable, true),
      declare_queue: Keyword.get(merged, :declare_queue, true),
      queue_args: Keyword.get(merged, :queue_args, [])
  }
end

# `:json` selects this module's `format_json/4`; anything else is compiled as
# a Logger.Formatter pattern.
defp compile_format(:json), do: {__MODULE__, :format_json}
defp compile_format(pattern), do: Logger.Formatter.compile(pattern)
end