lib/glific/erase.ex

defmodule Glific.Erase do
  @moduledoc """
  A simple module to periodically delete old data and run maintenance
  statements (VACUUM / REINDEX) to keep the database clean.
  """
  import Ecto.Query

  alias Glific.{
    Repo,
    Seeds.SeedsMigration
  }

  require Logger

  # Number of most recent messages kept per contact by `clean_messages/2`
  # and `clean_message_for_contact/3`.
  @limit 500

  # Number of most recent revisions (by id) kept per flow for each cleaned status.
  @flow_revisions_to_keep 10

  # Maintenance statements can run for a long time on large tables
  # (notably `messages`), hence the generous timeout.
  @maintenance_timeout 300_000

  @doc """
  Do the weekly DB cleaner tasks, typically in the middle of the night on Sunday morning.

  Runs the VACUUM / REINDEX maintenance statements first, then deletes old records.
  """
  @spec perform_weekly() :: any
  def perform_weekly do
    refresh_tables()
    clean_old_records()
  end

  @doc """
  Do the daily DB cleaner tasks: currently only reindexes `global.oban_jobs`.
  """
  @spec perform_daily() :: any
  def perform_daily do
    run_maintenance_queries(["REINDEX TABLE global.oban_jobs"])
  end

  @doc """
  Clean old records for tables like notifications and logs, and trim old
  draft/archived flow revisions.
  """
  @spec clean_old_records() :: any
  def clean_old_records do
    remove_old_records()
    clean_flow_revision()
  end

  # Reindexes the Oban jobs table and vacuums/analyzes the busiest tables.
  # `messages` and `messages_media` get a plain (non-FULL) VACUUM.
  @spec refresh_tables() :: any
  defp refresh_tables do
    run_maintenance_queries([
      "REINDEX TABLE global.oban_jobs",
      "VACUUM (FULL, ANALYZE) webhook_logs",
      "VACUUM (FULL, ANALYZE) organizations",
      "VACUUM (FULL, ANALYZE) messages_tags",
      "VACUUM (FULL, ANALYZE) notifications",
      "VACUUM (FULL, ANALYZE) flow_counts",
      "VACUUM (FULL, ANALYZE) bigquery_jobs",
      "VACUUM (FULL, ANALYZE) global.oban_producers",
      "VACUUM (FULL, ANALYZE) contacts_groups",
      "VACUUM (FULL, ANALYZE) flow_results",
      "VACUUM (FULL, ANALYZE) contacts",
      "VACUUM (FULL, ANALYZE) contact_histories",
      "VACUUM (ANALYZE) messages",
      "VACUUM (ANALYZE) messages_media"
    ])
  end

  # Runs each raw maintenance statement in order, bypassing the
  # organization scoping, and raises on the first failure.
  @spec run_maintenance_queries([String.t()]) :: :ok
  defp run_maintenance_queries(queries) do
    Enum.each(
      queries,
      &Repo.query!(&1, [], timeout: @maintenance_timeout, skip_organization_id: true)
    )
  end

  # Deletes rows whose `inserted_at` is older than the table's retention
  # window: one week for message_broadcasts, one month for the other tables.
  # Runs across all organizations (skip_organization_id: true).
  @spec remove_old_records() :: any
  defp remove_old_records do
    [
      {"message_broadcasts", "week"},
      {"notifications", "month"},
      {"webhook_logs", "month"},
      {"flow_contexts", "month"},
      {"messages_conversations", "month"}
    ]
    |> Enum.each(fn {table, duration} ->
      # `'1' || duration` builds an interval literal such as '1week' or '1month'.
      Repo.delete_all(
        from(row in table,
          where: row.inserted_at < fragment("CURRENT_DATE - ('1' || ?)::interval", ^duration)
        ),
        skip_organization_id: true,
        timeout: 400_000
      )
    end)
  end

  @doc """
  Keep only the latest 25 contact_histories (by `updated_at`) for every contact.

  This runs across all organizations; older history rows are deleted.
  """
  @spec clean_contact_histories() :: any
  def clean_contact_histories do
    """
    WITH top_25_contact_histories_per_contact AS (
    SELECT t.*, ROW_NUMBER() OVER (PARTITION BY contact_id ORDER BY updated_at DESC) rn
    FROM contact_histories t
    )
    DELETE FROM contact_histories WHERE id NOT IN (
      SELECT id
      FROM top_25_contact_histories_per_contact
      WHERE rn <= 25
    )
    """
    |> Repo.query!([], timeout: 60_000, skip_organization_id: true)
  end

  # Keeps only the latest @flow_revisions_to_keep revisions (highest ids) of
  # each flow for the "draft" and "archived" statuses; revisions with any
  # other status are never touched. Drafts are cleaned before archived ones.
  @spec clean_flow_revision() :: any
  defp clean_flow_revision do
    Enum.each(["draft", "archived"], &clean_flow_revisions_with_status/1)
  end

  # Deletes every revision with the given status except the newest
  # @flow_revisions_to_keep of the same flow and status.
  @spec clean_flow_revisions_with_status(String.t()) :: any
  defp clean_flow_revisions_with_status(status) do
    """
    DELETE FROM flow_revisions fr1
    WHERE fr1.status = $1 AND id
    NOT IN (SELECT fr2.id FROM flow_revisions fr2 WHERE fr2.flow_id = fr1.flow_id AND fr2.status = $1 ORDER BY fr2.id DESC LIMIT $2)
    """
    |> Repo.query!([status, @flow_revisions_to_keep],
      timeout: 60_000,
      skip_organization_id: true
    )
  end

  @doc """
  Keep the latest limited messages for every contact of an organization.

  Only contacts whose `last_message_number` exceeds the limit (plus two) are
  processed; each one is handed to `clean_message_for_contact/3`. When
  `skip_delete` is anything other than `false`, nothing is deleted.

  Returns one `:ok` per processed contact.
  """
  @spec clean_messages(non_neg_integer(), boolean()) :: list
  def clean_messages(org_id, skip_delete \\ false) do
    Repo.put_process_state(org_id)

    # NOTE: org_id is interpolated directly into SQL; callers must pass an
    # integer, as the @spec requires.
    contact_query =
      "select id from contacts where organization_id = #{org_id} and last_message_number > #{@limit + 2} order by last_message_number"

    Repo.query!(contact_query).rows
    |> Enum.map(fn [contact_id] ->
      clean_message_for_contact(contact_id, org_id, skip_delete)
    end)
  end

  @doc """
  Keep the latest limited messages for a single contact.

  Calls `SeedsMigration.fix_message_number_for_contact/1` (presumably to
  recompute message numbers) and reads the contact's `last_message_number`.
  It then deletes the contact's messages whose `message_number` is below
  `last_message_number - limit` and calls the fixer again. The delete only
  happens when `skip_delete` is exactly `false` and the cut-off is positive.
  """
  @spec clean_message_for_contact(non_neg_integer(), non_neg_integer(), boolean()) :: :ok
  def clean_message_for_contact(contact_id, org_id, skip_delete \\ false) do
    SeedsMigration.fix_message_number_for_contact(contact_id)

    [[last_message_number]] =
      Repo.query!("select last_message_number from contacts where id = #{contact_id}").rows

    message_to_delete = last_message_number - @limit

    delete_message_query =
      "delete from messages where organization_id = #{org_id} and contact_id = #{contact_id} and message_number < #{message_to_delete}"

    Logger.info(
      "message cleanup started for #{contact_id} where message number #{message_to_delete}"
    )

    # `== false` (rather than `not`) keeps a nil skip_delete from triggering a delete.
    if skip_delete == false && message_to_delete > 0 do
      Repo.query!(delete_message_query, [], timeout: 400_000, skip_organization_id: true)
      SeedsMigration.fix_message_number_for_contact(contact_id)
    end

    :ok
  end
end