lib/glific/third_party/bigquery/bigquery.ex

defmodule Glific.BigQuery do
  @moduledoc """
  Glific BigQuery Dataset and table creation
  """

  require Logger
  use Publicist

  alias Glific.{
    BigQuery.BigQueryJob,
    BigQuery.Schema,
    Contacts.Contact,
    Contacts.ContactHistory,
    Flows,
    Flows.FlowCount,
    Flows.FlowResult,
    Flows.FlowRevision,
    Flows.MessageBroadcast,
    Flows.MessageBroadcastContact,
    Jobs,
    Messages.Message,
    Messages.MessageConversation,
    Messages.MessageMedia,
    Partners,
    Partners.Saas,
    Profiles.Profile,
    Repo,
    Stats.Stat
  }

  alias GoogleApi.BigQuery.V2.{
    Api.Datasets,
    Api.Routines,
    Api.Tabledata,
    Api.Tables,
    Connection
  }

  # Maps every synced BigQuery table name to the name of the `Schema` function
  # that is applied to obtain that table's schema.
  @bigquery_tables %{
    "messages" => :message_schema,
    "contacts" => :contact_schema,
    "flows" => :flow_schema,
    "flow_results" => :flow_result_schema,
    "stats" => :stats_schema,
    "flow_counts" => :flow_count_schema,
    "messages_media" => :messages_media_schema,
    "flow_contexts" => :flow_context_schema,
    "profiles" => :profile_schema,
    "contact_histories" => :contact_history_schema,
    "message_conversations" => :message_conversation_schema,
    "message_broadcasts" => :message_broadcasts_schema,
    "message_broadcast_contacts" => :message_broadcast_contacts_schema
  }

  # Returns the table => schema-function map for an organization. The SaaS
  # organization additionally gets the "stats_all" table.
  @spec bigquery_tables(any) :: %{optional(<<_::40, _::_*8>>) => atom}
  defp bigquery_tables(organization_id) do
    saas_org_id = Saas.organization_id()

    extra_tables =
      if organization_id == saas_org_id,
        do: %{"stats_all" => :stats_all_schema},
        else: %{}

    Map.merge(@bigquery_tables, extra_tables)
  end

  @doc """
  Lists the tables for which update operations are ignored.
  """
  @spec ignore_updates_for_table() :: list()
  def ignore_updates_for_table,
    do: ~w(message_conversations contact_histories flows stats stats_all)

  @doc """
  Tells whether BigQuery is enabled for the organization, i.e. whether the
  organization has a "bigquery" entry in its services.
  """
  @spec is_active?(non_neg_integer()) :: boolean()
  def is_active?(org_id) do
    services = Partners.organization(org_id).services
    services["bigquery"] != nil
  end

  @doc """
  Creates the organization's dataset on BigQuery and refreshes its schema.

  When the credentials cannot be turned into a connection, the value returned
  by `fetch_bigquery_credentials/1` is returned unchanged. A failed dataset
  creation is delegated to the sync error handler.
  """
  @spec sync_schema_with_bigquery(non_neg_integer) :: {:ok, any} | {:error, any}
  def sync_schema_with_bigquery(organization_id) do
    with {:ok, %{conn: conn, project_id: project_id, dataset_id: dataset_id}} <-
           fetch_bigquery_credentials(organization_id) do
      cred = %{conn: conn, dataset_id: dataset_id, project_id: project_id}

      case create_dataset(conn, project_id, dataset_id) do
        {:error, response} ->
          handle_sync_errors(response, organization_id, cred)

        {:ok, _dataset} ->
          do_refresh_the_schema(organization_id, cred)
          {:ok, "Refreshing Bigquery Schema"}
      end
    end
  end

  # Looks up the organization's "bigquery" service credentials and decodes them
  # into connection details. Returns {:ok, "BigQuery is not active"} when the
  # organization has no "bigquery" service.
  @doc false
  @spec fetch_bigquery_credentials(non_neg_integer) :: nil | {:ok, any} | {:error, any}
  def fetch_bigquery_credentials(organization_id) do
    organization = Partners.organization(organization_id)
    org_contact = organization.contact
    credentials = organization.services["bigquery"]

    if is_nil(credentials) do
      {:ok, "BigQuery is not active"}
    else
      decode_bigquery_credential(credentials, org_contact, organization_id)
    end
  end

  @doc """
  Decodes the BigQuery service account JSON and builds the connection details.

  Returns `{:ok, %{conn: conn, project_id: project_id, dataset_id: dataset_id}}`
  where the dataset id is the organization contact's phone, or `{:error, message}`
  when the JSON is invalid or no token could be fetched.
  """
  @spec decode_bigquery_credential(map(), map(), non_neg_integer) :: {:ok, any} | {:error, any}
  def decode_bigquery_credential(credentials, org_contact, organization_id) do
    raw_service_account = credentials.secrets["service_account"]

    case Jason.decode(raw_service_account) do
      {:error, _reason} ->
        {:error, "Invalid Service Account JSON"}

      {:ok, service_account} ->
        project_id = service_account["project_id"]

        case Partners.get_goth_token(organization_id, "bigquery") do
          nil ->
            {:error, "Error fetching token with Service Account JSON"}

          token ->
            {:ok,
             %{
               conn: Connection.new(token.token),
               project_id: project_id,
               dataset_id: org_contact.phone
             }}
        end
    end
  end

  # Maps a BigQuery table name to the module backing that table.
  @table_lookup %{
    "messages" => Message,
    "contacts" => Contact,
    "flow_results" => FlowResult,
    "flows" => FlowRevision,
    "stats" => Stat,
    "stats_all" => Stat,
    "flow_counts" => FlowCount,
    "messages_media" => MessageMedia,
    "flow_contexts" => Flows.FlowContext,
    "profiles" => Profile,
    "contact_histories" => ContactHistory,
    "message_conversations" => MessageConversation,
    "message_broadcasts" => MessageBroadcast,
    "message_broadcast_contacts" => MessageBroadcastContact
  }

  # Returns the module registered for `table_name`; raises `KeyError` for an
  # unknown table name.
  @doc false
  @spec get_table_struct(String.t()) :: atom()
  def get_table_struct(table_name) do
    Map.fetch!(@table_lookup, table_name)
  end

  @doc """
  Refresh the bigquery schema and update all the older versions.

  The steps run in this order: make sure a BigQuery job row exists for every
  table, create the tables, alter the tables to the current schema, create the
  `contacts_messages` view, update that view, and finally create or update the
  `flat_fields` procedure.

  The return value is the result of the last step. That step pattern-matches on
  `{:ok, _}`, so a failure there raises instead of returning an error tuple.
  The results of the earlier steps are ignored.
  """
  @spec do_refresh_the_schema(non_neg_integer, map()) ::
          {:error, Tesla.Env.t()} | {:ok, Tesla.Env.t()}
  def do_refresh_the_schema(
        organization_id,
        %{conn: conn, dataset_id: dataset_id, project_id: project_id} = _cred
      ) do
    Logger.info("refresh BigQuery schema for org_id: #{organization_id}")
    insert_bigquery_jobs(organization_id)
    create_tables(conn, organization_id, dataset_id, project_id)
    alter_tables(conn, organization_id, dataset_id, project_id)
    contacts_messages_view(conn, dataset_id, project_id)
    alter_contacts_messages_view(conn, dataset_id, project_id)
    flat_fields_procedure(conn, dataset_id, project_id)
  end

  # Makes sure a BigQuery job row exists for every table synced for the organization.
  @doc false
  @spec insert_bigquery_jobs(non_neg_integer) :: :ok
  def insert_bigquery_jobs(organization_id) do
    for table_name <- Map.keys(bigquery_tables(organization_id)) do
      create_bigquery_job(table_name, organization_id)
    end

    :ok
  end

  # Inserts a BigQuery job row for the table and organization unless one is
  # already stored. A new row starts at table_id 0 with the current UTC time.
  @doc false
  @spec create_bigquery_job(String.t(), non_neg_integer) :: :ok
  defp create_bigquery_job(table_name, organization_id) do
    lookup = %{table: table_name, organization_id: organization_id}

    case Repo.fetch_by(BigQueryJob, lookup) do
      {:ok, _existing_job} ->
        :ok

      _ ->
        Repo.insert!(%BigQueryJob{
          table: table_name,
          table_id: 0,
          organization_id: organization_id,
          last_updated_at: DateTime.utc_now()
        })

        :ok
    end
  end

  # Interprets the error response of a failed dataset creation.
  #
  # * "ALREADY_EXISTS": the dataset is already there, so the schema is refreshed
  #   and {:ok, message} is returned.
  # * "PERMISSION_DENIED" and any other status: {:error, message} is returned.
  #
  # Raises when the response body is not valid JSON.
  @spec handle_sync_errors(map(), non_neg_integer, map()) ::
          {:ok, String.t()} | {:error, String.t()}
  defp handle_sync_errors(response, organization_id, attrs) do
    case Jason.decode(response.body) do
      {:ok, data} ->
        error = data["error"]

        case error["status"] do
          "ALREADY_EXISTS" ->
            do_refresh_the_schema(organization_id, attrs)
            {:ok, "Refreshing Bigquery Schema"}

          "PERMISSION_DENIED" ->
            {:error,
             "Account does not have sufficient permissions to create dataset to BigQuery."}

          _ ->
            {:error,
             "Account deactivated with error code #{error["code"]} status #{error["status"]}"}
        end

      _ ->
        raise("Error while sync data with bigquery. #{inspect(response)}")
    end
  end

  ## Creates or updates the "flat_fields" routine from the definition built by
  ## `Schema.flat_fields_procedure/2`. Raises a MatchError unless the call
  ## returns an `{:ok, _}` tuple.
  @spec flat_fields_procedure(Tesla.Client.t(), String.t(), String.t()) ::
          {:ok, GoogleApi.BigQuery.V2.Model.Table.t()} | {:ok, Tesla.Env.t()} | {:error, any()}
  defp flat_fields_procedure(conn, dataset_id, project_id) do
    definition = Schema.flat_fields_procedure(project_id, dataset_id)
    cred = %{conn: conn, dataset_id: dataset_id, project_id: project_id}

    {:ok, _res} = create_or_update_procedure(cred, "flat_fields", definition)
  end

  # Tries to insert the routine; when the insert returns an error tuple the
  # routine is updated instead. Any other insert result is returned as is.
  @spec create_or_update_procedure(map(), String.t(), String.t()) ::
          {:ok, GoogleApi.BigQuery.V2.Model.Table.t()} | {:ok, Tesla.Env.t()} | {:error, any()}
  defp create_or_update_procedure(
         %{conn: conn, dataset_id: dataset_id, project_id: project_id} = _cred,
         routine_id,
         definition
       ) do
    routine = %{
      routineReference: %{routineId: routine_id, datasetId: dataset_id, projectId: project_id},
      routineType: "PROCEDURE",
      definitionBody: definition
    }

    request_opts = [body: routine]

    case Routines.bigquery_routines_insert(conn, project_id, dataset_id, request_opts) do
      {:error, _response} ->
        Routines.bigquery_routines_update(conn, project_id, dataset_id, routine_id, request_opts)

      inserted ->
        inserted
    end
  end

  # Issues a table-insert request for every table synced for the organization.
  # The individual results are ignored.
  @spec create_tables(Tesla.Client.t(), non_neg_integer, binary, binary) :: :ok
  defp create_tables(conn, organization_id, dataset_id, project_id) do
    tables = bigquery_tables(organization_id)

    Enum.each(tables, fn {table_id, schema_fn} ->
      schema = apply(Schema, schema_fn, [])

      create_table(schema, %{
        conn: conn,
        dataset_id: dataset_id,
        project_id: project_id,
        table_id: table_id
      })
    end)
  end

  @doc """
  Updates the schema of every synced table when the dataset can be fetched.
  Nothing is altered when fetching the dataset returns an error.

  If required this function should be called from iex.
  """
  @spec alter_tables(Tesla.Client.t(), non_neg_integer, String.t(), String.t()) :: :ok
  def alter_tables(conn, organization_id, dataset_id, project_id) do
    case Datasets.bigquery_datasets_get(conn, project_id, dataset_id) do
      {:error, _} ->
        nil

      {:ok, _} ->
        for {table_id, schema_fn} <- bigquery_tables(organization_id) do
          Schema
          |> apply(schema_fn, [])
          |> alter_table(%{
            conn: conn,
            dataset_id: dataset_id,
            project_id: project_id,
            table_id: table_id
          })
        end
    end

    :ok
  end

  @doc """
  Format dates for the bigquery.

  Accepts `nil` (returned unchanged), an RFC3339 string, or a date/time value.
  The value is converted to the organization's timezone and rendered as
  `YYYY-MM-DD HH:MM:SS`.
  """
  @spec format_date(DateTime.t() | String.t() | nil, non_neg_integer()) :: String.t() | nil
  def format_date(nil, _),
    do: nil

  def format_date(date, organization_id) when is_binary(date) do
    timezone = Partners.organization(organization_id).timezone

    # The parse result is not checked: on a parse failure the error reason is
    # what gets passed on to the timezone conversion.
    Timex.parse(date, "{RFC3339z}")
    |> elem(1)
    |> Timex.Timezone.convert(timezone)
    |> Timex.format!("{YYYY}-{0M}-{0D} {h24}:{m}:{s}")
  end

  def format_date(date, organization_id) do
    timezone = Partners.organization(organization_id).timezone

    date
    |> Timex.Timezone.convert(timezone)
    |> Timex.format!("{YYYY}-{0M}-{0D} {h24}:{m}:{s}")
  end

  @doc """
  Encodes the given value as a JSON string.

  Returns `nil` for `nil` input and when the value cannot be encoded.
  """
  @spec format_json(map() | nil) :: String.t() | nil
  def format_json(nil), do: nil

  def format_json(definition) do
    case Jason.encode(definition) do
      {:ok, data} -> data
      _ -> nil
    end
  end

  # Sends the dataset-insert request for `dataset_id` in `project_id`.
  @spec create_dataset(Tesla.Client.t(), String.t(), String.t()) ::
          {:ok, GoogleApi.BigQuery.V2.Model.Dataset.t()} | {:ok, Tesla.Env.t()} | {:error, any()}
  defp create_dataset(conn, project_id, dataset_id) do
    dataset_reference = %{datasetId: dataset_id, projectId: project_id}
    request_opts = [body: %{datasetReference: dataset_reference}]

    Datasets.bigquery_datasets_insert(conn, project_id, request_opts, [])
  end

  # Sends the table-insert request for `table_id` with the given schema fields.
  @spec create_table(list(), map()) ::
          {:ok, GoogleApi.BigQuery.V2.Model.Table.t()} | {:ok, Tesla.Env.t()} | {:error, any()}
  defp create_table(
         schema,
         %{conn: conn, dataset_id: dataset_id, project_id: project_id, table_id: table_id} =
           _target
       ) do
    table_reference = %{datasetId: dataset_id, projectId: project_id, tableId: table_id}
    table = %{tableReference: table_reference, schema: %{fields: schema}}

    Tables.bigquery_tables_insert(conn, project_id, dataset_id, [body: table], [])
  end

  # Sends the table-update request for `table_id` with the given schema fields.
  @spec alter_table(list(), map()) ::
          {:ok, GoogleApi.BigQuery.V2.Model.Table.t()} | {:ok, Tesla.Env.t()} | {:error, any()}
  defp alter_table(
         schema,
         %{conn: conn, dataset_id: dataset_id, project_id: project_id, table_id: table_id} =
           _target
       ) do
    table_reference = %{datasetId: dataset_id, projectId: project_id, tableId: table_id}
    table = %{tableReference: table_reference, schema: %{fields: schema}}

    Tables.bigquery_tables_update(conn, project_id, dataset_id, table_id, [body: table], [])
  end

  # Creates the "contacts_messages" view, which joins the messages table with
  # the contacts table on the contact phone.
  #
  # Returns the API response on success and the string "Error creating a view"
  # on failure.
  @spec contacts_messages_view(Tesla.Client.t(), String.t(), String.t()) ::
          GoogleApi.BigQuery.V2.Model.Table.t() | Tesla.Env.t() | String.t()
  defp contacts_messages_view(conn, dataset_id, project_id) do
    Tables.bigquery_tables_insert(
      conn,
      project_id,
      dataset_id,
      [
        body: %{
          tableReference: %{
            datasetId: dataset_id,
            projectId: project_id,
            tableId: "contacts_messages"
          },
          view: %{
            # `optin_time` and `language` are two separate columns and must be
            # separated by a comma; `optin_time. language` is a field access.
            query: """
            SELECT messages.id, contact_phone, phone, name, optin_time, language,
              flow_label, messages.tags_label, messages.inserted_at, media_url
            FROM `#{project_id}.#{dataset_id}.messages` AS messages
            JOIN `#{project_id}.#{dataset_id}.contacts` AS contacts
              ON messages.contact_phone = contacts.phone
            """,
            useLegacySql: false
          }
        }
      ],
      []
    )
    |> case do
      {:ok, response} -> response
      {:error, _} -> "Error creating a view"
    end
  end

  # Updates the "contacts_messages" view with the query that also selects `uuid`.
  # Returns the API response on success and the string "Error creating a view"
  # on failure.
  @spec alter_contacts_messages_view(Tesla.Client.t(), String.t(), String.t()) ::
          GoogleApi.BigQuery.V2.Model.Table.t() | Tesla.Env.t() | String.t()
  defp alter_contacts_messages_view(conn, dataset_id, project_id) do
    view_definition = %{
      tableReference: %{
        datasetId: dataset_id,
        projectId: project_id,
        tableId: "contacts_messages"
      },
      view: %{
        query:
          "SELECT messages.id, uuid, contact_phone, phone, name, optin_time, language, flow_label, messages.tags_label, messages.inserted_at, media_url
              FROM `#{project_id}.#{dataset_id}.messages` as messages
              JOIN `#{project_id}.#{dataset_id}.contacts` as contacts
              ON messages.contact_phone = contacts.phone",
        useLegacySql: false
      }
    }

    update_result =
      Tables.bigquery_tables_update(
        conn,
        project_id,
        dataset_id,
        "contacts_messages",
        [body: view_definition],
        []
      )

    case update_result do
      {:ok, response} -> response
      {:error, _} -> "Error creating a view"
    end
  end

  @doc """
  Inserts rows into a BigQuery table for the organization.

  Nothing is sent when the data is a map whose `:json` value is empty. `attrs`
  may carry `:max_id` and `:last_updated_at`, which are passed on to the
  response handler together with the table name.
  """
  @spec make_insert_query(map() | list, String.t(), non_neg_integer, Keyword.t()) :: :ok
  def make_insert_query(%{json: data}, _table, _organization_id, _max_id)
      when data in [[], nil, %{}] do
    :ok
  end

  def make_insert_query(data, table, organization_id, attrs) do
    opts = [
      table: table,
      max_id: Keyword.get(attrs, :max_id),
      last_updated_at: Keyword.get(attrs, :last_updated_at)
    ]

    Logger.info(
      "Insert data to bigquery for org_id: #{organization_id}, table: #{table}, rows_count: #{Enum.count(data)}"
    )

    organization_id
    |> fetch_bigquery_credentials()
    |> do_make_insert_query(organization_id, data, opts)
    |> handle_insert_query_response(organization_id, opts)

    :ok
  end

  # Streams the rows into the table named in `opts[:table]`. Only a successful
  # credentials tuple is accepted; there is no clause for any other input.
  @spec do_make_insert_query(tuple(), non_neg_integer, list(), Keyword.t()) ::
          {:ok, any()} | {:error, any()}
  defp do_make_insert_query(
         {:ok, %{conn: conn, project_id: project_id, dataset_id: dataset_id}},
         organization_id,
         data,
         opts
       ) do
    table = Keyword.get(opts, :table)

    Logger.info(
      "Inserting data to bigquery for org_id: #{organization_id}, table: #{table}, rows_count: #{Enum.count(data)}"
    )

    request_opts = [body: %{rows: data}]

    Tabledata.bigquery_tabledata_insert_all(conn, project_id, dataset_id, table, request_opts, [])
  end

  # Handles a successful insert call. Raises when the response carries insert
  # errors; otherwise records progress on the BigQuery job, either the new max
  # id or the new last_updated_at.
  @spec handle_insert_query_response(tuple(), non_neg_integer, Keyword.t()) :: :ok
  defp handle_insert_query_response({:ok, res}, organization_id, opts) do
    table = Keyword.get(opts, :table)
    max_id = Keyword.get(opts, :max_id)
    last_updated_at = Keyword.get(opts, :last_updated_at)

    if res.insertErrors != nil do
      raise("BigQuery Insert Error for table #{table} with res: #{inspect(res)}")
    end

    cond do
      ## Max id will be nil or 0 in case of update statement.
      max_id not in [nil, 0] ->
        Jobs.update_bigquery_job(organization_id, table, %{table_id: max_id})

        Logger.info(
          "New Data has been inserted to bigquery successfully org_id: #{organization_id}, table: #{table}, max_id: #{max_id}, res: #{inspect(res)}"
        )

      last_updated_at not in [nil, 0] ->
        Jobs.update_bigquery_job(organization_id, table, %{last_updated_at: last_updated_at})

        Logger.info(
          "Updated Data has been inserted to bigquery successfully org_id: #{organization_id}, last_updated_at: #{last_updated_at} table: #{table}, res: #{inspect(res)}"
        )

      true ->
        Logger.info("Count not found the operation for bigquery insert and update")
    end

    :ok
  end

  # Handles a failed insert call: logs it, then reacts to the error status.
  # A missing resource triggers a schema sync, a permission error disables the
  # credential, a timeout is only logged and anything else raises. The value of
  # the selected branch is returned.
  defp handle_insert_query_response({:error, response}, organization_id, opts) do
    table = Keyword.get(opts, :table)

    Logger.info(
      "Error while inserting the data to bigquery. org_id: #{organization_id}, table: #{table}, response: #{inspect(response)}"
    )

    case bigquery_error_status(response) do
      "NOT_FOUND" ->
        sync_schema_with_bigquery(organization_id)

      "PERMISSION_DENIED" ->
        Partners.disable_credential(
          organization_id,
          "bigquery",
          "Account does not have sufficient permissions to insert data to BigQuery."
        )

      "TIMEOUT" ->
        Logger.info("Timeout while inserting the data. #{inspect(response)}")

      _ ->
        raise("BigQuery Insert Error for table #{table} #{inspect(response)}")
    end
  end

  # Extracts the error status from a failed BigQuery response. A map with a JSON
  # `:body` yields the decoded "error" => "status" value. An atom response is
  # reported as "TIMEOUT". Everything else is logged and reported as :unknown.
  @spec bigquery_error_status(any()) :: String.t() | atom()
  defp bigquery_error_status(response) do
    with %{body: body} <- response,
         {:ok, decoded} <- Jason.decode(body) do
      decoded["error"]["status"]
    else
      _ when is_atom(response) ->
        "TIMEOUT"

      _ ->
        Logger.info("Bigquery status error #{inspect(response)}")
        :unknown
    end
  end

  @doc """
  Runs a BigQuery job that deletes duplicate rows from the given table.

  Nothing happens when connection details cannot be obtained for the
  organization.
  """
  @spec make_job_to_remove_duplicate(String.t(), non_neg_integer) :: :ok
  def make_job_to_remove_duplicate(table, organization_id) do
    case fetch_bigquery_credentials(organization_id) do
      {:ok, %{conn: conn, project_id: project_id, dataset_id: _dataset_id} = credentials} ->
        Logger.info("Remove duplicates on bigquery for org_id: #{organization_id} table:#{table}")

        delete_query = generate_duplicate_removal_query(table, credentials, organization_id)

        ## timeout takes some time to delete the old records. So increasing the timeout limit.
        job_opts = [body: %{query: delete_query, useLegacySql: false, timeoutMs: 120_000}]

        conn
        |> GoogleApi.BigQuery.V2.Api.Jobs.bigquery_jobs_query(project_id, job_opts)
        |> handle_duplicate_removal_job_error(table, credentials, organization_id)

      _ ->
        :ok
    end
  end

  # Builds the DELETE statement that removes duplicates from the table. Among
  # rows older than three hours (in the organization's timezone), every row
  # except the most recently updated one per id is selected for deletion.
  @spec generate_duplicate_removal_query(String.t(), map(), non_neg_integer) :: String.t()
  defp generate_duplicate_removal_query(table, credentials, organization_id) do
    organization = Partners.organization(organization_id)
    timezone = organization.timezone
    qualified_table = "#{credentials.dataset_id}.#{table}"

    """
    DELETE FROM `#{qualified_table}`
    WHERE struct(id, updated_at, bq_uuid) IN (
      SELECT STRUCT(id, updated_at, bq_uuid)  FROM (
        SELECT id, updated_at, bq_uuid, ROW_NUMBER() OVER (
          PARTITION BY delta.id ORDER BY delta.updated_at DESC
        ) AS rn
        FROM `#{qualified_table}` delta
        WHERE updated_at < DATETIME(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 3 HOUR),
          '#{timezone}')) a WHERE a.rn <> 1 ORDER BY id);
    """
  end

  # Logs the outcome of the duplicate removal job: info on success, error on failure.
  @spec handle_duplicate_removal_job_error(tuple() | nil, String.t(), map(), non_neg_integer) ::
          :ok
  defp handle_duplicate_removal_job_error({:ok, _response}, table, _credentials, organization_id) do
    Logger.info(
      "Duplicate entries have been removed for org_id: #{organization_id} from #{table} on bigquery "
    )
  end

  ## Since we don't care about the delete query results, let's skip notifying this to AppSignal.
  defp handle_duplicate_removal_job_error({:error, error}, table, _credentials, _organization_id),
    do:
      Logger.error(
        "Error while removing duplicate entries from the table #{table} on bigquery. #{inspect(error)}"
      )
end