lib/glific/third_party/bigquery/bigquery_worker.ex

defmodule Glific.BigQuery.BigQueryWorker do
  @moduledoc """
  Processes the tables synced to BigQuery for each organization. Rows are
  split into chunks of 100 and each chunk is handed to a BigQuery job that
  delivers it to the BigQuery servers.

  We centralize both the cron job and the worker job in one module
  """

  # This module syncs data from the Postgres database to BigQuery. Before a table
  # can be synced you need to define its BigQuery schema; see the
  # bigquery_schema.ex file for more information.

  import Ecto.Query

  require Logger
  use Publicist

  use Oban.Worker,
    queue: :bigquery,
    max_attempts: 1,
    priority: 1

  alias Glific.{
    BigQuery,
    Contacts,
    Contacts.Contact,
    Contacts.ContactHistory,
    Flows,
    Flows.Flow,
    Flows.FlowCount,
    Flows.FlowResult,
    Flows.FlowRevision,
    Flows.MessageBroadcast,
    Flows.MessageBroadcastContact,
    Groups.Group,
    Jobs,
    Messages.Message,
    Messages.MessageConversation,
    Messages.MessageMedia,
    Partners,
    Profiles.Profile,
    Repo,
    Stats.Stat,
    Users.User
  }

  # Upper bound on the number of rows looked at per sync window; used as the
  # `limit` in insert_max_id/3 and insert_last_updated/3.
  @per_min_limit 500

  @doc """
  Entry point for the periodic cron job. When BigQuery is active for the
  organization, insert and update worker jobs are enqueued for every bigquery
  job returned by `Jobs.get_bigquery_jobs/1`. Always returns `:ok`.
  """
  @spec perform_periodic(non_neg_integer) :: :ok
  def perform_periodic(org_id) do
    if BigQuery.is_active?(org_id) do
      Logger.info("Found bigquery credentials for org_id: #{org_id}")

      org_id
      |> Jobs.get_bigquery_jobs()
      |> Enum.each(fn bq_job -> init_insert_job(bq_job, org_id) end)
    end

    :ok
  end

  @doc """
  Entry point for the periodic cron job that maintains already-synced tables.
  When the organization has a "bigquery" service configured, one
  duplicate-removal worker job is enqueued for each table listed below.
  Always returns `:ok`.
  """
  @spec periodic_updates(non_neg_integer) :: :ok
  def periodic_updates(organization_id) do
    services = Partners.organization(organization_id).services

    if services["bigquery"] do
      tables = [
        "contacts",
        "messages",
        "flow_results",
        "flow_counts",
        "messages_media",
        "flow_contexts",
        "profiles",
        "message_broadcasts",
        "message_broadcast_contacts"
      ]

      Enum.each(tables, fn table -> init_removal_job(table, organization_id) end)
    end

    :ok
  end

  # Enqueues two Oban jobs for the given bigquery job's table: one with the
  # :insert action and one with the :update action.
  @spec init_insert_job(BigQuery.BigQueryJob.t() | nil, non_neg_integer) :: any()
  defp init_insert_job(bq_job, org_id) do
    Enum.each([:insert, :update], fn action ->
      %{table: bq_job.table, organization_id: org_id, action: action}
      |> __MODULE__.new()
      |> Oban.insert()
    end)
  end

  # Enqueues an Oban job that asks this worker to remove duplicates from `table`.
  defp init_removal_job(table, org_id) do
    job_args = %{table: table, organization_id: org_id, remove_duplicates: true}

    job_args
    |> __MODULE__.new()
    |> Oban.insert()
  end

  @impl Oban.Worker
  @doc """
  Standard perform method to use Oban worker.

  Two kinds of job args are handled:

    * `"remove_duplicates" => true` - schedules duplicate removal for the table.
    * `"action" => action` - looks up the bigquery job row for the table and
      syncs either new or updated records, depending on the action.

  Both clauses return `:ok`.
  """
  @spec perform(Oban.Job.t()) :: :ok | {:error, String.t()}
  def perform(%Oban.Job{
        args: %{
          "table" => table,
          "organization_id" => organization_id,
          "remove_duplicates" => true
        }
      }) do
    Repo.put_process_state(organization_id)
    Logger.info("removing duplicates for org_id: #{organization_id} table: #{table}")
    BigQuery.make_job_to_remove_duplicate(table, organization_id)
    :ok
  end

  def perform(%Oban.Job{
        args: %{"table" => table, "organization_id" => organization_id, "action" => action}
      }) do
    Repo.put_process_state(organization_id)

    organization_id
    |> Jobs.get_bigquery_job(table)
    |> insert_for_table(organization_id, action)

    # insert_for_table/3 returns nil when no bigquery job row exists; always hand
    # Oban an explicit :ok so the return value matches the spec.
    :ok
  end

  # Converts `date` to the organization's timezone and formats it using the
  # "{YYYY}-{0M}-{0D} {h24}:{m}:{s}{ss}" Timex pattern.
  @spec format_date_with_millisecond(DateTime.t(), non_neg_integer()) :: String.t()
  defp format_date_with_millisecond(date, organization_id) do
    organization = Partners.organization(organization_id)
    local_date = Timex.Timezone.convert(date, organization.timezone)

    Timex.format!(local_date, "{YYYY}-{0M}-{0D} {h24}:{m}:{s}{ss}")
  end

  # Returns the highest id within the next window (at most @per_min_limit rows,
  # ordered by id) of rows whose id is greater than `table_id`. Falls back to
  # `table_id` itself when there are no such rows.
  @spec insert_max_id(String.t(), non_neg_integer, non_neg_integer) :: non_neg_integer
  defp insert_max_id(table_name, table_id, organization_id) do
    Logger.info("Checking for bigquery job for org_id: #{organization_id} table: #{table_name}")

    window =
      table_name
      |> BigQuery.get_table_struct()
      |> where([row], row.id > ^table_id)
      |> add_organization_id(table_name, organization_id)
      |> order_by([row], asc: row.id)
      |> limit(@per_min_limit)

    case Repo.aggregate(window, :max, :id) do
      nil -> table_id
      highest_id -> highest_id
    end
  end

  # Returns the latest `updated_at` within the next window (at most
  # @per_min_limit rows, ordered by id) of rows updated after
  # `table_last_updated_at`. Falls back to `table_last_updated_at` when no row
  # qualifies.
  @spec insert_last_updated(String.t(), DateTime.t() | nil, non_neg_integer) :: DateTime.t()
  defp insert_last_updated(table_name, table_last_updated_at, organization_id) do
    Logger.info(
      "Checking for bigquery job for org_id: #{organization_id} table: #{table_name} since: #{table_last_updated_at}"
    )

    window =
      table_name
      |> BigQuery.get_table_struct()
      |> where([row], row.updated_at > ^table_last_updated_at)
      |> add_organization_id(table_name, organization_id)
      |> order_by([row], asc: row.id)
      |> limit(@per_min_limit)

    case Repo.aggregate(window, :max, :updated_at, timeout: 40_000) do
      nil -> table_last_updated_at
      latest_update -> latest_update
    end
  end

  # Dispatches a sync for one bigquery job row: the "update" action syncs rows
  # changed since `last_updated_at`, any other action syncs rows newer than
  # `table_id`. Returns nil when there is no job row.
  @spec insert_for_table(BigQuery.BigQueryJob.t() | nil, non_neg_integer, String.t()) :: :ok | nil
  defp insert_for_table(nil, _organization_id, _action), do: nil

  defp insert_for_table(
         %{table: table, table_id: table_id, last_updated_at: table_last_updated_at},
         organization_id,
         action
       ) do
    case action do
      "update" -> insert_updated_records(table, table_last_updated_at, organization_id)
      _other -> insert_new_records(table, table_id, organization_id)
    end

    :ok
  end

  # Queues the rows between the last synced id (`table_id`) and the upper bound
  # computed by insert_max_id/3. Nothing is queued when that bound did not move
  # past `table_id`.
  @spec insert_new_records(binary, non_neg_integer, non_neg_integer) :: :ok
  defp insert_new_records(table, table_id, organization_id) do
    max_id = insert_max_id(table, table_id, organization_id)

    if max_id > table_id do
      window = %{min_id: table_id, max_id: max_id, action: :insert}
      queue_table_data(table, organization_id, window)
    end

    :ok
  end

  # Queues rows updated since the last recorded `last_updated_at`, unless the
  # table is listed in BigQuery.ignore_updates_for_table/0. When no timestamp
  # has been recorded yet, the current UTC time is used as the lower bound.
  @spec insert_updated_records(binary, DateTime.t() | nil, non_neg_integer) :: :ok
  defp insert_updated_records(table, table_last_updated_at, organization_id) do
    case table in BigQuery.ignore_updates_for_table() do
      true ->
        :ok

      false ->
        since = table_last_updated_at || DateTime.utc_now()
        until = insert_last_updated(table, since, organization_id)

        queue_table_data(table, organization_id, %{
          action: :update,
          max_id: nil,
          last_updated_at: until,
          table_last_updated_at: since
        })
    end
  end

  # Scopes the query to one organization; "stats_all" is left unscoped.
  @spec add_organization_id(Ecto.Query.t(), String.t(), non_neg_integer) :: Ecto.Query.t()
  defp add_organization_id(query, "stats_all", _organization_id), do: query

  defp add_organization_id(query, _table, organization_id) do
    where(query, [row], row.organization_id == ^organization_id)
  end

  ## Update requests for these tables are ignored: nothing is queued.
  @spec queue_table_data(String.t(), non_neg_integer(), map()) :: :ok
  defp queue_table_data(table, _organization_id, %{action: :update, max_id: nil})
       when table == "flows" or table == "stats" or table == "stats_all" do
    :ok
  end

  # Loads the selected messages, converts each into a BigQuery row and hands
  # the rows to make_job/4 in chunks of 100.
  defp queue_table_data("messages", organization_id, attrs) do
    Logger.info(
      "fetching messages data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "messages"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      %{json: Map.merge(get_message_row(row, organization_id), bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :messages, organization_id, attrs) end)

    :ok
  end

  # Loads the selected message_broadcast_contacts rows, converts each into a
  # BigQuery row and hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("message_broadcast_contacts", organization_id, attrs) do
    Logger.info(
      "fetching message_broadcast_contacts data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    get_query("message_broadcast_contacts", organization_id, attrs)
    |> Repo.all()
    |> Enum.reduce(
      [],
      fn row, acc ->
        [
          %{
            id: row.id,
            # use the broadcast id selected by the query, not the row's own id
            message_broadcast_id: row.message_broadcast_id,
            phone: row.phone,
            status: row.status,
            processed_at: BigQuery.format_date(row.processed_at, organization_id),
            inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
            updated_at: BigQuery.format_date(row.updated_at, organization_id)
          }
          |> Map.merge(bq_fields(organization_id))
          |> then(&%{json: &1})
          | acc
        ]
      end
    )
    |> Enum.chunk_every(100)
    |> Enum.each(&make_job(&1, :message_broadcast_contacts, organization_id, attrs))

    :ok
  end

  # Loads the selected message_broadcasts rows, converts each into a BigQuery
  # row and hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("message_broadcasts", organization_id, attrs) do
    Logger.info(
      "fetching message_broadcasts data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    get_query("message_broadcasts", organization_id, attrs)
    |> Repo.all()
    |> Enum.reduce(
      [],
      fn row, acc ->
        [
          %{
            id: row.id,
            # the query selects the joined flow's id as :flow_id; row.id is the
            # broadcast's own id and must not be reported as the flow id
            flow_id: row.flow_id,
            flow_name: row.flow_name,
            user_id: row.user_id,
            user_phone: row.user_phone,
            group_id: row.group_id,
            group_name: row.group_name,
            broadcast_type: row.broadcast_type,
            message_params: BigQuery.format_json(row.message_params),
            started_at: BigQuery.format_date(row.started_at, organization_id),
            completed_at: BigQuery.format_date(row.completed_at, organization_id),
            inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
            updated_at: BigQuery.format_date(row.updated_at, organization_id)
          }
          |> Map.merge(bq_fields(organization_id))
          |> then(&%{json: &1})
          | acc
        ]
      end
    )
    |> Enum.chunk_every(100)
    |> Enum.each(&make_job(&1, :message_broadcasts, organization_id, attrs))

    :ok
  end

  # Loads the selected contacts, skips simulator contacts, converts the rest
  # into BigQuery rows and hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("contacts", organization_id, attrs) do
    Logger.info(
      "fetching contacts data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    contacts =
      "contacts"
      |> get_query(organization_id, attrs)
      |> Repo.all()

    entries =
      for row <- contacts, !Contacts.is_simulator_contact?(row.phone), reduce: [] do
        acc ->
          contact_fields =
            Enum.map(row.fields, fn {_key, field} ->
              %{
                label: field["label"],
                inserted_at: BigQuery.format_date(field["inserted_at"], organization_id),
                type: field["type"],
                value: field["value"]
              }
            end)

          contact_groups =
            Enum.map(row.groups, fn group ->
              %{label: group.label, description: group.description}
            end)

          payload = %{
            id: row.id,
            name: row.name,
            phone: row.phone,
            provider_status: row.bsp_status,
            status: row.status,
            language: row.language.label,
            optin_time: BigQuery.format_date(row.optin_time, organization_id),
            optout_time: BigQuery.format_date(row.optout_time, organization_id),
            contact_optin_method: row.optin_method,
            last_message_at: BigQuery.format_date(row.last_message_at, organization_id),
            inserted_at: format_date_with_millisecond(row.inserted_at, organization_id),
            updated_at: format_date_with_millisecond(row.updated_at, organization_id),
            fields: contact_fields,
            # settings is a record type on the BigQuery side and needs to be
            # structured first (like fields), so nil is sent for now
            settings: nil,
            user_name: if(is_nil(row.user), do: nil, else: row.user.name),
            user_role: if(is_nil(row.user), do: nil, else: BigQuery.format_json(row.user.roles)),
            groups: contact_groups,
            tags: Enum.map(row.tags, fn tag -> %{label: tag.label} end),
            raw_fields: BigQuery.format_json(row.fields),
            group_labels: Enum.map_join(row.groups, ",", fn group -> Map.get(group, :label) end)
          }

          [%{json: Map.merge(payload, bq_fields(organization_id))} | acc]
      end

    entries
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :contacts, organization_id, attrs) end)

    :ok
  end

  # Loads the selected profiles, converts each into a BigQuery row and hands
  # the rows to make_job/4 in chunks of 100.
  defp queue_table_data("profiles", organization_id, attrs) do
    Logger.info(
      "fetching profiles data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "profiles"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        name: row.name,
        type: row.type,
        inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
        updated_at: BigQuery.format_date(row.updated_at, organization_id),
        phone: row.contact.phone,
        language: row.language.label,
        fields:
          Enum.map(row.fields, fn {_key, field} ->
            %{
              label: field["label"],
              inserted_at: BigQuery.format_date(field["inserted_at"], organization_id),
              type: field["type"],
              value: field["value"]
            }
          end)
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :profiles, organization_id, attrs) end)

    :ok
  end

  # Loads the selected contact_histories rows, converts each into a BigQuery
  # row and hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("contact_histories", organization_id, attrs) do
    Logger.info(
      "fetching contact_histories data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "contact_histories"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        event_type: row.event_type,
        event_label: row.event_label,
        inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
        updated_at: BigQuery.format_date(row.updated_at, organization_id),
        event_datetime: BigQuery.format_date(row.event_datetime, organization_id),
        phone: row.contact.phone,
        profile_id: row.profile_id
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :contact_histories, organization_id, attrs) end)

    :ok
  end

  # Loads the selected message_conversations rows, converts each into a
  # BigQuery row and hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("message_conversations", organization_id, attrs) do
    Logger.info(
      "fetching message_conversations data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "message_conversations"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        conversation_uuid: row.conversation_id,
        deduction_type: row.deduction_type,
        inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
        updated_at: BigQuery.format_date(row.updated_at, organization_id),
        is_billable: row.is_billable,
        message_id: row.message_id,
        payload: BigQuery.format_json(row.payload),
        phone: row.phone
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :message_conversations, organization_id, attrs) end)

    :ok
  end

  # Loads the selected flow revisions, converts each into a BigQuery row and
  # hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("flows", organization_id, attrs) do
    Logger.info(
      "fetching flows data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "flows"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        name: row.flow.name,
        uuid: row.flow.uuid,
        inserted_at: format_date_with_millisecond(row.inserted_at, organization_id),
        updated_at: format_date_with_millisecond(row.updated_at, organization_id),
        keywords: BigQuery.format_json(row.flow.keywords),
        status: row.status,
        revision: BigQuery.format_json(row.definition)
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :flows, organization_id, attrs) end)

    :ok
  end

  # Loads the selected flow results, skips those belonging to simulator
  # contacts, converts the rest into BigQuery rows and hands the rows to
  # make_job/4 in chunks of 100.
  defp queue_table_data("flow_results", organization_id, attrs) do
    Logger.info(
      "fetching flow_results data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    flow_results =
      "flow_results"
      |> get_query(organization_id, attrs)
      |> Repo.all()

    entries =
      for row <- flow_results,
          !Contacts.is_simulator_contact?(row.contact.phone),
          reduce: [] do
        acc ->
          payload = %{
            id: row.id,
            name: row.flow.name,
            uuid: row.flow.uuid,
            inserted_at: format_date_with_millisecond(row.inserted_at, organization_id),
            updated_at: format_date_with_millisecond(row.updated_at, organization_id),
            results: BigQuery.format_json(row.results),
            contact_phone: row.contact.phone,
            contact_name: row.contact.name,
            flow_version: row.flow_version,
            flow_context_id: row.flow_context_id,
            profile_id: row.profile_id
          }

          [%{json: Map.merge(payload, bq_fields(organization_id))} | acc]
      end

    entries
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :flow_results, organization_id, attrs) end)

    :ok
  end

  # Loads the selected flow counts, converts each into a BigQuery row and hands
  # the rows to make_job/4 in chunks of 100.
  defp queue_table_data("flow_counts", organization_id, attrs) do
    Logger.info(
      "fetching flow_counts data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "flow_counts"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        source_uuid: row.uuid,
        destination_uuid: row.destination_uuid,
        flow_name: row.flow.name,
        flow_uuid: row.flow.uuid,
        type: row.type,
        count: row.count,
        recent_messages: BigQuery.format_json(row.recent_messages),
        inserted_at: format_date_with_millisecond(row.inserted_at, organization_id),
        updated_at: format_date_with_millisecond(row.updated_at, organization_id)
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :flow_counts, organization_id, attrs) end)

    :ok
  end

  # Loads the selected messages_media rows and delegates the conversion and
  # chunking to queue_message_media_data/3.
  defp queue_table_data("messages_media", organization_id, attrs) do
    Logger.info(
      "fetching messages_media data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    media_list =
      "messages_media"
      |> get_query(organization_id, attrs)
      |> Repo.all()

    queue_message_media_data(media_list, organization_id, attrs)

    :ok
  end

  # Loads the selected flow contexts, converts each into a BigQuery row and
  # hands the rows to make_job/4 in chunks of 100.
  defp queue_table_data("flow_contexts", organization_id, attrs) do
    Logger.info(
      "fetching flow_contexts data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    "flow_contexts"
    |> get_query(organization_id, attrs)
    |> Repo.all()
    |> Enum.map(fn row ->
      payload = %{
        id: row.id,
        node_uuid: row.node_uuid,
        flow_uuid: row.flow.uuid,
        flow_id: row.flow.id,
        contact_id: row.contact.id,
        contact_phone: row.contact.phone,
        results: BigQuery.format_json(row.results),
        recent_inbound: BigQuery.format_json(row.recent_inbound),
        recent_outbound: BigQuery.format_json(row.recent_outbound),
        status: row.status,
        parent_id: row.parent_id,
        message_broadcast_id: row.message_broadcast_id,
        is_background_flow: row.is_background_flow,
        is_await_result: row.is_await_result,
        is_killed: row.is_killed,
        profile_id: row.profile_id,
        wakeup_at: BigQuery.format_date(row.wakeup_at, organization_id),
        completed_at: BigQuery.format_date(row.completed_at, organization_id),
        inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
        updated_at: BigQuery.format_date(row.updated_at, organization_id)
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :flow_contexts, organization_id, attrs) end)

    :ok
  end

  # Loads the selected stats rows, converts each into a BigQuery row and hands
  # the rows to make_job/4 in chunks of 100. For "stats_all" each row also
  # carries the owning organization's id, name and status.
  defp queue_table_data(stat, organization_id, attrs) when stat in ["stats", "stats_all"] do
    Logger.info(
      "fetching #{stat} data for org_id: #{organization_id} to send on bigquery with attrs: #{inspect(attrs)}"
    )

    stat_atom =
      case stat do
        "stats" -> :stats
        _ -> :stats_all
      end

    stat
    |> get_query(organization_id, attrs)
    # for stats_all we specifically want to skip organization_id
    |> Repo.all(skip_organization_id: true)
    |> Enum.map(fn row ->
      organization_columns =
        case stat do
          "stats_all" ->
            %{
              organization_id: row.organization_id,
              organization_name: row.organization.name,
              organization_status: row.organization.status
            }

          _ ->
            %{}
        end

      stat_columns = %{
        id: row.id,
        contacts: row.contacts,
        active: row.active,
        optin: row.optin,
        optout: row.optout,
        messages: row.messages,
        inbound: row.inbound,
        outbound: row.outbound,
        hsm: row.hsm,
        flows_started: row.flows_started,
        flows_completed: row.flows_completed,
        users: row.users,
        period: row.period,
        date: Date.to_string(row.date),
        hour: row.hour,
        inserted_at: BigQuery.format_date(row.inserted_at, organization_id),
        updated_at: BigQuery.format_date(row.updated_at, organization_id)
      }

      %{json: Map.merge(stat_columns, organization_columns)}
    end)
    # rows are shipped in reverse query order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, stat_atom, organization_id, attrs) end)

    :ok
  end

  # Any table without a dedicated clause is silently skipped.
  defp queue_table_data(_table, _organization_id, _attrs), do: :ok

  # Bookkeeping columns merged into rows sent to BigQuery: a freshly
  # generated UUID and the current time formatted in the organization's timezone.
  @spec bq_fields(non_neg_integer) :: map()
  defp bq_fields(org_id) do
    uuid = Ecto.UUID.generate()
    synced_at = format_date_with_millisecond(DateTime.utc_now(), org_id)

    %{bq_uuid: uuid, bq_inserted_at: synced_at}
  end

  @doc """
  Converts a list of message media records into BigQuery rows and hands them
  to the insert job in chunks of 100.

  Kept as a separate public function so that the gcs worker can reuse it.
  """
  @spec queue_message_media_data(list(), non_neg_integer(), map()) :: :ok
  def queue_message_media_data(media_list, organization_id, attrs) do
    media_list
    |> Enum.map(fn media ->
      payload = %{
        id: media.id,
        caption: media.caption,
        url: media.url,
        source_url: media.source_url,
        gcs_url: media.gcs_url,
        inserted_at: format_date_with_millisecond(media.inserted_at, organization_id),
        updated_at: format_date_with_millisecond(media.updated_at, organization_id)
      }

      %{json: Map.merge(payload, bq_fields(organization_id))}
    end)
    # rows are shipped in reverse input order
    |> Enum.reverse()
    |> Enum.chunk_every(100)
    |> Enum.each(fn chunk -> make_job(chunk, :messages_media, organization_id, attrs) end)
  end

  # Flattens a message (with its associations loaded) into the column map sent
  # to BigQuery. Optional associations (user, flow_object, location, group)
  # yield nil columns when absent; media and template columns are merged in
  # from message_media_info/1 and message_template_info/1.
  @spec get_message_row(atom | map(), non_neg_integer) :: map()
  defp get_message_row(row, organization_id) do
    columns = %{
      id: row.id,
      body: row.body,
      type: row.type,
      flow: row.flow,
      inserted_at: format_date_with_millisecond(row.inserted_at, organization_id),
      updated_at: format_date_with_millisecond(row.updated_at, organization_id),
      sent_at: BigQuery.format_date(row.sent_at, organization_id),
      uuid: row.uuid,
      status: row.status,
      sender_phone: row.sender.phone,
      receiver_phone: row.receiver.phone,
      contact_phone: row.contact.phone,
      contact_name: row.contact.name,
      profile_id: row.profile_id,
      user_phone: if(is_nil(row.user), do: nil, else: row.user.phone),
      user_name: if(is_nil(row.user), do: nil, else: row.user.name),
      tags_label: Enum.map_join(row.tags, ", ", & &1.label),
      flow_label: row.flow_label,
      flow_uuid: if(is_nil(row.flow_object), do: nil, else: row.flow_object.uuid),
      flow_name: if(is_nil(row.flow_object), do: nil, else: row.flow_object.name),
      longitude: if(is_nil(row.location), do: nil, else: row.location.longitude),
      latitude: if(is_nil(row.location), do: nil, else: row.location.latitude),
      errors: BigQuery.format_json(row.errors),
      message_broadcast_id: row.message_broadcast_id,
      bsp_status: row.bsp_status,
      group_id: row.group_id,
      group_name: if(is_nil(row.group), do: nil, else: row.group.label)
    }

    columns
    |> Map.merge(message_media_info(row.media))
    |> Map.merge(message_template_info(row))
  end

  # Media columns for a message row; all nil when the message has no media.
  @spec message_media_info(any()) :: map()
  defp message_media_info(nil) do
    %{media_id: nil, media_url: nil, gcs_url: nil}
  end

  defp message_media_info(media) do
    %{media_id: media.id, media_url: media.url, gcs_url: media.gcs_url}
  end

  ## had to write this as a separate function since the one above is too long and credo gives a warning

  # Template-related columns for a message row; template_uuid is nil when the
  # message has no template.
  @spec message_template_info(atom | map()) :: map()
  defp message_template_info(row) do
    %{
      is_hsm: row.is_hsm,
      template_uuid: if(is_nil(row.template), do: nil, else: row.template.uuid),
      interactive_template_id: row.interactive_template_id,
      context_message_id: row.context_message_id
    }
  end

  # Either records sync progress or ships one chunk of rows to BigQuery.
  #
  #   * empty data + :insert - stores `attrs[:max_id]` as the table's `table_id`
  #     when it is an integer.
  #   * empty data + :update - stores `attrs[:last_updated_at]` as the table's
  #     `last_updated_at` when it is present.
  #   * otherwise - passes the chunk to BigQuery.make_insert_query/4 together
  #     with the `max_id` / `last_updated_at` markers from `attrs`.
  #
  # Every clause returns :ok.
  @spec make_job(list(), atom(), non_neg_integer, map()) :: :ok
  defp make_job(data, table, organization_id, %{action: :insert} = attrs)
       when data in [%{}, nil, []] do
    table = Atom.to_string(table)

    if is_integer(attrs[:max_id]),
      do: Jobs.update_bigquery_job(organization_id, table, %{table_id: attrs[:max_id]})

    :ok
  end

  defp make_job(data, table, organization_id, %{action: :update} = attrs)
       when data in [%{}, nil, []] do
    table = Atom.to_string(table)

    if not is_nil(attrs[:last_updated_at]),
      do:
        Jobs.update_bigquery_job(organization_id, table, %{
          last_updated_at: attrs[:last_updated_at]
        })

    # return :ok explicitly, matching the spec and the other clauses
    :ok
  end

  defp make_job(data, table, organization_id, attrs) do
    Logger.info(
      "making a new job for org_id: #{organization_id} table: #{table} to send on bigquery with max id: #{inspect(attrs)}"
    )

    # Only the two progress markers are forwarded. A missing marker is already
    # nil, so no conditional is needed (the previous `else: nil` sat outside the
    # `if` and was passed along as a stray `:else` option).
    BigQuery.make_insert_query(data, Atom.to_string(table), organization_id,
      max_id: attrs[:max_id],
      last_updated_at: attrs[:last_updated_at]
    )

    :ok
  end

  # Narrows a table query to the rows a sync action should pick up.
  #
  #   * :insert - rows with min_id < id <= max_id. `min_id` is the id that was
  #     already synced last time, so the lower bound is exclusive; otherwise
  #     that row would be sent again with every batch.
  #   * :update - rows with table_last_updated_at < updated_at <= last_updated_at
  #     that were modified after being inserted.
  #   * anything else - the query is returned unchanged.
  @spec apply_action_clause(Ecto.Queryable.t(), map()) :: Ecto.Queryable.t()
  defp apply_action_clause(query, %{action: :insert, max_id: max_id, min_id: min_id} = _attrs),
    do: query |> where([m], m.id > ^min_id and m.id <= ^max_id)

  defp apply_action_clause(
         query,
         %{
           action: :update,
           last_updated_at: last_updated_at,
           table_last_updated_at: table_last_updated_at
         } = _attrs
       ),
       do:
         query
         |> where(
           [tb],
           tb.updated_at > ^table_last_updated_at and tb.updated_at <= ^last_updated_at
         )
         |> where(
           [tb],
           ## Adding clause so that we don't pick the newly inserted rows.
           fragment("DATE_PART('seconds', age(?, ?))::integer", tb.updated_at, tb.inserted_at) > 0
         )

  defp apply_action_clause(query, _attrs), do: query

  # Builds the query used to load rows of a table for syncing: scoped to the
  # organization (where applicable), narrowed by apply_action_clause/2 and
  # ordered by inserted_at, id.
  @spec get_query(String.t(), non_neg_integer, map()) :: Ecto.Queryable.t()
  defp get_query("messages", organization_id, attrs) do
    scoped = from(msg in Message, where: msg.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([msg], [msg.inserted_at, msg.id])
    |> preload([
      :tags,
      :receiver,
      :sender,
      :contact,
      :user,
      :media,
      :flow_object,
      :location,
      :template,
      :group
    ])
  end

  # Message conversations joined with their message and that message's contact
  # (both left joins), selected as plain maps.
  defp get_query("message_conversations", organization_id, attrs) do
    MessageConversation
    |> join(:left, [conv], msg in Message, as: :m, on: conv.message_id == msg.id)
    |> join(:left, [m: msg], contact in Contact, as: :c, on: msg.contact_id == contact.id)
    |> select([conv, msg, contact], %{
      conversation_id: conv.conversation_id,
      deduction_type: conv.deduction_type,
      inserted_at: conv.inserted_at,
      updated_at: conv.updated_at,
      is_billable: conv.is_billable,
      payload: conv.payload,
      phone: contact.phone,
      message_id: msg.id,
      id: conv.id
    })
    |> where([conv], conv.organization_id == ^organization_id)
    |> apply_action_clause(attrs)
    |> order_by([conv], [conv.inserted_at, conv.id])
  end

  # Message broadcast contacts left-joined with their contact, selected as
  # plain maps.
  defp get_query("message_broadcast_contacts", organization_id, attrs),
    do:
      MessageBroadcastContact
      |> join(:left, [mbc], c in Contact, as: :c, on: mbc.contact_id == c.id)
      |> select([mbc, c], %{
        id: mbc.id,
        # select the broadcast foreign key, not the row's own primary key
        message_broadcast_id: mbc.message_broadcast_id,
        phone: c.phone,
        status: mbc.status,
        processed_at: mbc.processed_at,
        inserted_at: mbc.inserted_at,
        updated_at: mbc.updated_at
      })
      |> where([mbc], mbc.organization_id == ^organization_id)
      |> apply_action_clause(attrs)
      |> order_by([mbc], [mbc.inserted_at, mbc.id])

  # Message broadcasts left-joined with their flow, user and group, selected
  # as plain maps.
  defp get_query("message_broadcasts", organization_id, attrs) do
    MessageBroadcast
    |> join(:left, [broadcast], flow in Flow, as: :m, on: broadcast.flow_id == flow.id)
    |> join(:left, [broadcast], user in User, as: :u, on: broadcast.user_id == user.id)
    |> join(:left, [broadcast], group in Group, as: :g, on: broadcast.group_id == group.id)
    |> select([broadcast, flow, user, group], %{
      id: broadcast.id,
      flow_id: flow.id,
      flow_name: flow.name,
      user_id: user.id,
      user_phone: user.phone,
      group_id: group.id,
      group_name: group.label,
      broadcast_type: broadcast.type,
      message_params: broadcast.message_params,
      started_at: broadcast.started_at,
      completed_at: broadcast.completed_at,
      inserted_at: broadcast.inserted_at,
      updated_at: broadcast.updated_at
    })
    |> where([broadcast], broadcast.organization_id == ^organization_id)
    |> apply_action_clause(attrs)
    |> order_by([broadcast], [broadcast.inserted_at, broadcast.id])
  end

  # Contacts of the organization with language, tags, groups and user preloaded.
  defp get_query("contacts", organization_id, attrs) do
    scoped = from(contact in Contact, where: contact.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([contact], [contact.inserted_at, contact.id])
    |> preload([:language, :tags, :groups, :user])
  end

  # Contact histories of the organization with the contact preloaded.
  defp get_query("contact_histories", organization_id, attrs) do
    scoped = from(history in ContactHistory, where: history.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([history], [history.inserted_at, history.id])
    |> preload([:contact])
  end

  # Profiles of the organization with language and contact preloaded.
  defp get_query("profiles", organization_id, attrs) do
    scoped = from(profile in Profile, where: profile.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([profile], [profile.inserted_at, profile.id])
    |> preload([:language, :contact])
  end

  # Published or archived flow revisions of the organization with the flow
  # preloaded.
  defp get_query("flows", organization_id, attrs) do
    scoped = from(revision in FlowRevision, where: revision.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> where([revision], revision.status in ["published", "archived"])
    |> order_by([revision], [revision.inserted_at, revision.id])
    |> preload([:flow])
  end

  # Flow results of the organization with flow and contact preloaded.
  defp get_query("flow_results", organization_id, attrs) do
    scoped = from(result in FlowResult, where: result.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([result], [result.inserted_at, result.id])
    |> preload([:flow, :contact])
  end

  # Flow counts of the organization with the flow preloaded.
  defp get_query("flow_counts", organization_id, attrs) do
    scoped = from(flow_count in FlowCount, where: flow_count.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([flow_count], [flow_count.inserted_at, flow_count.id])
    |> preload([:flow])
  end

  # Message media of the organization with the organization preloaded.
  defp get_query("messages_media", organization_id, attrs) do
    scoped = from(media in MessageMedia, where: media.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([media], [media.inserted_at, media.id])
    |> preload([:organization])
  end

  # Flow contexts of the organization with flow and contact preloaded.
  defp get_query("flow_contexts", organization_id, attrs) do
    scoped = from(context in Flows.FlowContext, where: context.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([context], [context.inserted_at, context.id])
    |> preload([:flow, :contact])
  end

  # Stats rows of a single organization.
  defp get_query("stats", organization_id, attrs) do
    scoped = from(stat in Stat, where: stat.organization_id == ^organization_id)

    scoped
    |> apply_action_clause(attrs)
    |> order_by([stat], [stat.inserted_at, stat.id])
  end

  # Stats rows across all organizations (no organization filter), with the
  # organization preloaded.
  defp get_query("stats_all", _organization_id, attrs) do
    Stat
    |> apply_action_clause(attrs)
    |> order_by([stat], [stat.inserted_at, stat.id])
    |> preload([:organization])
  end
end