Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions include/aws/s3/private/s3_checksum_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "aws/s3/s3_client.h"
#include <aws/common/byte_buf.h>
#include <aws/common/mutex.h>
#include <aws/common/ref_count.h>

struct aws_s3_meta_request_checksum_config_storage;
Expand All @@ -28,9 +29,14 @@ struct aws_s3_upload_request_checksum_context {
enum aws_s3_checksum_algorithm algorithm;
enum aws_s3_checksum_location location;

struct aws_byte_buf base64_checksum;
/* The checksum already be calculated or not. */
bool checksum_calculated;
struct {
/* Note: don't directly access the synced_data. */
/* Lock to make sure the checksum context is safe to be access from different threads. */
struct aws_mutex lock;
struct aws_byte_buf base64_checksum;
/* The checksum already be calculated or not. */
bool checksum_calculated;
} synced_data;

/* Validation */
size_t encoded_checksum_size;
Expand Down Expand Up @@ -96,8 +102,7 @@ struct aws_s3_upload_request_checksum_context *aws_s3_upload_request_checksum_co
* @return true if checksum calculation is needed, false otherwise
*/
AWS_S3_API
bool aws_s3_upload_request_checksum_context_should_calculate(
const struct aws_s3_upload_request_checksum_context *context);
bool aws_s3_upload_request_checksum_context_should_calculate(struct aws_s3_upload_request_checksum_context *context);

/**
* Check if checksum should be added to HTTP headers.
Expand All @@ -122,15 +127,18 @@ bool aws_s3_upload_request_checksum_context_should_add_trailer(
const struct aws_s3_upload_request_checksum_context *context);

/**
* Get the checksum buffer to use for output.
* Returns the internal buffer for storing the calculated checksum.
* Encode the checksum to base64 and store it in the context.
* This function is thread-safe and can be called from multiple threads.
* Returns AWS_OP_SUCCESS on success, AWS_OP_ERR otherwise
*
* @param context The checksum context
* @return Pointer to the checksum buffer, or NULL if context is invalid
* @param raw_checksum_cursor the byte cursor to the raw checksum value.
* @return AWS_OP_SUCCESS on success, AWS_OP_ERR otherwise
*/
AWS_S3_API
struct aws_byte_buf *aws_s3_upload_request_checksum_context_get_output_buffer(
struct aws_s3_upload_request_checksum_context *context);
int aws_s3_upload_request_checksum_context_finalize_checksum(
struct aws_s3_upload_request_checksum_context *context,
struct aws_byte_cursor raw_checksum_cursor);

/**
* Get a cursor to the current base64 encoded checksum value (for use in headers/trailers).
Expand All @@ -141,7 +149,7 @@ struct aws_byte_buf *aws_s3_upload_request_checksum_context_get_output_buffer(
*/
AWS_S3_API
struct aws_byte_cursor aws_s3_upload_request_checksum_context_get_checksum_cursor(
const struct aws_s3_upload_request_checksum_context *context);
struct aws_s3_upload_request_checksum_context *context);

AWS_EXTERN_C_END

Expand Down
24 changes: 18 additions & 6 deletions include/aws/s3/private/s3_checksums.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ struct aws_s3_meta_request_checksum_config_storage {
};

/**
* a stream that takes in a stream, computes a running checksum as it is read, and outputs the checksum when the stream
* is destroyed.
* a stream that takes in a stream
* Note: seek this stream will immediately fail, as it would prevent an accurate calculation of the
* checksum.
*
Expand All @@ -72,15 +71,28 @@ struct aws_s3_meta_request_checksum_config_storage {
* outputs the checksum of existing stream to checksum_output upon destruction. Will be kept
* alive by the checksum stream
* @param algorithm Checksum algorithm to use.
* @param checksum_output Checksum of the `existing_stream`, owned by caller, which will be calculated when this stream
* is destroyed.
*/
AWS_S3_API
struct aws_input_stream *aws_checksum_stream_new(
struct aws_allocator *allocator,
struct aws_input_stream *existing_stream,
enum aws_s3_checksum_algorithm algorithm,
struct aws_byte_buf *checksum_output);
enum aws_s3_checksum_algorithm algorithm);

/**
* Finalize the checksum has read so far to the output checksum buf with base64 encoding.
* Not thread safe.
*/
AWS_S3_API
int aws_checksum_stream_finalize_checksum(struct aws_input_stream *checksum_stream, struct aws_byte_buf *checksum_buf);

/**
* Finalize the checksum has read so far to the checksum context.
* Not thread safe.
*/
AWS_S3_API
int aws_checksum_stream_finalize_checksum_context(
struct aws_input_stream *checksum_stream,
struct aws_s3_upload_request_checksum_context *checksum_context);

