diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java index 044793c..62c9add 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSink.java @@ -60,7 +60,7 @@ protected StreamResult transferParts(List parts) { var partNumber = 1; byte[] bytesChunk = input.readNBytes(chunkSize); - while (bytesChunk.length > 0) { + do { completedParts.add(CompletedPart.builder().partNumber(partNumber) .eTag(client.uploadPart(UploadPartRequest.builder() .bucket(bucketName) @@ -70,7 +70,7 @@ protected StreamResult transferParts(List parts) { .build(), RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag()).build()); bytesChunk = input.readNBytes(chunkSize); partNumber++; - } + } while (bytesChunk.length > 0); client.completeMultipartUpload(CompleteMultipartUploadRequest.builder() .bucket(bucketName) @@ -92,12 +92,16 @@ protected StreamResult transferParts(List parts) { } private String getDestinationObjectName(String partName, int partsSize) { - var name = (partsSize == 1 && !StringUtils.isNullOrEmpty(objectName)) ? objectName : partName; + var name = useObjectName(partsSize) ? objectName : partName; if (!StringUtils.isNullOrEmpty(folderName)) { return folderName.endsWith("/") ? folderName + name : folderName + "/" + name; - } else { - return name; } + + return name; + } + + private boolean useObjectName(int partsSize) { + return partsSize == 1 && !StringUtils.isNullOrEmpty(objectName); } @NotNull diff --git a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java index a8b4d7c..b319ad6 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java +++ b/extensions/data-plane/data-plane-aws-s3/src/main/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSource.java @@ -28,6 +28,8 @@ import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR; @@ -47,6 +49,8 @@ class S3DataSource implements DataSource { private S3Client client; private Monitor monitor; + private final Predicate isFile = object -> !object.key().endsWith("/"); + private S3DataSource() { } @@ -107,7 +111,7 @@ private List fetchPrefixedS3Objects() { var response = client.listObjectsV2(listObjectsRequest); - s3Objects.addAll(response.contents()); + s3Objects.addAll(response.contents().stream().filter(isFile).collect(Collectors.toList())); continuationToken = response.nextContinuationToken(); @@ -117,7 +121,7 @@ private List fetchPrefixedS3Objects() { } @Override - public void close() throws Exception { + public void close() { client.close(); } diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataPlaneIntegrationTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataPlaneIntegrationTest.java index 31dc80a..67fab69 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataPlaneIntegrationTest.java +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataPlaneIntegrationTest.java @@ -35,9 +35,11 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchKeyException; import java.util.List; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.stream.Stream; @@ -105,6 +107,12 @@ void should_copy_using_destination_object_name_case_single_transfer(List var objectNameInDestination = "object-name-in-destination"; var objectContent = UUID.randomUUID().toString(); + //Put folder 0 byte size file marker. AWS does this when a folder is created via the console. + if (!isSingleObject) { + sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, ""); + sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", ""); + } + for (var objectName : objectNames) { sourceClient.putStringOnBucket(sourceBucketName, objectName, objectContent); } @@ -152,6 +160,11 @@ void should_copy_using_destination_object_name_case_single_transfer(List .extracting(Long::intValue) .isEqualTo(objectContent.length()); } + + assertThat(destinationClient.getObject(destinationBucketName, + OBJECT_PREFIX)).failsWithin(5, SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(NoSuchKeyException.class); } } @@ -164,6 +177,12 @@ void should_copy_to_folder_case_property_is_present(List objectNames) { var folderNameInDestination = "folder-name-in-destination/"; var objectBody = UUID.randomUUID().toString(); + //Put folder 0 byte size file marker. AWS does this when a folder is created via the console. + if (!isSingleObject) { + sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, ""); + sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", ""); + } + for (var objectToTransfer : objectNames) { sourceClient.putStringOnBucket(sourceBucketName, objectToTransfer, objectBody); } @@ -212,7 +231,13 @@ void should_copy_to_folder_case_property_is_present(List objectNames) { .extracting(Long::intValue) .isEqualTo(objectBody.length()); } + assertThat(destinationClient.getObject(destinationBucketName, folderNameInDestination + + OBJECT_PREFIX)).failsWithin(5, SECONDS) + .withThrowableOfType(ExecutionException.class) + .withCauseInstanceOf(NoSuchKeyException.class); } + + } private DataAddress createDataAddress(List assetNames, boolean isSingleObject) { diff --git a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkTest.java b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkTest.java index fd818d2..021437d 100644 --- a/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkTest.java +++ b/extensions/data-plane/data-plane-aws-s3/src/test/java/org/eclipse/edc/connector/dataplane/aws/s3/S3DataSinkTest.java @@ -26,7 +26,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ArgumentsProvider; import org.junit.jupiter.params.provider.ArgumentsSource; -import org.junit.platform.commons.util.StringUtils; import org.mockito.ArgumentCaptor; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3Client; @@ -85,8 +84,8 @@ void setup() { } @ParameterizedTest - @ArgumentsSource(SinglePartsInputs.class) - void transferParts_singlePart_succeeds(List inputStream) { + @ArgumentsSource(PartsInputs.class) + void transferParts_succeeds(List inputStream, int expectedPartsPerObject) { var isSingleObject = inputStream.size() == 1; var result = dataSink.transferParts(inputStream); @@ -94,28 +93,14 @@ void transferParts_singlePart_succeeds(List inputStream) { verify(s3ClientMock, times(inputStream.size())).completeMultipartUpload(completeMultipartUploadRequestCaptor .capture()); - var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue(); - assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME); - assertThat(completeMultipartUploadRequest.key()) - .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME); - assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(1); - } - - @ParameterizedTest - @ArgumentsSource(MultiPartsInputs.class) - void transferParts_multiPart_succeeds(List inputStream) { - var isSingleObject = inputStream.size() == 1; - - var result = dataSink.transferParts(inputStream); + var completeMultipartUploadRequests = completeMultipartUploadRequestCaptor.getAllValues(); - assertThat(result.succeeded()).isTrue(); - verify(s3ClientMock, times(inputStream.size())) - .completeMultipartUpload(completeMultipartUploadRequestCaptor.capture()); - var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue(); - assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME); - assertThat(completeMultipartUploadRequest.key()) - .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME); - assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(2); + for (var request : completeMultipartUploadRequests) { + assertThat(request.bucket()).isEqualTo(BUCKET_NAME); + assertThat(request.key()) + .isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME); + assertThat(request.multipartUpload().parts()).hasSize(expectedPartsPerObject); + } } @Test @@ -134,7 +119,7 @@ void transferParts_failed_to_download() { } @ParameterizedTest - @ArgumentsSource(MultiPartsInputs.class) + @ArgumentsSource(PartsInputs.class) void transferParts_fails_to_upload(List inputStream) { var isSingleObject = inputStream.size() == 1; @@ -162,25 +147,31 @@ void transferParts_fails_to_upload(List inputStream) { assertThat(result.getFailureDetail()).isEqualTo(expectedMessage); } - private static class SinglePartsInputs implements ArgumentsProvider { + private static class PartsInputs implements ArgumentsProvider { @Override public Stream provideArguments(ExtensionContext context) { - var content = "content smaller than a chunk size"; - return Stream.of(Arguments.of(List.of(createDataSource(content))), - Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content))) - ); + var emptyContent = ""; + var smallContent = "content smaller than a chunk size"; + var bigContent = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload"; + return Stream.of( + Arguments.of( + List.of(createDataSource(emptyContent)), 1), + Arguments.of( + List.of(createDataSource(smallContent)), 1), + Arguments.of( + List.of(createDataSource(bigContent)), 2), + Arguments.of( + List.of(createDataSource(emptyContent), createDataSource(smallContent)), 1), + Arguments.of( + List.of(createDataSource(bigContent), createDataSource(bigContent)), 2)); } } - private static class MultiPartsInputs implements ArgumentsProvider { - @Override - public Stream provideArguments(ExtensionContext context) { - var content = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload"; - return Stream.of(Arguments.of(List.of(createDataSource(content))), - Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content))) - ); + private static InputStreamDataSource createDataSource(String text) { + if (text.length() > 0) { + return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(text.getBytes(UTF_8))); } - + return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(new byte[0])); } private static InputStreamDataSource createDataSource(String text) {