defmodule Luminous.Query do
@moduledoc """
A query is embedded in a panel and contains a function
which will be executed upon panel refresh to fetch the query's data.
"""
alias Luminous.{TimeRange, Variable}
defmodule Attributes do
  @moduledoc """
  This struct collects all the attributes that apply to a particular Dataset.

  Attribute structs are built with `define/1` and are passed as the values of
  the map given under the `:attrs` option of `Luminous.Query.Result.new/2`.
  """

  @type t :: %__MODULE__{
          type: :line | :bar,
          order: non_neg_integer() | nil,
          fill: boolean(),
          # `define/1` leaves the unit as nil when the option is not given
          unit: binary() | nil
        }

  @derive Jason.Encoder
  defstruct [:type, :order, :fill, :unit]

  @doc """
  Build the attributes out of a keyword list.

  Supported options and their defaults:

    * `:type` - defaults to `:line`
    * `:order` - defaults to `nil`
    * `:fill` - defaults to `true`
    * `:unit` - defaults to `nil`

  Options other than the ones above are not read.
  """
  @spec define() :: t()
  @spec define(Keyword.t()) :: t()
  def define(opts \\ []) do
    %__MODULE__{
      type: Keyword.get(opts, :type, :line),
      order: Keyword.get(opts, :order),
      # the default only applies when the key is absent; an explicit value
      # (even `false` or `nil`) is kept as is
      fill: Keyword.get(opts, :fill, true),
      unit: Keyword.get(opts, :unit)
    }
  end
end
defmodule DataSet do
  @moduledoc """
  A `DataSet` essentially wraps a list of 1-d or 2-d data points
  that has a label and a set of visualization attributes.
  """

  @type type :: :line | :bar
  @type value :: Decimal.t() | binary()
  @type row :: %{y: value()} | %{x: any(), y: value()}
  @type t :: %__MODULE__{
          rows: [row()],
          label: binary(),
          attrs: Attributes.t()
        }

  @derive Jason.Encoder
  defstruct [:rows, :label, :attrs]

  @doc """
  Build a dataset out of its rows, label and attributes.

  The label is converted with `to_string/1`. When `attrs` is `nil` (the
  default), the default attributes returned by `Attributes.define/0` are used.
  """
  @spec new([row()], atom() | binary(), Attributes.t() | nil) :: t()
  def new(rows, label, attrs \\ nil)

  def new(rows, label, nil), do: new(rows, label, Attributes.define())

  def new(rows, label, attrs) do
    %__MODULE__{rows: rows, label: to_string(label), attrs: attrs}
  end

  @doc """
  Extract and return the first value out of rows.

  Returns `nil` when the dataset has no rows.
  """
  @spec first_value(t()) :: nil | any()
  def first_value(%{rows: []}), do: nil
  def first_value(%{rows: [%{y: val} | _]}), do: val

  @doc """
  Calculate and return the basic statistics of the dataset in one pass (loop).

  Rows whose `y` is `nil` are ignored. The returned map contains:

    * `:label` - the dataset's label
    * `:n` - the number of non-nil values
    * `:sum`, `:min`, `:max` - `nil` when there are no non-nil values
    * `:avg` - `sum / n`, rounded to the largest number of decimal digits
      found among the values; `nil` when there are no non-nil values

  Non-nil values are handed to `Decimal` functions, so they are expected to
  be `Decimal` structs.
  """
  @spec statistics(t()) :: %{
          label: binary(),
          min: any(),
          max: any(),
          n: non_neg_integer(),
          sum: Decimal.t() | nil,
          avg: Decimal.t() | nil
        }
  def statistics(dataset) do
    initial = %{n: 0, sum: nil, min: nil, max: nil, max_decimal_digits: 0}

    # :max_decimal_digits is only needed to round the average,
    # so it is taken out of the map that is returned
    {max_decimal_digits, stats} =
      dataset.rows
      |> Enum.reduce(initial, &accumulate/2)
      |> Map.pop(:max_decimal_digits)

    stats
    |> Map.put(:avg, average(stats, max_decimal_digits))
    |> Map.put(:label, dataset.label)
  end

  @doc """
  Override the dataset's unit with the provided string only if it's not already present.
  """
  @spec maybe_override_unit(t(), binary()) :: t()
  def maybe_override_unit(%{attrs: %{unit: nil} = attrs} = dataset, unit) do
    %{dataset | attrs: %{attrs | unit: unit}}
  end

  def maybe_override_unit(dataset, _unit), do: dataset

  # Fold a single row into the running statistics; nil values change nothing.
  defp accumulate(%{y: nil}, acc), do: acc

  defp accumulate(%{y: y}, acc) do
    sum =
      case acc.sum do
        nil -> y
        sum -> Decimal.add(y, sum)
      end

    digits = decimal_digits(y)

    # the very first value is compared against itself
    lowest = acc.min || y
    highest = acc.max || y

    %{
      acc
      | n: acc.n + 1,
        sum: sum,
        min: if(Decimal.lt?(y, lowest), do: y, else: lowest),
        max: if(Decimal.gt?(y, highest), do: y, else: highest),
        max_decimal_digits: max(digits, acc.max_decimal_digits)
    }
  end

  # Number of digits after the decimal point in the value's plain notation.
  defp decimal_digits(value) do
    case value |> Decimal.to_string(:normal) |> String.split(".") do
      [_, fraction] -> String.length(fraction)
      _ -> 0
    end
  end

  # Average of the accumulated values rounded to `digits` decimal places.
  defp average(%{n: 0}, _digits), do: nil
  defp average(%{sum: nil}, _digits), do: nil

  defp average(%{sum: sum, n: n}, digits) do
    sum
    |> Decimal.div(Decimal.new(n))
    |> Decimal.round(digits)
  end
