defmodule Loader do
@moduledoc "README.md"
|> File.read!()
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)
use Supervisor
@external_resource "README.md"
defmodule WorkResponse do
@moduledoc """
Internal data structure used to represent the results of executing a `WorkSpec`
## Properties
- `:data` - whatever important data is returned by the work
- `:response_time` - **must be an integer number, in microseconds**, which is the "client-side" view of how long the work took. I recommend using `System.monotonic_time/0` or `:timer.tc/1`
"""
defstruct [:data, :kind, :response_time]
@type t :: %__MODULE__{
data: any(),
kind: :ok | :error,
response_time: integer()
}
end
defmodule WorkSpec do
@moduledoc """
A specification for some "work" to do, to generate load.
"""
# TODO: should a `reason` be attached to the `is_success?` callback? so that a user can do something like `{false, "too slow"}`?
defstruct [:task, :is_success?]
@type t :: %__MODULE__{
task: (() -> term()) | mfa(),
is_success?: (Loader.WorkResponse.t() -> boolean())
}
end
@doc """
Start an instance of `Loader`
## Options
* `:name` - The name of your Loader instance. This field is required.
"""
def start_link(opts) do
name = opts[:name] || raise(ArgumentError, "must supply a name")
config = %{
dynamic_supervisor_name: dynamic_supervisor_name(name),
execution_store_name: execution_store_name(name),
task_supervisors_name: task_supervisors_name(name)
}
Supervisor.start_link(__MODULE__, config, name: :"#{name}.Supervisor")
end
def child_spec(opts) do
%{
id: opts[:name] || raise(ArgumentError, "must supply a name"),
start: {__MODULE__, :start_link, [opts]}
}
end
@impl Supervisor
def init(config) do
children = [
# {Loader.LocalReporter,
# name: LocalReporter,
# metrics: [
# Telemetry.Metrics.last_value("loader.task.stop.last_value",
# event_name: "loader.task.stop",
# measurement: :duration,
# unit: {:native, :microsecond}
# ),
# Telemetry.Metrics.counter("loader.task.stop.counter", event_name: "loader.task.stop", measurement: :duration),
# Telemetry.Metrics.sum("loader.task.stop.sum", event_name: "loader.task.stop", measurement: :duration),
# Telemetry.Metrics.summary("loader.task.stop.summary",
# reporter_options: [mode_rounding_places: 0, percentile_targets: [0, 10, 25, 75, 90, 95, 99]],
# event_name: "loader.task.stop",
# measurement: :duration,
# tags: [:scheduled_loader_ref, :work_spec, :instance_name],
# tag_values: fn metadata ->
# %{
# scheduled_loader_ref: metadata |> Map.get(:scheduled_loader_ref, "") |> inspect(),
# work_spec: metadata |> Map.get(:work_spec, "") |> inspect(),
# instance_name: Map.fetch!(metadata, :instance_name)
# }
# end,
# unit: {:native, :microsecond}
# ),
# Telemetry.Metrics.distribution("loader.task.stop.distribution",
# reporter_options: [buckets: {:percentiles, [0, 10, 25, 75, 90, 95, 99]}],
# event_name: "loader.task.stop",
# measurement: :duration,
# unit: {:native, :microsecond}
# ),
# Telemetry.Metrics.distribution("loader.load_profile_execution.stop.distribution",
# reporter_options: [buckets: {:percentiles, [0, 10, 25, 75, 90, 95, 99]}],
# event_name: "loader.load_profile_execution.stop",
# measurement: :duration,
# unit: {:native, :microsecond}
# )
# ]},
{Loader.ExecutionStore, name: config.execution_store_name},
{PartitionSupervisor, child_spec: Task.Supervisor, name: config.task_supervisors_name},
{DynamicSupervisor, name: config.dynamic_supervisor_name}
]
Supervisor.init(children, strategy: :one_for_one)
end
@doc """
Execute tasks defined by the `work_spec`, scheduled based on the `load_profile`. When provided with a list, all profiles will be executed concurrently.
See `Loader.LoadProfile` for more information on how to define a profile.
"""
@spec execute({Loader.LoadProfile.t(), Loader.WorkSpec.t()}, atom()) ::
DynamicSupervisor.on_start_child()
def execute({load_profile, work_spec}, instance_name) do
DynamicSupervisor.start_child(
dynamic_supervisor_name(instance_name),
Loader.ScheduledLoader.child_spec(load_profile: load_profile, work_spec: work_spec, instance_name: instance_name)
)
end
@spec execute([{Loader.LoadProfile.t(), Loader.WorkSpec.t()}], atom()) :: [
DynamicSupervisor.on_start_child()
]
def execute(profile_spec_pairs, instance_name) do
Enum.map(profile_spec_pairs, fn {profile, spec} -> execute({profile, spec}, instance_name) end)
end
@doc false
def execution_store_name(instance_name), do: :"#{instance_name}.ExecutionStore"
@doc false
def task_supervisors_name(instance_name), do: :"#{instance_name}.TaskSupervisors"
@doc false
def dynamic_supervisor_name(instance_name), do: :"#{instance_name}.DynamicSupervisor"
end