defmodule Tracer.Collector do
alias Tracer.Formatter
import Tracer.Utils
@doc """
Makes sure a collector process is registered under `#{inspect(__MODULE__)}` on `node`.

Returns `{:already_started, pid}` when a process is already registered there,
otherwise `{:started, result}` where `result` is whatever `start/2` returns.
"""
def ensure_started(node, unlink?) do
  registered = rpc(node, :erlang, :whereis, [__MODULE__])

  case registered do
    :undefined ->
      {:started, start(node, unlink?)}

    collector when is_pid(collector) ->
      {:already_started, collector}
  end
end
@doc """
Spawns a collector on `node` (defaults to the local node) and registers it
there under this module's name.

The spawned process receives the caller's pid as its parent, a flag telling
whether `node` is the caller's own node, and `unlink?`.

Returns `{register_result, node}`, where `register_result` is the value
produced by the remote `:erlang.register/2` call.
"""
def start(node \\ node(), unlink?) do
  init_arg = {self(), node == node(), unlink?}
  collector = :erlang.spawn(node, __MODULE__, :init, [init_arg])
  registered = rpc(node, :erlang, :register, [__MODULE__, collector])
  {registered, node}
end
@doc """
Sends a `{:configure, io, limit, formatter}` request to the collector
registered on `node`.

The collector merges `limit` into its current limits and starts (or updates)
its formatter from the `formatter` options. Returns the result of `call/2`.
"""
def configure(node, io, limit, formatter) do
  request = {:configure, io, limit, formatter}
  call({__MODULE__, node}, request)
end
@doc """
Asks the collector on `node` to enable tracing of `processes` with
`trace_options`; the collector installs itself as the tracer.

Returns the result of `call/2`.
"""
def trace(node, processes, trace_options) do
  request = {:trace, processes, trace_options}
  call({__MODULE__, node}, request)
end
@doc """
Asks the collector on `node` to install a trace pattern.

`pattern` is forwarded as `{:set, pattern}`; the collector expects it to be
shaped as `{{module, function, arity}, match_options, global_options}`.
Returns the result of `call/2`.
"""
def trace_pattern(node, pattern) do
  request = {:set, pattern}
  call({__MODULE__, node}, request)
end
@doc """
Combines `trace/3` and `trace_pattern/2` in a single request: the collector
on `node` enables tracing for `processes` and then installs `pattern`.

Returns the result of `call/2`.
"""
def trace_and_set(node, processes, trace_options, pattern) do
  request = {:trace_and_set, processes, trace_options, pattern}
  call({__MODULE__, node}, request)
end
@doc """
Sends a `:status` request to the collector on `node`, which answers with its
current state map. Returns the result of `call/2`.
"""
def status(node) do
  call({__MODULE__, node}, :status)
end
@doc """
Sends a `:clear_traces` request to the collector on `node`, which disables
all trace flags and trace patterns. Returns the result of `call/2`.
"""
def clear_traces(node) do
  call({__MODULE__, node}, :clear_traces)
end
@doc """
Sends a `:stop` request to the collector on `node`; the collector replies and
then shuts itself down. Returns the result of `call/2`.
"""
def stop(node) do
  call({__MODULE__, node}, :stop)
end
# Entry point of the spawned collector process.
#
# Unless `unlink?` is truthy, the parent is monitored so the collector can shut
# down when the parent dies. Then the receive loop is entered with a fresh
# state: no formatter/io yet, no limits, empty call bookkeeping and zeroed
# counters, with the rate window starting now.
def init({parent, local?, unlink?}) do
  if !unlink?, do: :erlang.monitor(:process, parent)

  initial_state = %{
    parent: parent,
    local?: local?,
    formatter: nil,
    io: nil,
    collect_state: %{},
    limit: %{time: nil, rate: nil, overall: nil},
    window: :os.timestamp(),
    count: 0,
    all_count: 0
  }

  loop(initial_state)
end
# Main receive loop of the collector.
#
# * `{{pid, ref}, request}` is a synchronous request: it is dispatched to
#   `handle_call/2` and `{ref, reply}` is sent back to `pid` before the loop
#   either continues with the new state or shuts down.
# * A `:DOWN` message for the monitored parent shuts the collector down.
# * Everything else is treated as a trace message and handed to
#   `handle_trace/2`.
def loop(%{parent: parent} = state) do
  receive do
    {{caller, tag}, request} ->
      {action, reply, next_state} = handle_call(request, state)
      send(caller, {tag, reply})

      case action do
        :reply -> loop(next_state)
        :stop -> stop(:stop, state)
      end

    {:DOWN, _ref, _type, ^parent, _reason} ->
      stop(:stop, state)

    trace ->
      next_state = handle_trace(trace, state)
      loop(next_state)
  end
