Merged
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Use QueryCoordinatorContext for the rewrite in validate API. ([#18272](https://github.com/opensearch-project/OpenSearch/pull/18272))
- Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
- [repository-s3] Add support for SSE-KMS and S3 bucket owner verification ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))

### Changed
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))
@@ -51,6 +52,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Deprecated

### Removed
- [repository-s3] Removed existing ineffective `server_side_encryption` setting ([#18312](https://github.com/opensearch-project/OpenSearch/pull/18312))

### Fixed
- Fix simultaneously creating a snapshot and updating the repository can potentially trigger an infinite loop ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java
@@ -53,7 +53,6 @@
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
@@ -110,6 +109,7 @@
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.utils.SseKmsUtil.configureEncryptionSettings;

class S3BlobContainer extends AbstractBlobContainer implements AsyncMultiStreamBlobContainer {

@@ -129,7 +129,13 @@ public boolean blobExists(String blobName) {
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivileged(
() -> clientReference.get()
.headObject(HeadObjectRequest.builder().bucket(blobStore.bucket()).key(buildKey(blobName)).build())
.headObject(
HeadObjectRequest.builder()
.bucket(blobStore.bucket())
.key(buildKey(blobName))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build()
)
);
return true;
} catch (NoSuchKeyException e) {
@@ -214,7 +220,12 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled(),
writeContext.getMetadata()
writeContext.getMetadata(),
blobStore.serverSideEncryptionType(),
blobStore.serverSideEncryptionKmsKey(),
blobStore.serverSideEncryptionBucketKey(),
blobStore.serverSideEncryptionEncryptionContext(),
blobStore.expectedBucketOwner()
);
try {
// If file size is greater than the queue capacity then SizeBasedBlockingQ will always reject the upload.
@@ -498,6 +509,7 @@ private ListObjectsV2Request listObjectsRequest(String keyPath) {
.prefix(keyPath)
.delimiter("/")
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();
}

@@ -534,14 +546,13 @@ void executeSingleUpload(
.contentLength(blobSize)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher));
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().putObjectMetricPublisher))
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (CollectionUtils.isNotEmpty(metadata)) {
putObjectRequestBuilder = putObjectRequestBuilder.metadata(metadata);
}
if (blobStore.serverSideEncryption()) {
putObjectRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
configureEncryptionSettings(putObjectRequestBuilder, blobStore);

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
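For context: `SseKmsUtil.configureEncryptionSettings` is added elsewhere in this PR and its body is not part of the visible diff. Below is a minimal, hypothetical sketch of what such a helper could look like for `PutObjectRequest.Builder`, using only the `S3BlobStore` accessors introduced by this change and standard AWS SDK v2 builder options (`serverSideEncryption`, `ssekmsKeyId`, `bucketKeyEnabled`, `ssekmsEncryptionContext`). The class name, packaging, and type-matching logic are assumptions, not the PR's actual implementation, which also has to cover `CreateMultipartUploadRequest.Builder`.

```java
// Hypothetical sketch only; the real SseKmsUtil lives in org.opensearch.repositories.s3.utils and is not shown in this diff.
package org.opensearch.repositories.s3;

import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;

final class SseKmsUtilSketch {

    private SseKmsUtilSketch() {}

    // Assumes the repository's encryption type setting stores the SDK wire values ("aws:kms" / "AES256");
    // the actual PR may use different tokens or a dedicated enum.
    static void configureEncryptionSettings(PutObjectRequest.Builder builder, S3BlobStore blobStore) {
        String type = blobStore.serverSideEncryptionType();
        if (ServerSideEncryption.AWS_KMS.toString().equals(type)) {
            builder.serverSideEncryption(ServerSideEncryption.AWS_KMS)
                // A null key id lets S3 fall back to the AWS managed KMS key for the bucket.
                .ssekmsKeyId(blobStore.serverSideEncryptionKmsKey())
                .bucketKeyEnabled(blobStore.serverSideEncryptionBucketKey())
                // Already base64-encoded by S3BlobStore, or null when no context is configured.
                .ssekmsEncryptionContext(blobStore.serverSideEncryptionEncryptionContext());
        } else if (ServerSideEncryption.AES256.toString().equals(type)) {
            builder.serverSideEncryption(ServerSideEncryption.AES256);
        }
    }
}
```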
@@ -591,15 +602,14 @@ void executeMultipartUpload(
.key(blobName)
.storageClass(blobStore.getStorageClass())
.acl(blobStore.getCannedACL())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector));
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (CollectionUtils.isNotEmpty(metadata)) {
createMultipartUploadRequestBuilder.metadata(metadata);
}

if (blobStore.serverSideEncryption()) {
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}
configureEncryptionSettings(createMultipartUploadRequestBuilder, blobStore);

final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
@@ -628,6 +638,7 @@ void executeMultipartUpload(
.partNumber(i)
.contentLength((i < nbParts) ? partSize : lastPartSize)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

bytesCount += uploadPartRequest.contentLength();
@@ -650,6 +661,7 @@ void executeMultipartUpload(
.uploadId(uploadId.get())
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().multipartUploadMetricCollector))
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

SocketAccess.doPrivilegedVoid(() -> clientReference.get().completeMultipartUpload(completeMultipartUploadRequest));
@@ -663,6 +675,7 @@ void executeMultipartUpload(
.bucket(bucketName)
.key(blobName)
.uploadId(uploadId.get())
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
SocketAccess.doPrivilegedVoid(() -> clientReference.get().abortMultipartUpload(abortRequest));
@@ -729,12 +742,14 @@ CompletableFuture<InputStreamContainer> getBlobPartInputStreamContainer(
@Nullable Integer partNumber
) {
final boolean isMultipartObject = partNumber != null;
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder().bucket(bucketName).key(blobKey);
final GetObjectRequest.Builder getObjectRequestBuilder = GetObjectRequest.builder()
.bucket(bucketName)
.key(blobKey)
.expectedBucketOwner(blobStore.expectedBucketOwner());

if (isMultipartObject) {
getObjectRequestBuilder.partNumber(partNumber);
}

return SocketAccess.doPrivileged(
() -> s3AsyncClient.getObject(getObjectRequestBuilder.build(), AsyncResponseTransformer.toBlockingInputStream())
.thenApply(response -> transformResponseToInputStreamContainer(response, isMultipartObject))
@@ -775,6 +790,7 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A
.bucket(bucketName)
.key(blobName)
.objectAttributes(ObjectAttributes.CHECKSUM, ObjectAttributes.OBJECT_SIZE, ObjectAttributes.OBJECT_PARTS)
.expectedBucketOwner(blobStore.expectedBucketOwner())
.build();

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java
@@ -48,6 +48,8 @@
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
@@ -57,9 +59,13 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.EXPECTED_BUCKET_OWNER_SETTING;
import static org.opensearch.repositories.s3.S3Repository.PERMIT_BACKED_TRANSFER_ENABLED;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_TYPE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

@@ -81,7 +87,11 @@

private volatile boolean permitBackedTransferEnabled;

private volatile boolean serverSideEncryption;
private volatile String serverSideEncryptionType;
private volatile String serverSideEncryptionKmsKey;
private volatile boolean serverSideEncryptionBucketKey;
private volatile String serverSideEncryptionEncryptionContext;
private volatile String expectedBucketOwner;

private volatile ObjectCannedACL cannedACL;

@@ -107,7 +117,6 @@
S3AsyncService s3AsyncService,
boolean multipartUploadEnabled,
String bucket,
boolean serverSideEncryption,
ByteSizeValue bufferSize,
String cannedACL,
String storageClass,
@@ -119,13 +128,17 @@
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ normalPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ,
GenericStatsMetricPublisher genericStatsMetricPublisher
GenericStatsMetricPublisher genericStatsMetricPublisher,
String serverSideEncryptionType,
String serverSideEncryptionKmsKey,
boolean serverSideEncryptionBucketKey,
String serverSideEncryptionEncryptionContext,
String expectedBucketOwner
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
this.multipartUploadEnabled = multipartUploadEnabled;
this.bucket = bucket;
this.serverSideEncryption = serverSideEncryption;
this.bufferSize = bufferSize;
this.cannedACL = initCannedACL(cannedACL);
this.storageClass = initStorageClass(storageClass);
@@ -142,20 +155,29 @@
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
this.genericStatsMetricPublisher = genericStatsMetricPublisher;
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
this.serverSideEncryptionType = serverSideEncryptionType;
this.serverSideEncryptionKmsKey = serverSideEncryptionKmsKey;
this.serverSideEncryptionBucketKey = serverSideEncryptionBucketKey;
this.serverSideEncryptionEncryptionContext = serverSideEncryptionEncryptionContext;
this.expectedBucketOwner = expectedBucketOwner;
}

@Override
public void reload(RepositoryMetadata repositoryMetadata) {
this.repositoryMetadata = repositoryMetadata;
this.bucket = BUCKET_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryption = SERVER_SIDE_ENCRYPTION_SETTING.get(repositoryMetadata.settings());
this.bufferSize = BUFFER_SIZE_SETTING.get(repositoryMetadata.settings());
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.permitBackedTransferEnabled = PERMIT_BACKED_TRANSFER_ENABLED.get(repositoryMetadata.settings());
this.serverSideEncryptionType = SERVER_SIDE_ENCRYPTION_TYPE_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionKmsKey = SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionBucketKey = SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING.get(repositoryMetadata.settings());
this.serverSideEncryptionEncryptionContext = SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING.get(repositoryMetadata.settings());
this.expectedBucketOwner = EXPECTED_BUCKET_OWNER_SETTING.get(repositoryMetadata.settings());

Codecov / codecov/patch check warning on line 180 of plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java: added lines #L176 - L180 were not covered by tests.

Comment on lines +176 to +180:

Contributor (@Bukhtawar): @jed326 For my understanding, would these settings become update-able?

Contributor Author (@jed326): @Bukhtawar yep, a user can change the encryption type / KMS key / encryption context at any time. On the download side these values are read from object metadata rather than user-provided, so as long as the calling identity has the proper permissions, updating these settings won't break anything.
}

@Override
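The setting constants read in `reload()` above (`SERVER_SIDE_ENCRYPTION_TYPE_SETTING`, `SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING`, `SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING`, `SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING`, `EXPECTED_BUCKET_OWNER_SETTING`) are declared in `S3Repository`, which this diff does not show. As rough orientation only, repository-scoped settings in this plugin are typically declared as in the sketch below; the setting keys and defaults shown are assumptions, not the names actually registered by this PR.

```java
// Hypothetical sketch; the real declarations are in S3Repository.java, which is not part of this diff.
// The setting keys and defaults below are illustrative assumptions.
import org.opensearch.common.settings.Setting;

final class S3RepositorySettingsSketch {

    static final Setting<String> SERVER_SIDE_ENCRYPTION_TYPE_SETTING = Setting.simpleString("server_side_encryption_type");

    static final Setting<String> SERVER_SIDE_ENCRYPTION_KMS_KEY_SETTING = Setting.simpleString("server_side_encryption_kms_key_id");

    static final Setting<Boolean> SERVER_SIDE_ENCRYPTION_BUCKET_KEY_SETTING = Setting.boolSetting(
        "server_side_encryption_bucket_key_enabled",
        false
    );

    static final Setting<String> SERVER_SIDE_ENCRYPTION_ENCRYPTION_CONTEXT_SETTING = Setting.simpleString(
        "server_side_encryption_encryption_context"
    );

    static final Setting<String> EXPECTED_BUCKET_OWNER_SETTING = Setting.simpleString("expected_bucket_owner");

    private S3RepositorySettingsSketch() {}
}
```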
@@ -191,8 +213,33 @@
return bucket;
}

public boolean serverSideEncryption() {
return serverSideEncryption;
public String serverSideEncryptionType() {
return serverSideEncryptionType;
}

public String serverSideEncryptionKmsKey() {
return serverSideEncryptionKmsKey;
}

public boolean serverSideEncryptionBucketKey() {
return serverSideEncryptionBucketKey;
}

/**
* Returns the SSE encryption context, base64-encoded from its UTF-8 bytes as required by the S3 SDK. If the encryption context
* is empty, returns null, as the S3 client ignores null header values.
*/
public String serverSideEncryptionEncryptionContext() {
return serverSideEncryptionEncryptionContext.isEmpty()
? null
: Base64.getEncoder().encodeToString(serverSideEncryptionEncryptionContext.getBytes(StandardCharsets.UTF_8));

Codecov / codecov/patch check warning on line 235 of plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java: added line #L235 was not covered by tests.
}
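As a small worked example (not part of the PR) of the encoding performed by `serverSideEncryptionEncryptionContext()` above: the configured encryption context is an arbitrary JSON document, and the value handed to the SDK is the base64 encoding of its UTF-8 bytes. The class name and context value below are illustrative only.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class EncryptionContextEncodingExample {
    public static void main(String[] args) {
        // Illustrative encryption context; any JSON document of key/value pairs would do.
        String context = "{\"department\":\"finance\"}";
        // Same transformation S3BlobStore applies before passing the value to the SDK header.
        String encoded = Base64.getEncoder().encodeToString(context.getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded); // prints: eyJkZXBhcnRtZW50IjoiZmluYW5jZSJ9
    }
}
```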

/**
* Returns the expected bucket owner if set, else null, as the S3 client ignores null header values.
*/
public String expectedBucketOwner() {
return expectedBucketOwner.isEmpty() ? null : expectedBucketOwner;
}

public long bufferSizeInBytes() {