lib/glific/third_party/gcs/gcs_worker.ex

defmodule Glific.GCS.GcsWorker do
  @moduledoc """
  Copies inbound message media to the organization's Google Cloud Storage (GCS) bucket.

  The cron entry point, `perform_periodic/1`, sweeps the message media table of an
  organization, starting after the last media id recorded on the organization's GCS
  job, and enqueues one Oban job per inbound media message. At most `:gcs_file_count`
  media ids (default 5) are picked up per sweep.

  Each Oban job (`perform/1`) downloads the media to a temporary file, uploads it to
  GCS, saves the public GCS url on the message media and removes the temporary file.

  We centralize both the cron job and the worker job in one module.
  """

  import Ecto.Query
  require Logger

  use Oban.Worker,
    queue: :gcs,
    max_attempts: 2,
    priority: 2

  alias Waffle.Storage.Google.CloudStorage

  alias Glific.{
    BigQuery,
    BigQuery.BigQueryWorker,
    Jobs,
    Messages,
    Messages.Message,
    Messages.MessageMedia,
    Partners,
    Repo
  }

  @provider_shortcode "google_cloud_storage"
  @public_url_base "https://storage.googleapis.com"

  @doc """
  This is called from the cron job on a regular schedule. We sweep the message media
  table and queue new inbound media up for delivery to GCS.

  Nothing is queued unless the organization has a `google_cloud_storage` credential
  and a Goth token can be fetched for it. Always returns `:ok`.
  """
  @spec perform_periodic(non_neg_integer) :: :ok
  def perform_periodic(organization_id) do
    organization = Partners.organization(organization_id)
    credential = organization.services[@provider_shortcode]
    goth_token = Partners.get_goth_token(organization_id, @provider_shortcode)

    if is_nil(credential) || is_nil(goth_token) do
      :ok
    else
      jobs(organization_id)
      :ok
    end
  end

  # Picks the next batch of inbound media ids after the last processed id, queues an
  # upload job for each of them and records the new high-water mark on the GCS job.
  @spec jobs(non_neg_integer) :: :ok
  defp jobs(organization_id) do
    message_media_id = last_processed_media_id(organization_id)
    limit = files_per_minute_count()

    # The `msg.flow == :inbound` filter discards unmatched rows, so this left join
    # behaves like an inner join: media without an inbound message is skipped.
    media_ids =
      MessageMedia
      |> select([m], m.id)
      |> join(:left, [m], msg in Message, as: :msg, on: m.id == msg.media_id)
      |> where([m], m.organization_id == ^organization_id and m.id > ^message_media_id)
      |> where([m, msg], msg.flow == :inbound)
      |> order_by([m], asc: m.id)
      |> limit(^limit)
      |> Repo.all()

    # nil when nothing new was found. The explicit nil check matters because in
    # Erlang term order an atom compares greater than any integer.
    max_id = List.last(media_ids)

    if not is_nil(max_id) and max_id > message_media_id do
      Logger.info(
        "GCSWORKER: Updating GCS jobs with max id:  #{max_id} , min id: #{message_media_id} for org_id: #{organization_id}"
      )

      queue_urls(organization_id, message_media_id, max_id)
      Jobs.update_gcs_job(%{message_media_id: max_id, organization_id: organization_id})
    end

    :ok
  end

  # Last media id already queued for this organization, 0 when none is recorded.
  @spec last_processed_media_id(non_neg_integer) :: non_neg_integer
  defp last_processed_media_id(organization_id) do
    case Jobs.get_gcs_job(organization_id) do
      nil -> 0
      gcs_job -> gcs_job.message_media_id || 0
    end
  end

  # Batch size per sweep, read from the `:gcs_file_count` application setting.
  # Falls back to 5 when the setting is unset or is not an integer.
  @spec files_per_minute_count() :: integer()
  defp files_per_minute_count do
    Application.fetch_env!(:glific, :gcs_file_count)
    |> Glific.parse_maybe_integer()
    |> case do
      {:ok, nil} -> 5
      {:ok, count} -> count
      _ -> 5
    end
  end

  @doc """
  Queue urls for gcs jobs.

  Enqueues one upload job per inbound message of `organization_id` whose media id lies
  in the range `(min_id, max_id]`, ordered by media insertion time. Always returns `:ok`.
  """
  @spec queue_urls(non_neg_integer, non_neg_integer, non_neg_integer) :: :ok
  def queue_urls(organization_id, min_id, max_id) do
    MessageMedia
    |> where([m], m.id > ^min_id and m.id <= ^max_id)
    |> join(:left, [m], msg in Message, as: :msg, on: m.id == msg.media_id)
    |> where([m, msg], msg.organization_id == ^organization_id)
    |> where([m, msg], msg.flow == :inbound)
    |> select([m, msg], [m.id, m.url, msg.type, msg.contact_id, msg.flow_id])
    |> order_by([m], [m.inserted_at, m.id])
    |> Repo.all()
    |> Enum.each(fn row ->
      row
      |> make_media(organization_id)
      |> make_job()
    end)
  end

  # Turns a `[id, url, type, contact_id, flow_id]` row into the job args map.
  # A missing flow id is stored as 0.
  @spec make_media(list(), non_neg_integer) :: map()
  defp make_media([id, url, type, contact_id, flow_id], organization_id) do
    Logger.info("GCSWORKER: Making media for media id: #{id}")

    %{
      url: url,
      id: id,
      type: type,
      contact_id: contact_id,
      flow_id: if(is_nil(flow_id), do: 0, else: flow_id),
      organization_id: organization_id
    }
  end

  # Inserts an Oban job for this worker. Raises a MatchError if the insert fails.
  @spec make_job(map()) :: :ok
  defp make_job(media) do
    {:ok, _} =
      __MODULE__.new(%{media: media})
      |> Oban.insert()

    :ok
  end

  # Zero-pads a number below 10 to two digits.
  @spec pad(non_neg_integer) :: String.t()
  defp pad(i) when i < 10, do: <<?0, ?0 + i>>
  defp pad(i), do: to_string(i)

  # copied from mix task ecto.gen.migration: UTC time as "YYYYMMDDhhmmss"
  @spec timestamp :: String.t()
  defp timestamp do
    {{y, m, d}, {hh, mm, ss}} = :calendar.universal_time()
    "#{y}#{pad(m)}#{pad(d)}#{pad(hh)}#{pad(mm)}#{pad(ss)}"
  end

  @doc """
  Standard perform method to use Oban worker.

  Downloads the media url to a temporary file and uploads it to GCS. Returns
  `{:error, reason}` on a download timeout, so Oban may retry the job
  (`max_attempts: 2`). Other download failures return `{:discard, reason}`.
  """
  @impl Oban.Worker
  @spec perform(Oban.Job.t()) :: :ok | {:error, String.t()} | {:discard, String.t()}
  def perform(%Oban.Job{args: %{"media" => media}}) do
    Logger.info("GCSWORKER: Performing gcs media for media id: #{media["id"]}")

    Repo.put_process_state(media["organization_id"])

    # We download the file from the internet, upload it to GCS and then remove it.
    extension = get_media_extension(media["type"])

    remote_name =
      "#{timestamp()}_C#{media["contact_id"]}_F#{media["flow_id"]}_M#{media["id"]}.#{extension}"

    local_name = "#{System.tmp_dir!()}/#{remote_name}"

    media =
      media
      |> Map.put("remote_name", remote_name)
      |> Map.put("local_name", local_name)

    Logger.info("GCSWORKER: Performing gcs media with details for media id: #{inspect(media)}")

    case download_file_to_temp(media["url"], local_name, media["organization_id"]) do
      {:ok, _} ->
        uploading_to_gcs(local_name, media)

      {:error, :timeout} ->
        error =
          "GCSWORKER: GCS Download timeout for org_id: #{media["organization_id"]}, media_id: #{media["id"]}"

        {:error, log_error(error)}

      {:error, error} ->
        # NOTE(review): this branch is reached when the *download* fails; the log text
        # says "Upload" and is kept unchanged in case anything matches on it.
        error =
          "GCSWORKER: GCS Upload failed for org_id: #{media["organization_id"]}, media_id: #{media["id"]}, error: #{inspect(error)}"

        {:discard, log_error(error)}
    end
  end

  # Uploads the downloaded file and stores the resulting public url on the media row.
  # Upload errors are logged (and may disable the credential) but never fail the job.
  # The temporary file is removed whatever the outcome, including when an exception
  # is raised, so failed uploads do not leak files in the tmp dir.
  @spec uploading_to_gcs(String.t(), map()) :: :ok
  defp uploading_to_gcs(local_name, media) do
    case upload_file_on_gcs(media) do
      {:ok, response} ->
        response
        |> get_public_link()
        |> update_gcs_url(media["id"])

      {:error, error} ->
        handle_gcs_error(media["organization_id"], error)
    end

    :ok
  after
    File.rm(local_name)
  end

  # Logs a GCS upload error and returns the logged message.
  # If any reported error has reason "accountDisabled", the organization's GCS
  # credential is disabled. Bodies that are missing, are not JSON, or do not have the
  # expected `error.errors` shape fall back to a generic message with a stacktrace.
  @spec handle_gcs_error(non_neg_integer, map()) :: String.t()
  defp handle_gcs_error(org_id, error) do
    case decode_error_body(error) do
      {:ok, %{"error" => %{"errors" => [_ | _] = errors}}} ->
        # We disable GCS when the billing account is disabled.
        if Enum.any?(errors, &(is_map(&1) and &1["reason"] == "accountDisabled")) do
          Partners.disable_credential(
            org_id,
            @provider_shortcode,
            "Billing account is disabled"
          )
        end

        details = if match?([_], errors), do: hd(errors), else: errors
        log_error("GCSWORKER: Error while uploading file to GCS #{inspect(details)}")

      _ ->
        {_, stacktrace} = Process.info(self(), :current_stacktrace)

        log_error(
          "GCSWORKER: Error while uploading file to GCS #{inspect(error)} stacktrace: #{inspect(stacktrace)}"
        )
    end
  end

  # Extracts the error payload: JSON-decodes binary bodies, passes map bodies through
  # unchanged, and returns :error when there is no usable body.
  @spec decode_error_body(any()) :: {:ok, any()} | {:error, any()} | :error
  defp decode_error_body(%{body: body}) when is_binary(body), do: Jason.decode(body)
  defp decode_error_body(%{body: body}) when is_map(body), do: {:ok, body}
  defp decode_error_body(_error), do: :error

  # Logs the message and returns it, so callers can surface the same text.
  @spec log_error(String.t()) :: String.t()
  defp log_error(message) do
    Logger.info(message)
    message
  end

  # Builds the public url of an uploaded object from its id. The GCS object id ends
  # with "/<generation>", and only that trailing suffix is stripped. Replacing every
  # occurrence could corrupt names containing the same digits, and a nil generation
  # used to remove every "/" from the url.
  @spec get_public_link(map()) :: String.t()
  defp get_public_link(response) do
    url = Enum.join([@public_url_base, response.id], "/")

    case response.generation do
      nil -> url
      generation -> String.replace_suffix(url, "/#{generation}", "")
    end
  end

  @spec upload_file_on_gcs(map()) ::
          {:ok, GoogleApi.Storage.V1.Model.Object.t()} | {:error, Tesla.Env.t()}
  defp upload_file_on_gcs(%{"local_name" => local_name} = media) do
    remote_name = Glific.Clients.gcs_file_name(media)
    upload_file_on_gcs(local_name, remote_name, media["organization_id"])
  end

  # Uploads `local` to GCS as `remote` under the organization's scope.
  # Unexpected return shapes are wrapped as `{:error, %{body: response}}`.
  @spec upload_file_on_gcs(String.t(), String.t(), non_neg_integer) ::
          {:ok, GoogleApi.Storage.V1.Model.Object.t()} | {:error, Tesla.Env.t()} | {:error, map()}
  defp upload_file_on_gcs(local, remote, organization_id) do
    Logger.info("GCSWORKER: Uploading to GCS, org_id: #{organization_id}, file_name: #{remote}")

    CloudStorage.put(
      Glific.Media,
      :original,
      {
        %Waffle.File{path: local, file_name: remote},
        "#{organization_id}"
      }
    )
    |> case do
      {:ok, response} ->
        {:ok, response}

      {:error, error} when is_map(error) ->
        {:error, error}

      response ->
        {:error, %{body: response}}
    end
  end

  @doc """
  Public interface to upload a file provided by the org at local name to gcs as remote name.

  On success the local file is removed and `{:ok, %{url: url, type: type}}` is
  returned. On failure the local file is kept and `{:error, message}` is returned.
  """
  @spec upload_media(String.t(), String.t(), non_neg_integer) ::
          {:ok, map()} | {:error, String.t()}
  def upload_media(local, remote, organization_id) do
    case upload_file_on_gcs(local, remote, organization_id) do
      {:ok, response} ->
        File.rm(local)
        {type, _media} = Messages.get_media_type_from_url(response.selfLink)
        {:ok, %{url: get_public_link(response), type: type}}

      {:error, error} ->
        {:error, handle_gcs_error(organization_id, error)}
    end
  end

  # Saves the GCS url on the message media and, when BigQuery is active, queues the
  # updated row for BigQuery. Returns an error (instead of crashing) when the media
  # row no longer exists or the update is rejected.
  @spec update_gcs_url(String.t(), integer()) ::
          {:ok, MessageMedia.t()} | {:error, Ecto.Changeset.t() | :not_found}
  defp update_gcs_url(gcs_url, id) do
    with %MessageMedia{} = message_media <- Repo.get(MessageMedia, id),
         {:ok, message_media} <-
           message_media
           |> MessageMedia.changeset(%{gcs_url: gcs_url})
           |> Repo.update() do
      maybe_sync_to_bigquery(message_media)
      {:ok, message_media}
    else
      nil ->
        Logger.info("GCSWORKER: Message media not found while saving gcs url, media_id: #{id}")
        {:error, :not_found}

      {:error, changeset} ->
        Logger.info(
          "GCSWORKER: Failed to save gcs url for media_id: #{id}, error: #{inspect(changeset.errors)}"
        )

        {:error, changeset}
    end
  end

  # Queues the updated media row for BigQuery when BigQuery is active for the org.
  @spec maybe_sync_to_bigquery(MessageMedia.t()) :: any()
  defp maybe_sync_to_bigquery(message_media) do
    organization_id = message_media.organization_id

    if BigQuery.is_active?(organization_id) do
      BigQueryWorker.queue_message_media_data([message_media], organization_id, %{
        action: :update,
        max_id: nil,
        last_updated_at: Timex.now()
      })
    end
  end

  # File extension used for the local and remote file name of a media type.
  # Unknown types default to "png".
  @spec get_media_extension(String.t()) :: String.t()
  defp get_media_extension(type) do
    %{
      image: "png",
      video: "mp4",
      audio: "mp3",
      document: "pdf"
    }
    |> Map.get(Glific.safe_string_to_atom(type), "png")
  end

  @doc """
  Download a file to the specific path. Should move this to a more generic
  helper file in glific.

  Returns `{:ok, path}` on a 2xx response, `{:error, :timeout}` on timeout, and
  `{:error, reason}` for Tesla errors. Any other result, including a non-2xx
  response, is returned wrapped as `{:error, result}`.
  """
  @spec download_file_to_temp(String.t(), String.t(), non_neg_integer) ::
          {:ok, String.t()} | {:error, any()}
  def download_file_to_temp(url, path, org_id) do
    Logger.info("GCSWORKER: Downloading file: org_id: #{org_id}, url: #{url}")

    case Tesla.get(url, opts: [adapter: [recv_timeout: 10_000]]) do
      {:ok, %Tesla.Env{status: status, body: body}} when status in 200..299 ->
        File.write!(path, body)
        {:ok, path}

      {:error, :timeout} ->
        {:error, :timeout}

      {:error, %Tesla.Error{reason: reason}} ->
        {:error, reason}

      error ->
        {:error, error}
    end
  end
end