Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Request Compression #2870

Merged
merged 3 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def default_plugins
'Aws::Plugins::TransferEncoding' => "#{core_plugins}/transfer_encoding.rb",
'Aws::Plugins::HttpChecksum' => "#{core_plugins}/http_checksum.rb",
'Aws::Plugins::ChecksumAlgorithm' => "#{core_plugins}/checksum_algorithm.rb",
'Aws::Plugins::RequestCompression' => "#{core_plugins}/request_compression.rb",
'Aws::Plugins::DefaultsMode' => "#{core_plugins}/defaults_mode.rb",
'Aws::Plugins::RecursionDetection' => "#{core_plugins}/recursion_detection.rb"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,18 @@ def operations
if operation.key?('httpChecksum')
operation['httpChecksum']['requestAlgorithmMember'] = underscore(operation['httpChecksum']['requestAlgorithmMember']) if operation['httpChecksum']['requestAlgorithmMember']
operation['httpChecksum']['requestValidationModeMember'] = underscore(operation['httpChecksum']['requestValidationModeMember']) if operation['httpChecksum']['requestValidationModeMember']

o.http_checksum = operation['httpChecksum'].inject([]) do |a, (k, v)|
a << { key: k.inspect, value: v.inspect }
a
end
end

if operation.key?('requestCompression')
o.request_compression = operation['requestCompression'].each_with_object([]) do | (k,v) , arr|
arr << { key: k.inspect, value: v.inspect }
end
end

%w(input output).each do |key|
if operation[key]
o.shape_references << "o.#{key} = #{operation_ref(operation[key])}"
Expand Down Expand Up @@ -551,6 +557,9 @@ def initialize
# @return [Hash]
attr_accessor :http_checksum

# @return [Hash]
attr_accessor :request_compression

