lib/glific/messages.ex

defmodule Glific.Messages do
  @moduledoc """
  The Messages context.
  """
  import Ecto.Query, warn: false
  import GlificWeb.Gettext

  require Logger

  alias Glific.{
    Caches,
    Communications,
    Contacts,
    Contacts.Contact,
    Conversations.Conversation,
    Flows.Broadcast,
    Flows.FlowContext,
    Flows.MessageVarParser,
    Groups.Group,
    Messages.Message,
    Messages.MessageMedia,
    Notifications,
    Partners,
    Repo,
    Tags,
    Tags.MessageTag,
    Tags.Tag,
    Templates,
    Templates.InteractiveTemplate,
    Templates.InteractiveTemplates,
    Templates.SessionTemplate
  }

  @doc """
  Returns the list of filtered messages.

  ## Examples

      iex> list_messages(map())
      [%Message{}, ...]

  """
  @spec list_messages(map()) :: [Message.t()]
  def list_messages(args) do
    args
    |> Glific.add_limit()
    |> Repo.list_filter(Message, &Repo.opts_with_body/2, &filter_with/2)
    |> Enum.map(&put_clean_body/1)
  end

  @doc """
  Return the count of messages, using the same filter as list_messages
  """
  @spec count_messages(map()) :: integer
  def count_messages(args),
    do: Repo.count_filter(args, Message, &filter_with/2)

  # codebeat:disable[ABC, LOC]
  @spec filter_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp filter_with(query, filter) do
    query = Repo.filter_with(query, filter)

    Enum.reduce(filter, query, fn
      {:sender, sender}, query ->
        from(q in query,
          join: c in assoc(q, :sender),
          where: ilike(c.name, ^"%#{sender}%")
        )

      {:receiver, receiver}, query ->
        from(q in query,
          join: c in assoc(q, :receiver),
          where: ilike(c.name, ^"%#{receiver}%")
        )

      {:contact, contact}, query ->
        from(q in query,
          join: c in assoc(q, :contact),
          where: ilike(c.name, ^"%#{contact}%")
        )

      {:either, phone}, query ->
        from(q in query,
          join: c in assoc(q, :contact),
          where: ilike(c.phone, ^"%#{phone}%")
        )

      {:user, user}, query ->
        from(q in query,
          join: c in assoc(q, :user),
          where: ilike(c.name, ^"%#{user}%")
        )

      {:tags_included, tags_included}, query ->
        message_ids =
          MessageTag
          |> where([p], p.tag_id in ^tags_included)
          |> select([p], p.message_id)
          |> Repo.all()

        query |> where([m], m.id in ^message_ids)

      {:tags_excluded, tags_excluded}, query ->
        message_ids =
          MessageTag
          |> where([p], p.tag_id in ^tags_excluded)
          |> select([p], p.message_id)
          |> Repo.all()

        query |> where([m], m.id not in ^message_ids)

      {:bsp_status, bsp_status}, query ->
        from(q in query, where: q.bsp_status == ^bsp_status)

      {:flow_id, flow_id}, query ->
        from(q in query, where: q.flow_id == ^flow_id)

      _, query ->
        query
    end)
  end

  @doc """
  Gets a single message.

  Raises `Ecto.NoResultsError` if the Message does not exist.

  ## Examples

      iex> get_message!(123)
      %Message{}

      iex> get_message!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_message!(integer) :: Message.t()
  def get_message!(id), do: Repo.get!(Message, id) |> put_clean_body()

  @doc """
  Creates a message.

  ## Examples

      iex> create_message(%{field: value})
      {:ok, %Message{}}

      iex> create_message(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_message(map()) :: {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def create_message(attrs) do
    attrs =
      %{flow: :inbound, status: :enqueued}
      |> Map.merge(attrs)
      |> parse_message_vars()
      |> put_contact_id()
      |> put_clean_body()

    %Message{}
    |> Message.changeset(attrs)
    |> Repo.insert(
      returning: [:message_number, :session_uuid, :context_message_id],
      timeout: 45_000
    )
  end

  @spec put_contact_id(map()) :: map()
  defp put_contact_id(%{flow: :inbound} = attrs),
    do: Map.put(attrs, :contact_id, attrs[:sender_id])

  defp put_contact_id(%{flow: :outbound} = attrs),
    do: Map.put(attrs, :contact_id, attrs[:receiver_id])

  defp put_contact_id(attrs), do: attrs

  @spec put_clean_body(map()) :: map()
  defp put_clean_body(%{body: body} = attrs),
    do: Map.put(attrs, :clean_body, Glific.string_clean(body))

  defp put_clean_body(attrs), do: attrs

  @doc """
  Updates a message.

  ## Examples

      iex> update_message(message, %{field: new_value})
      {:ok, %Message{}}

      iex> update_message(message, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_message(Message.t(), map()) :: {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def update_message(%Message{} = message, attrs) do
    message
    |> Message.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes a message.

  ## Examples

      iex> delete_message(message)
      {:ok, %Message{}}

      iex> delete_message(message)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_message(Message.t()) :: {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def delete_message(%Message{} = message) do
    Repo.delete(message)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking message changes.

  ## Examples

      iex> change_message(message)
      %Ecto.Changeset{data: %Message{}}

  """
  @spec change_message(Message.t(), map()) :: Ecto.Changeset.t()
  def change_message(%Message{} = message, attrs \\ %{}) do
    Message.changeset(message, attrs)
  end

  @doc false
  @spec create_and_send_message(map()) :: {:ok, Message.t()} | {:error, atom() | String.t()}
  def create_and_send_message(%{body: body, type: :text} = _attrs) when body in ["", nil],
    do: {:error, "Could not send message with empty body"}

  def create_and_send_message(attrs) do
    contact = Contacts.get_contact!(attrs.receiver_id)
    attrs = Map.put(attrs, :receiver, contact)

    ## we need to clean this code in the future.
    attrs = check_for_interactive(attrs, contact.language_id)

    check_for_hsm_message(attrs, contact)
  end

  @spec check_for_interactive(map(), non_neg_integer()) :: map()
  defp check_for_interactive(
         %{flow_id: flow_id, interactive_template_id: _interactive_template_id} = attrs,
         _language_id
       )
       when flow_id not in [nil, ""],
       do: attrs

  defp check_for_interactive(
         %{interactive_template_id: interactive_template_id} = attrs,
         language_id
       ) do
    {:ok, interactive_template} = Repo.fetch(InteractiveTemplate, interactive_template_id)

    # Check if this is coming form a flow
    {interactive_content, body, media_id} =
      if attrs[:interactive_content] in [nil, %{}] do
        InteractiveTemplates.formatted_data(interactive_template, language_id)
      else
        {attrs[:interactive_content], attrs[:body], attrs[:media_id]}
      end

    Map.merge(attrs, %{
      body: body,
      interactive_content: interactive_content,
      type: interactive_content["type"],
      media_id: media_id
    })
  end

  defp check_for_interactive(attrs, _language_id), do: attrs

  @doc false
  @spec check_for_hsm_message(map(), Contact.t()) ::
          {:ok, Message.t()} | {:error, atom() | String.t()}
  defp check_for_hsm_message(attrs, contact) do
    if Map.has_key?(attrs, :template_id) && Map.get(attrs, :is_hsm) do
      attrs
      ## We need to fix this inconsistency in the parameter and params name
      |> Map.put(:parameters, attrs.params)
      |> create_and_send_hsm_message()
    else
      Contacts.can_send_message_to?(contact, Map.get(attrs, :is_hsm, false), attrs)
      |> do_send_message(attrs)
    end
  end

  @doc false
  @spec do_send_message({:ok | :error, any()}, map()) ::
          {:ok, Message.t()} | {:error, atom() | String.t()}
  defp do_send_message(
         {:ok, _} = _is_valid_contact,
         %{organization_id: organization_id} = attrs
       ) do
    {:ok, message} =
      attrs
      |> Map.put_new(:type, :text)
      |> Map.merge(%{
        sender_id: Partners.organization_contact_id(organization_id),
        flow: :outbound
      })
      |> create_message()

    Communications.Message.send_message(message, attrs)
  end

  defp do_send_message({:error, reason}, attrs) do
    notify(attrs, reason)
    {:error, reason}
  end

  @doc """
  Create and insert a notification for this error when sending a message.
  Add as much detail, so we can reverse-engineer why the sending failed.
  """
  @spec notify(map(), String.t()) :: nil
  def notify(attrs, reason \\ "Cannot send the message to the contact.") do
    contact =
      if is_nil(Map.get(attrs, :receiver, nil)),
        do: Contacts.get_contact!(attrs.receiver_id),
        else: attrs.receiver

    Logger.error(
      "Could not send message: contact: #{contact.id}, message: '#{Map.get(attrs, :id)}', reason: #{reason}"
    )

    {:ok, _} =
      Notifications.create_notification(%{
        category: "Message",
        message: reason,
        severity: Notifications.types().warning,
        organization_id: attrs.organization_id,
        entity: %{
          id: contact.id,
          name: contact.name,
          phone: contact.phone,
          bsp_status: contact.bsp_status,
          status: contact.status,
          last_message_at: contact.last_message_at,
          is_hsm: Map.get(attrs, :is_hsm),
          flow_id: Map.get(attrs, :flow_id),
          group_id: Map.get(attrs, :group_id)
        }
      })

    nil
  end

  @spec parse_message_vars(map()) :: map()
  defp parse_message_vars(attrs) do
    message_vars =
      if is_integer(attrs[:receiver_id]) or is_binary(attrs[:receiver_id]),
        do: %{"contact" => Contacts.get_contact_field_map(attrs.receiver_id)},
        else: %{}

    parse_text_message_fields(attrs, message_vars)
    |> parse_media_message_fields(message_vars)
    |> parse_interactive_message_fields(message_vars)
  end

  @spec parse_text_message_fields(map(), map()) :: map()
  defp parse_text_message_fields(attrs, message_vars) do
    if is_binary(attrs[:body]) do
      {:ok, msg_uuid} = Ecto.UUID.cast(:crypto.hash(:md5, attrs.body))

      attrs
      |> Map.merge(%{
        uuid: attrs[:uuid] || msg_uuid,
        body: MessageVarParser.parse(attrs.body, message_vars)
      })
    else
      attrs
    end
  end

  @spec parse_media_message_fields(map(), map()) :: map()
  defp parse_media_message_fields(attrs, message_vars) do
    ## if message media is present change the variables in caption
    if is_integer(attrs[:media_id]) or is_binary(attrs[:media_id]) do
      message_media = get_message_media!(attrs.media_id)

      message_media
      |> update_message_media(%{
        caption: MessageVarParser.parse(message_media.caption, message_vars)
      })
    end

    attrs
  end

  @spec parse_interactive_message_fields(map(), map()) :: map()
  defp parse_interactive_message_fields(attrs, message_vars) do
    attrs[:interactive_content]
    |> MessageVarParser.parse_map(message_vars)
    |> InteractiveTemplates.clean_template_title()
    |> then(&Map.merge(attrs, %{interactive_content: &1}))
  end

  @doc false
  @spec create_and_send_otp_verification_message(Contact.t(), String.t()) ::
          {:ok, Message.t()}
  def create_and_send_otp_verification_message(contact, otp) do
    case Contacts.can_send_message_to?(contact, false) do
      {:ok, _} -> create_and_send_otp_session_message(contact, otp)
      _ -> create_and_send_otp_template_message(contact, otp)
    end
  end

  @doc false
  @spec create_and_send_otp_session_message(Contact.t(), String.t()) ::
          {:ok, Message.t()}
  def create_and_send_otp_session_message(contact, otp) do
    ttl = Application.get_env(:passwordless_auth, :verification_code_ttl) |> div(60)

    body = "Your OTP for Registration is #{otp}. This is valid for #{ttl} minutes."
    send_default_message(contact, body)
  end

  @doc false
  @spec create_and_send_otp_template_message(Contact.t(), String.t()) ::
          {:ok, Message.t()}
  def create_and_send_otp_template_message(contact, otp) do
    # fetch session template by shortcode "verification"
    {:ok, session_template} =
      Repo.fetch_by(SessionTemplate, %{
        shortcode: "common_otp",
        is_hsm: true,
        organization_id: contact.organization_id
      })

    ttl = Application.get_env(:passwordless_auth, :verification_code_ttl) |> div(60)

    parameters = [
      "Registration",
      otp,
      "#{ttl} minutes"
    ]

    %{template_id: session_template.id, receiver_id: contact.id, parameters: parameters}
    |> create_and_send_hsm_message()
  end

  @doc """
  Send a session template to the specific contact. This is typically used in automation
  """
  @spec create_and_send_session_template(String.t(), integer) :: {:ok, Message.t()}
  def create_and_send_session_template(template_id, receiver_id) when is_binary(template_id),
    do: create_and_send_session_template(String.to_integer(template_id), receiver_id)

  @spec create_and_send_session_template(integer, integer) :: {:ok, Message.t()}
  def create_and_send_session_template(template_id, receiver_id) when is_integer(template_id) do
    {:ok, session_template} = Repo.fetch(SessionTemplate, template_id)

    create_and_send_session_template(
      session_template,
      %{receiver_id: receiver_id}
    )
  end

  @spec create_and_send_session_template(SessionTemplate.t() | map(), map()) :: {:ok, Message.t()}
  def create_and_send_session_template(session_template, args) do
    message_params = %{
      body: session_template.body,
      type: session_template.type,
      template_id: session_template.id,
      media_id: session_template.message_media_id,
      sender_id: Partners.organization_contact_id(session_template.organization_id),
      receiver_id: args[:receiver_id],
      send_at: args[:send_at],
      flow_id: args[:flow_id],
      message_broadcast_id: args[:message_broadcast_id],
      uuid: args[:uuid],
      is_hsm: Map.get(args, :is_hsm, false),
      flow_label: args[:flow_label],
      organization_id: session_template.organization_id,
      params: args[:params]
    }

    create_and_send_message(message_params)
  end

  @spec fetch_language_specific_template(map(), integer()) :: tuple()
  defp fetch_language_specific_template(session_template, id) do
    contact = Contacts.get_contact!(id)

    with true <- session_template.language_id != contact.language_id,
         translation <- session_template.translations[Integer.to_string(contact.language_id)],
         false <- is_nil(translation),
         "APPROVED" <- translation["status"] do
      template =
        session_template
        |> Map.from_struct()
        |> Map.put(:body, translation["body"])
        |> Map.put(:uuid, translation["uuid"])

      {true, template}
    else
      _ -> {false, session_template}
    end
  end

  @spec hsm_message_params(SessionTemplate.t(), map(), boolean()) :: map()
  defp hsm_message_params(
         session_template,
         %{template_id: template_id, receiver_id: receiver_id, parameters: parameters} = attrs,
         is_translated
       ) do
    # sending default media when media type is not defined
    media_id = Map.get(attrs, :media_id, session_template.message_media_id)

    updated_template =
      session_template
      |> Templates.parse_buttons(is_translated, session_template.has_buttons)
      |> parse_template_vars(parameters)

    parsed_body =
      session_template
      |> parse_template_vars(parameters)
      |> Map.get(:body)

    %{
      parsed_body: parsed_body,
      body: updated_template.body,
      type: updated_template.type,
      is_hsm: updated_template.is_hsm,
      organization_id: session_template.organization_id,
      sender_id: Partners.organization_contact_id(session_template.organization_id),
      receiver_id: receiver_id,
      template_uuid: session_template.uuid,
      template_id: template_id,
      template_type: session_template.type,
      has_buttons: session_template.has_buttons,
      params: parameters,
      media_id: media_id,
      is_optin_flow: Map.get(attrs, :is_optin_flow, false),
      flow_label: Map.get(attrs, :flow_label, ""),
      message_broadcast_id: Map.get(attrs, :message_broadcast_id, nil),
      user_id: attrs[:user_id],
      flow_id: attrs[:flow_id],
      send_at: attrs[:send_at]
    }
  end

  @doc """
  Send a hsm template message to the specific contact.
  """
  @spec create_and_send_hsm_message(map()) ::
          {:ok, Message.t()} | {:error, String.t()}
  def create_and_send_hsm_message(
        %{template_id: template_id, receiver_id: receiver_id, parameters: parameters} = attrs
      ) do
    contact_vars = %{"contact" => Contacts.get_contact_field_map(attrs.receiver_id)}
    parsed_params = Enum.map(parameters, &MessageVarParser.parse(&1, contact_vars))

    ## As per the WhatsApp policy the params in an HSM can not have more then two
    ## consecutive spaces and a new line.
    parsed_params = Enum.map(parsed_params, &Enum.join(String.split(&1), " "))

    attrs = Map.put(attrs, :parameters, parsed_params)

    Repo.fetch(SessionTemplate, template_id)
    |> case do
      {:ok, template} ->
        media_id = Map.get(attrs, :media_id, nil)

        {is_translated, session_template} =
          fetch_language_specific_template(template, receiver_id)

        with true <- session_template.number_parameters == length(parameters),
             {"type", true} <- {"type", session_template.type == :text || media_id != nil} do
          # Passing uuid to save db call when sending template via provider
          message_params = hsm_message_params(session_template, attrs, is_translated)

          receiver_id
          |> Glific.Contacts.get_contact!()
          |> Contacts.can_send_message_to?(true, attrs)
          |> do_send_message(message_params)
        else
          false ->
            {:error,
             dgettext(
               "errors",
               "Please provide the right number of parameters for the template."
             )}

          {"type", false} ->
            {:error, dgettext("errors", "Please provide media for media template.")}
        end

      {:error, error} ->
        {:error,
         "Not able to fetch the template with id #{template_id}. ERROR: #{inspect(error)}"}
    end
  end

  @doc false
  @spec parse_template_vars(SessionTemplate.t(), [String.t()]) :: SessionTemplate.t()
  def parse_template_vars(%{number_parameters: np} = session_template, _parameters)
      when is_nil(np) or np <= 0,
      do: session_template

  def parse_template_vars(session_template, parameters) do
    parameters_map =
      1..session_template.number_parameters
      |> Enum.zip(parameters)

    updated_body =
      Enum.reduce(parameters_map, session_template.body, fn {key, value}, body ->
        String.replace(body, "{{#{key}}}", value)
      end)

    session_template
    |> Map.merge(%{body: updated_body})
  end

  @doc false
  @spec create_and_send_message_to_contacts(map(), [], atom()) :: {:ok, list()}
  def create_and_send_message_to_contacts(message_params, contact_ids, type) do
    contact_ids =
      contact_ids
      |> Enum.reduce([], fn contact_id, contact_ids ->
        message_params = Map.put(message_params, :receiver_id, contact_id)

        result =
          if type == :session,
            do: create_and_send_message(message_params),
            else: create_and_send_hsm_message(message_params)

        case result do
          {:ok, message} ->
            [message.contact_id | contact_ids]

          {:error, _} ->
            contact_ids
        end
      end)

    {:ok, contact_ids}
  end

  @doc """
  Record a message sent to a group in the message table. This message is actually not
  sent, but is used for display purposes in the group listings
  """
  @spec create_group_message(map()) :: {:ok, Message.t()} | {:error, Ecto.Changeset.t()}
  def create_group_message(attrs) do
    # We first need to just create a meta level group message
    organization_id = Repo.get_organization_id()
    sender_id = Partners.organization_contact_id(organization_id)

    attrs
    |> Map.merge(%{
      organization_id: organization_id,
      sender_id: sender_id,
      receiver_id: sender_id,
      contact_id: sender_id,
      flow: :outbound
    })
    |> create_message()
    |> case do
      {:ok, message} ->
        group_message_subscription(message)
        {:ok, message}

      {:error, error} ->
        {:error, error}
    end
  end

  @spec group_message_subscription(Message.t()) :: any()
  defp group_message_subscription(message) do
    Communications.publish_data(
      message,
      :sent_group_message,
      message.organization_id
    )
  end

  @doc """
  Create and send message to all contacts of a group
  """
  @spec create_and_send_message_to_group(map(), Group.t(), atom()) ::
          {:ok, any()} | {:error, any()}
  def create_and_send_message_to_group(message_params, group, _type) do
    message_params =
      message_params
      |> Map.merge(%{group_id: group.id})
      # this is an exception because of an inconsistency with the key name
      |> Map.put_new(:parameters, message_params[:params])

    {:ok, group_message} =
      if message_params[:is_hsm] in [nil, false],
        do: create_group_message(message_params |> Map.put_new(:type, :text)),
        else:
          message_params
          |> Map.put_new(:type, :text)
          |> Map.put(
            :body,
            "Sending HSM template #{message_params.template_id}, params: #{message_params.parameters}"
          )
          |> create_group_message()

    {:ok, message_broadcast} =
      Broadcast.broadcast_message_to_group(group_message, group, message_params)

    {:ok, Broadcast.get_broadcast_contact_ids(message_broadcast)}
  end

  @doc """
  Check if the tag is present in message
  """
  @spec tag_in_message?(Message.t(), integer) :: boolean
  def tag_in_message?(message, tag_id) do
    Ecto.assoc_loaded?(message.tags) &&
      Enum.find(message.tags, fn t -> t.id == tag_id end) != nil
  end

  @doc """
  Returns the list of message media.

  ## Examples

      iex> list_messages_media(map())
      [%MessageMedia{}, ...]

  """
  @spec list_messages_media(map()) :: [MessageMedia.t()]
  def list_messages_media(args \\ %{}) do
    args
    |> Glific.add_limit()
    |> Repo.list_filter(MessageMedia, &opts_media_with/2, &filter_media_with/2)
  end

  defp filter_media_with(query, _), do: query

  defp opts_media_with(query, opts) do
    Enum.reduce(opts, query, fn
      {:order, order}, query ->
        query |> order_by([m], {^order, fragment("lower(?)", m.caption)})

      _, query ->
        query
    end)
  end

  @doc """
  Return the count of messages, using the same filter as list_messages
  """
  @spec count_messages_media(map()) :: integer
  def count_messages_media(args \\ %{}),
    do: Repo.count_filter(args, MessageMedia, &filter_media_with/2)

  @doc """
  Gets a single message media.

  Raises `Ecto.NoResultsError` if the Message media does not exist.

  ## Examples

      iex> get_message_media!(123)
      %MessageMedia{}

      iex> get_message_media!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_message_media!(integer) :: MessageMedia.t()
  def get_message_media!(id), do: Repo.get!(MessageMedia, id)

  @doc """
  Creates a message media.

  ## Examples

      iex> create_message_media(%{field: value})
      {:ok, %MessageMedia{}}

      iex> create_message_media(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_message_media(map()) :: {:ok, MessageMedia.t()} | {:error, Ecto.Changeset.t()}
  def create_message_media(attrs \\ %{}) do
    %MessageMedia{}
    |> MessageMedia.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates a message media.

  ## Examples

      iex> update_message_media(message_media, %{field: new_value})
      {:ok, %MessageMedia{}}

      iex> update_message_media(message_media, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_message_media(MessageMedia.t(), map()) ::
          {:ok, MessageMedia.t()} | {:error, Ecto.Changeset.t()}
  def update_message_media(%MessageMedia{} = message_media, attrs) do
    message_media
    |> MessageMedia.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes a message media.

  ## Examples

      iex> delete_message_media(message_media)
      {:ok, %MessageMedia{}}

      iex> delete_message_media(message_media)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_message_media(MessageMedia.t()) ::
          {:ok, MessageMedia.t()} | {:error, Ecto.Changeset.t()}
  def delete_message_media(%MessageMedia{} = message_media) do
    Repo.delete(message_media)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking message media changes.

  ## Examples

      iex> change_message_media(message_media)
      %Ecto.Changeset{data: %MessageMedia{}}

  """
  @spec change_message_media(MessageMedia.t(), map()) :: Ecto.Changeset.t()
  def change_message_media(%MessageMedia{} = message_media, attrs \\ %{}) do
    MessageMedia.changeset(message_media, attrs)
  end

  defp do_list_conversations(query, args, false = _count) do
    query
    |> preload([:contact, :sender, :receiver, :context_message, :tags, :user, :media])
    |> Repo.all()
    |> make_conversations()
    |> add_empty_conversations(args)
  end

  defp do_list_conversations(query, _args, true = _count) do
    query
    |> select([m], m.contact_id)
    |> distinct(true)
    |> exclude(:order_by)
    |> Repo.aggregate(:count)
  end

  @doc """
  Given a list of message ids builds a conversation list with most recent conversations
  at the beginning of the list
  """
  @spec list_conversations(map(), boolean) :: [Conversation.t()] | integer
  def list_conversations(args, count \\ false) do
    args
    |> Enum.reduce(
      Message,
      fn
        {:ids, ids}, query ->
          query
          |> where([m], m.id in ^ids)
          |> order_by([m], desc: m.inserted_at)

        {:filter, filter}, query ->
          query |> conversations_with(filter)

        _, query ->
          query
      end
    )
    |> do_list_conversations(args, count)
  end

  # given all the messages related to multiple contacts, group them
  # by contact id into conversation objects
  @spec make_conversations([Message.t()]) :: [Conversation.t()]
  defp make_conversations(messages) do
    # now format the results,
    {contact_messages, _processed_contacts, contact_order} =
      Enum.reduce(
        messages,
        {%{}, %{}, []},
        fn m, {conversations, processed_contacts, contact_order} ->
          conversations = add(m, conversations)

          # We need to do this to maintain the sort order when returning
          # the results. The first time we see a contact, we add them to
          # the contact_order and processed map (using a map for faster lookups)
          if Map.has_key?(processed_contacts, m.contact_id) do
            {conversations, processed_contacts, contact_order}
          else
            {
              conversations,
              Map.put(processed_contacts, m.contact_id, true),
              [m.contact | contact_order]
            }
          end
        end
      )

    # Since we are doing two reduces, we end up with the right order due to the way lists are
    # constructed efficiently (add to front)
    Enum.reduce(
      contact_order,
      [],
      fn contact, acc ->
        [Conversation.new(contact, nil, Enum.reverse(contact_messages[contact])) | acc]
      end
    )
  end

  # for all input contact ids that do not have messages attached to them
  # return a conversation data type with empty messages
  # we don't add empty conversations when we have either include tags or include users set
  @spec add_empty_conversations([Conversation.t()], map()) :: [Conversation.t()]
  defp add_empty_conversations(results, %{filter: %{include_tags: _tags}}),
    do: results

  defp add_empty_conversations(results, %{filter: %{include_labels: _labels}}),
    do: results

  defp add_empty_conversations(results, %{filter: %{include_users: _users}}),
    do: results

  defp add_empty_conversations(results, %{filter: %{id: id}}),
    do: add_empty_conversation(results, [id])

  defp add_empty_conversations(results, %{filter: %{ids: ids}}),
    do: add_empty_conversation(results, ids)

  defp add_empty_conversations(results, _), do: results

  # helper function that actually implements the above functionality
  @spec add_empty_conversations([Conversation.t()], [integer]) :: [Conversation.t()]
  defp add_empty_conversation(results, contact_ids) when is_list(contact_ids) do
    # first find all the contact ids that we have some messages
    present_contact_ids =
      Enum.reduce(
        results,
        [],
        fn r, acc -> [r.contact.id | acc] end
      )

    # the difference is the empty contacts id list
    empty_contact_ids = contact_ids -- present_contact_ids

    # lets load all contacts ids in one query, rather than multiple single queries
    empty_results =
      Contact
      |> where([c], c.id in ^empty_contact_ids)
      |> Repo.all()
      # now only generate conversations objects for the empty contact ids
      |> Enum.reduce(
        [],
        fn contact, acc -> add_conversation(acc, contact) end
      )

    results ++ empty_results
  end

  # add an empty conversation for a specific contact if ONLY if it exists
  @spec add_conversation([Conversation.t()], Contact.t()) :: [Conversation.t()]
  defp add_conversation(results, contact) do
    [Conversation.new(contact, nil, []) | results]
  end

  # restrict the conversations query based on the filters in the input args
  @spec conversations_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp conversations_with(query, filter) do
    Enum.reduce(filter, query, fn
      {:id, id}, query ->
        query |> where([m], m.contact_id == ^id)

      {:ids, ids}, query ->
        query |> where([m], m.contact_id in ^ids)

      {:include_tags, tag_ids}, query ->
        include_tag_filter(query, tag_ids)

      # commenting this out since we search for the labels in full.ex
      # and hence want to include the contacts even if the most recent messages
      # don't fit into the search criteria
      # {:include_labels, label_ids}, query ->
      #   include_label_filter(query, label_ids)

      {:include_users, user_ids}, query ->
        include_user_filter(query, user_ids)

      _filter, query ->
        query
    end)
  end

  # delete code in a few weeks, if re-inserting back, make function private
  # apply filter for message labels
  @doc false
  @spec _include_label_filter(Ecto.Queryable.t(), []) :: Ecto.Queryable.t()
  def _include_label_filter(query, []), do: query

  def _include_label_filter(query, label_ids) do
    flow_labels =
      Glific.Flows.FlowLabel
      |> where([f], f.id in ^label_ids)
      |> select([f], f.name)
      |> Repo.all()

    flow_labels
    |> Enum.reduce(query, fn flow_label, query ->
      where(query, [c], ilike(c.flow_label, ^"%#{flow_label}%"))
    end)
  end

  # apply filter for message tags
  @spec include_tag_filter(Ecto.Queryable.t(), []) :: Ecto.Queryable.t()
  defp include_tag_filter(query, []), do: query

  defp include_tag_filter(query, tag_ids) do
    # given a list of tag_ids, build another list, which includes the tag_ids
    # and also all its parent tag_ids
    all_tag_ids = Tags.include_all_ancestors(tag_ids)

    query
    |> join(:left, [m], mt in MessageTag, as: :mt, on: m.id == mt.message_id)
    |> join(:left, [mt: mt], t in Tag, as: :t, on: t.id == mt.tag_id)
    |> where([mt: mt], mt.tag_id in ^all_tag_ids)
  end

  # apply filter for user ids
  @spec include_user_filter(Ecto.Queryable.t(), []) :: Ecto.Queryable.t()
  defp include_user_filter(query, []), do: query

  defp include_user_filter(query, user_ids) do
    query
    |> where([m], m.user_id in ^user_ids)
  end

  defp add(element, map) do
    Map.update(
      map,
      element.contact,
      [element],
      &[element | &1]
    )
  end

  @doc """
  We need to simulate a few messages as we move to the system. This is a wrapper function
  to add those messages, which trigger specific actions within flows. e.g. include:
  Completed, Failure, Success etc
  """
  @spec create_temp_message(non_neg_integer, any(), Keyword.t()) :: Message.t()
  def create_temp_message(organization_id, body, attrs \\ []) do
    body = String.trim(body || "")

    opts =
      Keyword.merge(
        [
          organization_id: organization_id,
          body: body,
          clean_body: Glific.string_clean(body),
          type: :text
        ],
        attrs
      )

    Message
    |> struct(opts)
  end

  @doc """
  Delete all messages of a contact
  """
  @spec clear_messages(Contact.t()) :: :ok
  def clear_messages(%Contact{} = contact) do
    # get and delete all messages media
    messages_media_ids =
      Message
      |> where([m], m.contact_id == ^contact.id)
      |> where([m], m.organization_id == ^contact.organization_id)
      |> select([m], m.media_id)
      |> Repo.all()

    MessageMedia
    |> where([m], m.id in ^messages_media_ids)
    |> Repo.delete_all(timeout: 900_000)

    Message
    |> where([m], m.contact_id == ^contact.id)
    |> where([m], m.organization_id == ^contact.organization_id)
    |> Repo.delete_all()

    reset_contact_fields(contact)

    FlowContext.mark_flows_complete(contact.id, false, source: "clear_messages")

    Communications.publish_data(contact, :cleared_messages, contact.organization_id)

    :ok
  end

  @spec reset_contact_fields(Contact.t()) :: nil
  defp reset_contact_fields(contact) do
    simulator = Contacts.is_simulator_contact?(contact.phone)

    values = %{
      last_message_number: 0,
      is_org_read: true,
      is_org_replied: true,
      is_contact_replied: true
    }

    values =
      if simulator,
        ## if simulator let's clean all the fields and update reset the session window.
        do:
          values
          |> Map.merge(%{
            fields: %{},
            last_communication_at: DateTime.utc_now(),
            last_message_at: DateTime.utc_now(),
            bsp_status: :session
          }),
        else: values

    Contacts.update_contact(contact, values)

    if simulator,
      do: {:ok, _last_message} = send_default_message(contact)

    nil
  end

  @spec send_default_message(Contact.t(), String.t()) ::
          {:ok, Message.t()} | {:error, atom() | String.t()}
  defp send_default_message(contact, body \\ "Default message body") do
    org = Partners.organization(contact.organization_id)

    attrs = %{
      body: body,
      flow: :outbound,
      media_id: nil,
      organization_id: contact.organization_id,
      receiver_id: contact.id,
      sender_id: org.root_user.id,
      type: :text,
      user_id: org.root_user.id
    }

    create_and_send_message(attrs)
  end

  # cache ttl is 1 hour
  @ttl_limit 1

  @doc false
  @spec validate_media(String.t(), String.t()) :: map()
  def validate_media(url, _type) when url in ["", nil],
    do: %{is_valid: false, message: "Please provide a media URL"}

  def validate_media(url, type) do
    # ensure that this is approximately a url before we send to downstream functions
    if Glific.URI.cast(url) == :ok do
      # We can cache this across all organizations
      # We set a timeout of 60 minutes for this cache entry
      case Caches.get_global({:validate_media, url, type}) do
        {:ok, nil} ->
          do_validate_media(url, type)

        {:ok, value} ->
          value
      end
    else
      %{is_valid: false, message: "This media URL is invalid"}
    end
  end

  @size_limit %{
    "image" => 5120,
    "video" => 16_384,
    "audio" => 16_384,
    "document" => 102_400,
    "sticker" => 100
  }

  @spec do_validate_media(String.t(), String.t()) :: map()
  defp do_validate_media(url, type) do
    # we first decode the string since we have no idea if it was encoded or not
    # if the string was not encoded, decode should not really matter
    # once decoded we encode the string
    url = url |> URI.decode() |> URI.encode()

    case Tesla.get(url, opts: [adapter: [recv_timeout: 10_000]]) do
      {:ok, %Tesla.Env{status: status, headers: headers}} when status in 200..299 ->
        headers
        |> Enum.reduce(%{}, fn header, acc -> Map.put(acc, elem(header, 0), elem(header, 1)) end)
        |> Map.put_new("content-type", "")
        |> Map.put_new("content-length", 0)
        |> do_validate_media(type, url, @size_limit[type])

      _ ->
        %{is_valid: false, message: "This media URL is invalid"}
    end
  end

  @spec do_validate_media(map(), String.t(), String.t(), integer()) :: map()
  defp do_validate_media(headers, type, url, size_limit) do
    cond do
      !do_validate_headers(headers, type, url) ->
        %{is_valid: false, message: "Media content-type is not valid"}

      !do_validate_size(size_limit, headers["content-length"]) ->
        %{
          is_valid: false,
          message: "Size is too big for the #{type}. Maximum size limit is #{size_limit}KB"
        }

      true ->
        value = %{is_valid: true, message: "success"}
        Caches.put_global({:validate_media, url, type}, value, @ttl_limit)
        value
    end
  end

  @spec do_validate_headers(map(), String.t(), String.t()) :: boolean
  defp do_validate_headers(headers, "document", _url),
    do: String.contains?(headers["content-type"], ["pdf", "docx", "xlxs"])

  ## sometimes webp files does not return any content type. We need to figure out another way to validate this
  defp do_validate_headers(headers, "sticker", url),
    do:
      String.contains?(url, [".webp"]) && String.contains?(headers["content-type"], ["image", ""])

  defp do_validate_headers(headers, type, _url) when type in ["image", "video", "audio"],
    do: String.contains?(headers["content-type"], type)

  defp do_validate_headers(_, _, _), do: false

  @spec do_validate_size(Integer, String.t() | integer()) :: boolean
  defp do_validate_size(_size_limit, nil), do: false

  defp do_validate_size(size_limit, content_length) do
    {:ok, content_length} = Glific.parse_maybe_integer(content_length)
    content_length_in_kb = content_length / 1024
    size_limit >= content_length_in_kb
  end

  @doc """
    Get Media type from a url. We will primary use it for when we receive the url from EEX call.
  """
  @spec get_media_type_from_url(String.t(), Keyword.t()) :: tuple()
  def get_media_type_from_url(url, opts \\ []) do
    log_error = Keyword.get(opts, :log_error, true)

    extension =
      url
      |> Path.extname()
      |> String.downcase()
      |> String.replace(".", "")

    ## mime type
    ## We need to figure out a better way to get the mime type. May be MIME::type(url)
    mime_types = [
      {:image, ["png", "jpg", "jpeg"]},
      {:video, ["mp4", "3gp", "3gpp"]},
      {:audio, ["mp3", "wav", "acc", "ogg"]},
      {:document, ["pdf", "docx", "xlsx"]},
      {:sticker, ["webp"]}
    ]

    Enum.find(mime_types, fn {_type, extension_list} -> extension in extension_list end)
    |> case do
      {type, _} ->
        {type, url}

      _ ->
        if log_error, do: Logger.info("Could not find media type for extension: #{extension}")
        {:text, nil}
    end
  end

  @doc """
  Mark that the user has read all messages sent by a given contact
  """
  @spec mark_contact_messages_as_read(non_neg_integer, non_neg_integer) :: nil
  def mark_contact_messages_as_read(contact_id, _organization_id) do
    Contact
    |> where([c], c.id == ^contact_id)
    |> Repo.update_all(set: [is_org_read: true])
  end
end