lib/glific/partners.ex

defmodule Glific.Partners do
  @moduledoc """
  The Partners context. This is the gateway for the application to access/update all the organization
  and Provider information.
  """
  use Publicist

  import Ecto.Query, warn: false
  import GlificWeb.Gettext
  require Logger

  alias __MODULE__

  alias Glific.{
    BigQuery,
    Caches,
    Contacts.Contact,
    Flags,
    Flows,
    Flows.Flow,
    GCS,
    Notifications,
    Partners.Credential,
    Partners.Organization,
    Partners.OrganizationData,
    Partners.Provider,
    Providers.Gupshup.GupshupWallet,
    Providers.Gupshup.PartnerAPI,
    Providers.GupshupContacts,
    Repo,
    Settings.Language,
    Users.User
  }

  # We cache organization info under this id since when we want to retrieve
  # by shortcode we do not have an organization id to retrieve it from.
  @global_organization_id 0

  @doc """
  Returns the list of providers.

  ## Examples

      iex> list_providers()
      [%Provider{}, ...]

  """
  @spec list_providers(map()) :: [Provider.t(), ...]
  def list_providers(args \\ %{}) do
    Repo.list_filter(args, Provider, &Repo.opts_with_name/2, &filter_provider_with/2)
    |> Enum.reject(fn provider ->
      Enum.member?(["goth"], provider.shortcode)
    end)
  end

  @doc """
  Return the count of providers, using the same filter as list_providers
  """
  @spec count_providers(map()) :: integer
  def count_providers(args \\ %{}),
    do: Repo.count_filter(args, Provider, &filter_provider_with/2)

  @spec filter_provider_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp filter_provider_with(query, filter) do
    filter = Map.delete(filter, :organization_id)
    Repo.filter_with(query, filter)
  end

  @doc """
  Gets a single provider.

  Raises `Ecto.NoResultsError` if the Provider does not exist.

  ## Examples

      iex> get_provider!(123)
      %Provider{}

      iex> get_provider!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_provider!(id :: integer) :: Provider.t()
  def get_provider!(id), do: Repo.get!(Provider, id)

  @doc """
  Creates a provider.

  ## Examples

      iex> create_provider(%{field: value})
      {:ok, %Provider{}}

      iex> create_provider(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_provider(map()) :: {:ok, Provider.t()} | {:error, Ecto.Changeset.t()}
  def create_provider(attrs \\ %{}) do
    %Provider{}
    |> Provider.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates a provider.

  ## Examples

      iex> update_provider(provider, %{field: new_value})
      {:ok, %Provider{}}

      iex> update_provider(provider, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_provider(Provider.t(), map()) :: {:ok, Provider.t()} | {:error, Ecto.Changeset.t()}
  def update_provider(%Provider{} = provider, attrs) do
    provider
    |> Provider.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Deletes a provider.

  ## Examples

      iex> delete_provider(provider)
      {:ok, %Provider{}}

      iex> delete_provider(provider)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_provider(Provider.t()) :: {:ok, Provider.t()} | {:error, Ecto.Changeset.t()}
  def delete_provider(%Provider{} = provider) do
    provider
    |> Ecto.Changeset.change()
    |> Ecto.Changeset.no_assoc_constraint(:organizations, name: "organizations_provider_id_fkey")
    |> Ecto.Changeset.no_assoc_constraint(:credential)
    |> Repo.delete()
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking provider changes.

  ## Examples

  iex> change_provider(provider)
  %Ecto.Changeset{data: %Provider{}}
  """
  @spec change_provider(Provider.t(), map()) :: Ecto.Changeset.t()
  def change_provider(%Provider{} = provider, attrs \\ %{}) do
    Provider.changeset(provider, attrs)
  end

  @doc """
  Returns the list of organizations.

  ## Examples

      iex> Glific.Partners.list_organizations()
      [%Glific.Partners.Organization{}, ...]

  """
  @spec list_organizations(map()) :: [Organization.t()]
  def list_organizations(args \\ %{}),
    do:
      Repo.list_filter(
        args,
        Organization,
        &Repo.opts_with_name/2,
        &filter_organization_with/2,
        skip_organization_id: true
      )

  @doc """
  List of organizations that are active within the system
  """
  @spec active_organizations(list(), boolean) :: map()
  def active_organizations(orgs, suspended \\ false) do
    Organization
    |> where([q], q.is_active == true)
    |> select([q], [q.id, q.name, q.last_communication_at])
    |> where([q], q.is_suspended == ^suspended)
    |> restrict_orgs(orgs)
    |> Repo.all(skip_organization_id: true)
    |> Enum.reduce(%{}, fn row, acc ->
      [id, value, time] = row
      Map.put(acc, id, %{name: value, last_communication_at: time})
    end)
  end

  @spec restrict_orgs(Ecto.Query.t(), list()) :: Ecto.Query.t()
  defp restrict_orgs(query, []), do: query

  defp restrict_orgs(query, org_list),
    do: query |> where([q], q.id in ^org_list)

  @doc """
  Return the count of organizations, using the same filter as list_organizations
  """
  @spec count_organizations(map()) :: integer
  def count_organizations(args \\ %{}),
    do:
      Repo.count_filter(
        args,
        Organization,
        &filter_organization_with/2,
        skip_organization_id: true
      )

  # codebeat:disable[ABC]
  @spec filter_organization_with(Ecto.Queryable.t(), %{optional(atom()) => any}) ::
          Ecto.Queryable.t()
  defp filter_organization_with(query, filter) do
    filter = Map.delete(filter, :organization_id)
    query = Repo.filter_with(query, filter)

    Enum.reduce(filter, query, fn
      {:email, email}, query ->
        from(q in query, where: ilike(q.email, ^"%#{email}%"))

      {:bsp, bsp}, query ->
        from(q in query,
          join: c in assoc(q, :bsp),
          where: ilike(c.name, ^"%#{bsp}%")
        )

      {:default_language, default_language}, query ->
        from(q in query,
          join: c in assoc(q, :default_language),
          where: ilike(c.label, ^"%#{default_language}%")
        )

      _, query ->
        query
    end)
  end

  # codebeat:enable[ABC]

  @doc """
  Gets a single organization.

  Raises `Ecto.NoResultsError` if the organization does not exist.

  ## Examples

  iex> Glific.Partners.get_organization!(1)
  %Glific.Partners.Organization{}

  iex> Glific.Partners.get_organization!(-1)
  ** (Ecto.NoResultsError)
  """
  @spec get_organization!(integer) :: Organization.t()
  def get_organization!(id), do: Repo.get!(Organization, id, skip_organization_id: true)

  @doc """
  Creates a organization.

  ## Examples

      iex> Glific.Partners.create_organization(%{name: value})
      {:ok, %Glific.Partners.Organization{}}

      iex> Glific.Partners.create_organization(%{bad_field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_organization(map()) :: {:ok, Organization.t()} | {:error, Ecto.Changeset.t()}
  def create_organization(attrs \\ %{}) do
    %Organization{}
    |> Organization.changeset(attrs)
    |> Repo.insert(skip_organization_id: true)
  end

  @doc """
  Updates an organization.

  ## Examples

      iex> Glific.Partners.update_organization(Organization, %{name: new_name})
      {:ok, %Glific.Partners.Organization{}}

      iex> Glific.Partners.update_organization(Organization, %{abc: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_organization(Organization.t(), map()) ::
          {:ok, Organization.t()} | {:error, Ecto.Changeset.t()}
  def update_organization(%Organization{} = organization, attrs) do
    # first delete the cached organization
    remove_organization_cache(organization.id, organization.shortcode)

    ## in case user updates the out of office flow it should update the flow keyword map as well.
    ## We need to think about a better approach to handle this one.
    Caches.remove(organization.id, ["flow_keywords_map"])

    with {:ok, updated_organization} <-
           organization
           |> Organization.changeset(attrs)
           |> Repo.update(skip_organization_id: true) do
      maybe_pin_newcontact_flow(
        updated_organization.newcontact_flow_id,
        organization.newcontact_flow_id,
        updated_organization
      )
    end
  end

  @spec maybe_pin_newcontact_flow(non_neg_integer(), non_neg_integer(), Organization.t()) ::
          {:ok, Organization.t()}
  defp maybe_pin_newcontact_flow(nil, old_newcontact_flow_id, organization) do
    unpin_old_newcontact_flow(old_newcontact_flow_id)
    {:ok, organization}
  end

  defp maybe_pin_newcontact_flow(newcontact_flow_id, nil, organization) do
    pin_new_newcontact_flow(newcontact_flow_id)
    {:ok, organization}
  end

  defp maybe_pin_newcontact_flow(newcontact_flow_id, old_newcontact_flow_id, organization)
       when newcontact_flow_id != old_newcontact_flow_id do
    unpin_old_newcontact_flow(old_newcontact_flow_id)
    pin_new_newcontact_flow(newcontact_flow_id)
    {:ok, organization}
  end

  defp maybe_pin_newcontact_flow(_newcontact_flow_id, _old_newcontact_flow_id, organization),
    do: {:ok, organization}

  @spec unpin_old_newcontact_flow(non_neg_integer()) ::
          {:ok, Flow.t()} | {:error, Ecto.Changeset.t()}
  defp unpin_old_newcontact_flow(newcontact_flow_id) do
    with false <- is_nil(newcontact_flow_id),
         {:ok, flow} <- Flows.fetch_flow(newcontact_flow_id) do
      Flows.update_flow(flow, %{is_pinned: false})
    end
  end

  @spec pin_new_newcontact_flow(non_neg_integer()) ::
          {:ok, Flow.t()} | {:error, Ecto.Changeset.t()}
  defp pin_new_newcontact_flow(newcontact_flow_id) do
    with {:ok, new_newcontact_flow} <- Flows.fetch_flow(newcontact_flow_id) do
      Flows.update_flow(new_newcontact_flow, %{
        is_pinned: true
      })
    end
  end

  @doc """
  Deletes an Organization.

  ## Examples

      iex> Glific.Partners.delete_organization(organization)
      {:ok, %Glific.Partners.Organization{}}

      iex> delete_organization(organization)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_organization(Organization.t()) ::
          {:ok, Organization.t()} | {:error, Ecto.Changeset.t()}
  def delete_organization(%Organization{} = organization) do
    # we are deleting an organization that is one of the SaaS users, not the current users org
    # setting timeout as the deleting organization is an expensive operation
    Repo.delete(organization, skip_organization_id: true, timeout: 900_000)
  end

  @doc """
  Returns an `%Ecto.Changeset{}` for tracking organization changes.

  ## Examples

      iex> Glific.Partners.change_organization(organization)
      %Ecto.Changeset{data: %Glific.Partners.Organization{}}

  """
  @spec change_organization(Organization.t(), map()) :: Ecto.Changeset.t()
  def change_organization(%Organization{} = organization, attrs \\ %{}) do
    Organization.changeset(organization, attrs)
  end

  @doc """
  Returns bsp balance for an organization
  """
  @spec get_bsp_balance(non_neg_integer) :: {:ok, any()} | {:error, String.t()}
  def get_bsp_balance(organization_id) do
    organization = organization(organization_id)

    if is_nil(organization.services["bsp"]) do
      {:error, dgettext("errors", "No active BSP available")}
    else
      credentials = organization.services["bsp"]
      api_key = credentials.secrets["api_key"]

      case organization.bsp.shortcode do
        "gupshup" -> GupshupWallet.balance(api_key)
        _ -> {:error, dgettext("errors", "Invalid BSP provider")}
      end
    end
  end

  @doc """
  Returns quality rating information for an organization provider
  """
  @spec get_quality_rating(non_neg_integer()) :: {:ok, any()} | {:error, String.t()}
  def get_quality_rating(organization_id) do
    organization = organization(organization_id)

    if is_nil(organization.services["bsp"]) do
      {:error, dgettext("errors", "No active BSP available")}
    else
      case organization.bsp.shortcode do
        "gupshup" -> PartnerAPI.get_quality_rating(organization_id)
        _ -> {:error, dgettext("errors", "Invalid BSP provider")}
      end
    end
  end

  @doc """
  Given a minimal organization object, fill it up and store in cache. Making this
  public so we can call from test harness and avoid SQL Sandbox issues
  """
  @spec fill_cache(Organization.t()) :: Organization.t()
  def fill_cache(organization) do
    # For this process, lets set the organization id
    Repo.put_organization_id(organization.id)

    organization =
      organization
      |> set_root_user()
      |> set_credentials()
      |> Repo.preload([:bsp, :contact])
      |> set_bsp_info()
      |> set_out_of_office_values()
      |> set_languages()
      |> set_flow_uuid_display()
      |> set_roles_and_permission()
      |> set_contact_profile_enabled()

    Caches.set(
      @global_organization_id,
      [{:organization, organization.id}, {:organization, organization.shortcode}],
      organization
    )

    # also update the flags table with updated values
    Flags.init(organization)
    organization
  end

  @doc """
  Follow the cachex protocol to load the cache from the DB
  """
  @spec load_cache(tuple()) :: {:ignore, Organization.t()}
  def load_cache(cachex_key) do
    # this is of the form {:global_org_key, {:organization, value}}
    # we want the value element
    cache_key = cachex_key |> elem(1) |> elem(1)
    Logger.info("Loading organization cache: #{cache_key}")

    organization =
      if is_integer(cache_key) do
        get_organization!(cache_key) |> fill_cache()
      else
        case Repo.fetch_by(Organization, %{shortcode: cache_key}, skip_organization_id: true) do
          {:ok, organization} ->
            organization |> fill_cache()

          _ ->
            raise(ArgumentError, message: "Could not find an organization with #{cache_key}")
        end
      end

    # we are already storing this in the cache (in the function fill_cache),
    # so we can ask cachex to ignore the value. We need to do this since we are
    # storing multiple keys for the same object
    {:ignore, organization}
  end

  @doc """
  Cache the entire organization structure.
  """
  @spec organization(non_neg_integer | String.t()) ::
          Organization.t() | nil | {:error, String.t()}
  def organization(cache_key) do
    case Caches.fetch(@global_organization_id, {:organization, cache_key}, &load_cache/1) do
      {:error, error} ->
        {:error, error}

      {_, organization} ->
        Repo.put_organization_id(organization.id)
        organization
    end
  end

  @doc """
  This contact id is special since it is the sender for all outbound messages
  and the receiver for all inbound messages
  """
  @spec organization_contact_id(non_neg_integer) :: integer()
  def organization_contact_id(organization_id),
    do: organization(organization_id).contact_id

  @doc """
  Get the default language id
  """
  @spec organization_language_id(non_neg_integer) :: integer()
  def organization_language_id(organization_id),
    do: organization(organization_id).default_language_id

  @doc """
  Get the timezone
  """
  @spec organization_timezone(non_neg_integer) :: String.t()
  def organization_timezone(organization_id),
    do: organization(organization_id).timezone

  @spec set_root_user(Organization.t()) :: Organization.t()
  defp set_root_user(organization) do
    {:ok, root_user} = Repo.fetch_by(User, %{contact_id: organization.contact_id})
    Map.put(organization, :root_user, root_user)
  end

  @spec set_out_of_office_values(Organization.t()) :: Organization.t()
  defp set_out_of_office_values(organization) do
    out_of_office = organization.out_of_office

    {hours, days} =
      if out_of_office.enabled do
        hours = [out_of_office.start_time, out_of_office.end_time]

        days =
          Enum.reduce(
            out_of_office.enabled_days,
            [],
            fn x, acc ->
              if x.enabled,
                do: [x.id | acc],
                else: acc
            end
          )
          |> Enum.reverse()

        {hours, days}
      else
        {[], []}
      end

    organization
    |> Map.put(:hours, hours)
    |> Map.put(:days, days)
  end

  @spec set_languages(map()) :: map()
  defp set_languages(organization) do
    languages =
      Language
      |> where([l], l.id in ^organization.active_language_ids)
      |> Repo.all()

    organization
    |> Map.put(:languages, languages)
  end

  @doc """
  Determine if we need to show uuid on the nodes.
  """
  @spec get_flow_uuid_display(map()) :: boolean
  def get_flow_uuid_display(organization) do
    id = organization.id

    cond do
      FunWithFlags.enabled?(:flow_uuid_display, for: %{organization_id: id}) ->
        true

      # the below 2 conditions are just for testing and prototyping purposes
      # we'll get rid of them when we start using this actively
      Application.get_env(:glific, :environment) == :prod && id == 2 ->
        true

      Application.get_env(:glific, :environment) != :prod && id == 1 ->
        true

      true ->
        false
    end
  end

  @doc """
  check fun_with_flag toggle for an organization and returns boolean value
  """
  @spec get_roles_and_permission(Organization.t()) :: boolean()
  def get_roles_and_permission(organization),
    do: FunWithFlags.enabled?(:roles_and_permission, for: %{organization_id: organization.id})

  @doc """
  Determine if we need to enable contact profile for an organization
  """
  @spec get_contact_profile_enabled(map()) :: boolean
  def get_contact_profile_enabled(organization),
    do:
      FunWithFlags.enabled?(:is_contact_profile_enabled, for: %{organization_id: organization.id})

  @spec set_flow_uuid_display(map()) :: map()
  defp set_flow_uuid_display(organization) do
    Map.put(
      organization,
      :is_flow_uuid_display,
      get_flow_uuid_display(organization)
    )
  end

  @spec set_roles_and_permission(map()) :: map()
  defp set_roles_and_permission(organization) do
    Map.put(
      organization,
      :is_roles_and_permission,
      get_roles_and_permission(organization)
    )
  end

  @spec set_contact_profile_enabled(map()) :: map()
  defp set_contact_profile_enabled(organization) do
    Map.put(
      organization,
      :is_contact_profile_enabled,
      get_contact_profile_enabled(organization)
    )
  end

  # Lets cache all bsp provider specific info in the organization entity since
  # we use it on all sending / receiving of messages
  @spec set_bsp_info(map()) :: map()
  defp set_bsp_info(organization) do
    bsp_credential = organization.services[organization.bsp.shortcode]

    updated_services_map =
      Map.merge(organization.services, %{
        "bsp" => bsp_credential
      })

    %{organization | services: updated_services_map}
  end

  # Lets cache keys and secrets of all the active services
  @spec set_credentials(map()) :: map()
  defp set_credentials(organization) do
    credentials =
      Credential
      |> where([c], c.organization_id == ^organization.id)
      |> where([c], c.is_active == true)
      |> preload(:provider)
      |> Repo.all()

    services_map =
      Enum.reduce(credentials, %{}, fn credential, acc ->
        Map.merge(acc, %{
          credential.provider.shortcode => %{keys: credential.keys, secrets: credential.secrets}
        })
      end)

    organization
    |> Map.put(:services, services_map)
  end

  @spec suspend_offset(Organization.t(), non_neg_integer()) :: DateTime.t()
  defp suspend_offset(org, 0), do: start_of_next_day(org)

  defp suspend_offset(_org, hours), do: Timex.shift(DateTime.utc_now(), hours: hours)

  # get the start of the next day in orgs timezone and then convert that to UTC since
  # we only store UTC time in our DB
  @spec start_of_next_day(Organization.t()) :: DateTime.t()
  defp start_of_next_day(org),
    do:
      org.timezone
      |> DateTime.now!()
      |> Timex.beginning_of_day()
      |> Timex.shift(days: 1)
      |> Timex.to_datetime("Etc/UTC")

  @doc """
  Suspend an organization till the start of the next day for the organization
  (we still need to figure out if this is the right WABA interpretation)
  """
  @spec suspend_organization(Organization.t(), non_neg_integer()) :: any()
  def suspend_organization(organization, hours \\ 0) do
    {:ok, _} =
      organization
      |> then(fn org ->
        Partners.update_organization(
          org,
          %{
            is_suspended: true,
            suspended_until: suspend_offset(org, hours)
          }
        )
      end)
  end

  @spec unsuspend_org_list(DateTime.t()) :: list()
  defp unsuspend_org_list(time \\ DateTime.utc_now()) do
    Organization
    |> where([q], q.is_active == true)
    |> select([q], q.id)
    |> where([q], q.is_suspended == true)
    |> where([q], q.suspended_until < ^time)
    |> Repo.all(skip_organization_id: true)
  end

  @spec unsuspend_organization(non_neg_integer()) :: any()
  defp unsuspend_organization(org_id) do
    {:ok, _} =
      update_organization(
        organization(org_id),
        %{
          is_suspended: false,
          suspended_until: nil
        }
      )
  end

  @doc """
  Resume all organization that are suspended if we are past the suspended time, we check this on an hourly basis for all organizations
  that are in a suspended state via a cron job
  """
  @spec unsuspend_organizations :: any()
  def unsuspend_organizations do
    unsuspend_org_list()
    |> Enum.each(&unsuspend_organization(&1))
  end

  @doc """
  Execute a function across all active organizations. This function is typically called
  by a micron job worker process

  The handler is expected to take the organization id as its first argument. The second argument
  is expected to be a map of arguments passed in by the cron job, and can be ignored if not used

  The list is a restricted list of organizations, so we don't repeatedly do work. The convention is as
  follows:

  list == nil - the action should not be performed for any organization
  list == [] (empty list) - the action should be performed for all organizations
  list == [ values ] - the actions should be performed only for organizations in the values list
  """
  @spec perform_all((... -> nil), map() | nil, list() | [] | nil, Keyword.t()) :: any
  def perform_all(handler, handler_args, list, opts \\ [])

  def perform_all(_handler, _handler_args, nil, _opts), do: nil

  def perform_all(handler, handler_args, list, opts) do
    only_recent = Keyword.get(opts, :only_recent, false)
    # We need to do this for all the active organizations
    list
    |> active_organizations()
    |> recent_organizations(only_recent)
    |> Enum.each(fn {id, %{name: name}} ->
      perform_handler(handler, handler_args, id, name)
    end)
  rescue
    # If we fail, we need to mark the organization as failed
    # and log the error
    err ->
      "Error occurred while executing cron handler for organizations. Error: #{inspect(err)}, handler: #{inspect(handler)}, handler_args: #{inspect(handler_args)}"
      |> Glific.log_error()
  end

  @active_minutes 120

  @doc """
  Get the organizations which had a message transaction in the last minutes
  as defined by @active_minutes
  """
  @spec recent_organizations(map(), boolean) :: map()
  def recent_organizations(map, false), do: map

  def recent_organizations(map, true) do
    Enum.filter(
      map,
      fn {_id, %{last_communication_at: last_communication_at}} ->
        Timex.diff(DateTime.utc_now(), last_communication_at, :minutes) < @active_minutes
      end
    )
  end

  @spec perform_handler((... -> nil), map() | nil, non_neg_integer(), String.t() | nil) :: any
  defp perform_handler(handler, handler_args, org_id, org_name) do
    Repo.put_process_state(org_id)

    Logger.info("Starting processes for org id: #{org_id}")

    if is_nil(handler_args) do
      handler.(org_id)
    else
      handler.(org_id, Map.put(handler_args, :organization_name, org_name))
    end
  end

  @doc """
  Get organization's credential by service shortcode
  """
  @spec get_credential(map()) ::
          {:ok, Credential.t()} | {:error, String.t() | [String.t()]}
  def get_credential(%{organization_id: organization_id, shortcode: shortcode}) do
    case Repo.fetch_by(Provider, %{shortcode: shortcode}) do
      {:ok, provider} ->
        Repo.fetch_by(Credential, %{
          organization_id: organization_id,
          provider_id: provider.id
        })

      _ ->
        {:error, ["shortcode", "Invalid provider shortcode."]}
    end
  end

  @doc """
  Creates an organization's credential
  """
  @spec create_credential(map()) :: {:ok, Credential.t()} | {:error, any()}
  def create_credential(attrs) do
    case Repo.fetch_by(Provider, %{shortcode: attrs[:shortcode]}) do
      {:ok, provider} ->
        # first delete the cached organization
        organization = get_organization!(attrs.organization_id)
        remove_organization_cache(organization.id, organization.shortcode)
        attrs = Map.merge(attrs, %{provider_id: provider.id})

        %Credential{}
        |> Credential.changeset(attrs)
        |> Repo.insert()

      _ ->
        {:error, ["shortcode", "Invalid provider shortcode: #{attrs[:shortcode]}."]}
    end
  end

  # check for non empty string or nil
  @spec non_nil_string(String.t() | nil) :: boolean()
  defp non_nil_string(str) do
    !is_nil(str) && str != ""
  end

  # Ensures we have all the keys required in the credential to call Gupshup
  @spec valid_bsp?(Credential.t()) :: boolean()
  defp valid_bsp?(credential) do
    bsp = credential.provider.shortcode

    credential.provider.group == "bsp" &&
      non_nil_string(credential.keys["api_end_point"]) &&
      validate_secrets?(credential.secrets, bsp)
  end

  @spec validate_secrets?(map(), String.t()) :: boolean()
  defp validate_secrets?(secrets, "gupshup"),
    do:
      non_nil_string(secrets["app_name"]) &&
        non_nil_string(secrets["api_key"])

  defp validate_secrets?(secrets, "gupshup_enterprise"),
    do:
      non_nil_string(secrets["hsm_user_id"]) &&
        non_nil_string(secrets["hsm_password"]) &&
        non_nil_string(secrets["two_way_user_id"]) &&
        non_nil_string(secrets["two_way_password"])

  defp validate_secrets?(_secrets, _bsp),
    do: false

  @doc """
  Updates an organization's credential
  """

  @spec update_credential(Credential.t(), map()) ::
          {:ok, Credential.t()} | {:error, any}
  def update_credential(%Credential{} = credential, attrs) do
    # delete the cached organization and associated credentials
    organization = organization(credential.organization_id)

    remove_organization_cache(organization.id, organization.shortcode)

    {:ok, credential} =
      credential
      |> Credential.changeset(attrs)
      |> Repo.update()

    credential = credential |> Repo.preload([:provider, :organization])

    credential.organization
    |> credential_update_callback(credential, credential.provider.shortcode)
  end

  @spec credential_update_callback(Organization.t(), Credential.t(), String.t()) ::
          {:ok, any} | {:error, any}
  defp credential_update_callback(organization, credential, "bigquery") do
    Caches.remove(organization.id, [{:provider_token, "bigquery"}])

    case BigQuery.sync_schema_with_bigquery(organization.id) do
      {:ok, _callback} ->
        {:ok, credential}

      {:error, error} ->
        Partners.disable_credential(
          organization.id,
          "bigquery",
          error
        )

        {:error, error}
    end
  end

  defp credential_update_callback(organization, credential, "google_cloud_storage") do
    case GCS.refresh_gcs_setup(organization.id) do
      {:ok, _callback} -> {:ok, credential}
      {:error, _error} -> {:error, "Invalid Credentials"}
    end
  end

  defp credential_update_callback(organization, credential, "dialogflow") do
    case Glific.Dialogflow.get_intent_list(organization.id) do
      {:ok, _callback} -> {:ok, credential}
      {:error, _error} -> {:error, "Invalid Credentials"}
    end
  end

  defp credential_update_callback(organization, credential, "gupshup") do
    if valid_bsp?(credential) do
      update_organization(organization, %{bsp_id: credential.provider.id})

      if credential.is_active do
        GupshupContacts.fetch_opted_in_contacts(credential)
        set_bsp_app_id(organization, "gupshup")
      end
    end

    {:ok, credential}
  end

  defp credential_update_callback(organization, credential, "gupshup_enterprise") do
    if valid_bsp?(credential) do
      update_organization(organization, %{bsp_id: credential.provider.id})
    end

    {:ok, credential}
  end

  defp credential_update_callback(_organization, credential, _provider), do: {:ok, credential}

  @doc """
  Removing organization and service cache
  """
  @spec remove_organization_cache(non_neg_integer, String.t()) :: any()
  def remove_organization_cache(organization_id, shortcode) do
    Caches.remove(@global_organization_id, ["organization_services"])

    Caches.remove(
      @global_organization_id,
      [{:organization, organization_id}, {:organization, shortcode}]
    )

    Caches.remove(
      @global_organization_id,
      ["organization_services"]
    )
  end

  @spec config(map()) :: map() | :error
  defp config(credentials) do
    case Jason.decode(credentials.secrets["service_account"]) do
      {:ok, config} -> config
      _ -> :error
    end
  end

  @doc """
  Common function to get the goth config
  """
  @spec get_goth_token(non_neg_integer, String.t()) :: nil | Goth.Token.t()
  def get_goth_token(organization_id, provider_shortcode) do
    key = {:provider_token, provider_shortcode}
    organization = organization(organization_id)

    if is_nil(organization.services[provider_shortcode]) do
      nil
    else
      Caches.fetch(organization_id, key, &load_goth_token/1)
      |> case do
        {_status, res} when is_map(res) ->
          res

        _ ->
          Logger.error(
            "Could not fetch token for service #{provider_shortcode} for org id: #{organization_id}"
          )

          nil
      end
    end
  end

  @spec load_goth_token(tuple()) :: tuple()
  defp load_goth_token(cache_key) do
    {organization_id, {:provider_token, provider_shortcode}} = cache_key

    organization = organization(organization_id)
    credentials = organization.services[provider_shortcode] |> config()

    if credentials == :error do
      {:ignore, nil}
    else
      Goth.Token.fetch(source: {:service_account, credentials})
      |> case do
        {:ok, token} ->
          opts = [ttl: :timer.seconds(token.expires - System.system_time(:second) - 60)]
          Caches.set(organization_id, {:provider_token, provider_shortcode}, token, opts)
          {:ignore, token}

        {:error, error} ->
          Logger.info(
            "Error fetching token for: #{provider_shortcode}, error: #{error}, org_id: #{organization_id}"
          )

          handle_token_error(organization_id, provider_shortcode, "#{inspect(error)}")
          {:ignore, nil}
      end
    end
  end

  @spec handle_token_error(non_neg_integer, String.t(), String.t() | any()) :: nil
  defp handle_token_error(organization_id, provider_shortcode, error) when is_binary(error) do
    if String.contains?(error, ["account not found", "invalid_grant"]),
      do:
        disable_credential(
          organization_id,
          provider_shortcode,
          "Invalid credentials, service account not found"
        )

    nil
  end

  defp handle_token_error(_organization_id, _provider_shortcode, error),
    do: raise("Error fetching goth token' #{inspect(error)}")

  @doc """
  Disable a specific credential for the organization
  """
  @spec disable_credential(non_neg_integer, String.t(), String.t()) :: :ok | {:error, list()}
  def disable_credential(organization_id, shortcode, error_message) do
    case Repo.fetch_by(Provider, %{shortcode: shortcode}) do
      {:ok, provider} ->
        # first delete the cached organization
        organization = get_organization!(organization_id)
        remove_organization_cache(organization.id, organization.shortcode)

        Credential
        |> where([c], c.provider_id == ^provider.id)
        |> where([c], c.organization_id == ^organization_id)
        |> Repo.update_all(set: [is_active: false])

        Logger.info("Disable #{shortcode} credential for org_id: #{organization_id}")

        Notifications.create_notification(%{
          category: "Partner",
          message: "Disabling #{shortcode}. #{error_message}",
          severity: Notifications.types().critical,
          organization_id: organization_id,
          entity: %{
            id: provider.id,
            shortcode: shortcode
          }
        })

        :ok

      _ ->
        {:error, ["shortcode", "Invalid provider shortcode to disable: #{shortcode}."]}
    end
  end

  @doc """
  Check if we can allow attachments for this organization. For now, this is a check to
  see if GCS is enabled for this organization
  """
  @spec attachments_enabled?(non_neg_integer) :: boolean()
  def attachments_enabled?(organization_id),
    do:
      organization_id
      |> organization()
      |> Map.get(:services)
      |> Map.has_key?("google_cloud_storage")

  @doc """
  Given an empty list, determine which organizations have been active in the recent
  past
  """
  @spec org_id_list(list(), boolean) :: list()
  def org_id_list([], recent) do
    active_organizations([])
    |> recent_organizations(recent)
    |> Enum.reduce([], fn {id, _map}, acc -> [id | acc] end)
  end

  def org_id_list(list, _recent) do
    Enum.map(
      list,
      fn l ->
        {:ok, int_l} = Glific.parse_maybe_integer(l)
        int_l
      end
    )
  end

  @doc """
  Wrapper query used by various statistics collection routines in Glific
  to return counts on contact with its variations
  """
  @spec contact_organization_query(list()) :: Ecto.Query.t()
  def contact_organization_query(org_id_list) do
    Contact
    # block messages sent to groups
    |> where([c], c.status != :blocked)
    |> where([c], c.organization_id in ^org_id_list)
    |> group_by([c], c.organization_id)
    |> select([c], [count(c.id), c.organization_id])
  end

  @doc """
  Convert global field to map for variable substitution
  """
  @spec get_global_field_map(integer) :: map()
  def get_global_field_map(organization_id), do: organization(organization_id).fields

  @doc """
  Returns a map of organizations services as key value pair
  """
  @spec get_organization_services :: map()
  def get_organization_services do
    case Caches.fetch(
           @global_organization_id,
           "organization_services",
           &load_organization_services/1
         ) do
      {:error, error} ->
        raise(ArgumentError,
          message: "Failed to retrieve organization services: #{error}"
        )

      {_, services} ->
        services
    end
  end

  # this is a global cache, so we kinda ignore the cache key
  @spec load_organization_services(tuple()) :: {:commit, map()}
  defp load_organization_services(_cache_key) do
    services =
      active_organizations([])
      |> Enum.reduce(
        %{},
        fn {org_id, _name}, acc ->
          Map.put(acc, org_id, get_org_services_by_id(org_id))
        end
      )
      |> combine_services()

    {:commit, services}
  end

  @doc """
    Get all the services and status for a given organization id.
  """
  @spec get_org_services_by_id(non_neg_integer) :: map()
  def get_org_services_by_id(organization_id) do
    organization = organization(organization_id)

    %{
      "fun_with_flags" =>
        FunWithFlags.enabled?(
          :enable_out_of_office,
          for: %{organization_id: organization_id}
        ),
      "bigquery" => organization.services["bigquery"] != nil,
      "google_cloud_storage" => organization.services["google_cloud_storage"] != nil,
      "dialogflow" => organization.services["dialogflow"] != nil,
      "flow_uuid_display" => get_flow_uuid_display(organization),
      "roles_and_permission" => get_roles_and_permission(organization),
      "contact_profile_enabled" => get_contact_profile_enabled(organization)
    }
  end

  @spec add_service(map(), String.t(), boolean(), non_neg_integer) :: map()
  defp add_service(acc, _name, false, _org_id), do: acc

  defp add_service(acc, name, true, org_id) do
    value = Map.get(acc, name, [])
    Map.put(acc, name, [org_id | value])
  end

  @spec combine_services(map()) :: map()
  defp combine_services(services) do
    combined =
      services
      |> Enum.reduce(
        %{},
        fn {org_id, service}, acc ->
          acc
          |> add_service("fun_with_flags", service["fun_with_flags"], org_id)
          |> add_service("bigquery", service["bigquery"], org_id)
          |> add_service("google_cloud_storage", service["google_cloud_storage"], org_id)
          |> add_service("dialogflow", service["dialogflow"], org_id)
        end
      )

    Map.merge(services, combined)
  end

  @doc """
  Set BSP APP id whenever we update the bsp credentials.
  """
  @spec set_bsp_app_id(Organization.t(), String.t()) :: any()
  def set_bsp_app_id(org, "gupshup" = shortcode) do
    # restricting this function  for BSP only
    {:ok, provider} = Repo.fetch_by(Provider, %{shortcode: shortcode, group: "bsp"})

    {:ok, bsp_cred} =
      Repo.fetch_by(Credential, %{provider_id: provider.id, organization_id: org.id})

    app_details = PartnerAPI.fetch_app_details(org.id)
    app_id = if is_map(app_details), do: app_details["id"], else: "NA"

    updated_secrets = Map.put(bsp_cred.secrets, "app_id", app_id)
    attrs = %{secrets: updated_secrets, organization_id: org.id}

    {:ok, _credential} =
      bsp_cred
      |> Credential.changeset(attrs)
      |> Repo.update()

    remove_organization_cache(org.id, org.shortcode)
  end

  def set_bsp_app_id(org, _shortcode), do: org

  @doc """
  Get a List for org data
  """
  @spec list_organization_data(map()) :: [Provider.t(), ...]
  def list_organization_data(args \\ %{}) do
    Repo.list_filter(
      args,
      OrganizationData,
      &Repo.opts_with_name/2,
      &filter_organization_data_with/2
    )
  end

  @spec filter_organization_data_with(Ecto.Queryable.t(), %{optional(atom()) => any}) ::
          Ecto.Queryable.t()
  defp filter_organization_data_with(query, filter) do
    query = Repo.filter_with(query, filter)
    # these filters are specific to webhook logs only.
    # We might want to move them in the repo in the future.
    Enum.reduce(filter, query, fn
      {:key, key}, query ->
        from(q in query, where: ilike(q.key, ^"%#{key}%"))

      _, query ->
        query
    end)
  end

  @doc """
  Create a Client Data struct
  """
  @spec create_organization_data(map()) ::
          {:ok, OrganizationData.t()} | {:error, Ecto.Changeset.t()}
  def create_organization_data(attrs \\ %{}) do
    %OrganizationData{}
    |> OrganizationData.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Update a Client Data struct
  """
  @spec update_organization_data(OrganizationData.t(), map()) ::
          {:ok, OrganizationData.t()} | {:error, Ecto.Changeset.t()}
  def update_organization_data(organization_data, attrs) do
    organization_data
    |> OrganizationData.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Delete Client Data struct
  """
  @spec delete_organization_data(OrganizationData.t()) ::
          {:ok, OrganizationData.t()} | {:error, Ecto.Changeset.t()}
  def delete_organization_data(%OrganizationData{} = organization_data) do
    Repo.delete(organization_data)
  end

  @doc """
  Insert or update data if key present for OrganizationData table.
  """
  @spec maybe_insert_organization_data(String.t(), map(), non_neg_integer()) ::
          {:ok, OrganizationData.t()} | {:error, Ecto.Changeset.t()}
  def maybe_insert_organization_data(key, data, org_id) do
    # check if the week key is already present in the database
    case Repo.get_by(OrganizationData, %{key: key, organization_id: org_id}) do
      nil ->
        attrs =
          %{}
          |> Map.put(:key, key)
          |> Map.put(:json, data)
          |> Map.put(:organization_id, org_id)

        %OrganizationData{}
        |> OrganizationData.changeset(attrs)
        |> Repo.insert()

      organization_data ->
        organization_data
        |> OrganizationData.changeset(%{json: data})
        |> Repo.update()
    end
  end
end