5353import software .amazon .awssdk .services .s3 .model .NoSuchKeyException ;
5454import software .amazon .awssdk .services .s3 .model .ObjectAttributes ;
5555import software .amazon .awssdk .services .s3 .model .PutObjectRequest ;
56- import software .amazon .awssdk .services .s3 .model .ServerSideEncryption ;
5756import software .amazon .awssdk .services .s3 .model .UploadPartRequest ;
5857import software .amazon .awssdk .services .s3 .model .UploadPartResponse ;
5958import software .amazon .awssdk .services .s3 .paginators .ListObjectsV2Iterable ;
110109import static org .opensearch .repositories .s3 .S3Repository .MAX_FILE_SIZE ;
111110import static org .opensearch .repositories .s3 .S3Repository .MAX_FILE_SIZE_USING_MULTIPART ;
112111import static org .opensearch .repositories .s3 .S3Repository .MIN_PART_SIZE_USING_MULTIPART ;
112+ import static org .opensearch .repositories .s3 .utils .SseKmsUtil .configureEncryptionSettings ;
113113
114114class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {
115115
@@ -129,7 +129,13 @@ public boolean blobExists(String blobName) {
129129 try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
130130 SocketAccess .doPrivileged (
131131 () -> clientReference .get ()
132- .headObject (HeadObjectRequest .builder ().bucket (blobStore .bucket ()).key (buildKey (blobName )).build ())
132+ .headObject (
133+ HeadObjectRequest .builder ()
134+ .bucket (blobStore .bucket ())
135+ .key (buildKey (blobName ))
136+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
137+ .build ()
138+ )
133139 );
134140 return true ;
135141 } catch (NoSuchKeyException e ) {
@@ -214,7 +220,12 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
214220 writeContext .doRemoteDataIntegrityCheck (),
215221 writeContext .getExpectedChecksum (),
216222 blobStore .isUploadRetryEnabled (),
217- writeContext .getMetadata ()
223+ writeContext .getMetadata (),
224+ blobStore .serverSideEncryptionType (),
225+ blobStore .serverSideEncryptionKmsKey (),
226+ blobStore .serverSideEncryptionBucketKey (),
227+ blobStore .serverSideEncryptionEncryptionContext (),
228+ blobStore .expectedBucketOwner ()
218229 );
219230 try {
220231 // If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
@@ -498,6 +509,7 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) {
498509 .prefix (keyPath )
499510 .delimiter ("/" )
500511 .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().listObjectsMetricPublisher ))
512+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
501513 .build ();
502514 }
503515
@@ -534,14 +546,13 @@ void executeSingleUpload(
534546 .contentLength (blobSize )
535547 .storageClass (blobStore .getStorageClass ())
536548 .acl (blobStore .getCannedACL ())
537- .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().putObjectMetricPublisher ));
549+ .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().putObjectMetricPublisher ))
550+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
538551
539552 if (CollectionUtils .isNotEmpty (metadata )) {
540553 putObjectRequestBuilder = putObjectRequestBuilder .metadata (metadata );
541554 }
542- if (blobStore .serverSideEncryption ()) {
543- putObjectRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
544- }
555+ configureEncryptionSettings (putObjectRequestBuilder , blobStore );
545556
546557 PutObjectRequest putObjectRequest = putObjectRequestBuilder .build ();
547558 try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
@@ -591,15 +602,14 @@ void executeMultipartUpload(
591602 .key (blobName )
592603 .storageClass (blobStore .getStorageClass ())
593604 .acl (blobStore .getCannedACL ())
594- .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ));
605+ .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
606+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
595607
596608 if (CollectionUtils .isNotEmpty (metadata )) {
597609 createMultipartUploadRequestBuilder .metadata (metadata );
598610 }
599611
600- if (blobStore .serverSideEncryption ()) {
601- createMultipartUploadRequestBuilder .serverSideEncryption (ServerSideEncryption .AES256 );
602- }
612+ configureEncryptionSettings (createMultipartUploadRequestBuilder , blobStore );
603613
604614 final InputStream requestInputStream ;
605615 if (blobStore .isUploadRetryEnabled ()) {
@@ -628,6 +638,7 @@ void executeMultipartUpload(
628638 .partNumber (i )
629639 .contentLength ((i < nbParts ) ? partSize : lastPartSize )
630640 .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
641+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
631642 .build ();
632643
633644 bytesCount += uploadPartRequest .contentLength ();
@@ -650,6 +661,7 @@ void executeMultipartUpload(
650661 .uploadId (uploadId .get ())
651662 .multipartUpload (CompletedMultipartUpload .builder ().parts (parts ).build ())
652663 .overrideConfiguration (o -> o .addMetricPublisher (blobStore .getStatsMetricPublisher ().multipartUploadMetricCollector ))
664+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
653665 .build ();
654666
655667 SocketAccess .doPrivilegedVoid (() -> clientReference .get ().completeMultipartUpload (completeMultipartUploadRequest ));
@@ -663,6 +675,7 @@ void executeMultipartUpload(
663675 .bucket (bucketName )
664676 .key (blobName )
665677 .uploadId (uploadId .get ())
678+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
666679 .build ();
667680 try (AmazonS3Reference clientReference = blobStore .clientReference ()) {
668681 SocketAccess .doPrivilegedVoid (() -> clientReference .get ().abortMultipartUpload (abortRequest ));
@@ -729,12 +742,14 @@ CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
729742 @ Nullable Integer partNumber
730743 ) {
731744 final boolean isMultipartObject = partNumber != null ;
732- final GetObjectRequest .Builder getObjectRequestBuilder = GetObjectRequest .builder ().bucket (bucketName ).key (blobKey );
745+ final GetObjectRequest .Builder getObjectRequestBuilder = GetObjectRequest .builder ()
746+ .bucket (bucketName )
747+ .key (blobKey )
748+ .expectedBucketOwner (blobStore .expectedBucketOwner ());
733749
734750 if (isMultipartObject ) {
735751 getObjectRequestBuilder .partNumber (partNumber );
736752 }
737-
738753 return SocketAccess .doPrivileged (
739754 () -> s3AsyncClient .getObject (getObjectRequestBuilder .build (), AsyncResponseTransformer .toBlockingInputStream ())
740755 .thenApply (response -> transformResponseToInputStreamContainer (response , isMultipartObject ))
@@ -775,6 +790,7 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
775790 .bucket (bucketName )
776791 .key (blobName )
777792 .objectAttributes (ObjectAttributes .CHECKSUM , ObjectAttributes .OBJECT_SIZE , ObjectAttributes .OBJECT_PARTS )
793+ .expectedBucketOwner (blobStore .expectedBucketOwner ())
778794 .build ();
779795
780796 return SocketAccess .doPrivileged (() -> s3AsyncClient .getObjectAttributes (getObjectAttributesRequest ));
0 commit comments