defmodule Glific.Flows.Action do
@moduledoc """
The Action object which encapsulates one action in a given node.
"""
alias __MODULE__
use Ecto.Schema
import Ecto.Query, warn: false
alias Glific.{
Contacts,
Contacts.Contact,
Dialogflow,
Flows,
Flows.Flow,
Groups,
Groups.Group,
Messages,
Messages.Message,
Profiles,
Repo,
Sheets
}
alias Glific.Flows.{
ContactAction,
ContactField,
ContactSetting,
Flow,
FlowContext,
Node,
Templating,
Webhook
}
require Logger
# Maps the `profile_type` label carried by a "set_contact_profile" action to
# the atom handed to `Profiles.handle_flow_action/3` in `execute/3`.
@contact_profile %{
"Switch Profile" => :switch_profile,
"Create Profile" => :create_profile
}
# Keys every action JSON is expected to carry; the per-type lists below
# prepend their own keys and are passed to `Flows.check_required_fields/2`
# by the matching `process/3` clause.
@required_field_common [:uuid, :type]
@required_fields_enter_flow [:flow | @required_field_common]
@required_fields_language [:language | @required_field_common]
@required_fields_set_contact_field [:value, :field | @required_field_common]
@required_fields_set_contact_profile [:value, :profile_type | @required_field_common]
@required_fields_set_contact_name [:name | @required_field_common]
@required_fields_webhook [:url, :headers, :method, :result_name | @required_field_common]
@required_fields_classifier [:input, :result_name | @required_field_common]
# Used by the catch-all `process/3` clause (any type without a dedicated clause).
@required_fields [:text | @required_field_common]
@required_fields_label [:labels | @required_field_common]
@required_fields_sheet [:sheet_id, :row, :result_name | @required_field_common]
@required_fields_group [:groups | @required_field_common]
@required_fields_contact [:contacts, :text | @required_field_common]
# NOTE(review): unlike the lists above this one does not include the common
# :uuid / :type keys - confirm that this is intentional.
@required_fields_waittime [:delay]
@required_fields_interactive_template [:name | @required_field_common]
@required_fields_set_results [:name, :category, :value | @required_field_common]
# Action types handled by the "wait" clauses of process/3, validate/3 and execute/3.
@wait_for ["wait_for_time", "wait_for_result"]
@type t() :: %__MODULE__{
        uuid: Ecto.UUID.t() | nil,
        name: String.t() | nil,
        text: String.t() | nil,
        value: String.t() | nil,
        input: String.t() | nil,
        url: String.t() | nil,
        headers: map() | nil,
        method: String.t() | nil,
        result_name: String.t() | nil,
        body: String.t() | nil,
        type: String.t() | nil,
        profile_type: String.t() | nil,
        field: map() | nil,
        language: String.t() | nil,
        quick_replies: [String.t()],
        enter_flow_uuid: Ecto.UUID.t() | nil,
        enter_flow_name: String.t() | nil,
        enter_flow_expression: String.t() | nil,
        attachments: list() | nil,
        labels: list() | nil,
        groups: list() | nil,
        contacts: list() | nil,
        enter_flow: Flow.t() | nil,
        node_uuid: Ecto.UUID.t() | nil,
        node: Node.t() | nil,
        templating: Templating.t() | nil,
        ## Custom delay for the wait_for_time / wait_for_result nodes. The value
        ## is handed to `DateTime.add/2` without a unit (i.e. seconds) when the
        ## wakeup time is computed, and validate/3 compares it to 24 * 60 * 60.
        wait_time: integer() | nil,
        ## this is a custom delay in seconds before processing for the node.
        ## Currently only used for send messages
        delay: integer(),
        # Interactive messages
        interactive_template_id: integer() | nil,
        interactive_template_expression: String.t() | nil,
        params_count: String.t() | nil,
        params: list() | nil,
        attachment_type: String.t() | nil,
        attachment_url: String.t() | nil,
        category: String.t() | nil
      }