end
# Handlers for synchronous requests received by `loop/1`.
#
# Every clause returns `{action, reply, state}` where `action` is `:reply`
# (keep looping) or `:stop` (shut the collector down after replying).

# Merges the given limits over the current ones and starts the formatter, or
# updates the already running one.
def handle_call(
      {:configure, io, limit_overrides, formatter_opts},
      %{formatter: current_formatter, limit: current_limit} = state
    ) do
  merged_limit = Map.merge(current_limit, limit_overrides)
  formatter = start_formatter(current_formatter, io, formatter_opts)
  {:reply, :ok, %{state | formatter: formatter, io: io, limit: merged_limit}}
end

# Enables tracing for `processes`, with this process as the tracer.
def handle_call({:trace, processes, trace_options}, state) do
  flags = [{:tracer, self()} | trace_options]
  :erlang.trace(processes, true, flags)
  {:reply, :ok, state}
end

# Installs a trace pattern; the reply is the result of `set_pattern/1`.
def handle_call({:set, pattern}, state) do
  result = set_pattern(pattern)
  {:reply, result, state}
end

# Enables tracing and installs a trace pattern in one request.
def handle_call({:trace_and_set, processes, trace_options, pattern}, state) do
  flags = [{:tracer, self()} | trace_options]
  :erlang.trace(processes, true, flags)
  result = set_pattern(pattern)
  {:reply, result, state}
end

# Replies with the complete collector state.
def handle_call(:status, state), do: {:reply, state, state}

# Disables all tracing; the reply is the result of the last trace_pattern call.
def handle_call(:clear_traces, state) do
  cleared = clear_traces()
  {:reply, cleared, state}
end

# Asks the loop to shut down once the caller has been answered.
def handle_call(:stop, state), do: {:stop, :ok, state}
# Turns off every trace flag on all processes, then removes all local, meta,
# call_count and call_time trace patterns, and finally all global ones.
# Returns the result of the last `:erlang.trace_pattern/3` call.
defp clear_traces() do
  every_function = {:_, :_, :_}
  :erlang.trace(:all, false, [:all])
  :erlang.trace_pattern(every_function, false, [:local, :meta, :call_count, :call_time])
  :erlang.trace_pattern(every_function, false, [])
end
# Shuts the collector down: clears all traces, gives the formatter a chance to
# flush, and exits with `{:shutdown, reason}`. Never returns.
#
# `reason` is `:stop` for a requested shutdown / parent death, or `:limit` when
# a trace limit was hit.

# No formatter has been started yet (the collector was never configured).
# Sending to `nil` would raise ArgumentError, because `nil` is an atom and no
# process is registered under it, so skip the flush handshake entirely.
def stop(reason, %{formatter: nil}) do
  clear_traces()
  exit({:shutdown, reason})
end

def stop(reason, %{local?: local?, formatter: formatter}) do
  clear_traces()
  send(formatter, {:flush, self()})

  receive do
    :flushed ->
      if reason == :limit and local?, do: IO.puts("Tracer collector reached limit.")
      exit({:shutdown, reason})
  after
    # Do not hang forever if the formatter is dead or unresponsive.
    5000 -> exit({:shutdown, reason})
  end
end
# Installs a trace pattern given as `{{module, function, arity}, match_options,
# global_options}`.
#
# The target module is loaded first. If loading fails, the `{:error, reason}`
# tuple from `:code.ensure_loaded/1` is returned; otherwise the result of
# `:erlang.trace_pattern/3` is returned.
defp set_pattern(pattern) do
  {{module, _function, _arity} = mfa, match_options, global_options} = pattern

  case :code.ensure_loaded(module) do
    {:error, _reason} = failure ->
      failure

    {:module, ^module} ->
      :erlang.trace_pattern(mfa, match_options, global_options)
  end