/**
* TODO: properly support chunked encoding.
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/private/s3_client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ struct aws_s3_client_vtable {
struct aws_http_connection *client_connection,
const struct aws_http_make_request_options *options);

void (*after_prepare_upload_part_finish)(struct aws_s3_request *request);
void (*after_prepare_upload_part_finish)(struct aws_s3_request *request, struct aws_http_message *message);
};

struct aws_s3_upload_part_timeout_stats {
Expand Down
2 changes: 1 addition & 1 deletion include/aws/s3/s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ struct aws_s3_meta_request_options {
* Optional.
* Callback for reviewing an upload before it completes.
* WARNING: experimental/unstable
* See `aws_s3_upload_review_fn`
* See `aws_s3_meta_request_upload_review_fn`
*/
aws_s3_meta_request_upload_review_fn *upload_review_callback;

Expand Down
34 changes: 18 additions & 16 deletions source/s3_auto_ranged_put.c
Original file line number Diff line number Diff line change
Expand Up @@ -1099,15 +1099,20 @@ static void s_s3_prepare_upload_part_on_read_done(void *user_data) {
goto on_done;
}
struct aws_s3_upload_request_checksum_context *context = previously_uploaded_info->checksum_context;
/* if previously uploaded part had a checksum, compare it to what we just skipped */
if (context != NULL && context->checksum_calculated == true &&
s_verify_part_matches_checksum(
meta_request->allocator,
aws_byte_cursor_from_buf(&request->request_body),
meta_request->checksum_config.checksum_algorithm,
aws_byte_cursor_from_buf(&context->base64_checksum))) {
error_code = aws_last_error_or_unknown();
goto on_done;
if (context) {
if (!aws_s3_upload_request_checksum_context_should_calculate(context)) {
struct aws_byte_cursor previous_calculated_checksum =
aws_s3_upload_request_checksum_context_get_checksum_cursor(context);
/* if previously uploaded part had a checksum, compare it to what we just skipped */
if (s_verify_part_matches_checksum(
meta_request->allocator,
aws_byte_cursor_from_buf(&request->request_body),
meta_request->checksum_config.checksum_algorithm,
previous_calculated_checksum) != AWS_OP_SUCCESS) {
error_code = aws_last_error_or_unknown();
goto on_done;
}
}
}
}

Expand Down Expand Up @@ -1157,9 +1162,6 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo
checksum_context = part->checksum_context;
/* If checksum already calculated, it means either the part being retried or the part resumed from list
* parts. Keep reusing the old checksum in case of the request body in memory mangled */
AWS_ASSERT(
!checksum_context->checksum_calculated || request->num_times_prepared > 0 ||
auto_ranged_put->resume_token != NULL);
aws_s3_meta_request_unlock_synced_data(meta_request);
}
/* END CRITICAL SECTION */
Expand All @@ -1184,15 +1186,15 @@ static void s_s3_prepare_upload_part_finish(struct aws_s3_prepare_upload_part_jo
aws_future_http_message_set_error(part_prep->on_complete, aws_last_error());
goto on_done;
}
if (client->vtable->after_prepare_upload_part_finish) {
/* TEST ONLY, allow test to stub here. */
client->vtable->after_prepare_upload_part_finish(request, message);
}

/* Success! */
aws_future_http_message_set_result_by_move(part_prep->on_complete, &message);

on_done:
if (client->vtable->after_prepare_upload_part_finish) {
/* TEST ONLY, allow test to stub here. */
client->vtable->after_prepare_upload_part_finish(request);
}
AWS_FATAL_ASSERT(aws_future_http_message_is_done(part_prep->on_complete));
aws_future_bool_release(part_prep->asyncstep_read_part);
aws_future_http_message_release(part_prep->on_complete);
Expand Down
72 changes: 58 additions & 14 deletions source/s3_checksum_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,18 @@
#include <aws/common/encoding.h>
#include <aws/common/logging.h>

static void s_lock_synced_data(struct aws_s3_upload_request_checksum_context *context) {
aws_mutex_lock(&context->synced_data.lock);
}

static void s_unlock_synced_data(struct aws_s3_upload_request_checksum_context *context) {
aws_mutex_unlock(&context->synced_data.lock);
}

static void s_aws_s3_upload_request_checksum_context_destroy(void *context) {
struct aws_s3_upload_request_checksum_context *checksum_context = context;
aws_byte_buf_clean_up(&checksum_context->base64_checksum);
aws_byte_buf_clean_up(&checksum_context->synced_data.base64_checksum);
aws_mutex_clean_up(&checksum_context->synced_data.lock);
aws_mem_release(checksum_context->allocator, checksum_context);
}