# In-memory representation of one flow action. The struct is populated by the
# process/3 clauses in this module from the flow editor JSON.
# NOTE(review): the "link_google_sheet" clause of process/3 also merges
# :sheet_id and :row into the struct, but neither is declared here - confirm
# whether they should be schema fields.
embedded_schema do
field(:uuid, Ecto.UUID)
field(:name, :string)
field(:text, :string)
field(:value, :string)
field(:category, :string)
field(:input, :string)
# various fields for webhooks
field(:url, :string)
field(:headers, :map)
field(:method, :string)
field(:result_name, :string)
field(:body, :string)
# fields for certain actions: set_contact_field, set_contact_language
field(:field, :map)
field(:language, :string)
field(:type, :string)
field(:profile_type, :string)
field(:quick_replies, {:array, :string}, default: [])
# NOTE(review): labels, groups and contacts are declared as :map although the
# typespec above and the process/3 clauses treat them as lists - confirm.
field(:attachments, :map)
field(:labels, :map)
field(:groups, :map)
field(:contacts, :map)
# delay used by the wait_for_time / wait_for_result actions
field(:wait_time, :integer)
field(:interactive_template_id, :integer)
# uuid of the node this action belongs to (set in process/4 from node.uuid)
field(:node_uuid, Ecto.UUID)
embeds_one(:node, Node)
embeds_one(:templating, Templating)
# target of an "enter_flow" action, copied from json["flow"]
field(:enter_flow_uuid, Ecto.UUID)
field(:enter_flow_name, :string)
field(:enter_flow_expression, :string)
# fields for interactive messages
field(:params, {:array, :string}, default: [])
field(:params_count, :string)
field(:interactive_template_expression, :string)
field(:attachment_type, :string)
field(:attachment_url, :string)
# parsed from json["delay"] in process/4; 0 when the JSON has no delay
field(:delay, :integer, default: 0)
embeds_one(:enter_flow, Flow)
end
@spec process(map(), map(), Node.t(), map()) :: {Action.t(), map()}
defp process(json, uuid_map, node, attrs) do
  # attributes that every action carries, whatever its type
  base = %Action{
    uuid: json["uuid"],
    node_uuid: node.uuid,
    type: json["type"],
    delay: Glific.parse_maybe_integer!(json["delay"] || 0)
  }

  # the type specific attributes win over the base values on a key collision
  action = Map.merge(base, attrs)

  # register the action under its uuid so it can be looked up later
  {action, Map.put(uuid_map, action.uuid, {:action, action})}
