分段上传对象
分段上传可以将大文件分割成片段后上传,除了最后一个片段,每个片段的数据大小为5MB~5GB。本文介绍如何分段上传对象。
分段上传(Multipart Upload)分为以下三个步骤:
-
分段上传初始化。
调用
S3_initiate_multipart
方法获得分段上传全局唯一的upload_id。接口定义:
void S3_initiate_multipart(S3BucketContext *bucketContext, const char *key, S3PutProperties *putProperties, S3MultipartInitialHandler *handler, S3RequestContext *requestContext, int timeoutMs, void *callbackData);
参数:
参数名 类型 说明 bucketContext S3BucketContext * 包含bucket及相关的请求参数 key const char * 将要上传的对象的文件名 putProperties S3PutProperties * 请求消息头,可选地提供了附加的属性以应用于要上传的对象 handler S3MultipartInitialHandler * 回调函数 requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求 timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间 callbackData void * 回调数据 -
上传分段。
调用
S3_upload_part
方法上传分段数据。接口定义:
void S3_upload_part(S3BucketContext *bucketContext, const char *key, S3PutProperties * putProperties, S3PutObjectHandler *handler, int seq, const char *upload_id, int partContentLength, S3RequestContext *requestContext, int timeoutMs, void *callbackData);
参数:
参数名 类型 说明 bucketContext S3BucketContext * 包含bucket及相关的请求参数 key const char * 将要上传的对象的文件名 putProperties S3PutProperties * 请求消息头,可选地提供了附加的属性以应用于要上传的对象 handler S3PutObjectHandler* 回调函数 seq int 分段编号,它唯一地标识了一个分段 upload_id const char * S3_initiate_multipart中得到的upload_id partContentLength int 以字节数为单位的本次分段大小 requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求 timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间 callbackData void * 回调数据 -
完成分段上传。
所有分段上传完成后,调用
S3_complete_multipart_upload
方法将所有分段合并成完整的文件。接口定义:
void S3_complete_multipart_upload(S3BucketContext *bucketContext, const char *key, S3MultipartCommitHandler *handler, const char *upload_id, int contentLength, S3RequestContext *requestContext, int timeoutMs, void *callbackData);
参数:
参数名 类型 说明 bucketContext S3BucketContext * 包含bucket及相关的请求参数 key const char * 将要上传的对象的文件名 handler S3MultipartCommitHandler* 回调函数 upload_id const char * S3_initiate_multipart中得到的upload_id contentLength int 以字节数为单位的提交信息的大小 requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求 timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间 callbackData void * 回调数据
代码示例:
#include <sys/stat.h>
#define MULTIPART_CHUNK_SIZE (5 << 20) // multipart is 5M
// A growable byte buffer implemented as a circular doubly-linked list of
// fixed-size 64 KiB blocks; used to accumulate the CompleteMultipartUpload
// XML body before it is streamed out by multipartPutXmlCallback.
typedef struct growbuffer
{
// Number of valid (unread) bytes currently held in data[]
int size;
// Offset of the first unread byte within data[]
int start;
// Fixed-size storage for this block; additional blocks are chained
char data[64 * 1024];
// Neighbors in the circular list; for a single block both point to itself
struct growbuffer *prev, *next;
} growbuffer;
// Per-upload bookkeeping shared by all three multipart phases
// (initiate, upload-part, complete).
typedef struct UploadManager
{
// Upload id returned by S3_initiate_multipart (heap copy via strdup; freed by caller)
char *upload_id;
// ETag strings collected per part, indexed by (part number - 1)
char **etags;
// Number of parts whose ETags have been recorded so far (also the resume point)
int next_etags_pos;
// Accumulated CompleteMultipartUpload XML body for the commit phase
growbuffer *gb;
// Bytes of the commit XML not yet handed to the library
int remaining;
} UploadManager;
// State handed to putObjectDataCallback while streaming one part's bytes.
typedef struct put_object_callback_data
{
// Source file being uploaded (positioned at the next unsent byte)
FILE *infile;
// Alternative in-memory source; NULL when reading from infile
growbuffer *gb;
// Bytes still to send for the current part / its original size
uint64_t contentLength, originalContentLength;
// Bytes still to send across all remaining parts / whole-object size
uint64_t totalContentLength, totalOriginalContentLength;
// Nonzero suppresses status output (NOTE(review): not read in this chunk — confirm usage elsewhere)
int noStatus;
} put_object_callback_data;
// Data-supply callback for S3_upload_part: fills `buffer` with up to
// `bufferSize` bytes read from the input file, bounded by the bytes
// remaining in the current part.  Returns the number of bytes produced
// (0 signals end of this part's data).
int putObjectDataCallback(int bufferSize, char *buffer,
                          void *callbackData)
{
    put_object_callback_data *data = (put_object_callback_data *)callbackData;

    // Nothing left to send for this part.
    if (!data->contentLength)
    {
        return 0;
    }

    // Never read more than the library's buffer or the part's remainder.
    int toRead = (data->contentLength > (unsigned)bufferSize)
                     ? (unsigned)bufferSize
                     : (int)data->contentLength;
    int got = fread(buffer, 1, toRead, data->infile);
    data->contentLength -= got;
    return got;
}
// Callback for S3_initiate_multipart: records a private heap copy of the
// server-assigned upload id on the UploadManager (caller frees it later).
S3Status initial_multipart_callback(const char *upload_id,
                                    void *callbackData)
{
    UploadManager *mgr = (UploadManager *)callbackData;
    mgr->upload_id = strdup(upload_id);
    return S3StatusOK;
}
// Response-properties callback for S3_upload_part: forwards to the common
// properties handler, then records the part's ETag so the commit phase can
// build the CompleteMultipartUpload XML.
// Fix: the original called strdup(properties->eTag) unconditionally, which
// is undefined behavior when the server response carries no ETag header
// (eTag == NULL).  Store NULL in that case so cleanup's free() stays safe.
S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData)
{
    responsePropertiesCallback(properties, callbackData);

    MultipartPartData *data = (MultipartPartData *)callbackData;
    int seq = data->seq; // 1-based part number
    const char *etag = properties->eTag;

    data->manager->etags[seq - 1] = etag ? strdup(etag) : NULL;
    data->manager->next_etags_pos = seq;
    return S3StatusOK;
}
// Data-supply callback for S3_complete_multipart_upload: streams the
// commit XML accumulated in manager->gb into the library's buffer.
// Returns the number of bytes produced (0 when the body is exhausted).
static int multipartPutXmlCallback(int bufferSize, char *buffer,
                                   void *callbackData)
{
    UploadManager *mgr = (UploadManager *)callbackData;
    int copied = 0;

    if (mgr->remaining)
    {
        int want = (mgr->remaining > bufferSize) ? bufferSize : mgr->remaining;
        // growbuffer_read may deliver fewer than `want` bytes; the library
        // calls us again until we return 0.
        growbuffer_read(&(mgr->gb), want, &copied, buffer);
    }
    mgr->remaining -= copied;
    return copied;
}
// Appends dataLen bytes to the growbuffer ring, allocating new 64 KiB
// blocks at the tail as needed.  Returns dataLen on success, 0 if an
// allocation fails (bytes copied before the failure remain in the buffer).
static int growbuffer_append(growbuffer **gb, const char *data, int dataLen)
{
    int total = dataLen;

    while (dataLen > 0)
    {
        // The tail is the block just before the head of the ring.
        growbuffer *tail = (*gb != NULL) ? (*gb)->prev : NULL;

        if (tail == NULL || tail->size == sizeof(tail->data))
        {
            // No room in the current tail — allocate a fresh block.
            tail = (growbuffer *)malloc(sizeof(growbuffer));
            if (tail == NULL)
            {
                return 0;
            }
            tail->size = 0;
            tail->start = 0;

            if (*gb != NULL && (*gb)->prev != NULL)
            {
                // Splice the new block in just before the head.
                tail->prev = (*gb)->prev;
                tail->next = *gb;
                (*gb)->prev->next = tail;
                (*gb)->prev = tail;
            }
            else
            {
                // First block: a ring of one.
                tail->prev = tail->next = tail;
                *gb = tail;
            }
        }

        // Copy as much as fits into the tail block.
        int room = sizeof(tail->data) - tail->size;
        int chunk = (room > dataLen) ? dataLen : room;
        memcpy(tail->data + tail->size, data, chunk);
        tail->size += chunk;
        data += chunk;
        dataLen -= chunk;
    }

    return total;
}
// Reads up to `amt` bytes from the head block of the ring into `buffer`,
// storing the count actually copied in *amtReturn (0 when the ring is
// empty).  Only one block is consulted per call; callers loop for more.
// A block is freed and unlinked as soon as it is fully drained.
static void growbuffer_read(growbuffer **gb, int amt, int *amtReturn,
                            char *buffer)
{
    *amtReturn = 0;

    growbuffer *head = *gb;
    if (head == NULL)
    {
        return;
    }

    int take = (head->size > amt) ? amt : head->size;
    *amtReturn = take;
    memcpy(buffer, head->data + head->start, take);
    head->start += take;
    head->size -= take;

    if (head->size == 0)
    {
        if (head->next == head)
        {
            // Last block in the ring — the buffer is now empty.
            *gb = NULL;
        }
        else
        {
            // Advance the head and unlink the drained block.
            *gb = head->next;
            head->prev->next = head->next;
            head->next->prev = head->prev;
        }
        free(head);
    }
}
// Frees every block of the circular buffer.  Walks the ring once,
// stopping when the walk returns to the starting block (or on NULL).
static void growbuffer_destroy(growbuffer *gb)
{
    growbuffer *first = gb;
    growbuffer *cur = gb;

    while (cur != NULL)
    {
        growbuffer *nxt = cur->next;
        free(cur);
        cur = (nxt == first) ? NULL : nxt;
    }
}
// Uploads `filename` to the configured bucket/key via S3 multipart upload:
// (1) initiate to obtain an upload_id, (2) upload MULTIPART_CHUNK_SIZE
// parts, (3) commit with a CompleteMultipartUpload XML body listing the
// collected ETags.  Relies on file-level globals: bucketContext, test_key,
// statusG, should_retry(), printError() and the callback functions above.
//
// Fixes vs. the original:
//  - fopen uses "rb": text mode would translate line endings on some
//    platforms and corrupt the uploaded byte stream.
//  - data.infile is now fclose()d on every exit path (was leaked).
//  - manager.etags is allocated with a checked calloc so unfilled slots
//    are NULL and cleanup can free them unconditionally.
void put_object_multipart(const char *filename)
{
    put_object_callback_data data;
    struct stat statbuf;

    /* The file size drives the part count. */
    if (stat(filename, &statbuf) == -1)
    {
        fprintf(stderr, "\nERROR: Failed to stat file %s: ", filename);
        perror(0);
        exit(-1);
    }
    int contentLength = statbuf.st_size;
    data.contentLength = contentLength;

    /* Binary mode: upload the file's bytes exactly as stored. */
    if (!(data.infile = fopen(filename, "rb")))
    {
        fprintf(stderr, "\nERROR: Failed to open input file %s: ", filename);
        perror(0);
        exit(-1);
    }

    uint64_t totalContentLength = contentLength;
    uint64_t todoContentLength = contentLength;

    UploadManager manager;
    manager.upload_id = 0;
    manager.gb = 0;

    S3PutProperties putProperties = {};

    /* Number of parts, rounding the division up. */
    int seq;
    int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) /
                    MULTIPART_CHUNK_SIZE);

    MultipartPartData partData;
    memset(&partData, 0, sizeof(MultipartPartData));
    int partContentLength = 0;

    S3MultipartInitialHandler handler = {
        {&responsePropertiesCallback,
         &responseCompleteCallback},
        &initial_multipart_callback};
    S3PutObjectHandler putObjectHandler = {
        {&MultipartResponseProperiesCallback, &responseCompleteCallback},
        &putObjectDataCallback};
    S3MultipartCommitHandler commit_handler = {
        {&responsePropertiesCallback, &responseCompleteCallback},
        &multipartPutXmlCallback,
        0};

    /* calloc, not malloc: slots start NULL so cleanup can free() them
     * even for parts whose ETag was never recorded. */
    manager.etags = (char **)calloc(totalSeq, sizeof(char *));
    if (manager.etags == NULL && totalSeq > 0)
    {
        fprintf(stderr, "\nERROR: Failed to allocate etag table\n");
        fclose(data.infile);
        return;
    }
    manager.next_etags_pos = 0;

    /* Step 1: initiate and obtain the upload_id. */
    do
    {
        S3_initiate_multipart(&bucketContext, test_key, 0, &handler, 0, 0, &manager);
    } while (S3_status_is_retryable(statusG) && should_retry());
    if (manager.upload_id == 0 || statusG != S3StatusOK)
    {
        printError();
        goto clean;
    }
    printf("uploadId:%s\n", manager.upload_id);

