From a9a297fde4e670beebef6b2602b8c6316633db19 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Fri, 15 Apr 2022 17:11:46 -0700 Subject: [PATCH 1/3] Support exchange spooling on multiple buckets --- .../plugin/exchange/FileSystemExchange.java | 39 ++++++++++++------- .../exchange/FileSystemExchangeConfig.java | 32 ++++++++++++--- .../exchange/FileSystemExchangeManager.java | 12 ++---- .../exchange/FileSystemExchangeModule.java | 10 +++-- .../TestFileSystemExchangeConfig.java | 6 +-- .../TestLocalFileSystemExchangeManager.java | 4 +- 6 files changed, 68 insertions(+), 35 deletions(-) diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java index 82ade7262677..64f16a4d4748 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java @@ -33,6 +33,8 @@ import java.io.UncheckedIOException; import java.net.URI; import java.security.Key; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -59,7 +61,7 @@ public class FileSystemExchange { private static final Pattern PARTITION_FILE_NAME_PATTERN = Pattern.compile("(\\d+)_(\\d+)\\.data"); - private final URI baseDirectory; + private final List baseDirectories; private final FileSystemExchangeStorage exchangeStorage; private final ExchangeContext exchangeContext; private final int outputPartitionCount; @@ -78,14 +80,17 @@ public class FileSystemExchange private final CompletableFuture> exchangeSourceHandlesFuture = new CompletableFuture<>(); public FileSystemExchange( - URI baseDirectory, + List baseDirectories, FileSystemExchangeStorage exchangeStorage, ExchangeContext exchangeContext, int outputPartitionCount, Optional secretKey, ExecutorService executor) { - this.baseDirectory = requireNonNull(baseDirectory, "baseDirectory is null"); + List directories = new ArrayList<>(requireNonNull(baseDirectories, "baseDirectories is null")); + Collections.shuffle(directories); + + this.baseDirectories = ImmutableList.copyOf(directories); this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); this.exchangeContext = requireNonNull(exchangeContext, "exchangeContext is null"); this.outputPartitionCount = outputPartitionCount; @@ -95,11 +100,13 @@ public FileSystemExchange( public void initialize() { - try { - exchangeStorage.createDirectories(getExchangeDirectory()); - } - catch (IOException e) { - throw new UncheckedIOException(e); + for (int i = 0; i < baseDirectories.size(); ++i) { + try { + exchangeStorage.createDirectories(getExchangeDirectory(i)); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } } } @@ -124,8 +131,9 @@ public void noMoreSinks() public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId) { FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) sinkHandle; - URI outputDirectory = getExchangeDirectory() - .resolve(fileSystemExchangeSinkHandle.getPartitionId() + PATH_SEPARATOR) + int taskPartitionId = fileSystemExchangeSinkHandle.getPartitionId(); + URI outputDirectory = getExchangeDirectory(taskPartitionId) + .resolve(taskPartitionId + PATH_SEPARATOR) .resolve(taskAttemptId + PATH_SEPARATOR); try { exchangeStorage.createDirectories(outputDirectory); @@ -193,9 +201,9 @@ private List createExchangeSourceHandles() return result.build(); } - private URI getCommittedAttemptPath(Integer taskPartition) + private URI getCommittedAttemptPath(int taskPartitionId) { - URI sinkOutputBasePath = getExchangeDirectory().resolve(taskPartition + PATH_SEPARATOR); + URI sinkOutputBasePath = getExchangeDirectory(taskPartitionId).resolve(taskPartitionId + PATH_SEPARATOR); try { List attemptPaths = exchangeStorage.listDirectories(sinkOutputBasePath); checkState(!attemptPaths.isEmpty(), "No attempts found under sink output path %s", sinkOutputBasePath); @@ -242,8 +250,9 @@ private Multimap getCommittedPartitions(URI committedAttemp } } - private URI getExchangeDirectory() + private URI getExchangeDirectory(int taskPartitionId) { + URI baseDirectory = baseDirectories.get(taskPartitionId % baseDirectories.size()); return baseDirectory.resolve(exchangeContext.getQueryId() + "." + exchangeContext.getExchangeId() + PATH_SEPARATOR); } @@ -294,6 +303,8 @@ public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle @Override public void close() { - exchangeStorage.deleteRecursively(getExchangeDirectory()); + for (int i = 0; i < baseDirectories.size(); ++i) { + exchangeStorage.deleteRecursively(getExchangeDirectory(i)); + } } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java index a8220ede457d..aaabba1f8263 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeConfig.java @@ -13,19 +13,26 @@ */ package io.trino.plugin.exchange; +import com.google.common.collect.ImmutableList; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; +import io.airlift.configuration.LegacyConfig; import io.airlift.units.DataSize; import javax.validation.constraints.Min; +import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; +import java.net.URI; +import java.util.List; + import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; +import static io.trino.plugin.exchange.FileSystemExchangeManager.PATH_SEPARATOR; public class FileSystemExchangeConfig { - private String baseDirectory; + private List baseDirectories = ImmutableList.of(); private boolean exchangeEncryptionEnabled = true; // For S3, we make read requests aligned with part boundaries. Incomplete slice at the end of the buffer is // possible and will be copied to the beginning of the new buffer, and we need to make room for that. @@ -37,15 +44,28 @@ public class FileSystemExchangeConfig private int exchangeSourceConcurrentReaders = 4; @NotNull - public String getBaseDirectory() + @NotEmpty(message = "At least one base directory needs to be configured") + public List getBaseDirectories() { - return baseDirectory; + return baseDirectories; } - @Config("exchange.base-directory") - public FileSystemExchangeConfig setBaseDirectory(String baseDirectory) + @Config("exchange.base-directories") + @LegacyConfig("exchange.base-directory") + @ConfigDescription("List of base directories separated by commas") + public FileSystemExchangeConfig setBaseDirectories(String baseDirectories) { - this.baseDirectory = baseDirectory; + if (baseDirectories != null) { + ImmutableList.Builder builder = ImmutableList.builder(); + for (String baseDirectory : baseDirectories.split(",")) { + if (!baseDirectory.endsWith(PATH_SEPARATOR)) { + // This is needed as URI's resolve method expects directories to end with '/' + baseDirectory += PATH_SEPARATOR; + } + builder.add(URI.create(baseDirectory)); + } + this.baseDirectories = builder.build(); + } return this; } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java index cef92b160ad6..175adfbdcd59 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.exchange; +import com.google.common.collect.ImmutableList; import io.trino.spi.TrinoException; import io.trino.spi.exchange.Exchange; import io.trino.spi.exchange.ExchangeContext; @@ -49,7 +50,7 @@ public class FileSystemExchangeManager private static final int KEY_BITS = 256; private final FileSystemExchangeStorage exchangeStorage; - private final URI baseDirectory; + private final List baseDirectories; private final boolean exchangeEncryptionEnabled; private final int maxPageStorageSizeInBytes; private final int exchangeSinkBufferPoolMinSize; @@ -64,12 +65,7 @@ public FileSystemExchangeManager(FileSystemExchangeStorage exchangeStorage, File requireNonNull(fileSystemExchangeConfig, "fileSystemExchangeConfig is null"); this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null"); - String baseDirectory = requireNonNull(fileSystemExchangeConfig.getBaseDirectory(), "baseDirectory is null"); - if (!baseDirectory.endsWith(PATH_SEPARATOR)) { - // This is needed as URI's resolve method expects directories to end with '/' - baseDirectory += PATH_SEPARATOR; - } - this.baseDirectory = URI.create(baseDirectory); + this.baseDirectories = ImmutableList.copyOf(requireNonNull(fileSystemExchangeConfig.getBaseDirectories(), "baseDirectories is null")); this.exchangeEncryptionEnabled = fileSystemExchangeConfig.isExchangeEncryptionEnabled(); this.maxPageStorageSizeInBytes = toIntExact(fileSystemExchangeConfig.getMaxPageStorageSize().toBytes()); this.exchangeSinkBufferPoolMinSize = fileSystemExchangeConfig.getExchangeSinkBufferPoolMinSize(); @@ -93,7 +89,7 @@ public Exchange createExchange(ExchangeContext context, int outputPartitionCount throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate new secret key: " + e.getMessage(), e); } } - FileSystemExchange exchange = new FileSystemExchange(baseDirectory, exchangeStorage, context, outputPartitionCount, secretKey, executor); + FileSystemExchange exchange = new FileSystemExchange(baseDirectories, exchangeStorage, context, outputPartitionCount, secretKey, executor); exchange.initialize(); return exchange; } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java index e1db43be9c24..d2b411fb5d79 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeModule.java @@ -23,11 +23,12 @@ import io.trino.spi.TrinoException; import java.net.URI; +import java.util.List; import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static java.lang.String.format; -import static java.util.Objects.requireNonNull; public class FileSystemExchangeModule extends AbstractConfigurationAwareModule @@ -37,8 +38,11 @@ protected void setup(Binder binder) { binder.bind(FileSystemExchangeManager.class).in(Scopes.SINGLETON); - FileSystemExchangeConfig fileSystemExchangeConfig = buildConfigObject(FileSystemExchangeConfig.class); - String scheme = URI.create(requireNonNull(fileSystemExchangeConfig.getBaseDirectory(), "baseDirectory is null")).getScheme(); + List baseDirectories = buildConfigObject(FileSystemExchangeConfig.class).getBaseDirectories(); + if (baseDirectories.stream().map(URI::getScheme).distinct().count() != 1) { + throw new TrinoException(CONFIGURATION_INVALID, "Multiple schemes in exchange base directories"); + } + String scheme = baseDirectories.get(0).getScheme(); if (scheme == null || scheme.equals("file")) { binder.bind(FileSystemExchangeStorage.class).to(LocalFileSystemExchangeStorage.class).in(Scopes.SINGLETON); } diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java index 300aaafb5391..72a68bb01ae8 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/TestFileSystemExchangeConfig.java @@ -31,7 +31,7 @@ public class TestFileSystemExchangeConfig public void testDefaults() { assertRecordedDefaults(recordDefaults(FileSystemExchangeConfig.class) - .setBaseDirectory(null) + .setBaseDirectories(null) .setExchangeEncryptionEnabled(true) .setMaxPageStorageSize(DataSize.of(16, MEGABYTE)) .setExchangeSinkBufferPoolMinSize(10) @@ -44,7 +44,7 @@ public void testDefaults() public void testExplicitPropertyMappings() { Map properties = ImmutableMap.builder() - .put("exchange.base-directory", "s3n://exchange-spooling-test/") + .put("exchange.base-directories", "s3n://exchange-spooling-test/") .put("exchange.encryption-enabled", "false") .put("exchange.max-page-storage-size", "32MB") .put("exchange.sink-buffer-pool-min-size", "20") @@ -54,7 +54,7 @@ public void testExplicitPropertyMappings() .buildOrThrow(); FileSystemExchangeConfig expected = new FileSystemExchangeConfig() - .setBaseDirectory("s3n://exchange-spooling-test/") + .setBaseDirectories("s3n://exchange-spooling-test/") .setExchangeEncryptionEnabled(false) .setMaxPageStorageSize(DataSize.of(32, MEGABYTE)) .setExchangeSinkBufferPoolMinSize(20) diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java index dc962f9bcc1e..4ff8ef608299 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java @@ -24,8 +24,10 @@ public class TestLocalFileSystemExchangeManager @Override protected ExchangeManager createExchangeManager() { + String baseDirectory1 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-1"; + String baseDirectory2 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-2"; return new FileSystemExchangeManagerFactory().create(ImmutableMap.of( - "exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager", + "exchange.base-directory", baseDirectory1 + "," + baseDirectory2, // to trigger file split in some tests "exchange.sink-max-file-size", "16MB")); } From 8e7b5710d0e6a27630df1f0f98451ba44821047a Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Mon, 18 Apr 2022 10:56:50 -0700 Subject: [PATCH 2/3] Rename exchange.base-directory to exchange.base-directories --- .../operator/TestDeduplicatingDirectExchangeBuffer.java | 2 +- docs/src/main/sphinx/admin/fault-tolerant-execution.rst | 6 +++--- .../io/trino/plugin/exchange/containers/MinioStorage.java | 2 +- .../exchange/local/TestLocalFileSystemExchangeManager.java | 2 +- testing/trino-server-dev/etc/exchange-manager.properties | 2 +- .../TestDistributedFaultTolerantEngineOnlyQueries.java | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java index e0ace1cac40d..1d5e3c218a33 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDeduplicatingDirectExchangeBuffer.java @@ -67,7 +67,7 @@ public void beforeClass() exchangeManagerRegistry = new ExchangeManagerRegistry(new ExchangeHandleResolver()); exchangeManagerRegistry.addExchangeManagerFactory(new FileSystemExchangeManagerFactory()); exchangeManagerRegistry.loadExchangeManager("filesystem", ImmutableMap.of( - "exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); + "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); } @AfterClass(alwaysRun = true) diff --git a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst index 6f3b25100c3f..eb2d6d997266 100644 --- a/docs/src/main/sphinx/admin/fault-tolerant-execution.rst +++ b/docs/src/main/sphinx/admin/fault-tolerant-execution.rst @@ -293,7 +293,7 @@ for your storage solution. * - Property name - Description - Default value - * - ``exchange.base-directory`` + * - ``exchange.base-directories`` - The base directory URI location that the exchange manager uses to store spooling data. Only supports S3 and local filesystems. - @@ -346,7 +346,7 @@ does not have to be in AWS, but can be any S3-compatible storage system. .. code-block:: properties exchange-manager.name=filesystem - exchange.base-directory=s3n://trino-exchange-manager + exchange.base-directories=s3n://trino-exchange-manager exchange.encryption-enabled=true exchange.s3.region=us-west-1 exchange.s3.aws-access-key=example-access-key @@ -366,4 +366,4 @@ destination. .. code-block:: properties exchange-manager.name=filesystem - exchange.base-directory=/tmp/trino-exchange-manager + exchange.base-directories=/tmp/trino-exchange-manager diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java index 0b9c668ea3ad..ea1135ae797e 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/containers/MinioStorage.java @@ -86,7 +86,7 @@ public void close() public static Map getExchangeManagerProperties(MinioStorage minioStorage) { return ImmutableMap.builder() - .put("exchange.base-directory", "s3n://" + minioStorage.getBucketName()) + .put("exchange.base-directories", "s3n://" + minioStorage.getBucketName()) // TODO: enable exchange encryption after https is supported for Trino MinIO .put("exchange.encryption-enabled", "false") // to trigger file split in some tests diff --git a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java index 4ff8ef608299..eca4a8ed47b0 100644 --- a/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/test/java/io/trino/plugin/exchange/local/TestLocalFileSystemExchangeManager.java @@ -27,7 +27,7 @@ protected ExchangeManager createExchangeManager() String baseDirectory1 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-1"; String baseDirectory2 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-2"; return new FileSystemExchangeManagerFactory().create(ImmutableMap.of( - "exchange.base-directory", baseDirectory1 + "," + baseDirectory2, + "exchange.base-directories", baseDirectory1 + "," + baseDirectory2, // to trigger file split in some tests "exchange.sink-max-file-size", "16MB")); } diff --git a/testing/trino-server-dev/etc/exchange-manager.properties b/testing/trino-server-dev/etc/exchange-manager.properties index ba19c9754c6c..2332a453e90f 100644 --- a/testing/trino-server-dev/etc/exchange-manager.properties +++ b/testing/trino-server-dev/etc/exchange-manager.properties @@ -1,2 +1,2 @@ exchange-manager.name=filesystem -exchange.base-directory=/tmp/trino-local-file-system-exchange-manager +exchange.base-directories=/tmp/trino-local-file-system-exchange-manager diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java index 05377b7130ab..c4dc11cbe45a 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestDistributedFaultTolerantEngineOnlyQueries.java @@ -35,7 +35,7 @@ protected QueryRunner createQueryRunner() throws Exception { ImmutableMap exchangeManagerProperties = ImmutableMap.builder() - .put("exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager") + .put("exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager") .buildOrThrow(); DistributedQueryRunner queryRunner = MemoryQueryRunner.builder() From 8c6a4c1783d67f49684916a138c7e9dd6475e9d0 Mon Sep 17 00:00:00 2001 From: Zebing Lin Date: Mon, 18 Apr 2022 13:55:08 -0700 Subject: [PATCH 3/3] Add a randomized prefix to spooling output path Add a randomized prefix to evenly distribute data into different S3 shards. Data output file path format: {randomizedPrefix}.{queryId}.{stageId}.{sinkPartitionId}/{attemptId}/{sourcePartitionId}_{splitId}.data --- .../plugin/exchange/FileSystemExchange.java | 35 ++++++++----------- .../exchange/FileSystemExchangeManager.java | 4 +-- 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java index 64f16a4d4748..adf7d371fb43 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchange.java @@ -38,9 +38,12 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -68,6 +71,8 @@ public class FileSystemExchange private final Optional secretKey; private final ExecutorService executor; + private final Map randomizedPrefixes = new ConcurrentHashMap<>(); + @GuardedBy("this") private final Set allSinks = new HashSet<>(); @GuardedBy("this") @@ -98,18 +103,6 @@ public FileSystemExchange( this.executor = requireNonNull(executor, "executor is null"); } - public void initialize() - { - for (int i = 0; i < baseDirectories.size(); ++i) { - try { - exchangeStorage.createDirectories(getExchangeDirectory(i)); - } - catch (IOException e) { - throw new UncheckedIOException(e); - } - } - } - @Override public synchronized ExchangeSinkHandle addSink(int taskPartition) { @@ -132,9 +125,7 @@ public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, { FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) sinkHandle; int taskPartitionId = fileSystemExchangeSinkHandle.getPartitionId(); - URI outputDirectory = getExchangeDirectory(taskPartitionId) - .resolve(taskPartitionId + PATH_SEPARATOR) - .resolve(taskAttemptId + PATH_SEPARATOR); + URI outputDirectory = getTaskOutputDirectory(taskPartitionId).resolve(taskAttemptId + PATH_SEPARATOR); try { exchangeStorage.createDirectories(outputDirectory); } @@ -203,7 +194,7 @@ private List createExchangeSourceHandles() private URI getCommittedAttemptPath(int taskPartitionId) { - URI sinkOutputBasePath = getExchangeDirectory(taskPartitionId).resolve(taskPartitionId + PATH_SEPARATOR); + URI sinkOutputBasePath = getTaskOutputDirectory(taskPartitionId); try { List attemptPaths = exchangeStorage.listDirectories(sinkOutputBasePath); checkState(!attemptPaths.isEmpty(), "No attempts found under sink output path %s", sinkOutputBasePath); @@ -250,10 +241,14 @@ private Multimap getCommittedPartitions(URI committedAttemp } } - private URI getExchangeDirectory(int taskPartitionId) + private URI getTaskOutputDirectory(int taskPartitionId) { URI baseDirectory = baseDirectories.get(taskPartitionId % baseDirectories.size()); - return baseDirectory.resolve(exchangeContext.getQueryId() + "." + exchangeContext.getExchangeId() + PATH_SEPARATOR); + String randomizedPrefix = randomizedPrefixes.computeIfAbsent(taskPartitionId, ignored -> UUID.randomUUID().toString().split("-")[0]); + + // Add a randomized prefix to evenly distribute data into different S3 shards + // Data output file path format: {randomizedPrefix}.{queryId}.{stageId}.{sinkPartitionId}/{attemptId}/{sourcePartitionId}_{splitId}.data + return baseDirectory.resolve(randomizedPrefix + "." + exchangeContext.getQueryId() + "." + exchangeContext.getExchangeId() + "." + taskPartitionId + PATH_SEPARATOR); } @Override @@ -303,8 +298,8 @@ public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle @Override public void close() { - for (int i = 0; i < baseDirectories.size(); ++i) { - exchangeStorage.deleteRecursively(getExchangeDirectory(i)); + for (Integer taskPartitionId : allSinks) { + exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId)); } } } diff --git a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java index 175adfbdcd59..28e603ac7f13 100644 --- a/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java +++ b/plugin/trino-exchange/src/main/java/io/trino/plugin/exchange/FileSystemExchangeManager.java @@ -89,9 +89,7 @@ public Exchange createExchange(ExchangeContext context, int outputPartitionCount throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate new secret key: " + e.getMessage(), e); } } - FileSystemExchange exchange = new FileSystemExchange(baseDirectories, exchangeStorage, context, outputPartitionCount, secretKey, executor); - exchange.initialize(); - return exchange; + return new FileSystemExchange(baseDirectories, exchangeStorage, context, outputPartitionCount, secretKey, executor); } @Override