end
@doc """
Convert the JSON description of an action, as produced by the flow editor,
into an `Action` struct.

Returns `{action, uuid_map}`, where `uuid_map` additionally holds the new
action under its uuid.
"""
@spec process(map(), map(), Node.t()) :: {Action.t(), map()}
def process(%{"type" => "link_google_sheet"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_sheet)

  sheet_attrs = %{
    sheet_id: json["sheet_id"],
    row: json["row"],
    result_name: json["result_name"]
  }

  process(json, uuid_map, node, sheet_attrs)
end
def process(%{"type" => "enter_flow"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_enter_flow)

  # the sub flow is described by a nested object in the JSON
  flow = json["flow"]

  flow_attrs = %{
    enter_flow_uuid: flow["uuid"],
    enter_flow_name: flow["name"],
    enter_flow_expression: flow["expression"]
  }

  process(json, uuid_map, node, flow_attrs)
end
def process(%{"type" => "set_contact_language"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_language)

  # the language is carried in the :text attribute of the action
  language_attrs = %{text: json["language"]}
  process(json, uuid_map, node, language_attrs)
end

def process(%{"type" => "set_contact_name"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_set_contact_name)

  # the new name is carried in the :value attribute of the action
  name_attrs = %{value: json["name"]}
  process(json, uuid_map, node, name_attrs)
end

def process(%{"type" => "set_contact_profile"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_set_contact_profile)

  profile_attrs = %{value: json["value"], profile_type: json["profile_type"]}
  process(json, uuid_map, node, profile_attrs)
end
def process(%{"type" => "set_contact_field"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_set_contact_field)

  field = json["field"]
  key = field["key"]

  # fall back to the key when the editor did not send a name
  name =
    case field["name"] do
      nil -> key
      given -> given
    end

  process(json, uuid_map, node, %{value: json["value"], field: %{name: name, key: key}})
end
def process(%{"type" => "call_webhook"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_webhook)

  webhook_attrs = %{
    url: json["url"],
    method: json["method"],
    headers: json["headers"],
    body: json["body"],
    result_name: json["result_name"]
  }

  process(json, uuid_map, node, webhook_attrs)
end

def process(%{"type" => "call_classifier"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_classifier)

  classifier_attrs = %{input: json["input"], result_name: json["result_name"]}
  process(json, uuid_map, node, classifier_attrs)
end
def process(%{"type" => "add_input_labels"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_label)

  labels = process_labels(json["labels"])
  process(json, uuid_map, node, %{labels: labels})
end

def process(%{"type" => "add_contact_groups"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_group)

  groups = json["groups"]
  process(json, uuid_map, node, %{groups: groups})
end
def process(%{"type" => "send_broadcast"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_contact)

  attachments = process_attachments(json["attachments"])

  # templating may register entries of its own, hence the updated uuid_map
  {templating, uuid_map} = Templating.process(json["templating"], uuid_map)

  process(json, uuid_map, node, %{
    text: json["text"],
    attachments: attachments,
    contacts: json["contacts"],
    templating: templating
  })
end
def process(%{"type" => "send_interactive_msg"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_interactive_template)

  interactive_attrs = %{
    interactive_template_id: json["id"],
    interactive_template_expression: json["expression"] || nil,
    labels: process_labels(json["labels"]),
    # default to "no params" when the editor omits them
    params: json["params"] || [],
    params_count: json["paramsCount"] || "0",
    attachment_type: json["attachment_type"],
    attachment_url: json["attachment_url"]
  }

  process(json, uuid_map, node, interactive_attrs)
end
def process(%{"type" => "remove_contact_groups"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_group)

  # ["all_groups"] is the marker that validate/3 and execute/3 look for when
  # the contact should be removed from every group
  groups = if json["all_groups"], do: ["all_groups"], else: json["groups"]

  process(json, uuid_map, node, %{groups: groups})
end
def process(%{"type" => "set_run_result"} = json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields_set_results)

  result_attrs = %{name: json["name"], category: json["category"], value: json["value"]}
  process(json, uuid_map, node, result_attrs)
end
# Sentinel stored in :wait_time when the node carries no usable delay. The
# execute/3 clause for the wait nodes compares against this same attribute.
@default_wait_time -1
def process(%{"type" => type} = json, uuid_map, node)
    when type in @wait_for do
  Flows.check_required_fields(json, @required_fields_waittime)

  # The delay normally arrives as a string, but an integer is accepted too
  # (process/4 already accepts both for :delay). Surrounding whitespace is
  # ignored; a non numeric string still raises, as before.
  requested =
    case json["delay"] do
      nil ->
        @default_wait_time

      delay when is_integer(delay) ->
        delay

      delay when is_binary(delay) ->
        case String.trim(delay) do
          "" -> @default_wait_time
          trimmed -> String.to_integer(trimmed)
        end
    end

  # a missing, blank, zero or negative delay all collapse to the sentinel
  wait_time = if requested > 0, do: requested, else: @default_wait_time

  process(
    json,
    uuid_map,
    node,
    %{
      wait_time: wait_time,
      # this is potentially set in wait_for_result
      result_name: json["result_name"]
    }
  )
end
# catch-all clause: any type without a dedicated clause is treated as a
# message carrying text, labels, quick replies, attachments and templating
def process(json, uuid_map, node) do
  Flows.check_required_fields(json, @required_fields)

  labels = process_labels(json["labels"])
  attachments = process_attachments(json["attachments"])
  {templating, uuid_map} = Templating.process(json["templating"], uuid_map)

  process(json, uuid_map, node, %{
    name: json["name"],
    text: json["text"],
    labels: labels,
    quick_replies: json["quick_replies"],
    attachments: attachments,
    templating: templating
  })
end
# Last dot-separated segment of a module name, e.g. "Contact" for
# Glific.Contacts.Contact. Used to build human readable error messages.
@spec get_name(atom()) :: String.t()
defp get_name(module) do
  segments = module |> to_string() |> String.split(".")
  Enum.at(segments, -1)
end
# Prepends an error to `errors` unless a record of schema `object` whose id
# equals entity["uuid"] can be fetched.
@spec check_entity_exists(map(), Keyword.t(), atom()) :: Keyword.t()
defp check_entity_exists(entity, errors, object) do
  with {:ok, _record} <- Repo.fetch_by(object, %{id: entity["uuid"]}) do
    errors
  else
    _ -> [{object, "Could not find #{get_name(object)}: #{entity["name"]}"} | errors]
  end
end
# Schema whose records are referenced by the given action type.
@spec object(String.t()) :: atom()
defp object(type) when type in ["add_contact_groups", "remove_contact_groups"], do: Group
defp object("send_broadcast"), do: Contact
@doc """
Validate an action and everything it refers to.

Returns `errors` with one `{module, message}` entry prepended for every
problem that was found.
"""
@spec validate(Action.t(), Keyword.t(), map()) :: Keyword.t()
def validate(%{type: type, groups: groups} = action, errors, _flow)
    when type in ["add_contact_groups", "remove_contact_groups", "send_broadcast"] do
  # the contacts and/or groups named by the action must exist
  schema = object(type)

  schema
  |> check_object(action, groups)
  |> Enum.reduce(errors, fn entity, acc ->
    case Glific.parse_maybe_integer(entity["uuid"]) do
      {:ok, _id} ->
        # the id parses, now make sure the record is there
        check_entity_exists(entity, acc, schema)

      _ ->
        [{schema, "Could not parse #{get_name(schema)}"} | acc]
    end
  end)
end
def validate(%{type: "enter_flow"} = action, errors, _flow) do
  # the sub flow we are about to enter has to exist
  with {:ok, _flow} <- Repo.fetch_by(Flow, %{uuid: action.enter_flow_uuid}) do
    errors
  else
    _ -> [{Flow, "Could not find Sub Flow: #{action.enter_flow_name}"} | errors]
  end
end
def validate(%{type: type} = action, errors, flow)
    when type in @wait_for do
  # after a wait of 24 hours or more the next message must not be a plain
  # session message
  long_wait? = action.wait_time >= 24 * 60 * 60

  if long_wait? && type_of_next_message(flow, action) == :session do
    [
      {Message, "The next message after a long wait for time should be an HSM template"}
      | errors
    ]
  else
    errors
  end
end
def validate(%{type: "set_contact_language"} = action, errors, _flow) do
  # the language is carried in :text and must not be blank
  case action.text do
    blank when blank in [nil, ""] -> [{Message, "Language is a required field"} | errors]
    _language -> errors
  end
end

# every other action type has nothing to validate
def validate(_action, errors, _flow) do
  errors
end
# Entities that validate/3 has to check for the given schema:
# the action's contacts for Contact, every stored group (as uuid/name maps)
# for the ["all_groups"] marker, otherwise the groups listed on the action.
defp check_object(Contact, action, _groups) do
  action.contacts
end

defp check_object(_object, _action, ["all_groups"]) do
  all_groups = from(g in Group, select: %{"uuid" => g.id, "name" => g.label})
  Repo.all(all_groups)
end

defp check_object(_object, action, _groups) do
  action.groups
end
# Returns :hsm when the first action of the node behind this action's first
# exit has templating, :session when it has none, and :unknown when that
# action cannot be located.
@spec type_of_next_message(Flow.t(), Action.t()) :: atom()
defp type_of_next_message(flow, action) do
  # keep it simple: only the first exit of the current node is followed
  {:node, current} = flow.uuid_map[action.node_uuid]
  [first_exit | _] = current.exits
  {:node, target} = flow.uuid_map[first_exit.destination_node_uuid]
  [next_action | _] = target.actions

  case next_action.templating do
    nil -> :session
    _templating -> :hsm
  end
rescue
  # any lookup above may fail when a uuid is missing; treat that as unknown
  _ -> :unknown
end
## Copies "name_match" into "name" for labels that do not have a "name" yet,
## so that the dynamic label can be applied to the message.
## Anything that is not a list is returned untouched.
@spec process_labels(list() | nil) :: list() | nil
defp process_labels(labels) when is_list(labels) do
  for label <- labels do
    case label["name_match"] do
      nil -> label
      name_match -> Map.put_new(label, "name", name_match)
    end
  end
end

defp process_labels(other), do: other
@doc """
Execute an action in the given flow context.

`messages` is the stream of pending messages; it is consumed as the
processing proceeds.
"""
@spec execute(Action.t(), FlowContext.t(), [Message.t()]) ::
        {:ok | :wait, FlowContext.t(), [Message.t()]} | {:error, String.t()}
def execute(%{type: "send_msg"} = action, context, messages) do
  # resolve the templating against the context before the message is sent
  resolved = Templating.execute(action.templating, context, messages)
  ContactAction.send_message(context, %{action | templating: resolved}, messages)
end

def execute(%{type: "send_interactive_msg"} = action, context, messages),
  do: ContactAction.send_interactive_message(context, action, messages)

def execute(%{type: "send_broadcast"} = action, context, messages),
  do: ContactAction.send_broadcast(context, action, messages)
def execute(%{type: "set_contact_language"} = action, context, messages) do
  # a blank language leaves the context untouched
  context =
    case action.text do
      blank when blank in [nil, ""] -> context
      language -> ContactSetting.set_contact_language(context, language)
    end

  {:ok, context, messages}
end

def execute(%{type: "set_contact_name"} = action, context, messages) do
  name = ContactField.parse_contact_field_value(context, action.value)
  {:ok, ContactSetting.set_contact_name(context, name), messages}
end
# "set_contact_field_valid" is an internal type: the "set_contact_field"
# clause rewrites the action to it once the field name is known to be present,
# which lets both steps share the execute/3 signature.
def execute(%{type: "set_contact_field_valid"} = action, context, messages) do
  field = action.field
  name = field.name

  # derive a key from the name when the field carries none
  key = field[:key] || (name |> String.downcase() |> String.replace(" ", "_"))
  value = ContactField.parse_contact_field_value(context, action.value)

  context =
    case key do
      "settings" -> settings(context, value)
      _ -> ContactField.add_contact_field(context, key, name, value, "string")
    end

  {:ok, context, messages}
end
def execute(%{type: "set_contact_field"} = action, context, messages) do
  # Depending on what the user entered in the flow, the field may have no
  # name. There should be a validation for this; until then such an action is
  # skipped instead of raising (Issue #858).
  case Map.get(action.field, :name) do
    blank when blank in ["", nil] ->
      {:ok, context, messages}

    _name ->
      execute(%{action | type: "set_contact_field_valid"}, context, messages)
  end
end
def execute(%{type: "set_contact_profile"} = action, context, _messages) do
  # translate the label from the editor into the matching atom
  profile_action = Map.get(@contact_profile, action.profile_type)
  {context, message} = Profiles.handle_flow_action(profile_action, context, action)

  {:ok, context, [message]}
end
def execute(%{type: "enter_flow"} = action, context, _messages) do
  target_uuid = get_flow_uuid(action, context)

  # a flow we have already entered during this execution means we are looping
  if Map.has_key?(context.uuids_seen, target_uuid) do
    Glific.log_error("Repeated loop, hence finished the flow", false)
  else
    # when the contact still has an unfinished context for the target flow,
    # all contexts are reset so that the stack does not grow too deep
    maybe_reset_flows(context, target_uuid)

    {:node, node} = context.uuid_map[action.node_uuid]

    # an action in a terminal node finishes the current context, the sub flow
    # is then attached to the parent of the current context instead
    {context, parent_id} =
      case node.is_terminal do
        true ->
          event_meta = %{
            "action" => "#{inspect(action)}",
            "current_flow_uuid" => context.flow_uuid,
            "new_flow" => target_uuid
          }

          finished =
            FlowContext.reset_one_context(context,
              source: "enter_flow",
              event_meta: event_meta
            )

          {finished, context.parent_id}

        _ ->
          {context, context.id}
      end

    context = %{context | uuids_seen: Map.put(context.uuids_seen, target_uuid, 1)}

    # the sub flow runs in a context of its own, so the return value of
    # start_sub_flow is deliberately ignored
    Flow.start_sub_flow(context, target_uuid, parent_id)

    # we are moving into a different flow: drop the pending messages, which
    # also clears any potential errors
    {:ok, context, []}
  end
end
def execute(%{type: "link_google_sheet"} = action, context, _messages) do
  {updated_context, message} = Sheets.execute(action, context)
  {:ok, updated_context, [message]}
end

def execute(%{type: "call_webhook"} = action, context, messages) do
  # The webhook is processed asynchronously (via Oban), so we only trigger it
  # here and tell the caller to wait.
  Webhook.execute(action, context)

  # a webhook does not consume a message, hence they are passed on untouched
  {:wait, context, messages}
end
def execute(%{type: "call_classifier"} = action, context, messages) do
  ## The classifier normally works on the last message. When the action names
  ## a different input, a temporary message is built from that input instead.
  ## More refactoring might be needed here, but this is fine for now.
  message =
    if action.input in [nil, "@input.text"] do
      context.last_message
    else
      temp_message =
        Messages.create_temp_message(
          context.organization_id,
          FlowContext.parse_context_string(context, action.input),
          contact_id: context.contact_id,
          session_uuid: context.id
        )

      Repo.preload(temp_message, contact: [:language])
    end

  # just call the classifier, and ask the caller to wait
  Dialogflow.execute(action, context, message)
  {:wait, context, messages}
end
def execute(%{type: "add_input_labels"} = action, context, messages) do
  ## We will soon figure out how we will manage the UUID with tags
  label_names =
    Enum.map(action.labels, fn label ->
      FlowContext.parse_context_string(context, label["name"])
    end)

  # all labels end up in one comma separated string on the last message
  add_flow_label(context, Enum.join(label_names, ", "))
  {:ok, context, messages}
end
def execute(%{type: "add_contact_groups"} = action, context, messages) do
  # Adds the contact to each group listed on the action and records a history
  # entry per group. This is a pure side-effect loop, so nothing is collected.
  Enum.each(action.groups, fn group ->
    case Glific.parse_maybe_integer(group["uuid"]) do
      {:ok, group_id} ->
        Groups.create_contact_group(%{
          contact_id: context.contact_id,
          group_id: group_id,
          organization_id: context.organization_id
        })

        {:ok, _} =
          Contacts.capture_history(context.contact_id, :contact_groups_updated, %{
            event_label: "Added to collection: \"#{group["name"]}\"",
            event_meta: %{
              context_id: context.id,
              group: %{
                id: group_id,
                name: group["name"],
                uuid: group["uuid"]
              },
              flow: %{
                id: context.flow.id,
                name: context.flow.name,
                uuid: context.flow.uuid
              }
            }
          })

      _ ->
        # a group whose id cannot be parsed is logged and skipped
        Logger.error("Could not parse action groups: #{inspect(action)}")
    end
  end)

  {:ok, context, messages}
end
def execute(%{type: "remove_contact_groups"} = action, context, messages) do
  case action.groups do
    ["all_groups"] ->
      # the marker means: remove the contact from every group there is
      all_ids = Groups.get_group_ids()
      Groups.delete_contact_groups_by_ids(context.contact_id, all_ids)

      {:ok, _} =
        Contacts.capture_history(context.contact_id, :contact_groups_updated, %{
          event_label: "Removed from All the collections",
          event_meta: %{
            context_id: context.id,
            group: %{ids: all_ids},
            flow: %{
              id: context.flow.id,
              name: context.flow.name,
              uuid: context.flow.uuid
            }
          }
        })

    groups ->
      # one history entry per group, then a single delete for all the ids
      group_ids =
        Enum.map(groups, fn group ->
          {:ok, group_id} = Glific.parse_maybe_integer(group["uuid"])

          {:ok, _} =
            Contacts.capture_history(context.contact_id, :contact_groups_updated, %{
              event_label: "Removed from collection: \"#{group["name"]}\"",
              event_meta: %{
                context_id: context.id,
                group: %{id: group_id, name: group["name"], uuid: group["uuid"]},
                flow: %{
                  id: context.flow.id,
                  name: context.flow.name,
                  uuid: context.flow.uuid
                }
              }
            })

          group_id
        end)

      Groups.delete_contact_groups_by_ids(context.contact_id, group_ids)
  end

  {:ok, context, messages}
end
def execute(%{type: "set_run_result"} = action, context, messages) do
  # expand the context variables first, then evaluate the result as EEx
  expanded = FlowContext.parse_context_string(context, action.value)
  value = Glific.execute_eex(expanded)

  result = %{
    "input" => value,
    "value" => value,
    "category" => action.category,
    "inserted_at" => DateTime.utc_now()
  }

  # the result is stored in the context under the name given by the action
  {:ok, FlowContext.update_results(context, %{action.name => result}), messages}
end
# A wait node that is handed exactly one message: only the "No Response"
# message lets the flow continue, anything else is reported as an error.
def execute(%{type: type} = _action, context, [msg])
    when type in @wait_for do
  case msg.body do
    "No Response" ->
      {:ok, context, []}

    body ->
      Logger.info(
        "Message #{body} with context (#{context.id}) received while waiting for time"
      )

      {:error, "unexpected message received while waiting for time"}
  end
end
# how long (in milliseconds) we block when the node has no usable wait time
@sleep_timeout 4 * 1000
def execute(%{type: type} = action, context, [])
    when type in @wait_for do
  if action.wait_time != @default_wait_time do
    # a real wait: store the wakeup time on the context and ask the caller to wait
    changes = %{
      wakeup_at: DateTime.add(DateTime.utc_now(), action.wait_time),
      is_background_flow: context.flow.is_background,
      is_await_result: type == "wait_for_result"
    }

    {:ok, context} = FlowContext.update_flow_context(context, changes)
    {:wait, context, []}
  else
    ## Ideally this would be an async call,
    ## but it is fine as a short term fix.
    Process.sleep(@sleep_timeout)
    {:ok, context, []}
  end
end

# no clause above matched: the action type is not supported
def execute(action, _context, _messages) do
  raise UndefinedFunctionError, message: "Unsupported action type #{action.type}"
end
# Appends `flow_label` to the flow_label of the context's last message and
# saves it. Always returns nil.
@spec add_flow_label(FlowContext.t(), String.t()) :: nil
# There may be no last message, e.g. when a fake temp message (like
# "No Response") was sent or when a flow is resumed; then there is nothing to label.
defp add_flow_label(%{last_message: nil}, _flow_label), do: nil

defp add_flow_label(%{last_message: last_message}, flow_label) do
  # load the stored message once and reuse it for the update below
  message = Repo.get(Message, last_message.id)

  new_labels =
    if message.flow_label in [nil, ""] do
      flow_label
    else
      message.flow_label <> ", " <> flow_label
    end

  {:ok, _} =
    message
    |> Message.changeset(%{flow_label: new_labels})
    |> Repo.update()

  nil
end
# Handles the special "settings" contact field: "optout" and "optin"
# (case insensitive) change the opt-in state, any other value is stored as a
# contact preference.
@spec settings(FlowContext.t(), String.t()) :: FlowContext.t()
defp settings(context, value) do
  case String.downcase(value) do
    "optout" ->
      ContactAction.optout(context)

    "optin" ->
      # there may be no last message to take the bsp message id from
      message_id =
        case context.last_message do
          nil -> nil
          last_message -> last_message.bsp_message_id
        end

      ContactAction.optin(
        context,
        method: "WA",
        message_id: message_id,
        bsp_status: :session_and_hsm
      )

    _ ->
      ContactSetting.set_contact_preference(context, value)
  end
end
# Turns the attachment list sent by the flow editor ("type:url" strings) into
# a map of type => url. A missing list (nil) yields an empty map.
@spec process_attachments(list() | nil) :: map()
defp process_attachments(nil), do: %{}

## we will remove this once we have a fix for it from the flow editor
defp process_attachments(attachment_list) do
  Enum.reduce(attachment_list, %{}, &do_process_attachment/2)
end

# Adds one "type:url" attachment to the accumulator. Only the first ":" is
# treated as the separator, so the url may contain colons itself. The
# "application" type is stored as "document"; entries without a ":" are skipped.
@spec do_process_attachment(String.t(), map()) :: map()
defp do_process_attachment(attachment, acc) do
  case String.split(attachment, ":", parts: 2) do
    ["application", url] -> Map.put(acc, "document", url)
    [type, url] -> Map.put(acc, type, url)
    _ -> acc
  end
end
# Resets all contexts when the contact still has at least one unfinished
# context for `flow_uuid`. Returns true when a reset happened.
@spec maybe_reset_flows(FlowContext.t(), Ecto.UUID.t()) :: boolean
defp maybe_reset_flows(context, flow_uuid) do
  # number of contexts of this contact for the flow that are not completed
  open_contexts =
    from(fc in FlowContext,
      where: fc.contact_id == ^context.contact_id,
      where: fc.flow_uuid == ^flow_uuid,
      where: is_nil(fc.completed_at)
    )
    |> Repo.aggregate(:count)

  looping? = open_contexts > 0

  if looping?,
    do: FlowContext.reset_all_contexts(context, "Repeated loop, hence finished the flow")

  looping?
end
# The uuid of the flow to enter. When the flow name is "Expression" the uuid
# is computed from the action's expression, otherwise it is taken from the
# action as is.
@spec get_flow_uuid(Action.t(), FlowContext.t()) :: String.t()
defp get_flow_uuid(
       %{enter_flow_name: "Expression", enter_flow_expression: expression} = _action,
       context
     ) do
  # expand context variables, evaluate as EEx and strip surrounding whitespace
  expanded = FlowContext.parse_context_string(context, expression)

  expanded
  |> Glific.execute_eex()
  |> String.trim()
end

defp get_flow_uuid(action, _context) do
  action.enter_flow_uuid
end
end