Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pinot-integration-test-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<pinot.root>${basedir}/..</pinot.root>
<localstack-utils.version>0.2.11</localstack-utils.version>
<awaitility.version>3.0.0</awaitility.version>
<aws.sdk.version>2.14.28</aws.sdk.version>
<aws.sdk.version>2.20.83</aws.sdk.version>
</properties>

<build>
Expand Down
2 changes: 1 addition & 1 deletion pinot-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<pinot.root>${basedir}/..</pinot.root>
<localstack-utils.version>0.2.19</localstack-utils.version>
<awaitility.version>3.0.0</awaitility.version>
<aws.sdk.version>2.14.28</aws.sdk.version>
<aws.sdk.version>2.20.83</aws.sdk.version>
<testcontainers.version>1.17.3</testcontainers.version>
</properties>

Expand Down
24 changes: 3 additions & 21 deletions pinot-plugins/pinot-file-system/pinot-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
<url>https://pinot.apache.org</url>
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<aws.sdk.version>2.14.28</aws.sdk.version>
<aws.sdk.version>2.20.83</aws.sdk.version>
<http.client.version>4.5.13</http.client.version>
<http.core.version>4.4.13</http.core.version>
<s3mock.version>2.1.19</s3mock.version>
<s3mock.version>2.12.2</s3mock.version>
<javax.version>3.1.0</javax.version>
<phase.prop>package</phase.prop>
</properties>
Expand Down Expand Up @@ -183,27 +183,9 @@
</dependency>
<dependency>
<groupId>com.adobe.testing</groupId>
<artifactId>s3mock-testng</artifactId>
<artifactId>s3mock-testcontainers</artifactId>
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s3mock-testcontainers is recommended because it avoids bringing in s3mock's dependencies transitively; those dependencies pull in the Spring framework and also require a specific keystore.

The new version of s3mock handles leading and trailing slashes in S3 object names more accurately, hence the changes to the unit tests made in this PR.

<version>${s3mock.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
*/
package org.apache.pinot.plugin.filesystem;

import com.adobe.testing.s3mock.testng.S3Mock;
import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -28,37 +30,45 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.filesystem.FileMetadata;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;


@Test
@Listeners(com.adobe.testing.s3mock.testng.S3MockListener.class)
public class S3PinotFSTest {
private static final String S3MOCK_VERSION = System.getProperty("s3mock.version", "2.12.2");
private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "S3PinotFSTest");
private static final String DELIMITER = "/";
private static final String BUCKET = "test-bucket";
private static final String SCHEME = "s3";
private static final String FILE_FORMAT = "%s://%s/%s";
private static final String DIR_FORMAT = "%s://%s";

private S3MockContainer _s3MockContainer;
private S3PinotFS _s3PinotFS;
private S3Client _s3Client;

@BeforeClass
public void setUp() {
S3Mock s3Mock = S3Mock.getInstance();
_s3Client = s3Mock.createS3ClientV2();
_s3MockContainer = new S3MockContainer(S3MOCK_VERSION);
_s3MockContainer.start();
String endpoint = _s3MockContainer.getHttpEndpoint();
_s3Client = createS3ClientV2(endpoint);
_s3PinotFS = new S3PinotFS();
_s3PinotFS.init(_s3Client);
_s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
Expand All @@ -69,12 +79,14 @@ public void tearDown()
throws IOException {
_s3PinotFS.close();
_s3Client.close();
_s3MockContainer.stop();
FileUtils.deleteQuietly(TEMP_FILE);
}

private void createEmptyFile(String folderName, String fileName) {
String fileNameWithFolder = folderName + DELIMITER + fileName;
_s3Client
.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder), RequestBody.fromBytes(new byte[0]));
String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName;
_s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder),
RequestBody.fromBytes(new byte[0]));
}

@Test
Expand Down Expand Up @@ -249,7 +261,7 @@ public void testDeleteFile()
ListObjectsV2Response listObjectsV2Response =
_s3Client.listObjectsV2(S3TestUtils.getListObjectRequest(BUCKET, "", true));
String[] actualResponse =
listObjectsV2Response.contents().stream().map(x -> x.key().substring(1)).filter(x -> x.contains("delete"))
listObjectsV2Response.contents().stream().map(S3Object::key).filter(x -> x.contains("delete"))
.toArray(String[]::new);

Assert.assertEquals(actualResponse.length, 2);
Expand Down Expand Up @@ -290,8 +302,8 @@ public void testIsDirectory()

boolean isBucketDir = _s3PinotFS.isDirectory(URI.create(String.format(DIR_FORMAT, SCHEME, BUCKET)));
boolean isDir = _s3PinotFS.isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)));
boolean isDirChild = _s3PinotFS
.isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
boolean isDirChild = _s3PinotFS.isDirectory(
URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
boolean notIsDir = _s3PinotFS.isDirectory(URI.create(
String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder + DELIMITER + "a-delete.txt")));

