lib/aliyun_oss/object/multipart_upload.ex

defmodule Aliyun.Oss.Object.MultipartUpload do
  @moduledoc """
  Object operations - Multipart Upload.
  """

  alias Aliyun.Oss.Config
  alias Aliyun.Oss.Bucket
  alias Aliyun.Oss.Service
  alias Aliyun.Oss.Client.{Response, Error}
  alias Aliyun.Oss.TaskSupervisor

  @type error() ::
          %Error{body: String.t(), status_code: integer(), parsed_details: map()} | atom()

  @doc """
  A shortcut for uploading streaming data.

  ## Examples

      iex> part_bytes = 102400 # The minimum allowed size is 100KB.
      iex> parts = File.stream!("/path/to/file", [], part_bytes)
      iex> Aliyun.Oss.Object.MultipartUpload.upload(config, "some-bucket", "some-object", parts)
      {:ok, %Aliyun.Oss.Client.Response{
          data: %{
            "CompleteMultipartUploadResult" => %{
            "Bucket" => "some-bucket",
            "ETag" => "\"21000000000000000000000000000000-1\"",
            "Key" => "some-object",
            "Location" => "https://some-bucket.oss-cn-shenzhen.aliyuncs.com/some-object"
            }
          },
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec upload(Config.t(), String.t(), String.t(), Enum.t()) ::
          {:error, error()} | {:ok, Response.t()}
  def upload(config, bucket, object, parts) do
    case init_upload(config, bucket, object) do
      {:ok, upload_id} ->
        try do
          with {:ok, uploaded_parts} <-
                 async_upload_parts(config, bucket, object, upload_id, parts),
               sorted_parts <- Enum.sort_by(uploaded_parts, &elem(&1, 0)),
               {:ok, resp} <- complete_upload(config, bucket, object, upload_id, sorted_parts) do
            {:ok, resp}
          else
            {:error, error} ->
              Task.Supervisor.start_child(TaskSupervisor, fn ->
                abort_upload(config, bucket, object, upload_id)
              end)

              {:error, error}
          end
        catch
          _, error ->
            Task.Supervisor.start_child(TaskSupervisor, fn ->
              abort_upload(config, bucket, object, upload_id)
            end)

            raise(error)
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp async_upload_parts(config, bucket, object, upload_id, parts) do
    Task.Supervisor.async_stream(
      TaskSupervisor,
      Stream.with_index(parts, 1),
      fn {binary, num} ->
        try do
          {num, upload_part(config, bucket, object, upload_id, num, binary)}
        catch
          _, _ -> {:error, {num, :failed}}
        end
      end,
      ordered: false,
      on_timeout: :kill_task
    )
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, {part_number, {:ok, %{headers: headers}}}}, {:ok, uploaded_parts} ->
        {:cont, {:ok, [{part_number, get_etag_header(headers)} | uploaded_parts]}}

      _, _ ->
        {:halt, {:error, :failed_uploading_parts}}
    end)
  end

  defp get_etag_header(headers) do
    {_, etag} = Enum.find(headers, fn {k, _} -> k == "ETag" end)
    etag
  end

  @doc """
  InitiateMultipartUpload - notifies OSS to initiate a multipart upload task before you perform
  multipart upload to upload data.

  ## Options

  - `:encoding_type` - default is blank, accept value: `:url`

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.init_upload(config, "some-bucket", "some-object")
      {:ok, "UPLOAD_ID"}

  """
  @spec init_upload(Config.t(), String.t(), String.t(), map(), encoding_type: :url) ::
          {:error, error()} | {:ok, String.t()}
  def init_upload(config, bucket, object, headers \\ %{}, opts \\ []) do
    query_params =
      case Keyword.get(opts, :encoding_type) do
        :url -> %{"encoding-type" => "url"}
        _ -> %{}
      end

    case Service.post(config, bucket, object, "",
           query_params: query_params,
           headers: headers,
           sub_resources: %{"uploads" => nil}
         ) do
      {:ok, %{data: %{"InitiateMultipartUploadResult" => %{"UploadId" => upload_id}}}} ->
        {:ok, upload_id}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc """
  UploadPart - uploads data by part based on the specified object name and upload ID after you
  initiate a multipart upload operation.

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.upload_part(config, "some-bucket", "some-object", "UPLOAD_ID", 1, "CONTENT")
      {:ok, %Aliyun.Oss.Client.Response{
          data: "",
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec upload_part(Config.t(), String.t(), String.t(), String.t(), integer(), String.t()) ::
          {:error, error()} | {:ok, Response.t()}
  def upload_part(config, bucket, object, upload_id, part_number, body) do
    sub_resources = %{"uploadId" => upload_id, "partNumber" => part_number}
    Service.put(config, bucket, object, body, sub_resources: sub_resources)
  end

  @doc """
  UploadPartCopy - copies data from an existing object to upload a part by adding a `x-oss-copy-request`
  header to UploadPart.

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.upload_part_copy(config, "some-bucket", "some-object", "UPLOAD_ID", 1, "/SourceBucketName/SourceObjectName")
      {:ok, %Aliyun.Oss.Client.Response{
          data: %{
            "CopyPartResult" => %{
              "ETag" => "\"09800000000000000000000000000000\"",
              "LastModified" => "2017-05-14T07:44:26.000Z"
            }
          },
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec upload_part_copy(
          Config.t(),
          String.t(),
          String.t(),
          String.t(),
          integer(),
          String.t(),
          String.t()
        ) ::
          {:error, error()} | {:ok, Response.t()}
  def upload_part_copy(
        config,
        bucket,
        object,
        upload_id,
        part_number,
        copy_source,
        copy_source_range \\ "",
        headers \\ %{}
      ) do
    sub_resources = %{"uploadId" => upload_id, "partNumber" => part_number}

    headers =
      Map.merge(headers, %{
        "x-oss-copy-source" => copy_source,
        "x-oss-copy-source-range" => copy_source_range
      })

    Service.put(config, bucket, object, "", sub_resources: sub_resources, headers: headers)
  end

  @doc """
  CompleteMultiUpload - completes the multipart upload task of an object after all parts of the
  object are uploaded.

  ## Examples

      iex> uploaded_parts = [{1, "ETAG_FOR_PART1}, {2, "ETAG_FOR_PART2}]
      iex> Aliyun.Oss.Object.MultipartUpload.complete_upload(config, "some-bucket", "some-object", "UPLOAD_ID", uploaded_parts)
      {:ok, %Aliyun.Oss.Client.Response{
          data: %{
            "CompleteMultipartUploadResult" => %{
              "Bucket" => "some-bucket",
              "ETag" => "\"21000000000000000000000000000000-1\"",
              "Key" => "some-object",
              "Location" => "https://some-bucket.oss-cn-shenzhen.aliyuncs.com/some-object"
            }
          },
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @body_tmpl """
  <?xml version="1.0" encoding="UTF-8"?>
  <CompleteMultipartUpload>
    <%= for {part_number, etag} <- parts do %>
    <Part>
      <PartNumber><%= part_number %></PartNumber>
      <ETag><%= etag %></ETag>
    </Part>
    <% end %>
  </CompleteMultipartUpload>
  """
  @spec complete_upload(
          Config.t(),
          String.t(),
          String.t(),
          String.t(),
          list({integer(), String.t()}),
          map()
        ) ::
          {:error, error()} | {:ok, Response.t()}
  def complete_upload(config, bucket, object, upload_id, parts, headers \\ %{}) do
    body = EEx.eval_string(@body_tmpl, parts: parts)

    Service.post(config, bucket, object, body,
      headers: headers,
      sub_resources: %{"uploadId" => upload_id}
    )
  end

  @doc """
  AbortMultipartUpload - cancels a multipart upload task and deletes the parts uploaded in the task.

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.abort_upload(config, "some-bucket", "some-object", "UPLOAD_ID")
      {:ok, %Aliyun.Oss.Client.Response{
          data: "",
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec abort_upload(Config.t(), String.t(), String.t(), String.t()) ::
          {:error, error()} | {:ok, Response.t()}
  def abort_upload(config, bucket, object, upload_id) do
    Service.delete(config, bucket, object, sub_resources: %{"uploadId" => upload_id})
  end

  @doc """
  ListMultipartUploads - lists all multipart upload tasks in progress.

  The result includes the tasks are not completed or canceled.

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.list_uploads(config, "some-bucket")
      {:ok, %Aliyun.Oss.Client.Response{
          data: %{
            "ListMultipartUploadsResult" => %{
              "Bucket" => "some-bucket",
              "Delimiter" => nil,
              "IsTruncated" => false,
              "KeyMarker" => nil,
              "MaxUploads" => "1000",
              "NextKeyMarker" => nil,
              "NextUploadIdMarker" => nil,
              "Prefix" => nil,
              "Upload" => [
                %{
                  "Initiated" => "2018-05-14T07:59:10.000Z",
                  "Key" => "some-object",
                  "StorageClass" => "Standard",
                  "UploadId" => "UPLOAD_ID"
                },
                %{
                  "Initiated" => "2018-05-14T07:59:10.000Z",
                  "Key" => "some-object",
                  "StorageClass" => "Standard",
                  "UploadId" => "UPLOAD_ID"
                }
              ],
              "UploadIdMarker" => nil
            }
          },
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec list_uploads(Config.t(), String.t(), map()) :: {:error, error()} | {:ok, Response.t()}
  def list_uploads(config, bucket, query_params \\ %{}) do
    Bucket.get_bucket(config, bucket, query_params, %{"uploads" => nil})
  end

  @doc """
  ListParts - Lists all parts that are uploaded by using a specified upload ID.

  ## Examples

      iex> Aliyun.Oss.Object.MultipartUpload.list_parts(config, "some-bucket", "some-object", "UPLOAD_ID")
      {:ok, %Aliyun.Oss.Client.Response{
          data:  %{
            "ListPartsResult" => %{
              "Bucket" => "some-bucket",
              "IsTruncated" => false,
              "Key" => "some-object",
              "MaxParts" => "1000",
              "NextPartNumberMarker" => "2",
              "Part" => [
                %{
                  "ETag" => "\"09000000000000000000000000000000\"",
                  "HashCrc64ecma" => "15248619871383844432",
                  "LastModified" => "2018-05-14T08:03:26.000Z",
                  "PartNumber" => "1",
                  "Size" => "1538"
                },
                %{
                  "ETag" => "\"2A000000000000000000000000000000\"",
                  "HashCrc64ecma" => "13658734736388254586",
                  "LastModified" => "2018-05-14T08:03:37.000Z",
                  "PartNumber" => "2",
                  "Size" => "18"
                }
              ],
              "PartNumberMarker" => "0",
              "StorageClass" => "Standard",
              "UploadId" => "UPLOAD_ID"
            }
          },
          headers: [
            {"Server", "AliyunOSS"},
            {"Date", "Wed, 05 Dec 2018 02:34:57 GMT"},
            ...
          ]
        }
      }

  """
  @spec list_parts(Config.t(), String.t(), String.t(), String.t(), map()) ::
          {:error, error()} | {:ok, Response.t()}
  def list_parts(config, bucket, object, upload_id, query_params \\ %{}) do
    Service.get(config, bucket, object,
      query_params: query_params,
      sub_resources: %{"uploadId" => upload_id}
    )
  end
end