分段上传对象

分段上传可以将大文件分割成片段后上传,除了最后一个片段,每个片段的数据大小为5MB~5GB。本文介绍如何分段上传对象。

分段上传(Multipart Upload)分为以下三个步骤:

  1. 分段上传初始化。

    调用S3_initiate_multipart方法获得分段上传全局唯一的upload_id。

    接口定义:

    void S3_initiate_multipart(S3BucketContext *bucketContext, const char *key,
                               S3PutProperties *putProperties,
                               S3MultipartInitialHandler *handler,
                               S3RequestContext *requestContext,
                               int timeoutMs,
                               void *callbackData);
    

    参数:

    参数名 类型 说明
    bucketContext S3BucketContext * 包含bucket及相关的请求参数
    key const char * 将要上传的对象的文件名
    putProperties S3PutProperties * 请求消息头,可选地提供了附加的属性以应用于要上传的对象
    handler S3MultipartInitialHandler * 回调函数
    requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求
    timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间
    callbackData void * 回调数据
  2. 上传分段。

    调用S3_upload_part方法上传分段数据。

    接口定义:

    void S3_upload_part(S3BucketContext *bucketContext, const char *key,
                        S3PutProperties * putProperties,
                        S3PutObjectHandler *handler,
                        int seq, const char *upload_id, int partContentLength,
                        S3RequestContext *requestContext,
                        int timeoutMs,
                        void *callbackData);
    

    参数:

    参数名 类型 说明
    bucketContext S3BucketContext * 包含bucket及相关的请求参数
    key const char * 将要上传的对象的文件名
    putProperties S3PutProperties * 请求消息头,可选地提供了附加的属性以应用于要上传的对象
    handler S3PutObjectHandler* 回调函数
    seq int 分段编号,它唯一地标识了一个分段
    upload_id const char * S3_initiate_multipart中得到的upload_id
    partContentLength int 以字节数为单位的本次分段大小
    requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求
    timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间
    callbackData void * 回调数据
  3. 完成分段上传。

    所有分段上传完成后,调用S3_complete_multipart_upload方法将所有分段合并成完整的文件。

    接口定义:

    void S3_complete_multipart_upload(S3BucketContext *bucketContext,
                                      const char *key,
                                      S3MultipartCommitHandler *handler,
                                      const char *upload_id,
                                      int contentLength,
                                      S3RequestContext *requestContext,
                                      int timeoutMs,
                                      void *callbackData);
    

    参数:

    参数名 类型 说明
    bucketContext S3BucketContext * 包含bucket及相关的请求参数
    key const char * 将要上传的对象的文件名
    handler S3MultipartCommitHandler* 回调函数
    upload_id const char * S3_initiate_multipart中得到的upload_id
    contentLength int 以字节数为单位的提交信息的大小
    requestContext S3RequestContext * 请求参数,如果为NULL,则立即同步执行请求
    timeoutMs int 如果非0,则是以毫秒为单位的请求超时时间
    callbackData void * 回调数据

代码示例:

#include <sys/stat.h>

#define MULTIPART_CHUNK_SIZE (5 << 20) // multipart is 5M

typedef struct growbuffer
{
    // Number of valid (unread) bytes currently stored in data[]
    int size;
    // Offset of the first unread byte within data[]
    int start;
    // Fixed-size storage block; nodes are chained into a circular
    // doubly-linked list to hold arbitrarily long payloads
    char data[64 * 1024];
    struct growbuffer *prev, *next;
} growbuffer;

// Bookkeeping shared across the three phases of a multipart upload.
typedef struct UploadManager
{
    // Set by the initiate phase: upload id returned by the server
    // (heap copy via strdup; owner must free)
    char *upload_id;

    // Set during the upload-part phase: per-part ETags, indexed by
    // (part sequence number - 1); next_etags_pos counts recorded parts
    char **etags;
    int next_etags_pos;

    // Used by the commit phase: the CompleteMultipartUpload XML body
    // buffered in gb, with remaining bytes still to be sent
    growbuffer *gb;
    int remaining;
} UploadManager;

