defmodule Chaperon.LoadTest do
@moduledoc """
Implementation & helper module for defining load tests.
A load test defines a list of scenarios and their config to run them with.
## Example
defmodule LoadTest.Staging do
use Chaperon.LoadTest
# You can define a default config that is applied to every scenario.
# Any additional config passed at runtime will be merged into this.
# If default_config/0 is not defined, it defaults to %{}.
def default_config, do: %{
scenario_timeout: 15_000,
base_url: "http://staging.mydomain.com"
}
def scenarios, do: [
# session name is "my_session_name"
{MyScenarioModule, "my_session_name", %{
delay: 2 |> seconds,
my_config_key: "my_config_val"
}},
# will get an assigned session name based on module name and UUID
{MyScenarioModule, %{
delay: 10 |> seconds,
my_config_key: "my_config_val"
}},
# same as above but spawned 10 times (across the cluster):
{{10, MyScenarioModule}, "my_session_name", %{
random_delay: 5 |> seconds,
my_config_key: "my_config_val"
}},
# run Scenario A, followed by Scenario B as a new scenario
{[A, B], %{
# ...
}},
# same as above, but spawned 10 times
{{10, [A, B]}, %{
# ...
}}
]
end
"""
defstruct name: nil, scenarios: [], config: %{}

@typedoc """
A load test definition held as a struct: its name, its scenarios and its config.
"""
@type t :: %__MODULE__{
        name: atom,
        scenarios: [Chaperon.Scenario.t()],
        config: map
      }

@typedoc """
What the functions in this module accept as a load test: either a module
defining the load test or a map describing one.

NOTE(review): `default_config/1` reads a `:default_config` key from map-based
definitions, which is not listed in this type - confirm the intended key.
"""
@type lt_conf ::
        module
        | %{
            name: String.t(),
            scenarios: [Chaperon.Scenario.t()],
            config: map
          }
defmodule Results do
  @moduledoc """
  LoadTest results struct.

  Fields:

    * `load_test` - the load test (module or map definition) that was run
    * `start_ms` / `end_ms` - timestamps taken right before the workers were
      started and right after they were awaited
    * `duration_ms` - `end_ms - start_ms`
    * `sessions` - the sessions returned by the workers that finished
    * `max_timeout` - the timeout used while awaiting the workers
      (`:infinity` or a number of milliseconds)
    * `timed_out` - number of workers that did not return a session
  """

  defstruct load_test: nil,
            start_ms: nil,
            end_ms: nil,
            duration_ms: nil,
            sessions: [],
            max_timeout: nil,
            timed_out: nil

  # `max_timeout` was missing from the type although the struct defines it,
  # and `load_test` may hold a map-based definition as well as a module.
  @type t :: %__MODULE__{
          load_test: Chaperon.LoadTest.lt_conf(),
          start_ms: integer,
          end_ms: integer,
          duration_ms: integer,
          sessions: [Chaperon.Session.t()],
          max_timeout: timeout(),
          timed_out: integer
        }
end
@doc """
Injected by `use Chaperon.LoadTest`.

Requires and imports `Chaperon.LoadTest` and imports `Chaperon.Timing` into
the calling module. The options are ignored.
"""
defmacro __using__(_opts) do
  # Resolved here, in the defining module, so it expands to Chaperon.LoadTest.
  load_test = __MODULE__

  quote do
    require unquote(load_test)
    import unquote(load_test)
    import Chaperon.Timing
  end
end
alias Chaperon.{Session, Scenario, Worker}
alias Chaperon.LoadTest.Results
require Logger
@doc """
Runs the given load test and returns a `Chaperon.LoadTest.Results` struct.

Starts a worker for every scenario of `lt_mod` (merging `extra_config` into
each scenario's config), waits for the workers and records the start time,
end time and duration around that.
"""
@spec run(lt_conf(), map) :: Chaperon.LoadTest.Results.t()
def run(lt_mod, extra_config \\ %{}) do
  started_at = Chaperon.Timing.timestamp()

  workers = start_workers_with_config(lt_mod, extra_config)
  {max_timeout, finished_sessions, timed_out_count} = await_workers(workers)

  finished_at = Chaperon.Timing.timestamp()

  %Results{
    load_test: lt_mod,
    start_ms: started_at,
    end_ms: finished_at,
    duration_ms: finished_at - started_at,
    sessions: finished_sessions,
    max_timeout: max_timeout,
    timed_out: timed_out_count
  }