Expand Down Expand Up @@ -333,47 +345,46 @@ public void testExists()
public void testCopyFromAndToLocal()
throws Exception {
String fileName = "copyFile.txt";

File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());

_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));

HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));

Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());

File fileToDownload = new File("copyFile_download.txt").getAbsoluteFile();
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());

fileToDownload.deleteOnExit();
File fileToCopy = new File(TEMP_FILE, fileName);
File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile();
try {
createDummyFile(fileToCopy, 1024);
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
} finally {
FileUtils.deleteQuietly(fileToCopy);
FileUtils.deleteQuietly(fileToDownload);
}
}

@Test
public void testMultiPartUpload()
throws Exception {
String fileName = "copyFile.txt";

File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());

// input file size is 20
_s3PinotFS.setMultiPartUploadConfigs(1, 3);
String fileName = "copyFile_for_multipart.txt";
File fileToCopy = new File(TEMP_FILE, fileName);
File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile();
try {
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
// Make a file of 11MB to upload in parts, whose min required size is 5MB.
createDummyFile(fileToCopy, 11 * 1024 * 1024);
System.out.println("fileToCopy.length:" + fileToCopy.length());
_s3PinotFS.setMultiPartUploadConfigs(1, 5 * 1024 * 1024);
try {
_s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
} finally {
// disable multipart upload again for the other UT cases.
_s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
}
HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
} finally {
// disable multipart upload again for the other UT cases.
_s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
FileUtils.deleteQuietly(fileToCopy);
FileUtils.deleteQuietly(fileToDownload);
}

HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));

Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());

File fileToDownload = new File("copyFile_download_multipart.txt").getAbsoluteFile();
_s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
Assert.assertEquals(fileToCopy.length(), fileToDownload.length());

fileToDownload.deleteOnExit();
}

@Test
Expand All @@ -396,7 +407,26 @@ public void testMkdir()

_s3PinotFS.mkdir(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName)));

HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName));
HeadObjectResponse headObjectResponse =
_s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName + "/"));
Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful());
}

/**
 * Recreates {@code file} and fills it with {@code size} bytes of deterministic content.
 *
 * <p>Any existing file at that path is quietly deleted first, then the file is touched and
 * rewritten. The byte at offset {@code k} is {@code (byte) k}.
 *
 * @param file the file to (re)create
 * @param size the number of bytes to write
 * @throws IOException if touching or writing the file fails
 */
private static void createDummyFile(File file, int size)
    throws IOException {
  // Start from a clean slate so a leftover file from an earlier run cannot affect the length.
  FileUtils.deleteQuietly(file);
  FileUtils.touch(file);
  try (BufferedOutputStream sink = new BufferedOutputStream(new FileOutputStream(file))) {
    int offset = 0;
    while (offset < size) {
      sink.write((byte) offset);
      offset++;
    }
  }
}

/**
 * Builds an {@link S3Client} that talks to the given endpoint instead of the default AWS one.
 *
 * <p>The client uses region {@code us-east-1}, the static credentials {@code foo}/{@code bar},
 * and path-style bucket access.
 *
 * @param endpoint the endpoint URI, as a string, to send all requests to
 * @return the configured client
 */
private static S3Client createS3ClientV2(String endpoint) {
  StaticCredentialsProvider staticCredentials =
      StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar"));
  S3Configuration pathStyleConfig = S3Configuration.builder().pathStyleAccessEnabled(true).build();
  URI endpointUri = URI.create(endpoint);
  return S3Client.builder()
      .region(Region.of("us-east-1"))
      .credentialsProvider(staticCredentials)
      .serviceConfiguration(pathStyleConfig)
      .endpointOverride(endpointUri)
      .build();
}
}

This file was deleted.

2 changes: 1 addition & 1 deletion pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<properties>
<pinot.root>${basedir}/../../..</pinot.root>
<phase.prop>package</phase.prop>
<aws.version>2.14.28</aws.version>
<aws.version>2.20.83</aws.version>
<easymock.version>4.2</easymock.version>
<reactive.version>1.0.2</reactive.version>
<localstack-utils.version>0.2.19</localstack-utils.version>
Expand Down
2 changes: 1 addition & 1 deletion pinot-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<url>https://pinot.apache.org/</url>
<properties>
<pinot.root>${basedir}/..</pinot.root>
<aws.version>2.14.28</aws.version>
<aws.version>2.20.83</aws.version>
<scala.version>2.12</scala.version>
<spark.version>3.2.1</spark.version>
<airlift.version>0.16</airlift.version>
Expand Down