// Per-request state handed to putObjectDataCallback while streaming
// object (or part) data from a local file.
typedef struct put_object_callback_data
{
    FILE *infile;                // source file the callback reads from
    growbuffer *gb;              // optional in-memory source (unused when infile is set)
    uint64_t contentLength, originalContentLength;       // bytes left / total for this part
    uint64_t totalContentLength, totalOriginalContentLength; // bytes left / total for the whole object
    int noStatus;                // nonzero suppresses progress/status output
} put_object_callback_data;

// Data-supply callback invoked by libs3: fills `buffer` with at most
// `bufferSize` bytes read from the input file, decrements the byte
// budget for the current request, and returns the number of bytes
// produced (0 signals end of data).
int putObjectDataCallback(int bufferSize, char *buffer,
                                 void *callbackData)
{
    put_object_callback_data *cb = (put_object_callback_data *)callbackData;
    int bytesRead = 0;

    if (cb->contentLength)
    {
        // Never read more than the remaining budget or the buffer size.
        int want = ((cb->contentLength > (unsigned)bufferSize)
                        ? (unsigned)bufferSize
                        : cb->contentLength);
        bytesRead = fread(buffer, 1, want, cb->infile);
    }

    cb->contentLength -= bytesRead;
    return bytesRead;
}

// Called by S3_initiate_multipart with the server-assigned upload id.
// Stores a private heap copy in the UploadManager. If strdup fails,
// upload_id stays NULL, which the caller treats as an error after the
// initiate request returns.
S3Status initial_multipart_callback(const char *upload_id,
                                    void *callbackData)
{
    UploadManager *manager = (UploadManager *)callbackData;
    manager->upload_id = strdup(upload_id);
    return S3StatusOK;
}

S3Status MultipartResponseProperiesCallback(const S3ResponseProperties *properties, void *callbackData)
{
    responsePropertiesCallback(properties, callbackData);
    MultipartPartData *data = (MultipartPartData *)callbackData;
    int seq = data->seq;
    const char *etag = properties->eTag;
    data->manager->etags[seq - 1] = strdup(etag);
    data->manager->next_etags_pos = seq;
    return S3StatusOK;
}

// Supplies the CompleteMultipartUpload XML body to libs3 in chunks,
// draining the manager's growbuffer until `remaining` reaches zero.
// Returns the number of bytes copied into `buffer`.
static int multipartPutXmlCallback(int bufferSize, char *buffer,
                                   void *callbackData)
{
    UploadManager *mgr = (UploadManager *)callbackData;
    int copied = 0;

    if (mgr->remaining)
    {
        int chunk = mgr->remaining;
        if (chunk > bufferSize)
        {
            chunk = bufferSize;
        }
        growbuffer_read(&(mgr->gb), chunk, &copied, buffer);
    }

    mgr->remaining -= copied;
    return copied;
}

// Appends dataLen bytes to the circular doubly-linked list of fixed-size
// buffer nodes, allocating a new tail node whenever the current one is
// full. Returns the number of bytes appended, or 0 if a node allocation
// fails (bytes already copied into earlier nodes are kept).
static int growbuffer_append(growbuffer **gb, const char *data, int dataLen)
{
    const int requested = dataLen;

    while (dataLen)
    {
        // The tail is the predecessor of the head in the circular list.
        growbuffer *tail = *gb ? (*gb)->prev : 0;

        if (!tail || tail->size == sizeof(tail->data))
        {
            // No room left: allocate and link a fresh tail node.
            tail = (growbuffer *)malloc(sizeof(growbuffer));
            if (!tail)
            {
                return 0;
            }
            tail->size = 0;
            tail->start = 0;

            if (*gb && (*gb)->prev)
            {
                // Insert just before the head, i.e. as the new tail.
                tail->prev = (*gb)->prev;
                tail->next = *gb;
                (*gb)->prev->next = tail;
                (*gb)->prev = tail;
            }
            else
            {
                // Empty list: the single node is both head and tail.
                tail->prev = tail->next = tail;
                *gb = tail;
            }
        }

        // Copy as much as fits into the tail node.
        int chunk = (sizeof(tail->data) - tail->size);
        if (chunk > dataLen)
        {
            chunk = dataLen;
        }

        memcpy(&(tail->data[tail->size]), data, chunk);

        tail->size += chunk;
        data += chunk;
        dataLen -= chunk;
    }

    return requested;
}

