defmodule Glific.Communications.Message do
@moduledoc """
The Message Communication Context, which encapsulates sending messages through the provider (BSP),
receiving inbound messages, and publishing the resulting message and status events.
"""
import Ecto.Query
require Logger
alias Glific.{
Communications,
Contacts,
Contacts.Contact,
Mails.BalanceAlertMail,
Messages,
Messages.Message,
Partners,
Repo
}
@doc false
# No-op `use` hook: the quoted block is empty, so `use`-ing this module injects no code.
defmacro __using__(_opts \\ []) do
quote do
end
end
# Maps a message type to the name of the provider handler function that
# send_message/2 invokes (via apply/3) to deliver a message of that type.
# Both interactive types (:list and :quick_reply) share :send_interactive.
@type_to_token %{
text: :send_text,
image: :send_image,
audio: :send_audio,
video: :send_video,
document: :send_document,
sticker: :send_sticker,
list: :send_interactive,
quick_reply: :send_interactive
}
@doc """
Sends `message` to its receiver through the provider handler configured for the
message's organization.

The handler function is picked from the message type and is called with `message`
and `attrs`. When the handler returns `{:ok, _}`, a `[:glific, :message, :sent]`
telemetry event is emitted and the message is returned as `{:ok, message}`, being
published first when `message.publish?` is set. Any other handler result is
returned unchanged.

If anything raises along the way (for example when there is no provider handler,
or the provider call itself fails), the exception is logged, the message is
flagged with status `:error` and `{:error, reason}` is returned.
"""
@spec send_message(Message.t(), map()) :: {:ok, Message.t()} | {:error, String.t()}
def send_message(message, attrs \\ %{}) do
  message = Repo.preload(message, [:receiver, :sender, :media])

  Logger.info(
    "Sending message: type: '#{message.type}', contact_id: '#{message.receiver.id}', message_id: '#{message.id}'"
  )

  with {:ok, _} <-
         apply(
           Communications.provider_handler(message.organization_id),
           @type_to_token[message.type],
           [message, attrs]
         ) do
    :telemetry.execute(
      [:glific, :message, :sent],
      # currently we are not measuring latency
      %{duration: 1},
      %{
        type: message.type,
        sender_id: message.sender_id,
        receiver_id: message.receiver_id,
        organization_id: message.organization_id
      }
    )

    publish_message(message)
  end
rescue
  # An exception is thrown if there is no provider handler and/or sending the message
  # via the provider fails
  exception ->
    # Keep the real cause in the logs; the reason handed back to the caller stays generic.
    Logger.error(
      "Could not send message: " <> Exception.format(:error, exception, __STACKTRACE__)
    )

    log_error(message, "Could not send message to contact: Check Gupshup Setting")
end
# Calls Messages.notify/2 with the failure reason, flags the message with
# status :error and hands the reason back as an error tuple.
@spec log_error(Message.t(), String.t()) :: {:error, String.t()}
defp log_error(message, reason) do
  with_receiver = Repo.preload(message, [:receiver])
  Messages.notify(with_receiver, reason)

  # the status update is expected to succeed; crash loudly otherwise
  {:ok, _updated} = Messages.update_message(with_receiver, %{status: :error})

  {:error, reason}
end
# Wraps the message in an :ok tuple, publishing it as :sent_message first
# when the message's publish? flag is set.
@spec publish_message(Message.t()) :: {:ok, Message.t()}
defp publish_message(message) do
  outcome =
    if message.publish? do
      publish_data(message, :sent_message)
    else
      message
    end

  {:ok, outcome}
end
@doc """
Callback invoked when the message was sent successfully.

Decodes the JSON response body, stores its `"messageId"` as the `bsp_message_id`,
marks the message as sent (`bsp_status: :enqueued`) with the current UTC time
truncated to seconds, publishes the status update and returns `{:ok, message}`.
"""
@spec handle_success_response(Tesla.Env.t(), Message.t()) :: {:ok, Message.t()}
def handle_success_response(response, message) do
  decoded_body = Jason.decode!(response.body)

  # NOTE(review): the message is round-tripped through Poison before the update,
  # presumably to normalise the struct; confirm why this is needed
  plain_message =
    message
    |> Poison.encode!()
    |> Poison.decode!(as: %Message{})

  changes = %{
    bsp_message_id: decoded_body["messageId"],
    bsp_status: :enqueued,
    status: :sent,
    flow: :outbound,
    sent_at: DateTime.utc_now() |> DateTime.truncate(:second)
  }

  {:ok, updated} = Messages.update_message(plain_message, changes)
  publish_message_status(updated)

  {:ok, updated}