end
defmodule Result do
  @moduledoc """
  A query Result wraps a columnar data frame with multiple variables.

  `attrs` is a map where keys are variable labels (as specified
  in the query's select statement) and values are
  `Luminous.Query.Attributes` structs with the visualization properties
  for the corresponding `DataSet`. See `Luminous.Query.Attributes.define/1`
  for details.
  """

  @type label :: atom() | binary()
  @type value :: number() | Decimal.t() | binary()
  @type point :: {label(), value()}
  @type row :: [point()]
  @type t :: %__MODULE__{
          # `new/2` stores a list of rows, a single row or a bare value
          rows: [row()] | row() | value(),
          attrs: %{label() => Attributes.t()}
        }

  @enforce_keys [:rows, :attrs]
  defstruct [:rows, :attrs]

  @doc """
  This function can be called in the following ways:

  - with a list of rows, i.e. a list of lists containing 2-tuples {label, value}
  - with a single row, i.e. a list of 2-tuples of the form {label, value} (e.g. in the case of single- or multi- stats)
  - with a single 2-tuple {label, value}, which is wrapped in a list
  - with a single value (for use in a single-valued stat panel with no label)

  The only option that is read is `:attrs` (defaults to an empty map).
  """
  @spec new([row()] | row() | point() | value(), Keyword.t()) :: t()
  def new(data, opts \\ [])

  def new({_, _} = point, opts), do: new([point], opts)

  # lists and bare values are stored as they are
  def new(data, opts) do
    %__MODULE__{rows: data, attrs: Keyword.get(opts, :attrs, %{})}
  end

  @doc """
  Transform the query Result (multiple variables as columns) to a list of Datasets,
  one per label. `:time` values that are `DateTime` structs are converted to
  unix time in milliseconds (JS-compatible).

  Points without a value for a label are dropped from that label's dataset.
  The datasets are sorted by the `order` attribute that was resolved for each
  of them; datasets without an order come after the ordered ones.
  """
  @spec transform(t()) :: [DataSet.t()]
  def transform(%__MODULE__{rows: rows, attrs: attrs}) when is_list(rows) do
    rows
    |> extract_labels()
    |> Enum.map(&build_dataset(&1, rows, attrs))
    # sort on the attributes each dataset actually received, so that the
    # order is honoured regardless of whether `attrs` is keyed by atoms or
    # by strings (nil sorts after any integer in the term ordering)
    |> Enum.sort_by(& &1.attrs.order)
  end

  def transform(%__MODULE__{rows: value}) do
    [DataSet.new([%{y: convert_to_decimal(value)}], nil, Attributes.define())]
  end

  # Collect the points of a single label into a dataset with its attributes.
  defp build_dataset(label, rows, attrs) do
    points =
      rows
      |> Enum.map(&to_point(&1, label))
      |> Enum.reject(&is_nil(&1.y))

    dataset_attrs =
      Map.get(attrs, label) ||
        Map.get(attrs, to_string(label)) ||
        Attributes.define()

    DataSet.new(points, label, dataset_attrs)
  end

  # Build the point that a row contributes to the given label.
  # chart: row is a list of {label, value} tuples
  defp to_point(row, label) when is_list(row) do
    y =
      row
      |> Enum.find_value(fn
        {^label, value} -> value
        _ -> nil
      end)
      |> convert_to_decimal()

    case Keyword.get(row, :time) do
      %DateTime{} = time -> %{x: DateTime.to_unix(time, :millisecond), y: y}
      _ -> %{y: y}
    end
  end

  # stat: row is a single {label, value} tuple
  defp to_point({label, value}, label), do: %{y: convert_to_decimal(value)}

  # stat: row is a single {label, value} tuple but we are processing a different label
  defp to_point({_, _}, _label), do: %{y: nil}

  # stat: row is a single number
  # NOTE(review): extract_labels/1 has no clause for bare numbers, so this
  # clause looks unreachable from transform/1 - confirm before relying on it
  defp to_point(n, _label) when is_number(n), do: %{y: convert_to_decimal(n)}

  defp to_point(row, _label) do
    raise "Can not process data row #{inspect(row)}"
  end

  # Return the unique labels found in the rows, leaving `:time` out of list rows.
  defp extract_labels(rows) when is_list(rows) do
    rows
    |> Enum.flat_map(fn
      # example: [{:time, #DateTime<2022-10-01 01:00:00+00:00 UTC UTC>}, {"foo", #Decimal<0.65>}]
      row when is_list(row) ->
        row
        |> Enum.map(fn {label, _value} -> label end)
        |> Enum.reject(&(&1 == :time))

      # example: {:single_stat, #Decimal<0.65>}
      {label, _value} ->
        [label]
    end)
    |> Enum.uniq()
  end

  # Cast the value to a Decimal; values that can not be cast are returned as they are.
  defp convert_to_decimal(nil), do: nil

  defp convert_to_decimal(value) do
    case Decimal.cast(value) do
      {:ok, dec} -> dec
      _ -> value
    end
  end
