Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(s3 dataplane): Fix transfer of empty objects #417

Merged
merged 15 commits into from
Aug 21, 2024
14 changes: 7 additions & 7 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.3.0, Apache-2.0, approved, clearlydefined

Check failure on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Dependencies outdated

The DEPENDENCIES file was outdated and must be regenerated. Check the output of this job for more information

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/copper-multibase/0.5.0, Apache-2.0, approved, #14501
maven/mavencentral/com.apicatalog/copper-multicodec/0.1.1, Apache-2.0, approved, #14500
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.14.0, Apache-2.0, approved, clearlydefined
Expand Down Expand Up @@ -205,16 +205,16 @@
maven/mavencentral/org.jetbrains/annotations/24.1.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/org.junit-pioneer/junit-pioneer/2.2.0, EPL-2.0, approved, #11857
maven/mavencentral/org.junit.jupiter/junit-jupiter-api/5.10.2, EPL-2.0, approved, #9714
maven/mavencentral/org.junit.jupiter/junit-jupiter-api/5.10.3, EPL-2.0, approved, #9714
maven/mavencentral/org.junit.jupiter/junit-jupiter-api/5.11.0, EPL-2.0, approved, #15935
maven/mavencentral/org.junit.jupiter/junit-jupiter-engine/5.10.2, EPL-2.0, approved, #9711
maven/mavencentral/org.junit.jupiter/junit-jupiter-engine/5.10.3, EPL-2.0, approved, #9711
maven/mavencentral/org.junit.jupiter/junit-jupiter-params/5.10.3, EPL-2.0, approved, #15250
maven/mavencentral/org.junit.jupiter/junit-jupiter-engine/5.11.0, EPL-2.0, approved, #15939
maven/mavencentral/org.junit.jupiter/junit-jupiter-params/5.11.0, EPL-2.0, approved, #15940
maven/mavencentral/org.junit.platform/junit-platform-commons/1.10.2, EPL-2.0, approved, #9715
maven/mavencentral/org.junit.platform/junit-platform-commons/1.10.3, EPL-2.0, approved, #9715
maven/mavencentral/org.junit.platform/junit-platform-engine/1.10.3, EPL-2.0, approved, #9709
maven/mavencentral/org.junit.platform/junit-platform-launcher/1.10.3, EPL-2.0, approved, #15216
maven/mavencentral/org.junit.platform/junit-platform-commons/1.11.0, EPL-2.0, approved, #15936
maven/mavencentral/org.junit.platform/junit-platform-engine/1.11.0, EPL-2.0, approved, #15932
maven/mavencentral/org.junit.platform/junit-platform-launcher/1.11.0, EPL-2.0, approved, #15934
maven/mavencentral/org.junit/junit-bom/5.10.2, EPL-2.0, approved, #9844
maven/mavencentral/org.junit/junit-bom/5.10.3, EPL-2.0, approved, #9844
maven/mavencentral/org.junit/junit-bom/5.11.0, , restricted, clearlydefined
maven/mavencentral/org.junit/junit-bom/5.9.2, EPL-2.0, approved, #4711
maven/mavencentral/org.jvnet.mimepull/mimepull/1.9.15, CDDL-1.1 OR GPL-2.0-only WITH Classpath-exception-2.0, approved, CQ21484
maven/mavencentral/org.mockito/mockito-core/5.12.0, MIT AND (Apache-2.0 AND MIT) AND Apache-2.0, approved, #14678
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {

var partNumber = 1;
byte[] bytesChunk = input.readNBytes(chunkSize);
while (bytesChunk.length > 0) {
do {
completedParts.add(CompletedPart.builder().partNumber(partNumber)
.eTag(client.uploadPart(UploadPartRequest.builder()
.bucket(bucketName)
Expand All @@ -70,7 +70,7 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
.build(), RequestBody.fromByteBuffer(ByteBuffer.wrap(bytesChunk))).eTag()).build());
bytesChunk = input.readNBytes(chunkSize);
partNumber++;
}
} while (bytesChunk.length > 0);

client.completeMultipartUpload(CompleteMultipartUploadRequest.builder()
.bucket(bucketName)
Expand All @@ -92,12 +92,16 @@ protected StreamResult<Object> transferParts(List<DataSource.Part> parts) {
}

private String getDestinationObjectName(String partName, int partsSize) {
var name = (partsSize == 1 && !StringUtils.isNullOrEmpty(objectName)) ? objectName : partName;
var name = useObjectName(partsSize) ? objectName : partName;
if (!StringUtils.isNullOrEmpty(folderName)) {
return folderName.endsWith("/") ? folderName + name : folderName + "/" + name;
} else {
return name;
}

return name;
}

private boolean useObjectName(int partsSize) {
return partsSize == 1 && !StringUtils.isNullOrEmpty(objectName);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.eclipse.edc.connector.dataplane.spi.pipeline.StreamFailure.Reason.GENERAL_ERROR;
Expand Down Expand Up @@ -107,7 +109,9 @@ private List<S3Object> fetchPrefixedS3Objects() {

var response = client.listObjectsV2(listObjectsRequest);

s3Objects.addAll(response.contents());
Predicate<S3Object> isFile = object -> !object.key().endsWith("/");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be a class field or constant, so it won't be recreated each time

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed in a6645ac


s3Objects.addAll(response.contents().stream().filter(isFile).collect(Collectors.toList()));

continuationToken = response.nextContinuationToken();

Expand All @@ -117,7 +121,7 @@ private List<S3Object> fetchPrefixedS3Objects() {
}

@Override
public void close() throws Exception {
public void close() {
client.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -105,6 +107,12 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
var objectNameInDestination = "object-name-in-destination";
var objectContent = UUID.randomUUID().toString();

//Put folder 0 byte size file marker. AWS does this when a folder is created via the console.
if (!isSingleObject) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know the test was already structured like this, but it's not a good practice to have conditionals into a test, better to have 2 distinct tests. It could also be solved in a different PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree these test class deserve some attention. The conditionals are there because the argument provider is the same for both test cases. I maintained it the same way to avoid refactoring the thing since it seems to be better done in another PR. I can create the issue.

sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
}

for (var objectName : objectNames) {
sourceClient.putStringOnBucket(sourceBucketName, objectName, objectContent);
}
Expand Down Expand Up @@ -152,6 +160,11 @@ void should_copy_using_destination_object_name_case_single_transfer(List<String>
.extracting(Long::intValue)
.isEqualTo(objectContent.length());
}

assertThat(destinationClient.getObject(destinationBucketName,
OBJECT_PREFIX)).failsWithin(5, SECONDS)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(NoSuchKeyException.class);
}
}

Expand All @@ -164,6 +177,12 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
var folderNameInDestination = "folder-name-in-destination/";
var objectBody = UUID.randomUUID().toString();

//Put folder 0 byte size file marker. AWS does this when a folder is created via the console.
if (!isSingleObject) {
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX, "");
sourceClient.putStringOnBucket(sourceBucketName, OBJECT_PREFIX + "testFolder/", "");
}

for (var objectToTransfer : objectNames) {
sourceClient.putStringOnBucket(sourceBucketName, objectToTransfer, objectBody);
}
Expand Down Expand Up @@ -212,7 +231,13 @@ void should_copy_to_folder_case_property_is_present(List<String> objectNames) {
.extracting(Long::intValue)
.isEqualTo(objectBody.length());
}
assertThat(destinationClient.getObject(destinationBucketName, folderNameInDestination +
OBJECT_PREFIX)).failsWithin(5, SECONDS)
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(NoSuchKeyException.class);
}


}