end
# Normalises a provider error body into a map: binaries are wrapped under
# :message, maps pass through untouched, anything else is inspected into :message.
@spec build_error(any()) :: map()
defp build_error(body) when is_binary(body), do: %{message: body}
defp build_error(body) when is_map(body), do: body
defp build_error(body), do: %{message: inspect(body)}
# Looks up the message by its provider (BSP) message id and publishes its status;
# when the lookup does not return {:ok, message}, its result is returned unchanged.
@spec fetch_and_publish_message_status(String.t()) :: any()
defp fetch_and_publish_message_status(bsp_message_id) do
  case Repo.fetch_by(Message, %{bsp_message_id: bsp_message_id}) do
    {:ok, message} -> publish_message_status(message)
    not_found -> not_found
  end
end
# Publishes an :update_message_status event, but only for messages that have
# no group_id; returns nil otherwise.
@spec publish_message_status(Message.t()) :: any()
defp publish_message_status(message) do
  case message.group_id do
    nil -> publish_data(message, :update_message_status)
    _group_id -> nil
  end
end
@doc """
Callback invoked when sending the message resulted in an error.

Records the normalised response body in the message's `errors`, sets
`bsp_status: :error`, publishes the status update and returns the raw response
body as `{:error, body}`.
"""
@spec handle_error_response(Tesla.Env.t() | map(), Message.t()) :: {:error, String.t()}
def handle_error_response(response, message) do
  # NOTE(review): the message is round-tripped through Poison before the update,
  # presumably to normalise the struct; confirm why this is needed
  plain_message =
    message
    |> Poison.encode!()
    |> Poison.decode!(as: %Message{})

  changes = %{
    bsp_status: :error,
    status: :sent,
    flow: :outbound,
    errors: build_error(response.body)
  }

  {:ok, updated} = Messages.update_message(plain_message, changes)
  publish_message_status(updated)

  {:error, response.body}
end
@doc """
Callback to update the provider (BSP) status of the message identified by
`bsp_message_id`.

For the `:error` status the given `errors` are stored on the message as well, and
the error code found under `errors["payload"]["payload"]["code"]` is processed.
In both cases the message is fetched again afterwards so that its status can be
published.
"""
@spec update_bsp_status(String.t(), atom(), map()) :: any()
def update_bsp_status(bsp_message_id, :error, errors) do
  changes = [bsp_status: :error, errors: errors, updated_at: DateTime.utc_now()]

  Repo.update_all(
    from(m in Message, where: m.bsp_message_id == ^bsp_message_id),
    set: changes
  )

  # update_all does not hand back the record, so an extra query fetches the
  # message for the status subscription
  case Repo.fetch_by(Message, %{bsp_message_id: bsp_message_id}) do
    {:ok, message} ->
      publish_message_status(message)
      process_errors(message, errors, errors["payload"]["payload"]["code"])

    failure ->
      Logger.error("Could not update message status: #{inspect(failure)}")
  end
end

def update_bsp_status(bsp_message_id, bsp_status, _params) do
  changes = [bsp_status: bsp_status, updated_at: DateTime.utc_now()]

  Repo.update_all(
    from(m in Message, where: m.bsp_message_id == ^bsp_message_id),
    set: changes
  )

  # an extra query fetches the message for the status subscription
  fetch_and_publish_message_status(bsp_message_id)