end
@doc """
Returns the default config of a load test.

For a map-based definition this is the value stored under `:default_config`
(or `%{}` if it is missing, `nil` or `false`).

For a module-based definition this is the result of calling the module's
`default_config/0`, or `%{}` if the module does not export that function.
"""
@spec default_config(lt_conf()) :: any()
def default_config(lt_conf) when is_map(lt_conf) do
  # Map.get/2 instead of Access: structs also pass is_map/1 but do not
  # implement Access, so `lt_conf[:default_config]` would raise for them.
  Map.get(lt_conf, :default_config) || %{}
end

def default_config(lt_mod) do
  # Check name AND arity: a module exporting only default_config/1 (or any
  # other arity) must not lead to a call of the missing default_config/0.
  if {:default_config, 0} in lt_mod.module_info(:exports) do
    lt_mod.default_config()
  else
    %{}
  end
end
# Returns the scenario list of a load test: the module's `scenarios/0` result
# for a module-based definition, or the `:scenarios` value of a map-based one.
defp scenarios(load_test_module) when is_atom(load_test_module) do
  load_test_module.scenarios()
end

defp scenarios(%{scenarios: scenario_list}) do
  scenario_list
end
@doc """
Returns the name of the load test as computed by `Chaperon.Util.module_name/1`.
"""
@spec name(lt_conf()) :: String.t()
def name(lt_mod) do
  lt_mod |> Chaperon.Util.module_name()
end
# Starts the workers for every scenario entry of the load test and returns a
# flat list of `{worker, config}` tuples.
#
# Entries are either `{scenario, session_name, scenario_config}` or
# `{scenario, scenario_config}`; in the first form the session name is stored
# in the config under `:session_name`.
defp start_workers_with_config(lt_mod, extra_config) do
  start_entry = fn
    {scenario, session_name, scenario_config} ->
      merged = merge_scenario_config(lt_mod, scenario_config, extra_config)
      start_worker(scenario, Map.put(merged, :session_name, session_name))

    {scenario, scenario_config} ->
      merged = merge_scenario_config(lt_mod, scenario_config, extra_config)
      start_worker(scenario, merged)
  end

  started = Enum.map(scenarios(lt_mod), start_entry)

  # start_worker/2 returns a single tuple or a list of tuples, hence flatten.
  List.flatten(started)
end

# Builds the effective config of one scenario. Precedence, lowest to highest:
# the load test's default config, the scenario's own config, the extra config.
defp merge_scenario_config(lt_mod, scenario_config, extra_config) do
  defaults = default_config(lt_mod)
  with_scenario = DeepMerge.deep_merge(defaults, scenario_config)
  DeepMerge.deep_merge(with_scenario, extra_config)
end
@doc """
Starts the worker(s) for a scenario entry via `Chaperon.Worker.start`.

Accepted forms of the first argument:

  * `{concurrency, [scenario, ...]}` - the list is run through
    `Chaperon.Scenario.Sequence`, started with the given concurrency
  * `[scenario, ...]` - the list is run through `Chaperon.Scenario.Sequence`
  * `{concurrency, scenario}` - one scenario started with the given concurrency
  * `scenario` - one scenario

Returns `{worker, config}` for the single forms and a list of such tuples for
the forms with a concurrency. For scenario lists the returned config is the
one produced by `Chaperon.Scenario.Sequence.config_for/2`.
"""
def start_worker({concurrency, scenarios}, config) when is_list(scenarios) do
  sequence_config = Scenario.Sequence.config_for(scenarios, config)
  start_worker({concurrency, Scenario.Sequence}, sequence_config)
end

def start_worker(scenarios, config) when is_list(scenarios) do
  sequence_config = Scenario.Sequence.config_for(scenarios, config)
  start_worker(Scenario.Sequence, sequence_config)
end

def start_worker({concurrency, scenario}, config) do
  workers = Worker.start(concurrency, scenario, config)
  Enum.map(workers, fn worker -> {worker, config} end)
end

def start_worker(scenario, config) do
  {Worker.start(scenario, config), config}
