defmodule Toolbox.Report do
@moduledoc """
Behaviour module for notification reports.
This behaviour encapsulates notifications that behave like reports, i.e. notifications that are
sent periodically according to a cron schedule.
## Example
```
defmodule Scenarios.Template.Reporter do
@behaviour Runbox.Scenario.Template.StageBased
alias Crontab.CronExpression
alias Runbox.Message
alias Runbox.Runtime.Stage.Unit
alias Scenarios.Template.Reporter.DailyReport
alias Toolbox.Report
import Crontab.CronExpression
@impl true
def init(init_timestamp, %Unit{} = unit) do
# report schedule defined as crontab expression
report_schedule = ~e[* * * * *]
# timezone used to display localized timestamp in notification
timezone = "Europe/Prague"
# args to init report state
init_arg = :ok
{:ok, outputs, report_state} =
Report.init(DailyReport, init_timestamp, report_schedule, timezone, init_arg)
{:ok, outputs, %Unit{unit | state: %{report_state: report_state}}}
end
@impl true
def handle_message(%Message{} = msg, %Unit{} = unit) do
{:ok, outputs, report_state} = Report.process_message(unit.state.report_state, msg)
{:reply, outputs, %Unit{unit | state: %{unit.state | report_state: report_state}}}
end
end
defmodule Scenarios.Template.Reporter.DailyReport do
@behaviour Toolbox.Report
alias Runbox.Message
alias Runbox.Scenario.OutputAction
@impl true
def init(:ok) do
{:ok, [], %{incoming_messages: []}}
end
@impl true
def handle_process_message(%{} = state, %Message{} = msg) do
{:ok, [], %{incoming_messages: [msg | state.incoming_messages]}}
end
@impl true
def handle_publish_report(%{} = state, publish_timestamp, last_publish_timestamp) do
report_notification =
%OutputAction.Notification{
type: :daily_report,
data: %{
"incoming_messages" => state.incoming_messages,
"publish_time" => publish_timestamp,
"last_publish_time" => last_publish_timestamp
}
}
{:ok, [report_notification], %{incoming_messages: []}}
end
end
```
"""
alias Crontab.CronExpression
alias Crontab.Scheduler, as: CrontabScheduler
alias Runbox.Message
alias Runbox.Runtime.RuntimeInstruction
alias Timex
alias Timex.AmbiguousDateTime
alias Timex.Timezone
defmodule State do
  @moduledoc false

  # Every field is mandatory, so the list of enforced keys doubles as the struct definition.
  @enforce_keys [:report_mod, :report_state, :cron_expr, :timezone]
  defstruct @enforce_keys

  @type t :: %__MODULE__{
          cron_expr: CronExpression.t(),
          report_mod: module,
          report_state: term,
          timezone: String.t()
        }
end
# Unix timestamp; the conversion helpers of this module read and produce it with `:millisecond`
# precision.
@type timestamp :: integer
@typedoc "User state where a module implementing `Toolbox.Report` can store anything it wants."
@type report_state :: term
@typedoc "State that the `Toolbox.Report` operates on; should be stored in a unit's state."
@opaque external_state :: State.t()
# Invoked by `init/5` with the caller-supplied `init_arg`. Returns the outputs to emit and the
# initial report state.
@callback init(term) :: {:ok, [term], report_state}
# Invoked by `process_message/2` for every message that is not a `:publish_report_timeout`.
# Returns the outputs to emit and the updated report state.
@callback handle_process_message(report_state, Message.t()) :: {:ok, [term], report_state}
# Invoked by `process_message/2` when a `:publish_report_timeout` addressed to this report
# module arrives. `current_report` is the timestamp of that timeout message and `last_report`
# is the previous run of the cron schedule as computed by `get_prev_publish_timestamp/3`.
# Returns the outputs to emit and the updated report state.
@callback handle_publish_report(
report_state,
current_report :: timestamp,
last_report :: timestamp
) ::
{:ok, [term], report_state}
@doc """
Initializes the report.

Calls `report_mod.init(init_arg)` to obtain the initial report state and registers the timeout
for the first publication, computed from `init_timestamp`, `cron_expr` and `timezone`.

Returns `{:ok, outputs, state}` where `outputs` is the timeout registration followed by the
outputs returned by the report module.
"""
@spec init(module, timestamp, CronExpression.t(), String.t(), term) ::
        {:ok, [term], external_state()}
