diff --git a/pinot-integration-test-base/pom.xml b/pinot-integration-test-base/pom.xml
index 936baeed4073..123821f8affb 100644
--- a/pinot-integration-test-base/pom.xml
+++ b/pinot-integration-test-base/pom.xml
@@ -37,7 +37,7 @@
${basedir}/..
0.2.11
3.0.0
- 2.14.28
+ 2.20.83
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index b6d2affbc795..8195ef204c3a 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -36,7 +36,7 @@
${basedir}/..
0.2.19
3.0.0
- 2.14.28
+ 2.20.83
1.17.3
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/pom.xml b/pinot-plugins/pinot-file-system/pinot-s3/pom.xml
index 9ea74649909b..50120b6031f3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/pom.xml
+++ b/pinot-plugins/pinot-file-system/pinot-s3/pom.xml
@@ -35,10 +35,10 @@
https://pinot.apache.org
${basedir}/../../..
- 2.14.28
+ 2.20.83
4.5.13
4.4.13
- 2.1.19
+ 2.12.2
3.1.0
package
@@ -183,27 +183,9 @@
com.adobe.testing
- s3mock-testng
+ s3mock-testcontainers
${s3mock.version}
test
-
-
- ch.qos.logback
- logback-core
-
-
- org.apache.logging.log4j
- log4j-to-slf4j
-
-
- javax.servlet
- javax.servlet-api
-
-
- org.apache.httpcomponents
- httpclient
-
-
javax.servlet
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
index 7c6cc6dbf7f8..339155643ae3 100644
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
+++ b/pinot-plugins/pinot-file-system/pinot-s3/src/test/java/org/apache/pinot/plugin/filesystem/S3PinotFSTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pinot.plugin.filesystem;
-import com.adobe.testing.s3mock.testng.S3Mock;
+import com.adobe.testing.s3mock.testcontainers.S3MockContainer;
+import java.io.BufferedOutputStream;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -28,15 +30,19 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.pinot.spi.filesystem.FileMetadata;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
@@ -44,21 +50,25 @@
@Test
-@Listeners(com.adobe.testing.s3mock.testng.S3MockListener.class)
public class S3PinotFSTest {
+ private static final String S3MOCK_VERSION = System.getProperty("s3mock.version", "2.12.2");
+ private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "S3PinotFSTest");
private static final String DELIMITER = "/";
private static final String BUCKET = "test-bucket";
private static final String SCHEME = "s3";
private static final String FILE_FORMAT = "%s://%s/%s";
private static final String DIR_FORMAT = "%s://%s";
+ private S3MockContainer _s3MockContainer;
private S3PinotFS _s3PinotFS;
private S3Client _s3Client;
@BeforeClass
public void setUp() {
- S3Mock s3Mock = S3Mock.getInstance();
- _s3Client = s3Mock.createS3ClientV2();
+ _s3MockContainer = new S3MockContainer(S3MOCK_VERSION);
+ _s3MockContainer.start();
+ String endpoint = _s3MockContainer.getHttpEndpoint();
+ _s3Client = createS3ClientV2(endpoint);
_s3PinotFS = new S3PinotFS();
_s3PinotFS.init(_s3Client);
_s3Client.createBucket(CreateBucketRequest.builder().bucket(BUCKET).build());
@@ -69,12 +79,14 @@ public void tearDown()
throws IOException {
_s3PinotFS.close();
_s3Client.close();
+ _s3MockContainer.stop();
+ FileUtils.deleteQuietly(TEMP_FILE);
}
private void createEmptyFile(String folderName, String fileName) {
- String fileNameWithFolder = folderName + DELIMITER + fileName;
- _s3Client
- .putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder), RequestBody.fromBytes(new byte[0]));
+ String fileNameWithFolder = folderName.length() == 0 ? fileName : folderName + DELIMITER + fileName;
+ _s3Client.putObject(S3TestUtils.getPutObjectRequest(BUCKET, fileNameWithFolder),
+ RequestBody.fromBytes(new byte[0]));
}
@Test
@@ -249,7 +261,7 @@ public void testDeleteFile()
ListObjectsV2Response listObjectsV2Response =
_s3Client.listObjectsV2(S3TestUtils.getListObjectRequest(BUCKET, "", true));
String[] actualResponse =
- listObjectsV2Response.contents().stream().map(x -> x.key().substring(1)).filter(x -> x.contains("delete"))
+ listObjectsV2Response.contents().stream().map(S3Object::key).filter(x -> x.contains("delete"))
.toArray(String[]::new);
Assert.assertEquals(actualResponse.length, 2);
@@ -290,8 +302,8 @@ public void testIsDirectory()
boolean isBucketDir = _s3PinotFS.isDirectory(URI.create(String.format(DIR_FORMAT, SCHEME, BUCKET)));
boolean isDir = _s3PinotFS.isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder)));
- boolean isDirChild = _s3PinotFS
- .isDirectory(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
+ boolean isDirChild = _s3PinotFS.isDirectory(
+ URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder)));
boolean notIsDir = _s3PinotFS.isDirectory(URI.create(
String.format(FILE_FORMAT, SCHEME, BUCKET, folder + DELIMITER + childFolder + DELIMITER + "a-delete.txt")));
@@ -333,47 +345,46 @@ public void testExists()
public void testCopyFromAndToLocal()
throws Exception {
String fileName = "copyFile.txt";
-
- File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());
-
- _s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
-
- HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
-
- Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
-
- File fileToDownload = new File("copyFile_download.txt").getAbsoluteFile();
- _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
- Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
-
- fileToDownload.deleteOnExit();
+ File fileToCopy = new File(TEMP_FILE, fileName);
+ File fileToDownload = new File(TEMP_FILE, "copyFile_download.txt").getAbsoluteFile();
+ try {
+ createDummyFile(fileToCopy, 1024);
+ _s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
+ HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
+ Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
+ _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
+ Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
+ } finally {
+ FileUtils.deleteQuietly(fileToCopy);
+ FileUtils.deleteQuietly(fileToDownload);
+ }
}
@Test
public void testMultiPartUpload()
throws Exception {
- String fileName = "copyFile.txt";
-
- File fileToCopy = new File(getClass().getClassLoader().getResource(fileName).getFile());
-
- // input file size is 20
- _s3PinotFS.setMultiPartUploadConfigs(1, 3);
+ String fileName = "copyFile_for_multipart.txt";
+ File fileToCopy = new File(TEMP_FILE, fileName);
+ File fileToDownload = new File(TEMP_FILE, "copyFile_download_multipart.txt").getAbsoluteFile();
try {
- _s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
+ // Make a file of 11MB to upload in parts, whose min required size is 5MB.
+ createDummyFile(fileToCopy, 11 * 1024 * 1024);
+ System.out.println("fileToCopy.length:" + fileToCopy.length());
+ _s3PinotFS.setMultiPartUploadConfigs(1, 5 * 1024 * 1024);
+ try {
+ _s3PinotFS.copyFromLocalFile(fileToCopy, URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)));
+ } finally {
+ // disable multipart upload again for the other UT cases.
+ _s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
+ }
+ HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
+ Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
+ _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
+ Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
} finally {
- // disable multipart upload again for the other UT cases.
- _s3PinotFS.setMultiPartUploadConfigs(-1, 128 * 1024 * 1024);
+ FileUtils.deleteQuietly(fileToCopy);
+ FileUtils.deleteQuietly(fileToDownload);
}
-
- HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, fileName));
-
- Assert.assertEquals(headObjectResponse.contentLength(), (Long) fileToCopy.length());
-
- File fileToDownload = new File("copyFile_download_multipart.txt").getAbsoluteFile();
- _s3PinotFS.copyToLocalFile(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, fileName)), fileToDownload);
- Assert.assertEquals(fileToCopy.length(), fileToDownload.length());
-
- fileToDownload.deleteOnExit();
}
@Test
@@ -396,7 +407,26 @@ public void testMkdir()
_s3PinotFS.mkdir(URI.create(String.format(FILE_FORMAT, SCHEME, BUCKET, folderName)));
- HeadObjectResponse headObjectResponse = _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName));
+ HeadObjectResponse headObjectResponse =
+ _s3Client.headObject(S3TestUtils.getHeadObjectRequest(BUCKET, folderName + "/"));
Assert.assertTrue(headObjectResponse.sdkHttpResponse().isSuccessful());
}
+
+ private static void createDummyFile(File file, int size)
+ throws IOException {
+ FileUtils.deleteQuietly(file);
+ FileUtils.touch(file);
+ try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
+ for (int i = 0; i < size; i++) {
+ out.write((byte) i);
+ }
+ }
+ }
+
+ private static S3Client createS3ClientV2(String endpoint) {
+ return S3Client.builder().region(Region.of("us-east-1"))
+ .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("foo", "bar")))
+ .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build())
+ .endpointOverride(URI.create(endpoint)).build();
+ }
}
diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/test/resources/copyFile.txt b/pinot-plugins/pinot-file-system/pinot-s3/src/test/resources/copyFile.txt
deleted file mode 100644
index 0b669b66edab..000000000000
--- a/pinot-plugins/pinot-file-system/pinot-s3/src/test/resources/copyFile.txt
+++ /dev/null
@@ -1,10 +0,0 @@
-1
-2
-3
-4
-5
-6
-7
-8
-9
-10
\ No newline at end of file
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 969d2e26840e..83ab43216081 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -36,7 +36,7 @@
${basedir}/../../..
package
- 2.14.28
+ 2.20.83
4.2
1.0.2
0.2.19
diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml
index edf4392c3381..cbd3b26b2bc7 100644
--- a/pinot-tools/pom.xml
+++ b/pinot-tools/pom.xml
@@ -33,7 +33,7 @@
https://pinot.apache.org/
${basedir}/..
- 2.14.28
+ 2.20.83
2.12
3.2.1
0.16