end
@doc """
Waits for the given worker tasks and collects their sessions.

Takes a list of `{task, config}` tuples and returns
`{timeout, sessions, timed_out_count}`.

If the maximum timeout across all configs is `:infinity`, every task is
awaited with its own `Chaperon.Worker.timeout/1` and the timed out count is 0.

Otherwise all tasks are yielded on together for the maximum timeout and tasks
that have not replied by then are shut down with `:brutal_kill`. Only
non-`nil` `{:ok, session}` results are kept; the timed out count is the number
of tasks that did not contribute a session.
"""
def await_workers(tasks_with_config) do
  case max_timeout(tasks_with_config) do
    :infinity ->
      finished =
        Enum.map(tasks_with_config, fn {worker, worker_config} ->
          Task.await(worker, Chaperon.Worker.timeout(worker_config))
        end)

      {:infinity, finished, 0}

    timeout when is_integer(timeout) ->
      outcomes =
        tasks_with_config
        |> worker_tasks()
        |> Task.yield_many(timeout)
        |> Enum.map(fn {worker, outcome} ->
          # No reply within the timeout: kill the task (it may still hand
          # back a reply that arrived in the meantime).
          outcome || Task.shutdown(worker, :brutal_kill)
        end)

      sessions =
        Enum.flat_map(outcomes, fn
          {:ok, nil} -> []
          {:ok, session} -> [session]
          _other -> []
        end)

      missing = Enum.count(tasks_with_config) - Enum.count(sessions)
      {timeout, sessions, missing}
  end
end
# Extracts the task from every `{task, config}` tuple; elements that are not
# 2-tuples are skipped.
defp worker_tasks(tasks_with_config) do
  Enum.flat_map(tasks_with_config, fn
    {worker, _worker_config} -> [worker]
    _other -> []
  end)
end
# Returns the largest `Chaperon.Worker.timeout/1` across all `{task, config}`
# tuples. The result is `:infinity` as soon as any single timeout is
# `:infinity`, and also when no integer timeout was found at all (e.g. for an
# empty list).
defp max_timeout(tasks_with_config) do
  longest =
    Enum.reduce(tasks_with_config, nil, fn {_worker, worker_config}, current ->
      pick_timeout(current, Chaperon.Worker.timeout(worker_config))
    end)

  if longest, do: longest, else: :infinity
end

# Combines the running maximum with the next timeout candidate.
defp pick_timeout(:infinity, _candidate), do: :infinity
defp pick_timeout(_current, :infinity), do: :infinity
defp pick_timeout(nil, candidate) when is_integer(candidate), do: candidate

defp pick_timeout(current, candidate)
     when is_integer(current) and is_integer(candidate) and candidate > current,
     do: candidate

defp pick_timeout(current, _candidate), do: current
@doc """
Returns the `:load_test_timeout` value of the load test's default config, or
`:infinity` if it is not set.
"""
def timeout(lt_mod) do
  case default_config(lt_mod)[:load_test_timeout] do
    unset when unset in [nil, false] -> :infinity
    configured -> configured
  end
end
@doc """
Merges metrics & results of all `Chaperon.Session`s in a list.

Takes the sessions from the given `Chaperon.LoadTest.Results`. The first
session is prepared with `prepare_merge/1` and every following session is
merged into it with `Chaperon.Session.merge/2`.

`nil` entries (sessions that timed out) are skipped with a warning, wherever
they appear in the list. If no session is left, a warning is logged and an
empty `Chaperon.Session` is returned.
"""
@spec merge_sessions(Results.t()) :: Session.t()
def merge_sessions(results = %Results{sessions: [], max_timeout: timeout}) do
  Logger.warn(
    "No scenario task finished in time (timeout = #{timeout}) for load_test: #{inspect(results.load_test)}"
  )

  %Session{}
end

def merge_sessions(r = %Results{sessions: [nil | sessions]}) do
  Logger.warn("Skipping timed out session while merging session metrics & results")
  merge_sessions(%{r | sessions: sessions})
end

def merge_sessions(%Results{sessions: [s | sessions]}) do
  # The clause above only strips leading nils, so nils further down the list
  # have to be skipped here instead of being handed to Session.merge/2.
  Enum.reduce(sessions, prepare_merge(s), fn
    nil, merged ->
      Logger.warn("Skipping timed out session while merging session metrics & results")
      merged

    session, merged ->
      Session.merge(merged, session)
  end)
end
@doc """
Prepares `session` to be merged.

Replaces the session's `metrics` and `results` with the values returned by
`Chaperon.Session.session_metrics/1` and `Chaperon.Session.session_results/1`,
which wrap them with the session's name so that it stays possible to tell
later on for which session they were recorded.
"""
@spec prepare_merge(Session.t()) :: Session.t()
def prepare_merge(session) do
  named_metrics = Session.session_metrics(session)
  named_results = Session.session_results(session)

  %{session | metrics: named_metrics, results: named_results}
end
end