def init(report_mod, init_timestamp, cron_expr, timezone, init_arg) do
  {:ok, report_outputs, initial_report_state} = report_mod.init(init_arg)

  state = %State{
    cron_expr: cron_expr,
    timezone: timezone,
    report_mod: report_mod,
    report_state: initial_report_state
  }

  # the first publication is scheduled by registering a timeout
  first_timeout = create_next_publish_timeout(state, init_timestamp)

  {:ok, [first_timeout | report_outputs], state}
end
@doc """
Processes a message on behalf of the report.

  * A `:publish_report_timeout` message whose body names this report's module triggers
    `handle_publish_report/3` of the report module and registers the timeout for the next
    publication.
  * A `:publish_report_timeout` message naming any other module is ignored.
  * Every other message is passed to `handle_process_message/2` of the report module.

Returns `{:ok, outputs, state}`.
"""
# timeouts are told apart solely by the module stored in their body, so each report
# implementation is expected to be a singleton
@spec process_message(external_state(), Message.t()) :: {:ok, [term], external_state()}
# credo:disable-for-lines:21 Credo.Check.Design.DuplicatedCode
def process_message(
      %State{report_mod: report_mod} = state,
      %Message{type: :publish_report_timeout, body: %{report_mod: report_mod}} = msg
    ) do
  %State{cron_expr: cron_expr, timezone: timezone, report_state: report_state} = state
  fired_at = msg.timestamp

  # the following publication is scheduled by registering a new timeout
  following_timeout = create_next_publish_timeout(state, fired_at)
  previous_run = get_prev_publish_timestamp(fired_at, cron_expr, timezone)

  # the report module generates the side effects of the publication
  {:ok, report_outputs, new_report_state} =
    report_mod.handle_publish_report(report_state, fired_at, previous_run)

  {:ok, [following_timeout | report_outputs], %State{state | report_state: new_report_state}}
end

# a publish timeout that did not match the clause above belongs to a different report module,
# so there is nothing to do
def process_message(%State{} = state, %Message{type: :publish_report_timeout}) do
  {:ok, [], state}
end

# anything that is not a publish timeout is forwarded to the report module
def process_message(
      %State{report_mod: report_mod, report_state: report_state} = state,
      %Message{} = msg
    ) do
  {:ok, report_outputs, new_report_state} =
    report_mod.handle_process_message(report_state, msg)

  {:ok, report_outputs, %State{state | report_state: new_report_state}}
end
# public for testing purposes
@doc false
@spec get_next_publish_timestamp(timestamp, CronExpression.t(), String.t()) :: timestamp
def get_next_publish_timestamp(timestamp, cron_expr, timezone) do
  # next crontab run, as a unix timestamp, looked up from the given unix timestamp
  next_run_from = fn unix_ms ->
    local_time = unix_to_naive_datetime(unix_ms, timezone)
    next_crontab_run_timestamp(cron_expr, local_time, timezone)
  end

  case next_run_from.(timestamp) do
    ^timestamp ->
      # crontab hands back the queried moment itself when that moment already matches the
      # schedule; that is not wanted here, so the lookup is repeated from 1 millisecond later
      next_run_from.(timestamp + 1)

    later_run ->
      later_run
  end
end
@doc false
@spec get_prev_publish_timestamp(timestamp, CronExpression.t(), String.t()) :: timestamp
def get_prev_publish_timestamp(timestamp, cron_expr, timezone) do
  # previous crontab run, as a unix timestamp, looked up from the given unix timestamp
  prev_run_from = fn unix_ms ->
    local_time = unix_to_naive_datetime(unix_ms, timezone)
    prev_crontab_run_timestamp(cron_expr, local_time, timezone)
  end

  case prev_run_from.(timestamp) do
    ^timestamp ->
      # crontab hands back the queried moment itself when that moment already matches the
      # schedule; that is not wanted here, so the lookup is repeated from 1 millisecond earlier
      prev_run_from.(timestamp - 1)

    earlier_run ->
      earlier_run
  end