/* Resume point: parts before next_etags_pos already have ETags. */
upload:
    todoContentLength -= MULTIPART_CHUNK_SIZE * manager.next_etags_pos;

    /* Step 2: upload each part (part numbers are 1-based). */
    for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++)
    {
        partData.manager = &manager;
        partData.seq = seq;
        if (partData.put_object_data.gb == NULL)
        {
            partData.put_object_data = data;
        }
        /* Every part is a full chunk except possibly the last. */
        partContentLength = ((contentLength > MULTIPART_CHUNK_SIZE) ? MULTIPART_CHUNK_SIZE : contentLength);
        printf("%s Part Seq %d, length=%d\n", "Sending", seq, partContentLength);
        partData.put_object_data.contentLength = partContentLength;
        partData.put_object_data.originalContentLength = partContentLength;
        partData.put_object_data.totalContentLength = todoContentLength;
        partData.put_object_data.totalOriginalContentLength = totalContentLength;
        putProperties.md5 = 0;
        do
        {
            S3_upload_part(&bucketContext, test_key, &putProperties,
                           &putObjectHandler, seq, manager.upload_id,
                           partContentLength,
                           0, 0,
                           &partData);
        } while (S3_status_is_retryable(statusG) && should_retry());
        if (statusG != S3StatusOK)
        {
            printError();
            goto clean;
        }
        contentLength -= MULTIPART_CHUNK_SIZE;
        todoContentLength -= MULTIPART_CHUNK_SIZE;
    }

    /* Step 3: build the CompleteMultipartUpload XML from the collected
     * ETags and commit. */
    int i;
    int size = 0;
    size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>",
                              strlen("<CompleteMultipartUpload>"));
    char buf[256];
    int n;
    for (i = 0; i < totalSeq; i++)
    {
        n = snprintf(buf, sizeof(buf), "<Part><PartNumber>%d</PartNumber>"
                                       "<ETag>%s</ETag></Part>",
                     i + 1, manager.etags[i]);
        size += growbuffer_append(&(manager.gb), buf, n);
    }
    size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>",
                              strlen("</CompleteMultipartUpload>"));
    manager.remaining = size;

    do
    {
        S3_complete_multipart_upload(&bucketContext, test_key, &commit_handler,
                                     manager.upload_id, manager.remaining,
                                     0, 0, &manager);
    } while (S3_status_is_retryable(statusG) && should_retry());
    if (statusG != S3StatusOK)
    {
        printError();
        goto clean;
    }

clean:
    /* Single cleanup path: close the input file and release everything
     * accumulated during the upload.  free(NULL) is a no-op. */
    if (data.infile)
    {
        fclose(data.infile);
    }
    if (manager.upload_id)
    {
        free(manager.upload_id);
    }
    for (i = 0; i < manager.next_etags_pos; i++)
    {
        free(manager.etags[i]);
    }
    growbuffer_destroy(manager.gb);
    free(manager.etags);
}