Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class FileSystemExchangeConfig
private int exchangeSinkBuffersPerPartition = 2;
private DataSize exchangeSinkMaxFileSize = DataSize.of(1, GIGABYTE);
private int exchangeSourceConcurrentReaders = 4;
private int exchangeSourceMaxFilesPerReader = 25;
private int maxOutputPartitionCount = 50;
private int exchangeFileListingParallelism = 50;
private DataSize exchangeSourceHandleTargetDataSize = DataSize.of(256, MEGABYTE);
Expand Down Expand Up @@ -151,6 +152,19 @@ public FileSystemExchangeConfig setExchangeSourceConcurrentReaders(int exchangeS
return this;
}

@Min(1)
public int getExchangeSourceMaxFilesPerReader()
{
return exchangeSourceMaxFilesPerReader;
}

@Config("exchange.source-max-files-per-reader")
public FileSystemExchangeConfig setExchangeSourceMaxFilesPerReader(int exchangeSourceMaxFilesPerReader)
{
this.exchangeSourceMaxFilesPerReader = exchangeSourceMaxFilesPerReader;
return this;
}

@Min(1)
public int getMaxOutputPartitionCount()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class FileSystemExchangeManager
private final int exchangeSinkBuffersPerPartition;
private final long exchangeSinkMaxFileSizeInBytes;
private final int exchangeSourceConcurrentReaders;
private final int exchangeSourceMaxFilesPerReader;
private final int maxOutputPartitionCount;
private final int exchangeFileListingParallelism;
private final long exchangeSourceHandleTargetDataSizeInBytes;
Expand All @@ -77,6 +78,7 @@ public FileSystemExchangeManager(
this.exchangeSinkBuffersPerPartition = fileSystemExchangeConfig.getExchangeSinkBuffersPerPartition();
this.exchangeSinkMaxFileSizeInBytes = fileSystemExchangeConfig.getExchangeSinkMaxFileSize().toBytes();
this.exchangeSourceConcurrentReaders = fileSystemExchangeConfig.getExchangeSourceConcurrentReaders();
this.exchangeSourceMaxFilesPerReader = fileSystemExchangeConfig.getExchangeSourceMaxFilesPerReader();
this.maxOutputPartitionCount = fileSystemExchangeConfig.getMaxOutputPartitionCount();
this.exchangeFileListingParallelism = fileSystemExchangeConfig.getExchangeFileListingParallelism();
this.exchangeSourceHandleTargetDataSizeInBytes = fileSystemExchangeConfig.getExchangeSourceHandleTargetDataSize().toBytes();
Expand Down Expand Up @@ -140,6 +142,7 @@ public ExchangeSource createSource()
exchangeStorage,
stats,
maxPageStorageSizeInBytes,
exchangeSourceConcurrentReaders);
exchangeSourceConcurrentReaders,
exchangeSourceMaxFilesPerReader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class FileSystemExchangeSource
private final FileSystemExchangeStats stats;
private final int maxPageStorageSize;
private final int exchangeSourceConcurrentReaders;
private final int maxFilesPerReader;

private final Queue<ExchangeSourceFile> files = new ConcurrentLinkedQueue<>();
@GuardedBy("this")
Expand All @@ -74,12 +75,14 @@ public FileSystemExchangeSource(
FileSystemExchangeStorage exchangeStorage,
FileSystemExchangeStats stats,
int maxPageStorageSize,
int exchangeSourceConcurrentReaders)
int exchangeSourceConcurrentReaders,
int maxFilesPerReader)
{
this.exchangeStorage = requireNonNull(exchangeStorage, "exchangeStorage is null");
this.stats = requireNonNull(stats, "stats is null");
this.maxPageStorageSize = maxPageStorageSize;
this.exchangeSourceConcurrentReaders = exchangeSourceConcurrentReaders;
this.maxFilesPerReader = maxFilesPerReader;
}

@Override
Expand Down Expand Up @@ -272,6 +275,7 @@ private void closeAndCreateReadersIfNecessary()
try {
while (activeReaders.size() < exchangeSourceConcurrentReaders && !files.isEmpty()) {
ImmutableList.Builder<ExchangeSourceFile> readerFiles = ImmutableList.builder();
int readerFileCount = 0;
long readerFileSize = 0;
while (!files.isEmpty()) {
ExchangeSourceFile file = files.peek();
Expand All @@ -280,9 +284,10 @@ private void closeAndCreateReadersIfNecessary()
file.getExchangeId(),
file.getSourceTaskPartitionId(),
file.getSourceTaskAttemptId());
if (readerFileSize == 0 || readerFileSize + file.getFileSize() <= maxPageStorageSize + exchangeStorage.getWriteBufferSize()) {
if (readerFileCount == 0 || ((readerFileSize + file.getFileSize() <= maxPageStorageSize + exchangeStorage.getWriteBufferSize()) && readerFileCount < maxFilesPerReader)) {
readerFiles.add(file);
readerFileSize += file.getFileSize();
readerFileCount++;
files.poll();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void testDefaults()
.setExchangeSinkBuffersPerPartition(2)
.setExchangeSinkMaxFileSize(DataSize.of(1, GIGABYTE))
.setExchangeSourceConcurrentReaders(4)
.setExchangeSourceMaxFilesPerReader(25)
.setMaxOutputPartitionCount(50)
.setExchangeFileListingParallelism(50)
.setExchangeSourceHandleTargetDataSize(DataSize.of(256, MEGABYTE)));
Expand All @@ -54,6 +55,7 @@ public void testExplicitPropertyMappings()
.put("exchange.sink-buffers-per-partition", "3")
.put("exchange.sink-max-file-size", "2GB")
.put("exchange.source-concurrent-readers", "10")
.put("exchange.source-max-files-per-reader", "111")
.put("exchange.max-output-partition-count", "53")
.put("exchange.file-listing-parallelism", "20")
.put("exchange.source-handle-target-data-size", "1GB")
Expand All @@ -67,6 +69,7 @@ public void testExplicitPropertyMappings()
.setExchangeSinkBuffersPerPartition(3)
.setExchangeSinkMaxFileSize(DataSize.of(2, GIGABYTE))
.setExchangeSourceConcurrentReaders(10)
.setExchangeSourceMaxFilesPerReader(111)
.setMaxOutputPartitionCount(53)
.setExchangeFileListingParallelism(20)
.setExchangeSourceHandleTargetDataSize(DataSize.of(1, GIGABYTE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public void testIsBlockedNonCancellable()
new LocalFileSystemExchangeStorage(),
new FileSystemExchangeStats(),
1024,
2)) {
2,
1)) {
CompletableFuture<Void> first = source.isBlocked();
CompletableFuture<Void> second = source.isBlocked();
assertThat(first)
Expand Down