Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void beforeClass()
exchangeManagerRegistry = new ExchangeManagerRegistry(new ExchangeHandleResolver());
exchangeManagerRegistry.addExchangeManagerFactory(new FileSystemExchangeManagerFactory());
exchangeManagerRegistry.loadExchangeManager("filesystem", ImmutableMap.of(
"exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager"));
"exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager"));
}

@AfterClass(alwaysRun = true)
Expand Down
6 changes: 3 additions & 3 deletions docs/src/main/sphinx/admin/fault-tolerant-execution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ for your storage solution.
* - Property name
- Description
- Default value
* - ``exchange.base-directory``
* - ``exchange.base-directories``
- Comma-separated list of base directory URI locations that the exchange
manager uses to store spooling data. All locations must use the same
scheme. Only supports S3 and local filesystems.
-
Expand Down Expand Up @@ -346,7 +346,7 @@ does not have to be in AWS, but can be any S3-compatible storage system.
.. code-block:: properties

exchange-manager.name=filesystem
exchange.base-directory=s3n://trino-exchange-manager
exchange.base-directories=s3n://trino-exchange-manager
exchange.encryption-enabled=true
exchange.s3.region=us-west-1
exchange.s3.aws-access-key=example-access-key
Expand All @@ -366,4 +366,4 @@ destination.
.. code-block:: properties

