Skip to content

Commit

Permalink
Add PutObject checksums (#1690)
Browse files Browse the repository at this point in the history
Single part, checksums must be done manually for now until we have trailing headers.

For multipart we add CRC32C if we don't already send a checksum and we read the content before sending.

Also serves as test for minio/minio#15433 - but should be backwards compatible.
  • Loading branch information
klauspost authored Sep 16, 2022
1 parent def1174 commit 1803f02
Show file tree
Hide file tree
Showing 9 changed files with 378 additions and 80 deletions.
12 changes: 12 additions & 0 deletions api-datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ type UploadInfo struct {
// not to be confused with `Expires` HTTP header.
Expiration time.Time
ExpirationRuleID string

// Verified checksum values, if any.
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
}

// RestoreInfo contains information of the restore operation of an archived object
Expand Down Expand Up @@ -148,6 +154,12 @@ type ObjectInfo struct {

Restore *RestoreInfo

// Checksum values
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string

// Error
Err error `json:"-"`
}
Expand Down
9 changes: 9 additions & 0 deletions api-get-options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ type GetObjectOptions struct {
ServerSideEncryption encrypt.ServerSide
VersionID string
PartNumber int

// Include any checksums, if object was uploaded with checksum.
// For multipart objects this is a checksum of part checksums.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
Checksum bool

// To be not used by external applications
Internal AdvancedGetOptions
}
Expand All @@ -60,6 +66,9 @@ func (o GetObjectOptions) Header() http.Header {
if o.Internal.ReplicationProxyRequest != "" {
headers.Set(minIOBucketReplicationProxyRequest, o.Internal.ReplicationProxyRequest)
}
if o.Checksum {
headers.Set("x-amz-checksum-mode", "ENABLED")
}
return headers
}

Expand Down
72 changes: 53 additions & 19 deletions api-put-object-multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/hex"
"encoding/xml"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -79,11 +80,23 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
return UploadInfo{}, err
}

// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)
if len(hashSums) == 0 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}

// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

defer func() {
if err != nil {
Expand All @@ -100,12 +113,12 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
// Create a buffer.
buf := make([]byte, partSize)

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
for partNumber <= totalPartsCount {
// Choose hash algorithms to be calculated by hashCopyN,
// avoid sha256 with non-v4 signature request or
// HTTPS connection.
hashAlgos, hashSums := c.hashMaterials(opts.SendContentMd5, !opts.DisableContentSha256)

length, rErr := readFull(reader, buf)
if rErr == io.EOF && partNumber > 1 {
break
Expand All @@ -131,18 +144,23 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
md5Base64 string
sha256Hex string
)

if hashSums["md5"] != nil {
md5Base64 = base64.StdEncoding.EncodeToString(hashSums["md5"])
}
if hashSums["sha256"] != nil {
sha256Hex = hex.EncodeToString(hashSums["sha256"])
}
if len(hashSums) == 0 {
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

// Proceed to upload the part.
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber,
md5Base64, sha256Hex, int64(length),
opts.ServerSideEncryption,
!opts.DisableContentSha256)
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down Expand Up @@ -171,15 +189,25 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
})
}

// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}
Expand Down Expand Up @@ -242,9 +270,7 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, object
}

// uploadPart - Uploads a part in a multipart upload.
func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadID string, reader io.Reader,
partNumber int, md5Base64, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool,
) (ObjectPart, error) {
func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName string, uploadID string, reader io.Reader, partNumber int, md5Base64 string, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool, customHeader http.Header) (ObjectPart, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
return ObjectPart{}, err
Expand Down Expand Up @@ -273,7 +299,9 @@ func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadI
urlValues.Set("uploadId", uploadID)

// Set encryption headers, if any.
customHeader := make(http.Header)
if customHeader == nil {
customHeader = make(http.Header)
}
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
// Server-side encryption is supported by the S3 Multipart Upload actions.
// Unless you are using a customer-provided encryption key, you don't need
Expand Down Expand Up @@ -306,11 +334,17 @@ func (c *Client) uploadPart(ctx context.Context, bucketName, objectName, uploadI
}
}
// Once successfully uploaded, return completed part.
objPart := ObjectPart{}
h := resp.Header
objPart := ObjectPart{
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
}
objPart.Size = size
objPart.PartNumber = partNumber
// Trim off the odd double quotes from ETag in the beginning and end.
objPart.ETag = trimEtag(resp.Header.Get("ETag"))
objPart.ETag = trimEtag(h.Get("ETag"))
return objPart, nil
}