end
# Turns a unix timestamp in milliseconds into the naive (wall-clock) date time of `timezone`.
@spec unix_to_naive_datetime(timestamp, String.t()) :: NaiveDateTime.t()
defp unix_to_naive_datetime(timestamp, timezone) do
  from_unix = Timex.from_unix(timestamp, :millisecond)
  # the conversion may yield an ambiguous date time, hence the explicit resolution step
  zoned = Timezone.convert(from_unix, timezone)

  DateTime.to_naive(resolve_ambiguous_datetime(zoned))
end
# Interprets `naive_datetime` as a wall-clock time in `timezone` and returns the matching unix
# timestamp in milliseconds.
@spec naive_datetime_to_unix(NaiveDateTime.t(), String.t()) :: timestamp
defp naive_datetime_to_unix(naive_datetime, timezone) do
  # the conversion may yield an ambiguous date time, hence the explicit resolution step
  zoned = Timex.to_datetime(naive_datetime, timezone)

  DateTime.to_unix(resolve_ambiguous_datetime(zoned), :millisecond)
end
# Timezone.convert, Timex.to_datetime and similar functions can return ambiguous times, which
# is why this resolution step exists
@doc """
Resolves a date time into a non-ambiguous `DateTime`.

When an `AmbiguousDateTime` is given, its `after` candidate is picked. A plain `DateTime` is
already unambiguous and is returned unchanged.
"""
@spec resolve_ambiguous_datetime(AmbiguousDateTime.t() | DateTime.t()) :: DateTime.t()
def resolve_ambiguous_datetime(datetime)
def resolve_ambiguous_datetime(%DateTime{} = unambiguous), do: unambiguous
def resolve_ambiguous_datetime(%AmbiguousDateTime{after: later}), do: later
# Looks up the next run date of `cron_expr` relative to `naive_datetime` (a wall-clock time in
# `timezone`) and returns it as a unix timestamp in milliseconds.
@spec next_crontab_run_timestamp(CronExpression.t(), NaiveDateTime.t(), String.t()) ::
        timestamp
defp next_crontab_run_timestamp(cron_expr, naive_datetime, timezone) do
  cron_expr
  |> CrontabScheduler.get_next_run_date!(naive_datetime)
  |> naive_datetime_to_unix(timezone)
end
# Looks up the previous run date of `cron_expr` relative to `naive_datetime` (a wall-clock time
# in `timezone`) and returns it as a unix timestamp in milliseconds.
@spec prev_crontab_run_timestamp(CronExpression.t(), NaiveDateTime.t(), String.t()) ::
        timestamp
defp prev_crontab_run_timestamp(cron_expr, naive_datetime, timezone) do
  cron_expr
  |> CrontabScheduler.get_previous_run_date!(naive_datetime)
  |> naive_datetime_to_unix(timezone)
end
# Builds the runtime instruction that registers the timeout for the publication following
# `timestamp` according to the report's cron schedule and timezone.
@spec create_next_publish_timeout(external_state(), timestamp) :: RuntimeInstruction.t()
defp create_next_publish_timeout(%State{} = state, timestamp) do
  %State{cron_expr: cron_expr, timezone: timezone, report_mod: report_mod} = state

  timestamp
  |> get_next_publish_timestamp(cron_expr, timezone)
  |> create_timeout_message(report_mod)
  |> RuntimeInstruction.register_timeout()
end
# public for testing purposes
@doc false
@spec create_timeout_message(timestamp, module) :: Message.t()
def create_timeout_message(timeout_timestamp, report_mod) do
  # the body carries the owning report module so that `process_message/2` can tell its own
  # timeouts apart from those of other report modules
  body = %{report_mod: report_mod}

  %Message{type: :publish_report_timeout, timestamp: timeout_timestamp, body: body}
end
end