Expand All @@ -24,6 +33,10 @@ static struct aws_s3_upload_request_checksum_context *s_s3_upload_request_checks

aws_ref_count_init(&context->ref_count, context, s_aws_s3_upload_request_checksum_context_destroy);
context->allocator = allocator;
if (aws_mutex_init(&context->synced_data.lock)) {
aws_s3_upload_request_checksum_context_release(context);
return NULL;
}
/* Handle case where no checksum config is provided */
if (!checksum_config || checksum_config->checksum_algorithm == AWS_SCA_NONE) {
context->algorithm = AWS_SCA_NONE;
Expand Down Expand Up @@ -54,7 +67,7 @@ struct aws_s3_upload_request_checksum_context *aws_s3_upload_request_checksum_co
s_s3_upload_request_checksum_context_new_base(allocator, checksum_config);
if (context && context->encoded_checksum_size > 0) {
/* Initial the buffer for checksum */
aws_byte_buf_init(&context->base64_checksum, allocator, context->encoded_checksum_size);
aws_byte_buf_init(&context->synced_data.base64_checksum, allocator, context->encoded_checksum_size);
}
return context;
}
Expand All @@ -79,8 +92,8 @@ struct aws_s3_upload_request_checksum_context *aws_s3_upload_request_checksum_co
aws_s3_upload_request_checksum_context_release(context);
return NULL;
}
aws_byte_buf_init_copy_from_cursor(&context->base64_checksum, allocator, existing_base64_checksum);
context->checksum_calculated = true;
aws_byte_buf_init_copy_from_cursor(&context->synced_data.base64_checksum, allocator, existing_base64_checksum);
context->synced_data.checksum_calculated = true;
}
return context;
}
Expand All @@ -101,14 +114,18 @@ struct aws_s3_upload_request_checksum_context *aws_s3_upload_request_checksum_co
return NULL;
}

bool aws_s3_upload_request_checksum_context_should_calculate(
const struct aws_s3_upload_request_checksum_context *context) {
bool aws_s3_upload_request_checksum_context_should_calculate(struct aws_s3_upload_request_checksum_context *context) {
if (!context || context->algorithm == AWS_SCA_NONE) {
return false;
}

bool should_calculate = false;
s_lock_synced_data(context);
/* If not previous calculated */
return !context->checksum_calculated;
should_calculate = !context->synced_data.checksum_calculated;
s_unlock_synced_data(context);

return should_calculate;
}

bool aws_s3_upload_request_checksum_context_should_add_header(
Expand All @@ -129,19 +146,46 @@ bool aws_s3_upload_request_checksum_context_should_add_trailer(
return context->location == AWS_SCL_TRAILER && context->algorithm != AWS_SCA_NONE;
}

struct aws_byte_buf *aws_s3_upload_request_checksum_context_get_output_buffer(
struct aws_s3_upload_request_checksum_context *context) {
int aws_s3_upload_request_checksum_context_finalize_checksum(
struct aws_s3_upload_request_checksum_context *context,
struct aws_byte_cursor raw_checksum_cursor) {
if (!context) {
return NULL;
return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
}
return &context->base64_checksum;
s_lock_synced_data(context);
/* If not previous calculated */
if (!context->synced_data.checksum_calculated) {
AWS_ASSERT(context->synced_data.base64_checksum.len == 0);

if (aws_base64_encode(&raw_checksum_cursor, &context->synced_data.base64_checksum)) {
aws_byte_buf_reset(&context->synced_data.base64_checksum, false);
AWS_LOGF_ERROR(
AWS_LS_S3_GENERAL,
"Failed to base64 encode for the checksum. Raw checksum length: %zu. Output buffer capacity: %zu "
"length %zu",
raw_checksum_cursor.len,
context->synced_data.base64_checksum.capacity,
context->synced_data.base64_checksum.len);
s_unlock_synced_data(context);
return AWS_OP_ERR;
}
context->synced_data.checksum_calculated = true;
}
s_unlock_synced_data(context);
return AWS_OP_SUCCESS;
}

struct aws_byte_cursor aws_s3_upload_request_checksum_context_get_checksum_cursor(
const struct aws_s3_upload_request_checksum_context *context) {
struct aws_s3_upload_request_checksum_context *context) {
struct aws_byte_cursor checksum_cursor = {0};
if (!context || !context->checksum_calculated) {
if (!context) {
return checksum_cursor;
}
return aws_byte_cursor_from_buf(&context->base64_checksum);
s_lock_synced_data(context);
/* If not previous calculated */
if (context->synced_data.checksum_calculated) {
checksum_cursor = aws_byte_cursor_from_buf(&context->synced_data.base64_checksum);
}
s_unlock_synced_data(context);
return checksum_cursor;
}
Loading
Loading