diff --git a/.changes/next-release/feature-AWSSDKforJavav2-7fea588.json b/.changes/next-release/feature-AWSSDKforJavav2-7fea588.json new file mode 100644 index 000000000000..14686623c5ee --- /dev/null +++ b/.changes/next-release/feature-AWSSDKforJavav2-7fea588.json @@ -0,0 +1,6 @@ +{ + "type": "feature", + "category": "AWS SDK for Java v2", + "contributor": "mpdn", + "description": "Added option of using an explicit `ExecutorService` in `FileAsyncResponseTransformer`" +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java index 6aea8880fc60..902815f96c49 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/FileTransformerConfiguration.java @@ -15,8 +15,12 @@ package software.amazon.awssdk.core; +import java.nio.channels.AsynchronousChannelGroup; import java.nio.file.FileAlreadyExistsException; import java.nio.file.Path; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.ExecutorService; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.utils.Validate; @@ -36,10 +40,12 @@ public final class FileTransformerConfiguration implements ToCopyableBuilder { private final FileWriteOption fileWriteOption; private final FailureBehavior failureBehavior; + private final ExecutorService executorService; private FileTransformerConfiguration(DefaultBuilder builder) { this.fileWriteOption = Validate.paramNotNull(builder.fileWriteOption, "fileWriteOption"); this.failureBehavior = Validate.paramNotNull(builder.failureBehavior, "failureBehavior"); + this.executorService = builder.executorService; } /** @@ -56,6 +62,16 @@ public FailureBehavior failureBehavior() { return failureBehavior; } + /** + * The configured {@link ExecutorService} the writes should be executed on. + *

+ * If not set, the default thread pool defined by the underlying {@link java.nio.file.spi.FileSystemProvider} will be used. + * This will typically be the thread pool defined by the {@link AsynchronousChannelGroup}. + */ + public Optional executorService() { + return Optional.ofNullable(executorService); + } + /** * Create a {@link Builder}, used to create a {@link FileTransformerConfiguration}. */ @@ -118,13 +134,17 @@ public boolean equals(Object o) { if (fileWriteOption != that.fileWriteOption) { return false; } - return failureBehavior == that.failureBehavior; + if (failureBehavior != that.failureBehavior) { + return false; + } + return Objects.equals(executorService, that.executorService); } @Override public int hashCode() { int result = fileWriteOption != null ? fileWriteOption.hashCode() : 0; result = 31 * result + (failureBehavior != null ? failureBehavior.hashCode() : 0); + result = 31 * result + (executorService != null ? executorService.hashCode() : 0); return result; } @@ -181,11 +201,20 @@ public interface Builder extends CopyableBuilder options = new HashSet<>(); switch (configuration.fileWriteOption()) { case CREATE_OR_APPEND_TO_EXISTING: - return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE); + Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE); + break; case CREATE_OR_REPLACE_EXISTING: - return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE, - StandardOpenOption.TRUNCATE_EXISTING); + Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE, + StandardOpenOption.TRUNCATE_EXISTING); + break; case CREATE_NEW: - return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); + Collections.addAll(options, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW); + break; default: throw new IllegalArgumentException("Unsupported file write option: " + configuration.fileWriteOption()); } + + ExecutorService executorService = configuration.executorService().orElse(null); + return AsynchronousFileChannel.open(path, options, executorService); } @Override diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java index c5e65f09dc0c..bcab30e49675 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncResponseTransformerTest.java @@ -31,9 +31,16 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; +import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -213,6 +220,32 @@ private static List configurations() { .failureBehavior(LEAVE).build()); } + @Test + void explicitExecutor_shouldUseExecutor() throws Exception { + Path testPath = testFs.getPath("test_file.txt"); + assertThat(testPath).doesNotExist(); + String newContent = RandomStringUtils.randomAlphanumeric(2000); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + SpyingExecutorService spyingExecutorService = new SpyingExecutorService(executor); + FileTransformerConfiguration configuration = FileTransformerConfiguration + .builder() + .fileWriteOption(FileWriteOption.CREATE_NEW) + .failureBehavior(DELETE) + .executorService(spyingExecutorService) + .build(); + FileAsyncResponseTransformer transformer = new FileAsyncResponseTransformer<>(testPath, configuration); + + stubSuccessfulStreaming(newContent, transformer); + assertThat(testPath).hasContent(newContent); + assertThat(spyingExecutorService.hasReceivedTasks()).isTrue(); + } finally { + executor.shutdown(); + assertThat(executor.awaitTermination(1, TimeUnit.MINUTES)).isTrue(); + } + } + private static void stubSuccessfulStreaming(String newContent, FileAsyncResponseTransformer transformer) throws Exception { CompletableFuture future = transformer.prepare(); transformer.onResponse("foobar"); @@ -240,4 +273,90 @@ private static void stubException(String newContent, FileAsyncResponseTransforme private static SdkPublisher testPublisher(String content) { return SdkPublisher.adapt(Flowable.just(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)))); } + + private static final class SpyingExecutorService implements ExecutorService { + private final ExecutorService executorService; + private boolean receivedTasks = false; + + private SpyingExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public boolean hasReceivedTasks() { + return receivedTasks; + } + + @Override + public void shutdown() { + executorService.shutdown(); + } + + @Override + public List shutdownNow() { + return executorService.shutdownNow(); + } + + @Override + public boolean isShutdown() { + return executorService.isShutdown(); + } + + @Override + public boolean isTerminated() { + return executorService.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return executorService.awaitTermination(timeout, unit); + } + + @Override + public Future submit(Callable task) { + receivedTasks = true; + return executorService.submit(task); + } + + @Override + public Future submit(Runnable task, T result) { + receivedTasks = true; + return executorService.submit(task, result); + } + + @Override + public Future submit(Runnable task) { + receivedTasks = true; + return executorService.submit(task); + } + + @Override + public List> invokeAll(Collection> tasks) throws InterruptedException { + receivedTasks = true; + return executorService.invokeAll(tasks); + } + + @Override + public List> invokeAll(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException { + receivedTasks = true; + return executorService.invokeAll(tasks, timeout, unit); + } + + @Override + public T invokeAny(Collection> tasks) throws InterruptedException, ExecutionException { + receivedTasks = true; + return executorService.invokeAny(tasks); + } + + @Override + public T invokeAny(Collection> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + receivedTasks = true; + return executorService.invokeAny(tasks, timeout, unit); + } + + @Override + public void execute(Runnable command) { + receivedTasks = true; + executorService.execute(command); + } + } } \ No newline at end of file