end
# Processes one timestamped trace message (`:trace_ts` tuple).
#
# The message is passed through `collect/2`, forwarded to the formatter, and
# then checked against the configured limits:
#
#   * `overall` - total number of counted messages before shutting down,
#   * `time`    - length of the rate window in milliseconds,
#   * `rate`    - number of messages allowed within one window.
#
# A limit left as `nil` never trips: in Erlang term order every number sorts
# before every atom, so the comparisons against `nil` below select the
# "keep going" branches.
#
# Returns the updated state, or does not return when a limit is reached
# (`stop/2` exits the process).
def handle_trace(trace, state) when elem(trace, 0) == :trace_ts do
  %{
    limit: limits,
    formatter: formatter,
    window: window_start,
    count: in_window,
    all_count: total,
    collect_state: calls
  } = state

  {increment, trace, calls} = collect(trace, calls)
  send(formatter, trace)

  %{time: window_ms, rate: max_rate, overall: max_total} = limits
  now = :os.timestamp()
  elapsed_ms = div(:timer.now_diff(now, window_start), 1000)

  # Fields that change no matter which "keep going" branch is taken.
  counted = %{state | collect_state: calls, all_count: total + increment}

  cond do
    total >= max_total ->
      stop(:limit, state)

    elapsed_ms > window_ms ->
      # The window has expired: open a new one.
      %{counted | window: now, count: 0}

    max_rate <= in_window ->
      stop(:limit, state)

    max_rate > in_window ->
      %{counted | count: in_window + 1}
  end
end
# Folds one trace message into the per-process call bookkeeping.
#
# Returns `{add, trace, collect_state}`:
#   * `add`   - how much the message contributes to the overall counter
#               (returns/exceptions contribute 0, everything else 1),
#   * `trace` - the message to forward to the formatter,
#   * `collect_state` - map of `{pid, {mod, fun, arity}}` to the stack of start
#               timestamps of calls that have not returned yet.
defp collect({:trace_ts, pid, :call, mfa, timestamp} = trace, collect_state) do
  {1, trace, remember_call(collect_state, pid, mfa, timestamp)}
end

# Call message carrying an extra element before the timestamp.
defp collect({:trace_ts, pid, :call, mfa, _dump, timestamp} = trace, collect_state) do
  {1, trace, remember_call(collect_state, pid, mfa, timestamp)}
end

# Return / exception: pop the matching call and replace the message's
# timestamp (element 5) with the elapsed time in microseconds. When no start
# timestamp is known for the call, the element becomes `nil` instead of
# crashing the collector.
defp collect({:trace_ts, pid, type, mfa, _return, timestamp} = trace, collect_state)
     when type in [:exception_from, :return_from] do
  {start_ts, collect_state} = pop_call(collect_state, {pid, mfa})
  time_used = with {_, _, _} <- start_ts, do: :timer.now_diff(timestamp, start_ts)
  {0, put_elem(trace, 5, time_used), collect_state}
end

# Any other trace message is forwarded untouched.
defp collect(trace, collect_state) do
  {1, trace, collect_state}
end

# Pushes the start timestamp of a call onto the stack kept for `{pid, mfa}`.
# The third MFA element is an argument list by default, but already an integer
# arity when tracing was enabled with the `:arity` flag.
defp remember_call(collect_state, pid, {mod, fun, args_or_arity}, timestamp) do
  arity = if is_list(args_or_arity), do: length(args_or_arity), else: args_or_arity
  Map.update(collect_state, {pid, {mod, fun, arity}}, [timestamp], &[timestamp | &1])
end

# Pops the most recent start timestamp recorded for `key`, dropping the key
# once its stack is empty. Returns `{nil, collect_state}` when nothing was
# recorded for `key`.
defp pop_call(collect_state, key) do
  case collect_state do
    %{^key => [start_ts]} -> {start_ts, Map.delete(collect_state, key)}
    %{^key => [start_ts | outer_calls]} -> {start_ts, %{collect_state | key => outer_calls}}
    _ -> {nil, collect_state}
  end
end
# Starts the formatter process, or reconfigures the one already running.
#
# First clause - no formatter yet. The formatter spec is `options[:formatter]`
# when given, otherwise `{Formatter.Base, :format_trace, [format_opts]}` with
# `:structs` defaulting to `false`. With `options[:formatter_local]` set, the
# formatter is spawn-linked on the node that owns `io`; otherwise it is started
# through `Formatter.start_link/2`.
defp start_formatter(nil, io, options) do
  formatter_spec =
    case options[:formatter] do
      nil ->
        format_opts = Keyword.put_new(options[:format_opts] || [], :structs, false)
        {Formatter.Base, :format_trace, [format_opts]}

      custom ->
        custom
    end

  if options[:formatter_local] do
    :erlang.spawn_link(:erlang.node(io), Formatter, :init, [io, formatter_spec])
  else
    Formatter.start_link(io, formatter_spec)
  end
end

# Second clause - a formatter is already running. If a new formatter spec is
# supplied it is sent to the running process; the existing formatter is
# returned either way.
defp start_formatter(formatter, _io, options) do
  case options[:formatter] do
    absent when absent in [nil, false] -> nil
    replacement -> send(formatter, {:formatter, replacement})
  end

  formatter
end
end