defmodule Glific.Clients.ArogyaWorld do
@moduledoc """
Custom code extenison for ArogyaWorld
"""
require Logger
import Ecto.Query
alias Glific.{
Contacts.Contact,
GCS.GcsWorker,
Messages.Message,
Partners,
Partners.OrganizationData,
Repo,
Sheets.ApiClient,
Triggers.Trigger
}
@response_sheet_headers ["ID", "Q1_ID", "Q1_response", "Q2_ID", "Q2_response"]
@first_question_day "3"
@second_question_day "7"
@csv_url_key_map %{
"static_message_schedule" =>
"https://storage.googleapis.com/arogya-sheets/Arogya%20message%20HSM%20id's%20-%20Messages.csv",
"message_template_map" =>
"https://storage.googleapis.com/arogya-sheets/Arogya%20message%20HSM%20id's%20-%20Messages.csv",
"question_template_map" =>
"https://storage.googleapis.com/arogya-sheets/Arogya%20message%20HSM%20id's%20-%20Questions.csv",
"response_score_map" =>
"https://storage.googleapis.com/arogya-sheets/mDiabetes%20Content-Google%20AI%20Project.xlsx%20-%20%20mDiabetes%20Questions-AI(Kannada%20Final).csv",
"dynamic_message_schedule_week" =>
"https://storage.googleapis.com/participant-files/uploads/to_participants_week_"
}
@doc """
Run this function on the initial load
"""
@spec initial_load(non_neg_integer()) :: any()
def initial_load(org_id) do
dynamic_week_start = 2
static_message_schedule_map(@csv_url_key_map["static_message_schedule"], org_id)
message_hsm_mapping(@csv_url_key_map["message_template_map"], org_id)
question_hsm_mapping(@csv_url_key_map["question_template_map"], org_id)
response_score_mapping(@csv_url_key_map["response_score_map"], org_id)
load_participant_file(org_id, dynamic_week_start)
end
@doc """
Webhook functions for the ArogyaWorld
"""
@spec webhook(String.t(), map) :: map()
def webhook("static_message", fields) do
organization_id = Glific.parse_maybe_integer!(fields["organization_id"])
contact_id = Glific.parse_maybe_integer!(get_in(fields, ["contact", "id"]))
current_week = get_current_week(organization_id, contact_id)
current_week_day = get_week_day_number()
message_id = get_message_id(organization_id, current_week, current_week_day)
template_id = get_message_template_id(organization_id, message_id)
%{
message_id: message_id,
template_id: template_id || false,
current_week: current_week,
current_week_day: current_week_day,
message_label: "static_message_#{current_week}_#{current_week_day}"
}
end
def webhook("batch_current_week", fields) do
organization_id = Glific.parse_maybe_integer!(fields["organization_id"])
contact_id = Glific.parse_maybe_integer!(get_in(fields, ["contact", "id"]))
current_week = get_current_week(organization_id, contact_id)
%{
batch_current_week: current_week
}
end
def webhook("is_valid_response", fields) do
org_id = Glific.parse_maybe_integer!(fields["organization_id"])
question_id = get_in(fields, ["results", "webhook", "question_id"])
response = get_in(fields, ["results", "question_response", "input"])
%{is_valid: get_response_score(response, question_id, org_id) > 0}
end
def webhook("weekly_task", fields) do
organization_id = Glific.parse_maybe_integer!(fields["organization_id"])
{_current_week, next_week} = update_week_number(organization_id)
load_participant_file(organization_id, next_week)
%{success: true}
end
def webhook("send_participant_responses", fields) do
organization_id = Glific.parse_maybe_integer!(fields["organization_id"])
organization_id
|> get_current_week
|> Glific.parse_maybe_integer!()
|> then(&upload_participant_responses(organization_id, &1))
end
def webhook("dynamic_message", fields) do
organization_id = Glific.parse_maybe_integer!(fields["organization_id"])
contact_id = Glific.parse_maybe_integer!(get_in(fields, ["contact", "id"]))
current_week = get_current_week(organization_id)
current_week_day = get_week_day_number()
message_id =
get_dynamic_message_id(organization_id, current_week, current_week_day, contact_id)
question_id =
get_dynamic_question_id(organization_id, current_week, current_week_day, contact_id)
message_template_id = get_message_template_id(organization_id, message_id)
question_template_id = get_question_template_id(organization_id, question_id)
%{
current_week: current_week,
current_week_day: current_week_day,
message_id: message_id,
question_id: question_id,
message_template_id: message_template_id || false,
question_template_id: question_template_id || false,
question_label: "Q#{current_week}_#{current_week_day}_#{question_id}",
message_label: "dynamic_message_#{current_week}_#{current_week_day}"
}
end
def webhook(_, fields), do: fields
@doc """
Hourly task jobs for the ArogyaWorld
"""
@spec hourly_tasks(non_neg_integer()) :: any()
def hourly_tasks(org_id) do
Logger.info("Ran hourly tasks for organization #{org_id}")
sharing_file_time = Timex.now().hour === 13
current_week = Glific.parse_maybe_integer!(get_current_week(org_id))
get_week_day_number()
|> do_hourly_tasks(sharing_file_time, org_id, current_week)
end
@spec do_hourly_tasks(non_neg_integer(), boolean(), non_neg_integer(), non_neg_integer()) ::
any()
defp do_hourly_tasks(1, sharing_file_time, org_id, current_week),
do: if(sharing_file_time, do: upload_participant_responses(org_id, current_week))
defp do_hourly_tasks(2, sharing_file_time, org_id, current_week),
do: if(sharing_file_time, do: load_participant_file(org_id, current_week))
defp do_hourly_tasks(7, sharing_file_time, org_id, _current_week),
do: if(sharing_file_time, do: update_week_number(org_id))
defp do_hourly_tasks(_time, _sharing_file_time, _org_id, _current_week),
do: nil
defp get_current_week(organization_id) do
## For pilot phase, it will be the day number.
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{organization_id: organization_id, key: "current_week"})
organization_data.text
end
@spec get_current_week(non_neg_integer(), non_neg_integer()) :: String.t()
defp get_current_week(organization_id, contact_id) do
current_week = get_current_week(organization_id)
{:ok, contact} = Repo.fetch_by(Contact, %{id: contact_id, organization_id: organization_id})
# get the batch from contact fields and subtract from current week
batch_number = get_in(contact.fields, ["batch", "value"])
case batch_number do
nil ->
current_week
_ ->
(Glific.parse_maybe_integer!(current_week) - Glific.parse_maybe_integer!(batch_number) + 1)
|> to_string()
end
end
defp get_week_day_number do
# week starts from wednesday
rem(Timex.weekday(Timex.today()) + 4, 7) + 1
end
defp get_dynamic_week_key(current_week),
do: "dynamic_message_schedule_week_#{current_week}"
@spec get_message_id(non_neg_integer(), String.t(), non_neg_integer()) :: String.t()
defp get_message_id(organization_id, current_week, current_week_day) do
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: organization_id,
key: "static_message_schedule"
})
current_week_day = to_string(current_week_day)
static_message_schedule = organization_data.json
get_in(static_message_schedule, [current_week, current_week_day])
end
defp get_message_template_id(organization_id, message_id) do
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: organization_id,
key: "message_template_map"
})
message_id = to_string(message_id)
message_template_map = organization_data.json
get_in(message_template_map, [message_id])
end
defp get_question_template_id(organization_id, question_id) do
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: organization_id,
key: "question_template_map"
})
question_id = to_string(question_id)
question_template_map = organization_data.json
get_in(question_template_map, [question_id])
end
defp get_dynamic_message_id(organization_id, current_week, current_week_day, contact_id) do
key = get_dynamic_week_key(current_week)
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: organization_id,
key: key
})
current_week_day = to_string(current_week_day)
dynamic_message_schedule = organization_data.json
contact_id = to_string(contact_id)
get_in(dynamic_message_schedule, [contact_id, current_week_day, "m_id"])
end
defp get_dynamic_question_id(organization_id, current_week, current_week_day, contact_id) do
key = get_dynamic_week_key(current_week)
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: organization_id,
key: key
})
contact_id = to_string(contact_id)
current_week_day = to_string(current_week_day)
dynamic_message_schedule = organization_data.json
get_in(dynamic_message_schedule, [contact_id, current_week_day, "q_id"])
end
@spec update_week_number(non_neg_integer()) :: {integer, integer}
defp update_week_number(org_id) do
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: org_id,
key: "current_week"
})
current_week = Glific.parse_maybe_integer!(organization_data.text)
next_week = current_week + 1
{:ok, _} =
Partners.update_organization_data(organization_data, %{
key: "current_week",
text: to_string(next_week)
})
{current_week, next_week}
end
@doc """
load participant files from gcs
"""
@spec load_participant_file(non_neg_integer(), non_neg_integer()) :: any()
def load_participant_file(org_id, week_number) do
key = get_dynamic_week_key(week_number)
url = "#{@csv_url_key_map["dynamic_message_schedule_week"]}#{week_number}.csv"
add_weekly_dynamic_data(key, url, org_id)
end
@doc """
get template form EEx based on variables
"""
@spec template(integer(), String.t()) :: binary
def template(template_uuid, variables) do
%{
uuid: template_uuid,
name: "Template",
variables: variables,
expression: nil
}
|> Jason.encode!()
end
@doc """
get template form EEx without variables
"""
@spec template(integer()) :: binary
def template(template_uuid) do
%{
uuid: template_uuid,
name: "Template",
expression: nil
}
|> Jason.encode!()
end
@doc """
adds the weekly dynamic data loaded from the sheet based on current week
"""
@spec add_weekly_dynamic_data(String.t(), String.t(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def add_weekly_dynamic_data(key, file_url, org_id) do
add_data_from_csv(
key,
file_url,
&cleanup_week_data/2,
org_id
)
end
@doc """
creates the static data map that needs to be sent to users
"""
@spec static_message_schedule_map(String.t(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def static_message_schedule_map(file_url, org_id) do
add_data_from_csv(
"static_message_schedule",
file_url,
&cleanup_static_data/2,
org_id
)
end
@doc """
add data that needs to be sent to the database
"""
@spec add_data_from_csv(String.t(), String.t(), any(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def add_data_from_csv(key, file_url, cleanup_func, org_id) do
# how to validate if the data is in correct format
ApiClient.get_csv_content(url: file_url)
|> Enum.reduce(%{}, fn {_, data}, acc ->
cleanup_func.(acc, data)
end)
|> then(fn data -> Partners.maybe_insert_organization_data(key, data, org_id) end)
end
@doc """
message mapping to HSM UUID
"""
@spec message_hsm_mapping(String.t(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def message_hsm_mapping(file_url, org_id) do
add_data_from_csv(
"message_template_map",
file_url,
fn acc, data ->
Map.put(acc, data["Message ID"], data["Glific Template UUID"])
end,
org_id
)
end
@doc """
question mapping to HSM UUID
"""
@spec question_hsm_mapping(String.t(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def question_hsm_mapping(file_url, org_id) do
add_data_from_csv(
"question_template_map",
file_url,
fn acc, data ->
Map.put(acc, data["Question ID"], data["Glific Template UUID"])
end,
org_id
)
end
@doc """
response to score mapping
"""
@spec response_score_mapping(String.t(), non_neg_integer()) ::
{:ok, any()} | {:error, Ecto.Changeset.t()}
def response_score_mapping(file_url, org_id) do
add_data_from_csv(
"response_score_map",
file_url,
&format_response_score_mapping/2,
org_id
)
end
@spec format_response_score_mapping(map(), map()) :: map()
defp format_response_score_mapping(acc, data) do
score_map =
Map.put(%{}, clean_string(data["1(Kannada)"]), 1)
|> Map.put(clean_string(data["2(Kannada)"]), 2)
|> Map.put(clean_string(data["3(Kannada)"]), 3)
Map.put(acc, data["ID"], score_map)
end
@doc """
Clean week data from the CSV file.
"""
@spec cleanup_week_data(map(), map()) :: map()
def cleanup_week_data(acc, data) do
attr = %{
@first_question_day => %{
"q_id" => data["Q1_ID"],
"m_id" => data["M1_ID"]
},
@second_question_day => %{
"q_id" => data["Q2_ID"],
"m_id" => data["M2_ID"]
}
}
Map.put(acc, data["PARTICIPANT_ID"], attr)
end
@doc """
Clean static weekly data from the CSV file.
"""
@spec cleanup_static_data(map(), map()) :: map()
def cleanup_static_data(acc, data) do
# check for 2nd day and update it to 4th
check_day = do_cleanup_static_data(data["Message No"], data)
week =
if Map.has_key?(acc, data["Week"]),
do: Map.put(acc[data["Week"]], check_day, data["Message ID"]),
else: %{check_day => data["Message ID"]}
Map.put(acc, data["Week"], week)
end
@spec do_cleanup_static_data(String.t(), map()) :: String.t()
defp do_cleanup_static_data("1", data),
do: if(data["Week"] !== "1", do: @first_question_day, else: data["Message No"])
defp do_cleanup_static_data("2", data),
do: if(data["Week"] !== "1", do: @second_question_day, else: data["Message No"])
defp do_cleanup_static_data(_message_no, data), do: data["Message No"]
@doc """
Conditionally execute the trigger based on: ID, Week, Day.
"""
@spec trigger_condition(Trigger.t()) :: boolean
def trigger_condition(trigger) do
if trigger.id > 0,
do: true,
else: false
end
@doc """
Get the messages based on flow label
"""
@spec get_messages_by_flow_label(String.t()) :: any()
def get_messages_by_flow_label(label) do
# get only last weeks data because we have same labels for pilot
week_before = DateTime.utc_now() |> Timex.shift(days: -8)
Message
|> join(:inner, [m], mc in Message, on: m.id == mc.context_message_id)
|> where([m], like(m.flow_label, ^"#{label}%") and m.inserted_at > ^week_before)
|> select([m, mc], %{body: mc.body, contact_id: mc.contact_id, flow_label: m.flow_label})
|> Repo.all()
end
@doc """
Get response message based on day and week
"""
@spec get_responses_by_week_and_day(non_neg_integer(), non_neg_integer(), String.t()) :: any()
def get_responses_by_week_and_day(org_id, week, day) do
response_label_format = "Q#{week}_#{day}_"
get_messages_by_flow_label(response_label_format)
|> Enum.map(fn m ->
response_label =
String.split(m.flow_label, ",")
|> Enum.find(fn s -> String.starts_with?(s, response_label_format) end)
q_id = get_question_id(response_label)
%{
"ID" => m.contact_id,
"Q_ID" => q_id,
"Q_response" => get_response_score(m.body, q_id, org_id)
}
end)
end
@doc """
Create a file in GCS bucket for candidate response
"""
@spec upload_participant_responses(non_neg_integer(), non_neg_integer()) :: any()
def upload_participant_responses(org_id, week) do
## the response files would be for the previous week
## since the week is updated on Tuesday and we send file on Wednesday
previous_week = week - 1
key = get_dynamic_week_key(previous_week)
# Question 1 responses for previous week
q1_responses = get_responses_by_week_and_day(org_id, previous_week, @first_question_day)
# Question 2 responses for previous week
q2_responses = get_responses_by_week_and_day(org_id, previous_week, @second_question_day)
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: org_id,
key: key
})
dynamic_message_schedule = organization_data.json
current_week_responses =
Enum.map(dynamic_message_schedule, fn {id, values} ->
%{
"ID" => id,
"Q1_ID" => values[@first_question_day]["q_id"],
"Q1_response" => get_response(q1_responses, id),
"Q2_ID" => values[@second_question_day]["q_id"],
"Q2_response" => get_response(q2_responses, id)
}
end)
# Creating a CSV file
temp_path =
System.tmp_dir!()
|> Path.join("participant_response.csv")
file = temp_path |> File.open!([:write, :utf8])
current_week_responses
|> CSV.encode(headers: @response_sheet_headers)
|> Enum.each(&IO.write(file, &1))
# Upload the file to GCS
GcsWorker.upload_media(temp_path, "participant_responses_week_#{previous_week}.csv", org_id)
|> case do
{:ok, gcs_url} -> %{url: gcs_url, error: nil}
{:error, error} -> %{url: nil, error: error}
end
end
@doc """
Return the response of the question for a contact
"""
@spec get_response(list(), String.t()) :: String.t() | nil
def get_response(list, contact_id) do
contact =
list
|> Enum.find(nil, fn contact ->
Integer.to_string(contact["ID"]) === contact_id
end)
# send 0 as response code when the contact did not answer instead of blank
get_in(contact, ["Q_response"]) || "0"
end
@doc """
Return the question id based on the label
"""
@spec get_question_id(String.t()) :: any()
def get_question_id(label) do
String.split(label, "_", trim: true)
|> List.last()
end
@doc """
Return the response score based on the body
"""
@spec get_response_score(String.t(), String.t(), non_neg_integer()) :: any()
def get_response_score(response, q_id, org_id) do
{:ok, organization_data} =
Repo.fetch_by(OrganizationData, %{
organization_id: org_id,
key: "response_score_map"
})
response = clean_string(response)
response_score = organization_data.json
get_in(response_score, [q_id, response]) || 0
end
@spec clean_string(String.t()) :: String.t()
defp clean_string(str) do
String.replace(str, " ", "")
end
end