defmodule Glific.Seeds.SeedsMigration do
@moduledoc """
One shot migration of data to add simulators and saas admin.
We use the functions in this file to add simulators for new organizations as
they are created
"""
import Ecto.Query
alias Glific.{
AccessControl.Role,
BigQuery,
Contacts,
Contacts.Contact,
Flows,
Groups.Group,
Partners,
Partners.Organization,
Partners.Saas,
Repo,
Searches.SavedSearch,
Seeds.SeedsDev,
Seeds.SeedsFlows,
Seeds.SeedsStats,
Settings,
Settings.Language,
Templates,
Users,
Users.User
}
@doc """
Public interface to do a seed migration across all organizations.
One function to rule them all. This function is invoked manually by a glific developer
to add data from the DB. This seems the cleanest way to do such things. We use phases to
separate different migrations
"""
@spec migrate_data(atom(), Organization.t() | nil) :: :ok
def migrate_data(phase, organization \\ nil) do
organizations = get_organizations(organization)
do_migrate_data(phase, organizations)
end
@doc false
@spec get_organizations(nil | Organization.t()) :: [Organization.t()]
defp get_organizations(nil), do: Partners.list_organizations()
defp get_organizations(organization), do: [organization]
@doc false
@spec do_migrate_data(atom(), [Organization.t()]) :: any()
defp do_migrate_data(:collection, organizations), do: seed_collections(organizations)
defp do_migrate_data(:fix_message_number, organizations), do: fix_message_number(organizations)
defp do_migrate_data(:optin, organizations), do: optin_data(organizations)
defp do_migrate_data(:opt_in_out, organizations), do: SeedsFlows.opt_in_out_flows(organizations)
defp do_migrate_data(:simulator, organizations), do: add_simulators(organizations)
defp do_migrate_data(:stats, organizations) do
org_id_list = Enum.map(organizations, fn o -> o.id end)
SeedsStats.seed_stats(org_id_list)
end
defp do_migrate_data(:sync_bigquery, _organizations) do
bigquery_enabled_org_ids()
|> sync_schema_with_bigquery()
end
defp do_migrate_data(:sync_hsm_templates, organizations),
do:
Enum.map(organizations, fn o -> o.id end)
|> sync_hsm_templates()
defp do_migrate_data(:localized_language, _organizations), do: update_localized_language()
defp do_migrate_data(:user_default_language, _organizations), do: update_user_default_language()
defp do_migrate_data(:submit_common_otp_template, organizations),
do: Enum.map(organizations, fn org -> submit_otp_template_for_org(org.id) end)
defp do_migrate_data(:set_newcontact_flow_id, organizations),
do: Enum.map(organizations, fn org -> set_newcontact_flow_id(org.id) end)
defp do_migrate_data(:set_default_organization_roles, organizations),
do: Enum.map(organizations, fn org -> set_default_organization_roles(org.id) end)
@doc false
@spec add_simulators(list()) :: :ok
def add_simulators(organizations) do
[en | _] = Settings.list_languages(%{filter: %{label: "english"}})
organizations
|> seed_simulators(en)
|> seed_users(en)
:ok
end
@doc """
Create default organization roles for an organization
"""
@spec set_default_organization_roles(non_neg_integer()) :: Role.t()
def set_default_organization_roles(org_id) do
org_id
|> Partners.get_organization!()
|> SeedsDev.seed_roles()
end
@doc false
@spec submit_otp_template_for_org(any) ::
{:error, Ecto.Changeset.t()} | {:ok, Templates.SessionTemplate.t()}
def submit_otp_template_for_org(org_id) do
%{
is_hsm: true,
shortcode: "common_otp",
label: "common_otp",
body: "Your OTP for {{1}} is {{2}}. This is valid for {{3}}.",
type: :text,
category: "OTP",
example: "Your OTP for [adding Anil as a payee] is [1234]. This is valid for [15 minutes].",
is_active: true,
is_source: false,
language_id: 1,
organization_id: org_id
}
|> Templates.create_session_template()
end
@doc false
@spec set_newcontact_flow_id(non_neg_integer()) ::
{:error, Ecto.Changeset.t()} | {:ok, Organization.t()}
def set_newcontact_flow_id(org_id) do
flow_id =
org_id
|> Flows.flow_keywords_map()
|> Map.get("published")
|> Map.get("newcontact", nil)
org_id
|> Partners.get_organization!()
|> Partners.update_organization(%{newcontact_flow_id: flow_id})
end
@spec has_contact?(Organization.t(), String.t()) :: boolean
defp has_contact?(organization, name) do
case Repo.fetch_by(
Contact,
%{name: name, organization_id: organization.id},
skip_organization_id: true
) do
{:ok, _contact} -> true
_ -> false
end
end
@spec get_common_attrs(Organization.t(), Language.t(), DateTime.t()) :: map()
defp get_common_attrs(organization, language, time) do
%{
organization_id: organization.id,
language_id: language.id,
bsp_status: :session_and_hsm,
inserted_at: time,
updated_at: time,
last_message_at: DateTime.truncate(time, :second),
last_communication_at: DateTime.truncate(time, :second),
optin_time: DateTime.truncate(time, :second)
}
end
@doc false
@spec seed_collections([Organization.t()]) :: [Organization.t()]
defp seed_collections(organizations) do
for org <- organizations,
do: create_collections(org)
organizations
end
defp create_collections(organization) do
Repo.insert!(%Group{
label: "Optin contacts",
is_restricted: false,
organization_id: organization.id
})
Repo.insert!(%Group{
label: "Optout contacts",
is_restricted: false,
organization_id: organization.id
})
Repo.insert!(%Group{
label: "STARTED_AB",
is_restricted: false,
organization_id: organization.id
})
end
@doc false
@spec seed_simulators([Organization.t()], Language.t()) :: [Organization.t()]
def seed_simulators(organizations \\ [], language) do
# for the insert's, lets pre-compute some values
for org <- organizations do
create_simulators(org, language)
end
organizations
end
@spec delete_old_simulators(Organization.t(), String.t()) :: :ok
defp delete_old_simulators(organization, prefix) do
Contact
|> where([c], c.organization_id == ^organization.id)
|> where([c], ilike(c.phone, ^"#{prefix}%"))
|> Repo.delete_all()
:ok
end
defp create_simulators(organization, language) do
if !has_contact?(organization, "Glific Simulator Five") do
simulators = [
{"One", "_1"},
{"Two", "_2"},
{"Three", "_3"},
{"Four", "_4"},
{"Five", "_5"}
]
utc_now = DateTime.utc_now()
simulator_phone_prefix = Contacts.simulator_phone_prefix()
# lets delete any old simulators for this organization
delete_old_simulators(organization, simulator_phone_prefix)
attrs = get_common_attrs(organization, language, utc_now)
simulators =
for {name, phone} <- simulators do
Map.merge(
attrs,
%{
name: "Glific Simulator " <> name,
phone: simulator_phone_prefix <> phone
}
)
end
Repo.insert_all(Contact, simulators)
end
:ok
end
@doc false
@spec seed_users([Organization.t()], Language.t()) :: [Organization.t()]
def seed_users(organizations, language) do
for org <- organizations do
add_saas_user(org, language)
end
organizations
end
@doc """
Add a saas user for the organization. We need to check if it already exists
since this code is used during data migration and can be repeated for the same
organization
"""
@spec add_saas_user(Organization.t(), Language.t()) :: :ok
def add_saas_user(organization, language) do
name = "SaaS Admin"
if !has_contact?(organization, name) do
# lets pre compute common values
utc_now = DateTime.utc_now()
organization
|> get_common_attrs(language, utc_now)
|> create_saas_contact(name)
|> create_saas_user()
end
:ok
end
@spec create_saas_contact(map(), String.t()) :: Contact.t()
defp create_saas_contact(attrs, name) do
attrs =
Map.merge(
attrs,
%{
phone: Saas.phone(),
name: name
}
)
Contact
|> struct(attrs)
|> Repo.insert!()
end
@spec create_saas_user(Contact.t()) :: User.t()
defp create_saas_user(contact) do
password = Ecto.UUID.generate()
{:ok, user} =
Users.create_user(%{
name: contact.name,
phone: contact.phone,
password: password,
confirm_password: password,
roles: ["admin"],
contact_id: contact.id,
organization_id: contact.organization_id
})
user
end
@spec optin_data(list()) :: :ok
defp optin_data(organizations) do
add_optin_search(organizations)
migrate_optin_data()
end
@spec add_optin_search(list()) :: :ok
defp add_optin_search(organizations) do
shortcode = "Optin"
organizations
|> Enum.each(fn org ->
Repo.insert!(%SavedSearch{
label: "Conversations where the contact has opted in",
shortcode: shortcode,
args: %{
filter: %{status: shortcode, term: ""},
contactOpts: %{limit: 25, offset: 0},
messageOpts: %{limit: 20, offset: 0}
},
is_reserved: true,
organization_id: org.id
})
end)
end
@spec migrate_optin_data :: :ok
defp migrate_optin_data do
# Set false status for contacts not opted in
Contact
|> where([c], is_nil(c.optin_time))
|> update([c], set: [optin_status: false])
|> Repo.update_all([], skip_organization_id: true)
# Set true status where we have an optin_date,
# also set method as BSP since they opted in via Gupshup
Contact
|> where([c], not is_nil(c.optin_time))
|> update([c], set: [optin_status: true, optin_method: "BSP"])
|> Repo.update_all([], skip_organization_id: true)
:ok
end
@doc """
sync all the hsm from BSP to Glific DB
"""
@spec sync_hsm_templates(list) :: :ok
def sync_hsm_templates(org_id_list) do
org_id_list
|> Enum.each(fn org_id ->
Task.Supervisor.async_nolink(Glific.TaskSupervisor, fn ->
Repo.put_process_state(org_id)
Glific.Templates.sync_hsms_from_bsp(org_id)
end)
end)
:ok
end
@doc """
Sync bigquery schema with local db changes.
"""
@spec sync_schema_with_bigquery(list) :: :ok
def sync_schema_with_bigquery(org_id_list) do
org_id_list
|> Enum.each(fn org_id ->
Task.Supervisor.async_nolink(Glific.TaskSupervisor, fn ->
Repo.put_process_state(org_id)
BigQuery.sync_schema_with_bigquery(org_id)
end)
end)
end
@doc """
Reset message number for a list of organizations or for a org_id
"""
@spec fix_message_number(list | integer()) :: :ok
def fix_message_number(org_id) when is_integer(org_id) do
# set a large query timeout for this
[
fix_message_number_query_for_contacts(org_id),
set_last_message_number_for_contacts(org_id),
fix_message_number_query_for_groups(org_id),
set_last_message_number_for_collection(org_id)
]
|> Enum.each(&Repo.query!(&1, [], timeout: 900_000))
:ok
end
def fix_message_number(organizations) when is_list(organizations),
do: organizations |> Enum.each(fn org -> fix_message_number(org.id) end)
@spec fix_message_number_query_for_contacts(integer()) :: String.t()
defp fix_message_number_query_for_contacts(org_id) do
"""
UPDATE
messages m
SET
message_number = m2.row_num
FROM (
SELECT
id,
contact_id,
ROW_NUMBER() OVER (PARTITION BY contact_id ORDER BY inserted_at ASC) AS row_num
FROM
messages m2
WHERE
m2.organization_id = #{org_id} and m2.sender_id != m2.receiver_id ) m2
WHERE
m.organization_id = #{org_id} and m.sender_id != m.receiver_id and m.id = m2.id;
"""
end
@spec fix_message_number_query_for_groups(integer()) :: String.t()
defp fix_message_number_query_for_groups(org_id) do
"""
UPDATE
messages m
SET
message_number = m2.row_num
FROM (
SELECT
id,
group_id,
ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY inserted_at ASC) AS row_num
FROM
messages m2
WHERE
m2.organization_id = #{org_id} and m2.sender_id = m2.receiver_id ) m2
WHERE
m.organization_id = #{org_id} and m.sender_id = m.receiver_id and m.id = m2.id;
"""
end
@spec set_last_message_number_for_contacts(integer()) :: String.t()
defp set_last_message_number_for_contacts(org_id) do
"""
UPDATE
contacts c
SET
last_message_number = (
SELECT
max(message_number) as message_number
FROM
messages
WHERE
contact_id = c.id)
WHERE
organization_id = #{org_id};
"""
end
@spec set_last_message_number_for_collection(integer()) :: String.t()
defp set_last_message_number_for_collection(org_id) do
"""
UPDATE
groups g
SET
last_message_number = (
SELECT
max(message_number) as message_number
FROM
messages
WHERE
group_id = g.id and messages.receiver_id = messages.sender_id)
WHERE
organization_id = #{org_id};
"""
end
@doc """
Reset message number for a list of organizations or for a contact id
"""
@spec fix_message_number_for_contact(integer()) :: :ok
def fix_message_number_for_contact(contact_id) do
# set a large query timeout for this
[
fix_message_number_query_for_contact_id(contact_id),
set_last_message_number_for_contact_id(contact_id)
]
|> Enum.each(&Repo.query!(&1, [], timeout: 20_000, skip_organization_id: true))
:ok
end
@spec fix_message_number_query_for_contact_id(integer()) :: String.t()
defp fix_message_number_query_for_contact_id(contact_id) do
"""
UPDATE
messages m
SET
message_number = m2.row_num
FROM (
SELECT
id,
contact_id,
ROW_NUMBER() OVER (PARTITION BY contact_id ORDER BY inserted_at ASC) AS row_num
FROM
messages m2
WHERE
m2.contact_id = #{contact_id} and m2.sender_id != m2.receiver_id ) m2
WHERE
m.contact_id = #{contact_id} and m.sender_id != m.receiver_id and m.id = m2.id;
"""
end
@spec set_last_message_number_for_contact_id(integer()) :: String.t()
defp set_last_message_number_for_contact_id(contact_id) do
"""
UPDATE
contacts c
SET
last_message_number = (
SELECT
max(message_number) as message_number
FROM
messages
WHERE
contact_id = c.id
)
WHERE
id = #{contact_id};
"""
end
@spec bigquery_enabled_org_ids() :: list()
defp bigquery_enabled_org_ids do
Partners.Credential
|> join(:left, [c], p in Partners.Provider, as: :p, on: c.provider_id == p.id)
|> where([_c, p], p.shortcode == ^"bigquery")
|> where([c, _p], c.is_active)
|> select([c, _p], c.organization_id)
|> Repo.all(skip_organization_id: true)
end
@spec update_localized_language() :: :ok
defp update_localized_language do
Settings.Language
|> where([l], l.locale in ["en", "hi"])
|> update([l], set: [localized: true])
|> Repo.update_all([])
end
@spec update_user_default_language() :: :ok
defp update_user_default_language do
{:ok, en} = Repo.fetch_by(Language, %{label_locale: "English"})
Glific.Users.User
|> update([u], set: [language_id: ^en.id])
|> Repo.update_all([], skip_organization_id: true)
end
end