defmodule Glific.Triggers do
@moduledoc """
The trigger manager for all the trigger system that starts flows
within Glific
"""
import Ecto.Query, warn: false
require Logger
alias Glific.{
AccessControl,
AccessControl.TriggerRole,
Flows,
Flows.Flow,
Groups,
Partners,
Repo,
Triggers,
Triggers.Helper,
Triggers.Trigger
}
@max_trigger_limit 1000
@doc """
Periodic call to execute the triggers outstanding for the day
"""
@spec execute_triggers(non_neg_integer(), DateTime.t()) :: [Trigger.t()]
def execute_triggers(organization_id, now \\ DateTime.utc_now()) do
# triggers can be executed multiple times a day based on frequency
now = Timex.shift(now, minutes: 1)
Trigger
|> where([t], t.organization_id == ^organization_id and t.is_active == true)
|> where([t], t.next_trigger_at < ^now)
|> select([t], t.id)
|> limit(@max_trigger_limit)
|> Repo.all()
|> Enum.map(&execute_trigger(&1, now))
end
@spec execute_trigger(non_neg_integer, DateTime.t()) :: nil
defp execute_trigger(id, now) do
# we fetch the trigger and immediately update the execution value
# to avoid other process, unlikely to happen, but might
trigger = Repo.get!(Trigger, id)
cond do
is_nil(trigger.last_trigger_at) or
Date.diff(DateTime.to_date(trigger.last_trigger_at), DateTime.to_date(now)) < 0 ->
do_execute_trigger(trigger)
trigger.frequency == ["hourly"] and trigger.last_trigger_at.hour < now.hour ->
do_execute_trigger(trigger)
end
nil
end
defp do_execute_trigger(trigger) do
Logger.info("executing trigger: #{trigger.name} for org_id: #{trigger.organization_id}")
trigger
|> update_next()
|> start_flow()
end
@spec update_next(Trigger.t()) :: Trigger.t()
defp update_next(%Trigger{is_repeating: false} = trigger) do
Logger.info(
"updating trigger: #{trigger.name} of org_id: #{trigger.organization_id} as inactive"
)
{:ok, trigger} =
Triggers.update_trigger(
trigger,
%{
is_active: false,
start_at: trigger.start_at,
flow_id: trigger.flow_id,
organization_id: trigger.organization_id,
name: trigger.name
}
)
trigger
end
defp update_next(trigger) do
next_trigger_at = Helper.compute_next(trigger)
Logger.info(
"updating next trigger time for trigger: #{trigger.name} of org_id: #{trigger.organization_id} with time #{next_trigger_at}"
)
{next_trigger_at, is_active} =
if Date.compare(DateTime.to_date(next_trigger_at), trigger.end_date) == :lt,
do: {next_trigger_at, true},
else: {nil, false}
attrs = %{
# we keep the time component constant
start_at: trigger.start_at,
last_trigger_at: trigger.next_trigger_at,
next_trigger_at: next_trigger_at,
flow_id: trigger.flow_id,
organization_id: trigger.organization_id,
is_active: is_active,
name: trigger.name
}
{:ok, trigger} = Triggers.update_trigger(trigger, attrs)
with true <- trigger.is_active,
false <- is_nil(trigger.next_trigger_at),
:gt <- DateTime.compare(DateTime.utc_now(), trigger.next_trigger_at) do
update_next(trigger)
else
_ ->
trigger
end
end
@spec start_flow(Trigger.t()) :: any
defp start_flow(trigger) do
if Glific.Clients.trigger_condition(trigger),
do: do_start_flow(trigger)
end
@spec do_start_flow(Trigger.t()) :: any
defp do_start_flow(trigger) do
flow = Flows.get_flow!(trigger.flow_id)
Logger.info(
"Starting flow: #{flow.name} for trigger: #{trigger.name} of org_id: #{trigger.organization_id} with time #{trigger.next_trigger_at}"
)
if !is_nil(trigger.group_id) do
group = Groups.get_group!(trigger.group_id)
Flows.start_group_flow(flow, group)
end
end
@doc """
Creates a trigger.
## Examples
iex> create_trigger(%{field: value})
{:ok, %Trigger{}}
iex> create_trigger(%{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
@spec create_trigger(map()) :: {:ok, Trigger.t()} | {:error, Ecto.Changeset.t()}
def create_trigger(attrs) do
with {:ok, trigger} <-
%Trigger{}
|> Trigger.changeset(fix_attrs(Map.put_new(attrs, :start_at, nil)))
|> Repo.insert() do
if Map.has_key?(attrs, :add_role_ids),
do: update_trigger_roles(attrs, trigger),
else: {:ok, trigger}
end
end
@spec update_trigger_roles(map(), Trigger.t()) :: {:ok, Trigger.t()}
defp update_trigger_roles(attrs, trigger) do
%{access_controls: access_controls} =
attrs
|> Map.put(:trigger_id, trigger.id)
|> TriggerRole.update_trigger_roles()
trigger
|> Map.put(:roles, access_controls)
|> then(&{:ok, &1})
end
@doc """
Updates a trigger.
## Examples
iex> update_trigger(trigger, %{field: new_value})
{:ok, %Trigger{}}
iex> update_trigger(trigger, %{field: bad_value})
{:error, %Ecto.Changeset{}}
"""
@spec update_trigger(Trigger.t(), map()) :: {:ok, Trigger.t()} | {:error, Ecto.Changeset.t()}
def update_trigger(%Trigger{} = trigger, attrs) do
with {:ok, updated_trigger} <-
trigger
|> Trigger.changeset(fix_attrs(Map.put_new(attrs, :start_at, nil)))
|> Repo.update() do
if Map.has_key?(attrs, :add_role_ids),
do: update_trigger_roles(attrs, updated_trigger),
else: {:ok, updated_trigger}
end
end
@doc """
Gets a single trigger.
Raises `Ecto.NoResultsError` if the Trigger does not exist.
## Examples
iex> get_trigger!(123)
%Trigger{}
iex> get_trigger!(456)
** (Ecto.NoResultsError)
"""
@spec get_trigger!(integer) :: Trigger.t()
def get_trigger!(id), do: Repo.get!(Trigger, id)
@doc """
Returns the list of triggers filtered by args
## Examples
iex> list_triggers()
[%Trigger{}, ...]
"""
@spec list_triggers(map()) :: [Trigger.t()]
def list_triggers(args) do
Repo.list_filter_query(args, Trigger, &Repo.opts_with_name/2, &filter_with/2)
|> AccessControl.check_access(:trigger)
|> Repo.all()
end
@spec filter_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
defp filter_with(query, filter) do
query = Repo.filter_with(query, filter)
Enum.reduce(filter, query, fn
{:flow, flow}, query ->
from q in query,
join: c in assoc(q, :flow),
where: ilike(c.name, ^"%#{flow}%")
{:group, group}, query ->
from q in query,
join: g in assoc(q, :group),
where: ilike(g.label, ^"%#{group}%")
_, query ->
query
end)
end
@doc false
@spec delete_trigger(Trigger.t()) :: {:ok, Trigger.t()} | {:error, Ecto.Changeset.t()}
def delete_trigger(%Trigger{} = trigger) do
trigger
|> Trigger.changeset(%{})
|> Repo.delete()
end
@doc """
Return the count of triggers, using the same filter as list_triggers
"""
@spec count_triggers(map()) :: integer
def count_triggers(args),
do: Repo.count_filter(args, Trigger, &Repo.filter_with/2)
@spec start_at(map()) :: DateTime.t()
defp start_at(%{start_at: nil} = attrs), do: DateTime.new!(attrs.start_date, attrs.start_time)
defp start_at(%{start_at: start_at} = _attrs), do: start_at
@spec get_name(map()) :: String.t()
defp get_name(%{name: name} = _attrs) when not is_nil(name), do: name
defp get_name(attrs) do
with {:ok, flow} <-
Repo.fetch_by(Flow, %{id: attrs.flow_id, organization_id: attrs.organization_id}) do
tz = Partners.organization_timezone(attrs.organization_id)
time = DateTime.new!(attrs.start_date, attrs.start_time)
org_time = DateTime.shift_zone!(time, tz)
{:ok, date} = Timex.format(org_time, "_{D}/{M}/{YYYY}_{h12}:{0m}{AM}")
"#{flow.name}#{date}"
end
end
defp get_next_trigger_at(%{next_trigger_at: next_trigger_at} = _attrs, _start_at)
when not is_nil(next_trigger_at),
do: next_trigger_at
defp get_next_trigger_at(_attrs, start_at), do: start_at
@spec fix_attrs(map()) :: map()
defp fix_attrs(attrs) do
# compute start_at if not set
start_at = start_at(attrs)
attrs
|> Map.put(:start_at, start_at)
|> Map.put(:name, get_name(attrs))
# set the last_trigger_at value to nil whenever trigger is updated or new trigger is created
|> Map.put(:last_trigger_at, Map.get(attrs, :last_trigger_at, nil))
# set the initial value of the next firing of the trigger
|> Map.put(:next_trigger_at, get_next_trigger_at(attrs, start_at))
end
end