Multipart Upload
A normal (single-request) upload can only upload objects smaller than 5 GB. To upload an object of 5 GB or larger, use multipart upload.
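A multipart upload consists of the following three steps:
- Initialize a multipart upload task with the create_multipart_upload interface.
- Upload the parts one by one or in parallel with the upload_part interface, or copy parts with the upload_part_copy interface.
- Complete the multipart upload with the complete_multipart_upload interface.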
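Before transferring data with multipart upload, you must first initialize a multipart upload task. The task can be used either to upload an object or to copy one. The operation returns a globally unique Upload ID, generated by the server, that identifies this multipart upload task. You pass this ID to related operations, such as aborting the task or querying it.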
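Because the Upload ID ties every later call together, and an unfinished task keeps its uploaded parts until it is completed or aborted, it can help to wrap the three steps so that a failure aborts the task. The sketch below assumes the client exposes create_multipart_upload, complete_multipart_upload and abort_multipart_upload with the same keyword arguments as Aws::S3::Client; `with_multipart_upload` and `RecordingClient` are hypothetical names used only for illustration, not part of the SDK.

```ruby
# Hypothetical helper (not part of any SDK): runs the three multipart steps and
# aborts the task if anything fails, so uploaded parts are not left behind.
# `client` is assumed to expose create_multipart_upload, complete_multipart_upload
# and abort_multipart_upload with the same keyword arguments as Aws::S3::Client.
def with_multipart_upload(client, bucket:, key:)
  upload_id = client.create_multipart_upload(bucket: bucket, key: key).upload_id
  begin
    # The block uploads or copies the parts and returns [{ etag:, part_number: }, ...].
    parts = yield upload_id
    client.complete_multipart_upload(
      bucket: bucket, key: key, upload_id: upload_id,
      multipart_upload: { parts: parts }
    )
  rescue StandardError
    client.abort_multipart_upload(bucket: bucket, key: key, upload_id: upload_id)
    raise # re-raise the original error after cleaning up
  end
end

# Hypothetical in-memory stand-in client that only records calls.
class RecordingClient
  Response = Struct.new(:upload_id)
  attr_reader :calls

  def initialize
    @calls = []
  end

  def create_multipart_upload(**opts)
    @calls << [:create, opts]
    Response.new('example-upload-id')
  end

  def complete_multipart_upload(**opts)
    @calls << [:complete, opts]
  end

  def abort_multipart_upload(**opts)
    @calls << [:abort, opts]
  end
end

client = RecordingClient.new
begin
  with_multipart_upload(client, bucket: 'my-bucket', key: 'big.bin') { raise 'network error' }
rescue RuntimeError => e
  puts "upload failed: #{e.message}"
end
p client.calls.map(&:first) # => [:create, :abort]
```

With a real client, the block would contain the upload_part or upload_part_copy calls and return the collected part list.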
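After initializing a multipart upload task, you can upload part data by specifying the object key and the Upload ID. Each uploaded part is identified by a part number in the range 1 to 10000. Within a given Upload ID, the part number both uniquely identifies the part and fixes its position within the whole file; you must pass both the Upload ID and the part number to upload_part. If you upload new data with a part number that has already been used, the existing data for that part is overwritten. Every part except the last must be at least 5 MB; the last part has no minimum size. Parts do not need to be uploaded in order, and can even be uploaded from different processes or machines: when the multipart upload is completed, the server sorts the parts by part number and assembles them into the final object.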
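The part-number and part-size rules above determine how a file must be split. The following is a minimal sketch, assuming "MB" means 1024 × 1024 bytes as in the sample code's PART_SIZE; `plan_parts`, `min_part_size_for` and `PartPlan` are hypothetical names. It computes each part's number, byte offset and length, and the smallest whole-MB part size that keeps a large file within 10000 parts.

```ruby
MIN_PART_SIZE = 5 * 1024 * 1024 # every part except the last must be at least 5 MB
MAX_PARTS = 10_000              # part numbers run from 1 to 10000
MIB = 1024 * 1024

PartPlan = Struct.new(:part_number, :offset, :length)

# Splits total_size bytes into numbered parts of part_size bytes;
# only the last part may be shorter.
def plan_parts(total_size, part_size = MIN_PART_SIZE)
  raise ArgumentError, 'part_size must be at least 5 MB' if part_size < MIN_PART_SIZE

  count = (total_size + part_size - 1) / part_size # ceiling division
  raise ArgumentError, "#{count} parts exceed the #{MAX_PARTS}-part limit" if count > MAX_PARTS

  (1..count).map do |number|
    offset = (number - 1) * part_size
    PartPlan.new(number, offset, [part_size, total_size - offset].min)
  end
end

# Smallest whole-MB part size that keeps total_size within MAX_PARTS parts.
def min_part_size_for(total_size)
  per_part = (total_size + MAX_PARTS - 1) / MAX_PARTS
  [MIN_PART_SIZE, (per_part + MIB - 1) / MIB * MIB].max
end

plan_parts(12 * MIB).each do |part|
  puts "part #{part.part_number}: offset=#{part.offset} length=#{part.length}"
end
# part 1: offset=0 length=5242880
# part 2: offset=5242880 length=5242880
# part 3: offset=10485760 length=2097152
```

For a 100 GB file, for example, min_part_size_for returns 11 MB, because 10 MB parts would need more than 10000 parts.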
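After initializing a multipart upload task, you can also copy an existing object in parts with the upload_part_copy interface. In upload_part_copy, specify the source bucket name, source object key, destination bucket name, destination object key and Upload ID. As with uploaded parts, each copied part has a part number, which you also set in upload_part_copy to identify the part. To copy only a portion of the source into a part, set copy_source_range to an inclusive byte range such as bytes=0-5242879. The source object of upload_part_copy must be larger than 5 MB.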
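Because HTTP byte ranges include both endpoints, a part of part_size bytes that starts at offset first ends at first + part_size − 1, and the last part ends at the object's final byte. A minimal sketch of the copy_source_range values, assuming 5 MB (5 × 1024 × 1024 byte) parts by default; `copy_source_ranges` is a hypothetical helper name.

```ruby
# Hypothetical helper: returns [part_number, range] pairs for upload_part_copy.
# HTTP byte ranges are inclusive at both ends, so a part starting at `first`
# ends at first + part_size - 1, and the last part ends at object_size - 1.
def copy_source_ranges(object_size, part_size = 5 * 1024 * 1024)
  count = (object_size + part_size - 1) / part_size # ceiling division
  (1..count).map do |part_number|
    first = (part_number - 1) * part_size
    last = [first + part_size, object_size].min - 1
    [part_number, "bytes=#{first}-#{last}"]
  end
end

copy_source_ranges(12 * 1024 * 1024).each { |n, range| puts "#{n}: #{range}" }
# 1: bytes=0-5242879
# 2: bytes=5242880-10485759
# 3: bytes=10485760-12582911
```

Each pair maps directly onto the part_number and copy_source_range arguments of one upload_part_copy call.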
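After all parts have been uploaded or copied, call complete_multipart_upload to finish the multipart upload task. In this call you must supply the list of all valid parts, each with its part number and ETag. On receiving the list, the server validates each part in turn; once every part passes validation, it combines the parts into one complete object.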
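When parts are uploaded in parallel, their results arrive in no particular order. Sending the list in ascending part-number order is a safe default (Amazon S3, for example, rejects an out-of-order list; whether this service does is not stated above). The sketch below, with the hypothetical name `completion_payload`, builds the multipart_upload argument from unordered results. It keeps the last ETag seen for a repeated part number, mirroring the overwrite rule, and assumes the results are listed in the order the uploads finished.

```ruby
# Hypothetical helper: turns upload results collected in any order (for example
# from parallel workers) into the multipart_upload argument of
# complete_multipart_upload, sorted by ascending part number.
# A repeated part number keeps its last ETag, assuming `results` is in the
# order the uploads finished (a re-uploaded part overwrites the earlier data).
def completion_payload(results)
  latest = {}
  results.each { |r| latest[r[:part_number]] = r[:etag] }
  parts = latest.sort.map { |number, etag| { etag: etag, part_number: number } }
  { parts: parts }
end

results = [
  { part_number: 3, etag: '"etag-c"' },
  { part_number: 1, etag: '"etag-a"' },
  { part_number: 2, etag: '"etag-b-old"' },
  { part_number: 2, etag: '"etag-b"' },
]
p completion_payload(results)
```

The returned hash can be passed as multipart_upload: together with bucket, key and upload_id.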
The following sample code performs a multipart upload:
```ruby
# File subclass that yields the file content in PART_SIZE chunks.
class MyFile < File
  PART_SIZE = 1024 * 1024 * 5 # 5 MB: minimum size of every part except the last

  def each_part
    yield read(PART_SIZE) until eof?
  end
end

def multipart_upload(client, bucket_name, object_key, localfile)
  input_opts = {
    bucket: bucket_name,
    key: object_key,
  }
  # Step 1: initialize the multipart upload task and obtain its Upload ID.
  upload_id = client.create_multipart_upload(input_opts).upload_id
  current_part = 1
  parts = []
  MyFile.open(localfile, 'rb') do |file|
    total_parts = (file.size.to_f / MyFile::PART_SIZE).ceil
    # Step 2: upload the parts one by one.
    file.each_part do |part|
      part_response = client.upload_part({
        body: part,
        bucket: bucket_name,
        key: object_key,
        part_number: current_part,
        upload_id: upload_id,
      })
      percent_complete = (current_part.to_f / total_parts.to_f) * 100
      percent_complete = 100 if percent_complete > 100
      percent_complete = sprintf('%.2f', percent_complete.to_f)
      puts "Upload progress: #{percent_complete}%"
      # Record the part number and ETag; both are required to complete the upload.
      parts << {
        etag: part_response.etag,
        part_number: current_part,
      }
      current_part += 1
    end
  end
  # Step 3: complete the task; the server assembles the parts by part number.
  input_opts = input_opts.merge(
    multipart_upload: { parts: parts },
    upload_id: upload_id
  )
  client.complete_multipart_upload(input_opts)
  puts 'Multipart upload completed.'
rescue StandardError => e
  puts "Error in multipart upload: #{e.message}"
  # Abort the task so that parts already uploaded are not left behind.
  client.abort_multipart_upload(bucket: bucket_name, key: object_key, upload_id: upload_id) if upload_id
end
```
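The sample above uploads parts sequentially, but the text notes that parts can also be uploaded in parallel. The following is a sketch of that variant using a small thread pool, assuming the client's upload_part behaves like Aws::S3::Client#upload_part (thread-safe, returning an object that responds to etag); `upload_parts_in_parallel` is a hypothetical name.

```ruby
# Hypothetical helper: uploads the parts of a local file concurrently with a
# small thread pool and returns the [{ etag:, part_number: }] list sorted by part number.
# `client` is assumed to behave like Aws::S3::Client#upload_part (thread-safe,
# returns an object responding to #etag).
def upload_parts_in_parallel(client, bucket, key, upload_id, path,
                             part_size: 5 * 1024 * 1024, threads: 4)
  count = (File.size(path) + part_size - 1) / part_size
  queue = Queue.new
  (1..count).each { |n| queue << n }
  queue.close # pop returns nil once the queue is drained

  results = []
  lock = Mutex.new
  workers = Array.new([threads, count].min) do
    Thread.new do
      File.open(path, 'rb') do |file| # each thread reads through its own file handle
        while (number = queue.pop)
          file.seek((number - 1) * part_size)
          response = client.upload_part(
            bucket: bucket, key: key, upload_id: upload_id,
            part_number: number, body: file.read(part_size)
          )
          lock.synchronize { results << { etag: response.etag, part_number: number } }
        end
      end
    end
  end
  workers.each(&:join) # join re-raises an exception raised inside a worker
  results.sort_by { |part| part[:part_number] }
end
```

Call it between create_multipart_upload and complete_multipart_upload, and pass the returned list as multipart_upload: { parts: ... }. Keep part_size at 5 MB or more with the real service, and abort the task if the helper raises. Giving each thread its own file handle avoids sharing one read position across threads.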
The following sample code performs a multipart copy:
```ruby
def multipart_copy(client, src_bucket, src_obj_key, dest_bucket, dest_obj_key)
  input_opts = {
    bucket: dest_bucket,
    key: dest_obj_key,
  }
  part_size = 1024 * 1024 * 5
  # Check the source size first, so that no task is left open on an early return.
  obj_size = client.head_object(
    bucket: src_bucket,
    key: src_obj_key
  ).content_length
  if obj_size <= part_size
    puts 'The size of the object must be larger than 5MB.'
    return
  end
  # Step 1: initialize the multipart upload task for the destination object.
  upload_id = client.create_multipart_upload(input_opts).upload_id
  total_parts = (obj_size.to_f / part_size).ceil
  current_part = 1
  parts = []
  # Step 2: copy the source object part by part.
  while current_part <= total_parts
    # Byte ranges are inclusive: part N covers (N-1)*part_size .. N*part_size - 1,
    # and the last part ends at the final byte of the object.
    start_byte = (current_part - 1) * part_size
    end_byte = [start_byte + part_size, obj_size].min - 1
    part_response = client.upload_part_copy(
      bucket: dest_bucket,
      copy_source: "/#{src_bucket}/#{src_obj_key}",
      copy_source_range: "bytes=#{start_byte}-#{end_byte}",
      key: dest_obj_key,
      part_number: current_part,
      upload_id: upload_id
    )
    percent_complete = (current_part.to_f / total_parts.to_f) * 100
    percent_complete = 100 if percent_complete > 100
    percent_complete = sprintf('%.2f', percent_complete.to_f)
    puts "Copy progress: #{percent_complete}%"
    # For a copied part, the ETag is returned inside copy_part_result.
    parts << {
      etag: part_response.copy_part_result.etag,
      part_number: current_part,
    }
    current_part += 1
  end
  # Step 3: complete the task with the full list of part numbers and ETags.
  input_opts = input_opts.merge(
    multipart_upload: { parts: parts },
    upload_id: upload_id
  )
  client.complete_multipart_upload(input_opts)
  puts 'Multipart copy completed.'
rescue StandardError => e
  puts "Error in multipart copy: #{e.message}"
  # Abort the task so that parts already copied are not left behind.
  client.abort_multipart_upload(bucket: dest_bucket, key: dest_obj_key, upload_id: upload_id) if upload_id
end
```