private DataAddress createDataAddress(List<String> assetNames, boolean isSingleObject) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.platform.commons.util.StringUtils;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
Expand Down Expand Up @@ -63,11 +62,6 @@ public class S3DataSinkTest {

private ArgumentCaptor<CompleteMultipartUploadRequest> completeMultipartUploadRequestCaptor;

private static InputStreamDataSource createDataSource(String text) {
var content = StringUtils.isBlank(text) ? "test stream" : text;
return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(content.getBytes(UTF_8)));
}

@BeforeEach
void setup() {
s3ClientMock = mock(S3Client.class);
Expand All @@ -90,37 +84,23 @@ void setup() {
}

@ParameterizedTest
@ArgumentsSource(SinglePartsInputs.class)
void transferParts_singlePart_succeeds(List<DataSource.Part> inputStream) {
@ArgumentsSource(PartsInputs.class)
void transferParts_succeeds(List<DataSource.Part> inputStream, int expectedPartsPerObject) {
var isSingleObject = inputStream.size() == 1;

var result = dataSink.transferParts(inputStream);
assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, times(inputStream.size())).completeMultipartUpload(completeMultipartUploadRequestCaptor
.capture());

var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
assertThat(completeMultipartUploadRequest.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(1);
}

@ParameterizedTest
@ArgumentsSource(MultiPartsInputs.class)
void transferParts_multiPart_succeeds(List<DataSource.Part> inputStream) {
var isSingleObject = inputStream.size() == 1;
var completeMultipartUploadRequests = completeMultipartUploadRequestCaptor.getAllValues();

var result = dataSink.transferParts(inputStream);

assertThat(result.succeeded()).isTrue();
verify(s3ClientMock, times(inputStream.size()))
.completeMultipartUpload(completeMultipartUploadRequestCaptor.capture());
var completeMultipartUploadRequest = completeMultipartUploadRequestCaptor.getValue();
assertThat(completeMultipartUploadRequest.bucket()).isEqualTo(BUCKET_NAME);
assertThat(completeMultipartUploadRequest.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(completeMultipartUploadRequest.multipartUpload().parts()).hasSize(2);
for (var request : completeMultipartUploadRequests) {
assertThat(request.bucket()).isEqualTo(BUCKET_NAME);
assertThat(request.key())
.isEqualTo(isSingleObject ? DESTINATION_OBJECT_NAME : SOURCE_OBJECT_NAME);
assertThat(request.multipartUpload().parts()).hasSize(expectedPartsPerObject);
}
}

@Test
Expand All @@ -139,7 +119,7 @@ void transferParts_failed_to_download() {
}

@ParameterizedTest
@ArgumentsSource(MultiPartsInputs.class)
@ArgumentsSource(PartsInputs.class)
void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
var isSingleObject = inputStream.size() == 1;

Expand Down Expand Up @@ -167,24 +147,30 @@ void transferParts_fails_to_upload(List<DataSource.Part> inputStream) {
assertThat(result.getFailureDetail()).isEqualTo(expectedMessage);
}

private static class SinglePartsInputs implements ArgumentsProvider {
private static class PartsInputs implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
var content = "content smaller than a chunk size";
return Stream.of(Arguments.of(List.of(createDataSource(content))),
Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
);
var emptyContent = "";
var smallContent = "content smaller than a chunk size";
var bigContent = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
return Stream.of(
Arguments.of(
List.of(createDataSource(emptyContent)), 1),
Arguments.of(
List.of(createDataSource(smallContent)), 1),
Arguments.of(
List.of(createDataSource(bigContent)), 2),
Arguments.of(
List.of(createDataSource(emptyContent), createDataSource(smallContent)), 1),
Arguments.of(
List.of(createDataSource(bigContent), createDataSource(bigContent)), 2));
}
}

private static class MultiPartsInputs implements ArgumentsProvider {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
var content = "content bigger than 50 bytes chunk size so that it gets chunked and uploaded as a multipart upload";
return Stream.of(Arguments.of(List.of(createDataSource(content))),
Arguments.of(List.of(createDataSource(content)), List.of(createDataSource(content)))
);
private static InputStreamDataSource createDataSource(String text) {
if (text.length() > 0) {
return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(text.getBytes(UTF_8)));
}

return new InputStreamDataSource(SOURCE_OBJECT_NAME, new ByteArrayInputStream(new byte[0]));
}
}
Loading