Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix duplicated bytes in streaming retries #2693

Merged
merged 4 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions gems/aws-sdk-s3/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Issue - Rewind the underlying file on a streaming retry that is not a truncated body (#2692).

1.113.0 (2022-02-24)
------------------

Expand Down
26 changes: 24 additions & 2 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/plugins/streaming_retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,17 @@ def truncate(_integer); end
def rewind; end
end

class NonRetryableStreamingError < StandardError

def initialize(error)
super('Unable to retry request - retry could result in processing duplicated chunks.')
set_backtrace(error.backtrace)
@original_error = error
end

attr_reader :original_error
end

# This handler works with the ResponseTarget plugin to provide smart
# retries of S3 streaming operations that support the range parameter
# (currently only: get_object). When a 200 OK with a TruncatedBodyError
Expand Down Expand Up @@ -84,8 +95,19 @@ def add_event_listeners(context, target)
end

context.http_response.on_error do |error|
if retryable_body?(context) && truncated_body?(error)
context.http_request.headers[:range] = "bytes=#{context.http_response.body.size}-"
puts context.http_response.body

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to stdout? Wasn't sure if this was intentional or left over from debugging

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch, will remove!

if retryable_body?(context)
if truncated_body?(error)
context.http_request.headers[:range] = "bytes=#{context.http_response.body.size}-"
else
case context.http_response.body
when RetryableManagedFile
# call rewind on the underlying file
context.http_response.body.instance_variable_get(:@file).rewind
else
raise NonRetryableStreamingError, error
end
end
end
end
end
Expand Down
28 changes: 28 additions & 0 deletions gems/aws-sdk-s3/spec/plugins/streaming_retry_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
module Aws
module S3
module Plugins

describe StreamingRetry do
let(:creds) { Aws::Credentials.new('akid', 'secret') }
let(:client) { S3::Client.new(region: 'us-east-1', credentials: creds, retry_base_delay: 0.001) }
Expand Down Expand Up @@ -146,6 +147,33 @@ def stream_to_output
output.rewind
expect(File.read(tempfile.path)).to eq(parts[0])
end

it 'rewinds the underlying file on non-truncated errors' do
parts = %w[data_part_1 part2 longer_data_at_end_part3]
full_test_data = parts.join

first_call = true
allow_any_instance_of(Seahorse::Client::NetHttp::Handler).to receive(:transmit) do |s, config, req, resp|
if first_call

resp.signal_headers(200, {})

underlying_file = resp.body.instance_variable_get(:@file)
expect(underlying_file).to receive(:rewind).and_call_original

resp.signal_data(parts[0])
resp.signal_error(Seahorse::Client::NetworkingError.new(SocketError.new))
first_call = false
else
resp.signal_headers(200, {})
resp.signal_data(full_test_data)
resp.signal_done
end
end

client.get_object(request)
expect(File.read(tempfile.path)).to eq(full_test_data)
end
end
end
end
Expand Down