lib/glific/contacts/import.ex

defmodule Glific.Contacts.Import do
  @moduledoc """
  The Contact Importer Module
  """
  import Ecto.Query, warn: false

  alias Glific.{
    Contacts,
    Contacts.Contact,
    Flows.ContactField,
    Groups,
    Groups.ContactGroup,
    Groups.GroupContacts,
    Partners,
    Repo,
    Settings,
    Users.User
  }

  # Upper bound on the number of concurrent import tasks. Module attributes are
  # evaluated at compile time, so this captures the scheduler count of the
  # machine that compiled the module, not necessarily the one running it.
  @max_concurrency System.schedulers_online()

  # Normalises one decoded CSV row (`data`, a map keyed by header name) into the
  # attribute map consumed by the rest of the import pipeline.
  #
  # * `contact_attrs` must carry `:user` and `:organization_id`; `:collection`
  #   is optional.
  # * `date_format` is the Timex format string used to parse the "opt_in" column.
  #
  # For a `:glific_admin` user the collection supplied with the import request
  # (when present) replaces the "collection" column of the row; every other
  # user keeps the value from the sheet.
  #
  # NOTE(review): the language lookup still raises when no language matches the
  # "language" column - a sensible fallback cannot be derived from this module.
  # NOTE(review): "name" and "collection" are not in the drop list below, so
  # they are also stored as contact fields - confirm this is intended.
  @spec cleanup_contact_data(map(), map(), String.t()) :: map()
  defp cleanup_contact_data(
         data,
         %{user: user, organization_id: organization_id} = contact_attrs,
         date_format
       ) do
    results = %{
      name: data["name"],
      phone: data["phone"],
      organization_id: organization_id,
      language_id: Enum.at(Settings.get_language_by_label_or_locale(data["language"]), 0).id,
      collection: data["collection"],
      delete: data["delete"],
      contact_fields: Map.drop(data, ["phone", "group", "language", "delete", "opt_in"]),
      optin_time: parse_optin_time(data["opt_in"], date_format)
    }

    if user.roles == [:glific_admin] do
      # Fall back to the sheet value instead of raising when the request did
      # not specify a collection at all.
      Map.put(results, :collection, Map.get(contact_attrs, :collection, results.collection))
    else
      results
    end
  end

  # Parses the "opt_in" column. Blank values and values that do not match
  # `date_format` both yield nil, so an unparseable date is never mistaken for
  # a valid opt-in time further down the pipeline.
  @spec parse_optin_time(String.t() | nil, String.t()) :: DateTime.t() | NaiveDateTime.t() | nil
  defp parse_optin_time(raw, _date_format) when raw in ["", nil], do: nil

  defp parse_optin_time(raw, date_format) do
    case Timex.parse(raw, date_format) do
      {:ok, optin_time} -> optin_time
      {:error, _reason} -> nil
    end
  end

  # Stores every non-blank entry of `fields` on the contact as a contact field.
  #
  # The snake_cased column name is used both as the field key and as its label.
  # Blank ("" or nil) values are skipped. Returns the contact produced by the
  # last `ContactField.do_add_contact_field/4` call, or the original contact
  # when nothing was added.
  @spec add_contact_fields(Contact.t(), map()) :: Contact.t()
  defp add_contact_fields(contact, fields) do
    Enum.reduce(fields, contact, fn
      {_field, value}, contact when value in ["", nil] ->
        contact

      {field, value}, contact ->
        key = Glific.string_snake_case(field)
        ContactField.do_add_contact_field(contact, key, key, value)
    end)
  end

  # Turns the import source named in `opts` into a line-by-line stream.
  #
  # Exactly one of these keys is expected (checked in this order):
  #   * `:file_path` - path of a local CSV file
  #   * `:url`       - location of a CSV file, fetched with `Tesla.get/1`
  #   * `:data`      - the CSV content itself, as a string
  #
  # Raises when none of them is present, and when the HTTP request does not
  # return `{:ok, response}`.
  @spec fetch_contact_data_as_string(Keyword.t()) :: File.Stream.t() | IO.Stream.t()
  defp fetch_contact_data_as_string(opts) do
    file_path = Keyword.get(opts, :file_path)
    url = Keyword.get(opts, :url)
    data = Keyword.get(opts, :data)

    cond do
      file_path != nil ->
        file_path |> Path.expand() |> File.stream!()

      url != nil ->
        {:ok, response} = Tesla.get(url)
        string_to_line_stream(response.body)

      data != nil ->
        string_to_line_stream(data)

      true ->
        # Without this clause a missing source surfaced as a bare CondClauseError.
        raise "Please specify one of keyword arguments: file_path, url or data"
    end
  end

  # Wraps an in-memory string in a StringIO device and streams it line by line.
  @spec string_to_line_stream(String.t()) :: IO.Stream.t()
  defp string_to_line_stream(content) do
    {:ok, device} = StringIO.open(content)
    IO.binstream(device, :line)
  end

  @doc """
  Imports contacts from a CSV source into an organization.

  `contact_attrs` must contain the acting `:user`; an optional `:collection` is
  forwarded to the import pipeline when present.

  `opts` must name exactly one source: `:file_path`, `:url` or `:data`. A
  `:date_format` option (Timex format string for the "opt_in" column) may be
  given alongside the source.

  Returns `{:ok, %{message: message}}` when every row was processed without
  error and an `{:error, reason}` tuple otherwise. Raises when more than one
  source is given.
  """
  @spec import_contacts(non_neg_integer(), map(), [{atom(), String.t()}]) :: tuple()
  def import_contacts(organization_id, contact_attrs, opts) do
    ensure_single_source!(opts)

    contact_data_as_stream = fetch_contact_data_as_string(opts)

    contact_attrs =
      contact_attrs
      |> Map.take([:collection])
      |> Map.merge(%{organization_id: organization_id, user: contact_attrs.user})

    handle_csv_for_admins(contact_attrs, contact_data_as_stream, opts)
  end

  # Only the source keys are counted, so options such as `:date_format` can be
  # passed together with a source without tripping the check.
  @spec ensure_single_source!(Keyword.t()) :: :ok
  defp ensure_single_source!(opts) do
    case Keyword.take(opts, [:file_path, :url, :data]) do
      [_, _ | _] ->
        raise "Please specify only one of keyword arguments: file_path, url or data"

      _ ->
        :ok
    end
  end

  # Looks the organization up before any row is processed; the CSV is decoded
  # only when the lookup yields a map, otherwise the lookup error is reported
  # for the whole import.
  @spec handle_csv_for_admins(map(), map(), [{atom(), String.t()}]) :: tuple()
  defp handle_csv_for_admins(contact_attrs, data, opts) do
    org_id = contact_attrs.organization_id

    org_id
    |> Partners.organization()
    |> case do
      {:error, reason} ->
        details =
          "Could not fetch the organization with id #{org_id}. Error -> #{inspect(reason)}"

        {:error, %{message: "All contacts could not be added", details: details}}

      %{} ->
        decode_csv_data(contact_attrs, data, opts)
    end
  end

  # Decodes the CSV stream and processes every row concurrently.
  #
  # * `params` carries `:organization_id` and `:user`.
  # * `data` is the line stream of the CSV source.
  # * `opts` may contain `:date_format` (Timex format string for "opt_in").
  #
  # Rows the CSV decoder rejects are reported as errors instead of being handed
  # to the row clean-up. Returns `{:ok, %{message: "All contacts added"}}` when
  # no row produced an error, otherwise `{:error, errors}` with one entry per
  # failing row.
  @spec decode_csv_data(map(), Enumerable.t(), [{atom(), String.t()}]) :: tuple()
  defp decode_csv_data(params, data, opts) do
    %{organization_id: organization_id, user: user} = params
    date_format = Keyword.get(opts, :date_format, "{YYYY}-{M}-{D} {h24}:{m}:{s}")

    results =
      data
      |> CSV.decode(headers: true, strip_fields: true)
      |> Stream.map(fn
        {:ok, row} -> {:ok, cleanup_contact_data(row, params, date_format)}
        {:error, reason} -> {:error, reason}
      end)
      |> Task.async_stream(
        fn
          {:ok, contact} ->
            # put_process_state runs inside the task - presumably so the
            # organization context is available in the task process (confirm).
            process_data(user, contact, %{
              organization_id: Repo.put_process_state(organization_id)
            })

          {:error, reason} ->
            %{error: reason}
        end,
        max_concurrency: @max_concurrency
      )
      |> Enum.map(fn {:ok, result} -> result end)

    # Pattern-matching skips results that are not maps (e.g. nil) instead of
    # raising on them.
    case for(%{error: error} <- results, do: error) do
      [] -> {:ok, %{message: "All contacts added"}}
      errors -> {:error, errors}
    end
  end

  # Applies one cleaned CSV row on behalf of `user`.
  #
  # * A row whose `:delete` flag is "1" deletes the contact with that phone
  #   number; this needs the `:glific_admin` role or the `upload_contacts` flag.
  # * Otherwise admins and uploaders create the contact (if needed) and opt it
  #   in, while every other user may only update an existing contact.
  #
  # Failures reported by `Contacts` are returned as `%{error: reason}` maps so
  # that a single bad row is listed in the import result instead of crashing
  # the task that processes it.
  # NOTE(review): assumes `Contacts.maybe_create_contact/1` and
  # `Contacts.delete_contact/1` report failures as `{:error, reason}` - confirm.
  @spec process_data(User.t(), map(), map()) :: Contact.t() | map()
  defp process_data(user, %{delete: "1"} = contact, _contact_attrs) do
    if user.roles == [:glific_admin] || user.upload_contacts == true do
      case Repo.get_by(Contact, %{phone: contact.phone}) do
        nil ->
          %{ok: "Contact does not exist"}

        existing ->
          case Contacts.delete_contact(existing) do
            {:ok, deleted} -> deleted
            {:error, error} -> %{phone: existing.phone, error: error}
          end
      end
    else
      %{
        error: "This user doesn't have enough permission"
      }
    end
  end

  defp process_data(user, contact_attrs, _attrs) do
    admin? = user.roles == [:glific_admin]

    if admin? || user.upload_contacts do
      case Contacts.maybe_create_contact(contact_attrs) do
        {:ok, contact} ->
          # Admins attach groups and fields to the created contact directly;
          # uploaders go through the update path. Either result is discarded,
          # only the opt-in outcome is reported for the row.
          if admin?,
            do: create_group_and_contact_fields(contact_attrs, contact),
            else: may_update_contact(contact_attrs)

          optin_contact(user, contact, contact_attrs)

        {:error, error} ->
          %{phone: Map.get(contact_attrs, :phone), error: error}
      end
    else
      may_update_contact(contact_attrs)
    end
  end

  # Updates an existing contact from the row and, on success, attaches its
  # groups and contact fields.
  #
  # Returns whatever `create_group_and_contact_fields/2` returns on success and
  # `%{error: reason}` when the update is rejected; it never returns a tagged
  # tuple.
  @spec may_update_contact(map()) :: Contact.t() | map() | nil
  defp may_update_contact(contact_attrs) do
    case Contacts.maybe_update_contact(contact_attrs) do
      {:ok, contact} -> create_group_and_contact_fields(contact_attrs, contact)
      {:error, error} -> %{error: error}
    end
  end

  # Adds the contact to the groups named in `contact_attrs.collection` and
  # stores the extra CSV columns (`contact_attrs[:contact_fields]`) as contact
  # fields.
  #
  # Always returns a contact: the one produced by `add_contact_fields/2`, or
  # the given contact unchanged when there are no fields to add (missing key or
  # empty map).
  @spec create_group_and_contact_fields(map(), Contact.t()) :: Contact.t()
  defp create_group_and_contact_fields(contact_attrs, contact) do
    collection_label_check(contact, contact_attrs.collection)

    case contact_attrs[:contact_fields] do
      fields when is_map(fields) and map_size(fields) > 0 ->
        add_contact_fields(contact, fields)

      _no_fields ->
        contact
    end
  end

  # Adds the contact to every group named in `collection`, a comma-separated
  # list of group labels.
  #
  # Labels are trimmed, blank entries are dropped and duplicates are removed,
  # so "A, B," addresses the groups "A" and "B". The groups are created first
  # when they do not exist yet. Returns false when there is nothing to do (nil
  # or no usable label) and `:ok` otherwise.
  @spec collection_label_check(Contact.t(), String.t() | nil) :: boolean() | :ok
  defp collection_label_check(_contact, nil), do: false

  defp collection_label_check(contact, collection) when is_binary(collection) do
    labels =
      collection
      |> String.split(",", trim: true)
      |> Enum.map(&String.trim/1)
      |> Enum.reject(&(&1 == ""))
      |> Enum.uniq()

    if labels == [] do
      false
    else
      add_multiple_group(labels, contact.organization_id)
      add_contact_to_groups(labels, contact)
    end
  end

  # Loads the groups matching the given labels and creates one contact-group
  # record per loaded group for `contact`. The individual results are ignored.
  @spec add_contact_to_groups(list(), Contact.t()) :: :ok
  defp add_contact_to_groups(collection, contact) do
    groups = Groups.load_group_by_label(collection)

    Enum.each(groups, fn group ->
      membership = %{
        contact_id: contact.id,
        group_id: group.id,
        organization_id: contact.organization_id
      }

      Groups.create_contact_group(membership)
    end)
  end

  # Fetches or creates a group for every label in `collection` within the
  # given organization. The individual results are ignored.
  @spec add_multiple_group(list(), non_neg_integer()) :: :ok
  defp add_multiple_group(collection, organization_id) do
    Enum.each(collection, &Groups.get_or_create_group_by_label(&1, organization_id))
  end

  # Opts the contact in with method "Import" when `should_optin_contact?/3`
  # allows it. Returns the opted-in contact on success; in every other case a
  # map holding the contact's phone and the error is returned.
  @spec optin_contact(User.t(), Contact.t(), map()) :: Contact.t()
  defp optin_contact(user, contact, contact_attrs) do
    if should_optin_contact?(user, contact, contact_attrs) do
      optin_attrs = Map.put(contact_attrs, :method, "Import")

      case Contacts.optin_contact(optin_attrs) do
        {:ok, opted_in} -> opted_in
        {:error, reason} -> %{phone: contact.phone, error: reason}
      end
    else
      message =
        "Not able to optin the contact. Either the contact is opted out, invalid or the opted-in time present in sheet is not in the correct format"

      %{phone: contact.phone, error: message}
    end
  end

  # Decides whether an imported contact may be opted in: the row needs an
  # opt-in time, the contact must not be opted out, and the user must be a
  # glific admin or be allowed to upload contacts.
  # A "force optin" column may be added to the sheet later.
  @spec should_optin_contact?(User.t(), Contact.t(), map()) :: boolean()
  defp should_optin_contact?(user, contact, attrs) do
    if attrs.optin_time == nil or contact.optout_time != nil do
      false
    else
      if user.roles == [:glific_admin] || user.upload_contacts, do: true, else: false
    end
  end

  @doc """
  Adds already existing contacts to a group.

  The CSV source named in `opts` (`:file_path`, `:url` or `:data`) must have a
  "Contact Number" column. The group with label `group_label` is fetched or
  created, and every contact of the organization whose phone appears in the
  sheet is added to it. Rows the CSV decoder cannot parse are skipped.

  Returns `{:ok, %{message: message}}` where the message states how many
  contacts were matched.
  """
  @spec add_contacts_to_group(integer, String.t(), [{atom(), String.t()}]) :: tuple()
  def add_contacts_to_group(organization_id, group_label, opts \\ []) do
    contact_data_as_stream = fetch_contact_data_as_string(opts)
    {:ok, group} = Groups.get_or_create_group_by_label(group_label, organization_id)

    contact_id_list =
      contact_data_as_stream
      |> CSV.decode(headers: true, strip_fields: true)
      |> Enum.flat_map(fn
        {:ok, row} -> [clean_contact_for_group(row, organization_id)]
        # A malformed row carries an error message, not a row map; skip it.
        {:error, _reason} -> []
      end)
      |> get_contact_id_list(organization_id)

    GroupContacts.update_group_contacts(%{
      group_id: group.id,
      add_contact_ids: contact_id_list,
      delete_contact_ids: [],
      organization_id: organization_id
    })

    {:ok, %{message: "#{length(contact_id_list)} contacts added to group #{group_label}"}}
  end

  # Reduces a decoded CSV row to the phone number found in its
  # "Contact Number" column; the organization id is not used.
  @spec clean_contact_for_group(map(), non_neg_integer()) :: map()
  defp clean_contact_for_group(data, _organization_id) do
    phone = data["Contact Number"]
    %{phone: phone}
  end

  # Returns the ids of the organization's contacts whose phone number appears
  # in `contacts`. The organization id is registered with the Repo before the
  # query is run.
  @spec get_contact_id_list(list(), non_neg_integer()) :: list()
  defp get_contact_id_list(contacts, org_id) do
    phones = for contact <- contacts, do: contact.phone
    Repo.put_organization_id(org_id)

    query =
      from(c in Contact,
        where: c.organization_id == ^org_id,
        where: c.phone in ^phones,
        select: c.id
      )

    Repo.all(query)
  end
end