# lib/glific/searches.ex

defmodule Glific.Searches do
  @moduledoc """
  The Searches context.
  """

  import Ecto.Query, warn: false
  require Logger

  alias __MODULE__

  alias Glific.{
    Contacts.Contact,
    Conversations,
    Conversations.Conversation,
    ConversationsGroup,
    Groups,
    Groups.ContactGroup,
    Groups.UserGroup,
    Messages.Message,
    Repo,
    Search.Full,
    Searches.SavedSearch,
    Searches.Search,
    Users.User
  }

  @doc """
  Lists the saved searches matching `args`.

  `args` is handed to `Repo.list_filter/4` together with
  `Repo.opts_with_label/2` and this module's own filter function, so the
  `:is_reserved` filter is honoured on top of whatever
  `Repo.filter_with/2` applies.

  ## Examples

      iex> list_saved_searches(%{})
      [%SavedSearch{}, ...]

  """
  @spec list_saved_searches(map()) :: [SavedSearch.t()]
  def list_saved_searches(args) do
    Repo.list_filter(args, SavedSearch, &Repo.opts_with_label/2, &filter_with/2)
  end

  @doc """
  Returns the count of searches, using the same filter as `list_saved_searches/1`.

  The module's own filter function is used (rather than the generic
  `Repo.filter_with/2`), so an `:is_reserved` entry in the filter narrows the
  count exactly as it narrows the listing.
  """
  @spec count_saved_searches(map()) :: integer
  def count_saved_searches(args),
    do: Repo.count_filter(args, SavedSearch, &filter_with/2)

  # Applies `Repo.filter_with/2` first and then narrows the query by
  # `is_reserved` when that key is present in the filter.
  @spec filter_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp filter_with(query, filter) do
    base = Repo.filter_with(query, filter)

    # only the :is_reserved entry is handled here, all other entries are skipped
    for {:is_reserved, reserved?} <- filter, reduce: base do
      acc -> where(acc, [q], q.is_reserved == ^reserved?)
    end
  end

  @doc """
  Fetches the saved search with the given `id`.

  Raises `Ecto.NoResultsError` if the SavedSearch does not exist.

  ## Examples

      iex> get_saved_search!(123)
      %SavedSearch{}

      iex> get_saved_search!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_saved_search!(integer) :: SavedSearch.t()
  def get_saved_search!(id) do
    Repo.get!(SavedSearch, id)
  end

  @doc """
  Inserts a new saved search built from `attrs`.

  The attributes go through `SavedSearch.changeset/2`; an invalid changeset
  is returned in the error tuple.

  ## Examples

      iex> create_saved_search(%{field: value})
      {:ok, %SavedSearch{}}

      iex> create_saved_search(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_saved_search(map()) :: {:ok, SavedSearch.t()} | {:error, Ecto.Changeset.t()}
  def create_saved_search(attrs) do
    changeset = SavedSearch.changeset(%SavedSearch{}, attrs)
    Repo.insert(changeset)
  end

  @doc """
  Applies `attrs` to an existing saved search and persists the result.

  ## Examples

      iex> update_saved_search(search, %{field: new_value})
      {:ok, %SavedSearch{}}

      iex> update_saved_search(search, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_saved_search(SavedSearch.t(), map()) ::
          {:ok, SavedSearch.t()} | {:error, Ecto.Changeset.t()}
  def update_saved_search(%SavedSearch{} = search, attrs) do
    changeset = SavedSearch.changeset(search, attrs)
    Repo.update(changeset)
  end

  @doc """
  Removes the given saved search from the database.

  ## Examples

      iex> delete_saved_search(search)
      {:ok, %SavedSearch{}}

      iex> delete_saved_search(search)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_saved_search(SavedSearch.t()) ::
          {:ok, SavedSearch.t()} | {:error, Ecto.Changeset.t()}
  def delete_saved_search(%SavedSearch{} = search), do: Repo.delete(search)

  @doc """
  Builds an `%Ecto.Changeset{}` for the saved search without persisting
  anything; `attrs` defaults to an empty map.

  ## Examples

      iex> change_saved_search(search)
      %Ecto.Changeset{data: %SavedSearch{}}

  """
  @spec change_saved_search(SavedSearch.t(), map()) :: Ecto.Changeset.t()
  def change_saved_search(%SavedSearch{} = search, attrs \\ %{}),
    do: SavedSearch.changeset(search, attrs)

  # Query selecting the ids of the non-blocked contacts among the given
  # id(s), with the search permission filter applied. A single integer id is
  # treated as a one-element list.
  @spec filter_active_contacts_of_organization(non_neg_integer() | [non_neg_integer()]) ::
          Ecto.Query.t()
  defp filter_active_contacts_of_organization(contact_ids)
       when is_list(contact_ids) do
    active_ids =
      from(c in Contact,
        as: :c,
        where: c.id in ^contact_ids,
        where: c.status != :blocked,
        select: c.id
      )

    Repo.add_permission(active_ids, &Searches.add_permission/2)
  end

  defp filter_active_contacts_of_organization(contact_id)
       when is_integer(contact_id),
       do: filter_active_contacts_of_organization([contact_id])

  # Base query shared by all status filters: distinct rows of
  # `%{id, last_communication_at}` for non-blocked contacts, with the contact
  # options (paging / ordering) and the search permission filter applied.
  @spec status_query(map()) :: Ecto.Query.t()
  defp status_query(opts) do
    base =
      from(c in Contact,
        as: :c,
        where: c.status != :blocked,
        select: %{id: c.id, last_communication_at: c.last_communication_at},
        distinct: true
      )

    base
    |> add_contact_opts(opts)
    |> Repo.add_permission(&Searches.add_permission/2)
  end

  # Orders contacts by most recent communication first; when both :limit and
  # :offset are present in the options they are applied as well.
  @spec add_contact_opts(Ecto.Query.t(), map()) :: Ecto.Query.t()
  defp add_contact_opts(query, %{limit: page_size, offset: skip}) do
    from(c in query,
      limit: ^page_size,
      offset: ^skip,
      order_by: [desc: c.last_communication_at]
    )
  end

  # no paging requested: only the ordering is added
  defp add_contact_opts(query, _opts),
    do: from(c in query, order_by: [desc: c.last_communication_at])

  # codebeat:disable[ABC]
  # Contact query for one of the status labels sent by the caller.
  # Every clause starts from `status_query/1` - which already excludes blocked
  # contacts and applies the contact options and the permission filter - and
  # adds the single condition that defines the status.
  # Only the five labels below are handled; any other value raises a
  # FunctionClauseError.
  @spec filter_status_contacts_of_organization(String.t(), map()) :: Ecto.Query.t()
  defp filter_status_contacts_of_organization("Unread", opts) do
    opts
    |> status_query()
    |> where([c], c.is_org_read == false)
  end

  defp filter_status_contacts_of_organization("Optout", opts) do
    opts
    |> status_query()
    |> where([c], not is_nil(c.optout_time))
  end

  defp filter_status_contacts_of_organization("Optin", opts) do
    opts
    |> status_query()
    |> where([c], c.optin_status == true)
  end

  defp filter_status_contacts_of_organization("Not replied", opts) do
    opts
    |> status_query()
    |> where([c], c.is_org_replied == false)
  end

  defp filter_status_contacts_of_organization("Not Responded", opts) do
    opts
    |> status_query()
    |> where([c], c.is_contact_replied == false)
  end

  # codebeat:enable[ABC]

  # Contact ids of every contact that shares a group with the given user
  # (ContactGroup joined to UserGroup on the group id).
  @spec permission_query(User.t()) :: Ecto.Query.t()
  defp permission_query(user) do
    from(cg in ContactGroup,
      join: ug in UserGroup,
      as: :ug,
      on: ug.group_id == cg.group_id,
      where: ug.user_id == ^user.id,
      select: cg.contact_id
    )
  end

  @doc """
  Restricts a contact query to the contacts visible to `user`.

  The query must expose the contact table under the named binding `:c`.
  A contact is kept when it is the user's own contact or when it belongs to
  a group the user is a member of.
  """
  @spec add_permission(Ecto.Query.t(), User.t()) :: Ecto.Query.t()
  def add_permission(query, user) do
    visible_contacts = permission_query(user)

    where(
      query,
      [c: c],
      c.id == ^user.contact_id or c.id in subquery(visible_contacts)
    )
  end

  # Contact query (named binding :c) used by the search functions:
  # non-blocked contacts grouped by id, most recent communication first,
  # optionally joined to messages, with the permission filter applied.
  @spec basic_query(map()) :: Ecto.Query.t()
  defp basic_query(args) do
    from(c in Contact, as: :c)
    |> add_message_clause(args)
    |> where([c: c], c.status != :blocked)
    |> group_by([c: c], c.id)
    |> order_by([c: c], desc: c.last_communication_at)
    |> Repo.add_permission(&Searches.add_permission/2)
  end

  # Left-joins the messages table (named binding :m) when the filter map
  # carries more than one entry; in every other case the query is untouched.
  @spec add_message_clause(Ecto.Query.t(), map()) :: Ecto.Query.t()
  defp add_message_clause(query, %{filter: filters})
       when is_map(filters) and map_size(filters) > 1,
       do: join(query, :left, [c: c], m in Message, as: :m, on: c.id == m.contact_id)

  defp add_message_clause(query, _args), do: query

  # codebeat:enable[ABC]

  # Builds the contact id query used for term based searches: the basic
  # contact query with the contact options applied, reduced to the contact
  # id and then handed to the full text search module together with the term.
  @spec search_query(String.t(), map()) :: Ecto.Query.t()
  defp search_query(term, args) do
    contact_ids_query =
      args
      |> basic_query()
      |> add_contact_opts(args.contact_opts)
      |> select([c: c], c.id)

    Full.run(contact_ids_query, term, args)
  end

  # Persists the current search as a SavedSearch when the caller supplied a
  # non-nil `save_search_input` (label and shortcode are read from it).
  # The stored args are the incoming args with `save_search_input` reset to
  # nil. Returns the result of `create_saved_search/1`, or nil when there is
  # nothing to save.
  @spec do_save_search(map()) ::
          {:ok, SavedSearch.t()} | {:error, Ecto.Changeset.t()} | nil
  defp do_save_search(%{save_search_input: save_search_input} = args)
       when save_search_input != nil do
    result =
      create_saved_search(%{
        label: save_search_input.label,
        shortcode: save_search_input.shortcode,
        args: Map.put(args, :save_search_input, nil),
        organization_id: args.filter.organization_id
      })

    # search/2 discards this result, so make a failed save visible in the logs
    case result do
      {:error, changeset} ->
        Logger.error("Could not save search: #{inspect(changeset.errors)}")

      _ ->
        :ok
    end

    result
  end

  defp do_save_search(_args), do: nil

  # Resolves the group ids referenced by the filter. The filter keys are
  # checked in this order: :include_groups, :ids, :id, :group_label (looked
  # up by label). Returns nil when none of them is present.
  @spec group_ids(map()) :: list() | nil
  defp group_ids(args) do
    case args do
      %{filter: %{include_groups: gids}} ->
        gids

      %{filter: %{ids: gids}} ->
        gids

      %{filter: %{id: gid}} ->
        [gid]

      %{filter: %{group_label: group_label}} ->
        %{filter: %{label: group_label}}
        |> Groups.list_groups()
        |> Enum.map(& &1.id)

      _ ->
        nil
    end
  end

  @doc """
  Full text search interface via Postgres.

  Depending on `args.filter` one of two paths is taken:

    * `search_group: true` - the group ids are resolved from the filter and
      handed to `ConversationsGroup.list_conversations/2`; `count` is ignored
      on this path
    * otherwise the search is saved if requested, the args of a referenced
      saved search are loaded, and the matching contact ids are looked up by
      `:id`, `:ids`, `:status` or, failing those, the `:term` (checked in
      that order) before being passed to `Conversations.list_conversations/2`

  When no contact id matches, `0` is returned for a count and `[]` otherwise.
  """
  @spec search(map(), boolean) :: [Conversation.t()] | integer
  def search(args, count \\ false)

  def search(%{filter: %{search_group: true, group_label: group_label}} = args, _count) do
    Logger.info(
      "Searches.Search/2 with : args: #{inspect(args)} group label: #{inspect(group_label)}"
    )

    gids = group_ids(args)
    ConversationsGroup.list_conversations(gids, args)
  end

  def search(%{filter: %{search_group: true}} = args, _count) do
    Logger.info("Searches.Search/2 with : args: #{inspect(args)}")

    gids = group_ids(args)
    ConversationsGroup.list_conversations(gids, args)
  end

  # codebeat:disable[ABC]
  def search(args, count) do
    # save the search if needed
    Logger.info("Searches.Search/2 with : args: #{inspect(args)}")
    do_save_search(args)

    args = update_args_for_count(check_filter_for_save_search(args), count)
    filter = args.filter

    # status queries return one map per row, the other queries return bare
    # ids; the flag tells get_contact_ids/2 which shape to expect
    {query, status_rows?} =
      cond do
        filter[:id] != nil ->
          {filter_active_contacts_of_organization(filter.id), false}

        filter[:ids] != nil ->
          {filter_active_contacts_of_organization(filter.ids), false}

        filter[:status] != nil ->
          {filter_status_contacts_of_organization(filter.status, args.contact_opts), true}

        true ->
          {search_query(filter[:term], args), false}
      end

    contact_ids =
      query
      |> Repo.all(timeout: 60_000)
      |> get_contact_ids(status_rows?)

    # if we don't have any contact ids at this stage
    # it means that the user did not have permission
    case {contact_ids, count} do
      {[], true} ->
        0

      {[], false} ->
        []

      _ ->
        args
        |> put_in([Access.key(:filter, %{}), :ids], contact_ids)
        |> Conversations.list_conversations(count)
    end
  end

  # codebeat:enable[ABC]

  # Normalises query results to a plain list of contact ids. Status queries
  # (flag true) return one map per row carrying an :id key; all other
  # queries already return the ids themselves.
  @spec get_contact_ids(list(), boolean | nil) :: list()
  defp get_contact_ids(results, true), do: Enum.map(results, & &1.id)

  defp get_contact_ids(results, false), do: results

  @doc """
  Search across multiple tables, and return a multiple context
  result back to the frontend. First step in emulating a whatsapp
  search.

  Contacts, message bodies and message flow labels are searched in three
  concurrent tasks; the results are combined with an empty tag list into a
  `Search` struct.
  """
  @spec search_multi(String.t(), map()) :: Search.t()
  def search_multi(term, args) do
    Logger.info("Search Multi: term: '#{term}'")
    org_id = Repo.get_organization_id()

    ## Tags are not shown on the Glific frontend,
    ## so no extra query is made for them in the multi search
    tags = []

    finders = [
      &get_filtered_contacts/2,
      &get_filtered_messages_with_term/2,
      &get_filtered_labeled_message/2
    ]

    [contacts, messages, labels] =
      finders
      |> Enum.map(fn finder ->
        Task.async(fn ->
          # every task runs in its own process and sets the organization
          # state there before querying
          Repo.put_process_state(org_id)
          finder.(term, args)
        end)
      end)
      |> Task.await_many()

    Search.new(contacts, messages, tags, labels)
  end

  # Base message query (named bindings :m and :c) for the multi search:
  # messages left-joined to their contact, blocked contacts excluded,
  # permission filter applied, paged by the message options and ordered by
  # the contact's last message time.
  @spec filtered_query(map()) :: Ecto.Query.t()
  defp filtered_query(args) do
    # always cap out limit to 250, in case frontend sends too many
    max_rows = min(args.message_opts.limit, 250)
    skip = args.message_opts.offset

    messages =
      from(m in Message,
        as: :m,
        left_join: c in Contact,
        as: :c,
        on: m.contact_id == c.id,
        where: c.status != :blocked
      )

    messages
    |> Repo.add_permission(&Searches.add_permission/2)
    |> limit(^max_rows)
    |> offset(^skip)
    |> order_by([c: c], desc: c.last_message_at)
  end

  # codebeat:disable[ABC]
  # Contacts whose name or phone contains the term (case-insensitive),
  # paged by the contact options.
  # NOTE(review): `%` and `_` inside the term are not escaped, so they act as
  # LIKE wildcards - confirm this is intended.
  @spec get_filtered_contacts(String.t(), map()) :: list()
  defp get_filtered_contacts(term, args) do
    page_size = args.contact_opts.limit
    skip = args.contact_opts.offset
    pattern = "%#{term}%"

    # since this revolves around contacts
    contacts_query =
      args
      |> basic_query()
      |> where([c: c], ilike(c.name, ^pattern) or ilike(c.phone, ^pattern))
      |> limit(^page_size)
      |> offset(^skip)
      |> order_by([c: c], desc: c.last_message_at)

    Repo.all(contacts_query)
  end

  # codebeat:enable[ABC]

  # Messages whose body contains the term (case-insensitive), on top of the
  # shared message query; message number descending is added as an ordering.
  @spec get_filtered_messages_with_term(String.t(), map()) :: list()
  defp get_filtered_messages_with_term(term, args) do
    pattern = "%#{term}%"

    args
    |> filtered_query()
    |> where([m: m], ilike(m.body, ^pattern))
    |> order_by([m: m], desc: m.message_number)
    |> Repo.all()
  end

  # Messages whose flow label contains the term (case-insensitive), on top of
  # the shared message query; message number descending is added as an
  # ordering.
  @spec get_filtered_labeled_message(String.t(), map()) :: list()
  defp get_filtered_labeled_message(term, args) do
    pattern = "%#{term}%"

    args
    |> filtered_query()
    |> where([m: m], ilike(m.flow_label, ^pattern))
    |> order_by([m: m], desc: m.message_number)
    |> Repo.all()
  end

  # codebeat:enable[ABC]

  # Stores a non-blank term in args under the :term key;
  # a nil or empty term leaves args unchanged
  @spec add_term(map(), String.t() | nil) :: map()
  defp add_term(args, term) do
    if term in [nil, ""],
      do: args,
      else: Map.put(args, :term, term)
  end

  @doc """
  Runs the saved search identified by `args.id` in count mode.

  The stored arguments of the saved search are loaded (a term sent in with
  `args` is added to them) and passed to `search/2` with `count` set to
  `true`.
  """
  @spec saved_search_count(map()) :: [Conversation.t()] | integer
  def saved_search_count(%{id: id} = args) do
    stored_args = saved_search_args_map(id, args)
    search(stored_args, true)
  end

  @doc """
  Converts the keys of a decoded json map (typically from the database, or
  maybe via graphql) to atoms.

  Keys that are already atoms are kept; all other keys are underscored and
  converted with `Glific.safe_string_to_atom/1`. The values stored under
  `:filter`, `:contact_opts` and `:message_opts` are converted recursively,
  every other value is left untouched.
  """
  @spec convert_to_atom(map()) :: map()
  def convert_to_atom(json) do
    Map.new(json, fn {key, value} ->
      atom_key =
        case key do
          key when is_atom(key) -> key
          key -> key |> Macro.underscore() |> Glific.safe_string_to_atom()
        end

      # only these three entries hold nested maps that need converting
      converted_value =
        if atom_key in [:filter, :contact_opts, :message_opts],
          do: convert_to_atom(value),
          else: value

      {atom_key, converted_value}
    end)
  end

  # When counting, the contact paging is switched off by setting both
  # :limit and :offset of the contact options to nil; otherwise the args
  # are returned unchanged
  @spec update_args_for_count(map(), boolean()) :: map()
  defp update_args_for_count(args, false), do: args

  defp update_args_for_count(args, true) do
    without_limit = put_in(args[:contact_opts][:limit], nil)
    put_in(without_limit[:contact_opts][:offset], nil)
  end

  # When the filter references a saved search, the args stored with that
  # saved search replace the incoming ones; otherwise args pass through
  @spec check_filter_for_save_search(map()) :: map()
  defp check_filter_for_save_search(args) do
    case args do
      %{filter: %{saved_search_id: id}} -> saved_search_args_map(id, args)
      _ -> args
    end
  end

  # Loads the saved search and returns its stored args with atom keys, the
  # caller's top-level :term added when present, and the filter's
  # organization id set to the one of the saved search.
  # NOTE(review): the term is stored at the top level of the args while
  # search/2 reads `filter[:term]` - confirm the term override reaches the
  # query.
  @spec saved_search_args_map(integer(), map) :: map()
  defp saved_search_args_map(id, args) do
    saved_search = get_saved_search!(id)
    requested_term = Map.get(args, :term)

    search_args =
      saved_search
      |> Map.get(:args)
      |> add_term(requested_term)
      |> convert_to_atom()

    put_in(search_args, [:filter, :organization_id], saved_search.organization_id)
  end
end