Expand Down
108 changes: 66 additions & 42 deletions api-put-object-streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"encoding/base64"
"fmt"
"hash/crc32"
"io"
"net/http"
"net/url"
Expand All @@ -38,9 +39,8 @@ import (
//
// Following code handles these types of readers.
//
// - *minio.Object
// - Any reader which has a method 'ReadAt()'
//
// - *minio.Object
// - Any reader which has a method 'ReadAt()'
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
Expand Down Expand Up @@ -184,12 +184,7 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)

// Proceed to upload the part.
objPart, err := c.uploadPart(ctx, bucketName, objectName,
uploadID, sectionReader, uploadReq.PartNum,
"", "", partSize,
opts.ServerSideEncryption,
!opts.DisableContentSha256,
)
objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
Expand Down Expand Up @@ -260,6 +255,13 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return UploadInfo{}, err
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := OptimalPartInfo(size, opts.PartSize)
if err != nil {
Expand All @@ -270,6 +272,7 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload if the function returns
// any error, since we do not resume we should purge
Expand All @@ -281,6 +284,14 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
}
}()

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
customHeader := make(http.Header)
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
md5Hash := c.md5Hasher()
defer md5Hash.Close()

// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64

Expand All @@ -292,7 +303,6 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b

// Avoid declaring variables in the for loop
var md5Base64 string
var hookReader io.Reader

// Part number always starts with '1'.
var partNumber int
Expand All @@ -303,37 +313,34 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
partSize = lastPartSize
}

if opts.SendContentMd5 {
length, rerr := readFull(reader, buf)
if rerr == io.EOF && partNumber > 1 {
break
}

if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
return UploadInfo{}, rerr
}
length, rerr := readFull(reader, buf)
if rerr == io.EOF && partNumber > 1 {
break
}

// Calculate md5sum.
hash := c.md5Hasher()
hash.Write(buf[:length])
md5Base64 = base64.StdEncoding.EncodeToString(hash.Sum(nil))
hash.Close()
if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
return UploadInfo{}, rerr
}

// Update progress reader appropriately to the latest offset
// as we read from the source.
hookReader = newHook(bytes.NewReader(buf[:length]), opts.Progress)
// Calculate md5sum.
if opts.SendContentMd5 {
md5Hash.Reset()
md5Hash.Write(buf[:length])
md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
} else {
// Update progress reader appropriately to the latest offset
// as we read from the source.
hookReader = newHook(reader, opts.Progress)
// Add CRC32C instead.
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID,
io.LimitReader(hookReader, partSize),
partNumber, md5Base64, "", partSize,
opts.ServerSideEncryption,
!opts.DisableContentSha256,
)
// Update progress reader appropriately to the latest offset
// as we read from the source.
hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)

objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down Expand Up @@ -363,15 +370,26 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
})
}

// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}
Expand Down Expand Up @@ -490,14 +508,20 @@ func (c *Client) putObjectDo(ctx context.Context, bucketName, objectName string,

// extract lifecycle expiry date and rule ID
expTime, ruleID := amzExpirationToExpiryDateRuleID(resp.Header.Get(amzExpiration))

h := resp.Header
return UploadInfo{
Bucket: bucketName,
Key: objectName,
ETag: trimEtag(resp.Header.Get("ETag")),
VersionID: resp.Header.Get(amzVersionID),
ETag: trimEtag(h.Get("ETag")),
VersionID: h.Get(amzVersionID),
Size: size,
Expiration: expTime,
ExpirationRuleID: ruleID,

// Checksum values
ChecksumCRC32: h.Get("x-amz-checksum-crc32"),
ChecksumCRC32C: h.Get("x-amz-checksum-crc32c"),
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
}, nil
}
Loading

0 comments on commit 1803f02

Please sign in to comment.