Added a multipart-copy utility.
For example:

    source = Aws::S3::Object.new('source-bucket', 'source-key')
    source.copy_to({ bucket: 'target-bucket', key: 'target-key' }, multipart_copy: true)
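Or, equivalently, driven from the target side (illustrative; bucket and key names are placeholders):

    target = Aws::S3::Object.new('target-bucket', 'target-key')
    target.copy_from('source-bucket/source-key', multipart_copy: true)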
trevorrowe committed Aug 12, 2015
1 parent b684ad3 commit 8eb8083
Showing 13 changed files with 571 additions and 1 deletion.
@@ -99,7 +99,7 @@ def shared_example(json_ex, method_name, operation)
end

def examples_from_disk(method_name, operation)
dir = "examples/#{@service_name.downcase}/client/#{method_name}/*.rb"
dir = "doc-src/examples/#{@service_name.downcase}/client/#{method_name}/*.rb"
Dir.glob(dir).map do |path|
title = File.basename(path).split(/\./).first
title = title.sub(/^\d+_/, '').gsub(/_/, ' ')
12 changes: 12 additions & 0 deletions aws-sdk-resources/features/s3/multipart_copy.feature
@@ -0,0 +1,12 @@
# language: en
@s3 @resources @multipart-copy
Feature: Managed multipart copies

Background:
Given I create a bucket

@slow
Scenario: Copy-to across buckets
Given a "source_bucket" is set in cfg["s3"]["large_object"]["bucket"]
And a "source_key" is set in cfg["s3"]["large_object"]["key"]
Then I should be able to multipart copy the object to a different bucket
7 changes: 7 additions & 0 deletions aws-sdk-resources/features/s3/step_definitions.rb
@@ -112,3 +112,10 @@
kms_key_id: @kms_key_id,
)
end

Then(/^I should be able to multipart copy the object to a different bucket$/) do
target_bucket = @s3.bucket(@bucket_name)
target_object = target_bucket.object("#{@source_key}-copy")
target_object.copy_from("#{@source_bucket}/#{@source_key}", multipart_copy: true)
expect(ApiCallTracker.called_operations).to include(:create_multipart_upload)
end
41 changes: 41 additions & 0 deletions aws-sdk-resources/lib/aws-sdk-resources/documenter.rb
@@ -17,6 +17,7 @@ class << self

def apply_customizations
document_s3_object_upload_file_additional_options
document_s3_object_copy_from_options
end

private
@@ -47,6 +48,46 @@ def document_s3_object_upload_file_additional_options
tags.each { |tag| m.add_tag(tag) }
end

def document_s3_object_copy_from_options
copy_from = YARD::Registry['Aws::S3::Object#copy_from']
copy_to = YARD::Registry['Aws::S3::Object#copy_to']
existing_tags = copy_from.tags
copy_from.docstring = 'Copies another object to this object. Use `multipart_copy: true` for large objects. This is required for objects that exceed 5GB.'
existing_tags.each do |tag|
if tag.tag_name == 'option' && tag.pair.name != ":copy_source"
copy_from.add_tag(tag)
copy_to.add_tag(tag)
end
end
copy_from.add_tag(tag(<<-EXAMPLE))
@example Basic object copy
bucket = Aws::S3::Bucket.new('target-bucket')
object = bucket.object('target-key')
# source as String
object.copy_from('source-bucket/source-key')
# source as Hash
object.copy_from(bucket:'source-bucket', key:'source-key')
# source as Aws::S3::Object
object.copy_from(bucket.object('source-key'))
EXAMPLE

copy_from.add_tag(tag(<<-EXAMPLE))
@example Managed copy of large objects
# uses multipart upload APIs to copy object
object.copy_from('src-bucket/src-key', multipart_copy: true)
EXAMPLE

end

def tag(string)
YARD::DocstringParser.new.parse(string).to_docstring.tags.first
end

end
end
end
2 changes: 2 additions & 0 deletions aws-sdk-resources/lib/aws-sdk-resources/services/s3.rb
@@ -11,6 +11,8 @@ module S3
autoload :FileUploader, 'aws-sdk-resources/services/s3/file_uploader'
autoload :MultipartFileUploader, 'aws-sdk-resources/services/s3/multipart_file_uploader'
autoload :MultipartUploadError, 'aws-sdk-resources/services/s3/multipart_upload_error'
autoload :ObjectCopier, 'aws-sdk-resources/services/s3/object_copier'
autoload :ObjectMultipartCopier, 'aws-sdk-resources/services/s3/object_multipart_copier'
autoload :PresignedPost, 'aws-sdk-resources/services/s3/presigned_post'

end
57 changes: 57 additions & 0 deletions aws-sdk-resources/lib/aws-sdk-resources/services/s3/object.rb
@@ -135,6 +135,63 @@ def presigned_post(options = {})
options.merge(key: key))
end

# @param [S3::Object, String, Hash] source Where to copy object
# data from. `source` must be one of the following:
#
# * {Aws::S3::Object}
# * Hash - with `:bucket` and `:key`
# * String - formatted like `"source-bucket-name/source-key"`
#
# @option options [Boolean] :multipart_copy (false) When `true`,
# the object will be copied using the multipart APIs. This is
# necessary for objects larger than 5GB and can provide
# performance improvements on large objects. Amazon S3 does
# not accept multipart copies for objects smaller than 5MB.
#
# @see #copy_to
#
def copy_from(source, options = {})
if Hash === source && source[:copy_source]
# for backwards compatibility
@client.copy_object(source.merge(bucket: bucket_name, key: key))
else
ObjectCopier.new(self, options).copy_from(source, options)
end
end

# Copies this object to another object. Use `multipart_copy: true`
# for large objects. This is required for objects that exceed 5GB.
#
# @param [S3::Object, String, Hash] target Where to copy the object
# data to. `target` must be one of the following:
#
# * {Aws::S3::Object}
# * Hash - with `:bucket` and `:key`
# * String - formatted like `"target-bucket-name/target-key"`
#
# @example Basic object copy
#
# bucket = Aws::S3::Bucket.new('source-bucket')
# object = bucket.object('source-key')
#
# # target as String
# object.copy_to('target-bucket/target-key')
#
# # target as Hash
# object.copy_to(bucket: 'target-bucket', key: 'target-key')
#
# # target as Aws::S3::Object
# object.copy_to(bucket.object('target-key'))
#
# @example Managed copy of large objects
#
# # uses multipart upload APIs to copy object
# object.copy_to('target-bucket/target-key', multipart_copy: true)
#
def copy_to(target, options = {})
ObjectCopier.new(self, options).copy_to(target, options)
end

end
end
end
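As a usage note, the `copy_from` branch above also preserves the pre-existing calling convention, where the options hash carries `:copy_source` directly and is forwarded straight to `Client#copy_object`. A minimal sketch (bucket and key names are placeholders):

    object = Aws::S3::Object.new('target-bucket', 'target-key')

    # backwards-compatible form, sent as a single CopyObject request
    object.copy_from(copy_source: 'source-bucket/source-key')

    # source-first form added in this commit
    object.copy_from('source-bucket/source-key')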
61 changes: 61 additions & 0 deletions aws-sdk-resources/lib/aws-sdk-resources/services/s3/object_copier.rb
@@ -0,0 +1,61 @@
require 'thread'

module Aws
module S3
# @api private
class ObjectCopier

# @param [S3::Object] object
def initialize(object, options = {})
@object = object
@options = options.merge(client: @object.client)
end

def copy_from(source, options = {})
copy_object(source, @object, options)
end


def copy_to(target, options = {})
copy_object(@object, target, options)
end

private

def copy_object(source, target, options)
target_bucket, target_key = copy_target(target)
options[:bucket] = target_bucket
options[:key] = target_key
options[:copy_source] = copy_source(source)
if options.delete(:multipart_copy)
ObjectMultipartCopier.new(@options).copy(options)
else
@object.client.copy_object(options)
end
end

def copy_source(source)
case source
when String then source
when Hash then "#{source[:bucket]}/#{source[:key]}"
when S3::Object then "#{source.bucket_name}/#{source.key}"
else
msg = "expected source to be an Aws::S3::Object, Hash, or String"
raise ArgumentError, msg
end
end

def copy_target(target)
case target
when String then target.match(/([^\/]+?)\/(.+)/)[1,2]
when Hash then target.values_at(:bucket, :key)
when S3::Object then [target.bucket_name, target.key]
else
msg = "expected target to be an Aws::S3::Object, Hash, or String"
raise ArgumentError, msg
end
end

end
end
end
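For illustration only, the copier can also be exercised directly, though it is marked `@api private` and is normally constructed by `Object#copy_from` / `Object#copy_to` (a sketch; bucket and key names are placeholders):

    object = Aws::S3::Object.new('target-bucket', 'target-key')
    copier = Aws::S3::ObjectCopier.new(object) # captures object.client for later use

    copier.copy_from('source-bucket/source-key')                       # single CopyObject request
    copier.copy_from('source-bucket/source-key', multipart_copy: true) # delegates to ObjectMultipartCopier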
171 changes: 171 additions & 0 deletions aws-sdk-resources/lib/aws-sdk-resources/services/s3/object_multipart_copier.rb
@@ -0,0 +1,171 @@
require 'thread'

module Aws
module S3
# @api private
class ObjectMultipartCopier

FIVE_MB = 5 * 1024 * 1024 # 5MB

FILE_TOO_SMALL = "unable to multipart copy files smaller than 5MB"

MAX_PARTS = 10_000

# @option options [Client] :client When not provided, a new client
# will be constructed.
# @option options [Integer] :min_part_size (52428800) Size of copied parts.
# Defaults to 50MB.
# @option options [Integer] :thread_count (10) Number of concurrent threads to
# use for copying parts.
def initialize(options = {})
@thread_count = options.delete(:thread_count) || 10
@min_part_size = options.delete(:min_part_size) || (FIVE_MB * 10)
@client = options[:client] || Client.new
end

# @return [Client]
attr_reader :client

# @option (see S3::Client#copy_object)
def copy(options = {})
size = source_size(options)
options[:upload_id] = initiate_upload(options)
begin
parts = copy_parts(size, default_part_size(size), options)
complete_upload(parts, options)
rescue => error
abort_upload(options)
raise error
end
end

private

def initiate_upload(options)
options = options_for(:create_multipart_upload, options)
@client.create_multipart_upload(options).upload_id
end

def copy_parts(size, default_part_size, options)
queue = PartQueue.new(compute_parts(size, default_part_size, options))
threads = []
@thread_count.times do
threads << copy_part_thread(queue)
end
threads.map(&:value).flatten.sort_by{ |part| part[:part_number] }
end

def copy_part_thread(queue)
Thread.new do
begin
completed = []
while part = queue.shift
completed << copy_part(part)
end
completed
rescue => error
queue.clear!
raise error
end
end
end

def copy_part(part)
{
etag: @client.upload_part_copy(part).copy_part_result.etag,
part_number: part[:part_number],
}
end

def complete_upload(parts, options)
options = options_for(:complete_multipart_upload, options)
options[:multipart_upload] = { parts: parts }
@client.complete_multipart_upload(options)
end

def abort_upload(options)
@client.abort_multipart_upload({
bucket: options[:bucket],
key: options[:key],
upload_id: options[:upload_id],
})
end

def compute_parts(size, default_part_size, options)
part_number = 1
offset = 0
parts = []
options = options_for(:upload_part_copy, options)
while offset < size
parts << options.merge({
part_number: part_number,
copy_source_range: byte_range(offset, default_part_size, size),
})
part_number += 1
offset += default_part_size
end
parts
end

def byte_range(offset, default_part_size, size)
if offset + default_part_size < size
"bytes=#{offset}-#{offset + default_part_size - 1}"
else
"bytes=#{offset}-#{size - 1}"
end
end

def source_size(options)
if options[:content_length]
options.delete(:content_length)
else
bucket, key = options[:copy_source].match(/([^\/]+?)\/(.+)/)[1,2]
@client.head_object(bucket:bucket, key:key).content_length
end
end

def default_part_size(source_size)
if source_size < FIVE_MB
raise ArgumentError, FILE_TOO_SMALL
else
[(source_size.to_f / MAX_PARTS).ceil, @min_part_size].max.to_i
end
end

def options_for(operation_name, options)
API_OPTIONS[operation_name].inject({}) do |hash, opt_name|
hash[opt_name] = options[opt_name] if options.key?(opt_name)
hash
end
end

# @api private
def self.options_for(shape_name)
Client.api.metadata['shapes'][shape_name].member_names
end

API_OPTIONS = {
create_multipart_upload: options_for('CreateMultipartUploadRequest'),
upload_part_copy: options_for('UploadPartCopyRequest'),
complete_multipart_upload: options_for('CompleteMultipartUploadRequest'),
}

class PartQueue

def initialize(parts = [])
@parts = parts
@mutex = Mutex.new
end

def shift
@mutex.synchronize { @parts.shift }
end

def clear!
@mutex.synchronize { @parts.clear }
end

end
end
end
end
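A minimal sketch of driving the multipart copier directly with client-style options; it is `@api private`, so `Object#copy_from` / `Object#copy_to` with `multipart_copy: true` is the intended entry point, and every name and size below is a placeholder:

    copier = Aws::S3::ObjectMultipartCopier.new(
      client: Aws::S3::Client.new(region: 'us-east-1'), # region is a placeholder
      min_part_size: 100 * 1024 * 1024, # copy in 100MB parts instead of the 50MB default
      thread_count: 5                   # limit concurrent UploadPartCopy requests
    )

    copier.copy(
      bucket: 'target-bucket',
      key: 'target-key',
      copy_source: 'source-bucket/source-key',
      content_length: 20 * 1024 * 1024 * 1024 # optional; skips the HeadObject size lookup
    )

With the defaults, a 100GB source is split into 2,048 parts of 50MB each, copied across 10 threads via UploadPartCopy and stitched together with CompleteMultipartUpload.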