end
@doc """
Fetch the data of the query identified by the first argument.

The callback receives the query id, the time range and the list of variables
(see `Luminous.Query.execute/3`) and returns a `Luminous.Query.Result`.

A module must implement this behaviour to be passed as an argument to `Luminous.Query.define/2`.

Each row wrapped in the result is a list of 2-tuples:
- the 2-tuple's first element is the time series' label
- the 2-tuple's second element is the label's value

For time series, the list must contain a 2-tuple with the label `:time` and a `DateTime` value.
"""
@callback query(atom(), TimeRange.t(), [Variable.t()]) :: Result.t()
@type t :: %__MODULE__{
        id: atom(),
        mod: module()
      }

@enforce_keys [:id, :mod]
defstruct [:id, :mod]

@doc """
Initialize a query at compile time from its id and the module that fetches
its data. The module must implement the `Luminous.Query` behaviour.
"""
@spec define(atom(), module()) :: t()
def define(id, mod) do
  struct!(__MODULE__, id: id, mod: mod)
end
@doc """
Execute the query and return its `Luminous.Query.Result`.

The `query/3` callback of the query's module is called with the query's id,
the time range and the variables; whatever the callback returns is returned
unchanged.
"""
@spec execute(t(), TimeRange.t(), [Variable.t()]) :: Result.t()
def execute(query, time_range, variables) do
  # the function name and arity are known, so no `apply/3` is needed
  query.mod.query(query.id, time_range, variables)
end
end