defmodule Glific.Flows.Flow do
@moduledoc """
The flow object which encapsulates the complete flow as emitted by
by `https://github.com/nyaruka/floweditor`
"""
alias __MODULE__
use Ecto.Schema
import Ecto.Changeset
import Ecto.Query, warn: false
alias Glific.{
AccessControl.Role,
Enums.FlowType,
Flows,
Flows.FlowContext,
Flows.FlowRevision,
Flows.Localization,
Flows.Node,
Partners.Organization,
Repo
}
@required_fields [:name, :uuid, :organization_id]
@optional_fields [
:flow_type,
:keywords,
:version_number,
:uuid_map,
:nodes,
:ignore_keywords,
:is_active,
:is_background,
:is_pinned,
:respond_other,
:respond_no_response
]
@type t :: %__MODULE__{
__meta__: Ecto.Schema.Metadata.t(),
id: non_neg_integer | nil,
name: String.t() | nil,
uuid: Ecto.UUID.t() | nil,
uuid_map: map() | nil,
keywords: [String.t()] | nil,
ignore_keywords: boolean() | nil,
is_active: boolean() | nil,
is_background: boolean() | nil,
is_pinned: boolean() | nil,
respond_other: boolean() | nil,
respond_no_response: boolean() | nil,
flow_type: String.t() | nil,
status: String.t(),
definition: map() | nil,
localization: Localization.t() | nil,
start_node: Node.t() | nil,
nodes: [Node.t()] | nil,
version_number: String.t() | nil,
revisions: [FlowRevision.t()] | Ecto.Association.NotLoaded.t() | nil,
organization_id: non_neg_integer | nil,
organization: Organization.t() | Ecto.Association.NotLoaded.t() | nil,
inserted_at: :utc_datetime | nil,
updated_at: :utc_datetime | nil
}
schema "flows" do
field :name, :string
# this is the flow editor version number
field :version_number, :string
field :flow_type, FlowType
field :uuid, Ecto.UUID
field :uuid_map, :map, virtual: true
field :start_node, :map, virtual: true
field :nodes, :map, virtual: true
field :localization, :map, virtual: true
field :last_published_at, :utc_datetime, virtual: true
field :last_changed_at, :utc_datetime, virtual: true
# This is the dynamic status that we use primarily during
# flow execution. It tells us if we are using the draft version
# or the published version of the flow
field :status, :string, virtual: true, default: "published"
field :keywords, {:array, :string}, default: []
field :ignore_keywords, :boolean, default: false
field :is_active, :boolean, default: true
field :is_background, :boolean, default: false
field :is_pinned, :boolean, default: false
field :respond_other, :boolean, default: false
field :respond_no_response, :boolean, default: false
# we use this to store the latest definition and version from flow_revisions for this flow
field :definition, :map, virtual: true
# this is the version of the flow revision
field :version, :integer, virtual: true, default: 0
belongs_to :organization, Organization
has_many :revisions, FlowRevision
many_to_many :roles, Role, join_through: "flow_roles", on_replace: :delete
timestamps(type: :utc_datetime_usec)
end
@doc """
Standard changeset pattern we use for all data types
"""
@spec changeset(Flow.t(), map()) :: Ecto.Changeset.t()
def changeset(flow, attrs) do
changeset =
flow
|> cast(attrs, @required_fields ++ @optional_fields)
|> validate_required(@required_fields)
|> unique_constraint([:name, :organization_id],
message: "Sorry, the flow name already exists."
)
|> unique_constraint([:uuid, :organization_id])
|> update_change(:keywords, &update_keywords(&1))
validate_keywords(changeset, get_change(changeset, :keywords))
end
@spec update_keywords(any()) :: list()
defp update_keywords(keywords) when is_list(keywords),
do: Enum.map(keywords, fn keyword -> String.downcase(keyword) end)
defp update_keywords(_), do: []
@doc """
Changeset helper for keywords
"""
@spec validate_keywords(Ecto.Changeset.t(), any()) :: Ecto.Changeset.t()
def validate_keywords(changeset, nil), do: validate_keywords(changeset, [])
def validate_keywords(changeset, keywords) do
id = get_field(changeset, :id)
organization_id = get_field(changeset, :organization_id)
query =
if is_nil(id),
do: Flows.Flow,
else: Flows.Flow |> where([f], f.id != ^id and f.organization_id == ^organization_id)
flow_keyword_list = get_other_flow_keyword_list(query)
keywords_list = Map.keys(flow_keyword_list)
existing_keywords =
keywords
|> Enum.filter(fn keyword ->
if keyword in keywords_list, do: Glific.string_clean(keyword)
end)
if existing_keywords != [] do
changeset
|> add_error(
:keywords,
create_keywords_error_message(existing_keywords, flow_keyword_list)
)
else
changeset
end
end
@spec create_keywords_error_message([], map()) :: String.t()
defp create_keywords_error_message(existing_keywords, flow_keyword_list) do
existing_keywords_string =
existing_keywords
|> Enum.map_join(", ", fn keyword ->
"The keyword `#{keyword}` was already used in the `#{flow_keyword_list[keyword]}` Flow"
end)
# this should be combined with the above pipe, leaving for now since
# i'm just cleaning up credo errors
"#{existing_keywords_string}."
end
@spec get_other_flow_keyword_list(Ecto.Query.t()) :: map()
defp get_other_flow_keyword_list(query),
do:
query
|> select([f], %{keywords: f.keywords, name: f.name})
|> Repo.all()
|> Enum.reduce(%{}, fn flow, acc ->
flow.keywords
|> Enum.reduce(%{}, fn keyword, acc_2 ->
Map.put(acc_2, Glific.string_clean(keyword), flow.name)
end)
|> Map.merge(acc)
end)
@doc """
Process a json structure from flow editor to the Glific data types. While we are doing
this we also fix the map, if the variables to resolve Other/No Response is true
"""
@spec process(map(), Flow.t(), Ecto.UUID.t()) :: Flow.t()
def process(json, flow, start_node_uuid) do
{nodes, uuid_map} =
Enum.reduce(
json["nodes"],
{[], %{}},
fn node_json, acc ->
{node, uuid_map} = Node.process(node_json, elem(acc, 1), flow)
{[node | elem(acc, 0)], uuid_map}
end
)
{nodes, uuid_map} = fix_nodes(nodes, uuid_map, flow)
{:node, start_node} = Map.get(uuid_map, start_node_uuid)
flow
|> Map.put(:uuid_map, uuid_map)
|> Map.put(:localization, Localization.process(json["localization"]))
|> Map.put(:nodes, nodes)
|> Map.put(:start_node, start_node)
end
@spec fix_nodes(Node.t(), map(), Flow.t()) :: {[Node.t()], map()}
defp fix_nodes(nodes, uuid_map, %{respond_other: false, respond_no_response: false}),
do: {Enum.reverse(nodes), uuid_map}
defp fix_nodes(nodes, uuid_map, flow) do
Enum.reduce(
nodes,
{[], uuid_map},
fn node, {nodes, uuid_map} ->
{node, uuid_map} = Node.fix_node(node, flow, uuid_map)
{[node | nodes], uuid_map}
end
)
end
# in some cases flow editor wraps the json under a "definition" key
@spec clean_definition(map()) :: map()
defp clean_definition(json),
do:
json
|> Map.get("definition", json)
|> Map.delete("_ui")
@doc """
load the latest revision, specifically json definition from the
flow_revision table. We return the clean definition back
"""
@spec get_latest_definition(integer) :: map()
def get_latest_definition(flow_id) do
query =
from fr in FlowRevision,
where: fr.revision_number == 0 and fr.flow_id == ^flow_id,
select: fr.definition
Repo.one(query)
# lets get rid of stuff we don't use, specifically the definition and
# UI layout of the flow
|> clean_definition()
end
@doc """
Create a sub flow of an existing flow
"""
@spec start_sub_flow(FlowContext.t(), Ecto.UUID.t(), non_neg_integer) ::
{:ok, FlowContext.t(), [String.t()]} | {:error, String.t()}
def start_sub_flow(context, uuid, parent_id) do
# we might want to put the current one under some sort of pause status
flow = get_flow(context.flow.organization_id, uuid, context.status)
parent =
Glific.delete_multiple(
context.results,
["parent", :parent, "child", :child]
)
FlowContext.init_context(flow, context.contact, context.status,
parent_id: parent_id,
delay: context.delay,
uuids_seen: context.uuids_seen,
# lets keep only one level of results, rather than a lot of them
results: %{"parent" => parent}
)
end
@doc """
Return a flow for a specific uuid. Cache is not present in cache
"""
@spec get_flow(non_neg_integer, Ecto.UUID.t(), String.t()) :: map()
def get_flow(organization_id, uuid, status) do
{:ok, flow} = Flows.get_cached_flow(organization_id, {:flow_uuid, uuid, status})
flow
end
@doc """
Helper function to load a active flow from the database and build an object
"""
@spec get_loaded_flow(non_neg_integer, String.t(), map()) :: map()
def get_loaded_flow(organization_id, status, args) do
query =
from f in Flow,
join: fr in assoc(f, :revisions),
where: f.organization_id == ^organization_id,
where: fr.flow_id == f.id,
select: %Flow{
id: f.id,
name: f.name,
uuid: f.uuid,
is_background: f.is_background,
is_active: f.is_active,
keywords: f.keywords,
ignore_keywords: f.ignore_keywords,
respond_other: f.respond_other,
respond_no_response: f.respond_no_response,
organization_id: f.organization_id,
definition: fr.definition,
version: fr.version
}
flow =
query
|> status_clause(status)
|> args_clause(args)
|> Repo.one!()
|> Map.put(:status, status)
if flow.definition["nodes"] == [] do
flow
else
start_node_uuid = start_node(flow.definition["_ui"])
flow.definition
|> clean_definition()
|> process(flow, start_node_uuid)
end
end
@spec start_node(map()) :: Ecto.UUID.t() | nil
defp start_node(json) do
{node_uuid, _top, _left} =
json["nodes"]
|> Enum.reduce(
{nil, 1_000_000, 1_000_000},
fn {node_uuid, node}, {uuid, top, left} ->
pos_top = get_in(node, ["position", "top"])
pos_left = get_in(node, ["position", "left"])
if pos_top < top || (pos_top == top && pos_left < left) do
{node_uuid, pos_top, pos_left}
else
{uuid, top, left}
end
end
)
node_uuid
end
@doc """
Validate a flow and ensures the flow is valid with our internal rule-set
"""
@spec validate_flow(non_neg_integer, String.t(), map()) :: Keyword.t()
def validate_flow(organization_id, status, args) do
organization_id
|> get_loaded_flow(status, args)
|> validate_flow()
end
@spec validate_flow(map()) :: Keyword.t()
defp validate_flow(flow) do
if flow.definition["nodes"] == [] do
[Flow: "Flow is empty"]
else
all_nodes = flow_objects(flow, :node)
flow.nodes
|> Enum.reduce(
[],
&Node.validate(&1, &2, flow)
)
|> dangling_nodes(flow, all_nodes)
|> missing_flow_context_nodes(flow, all_nodes)
end
end
@spec flow_objects(map(), atom()) :: MapSet.t()
defp flow_objects(flow, type) do
flow.uuid_map
|> Enum.filter(fn {_k, v} -> elem(v, 0) == type end)
|> Enum.map(fn {k, _v} -> k end)
|> MapSet.new()
end
@spec dangling_nodes(Keyword.t(), map(), MapSet.t()) :: Keyword.t()
defp dangling_nodes(errors, flow, all_nodes) do
all_exits = flow_objects(flow, :exit)
# the first node is always reachable
reachable_nodes =
all_exits
|> Enum.reduce(
MapSet.new([flow.start_node.uuid]),
fn e, acc ->
{:exit, exit} = flow.uuid_map[e]
MapSet.put(acc, exit.destination_node_uuid)
end
)
|> MapSet.delete(nil)
dangling = MapSet.difference(all_nodes, reachable_nodes)
if MapSet.size(dangling) == 0,
do: errors,
else: [dangling: "Your flow has dangling nodes"] ++ errors
end
@spec missing_flow_context_nodes(Keyword.t(), map(), MapSet.t()) :: Keyword.t()
defp missing_flow_context_nodes(errors, flow, all_nodes) do
flow_context_nodes =
FlowContext
|> where([fc], fc.flow_id == ^flow.id and is_nil(fc.completed_at))
|> select([fc], fc.node_uuid)
|> distinct(true)
|> Repo.all()
|> MapSet.new()
if MapSet.subset?(flow_context_nodes, all_nodes),
do: errors,
else: [flowContext: "Some of your users in the flow have their node deleted"] ++ errors
end
# add the appropriate where clause as needed
@spec args_clause(Ecto.Queryable.t(), map()) :: Ecto.Queryable.t()
defp args_clause(query, %{id: id}),
do: query |> where([f, _fr], f.id == ^id)
defp args_clause(query, %{uuid: uuid}),
do: query |> where([f, _fr], f.uuid == ^uuid)
defp args_clause(query, %{keyword: keyword}),
do: query |> where([f, _fr], ^keyword in f.keywords)
defp args_clause(query, _args), do: query
defp status_clause(query, "published" = status),
do: query |> where([_f, fr], fr.status == ^status)
defp status_clause(query, "draft"),
do: query |> where([_f, fr], fr.revision_number == 0)
end