exchange-manager.name=filesystem
exchange.base-directory=/tmp/trino-exchange-manager
exchange.base-directories=/tmp/trino-exchange-manager
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.security.Key;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand All @@ -59,13 +64,15 @@ public class FileSystemExchange
{
private static final Pattern PARTITION_FILE_NAME_PATTERN = Pattern.compile("(\\d+)_(\\d+)\\.data");

private final URI baseDirectory;
private final List<URI> baseDirectories;
private final FileSystemExchangeStorage exchangeStorage;
private final ExchangeContext exchangeContext;
private final int outputPartitionCount;
private final Optional<SecretKey> secretKey;
private final ExecutorService executor;

private final Map<Integer, String> randomizedPrefixes = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Set<Integer> allSinks = new HashSet<>();
@GuardedBy("this")
Expand All @@ -78,31 +85,24 @@ public class FileSystemExchange
private final CompletableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesFuture = new CompletableFuture<>();

/**
 * Creates an exchange whose sink output is spread across the given base directories.
 *
 * @param baseDirectories candidate base directories; must contain at least one entry. The list is
 *        copied before being shuffled, so the caller's list is never modified.
 * @param exchangeStorage storage used to create, list and delete exchange directories
 * @param exchangeContext supplies the query id and exchange id used in directory names
 * @param outputPartitionCount number of output partitions
 * @param secretKey optional key; must be a non-null {@code Optional}
 * @param executor executor service kept for later use by this exchange
 * @throws NullPointerException if any reference argument is null
 * @throws IllegalArgumentException if {@code baseDirectories} is empty
 */
public FileSystemExchange(
        List<URI> baseDirectories,
        FileSystemExchangeStorage exchangeStorage,
        ExchangeContext exchangeContext,
        int outputPartitionCount,
        Optional<SecretKey> secretKey,
        ExecutorService executor)
{
    List<URI> directories = new ArrayList<>(requireNonNull(baseDirectories, "baseDirectories is null"));
    // Fail fast: getTaskOutputDirectory indexes with "taskPartitionId % size", which would otherwise
    // surface as a division-by-zero error far away from the actual cause.
    if (directories.isEmpty()) {
        throw new IllegalArgumentException("baseDirectories is empty");
    }
    // NOTE(review): presumably shuffled so that different exchanges do not all map the same
    // partition ids to the same base directory - confirm intent.
    Collections.shuffle(directories);

    this.baseDirectories = ImmutableList.copyOf(directories);
    this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null");
    this.exchangeContext = requireNonNull(exchangeContext, "exchangeContext is null");
    this.outputPartitionCount = outputPartitionCount;
    this.secretKey = requireNonNull(secretKey, "secretKey is null");
    this.executor = requireNonNull(executor, "executor is null");
}

public void initialize()
{
try {
exchangeStorage.createDirectories(getExchangeDirectory());
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public synchronized ExchangeSinkHandle addSink(int taskPartition)
{
Expand All @@ -124,9 +124,8 @@ public void noMoreSinks()
public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId)
{
FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) sinkHandle;
URI outputDirectory = getExchangeDirectory()
.resolve(fileSystemExchangeSinkHandle.getPartitionId() + PATH_SEPARATOR)
.resolve(taskAttemptId + PATH_SEPARATOR);
int taskPartitionId = fileSystemExchangeSinkHandle.getPartitionId();
URI outputDirectory = getTaskOutputDirectory(taskPartitionId).resolve(taskAttemptId + PATH_SEPARATOR);
try {
exchangeStorage.createDirectories(outputDirectory);
}
Expand Down Expand Up @@ -193,9 +192,9 @@ private List<ExchangeSourceHandle> createExchangeSourceHandles()
return result.build();
}

private URI getCommittedAttemptPath(Integer taskPartition)
private URI getCommittedAttemptPath(int taskPartitionId)
{
URI sinkOutputBasePath = getExchangeDirectory().resolve(taskPartition + PATH_SEPARATOR);
URI sinkOutputBasePath = getTaskOutputDirectory(taskPartitionId);
try {
List<URI> attemptPaths = exchangeStorage.listDirectories(sinkOutputBasePath);
checkState(!attemptPaths.isEmpty(), "No attempts found under sink output path %s", sinkOutputBasePath);
Expand Down Expand Up @@ -242,9 +241,14 @@ private Multimap<Integer, FileStatus> getCommittedPartitions(URI committedAttemp
}
}

private URI getExchangeDirectory()
/**
 * Resolves the output directory for one task partition.
 *
 * The base directory is chosen by taskPartitionId modulo the number of configured base
 * directories. The directory name starts with a random token that is generated on first use
 * for each taskPartitionId and then cached in randomizedPrefixes, so repeated calls on this
 * instance return the same URI for the same partition.
 *
 * @param taskPartitionId id of the task (sink) partition
 * @return directory URI (ending with PATH_SEPARATOR) under the selected base directory
 */
private URI getTaskOutputDirectory(int taskPartitionId)
{
return baseDirectory.resolve(exchangeContext.getQueryId() + "." + exchangeContext.getExchangeId() + PATH_SEPARATOR);
// Spread task partitions over the configured base directories
URI baseDirectory = baseDirectories.get(taskPartitionId % baseDirectories.size());
// The token is the first dash-separated group of a random UUID string, created once per partition
String randomizedPrefix = randomizedPrefixes.computeIfAbsent(taskPartitionId, ignored -> UUID.randomUUID().toString().split("-")[0]);

// Add a randomized prefix to evenly distribute data into different S3 shards
// Data output file path format: {randomizedPrefix}.{queryId}.{exchangeId}.{sinkPartitionId}/{attemptId}/{sourcePartitionId}_{splitId}.data
return baseDirectory.resolve(randomizedPrefix + "." + exchangeContext.getQueryId() + "." + exchangeContext.getExchangeId() + "." + taskPartitionId + PATH_SEPARATOR);
}

@Override
Expand Down Expand Up @@ -294,6 +298,8 @@ public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle
/**
 * Deletes the output directory of every sink registered with this exchange.
 */
@Override
public void close()
{
    // allSinks is @GuardedBy("this"): take a snapshot while holding the lock, then run the
    // storage deletes outside of it so the monitor is not held during I/O.
    List<Integer> taskPartitionIds;
    synchronized (this) {
        taskPartitionIds = new ArrayList<>(allSinks);
    }
    for (Integer taskPartitionId : taskPartitionIds) {
        exchangeStorage.deleteRecursively(getTaskOutputDirectory(taskPartitionId));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,26 @@
*/
package io.trino.plugin.exchange;

import com.google.common.collect.ImmutableList;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.DataSize;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import java.net.URI;
import java.util.List;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.plugin.exchange.FileSystemExchangeManager.PATH_SEPARATOR;

public class FileSystemExchangeConfig
{
private String baseDirectory;
private List<URI> baseDirectories = ImmutableList.of();
private boolean exchangeEncryptionEnabled = true;
// For S3, we make read requests aligned with part boundaries. Incomplete slice at the end of the buffer is
// possible and will be copied to the beginning of the new buffer, and we need to make room for that.
Expand All @@ -37,15 +44,28 @@ public class FileSystemExchangeConfig
private int exchangeSourceConcurrentReaders = 4;

@NotNull
public String getBaseDirectory()
@NotEmpty(message = "At least one base directory needs to be configured")
public List<URI> getBaseDirectories()
{
return baseDirectory;
return baseDirectories;
}

@Config("exchange.base-directory")
public FileSystemExchangeConfig setBaseDirectory(String baseDirectory)
@Config("exchange.base-directories")
@LegacyConfig("exchange.base-directory")
@ConfigDescription("List of base directories separated by commas")
public FileSystemExchangeConfig setBaseDirectories(String baseDirectories)
{
this.baseDirectory = baseDirectory;
if (baseDirectories != null) {
ImmutableList.Builder<URI> builder = ImmutableList.builder();
for (String baseDirectory : baseDirectories.split(",")) {
if (!baseDirectory.endsWith(PATH_SEPARATOR)) {
// This is needed as URI's resolve method expects directories to end with '/'
baseDirectory += PATH_SEPARATOR;
}
builder.add(URI.create(baseDirectory));
}
this.baseDirectories = builder.build();
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.exchange;

import com.google.common.collect.ImmutableList;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
Expand Down Expand Up @@ -49,7 +50,7 @@ public class FileSystemExchangeManager
private static final int KEY_BITS = 256;

private final FileSystemExchangeStorage exchangeStorage;
private final URI baseDirectory;
private final List<URI> baseDirectories;
private final boolean exchangeEncryptionEnabled;
private final int maxPageStorageSizeInBytes;
private final int exchangeSinkBufferPoolMinSize;
Expand All @@ -64,12 +65,7 @@ public FileSystemExchangeManager(FileSystemExchangeStorage exchangeStorage, File
requireNonNull(fileSystemExchangeConfig, "fileSystemExchangeConfig is null");

this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null");
String baseDirectory = requireNonNull(fileSystemExchangeConfig.getBaseDirectory(), "baseDirectory is null");
if (!baseDirectory.endsWith(PATH_SEPARATOR)) {
// This is needed as URI's resolve method expects directories to end with '/'
baseDirectory += PATH_SEPARATOR;
}
this.baseDirectory = URI.create(baseDirectory);
this.baseDirectories = ImmutableList.copyOf(requireNonNull(fileSystemExchangeConfig.getBaseDirectories(), "baseDirectories is null"));
this.exchangeEncryptionEnabled = fileSystemExchangeConfig.isExchangeEncryptionEnabled();
this.maxPageStorageSizeInBytes = toIntExact(fileSystemExchangeConfig.getMaxPageStorageSize().toBytes());
this.exchangeSinkBufferPoolMinSize = fileSystemExchangeConfig.getExchangeSinkBufferPoolMinSize();
Expand All @@ -93,9 +89,7 @@ public Exchange createExchange(ExchangeContext context, int outputPartitionCount
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to generate new secret key: " + e.getMessage(), e);
}
}
FileSystemExchange exchange = new FileSystemExchange(baseDirectory, exchangeStorage, context, outputPartitionCount, secretKey, executor);
exchange.initialize();
return exchange;
return new FileSystemExchange(baseDirectories, exchangeStorage, context, outputPartitionCount, secretKey, executor);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@
import io.trino.spi.TrinoException;

import java.net.URI;
import java.util.List;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.spi.StandardErrorCode.CONFIGURATION_INVALID;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class FileSystemExchangeModule
extends AbstractConfigurationAwareModule
Expand All @@ -37,8 +38,11 @@ protected void setup(Binder binder)
{
binder.bind(FileSystemExchangeManager.class).in(Scopes.SINGLETON);

FileSystemExchangeConfig fileSystemExchangeConfig = buildConfigObject(FileSystemExchangeConfig.class);
String scheme = URI.create(requireNonNull(fileSystemExchangeConfig.getBaseDirectory(), "baseDirectory is null")).getScheme();
List<URI> baseDirectories = buildConfigObject(FileSystemExchangeConfig.class).getBaseDirectories();
if (baseDirectories.stream().map(URI::getScheme).distinct().count() != 1) {
throw new TrinoException(CONFIGURATION_INVALID, "Multiple schemes in exchange base directories");
}
String scheme = baseDirectories.get(0).getScheme();
if (scheme == null || scheme.equals("file")) {
binder.bind(FileSystemExchangeStorage.class).to(LocalFileSystemExchangeStorage.class).in(Scopes.SINGLETON);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class TestFileSystemExchangeConfig
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(FileSystemExchangeConfig.class)
.setBaseDirectory(null)
.setBaseDirectories(null)
.setExchangeEncryptionEnabled(true)
.setMaxPageStorageSize(DataSize.of(16, MEGABYTE))
.setExchangeSinkBufferPoolMinSize(10)
Expand All @@ -44,7 +44,7 @@ public void testDefaults()
public void testExplicitPropertyMappings()
{
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("exchange.base-directory", "s3n://exchange-spooling-test/")
.put("exchange.base-directories", "s3n://exchange-spooling-test/")
.put("exchange.encryption-enabled", "false")
.put("exchange.max-page-storage-size", "32MB")
.put("exchange.sink-buffer-pool-min-size", "20")
Expand All @@ -54,7 +54,7 @@ public void testExplicitPropertyMappings()
.buildOrThrow();

FileSystemExchangeConfig expected = new FileSystemExchangeConfig()
.setBaseDirectory("s3n://exchange-spooling-test/")
.setBaseDirectories("s3n://exchange-spooling-test/")
.setExchangeEncryptionEnabled(false)
.setMaxPageStorageSize(DataSize.of(32, MEGABYTE))
.setExchangeSinkBufferPoolMinSize(20)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void close()
public static Map<String, String> getExchangeManagerProperties(MinioStorage minioStorage)
{
return ImmutableMap.<String, String>builder()
.put("exchange.base-directory", "s3n://" + minioStorage.getBucketName())
.put("exchange.base-directories", "s3n://" + minioStorage.getBucketName())
// TODO: enable exchange encryption after https is supported for Trino MinIO
.put("exchange.encryption-enabled", "false")
// to trigger file split in some tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ public class TestLocalFileSystemExchangeManager
@Override
protected ExchangeManager createExchangeManager()
{
String baseDirectory1 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-1";
String baseDirectory2 = System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager-2";
return new FileSystemExchangeManagerFactory().create(ImmutableMap.of(
"exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager",
"exchange.base-directories", baseDirectory1 + "," + baseDirectory2,
// to trigger file split in some tests
"exchange.sink-max-file-size", "16MB"));
}
Expand Down
2 changes: 1 addition & 1 deletion testing/trino-server-dev/etc/exchange-manager.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
exchange-manager.name=filesystem
exchange.base-directory=/tmp/trino-local-file-system-exchange-manager
exchange.base-directories=/tmp/trino-local-file-system-exchange-manager
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected QueryRunner createQueryRunner()
throws Exception
{
ImmutableMap<String, String> exchangeManagerProperties = ImmutableMap.<String, String>builder()
.put("exchange.base-directory", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")
.put("exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")
.buildOrThrow();

DistributedQueryRunner queryRunner = MemoryQueryRunner.builder()
Expand Down