lib/glific/third_party/sheets/sheets.ex

defmodule Glific.Sheets do
  @moduledoc """
  The Sheets context
  """

  import Ecto.Query, warn: false
  require Logger

  alias Glific.{
    Flows.Action,
    Flows.FlowContext,
    Messages,
    Repo,
    Sheets.ApiClient,
    Sheets.Sheet,
    Sheets.SheetData
  }

  @doc """
  Creates a sheet

  ## Examples

      iex> create_sheet(%{field: value})
      {:ok, %Sheet{}}

      iex> create_sheet(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_sheet(map()) :: {:ok, Sheet.t()} | {:error, any()}
  def create_sheet(attrs) do
    with {:ok, true} <- validate_sheet(attrs.url),
         {:ok, sheet} <-
           %Sheet{}
           |> Sheet.changeset(attrs)
           |> Repo.insert() do
      sync_sheet_data(sheet)
    end
  end

  @spec validate_sheet(String.t()) :: {:ok, true} | {:error, String.t()}
  defp validate_sheet(url) do
    Tesla.get(url)
    |> case do
      {:ok, %Tesla.Env{status: status}} when status in 200..299 ->
        {:ok, true}

      _ ->
        {:error,
         "Please double-check the URL and make sure the sharing access for the sheet is at least set to 'Anyone with the link' can view."}
    end
  end

  @doc """
  Updates a sheet.

  ## Examples

      iex> update_sheet(sheet, %{field: new_value})
      {:ok, %Sheet{}}

      iex> update_sheet(sheet, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_sheet(Sheet.t(), map()) :: {:ok, Sheet.t()} | {:error, Ecto.Changeset.t()}
  def update_sheet(%Sheet{} = sheet, attrs) do
    with {:ok, sheet} <-
           sheet
           |> Sheet.changeset(attrs)
           |> Repo.update() do
      # incase we update the url. Let's resync the sheet data.
      if Map.has_key?(attrs, :url), do: sync_sheet_data(sheet), else: {:ok, sheet}
    end
  end

  @doc """
  Deletes a sheet.

  ## Examples

      iex> delete_sheet(sheet)
      {:ok, %Sheet{}}

      iex> delete_sheet(sheet)
      {:error, %Ecto.Changeset{}}

  """
  @spec delete_sheet(Sheet.t()) :: {:ok, Sheet.t()} | {:error, Ecto.Changeset.t()}
  def delete_sheet(%Sheet{} = sheet), do: Repo.delete(sheet)

  @doc """
  Gets a single sheet.

  Raises `Ecto.NoResultsError` if the Sheet does not exist.

  ## Examples

      iex> get_sheet!(123)
      %Sheet{}

      iex> get_sheet!(456)
      ** (Ecto.NoResultsError)

  """
  @spec get_sheet!(integer) :: Sheet.t()
  def get_sheet!(id), do: Repo.get!(Sheet, id)

  @doc """
  Returns the list of sheets.

  ## Examples

      iex> list_sheets()
      [%Sheet{}, ...]

  """
  @spec list_sheets(map()) :: [Sheet.t()]
  def list_sheets(args),
    do: Repo.list_filter(args, Sheet, &Repo.opts_with_label/2, &filter_with/2)

  @doc """
  Return the count of sheets, using the same filter as list_sheets
  """
  @spec count_sheets(map()) :: integer
  def count_sheets(args),
    do: Repo.count_filter(args, Sheet, &filter_with/2)

  @spec filter_with(Ecto.Queryable.t(), %{optional(atom()) => any}) :: Ecto.Queryable.t()
  defp filter_with(query, filter) do
    query = Repo.filter_with(query, filter)

    Enum.reduce(filter, query, fn
      {:is_active, is_active}, query ->
        from(q in query, where: q.is_active == ^is_active)

      _, query ->
        query
    end)
  end

  @doc """
  Sync a sheet
  """
  @spec sync_sheet_data(Sheet.t()) :: {:ok, Sheet.t()} | {:error, Ecto.Changeset.t()}
  def sync_sheet_data(sheet) do
    [sheet_url, gid] = String.split(sheet.url, ["edit", "view", "comment"])

    last_synced_at = DateTime.utc_now()
    export_url = sheet_url <> "export?format=csv&&" <> String.replace(gid, "#", "")

    SheetData
    |> where([sd], sd.sheet_id == ^sheet.id)
    |> Repo.delete_all()

    media_warnings =
      ApiClient.get_csv_content(url: export_url)
      |> Enum.reduce(%{}, fn {_, row}, acc ->
        parsed_rows = parse_row_values(row)

        %{
          ## we can also think in case we need first column.
          key: row["Key"],
          row_data: parsed_rows.values,
          sheet_id: sheet.id,
          organization_id: sheet.organization_id,
          last_synced_at: last_synced_at
        }
        |> create_sheet_data()

        Map.merge(acc, parsed_rows.errors)
      end)

    remove_stale_sheet_data(sheet, last_synced_at)

    sheet_data_count =
      SheetData
      |> where([sd], sd.sheet_id == ^sheet.id)
      |> Repo.aggregate(:count)

    ## we can move this to top of the function also. We can change that later.
    update_sheet(sheet, %{last_synced_at: last_synced_at, sheet_data_count: sheet_data_count})
    |> append_warnings(media_warnings)
  end

  defp append_warnings({:error, _error} = sheet, _media_warnings), do: sheet

  defp append_warnings({:ok, updated_sheet} = _sheet, media_warnings) do
    updated_sheet
    |> Map.put(:warnings, media_warnings)
    |> then(&{:ok, &1})
  end

  @spec parse_row_values(map()) :: map()
  defp parse_row_values(row) do
    clean_row_values =
      Enum.reduce(row, %{}, fn {key, value}, acc ->
        key = key |> String.downcase() |> String.replace(" ", "_")
        Map.put(acc, key, value)
      end)

    errors =
      clean_row_values
      |> Enum.reduce(%{}, fn {_key, value}, acc ->
        {media_type, _media} = Messages.get_media_type_from_url(value, log_error: false)

        with true <- media_type != :text,
             %{is_valid: is_valid, message: message} <-
               Glific.Messages.validate_media(value, Atom.to_string(media_type)),
             false <- is_valid do
          Map.put(acc, value, message)
        else
          _ -> acc
        end
      end)

    %{values: clean_row_values, errors: errors}
  end

  ## We are removing all the rows which are not refreshed in the last sync.
  ## We are assuming that these rows have been deleted from the sheet also.
  @spec remove_stale_sheet_data(Sheet.t(), DateTime.t()) :: {integer(), nil | [term()]}
  defp remove_stale_sheet_data(sheet, last_synced_at) do
    Repo.delete_all(
      from(sd in SheetData,
        where:
          sd.organization_id == ^sheet.organization_id and sd.sheet_id == ^sheet.id and
            sd.last_synced_at != ^last_synced_at
      )
    )
  end

  @doc """
  Creates a sheet

  ## Examples

      iex> create_sheet_data(%{field: value})
      {:ok, %Sheet{}}

      iex> create_sheet_data(%{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec create_sheet_data(map()) :: {:ok, Sheet.t()} | {:error, Ecto.Changeset.t()}
  def create_sheet_data(attrs) do
    %SheetData{}
    |> SheetData.changeset(attrs)
    |> Repo.insert()
  end

  @doc """
  Updates a sheet data

  ## Examples

      iex> update_sheet_data(sheet_data, %{field: new_value})
      {:ok, %SheetData{}}

      iex> update_sheet_data(sheet_data, %{field: bad_value})
      {:error, %Ecto.Changeset{}}

  """
  @spec update_sheet_data(SheetData.t(), map()) ::
          {:ok, SheetData.t()} | {:error, Ecto.Changeset.t()}
  def update_sheet_data(%SheetData{} = sheet_data, attrs) do
    sheet_data
    |> SheetData.changeset(attrs)
    |> Repo.update()
  end

  @doc """
  Execute a sheet action
  """
  @spec execute(Action.t() | any(), FlowContext.t()) :: {FlowContext.t(), Messages.Message.t()}
  def execute(action, context) do
    with {:ok, loaded_sheet} <-
           Repo.fetch_by(SheetData, %{
             sheet_id: action.sheet_id,
             key: FlowContext.parse_context_string(context, action.row),
             organization_id: context.organization_id
           }),
         context <-
           FlowContext.update_results(context, %{action.result_name => loaded_sheet.row_data}) do
      {context, Messages.create_temp_message(context.organization_id, "Success")}
    else
      _ ->
        {context, Messages.create_temp_message(context.organization_id, "Failure")}
    end
  end
end