From e4271faf2ea6bf9522d5e30ad3bb780418504169 Mon Sep 17 00:00:00 2001 From: ozkoca Date: Wed, 6 Aug 2025 09:45:58 +0100 Subject: [PATCH 1/3] Added name for thread pool --- .../S3SeekableInputStreamFactory.java | 5 ++- .../util/NamedThreadFactory.java | 44 +++++++++++++++++++ .../S3SeekableInputStreamFactoryTest.java | 26 +++++++++++ .../util/NamedThreadFactoryTest.java | 44 +++++++++++++++++++ 4 files changed, 118 insertions(+), 1 deletion(-) create mode 100644 input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactory.java create mode 100644 input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactoryTest.java diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index d498085b..474a9c99 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -36,6 +36,7 @@ import software.amazon.s3.analyticsaccelerator.io.physical.impl.PhysicalIOImpl; import software.amazon.s3.analyticsaccelerator.request.ObjectClient; import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata; +import software.amazon.s3.analyticsaccelerator.util.NamedThreadFactory; import software.amazon.s3.analyticsaccelerator.util.ObjectFormatSelector; import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation; import software.amazon.s3.analyticsaccelerator.util.S3URI; @@ -52,6 +53,7 @@ */ @Getter(AccessLevel.PACKAGE) public class S3SeekableInputStreamFactory implements AutoCloseable { + private static final String THREAD_FACTORY_NAME = "s3-analytics-accelerator-"; private final S3SeekableInputStreamConfiguration configuration; private final ParquetColumnPrefetchStore parquetColumnPrefetchStore; 
private final MetadataStore objectMetadataStore; @@ -87,7 +89,8 @@ public S3SeekableInputStreamFactory( // TODO: calling applications should be able to pass in a thread pool if they so wish this.threadPool = Executors.newFixedThreadPool( - configuration.getPhysicalIOConfiguration().getThreadPoolSize()); + configuration.getPhysicalIOConfiguration().getThreadPoolSize(), + new NamedThreadFactory(THREAD_FACTORY_NAME, true)); this.objectBlobStore = new BlobStore( objectClient, diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactory.java new file mode 100644 index 00000000..b632cf8f --- /dev/null +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactory.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.s3.analyticsaccelerator.util; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** ThreadFactory that creates named threads with configurable daemon status. 
*/ +public class NamedThreadFactory implements ThreadFactory { + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + private final boolean daemon; + + /** + * Creates a factory whose threads are named with a prefix followed by a counter starting at 1. + * + * @param namePrefix text placed before the sequence number in each thread name (not validated) + * @param daemon daemon flag applied to every thread this factory creates + */ + public NamedThreadFactory(String namePrefix, boolean daemon) { + this.namePrefix = namePrefix; + this.daemon = daemon; + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement()); + t.setDaemon(daemon); + return t; + } +} diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java index 9ee7d4cc..e99a1136 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java @@ -31,6 +31,7 @@ import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -407,4 +408,29 @@ void testCloseWithMultipleStreams() throws IOException { private static Exception[] exceptions() { return ExceptionHandler.getSampleExceptions(); } + + @Test + void testActualRead() throws IOException { + String bucket = "ozkoca-aqbe-baseline-partitioned128mb-emrs3a-us-east-1"; + String key = + "dataset/catalog_page/part-00000-fc7f39ec-1fdb-4421-8204-31fe00a1a43d-c000.snappy.parquet"; + S3AsyncClient crtClient = S3AsyncClient.crtBuilder().region(Region.US_EAST_1).build(); + 
S3SdkObjectClient client = new S3SdkObjectClient(crtClient); + S3SeekableInputStreamFactory s3SeekableInputStreamFactory = + new S3SeekableInputStreamFactory(client, S3SeekableInputStreamConfiguration.DEFAULT); + + S3SeekableInputStream inputStream = + s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key)); + + int bytesRead = 0; + int i = 0; + do { + bytesRead = inputStream.read(new byte[8 * 1024]); + System.out.println(i++ + ": " + bytesRead); + } while (bytesRead != -1); + + System.out.println("Completed"); + s3SeekableInputStreamFactory.close(); + System.out.println("Closed factory"); + } } diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactoryTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactoryTest.java new file mode 100644 index 00000000..7c2dfe04 --- /dev/null +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/util/NamedThreadFactoryTest.java @@ -0,0 +1,44 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package software.amazon.s3.analyticsaccelerator.util; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class NamedThreadFactoryTest { + + @Test + void testThreadNaming() { + NamedThreadFactory factory = new NamedThreadFactory("test-", true); + + Thread thread1 = factory.newThread(() -> {}); + Thread thread2 = factory.newThread(() -> {}); + + assertEquals("test-1", thread1.getName()); /* numbering starts at 1 */ + assertEquals("test-2", thread2.getName()); /* and advances once per created thread */ + } + + @Test + void testDaemonThread() { + NamedThreadFactory factory = new NamedThreadFactory("daemon-", true); + + Thread thread = factory.newThread(() -> {}); + + assertTrue(thread.isDaemon()); /* the factory above was constructed with daemon = true */ + } +} From ec5b75c573d02b25a5521433616e35e0e37d5a0e Mon Sep 17 00:00:00 2001 From: ozkoca Date: Wed, 6 Aug 2025 10:31:36 +0100 Subject: [PATCH 2/3] Removed test --- .../S3SeekableInputStreamFactoryTest.java | 26 ------------------- 1 file changed, 26 deletions(-) diff --git a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java index e99a1136..9ee7d4cc 100644 --- a/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java +++ b/input-stream/src/test/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactoryTest.java @@ -31,7 +31,6 @@ import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; @@ -408,29 +407,4 @@ void testCloseWithMultipleStreams() throws 
IOException { private static Exception[] exceptions() { return ExceptionHandler.getSampleExceptions(); } - - @Test - void testActualRead() throws IOException { - String bucket = "ozkoca-aqbe-baseline-partitioned128mb-emrs3a-us-east-1"; - String key = - "dataset/catalog_page/part-00000-fc7f39ec-1fdb-4421-8204-31fe00a1a43d-c000.snappy.parquet"; - S3AsyncClient crtClient = S3AsyncClient.crtBuilder().region(Region.US_EAST_1).build(); - S3SdkObjectClient client = new S3SdkObjectClient(crtClient); - S3SeekableInputStreamFactory s3SeekableInputStreamFactory = - new S3SeekableInputStreamFactory(client, S3SeekableInputStreamConfiguration.DEFAULT); - - S3SeekableInputStream inputStream = - s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key)); - - int bytesRead = 0; - int i = 0; - do { - bytesRead = inputStream.read(new byte[8 * 1024]); - System.out.println(i++ + ": " + bytesRead); - } while (bytesRead != -1); - - System.out.println("Completed"); - s3SeekableInputStreamFactory.close(); - System.out.println("Closed factory"); - } } From 0a4701dc1ad8f6a2cfcb117f9bce22c407f43c48 Mon Sep 17 00:00:00 2001 From: ozkoca Date: Wed, 6 Aug 2025 15:44:06 +0100 Subject: [PATCH 3/3] Refactored the line of static variable --- .../s3/analyticsaccelerator/S3SeekableInputStreamFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java index 474a9c99..ceedc064 100644 --- a/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java +++ b/input-stream/src/main/java/software/amazon/s3/analyticsaccelerator/S3SeekableInputStreamFactory.java @@ -53,7 +53,6 @@ */ @Getter(AccessLevel.PACKAGE) public class S3SeekableInputStreamFactory implements AutoCloseable { - private static final String THREAD_FACTORY_NAME = 
"s3-analytics-accelerator-"; private final S3SeekableInputStreamConfiguration configuration; private final ParquetColumnPrefetchStore parquetColumnPrefetchStore; private final MetadataStore objectMetadataStore; @@ -64,6 +63,7 @@ public class S3SeekableInputStreamFactory implements AutoCloseable { private final ExecutorService threadPool; private static final Logger LOG = LoggerFactory.getLogger(S3SeekableInputStreamFactory.class); + private static final String THREAD_FACTORY_NAME = "s3-analytics-accelerator-"; /** * Creates a new instance of {@link S3SeekableInputStreamFactory}. This factory should be used to