end
@doc """
Callback invoked when a message is received from WhatsApp.

Creates the sender contact if needed (scoped to the organization of the message).
Messages from blocked contacts are dropped and `:ok` is returned; everything else
is handed on for processing according to `type` (defaults to `:text`).
"""
@spec receive_message(map(), atom()) :: :ok | {:error, String.t()}
def receive_message(%{organization_id: organization_id} = message_params, type \\ :text) do
  Logger.info(
    "Received message: type: '#{type}', phone: '#{message_params.sender.phone}', id: '#{message_params.bsp_message_id}'"
  )

  sender_attrs = Map.put(message_params.sender, :organization_id, organization_id)
  {:ok, contact} = Contacts.maybe_create_contact(sender_attrs)

  if Contacts.is_contact_blocked?(contact) do
    :ok
  else
    do_receive_message(contact, message_params, type)
  end
end
# Marks the contact's session status as :session, enriches the incoming params with
# sender/receiver/organization data and the inbound status fields, emits the
# [:glific, :message, :received] telemetry event and dispatches on the message type.
@spec do_receive_message(Contact.t(), map(), atom()) :: :ok | {:error, String.t()}
defp do_receive_message(contact, %{organization_id: organization_id} = message_params, type) do
  {:ok, contact} = Contacts.set_session_status(contact, :session)

  metadata = %{
    type: type,
    sender_id: contact.id,
    receiver_id: Partners.organization_contact_id(organization_id),
    organization_id: contact.organization_id
  }

  inbound_fields = %{flow: :inbound, bsp_status: :delivered, status: :received}

  message_params =
    message_params
    |> Map.merge(metadata)
    |> Map.merge(inbound_fields)

  # publish a telemetry event about the message being received
  # (latency is not measured at the moment, hence the constant duration)
  :telemetry.execute([:glific, :message, :received], %{duration: 1}, metadata)

  case type do
    :location ->
      receive_location(message_params)

    text_like when text_like in [:quick_reply, :list, :text] ->
      receive_text(message_params)

    _media ->
      receive_media(message_params)
  end
end
# Handler for receiving text-like (text | quick_reply | list) messages: creates the
# message, publishes it as :received_message and hands it over for processing.
# Always returns :ok, as declared in the spec and like the receive_media/1 and
# receive_location/1 handlers, instead of leaking the result of process_message/1.
@spec receive_text(map()) :: :ok
defp receive_text(message_params) do
  message_params
  |> Messages.create_message()
  |> publish_data(:received_message)
  |> process_message()

  :ok
end
# Handler for receiving media (image | video | audio | document | sticker) messages:
# the media record is created first so that the message can reference it via media_id.
@spec receive_media(map()) :: :ok
defp receive_media(message_params) do
  {:ok, media} = Messages.create_message_media(message_params)

  params_with_media = Map.put(message_params, :media_id, media.id)

  params_with_media
  |> Messages.create_message()
  |> publish_data(:received_message)
  |> process_message()

  :ok
end
# Handler for receiving location messages: creates the message, then a location
# record linked to the sending contact and that message, and finally publishes
# and processes the message.
@spec receive_location(map()) :: :ok
defp receive_location(message_params) do
  {:ok, message} = Messages.create_message(message_params)

  location_attrs =
    Map.merge(message_params, %{
      contact_id: message_params.sender_id,
      message_id: message.id
    })

  # the result of creating the location is intentionally not checked here
  Contacts.create_location(location_attrs)

  message
  |> publish_data(:received_message)
  |> process_message()

  :ok
end
# Publishes a message for the given data type. Accepts either a bare message or the
# {:ok, message} | {:error, reason} result of creating one; an error is reported and
# yields nil. The context message and contact are preloaded before publishing, so
# the frontend can do the right thing.
@spec publish_data(Message.t() | {:ok, Message.t()} | {:error, any()}, atom()) ::
        Message.t() | nil
defp publish_data(message_or_result, data_type) do
  case message_or_result do
    {:error, reason} ->
      error("Create message error", reason)

    {:ok, message} ->
      publish_data(message, data_type)

    message ->
      preloaded = Repo.preload(message, [:context_message, :contact])

      preloaded
      |> Communications.publish_data(data_type, message.organization_id)
      |> publish_simulator(data_type)
  end
end
# Check if the contact is a simulator contact and, if so, send another subscription
# only for it (:sent_simulator_message / :received_simulator_message).
# Always returns the message it was given. A nil message (allowed by the spec) is
# passed through untouched instead of crashing on `message.contact`.
@spec publish_simulator(Message.t() | nil, atom()) :: Message.t() | nil
defp publish_simulator(nil, _type), do: nil