# @return [Array<String>]
attr_accessor :shape_references

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ module {{module_name}}
{{/http_checksum}}
}
{{/http_checksum}}
{{#request_compression}}
o.request_compression = {
{{#request_compression}}
{{{key}}} => {{{value}}},
{{/request_compression}}
}
{{/request_compression}}
{{#deprecated}}
o.deprecated = true
{{/deprecated}}
Expand Down
2 changes: 2 additions & 0 deletions gems/aws-sdk-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
Unreleased Changes
------------------

* Feature - Add support for Request Compression.

3.175.0 (2023-06-15)
------------------

Expand Down
217 changes: 217 additions & 0 deletions gems/aws-sdk-core/lib/aws-sdk-core/plugins/request_compression.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
# frozen_string_literal: true

module Aws
  module Plugins
    # @api private
    #
    # Compresses request bodies for operations that model
    # `requestCompression`. Compression is skipped when disabled via
    # configuration, and (for non-streaming bodies) when the body is
    # smaller than the configured minimum size.
    class RequestCompression < Seahorse::Client::Plugin
      # Default threshold (bytes); bodies smaller than this are sent as-is.
      DEFAULT_MIN_COMPRESSION_SIZE = 10_240
      # Upper bound accepted for :request_min_compression_size_bytes.
      MIN_COMPRESSION_SIZE_LIMIT = 10_485_760
      # Encodings this SDK is able to apply.
      SUPPORTED_ENCODINGS = %w[gzip].freeze
      CHUNK_SIZE = 1 * 1024 * 1024 # one MB

      option(
        :disable_request_compression,
        default: false,
        doc_type: 'Boolean',
        docstring: <<-DOCS) do |cfg|
When set to 'true' the request body will not be compressed
for supported operations.
        DOCS
        resolve_disable_request_compression(cfg)
      end

      option(
        :request_min_compression_size_bytes,
        # use the named constant so the default cannot drift from the
        # value used by the ENV/shared-config resolver below
        default: DEFAULT_MIN_COMPRESSION_SIZE,
        doc_type: 'Integer',
        docstring: <<-DOCS) do |cfg|
The minimum size in bytes that triggers compression for request
bodies. The value must be non-negative integer value between 0
and 10485760 bytes inclusive.
        DOCS
        resolve_request_min_compression_size_bytes(cfg)
      end

      # Validates user-provided configuration after the client is built,
      # so invalid values fail fast with a clear message.
      def after_initialize(client)
        validate_disable_request_compression_input(client.config)
        validate_request_min_compression_size_bytes_input(client.config)
      end

      # @raise [ArgumentError] unless the option is strictly true/false.
      def validate_disable_request_compression_input(cfg)
        unless [true, false].include?(cfg.disable_request_compression)
          raise ArgumentError,
                'Must provide either `true` or `false` for the '\
                '`disable_request_compression` configuration option.'
        end
      end

      # @raise [ArgumentError] unless the value is an integer within
      #   [0, MIN_COMPRESSION_SIZE_LIMIT].
      def validate_request_min_compression_size_bytes_input(cfg)
        value = Integer(cfg.request_min_compression_size_bytes)
        unless value.between?(0, MIN_COMPRESSION_SIZE_LIMIT)
          raise ArgumentError,
                'Must provide a non-negative integer value between '\
                '`0` and `10485760` bytes inclusive for the '\
                '`request_min_compression_size_bytes` configuration option.'
        end
      end

      def add_handlers(handlers, _config)
        # priority set to ensure compression happens BEFORE checksum
        handlers.add(CompressionHandler, priority: 16, step: :build)
      end

      class << self
        private

        # Resolution order: ENV var, shared config profile, then 'false'.
        def resolve_disable_request_compression(cfg)
          value = ENV['AWS_DISABLE_REQUEST_COMPRESSION'] ||
                  Aws.shared_config.disable_request_compression(profile: cfg.profile) ||
                  'false'
          Aws::Util.str_2_bool(value)
        end

        # Resolution order: ENV var, shared config profile, then default.
        def resolve_request_min_compression_size_bytes(cfg)
          value = ENV['AWS_REQUEST_MIN_COMPRESSION_SIZE_BYTES'] ||
                  Aws.shared_config.request_min_compression_size_bytes(profile: cfg.profile) ||
                  DEFAULT_MIN_COMPRESSION_SIZE.to_s
          Integer(value)
        end
      end

      # @api private
      class CompressionHandler < Seahorse::Client::Handler
        def call(context)
          if should_compress?(context)
            selected_encoding = request_encoding_selection(context)
            if selected_encoding
              if streaming?(context.operation.input)
                process_streaming_compression(selected_encoding, context)
              elsif context.http_request.body.size >= context.config.request_min_compression_size_bytes
                process_compression(selected_encoding, context)
              end
            end
          end
          @handler.call(context)
        end

        private

        # Returns the first encoding in the operation's modeled list
        # that this SDK supports, or nil when none are supported.
        def request_encoding_selection(context)
          encoding_list = context.operation.request_compression['encodings']
          encoding_list.find { |encoding| RequestCompression::SUPPORTED_ENCODINGS.include?(encoding) }
        end

        # Appends to an existing Content-Encoding header, or sets it.
        def update_content_encoding(encoding, context)
          headers = context.http_request.headers
          if headers['Content-Encoding']
            headers['Content-Encoding'] += ',' + encoding
          else
            headers['Content-Encoding'] = encoding
          end
        end

        def should_compress?(context)
          context.operation.request_compression &&
            !context.config.disable_request_compression
        end

        # True when the input's payload member is modeled as streaming;
        # the trait may live on the member ref or on its shape.
        def streaming?(input)
          if payload = input[:payload_member] # checking ref and shape
            payload['streaming'] || payload.shape['streaming']
          else
            false
          end
        end

        def process_compression(encoding, context)
          case encoding
          when 'gzip'
            gzip_compress(context)
          else
            raise StandardError, "We currently do not support #{encoding} encoding"
          end
          update_content_encoding(encoding, context)
        end

        # Replaces the request body with a StringIO of its gzipped bytes.
        def gzip_compress(context)
          compressed = StringIO.new
          compressed.binmode
          gzip_writer = Zlib::GzipWriter.new(compressed)
          if context.http_request.body.respond_to?(:read)
            # IO-like bodies are compressed incrementally to bound memory
            update_in_chunks(gzip_writer, context.http_request.body)
          else
            gzip_writer.write(context.http_request.body)
          end
          gzip_writer.close
          new_body = StringIO.new(compressed.string)
          context.http_request.body = new_body
        end

        def update_in_chunks(compressor, io)
          loop do
            chunk = io.read(CHUNK_SIZE)
            break unless chunk

            compressor.write(chunk)
          end
        end

        def process_streaming_compression(encoding, context)
          case encoding
          when 'gzip'
            # wrap the body so it is compressed lazily as the
            # transport reads from it
            context.http_request.body = GzipIO.new(context.http_request.body)
          else
            raise StandardError, "We currently do not support #{encoding} encoding"
          end
          update_content_encoding(encoding, context)
        end

        # @api private
        # IO-like wrapper that gzip-compresses a streaming body one
        # read at a time.
        class GzipIO
          def initialize(body)
            @body = body
            @buffer = ChunkBuffer.new
            @gzip_writer = Zlib::GzipWriter.new(@buffer)
          end

          def read(length, buff = nil)
            if @gzip_writer.closed?
              # an empty string to signify an end as
              # there will be nothing remaining to be read
              StringIO.new('').read(length, buff)
              return
            end

            chunk = @body.read(length)
            if !chunk || chunk.empty?
              # closing the writer will write one last chunk
              # with a trailer (to be read from the @buffer)
              @gzip_writer.close
            else
              # flush happens first to ensure that header fields
              # are being sent over since write will override
              @gzip_writer.flush
              @gzip_writer.write(chunk)
            end

            # NOTE(review): assumes the compressed chunk fits within
            # `length` bytes; any overflow would be truncated here.
            StringIO.new(@buffer.last_chunk).read(length, buff)
          end
        end

        # @api private
        # Minimal write-only sink that retains only the most recently
        # written chunk, which GzipIO hands back per read call.
        class ChunkBuffer
          def initialize
            @last_chunk = nil
          end

          attr_reader :last_chunk

          def write(data)
            @last_chunk = data
          end
        end
      end

    end
  end
end
4 changes: 3 additions & 1 deletion gems/aws-sdk-core/lib/aws-sdk-core/shared_config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ def self.config_reader(*attrs)
:s3_us_east_1_regional_endpoint,
:s3_disable_multiregion_access_points,
:defaults_mode,
:sdk_ua_app_id
:sdk_ua_app_id,
:disable_request_compression,
:request_min_compression_size_bytes
)

private
Expand Down
3 changes: 3 additions & 0 deletions gems/aws-sdk-core/lib/seahorse/model/operation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def initialize
# @return [Hash]
attr_accessor :http_checksum

# @return [Hash]
attr_accessor :request_compression

# @return [Boolean]
attr_accessor :deprecated

Expand Down
Loading
Loading