// Copies up to `amt` bytes out of the head node of the buffer list into
// `buffer`, storing the actual count in *amtReturn. Only the head node is
// consumed per call; a fully drained node is unlinked and freed, and *gb
// becomes NULL once the list is empty.
static void growbuffer_read(growbuffer **gb, int amt, int *amtReturn,
                            char *buffer)
{
    growbuffer *head = *gb;

    *amtReturn = 0;
    if (!head)
    {
        return;
    }

    int take = (head->size < amt) ? head->size : amt;
    *amtReturn = take;

    memcpy(buffer, &(head->data[head->start]), take);
    head->start += take;
    head->size -= take;

    if (head->size != 0)
    {
        return;
    }

    // Node exhausted: unlink it, or empty the list if it was the last.
    if (head->next == head)
    {
        *gb = 0;
    }
    else
    {
        *gb = head->next;
        head->prev->next = head->next;
        head->next->prev = head->prev;
    }
    free(head);
}

// Frees every node of the circular buffer list, stopping the walk once
// it wraps back around to the node it started from. Safe on NULL.
static void growbuffer_destroy(growbuffer *gb)
{
    growbuffer *first = gb;
    growbuffer *cur = gb;

    while (cur)
    {
        growbuffer *nxt = cur->next;
        free(cur);
        // A wrap back to the first node means the whole ring is freed.
        cur = (nxt == first) ? 0 : nxt;
    }
}