defp publish_simulator(message, type) when type in [:sent_message, :received_message] do
  if Contacts.is_simulator_contact?(message.contact.phone) do
    message_type =
      if type == :sent_message,
        do: :sent_simulator_message,
        else: :received_simulator_message

    Communications.publish_data(
      message,
      message_type,
      message.organization_id
    )
  end

  message
end

# other data types have no simulator counterpart
defp publish_simulator(message, _type), do: message
# Default timeout of 5 seconds (in milliseconds) for each call; it is passed to the
# GenServer.call made in process_message/1.
@timeout 5000
# Logs an error (message plus the inspected `e` and `r` terms) and reports it to
# Appsignal. When no stacktrace is supplied, the current process stacktrace is used.
# Always returns nil.
@spec error(String.t(), any(), any(), list() | nil) :: nil
defp error(error, e, r \\ nil, stacktrace \\ nil) do
  report = error <> ": #{inspect(e)}, #{inspect(r)}"
  Logger.error(report)

  trace =
    case stacktrace do
      nil ->
        {:current_stacktrace, frames} = Process.info(self(), :current_stacktrace)
        frames

      given ->
        given
    end

  Appsignal.send_error(:error, report, trace)
  nil
end
# Hands a received message over to a pooled worker; a nil message (nothing was
# created/published) is a no-op.
@spec process_message(Message.t() | nil) :: any
defp process_message(nil), do: :ok

defp process_message(message) do
  # lets transfer the organization id and current user to the poolboy worker
  process_state = {Repo.get_organization_id(), Repo.get_current_user()}

  # capture the pid here: inside the task below self() would be the task's pid
  caller = self()

  # We don't want to block the input pipeline, and we are unsure how long the consumer
  # worker will take, so the work runs as a separate task with a short timeout on the
  # genserver call.
  Task.start(fn ->
    :poolboy.transaction(Glific.Application.message_poolname(), fn worker ->
      try do
        GenServer.call(worker, {message, process_state, caller}, @timeout)
      catch
        kind, reason ->
          error(
            "Poolboy genserver caught error while processing the message for flow.",
            kind,
            reason,
            __STACKTRACE__
          )
      end
    end)
  end)
end
# Reacts to specific provider error codes reported for a message; unknown codes are
# ignored and yield nil.
@spec process_errors(Message.t(), map(), integer | nil) :: any

# 1002 (Issue #2047) - the number does not exist in WhatsApp.
# The contact is disabled / made inactive. This is relatively common, so no email
# is sent and nothing is logged.
defp process_errors(message, _errors, 1002),
  do: Contacts.number_does_not_exist(message.contact_id)

# 471 (Issue #2049) - the organization has hit the rate limit and WABA is now
# rejecting messages. The organization is suspended, ops are notified and the
# org and glific support are emailed.
defp process_errors(message, _errors, 471) do
  org = Partners.organization(message.organization_id)
  Partners.suspend_organization(org)

  notice = """
  #{org.name} account has been suspended since it hit the WhatsApp rate limit.
  Your services will resume automatically at the start of the next day. Please be patient :)
  """

  Glific.log_error(notice)
  BalanceAlertMail.rate_exceeded(org, notice)
end

# 1003 (Issue #2049) - the organization has insufficient balance and Gupshup is now
# rejecting messages. The organization is suspended, ops are notified and the
# org and glific support are emailed.
defp process_errors(message, _errors, 1003) do
  org = Partners.organization(message.organization_id)

  # NOTE(review): the meaning of the second argument (3) is not visible here,
  # presumably a suspension duration; confirm against Partners.suspend_organization/2
  Partners.suspend_organization(org, 3)

  notice = """
  #{org.name} account has been suspended since its BSP balance is insufficient.
  Please refill your account immediately so Glific can send and receive messages on your behalf.
  """

  Glific.log_error(notice)
  BalanceAlertMail.no_balance(org, notice)
end

defp process_errors(_message, _errors, _code), do: nil
end