lib/glific/flows.ex

defmodule Glific.Flows do
  @moduledoc """
  The Flows context.
  """

  import Ecto.Query, warn: false
  import GlificWeb.Gettext

  require Logger

  alias Glific.{
    AccessControl,
    AccessControl.FlowRole,
    Caches,
    Contacts.Contact,
    Flows.ContactField,
    Groups,
    Groups.Group,
    Partners,
    Repo,
    Templates.InteractiveTemplate,
    Templates.InteractiveTemplates,
    Templates.SessionTemplate
  }

  alias Glific.Flows.{Broadcast, Flow, FlowContext, FlowRevision}

  @doc """
  Returns the list of flows.

  ## Examples

      iex> list_flows()
      [%Flow{}, ...]

  """
  @spec list_flows(map()) :: [Flow.t()]
  def list_flows(args) do
    flows =
      Repo.list_filter_query(args, Flow, &Repo.opts_with_name/2, &filter_with/2)
      |> AccessControl.check_access(:flow)
      |> Repo.all()

    flows
    # get all the flow ids
    |> Enum.map(fn f -> f.id end)
    # get their published_draft dates
    |> get_published_draft_dates()
    # merge with the original list of flows
    |> merge_original(flows)
  end

  @spec merge_original(map(), [Flow.t()]) :: [Flow.t()]
  defp merge_original(dates, flows) do
    Enum.map(flows, fn f -> Map.merge(f, Map.get(dates, f.id, %{})) end)
  end

  @spec get_published_draft_dates([non_neg_integer]) :: map()
  defp get_published_draft_dates(flow_ids) do
    FlowRevision
    |> where([fr], fr.status == "published")
    |> or_where([fr], fr.revision_number == 0)
    |> where([fr], fr.flow_id in ^flow_ids)
    |> select([fr], %{
      id: fr.flow_id,
      status: fr.status,
      last_changed_at: fr.inserted_at
    })
    |> Repo.all()
    |> add_dates()
  end

  defp update_dates(row, value) do
    if row.status == "published",
      do: Map.put(value, :last_published_at, row.last_changed_at),
      else: Map.put(value, :last_changed_at, row.last_changed_at)
  end

  @spec add_dates(list()) :: map()
  defp add_dates(rows) do
    rows
    |> Enum.reduce(%{}, fn row, acc ->
      acc
      |> Map.put_new(row.id, %{})
      |> Map.update!(row.id, &update_dates(row, &1))
    end)
  end

  # Appends the lastPublishedAt and lastChangedAt fields to the flow
  @spec get_status_flow(Flow.t()) :: map()
  defp get_status_flow(flow) do
    Map.merge(
      flow,
      Map.get(
        get_published_draft_dates([flow.id]),
        flow.id,
        %{}
      )
    )
  end

  @spec filter_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp filter_with(query, filter) do
    query = Repo.filter_with(query, filter)

    Enum.reduce(filter, query, fn
      {:keyword, keyword}, query ->
        from(f in query,
          where: ^keyword in f.keywords
        )

      {:uuid, uuid}, query ->
        from(q in query, where: q.uuid == ^uuid)

      {:status, status}, query ->
        query
        |> where(
          [f],
          f.id in subquery(
            FlowRevision
            |> where([fr], fr.status == ^status)
            |> select([fr], fr.flow_id)
          )
        )

      {:is_active, is_active}, query ->
        from(q in query, where: q.is_active == ^is_active)

      {:is_background, is_background}, query ->
        from(q in query, where: q.is_background == ^is_background)

      {:is_pinned, is_pinned}, query ->
        from(q in query, where: q.is_pinned == ^is_pinned)

      {:name_or_keyword, name_or_keyword}, query ->
        query
        |> where([fr], ilike(fr.name, ^"%#{name_or_keyword}%"))
        |> or_where([fr], ^name_or_keyword in fr.keywords)

      _, query ->
        query
    end)
  end

  @doc """
  Return the count of flows, using the same filter as list_flows
  """
  @spec count_flows(map()) :: integer
  def count_flows(args),
    do: Repo.count_filter(args, Flow, &filter_with/2)

  @doc """
  Gets a single flow.

  Raises `Ecto.NoResultsError` if the Flow does not exist.

  ## Examples

      iex> get_flow!(123)
      %Flow{}

      iex> get_flow!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_flow!(integer) :: Flow.t()
  def get_flow!(id) do
    Flow
    |> Repo.get!(id)
    |> get_status_flow()
  end

  @doc """
  Fetches a single flow

  Returns a `Resource not found` error if the Flow does not exist.

  ## Examples

      iex> fetch_flow(123)
      {:ok, %Flow{}}

      iex> fetch_flow(456)
      {:error, ["Elixir.Glific.Flows.Flow", "Resource not found"]}

  """
  @spec fetch_flow(integer) :: {:ok, any} | {:error, any}
  def fetch_flow(id),
    do: Repo.fetch_by(Flow, %{id: id})

  @doc """
  Creates a flow.

  ## Examples

      iex> create_flow(%{field: value})
      {:ok, %Flow{}}

      iex> create_flow(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_flow(map()) :: {:ok, Flow.t()} | {:error, Ecto.Changeset.t()}
  def create_flow(attrs) do
    attrs =
      Map.put(attrs, :keywords, sanitize_flow_keywords(attrs[:keywords]))
      |> Map.put_new(:uuid, Ecto.UUID.generate())

    clean_cached_flow_keywords_map(attrs.organization_id)

    with {:ok, flow} <-
           %Flow{}
           |> Flow.changeset(attrs)
           |> Repo.insert() do
      {:ok, _} =
        FlowRevision.create_flow_revision(%{
          definition: FlowRevision.default_definition(flow),
          flow_id: flow.id,
          organization_id: flow.organization_id
        })

      flow = get_status_flow(flow)

      if Map.has_key?(attrs, :add_role_ids),
        do: update_flow_roles(attrs, flow),
        else: {:ok, flow}
    end
  end

  @spec update_flow_roles(map(), Flow.t()) :: {:ok, Flow.t()}
  defp update_flow_roles(attrs, flow) do
    %{access_controls: access_controls} =
      attrs
      |> Map.put(:flow_id, flow.id)
      |> FlowRole.update_flow_roles()

    flow
    |> Map.put(:roles, access_controls)
    |> then(&{:ok, &1})
  end

  @doc """
  Updates a flow.

  ## Examples

      iex> update_flow(flow, %{field: new_value})
      {:ok, %Flow{}}

      iex> update_flow(flow, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_flow(Flow.t(), map()) :: {:ok, Flow.t()} | {:error, Ecto.Changeset.t()}
  def update_flow(%Flow{} = flow, attrs) do
    # first delete the cached flow
    remove_flow_cache(flow)

    attrs =
      attrs
      |> Map.merge(%{keywords: sanitize_flow_keywords(attrs[:keywords] || flow.keywords)})

    with {:ok, updated_flow} <-
           flow
           |> Flow.changeset(attrs)
           |> Repo.update() do
      if Map.has_key?(attrs, :add_role_ids),
        do: update_flow_roles(attrs, updated_flow),
        else: {:ok, updated_flow}
    end
  end

  @doc """
  Deletes a flow.

  ## Examples

      iex> delete_flow(flow)
      {:ok, %Flow{}}

      iex> delete_flow(flow)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_flow(Flow.t()) :: {:ok, Flow.t()} | {:error, Ecto.Changeset.t()}
  def delete_flow(%Flow{} = flow) do
    remove_flow_cache(flow)
    Repo.delete(flow)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking flow changes.

  ## Examples

      iex> change_flow(flow)
      %Ecto.Changeset{data: %Flow{}}

  """
  @spec change_flow(Flow.t(), map()) :: Ecto.Changeset.t()
  def change_flow(%Flow{} = flow, attrs \\ %{}) do
    Flow.changeset(flow, attrs)
  end

  defp get_user do
    user = Repo.get_current_user()

    {email, name} =
      if user,
        do: {"#{user.phone}@glific.org", user.name},
        else: {"unknown@glific.org", "Unknown Glific User"}

    %{email: email, name: name}
  end

  @doc """
  Get a list of all the revisions based on a flow UUID
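
  ## Examples

  An illustrative call; the uuid and the returned fields are placeholders only:

      iex> get_flow_revision_list("3fa85f64-5717-4562-b3fc-2c963f66afa6")
      %{results: [%{id: 1, revision: 1, status: "published", version: "13.0.0", ...}]}
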
  """
  @spec get_flow_revision_list(String.t()) :: %{results: list()}
  def get_flow_revision_list(flow_uuid) do
    results =
      FlowRevision
      |> join(:left, [fr], f in Flow, as: :f, on: f.id == fr.flow_id)
      |> where([fr, f], f.uuid == ^flow_uuid)
      |> select([fr, f], %FlowRevision{
        id: fr.id,
        inserted_at: fr.inserted_at,
        status: fr.status,
        revision_number: fr.revision_number,
        flow_id: fr.flow_id
      })
      |> order_by([fr], desc: fr.id)
      |> limit(15)
      |> Repo.all()

    # Instead of sorting this list, we should fetch the ordered items from the DB.
    # We will optimize this further in v0.4
    asset_list =
      results
      |> Enum.sort(fn fr1, fr2 -> fr1.id >= fr2.id end)
      |> Enum.reduce(
        [],
        fn revision, acc ->
          [
            %{
              user: get_user(),
              created_on: revision.inserted_at,
              id: revision.id,
              version: "13.0.0",
              revision: revision.id,
              status: revision.status
            }
            | acc
          ]
        end
      )

    %{results: asset_list |> Enum.reverse()}
  end

  @doc """
  Get specific flow revision by number
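
  ## Examples

  Shape of the return value (the uuid and revision id are placeholders):

      iex> get_flow_revision("3fa85f64-5717-4562-b3fc-2c963f66afa6", "42")
      %{definition: %{...}, metadata: %{issues: []}}
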
  """
  @spec get_flow_revision(String.t(), String.t()) :: map()
  def get_flow_revision(_flow_uuid, revision_id) do
    revision = Repo.get!(FlowRevision, revision_id)
    %{definition: revision.definition, metadata: %{issues: []}}
  end

  @doc """
  Save new revision for the flow
  """
  @spec create_flow_revision(map()) :: FlowRevision.t()
  def create_flow_revision(definition) do
    {:ok, flow} = Repo.fetch_by(Flow, %{uuid: definition["uuid"]})

    {:ok, revision} =
      %FlowRevision{}
      |> FlowRevision.changeset(%{
        definition: definition,
        flow_id: flow.id,
        organization_id: flow.organization_id
      })
      |> Repo.insert()

    # Now also delete the caches for the draft status, so we can reload.
    # Note that we don't bother reloading the cache, since we don't expect the
    # draft simulator to run often, and drafts are saved quite often
    Caches.remove(flow.organization_id, keys_to_cache_flow(flow, "draft"))

    revision
  end

  defp check_field(json, field, acc),
    do: if(Map.has_key?(json, field), do: acc, else: [field | acc])

  @doc """
  Check the required fields for all flow objects. If missing, raise an exception
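
  ## Examples

  For illustration, with a minimal node map:

      iex> check_required_fields(%{"uuid" => "u1", "type" => "send_msg"}, [:uuid, :type])
      true

      iex> check_required_fields(%{"uuid" => "u1"}, [:uuid, :type])
      ** (ArgumentError)
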
  """
  @spec check_required_fields(map(), [atom()]) :: boolean()
  def check_required_fields(json, required) do
    result =
      Enum.reduce(
        required,
        [],
        fn field, acc ->
          check_field(json, to_string(field), acc)
        end
      )

    if result == [],
      do: true,
      else:
        raise(ArgumentError,
          message:
            "Missing required fields: #{result} with node uuid: #{json["uuid"]} and type: #{json["type"]}"
        )
  end

  @doc """
  A generic JSON traversal that builds the structure for a specific flow schema,
  which is an array of objects in the JSON file. Used for Node/Actions, Node/Exits,
  Router/Cases, and Router/Categories
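
  ## Examples

  A minimal sketch with a pass-through process function (the maps and the
  function are illustrative, not a real flow schema):

      iex> process_fn = fn object_json, uuid_map, _object -> {object_json["uuid"], uuid_map} end
      iex> build_flow_objects([%{"uuid" => "a"}, %{"uuid" => "b"}], %{}, process_fn)
      {["a", "b"], %{}}
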
  """
  @spec build_flow_objects(list(), map(), (map(), map(), any -> {any, map()}), any) ::
          {any, map()}
  def build_flow_objects(json, uuid_map, process_fn, object \\ nil) do
    {objects, uuid_map} =
      Enum.reduce(
        json,
        {[], uuid_map},
        fn object_json, acc ->
          {object, uuid_map} = process_fn.(object_json, elem(acc, 1), object)
          {[object | elem(acc, 0)], uuid_map}
        end
      )

    {Enum.reverse(objects), uuid_map}
  end

  # Get a list of all the keys to cache the flow.
  @spec keys_to_cache_flow(Flow.t(), String.t()) :: list()
  defp keys_to_cache_flow(flow, status),
    do:
      Enum.map(flow.keywords, fn keyword -> {:flow_keyword, keyword, status} end)
      |> Enum.concat([{:flow_uuid, flow.uuid, status}, {:flow_id, flow.id, status}])

  @spec make_args(atom(), any()) :: map()
  defp make_args(key, value) do
    case key do
      :flow_uuid -> %{uuid: value}
      :flow_id -> %{id: value}
      :flow_keyword -> %{keyword: value}
      _ -> raise ArgumentError, message: "Unknown key/value pair: #{key}, #{value}"
    end
  end

  @spec load_flow_cache(tuple()) :: tuple()
  defp load_flow_cache(cache_key) do
    {organization_id, {key, value, status}} = cache_key
    Repo.put_organization_id(organization_id)
    Logger.info("Loading flow cache: #{organization_id}, #{inspect(key)}")
    args = make_args(key, value)
    flow = Flow.get_loaded_flow(organization_id, status, args)
    Caches.set(organization_id, keys_to_cache_flow(flow, status), flow)

    # We are setting the cache in the above statement with multiple keys,
    # hence we ask Cachex to just ignore this aspect. All the other
    # requests will get the cache value set above
    {:ignore, flow}
  end

  @doc """
  A helper function to interact with the caching API and get the cached flow.
  It also loads the flow and caches it in case it does not exist.
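
  ## Examples

  An illustrative lookup by keyword (the organization id and keyword are placeholders):

      iex> get_cached_flow(1, {:flow_keyword, "registration", "published"})
      {:ok, %Flow{}}
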
  """
  @spec get_cached_flow(non_neg_integer, {atom(), any(), String.t()}) ::
          {atom, any} | {atom(), String.t()}
  def get_cached_flow(nil, _key), do: {:ok, nil}

  def get_cached_flow(organization_id, key) do
    case Caches.fetch(organization_id, key, &load_flow_cache/1) do
      {:error, error} ->
        Logger.info("Failed to retrieve flow, #{inspect(key)}, #{error}")
        {:error, error}

      {_, flow} ->
        {:ok, flow}
    end
  end

  @doc """
  Update the cached flow from db. This typically happens when the flow definition is updated
  via the UI
  """
  @spec update_cached_flow(Flow.t(), String.t()) :: {atom, any}
  def update_cached_flow(flow, status) do
    Caches.remove(flow.organization_id, keys_to_cache_flow(flow, status))
    get_cached_flow(flow.organization_id, {:flow_uuid, flow.uuid, status})
  end

  @doc """
  Check if a flow has been activated since the time sent as a parameter,
  e.g. outOfOffice will check if that flow was activated in the last 24 hours;
  daily/weekly will check since the start of the day/week, etc.
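
  ## Examples

  For illustration, assuming contact 2 entered flow 1 after the given time
  (the ids and timestamp are placeholders):

      iex> flow_activated(1, 2, ~U[2023-01-01 00:00:00Z])
      true
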
  """
  @spec flow_activated(non_neg_integer, non_neg_integer, DateTime.t()) :: boolean
  def flow_activated(flow_id, contact_id, since) do
    results =
      FlowContext
      |> where([fc], fc.flow_id == ^flow_id)
      |> where([fc], fc.contact_id == ^contact_id)
      |> where([fc], fc.inserted_at >= ^since)
      |> Repo.all()

    if results != [],
      do: true,
      else: false
  end

  @doc """
  Update the latest flow revision status to published and increment the version.
  Also update the cached flow definition.
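
  ## Examples

  Illustrative outcomes (the flow structs, keys and messages are placeholders):

      iex> publish_flow(flow)
      {:ok, %Flow{}}

      iex> publish_flow(flow_with_validation_errors)
      {:errors, [%{key: key, message: message}]}
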
  """
  @spec publish_flow(Flow.t()) :: {:ok, Flow.t()} | {:error, any()}
  def publish_flow(%Flow{} = flow) do
    Logger.info("Published Flow: flow_id: '#{flow.id}'")

    errors = Flow.validate_flow(flow.organization_id, "draft", %{id: flow.id})
    result = do_publish_flow(flow)

    cond do
      # if validation and publishing both worked
      errors == [] && elem(result, 0) == :ok ->
        {:ok, flow}

      # we had an error saving to the DB
      elem(result, 0) == :error ->
        Logger.info("error while publishing the flow. #{inspect(result)}")
        result

      # We had an error validating the flow
      true ->
        {:errors, format_flow_errors(errors)}
    end
  end

  @spec do_publish_flow(Flow.t()) :: {:ok, Flow.t()} | {:error, any()}
  defp do_publish_flow(%Flow{} = flow) do
    last_version = get_last_version_and_update_old_revisions(flow)
    ## If the flow is invalid, return {:error, errors}; otherwise move forward
    {:ok, latest_revision} = Repo.fetch_by(FlowRevision, %{flow_id: flow.id, revision_number: 0})

    result =
      latest_revision
      |> FlowRevision.changeset(%{status: "published", version: last_version + 1})
      |> Repo.update()

    if elem(result, 0) == :ok,
      do: update_cached_flow(flow, "published")

    result
  end

  @spec format_flow_errors(list()) :: list()
  defp format_flow_errors(errors) when is_list(errors) do
    ## we can think about adding warnings based on the keys
    Enum.reduce(errors, [], fn error, acc ->
      [%{key: elem(error, 0), message: elem(error, 1)} | acc]
    end)
  end

  # Get version of last published flow revision
  # Archive the last published flow revision
  @spec get_last_version_and_update_old_revisions(Flow.t()) :: integer
  defp get_last_version_and_update_old_revisions(flow) do
    FlowRevision
    |> Repo.fetch_by(%{flow_id: flow.id, status: "published"})
    |> case do
      {:ok, last_published_revision} ->
        {:ok, _} =
          last_published_revision
          |> FlowRevision.changeset(%{status: "archived"})
          |> Repo.update()

        delete_old_draft_flow_revisions(flow, last_published_revision)

        last_published_revision.version

      {:error, _} ->
        0
    end
  end

  # Delete all old draft flow revisions,
  # except the ones which are created after the last archived flow revision
  @spec delete_old_draft_flow_revisions(Flow.t(), FlowRevision.t()) :: {integer(), nil | [term()]}
  defp delete_old_draft_flow_revisions(flow, old_published_revision) do
    FlowRevision
    |> where([fr], fr.flow_id == ^flow.id)
    |> where([fr], fr.id < ^old_published_revision.id)
    |> where([fr], fr.status == "draft")
    |> Repo.delete_all()
  end

  @status "published"

  @doc """
  Start flow for a contact and cache the result
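
  ## Examples

  Illustrative calls; the structs and ids are placeholders. The flow can be
  passed as a struct or as a flow id:

      iex> start_contact_flow(flow, contact)
      {:ok, %Flow{}}

      iex> start_contact_flow(unknown_flow_id, contact)
      {:error, ["Flow", "Flow not found"]}
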
  """
  @spec start_contact_flow(Flow.t() | integer, Contact.t(), map()) ::
          {:ok, Flow.t()} | {:error, list()}

  def start_contact_flow(f, c, default_results \\ %{})

  def start_contact_flow(flow_id, %Contact{} = contact, default_results)
      when is_integer(flow_id) do
    case get_cached_flow(contact.organization_id, {:flow_id, flow_id, @status}) do
      {:ok, flow} ->
        process_contact_flow([contact], flow, default_results)

      {:error, _error} ->
        {:error, ["Flow", dgettext("errors", "Flow not found")]}
    end
  end

  def start_contact_flow(%Flow{} = flow, %Contact{} = contact, default_results),
    do: start_contact_flow(flow.id, contact, default_results)

  @spec process_contact_flow(list(), Flow.t(), map()) :: {:ok, Flow.t()} | {:error, list()}
  defp process_contact_flow(contacts, flow, default_results) do
    if flow.is_active do
      Broadcast.broadcast_contacts(flow, contacts, default_results)
      {:ok, flow}
    else
      {:error, ["Flow", dgettext("errors", "Flow is not active")]}
    end
  end

  @doc """
  Start flow for contacts of a group
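
  ## Examples

  An illustrative call (the structs are placeholders):

      iex> start_group_flow(flow, group)
      {:ok, %Flow{}}
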
  """
  @spec start_group_flow(Flow.t(), Group.t(), map()) :: {:ok, Flow.t()}
  def start_group_flow(flow, group, default_results \\ %{}) do
    # the flow returned is the expanded version
    {:ok, flow} = get_cached_flow(group.organization_id, {:flow_id, flow.id, @status})
    Broadcast.broadcast_flow_to_group(flow, group, default_results)
    {:ok, flow}
  end

  @doc """
  Make a copy of a flow
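
  ## Examples

  An illustrative call (the struct and attrs are placeholders):

      iex> copy_flow(flow, %{name: "Copy of registration flow"})
      {:ok, %Flow{}}
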
  """
  @spec copy_flow(Flow.t(), map()) :: {:ok, Flow.t()} | {:error, String.t()}
  def copy_flow(flow, attrs) do
    attrs =
      attrs
      |> Map.merge(%{
        version_number: flow.version_number,
        flow_type: flow.flow_type,
        organization_id: flow.organization_id,
        uuid: Ecto.UUID.generate()
      })

    with {:ok, flow_copy} <-
           %Flow{}
           |> Flow.changeset(attrs)
           |> Repo.insert() do
      Glific.State.reset()
      copy_flow_revision(flow, flow_copy)

      {:ok, flow_copy}
    end
  end

  @spec copy_flow_revision(Flow.t(), Flow.t()) :: {:ok, FlowRevision.t()} | {:error, String.t()}
  defp copy_flow_revision(flow, flow_copy) do
    with {:ok, latest_flow_revision} <-
           Repo.fetch_by(FlowRevision, %{flow_id: flow.id, revision_number: 0}) do
      definition_copy =
        latest_flow_revision.definition
        |> Map.merge(%{"uuid" => flow_copy.uuid})

      {:ok, _} =
        FlowRevision.create_flow_revision(%{
          definition: definition_copy,
          flow_id: flow_copy.id,
          organization_id: flow_copy.organization_id
        })
    end
  end

  @doc """
  Create a map of keywords that map to flow ids for each
  active organization. Also cache this value including the outOfOffice
  shortcode
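
  ## Examples

  The returned map is roughly of this shape (keywords and ids are illustrative):

      iex> flow_keywords_map(1)
      %{
        "published" => %{"help" => 3, "registration" => 5},
        "draft" => %{"help" => 3, "registration" => 5},
        "org_default_new_contact" => nil
      }
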
  """
  @spec flow_keywords_map(non_neg_integer) :: map()
  def flow_keywords_map(organization_id) do
    case Caches.fetch(organization_id, "flow_keywords_map", &load_flow_keywords_map/1) do
      {:error, error} ->
        raise(ArgumentError,
          message: "Failed to retrieve flow_keywords_map, #{inspect(organization_id)}, #{error}"
        )

      {_, value} ->
        value
    end
  end

  @spec update_flow_keyword_map(map(), String.t(), String.t(), non_neg_integer) :: map()
  defp update_flow_keyword_map(map, status, keyword, flow_id) do
    map
    |> Map.update(
      status,
      %{keyword => flow_id},
      fn m -> Map.put(m, keyword, flow_id) end
    )
  end

  @spec add_flow_keyword_map(map(), map()) :: map()
  defp add_flow_keyword_map(flow, acc) do
    Enum.reduce(
      flow.keywords,
      acc,
      fn keyword, acc ->
        keyword = Glific.string_clean(keyword)
        acc = update_flow_keyword_map(acc, flow.status, keyword, flow.id)

        # always add to draft status if published
        if flow.status == "published",
          do: update_flow_keyword_map(acc, "draft", keyword, flow.id),
          else: acc
      end
    )
  end

  @spec load_flow_keywords_map(tuple()) :: tuple()
  defp load_flow_keywords_map(cache_key) do
    # this is of the form {organization_id, "flow_keywords_map"}
    # we want the organization_id
    organization_id = cache_key |> elem(0)
    organization = Partners.organization(organization_id)

    keyword_map =
      Flow
      |> where([f], f.organization_id == ^organization_id)
      |> where([f], f.is_active == true)
      |> join(:inner, [f], fr in FlowRevision, on: f.id == fr.flow_id)
      |> select([f, fr], %{keywords: f.keywords, id: f.id, status: fr.status})
      # the revisions table is potentially large, so we really want just a few rows from
      # it, hence this where clause
      |> where([f, fr], fr.status == "published" or fr.revision_number == 0)
      |> Repo.all(skip_organization_id: true)
      |> Enum.reduce(
        # always create empty maps, so all map operations work
        # and won't throw an "expected map, got nil" exception
        %{"published" => %{}, "draft" => %{}},
        fn flow, acc -> add_flow_keyword_map(flow, acc) end
      )
      |> add_default_flows(organization.out_of_office)
      |> Map.put("org_default_new_contact", organization.newcontact_flow_id)

    {:commit, keyword_map}
  end

  @spec add_default_flows(map(), map()) :: map()
  defp add_default_flows(keyword_map, out_of_office),
    do:
      keyword_map
      |> do_add_default_flow(out_of_office.enabled, "outofoffice", out_of_office.flow_id)
      |> do_add_default_flow(out_of_office.enabled, "defaultflow", out_of_office.default_flow_id)

  @spec do_add_default_flow(map(), boolean(), String.t(), nil | non_neg_integer()) :: map()
  defp do_add_default_flow(keyword_map, _enabled, _flow_name, nil), do: keyword_map

  defp do_add_default_flow(keyword_map, true, flow_name, flow_id),
    do:
      keyword_map
      |> update_flow_keyword_map("published", flow_name, flow_id)
      |> update_flow_keyword_map("draft", flow_name, flow_id)

  defp do_add_default_flow(keyword_map, _, _, _), do: keyword_map

  # Cleans the cached flow keywords map for the organization
  @spec clean_cached_flow_keywords_map(non_neg_integer) :: list()
  defp clean_cached_flow_keywords_map(organization_id) do
    Glific.State.reset()
    Caches.remove(organization_id, ["flow_keywords_map"])
  end

  @spec sanitize_flow_keywords(list) :: list()
  defp sanitize_flow_keywords(keywords) when is_list(keywords),
    do: Enum.map(keywords, &Glific.string_clean(&1))

  defp sanitize_flow_keywords(keywords), do: keywords

  @optin_flow_keyword "optin"

  @doc """
  Check if the flow is the optin flow. Currently we check
  based on the optin keyword only.
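
  ## Examples

  For example:

      iex> is_optin_flow?(%Flow{keywords: ["optin"]})
      true

      iex> is_optin_flow?(nil)
      false
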
  """
  @spec is_optin_flow?(Flow.t()) :: boolean()
  def is_optin_flow?(nil), do: false
  def is_optin_flow?(flow), do: Enum.member?(flow.keywords, @optin_flow_keyword)

  @doc """
  Import a flow from JSON
  """
  @spec import_flow(map(), non_neg_integer()) :: boolean()
  def import_flow(import_flow, organization_id) do
    interactive_template_list = import_interactive_templates(import_flow, organization_id)

    import_flow_list =
      Enum.map(import_flow["flows"], fn flow_revision ->
        with {:ok, flow} <-
               create_flow(%{
                 name: flow_revision["definition"]["name"],
                 # we are reusing existing UUIDs, against the spirit of UUIDs;
                 # however, this allows us to support sub flows
                 uuid: flow_revision["definition"]["uuid"],
                 keywords: flow_revision["keywords"],
                 organization_id: organization_id
               }),
             {:ok, _flow_revision} <-
               FlowRevision.create_flow_revision(%{
                 definition:
                   clean_flow_definition(flow_revision["definition"], interactive_template_list),
                 flow_id: flow.id,
                 organization_id: flow.organization_id
               }) do
          import_contact_field(import_flow, organization_id)
          import_groups(import_flow, organization_id)
          true
        else
          _ -> false
        end
      end)

    !Enum.member?(import_flow_list, false)
  end

  @spec clean_flow_definition(map(), list()) :: map()
  defp clean_flow_definition(definition, interactive_template_list) do
    nodes =
      definition
      |> Map.get("nodes", [])
      |> Enum.reduce([], &(&2 ++ process_node_actions(&1, interactive_template_list)))

    put_in(definition, ["nodes"], nodes)
  end

  @spec process_node_actions(map(), list()) :: list()
  defp process_node_actions(%{"actions" => actions} = node, _interactive_template_list)
       when actions == [],
       do: [node]

  defp process_node_actions(%{"actions" => actions} = node, interactive_template_list) do
    Enum.reduce(actions, [], fn action, acc ->
      template_uuid = get_in(action, ["templating", "template", "uuid"])

      cond do
        action["type"] == "send_msg" ->
          # check if the imported template is present in the database
          template_uuid_list = SessionTemplate |> select([st], st.uuid) |> Repo.all()

          with true <- Map.has_key?(action, "templating"),
               false <- template_uuid in template_uuid_list do
            # update the node if the template uuid in the node is not present in the DB
            action =
              action |> Map.delete("templating") |> put_in(["text"], "Update this with template")

            node = put_in(node, ["actions"], [action])
            acc ++ [node]
          else
            _ -> acc ++ [node]
          end

        action["type"] == "send_interactive_msg" ->
          {:ok, action_id} = Glific.parse_maybe_integer(action["id"])

          template_id = find_interactive_template(interactive_template_list, action_id)

          node = put_in(node, ["actions"], [Map.put(action, "id", template_id)])
          acc ++ [node]

        true ->
          acc ++ [node]
      end
    end)
  end

  @spec find_interactive_template(list(), integer | nil) :: non_neg_integer()
  defp find_interactive_template(interactive_template_list, action_id) do
    {_source_id, template_id, _interactive_template_label} =
      Enum.find(interactive_template_list, fn {source_id, _template_id,
                                               _interactive_template_label} ->
        source_id == action_id
      end)

    template_id
  end

  defp import_contact_field(import_flow, organization_id) do
    import_flow["contact_field"]
    |> Enum.each(fn contact_field ->
      %{
        name: contact_field,
        organization_id: organization_id,
        shortcode: contact_field
      }
      |> ContactField.create_contact_field()
    end)
  end

  defp import_groups(import_flow, organization_id) do
    import_flow["collections"]
    |> Enum.each(fn collection ->
      Groups.get_or_create_group_by_label(collection, organization_id)
    end)
  end

  defp import_interactive_templates(import_flow, organization_id) do
    import_flow["interactive_templates"]
    |> Enum.reduce([], fn interactive_template, acc ->
      Repo.fetch_by(InteractiveTemplate, %{label: interactive_template["label"]})
      |> case do
        {:ok, db_interactive_template} ->
          acc ++
            [
              {interactive_template["source_id"], db_interactive_template.id,
               db_interactive_template.label}
            ]

        _ ->
          {:ok, new_interactive_template} =
            interactive_template
            |> Map.delete("source_id")
            |> Map.put("organization_id", organization_id)
            |> InteractiveTemplates.create_interactive_template()

          acc ++
            [
              {interactive_template["source_id"], new_interactive_template.id,
               new_interactive_template.label}
            ]
      end
    end)
  end

  @doc """
  Generate a JSON map with all the flow-related fields.
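
  ## Examples

  The returned map has roughly this shape (the contents are illustrative):

      iex> export_flow(1)
      %{
        "flows" => [%{definition: %{...}, keywords: ["registration"]}],
        "contact_field" => ["name"],
        "collections" => ["Default Group"],
        "interactive_templates" => [%{label: "...", ...}]
      }
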
  """
  @spec export_flow(non_neg_integer()) :: map()
  def export_flow(flow_id) do
    flow = Repo.get!(Flow, flow_id)

    export_flow_details(
      flow.uuid,
      %{"flows" => [], "contact_field" => [], "collections" => [], "interactive_templates" => []}
    )
  end

  @doc """
  Process sub flows and check if there are more sub flows within them.
  """
  @spec export_flow_details(String.t(), map()) :: map()
  def export_flow_details(flow_uuid, results) do
    if Enum.any?(results["flows"], fn flow -> Map.get(flow.definition, "uuid") == flow_uuid end) do
      results
    else
      flow = Repo.get_by(Flow, %{uuid: flow_uuid})

      # definition can be nil, hence assigning empty map if so
      # Issue #2173
      definition =
        (get_latest_definition(flow_uuid) || %{})
        |> Map.put("name", flow.name)

      results =
        Map.put(
          results,
          "flows",
          results["flows"] ++ [%{definition: definition, keywords: flow.keywords}]
        )
        |> Map.put(
          "contact_field",
          results["contact_field"] ++ export_contact_fields(definition)
        )
        |> Map.put(
          "collections",
          results["collections"] ++ export_collections(definition)
        )
        |> Map.put(
          "interactive_templates",
          results["interactive_templates"] ++ export_interactive_templates(definition)
        )

      ## Here we can export more details like fields, triggers, groups, etc.

      definition
      |> Map.get("nodes", [])
      |> get_sub_flows()
      |> Enum.reduce(results, &export_flow_details(&1["uuid"], &2))
    end
  end

  @spec export_collections(map()) :: list()
  defp export_collections(definition) do
    definition
    |> Map.get("nodes", [])
    |> Enum.reduce([], &(&2 ++ do_export_collections(&1)))
  end

  @spec do_export_collections(map()) :: list()
  defp do_export_collections(%{"actions" => actions}) when actions == [], do: []

  defp do_export_collections(%{"actions" => actions}) do
    action = actions |> hd

    if action["type"] == "add_contact_groups",
      do: action["groups"] |> Enum.reduce([], &(&2 ++ [&1["name"]])),
      else: []
  end

  @spec export_contact_fields(map()) :: list()
  defp export_contact_fields(definition) do
    definition
    |> Map.get("nodes", [])
    |> Enum.reduce([], &(&2 ++ do_export_contact_fields(&1)))
  end

  @spec do_export_contact_fields(map()) :: list()
  defp do_export_contact_fields(%{"actions" => actions}) when actions == [], do: []

  defp do_export_contact_fields(%{"actions" => actions}) do
    action = actions |> hd
    if action["type"] == "set_contact_field", do: [action["field"]["key"]], else: []
  end

  @spec export_interactive_templates(map()) :: list()
  defp export_interactive_templates(definition) do
    definition
    |> Map.get("nodes", [])
    |> Enum.reduce([], &(&2 ++ do_export_interactive_templates(&1)))
    |> fetch_interactive_templates_from_db()
  end

  @spec do_export_interactive_templates(map()) :: list()
  defp do_export_interactive_templates(%{"actions" => actions}) when actions == [], do: []

  defp do_export_interactive_templates(%{"actions" => actions}) do
    actions
    |> Enum.reduce([], fn action, acc ->
      if action["type"] == "send_interactive_msg", do: acc ++ [action["id"]], else: acc
    end)
  end

  @spec fetch_interactive_templates_from_db(list()) :: list()
  defp fetch_interactive_templates_from_db(ids) do
    InteractiveTemplate
    |> where([it], it.id in ^ids)
    |> select([it], %{
      source_id: it.id,
      label: it.label,
      type: it.type,
      interactive_content: it.interactive_content,
      translations: it.translations,
      language_id: it.language_id,
      send_with_title: it.send_with_title
    })
    |> Repo.all()
  end

  @doc """
  Extract all the sub flows from the parent flow definition.
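
  ## Examples

  A minimal illustrative node list with a single `enter_flow` action:

      iex> get_sub_flows([
      ...>   %{"actions" => [%{"type" => "enter_flow", "flow" => %{"uuid" => "u1", "name" => "Child flow"}}]}
      ...> ])
      [%{"uuid" => "u1", "name" => "Child flow"}]
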
  """
  @spec get_sub_flows(list()) :: list()
  def get_sub_flows(nodes),
    do: Enum.reduce(nodes, [], &do_get_sub_flows(&1, &2))

  @spec do_get_sub_flows(map(), list()) :: list()
  defp do_get_sub_flows(%{"actions" => actions}, list),
    do:
      Enum.reduce(actions, list, fn action, acc ->
        if action["type"] == "enter_flow" and action["flow"]["name"] != "Expression",
          do: acc ++ [action["flow"]],
          else: acc
      end)

  ## Get the latest flow definition to export. There is another function with the same
  ## name in Glific.Flows.Flow, but that one returns the definition without UI placements.
  @spec get_latest_definition(String.t()) :: map() | nil
  defp get_latest_definition(flow_uuid) do
    FlowRevision
    |> select([fr], fr.definition)
    |> join(:inner, [fr], fl in Flow, on: fr.flow_id == fl.id)
    |> where([fr, fl], fr.revision_number == 0 and fl.uuid == ^flow_uuid)
    |> Repo.one()
  end

  @doc """
  Check if the type is a media type we handle in flows
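
  ## Examples

  For example:

      iex> is_media_type?(:image)
      true

      iex> is_media_type?(:text)
      false
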
  """
  @spec is_media_type?(atom()) :: boolean()
  def is_media_type?(type),
    do: type in [:audio, :document, :image, :video]

  @doc """
  Terminate all flows for a contact
  """
  @spec terminate_contact_flows?(non_neg_integer) :: :ok
  def terminate_contact_flows?(contact_id) do
    FlowContext.mark_flows_complete(contact_id, false, source: "terminate_contact_flows")
    :ok
  end

  @spec remove_flow_cache(Flow.t()) :: :ok
  defp remove_flow_cache(flow) do
    Caches.remove(flow.organization_id, keys_to_cache_flow(flow, "draft"))
    Caches.remove(flow.organization_id, keys_to_cache_flow(flow, "published"))
    clean_cached_flow_keywords_map(flow.organization_id)
    :ok
  end
end