// Uploads `filename` as a multipart upload in three phases:
//   1. S3_initiate_multipart       -> obtain the upload id,
//   2. S3_upload_part              -> one request per MULTIPART_CHUNK_SIZE slice,
//   3. S3_complete_multipart_upload -> commit the assembled ETag XML.
// Relies on the file-scope bucketContext/test_key/statusG globals and the
// should_retry()/printError() helpers used by the other samples.
// Fixes over the original sample: the input file is always fclose()d, the
// etags array allocation is checked, and `data` is zero-initialized so no
// indeterminate fields are copied into partData.
void put_object_multipart(const char *filename)
{
    put_object_callback_data data;
    struct stat statbuf;

    // Zero everything (gb, noStatus, ...) before partial field assignment;
    // the struct is later copied wholesale into partData.put_object_data.
    memset(&data, 0, sizeof(data));

    if (stat(filename, &statbuf) == -1)
    {
        fprintf(stderr, "\nERROR: Failed to stat file %s: ", filename);
        perror(0);
        exit(-1);
    }

    int contentLength = statbuf.st_size;
    data.contentLength = contentLength;

    if (!(data.infile = fopen(filename, "r")))
    {
        fprintf(stderr, "\nERROR: Failed to open input file %s: ", filename);
        perror(0);
        exit(-1);
    }

    uint64_t totalContentLength = contentLength;
    uint64_t todoContentLength = contentLength;
    UploadManager manager;
    manager.upload_id = 0;
    manager.gb = 0;

    S3PutProperties putProperties = {};

    // Number of parts, dividing and rounding up.
    int seq;
    int totalSeq = ((contentLength + MULTIPART_CHUNK_SIZE - 1) /
                    MULTIPART_CHUNK_SIZE);

    MultipartPartData partData;
    memset(&partData, 0, sizeof(MultipartPartData));
    int partContentLength = 0;

    S3MultipartInitialHandler handler = {
        {&responsePropertiesCallback,
         &responseCompleteCallback},
        &initial_multipart_callback};

    S3PutObjectHandler putObjectHandler = {
        {&MultipartResponseProperiesCallback, &responseCompleteCallback},
        &putObjectDataCallback};

    S3MultipartCommitHandler commit_handler = {
        {&responsePropertiesCallback, &responseCompleteCallback},
        &multipartPutXmlCallback,
        0};

    manager.etags = (char **)malloc(sizeof(char *) * totalSeq);
    manager.next_etags_pos = 0;
    if (!manager.etags)
    {
        // Nothing else has been acquired yet besides the file handle.
        fprintf(stderr, "\nERROR: Failed to allocate etags array\n");
        fclose(data.infile);
        return;
    }

    do
    {
        S3_initiate_multipart(&bucketContext, test_key, 0, &handler, 0, 0, &manager);
    } while (S3_status_is_retryable(statusG) && should_retry());

    if (manager.upload_id == 0 || statusG != S3StatusOK)
    {
        printError();
        goto clean;
    }
    printf("uploadId:%s\n", manager.upload_id);

upload: // resume point: next_etags_pos parts have already been uploaded
    todoContentLength -= MULTIPART_CHUNK_SIZE * manager.next_etags_pos;
    for (seq = manager.next_etags_pos + 1; seq <= totalSeq; seq++)
    {
        partData.manager = &manager;
        partData.seq = seq;
        if (partData.put_object_data.gb == NULL)
        {
            partData.put_object_data = data;
        }
        // Every part is a full chunk except possibly the last one.
        partContentLength = ((contentLength > MULTIPART_CHUNK_SIZE) ? MULTIPART_CHUNK_SIZE : contentLength);
        printf("%s Part Seq %d, length=%d\n", "Sending", seq, partContentLength);
        partData.put_object_data.contentLength = partContentLength;
        partData.put_object_data.originalContentLength = partContentLength;
        partData.put_object_data.totalContentLength = todoContentLength;
        partData.put_object_data.totalOriginalContentLength = totalContentLength;
        putProperties.md5 = 0;
        do
        {
            S3_upload_part(&bucketContext, test_key, &putProperties,
                           &putObjectHandler, seq, manager.upload_id,
                           partContentLength,
                           0, 0,
                           &partData);
        } while (S3_status_is_retryable(statusG) && should_retry());
        if (statusG != S3StatusOK)
        {
            printError();
            goto clean;
        }
        contentLength -= MULTIPART_CHUNK_SIZE;
        todoContentLength -= MULTIPART_CHUNK_SIZE;
    }

    // Build the CompleteMultipartUpload XML body from the recorded ETags.
    int i;
    int size = 0;
    size += growbuffer_append(&(manager.gb), "<CompleteMultipartUpload>",
                              strlen("<CompleteMultipartUpload>"));
    char buf[256];
    int n;
    for (i = 0; i < totalSeq; i++)
    {
        n = snprintf(buf, sizeof(buf), "<Part><PartNumber>%d</PartNumber>"
                                       "<ETag>%s</ETag></Part>",
                     i + 1, manager.etags[i]);
        size += growbuffer_append(&(manager.gb), buf, n);
    }
    size += growbuffer_append(&(manager.gb), "</CompleteMultipartUpload>",
                              strlen("</CompleteMultipartUpload>"));
    manager.remaining = size;

    do
    {
        S3_complete_multipart_upload(&bucketContext, test_key, &commit_handler,
                                     manager.upload_id, manager.remaining,
                                     0, 0, &manager);
    } while (S3_status_is_retryable(statusG) && should_retry());
    if (statusG != S3StatusOK)
    {
        printError();
        goto clean;
    }

clean:
    // Single cleanup path for success and every error exit above.
    fclose(data.infile);
    free(manager.upload_id); // free(NULL) is a no-op
    for (i = 0; i < manager.next_etags_pos; i++)
    {
        free(manager.etags[i]);
    }
    growbuffer_destroy(manager.gb);
    free(manager.etags);
}