HashBuilderOperator.java
@@ -411,7 +411,8 @@ private ListenableFuture<Void> spillIndex()
spiller = Optional.of(singleStreamSpillerFactory.create(
index.getTypes(),
operatorContext.getSpillContext().newLocalSpillContext(),
operatorContext.newLocalUserMemoryContext(HashBuilderOperator.class.getSimpleName())));
operatorContext.newLocalUserMemoryContext(HashBuilderOperator.class.getSimpleName()),
true));
long spillStartNanos = System.nanoTime();
ListenableFuture<DataSize> spillFuture = getSpiller().spill(index.getPages());
addSuccessCallback(spillFuture, dataSize -> {
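The new trailing true argument is the parallelSpill flag added to SingleStreamSpillerFactory.create(...) later in this change; HashBuilderOperator passes it when spilling its index. A toy sketch of the call shape, with stand-in types rather than the real Trino signatures:

```java
import java.util.List;

// Toy stand-ins to show the call shape only; not the real Trino signatures.
public class SpillIndexSketch
{
    interface Spiller {}

    interface SpillerFactory
    {
        Spiller create(List<String> types, String spillContext, String memoryContext, boolean parallelSpill);
    }

    static Spiller spillIndex(SpillerFactory factory, List<String> types)
    {
        // The trailing 'true' is the new parallelSpill flag: ask for a spiller that
        // stripes the spilled pages across several spill files instead of one.
        return factory.create(types, "spill-context", "memory-context", true);
    }

    public static void main(String[] args)
    {
        SpillerFactory factory = (types, spillContext, memoryContext, parallelSpill) -> new Spiller() {};
        System.out.println(spillIndex(factory, List.of("bigint", "varchar")) != null); // true
    }
}
```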
FileSingleStreamSpiller.java
@@ -13,16 +13,13 @@
*/
package io.trino.spiller;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.OutputStreamSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.units.DataSize;
import io.trino.annotation.NotThreadSafe;
import io.trino.execution.buffer.PageDeserializer;
@@ -42,28 +39,29 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterators.transform;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX;
import static io.trino.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_SUFFIX;
import static java.nio.file.StandardOpenOption.APPEND;
import static java.util.Objects.requireNonNull;

@NotThreadSafe
public class FileSingleStreamSpiller
implements SingleStreamSpiller
{
@VisibleForTesting
static final int BUFFER_SIZE = 4 * 1024;
private final List<SpillFile> spillFiles;
private volatile int currentFileIndex;

private final FileHolder targetFile;
private final Closer closer = Closer.create();
private final PagesSerdeFactory serdeFactory;
private volatile Optional<SecretKey> encryptionKey;
@@ -84,12 +82,15 @@ public FileSingleStreamSpiller(
PagesSerdeFactory serdeFactory,
Optional<SecretKey> encryptionKey,
ListeningExecutorService executor,
Path spillPath,
List<Path> spillPaths,
SpillerStats spillerStats,
SpillContext spillContext,
LocalMemoryContext memoryContext,
Runnable fileSystemErrorHandler)
{
requireNonNull(spillPaths, "spillPaths is null");
checkArgument(!spillPaths.isEmpty(), "spillPaths is empty");

this.serdeFactory = requireNonNull(serdeFactory, "serdeFactory is null");
this.encryptionKey = requireNonNull(encryptionKey, "encryptionKey is null");
this.encrypted = encryptionKey.isPresent();
@@ -107,10 +108,14 @@ public FileSingleStreamSpiller(
// This means we start accounting for the memory before the spiller thread allocates it, and we release the memory reservation
// before/after the spiller thread allocates that memory -- whether before or after depends on whether writePages() is in the
// middle of execution when close() is called (note that this applies to both readPages() and writePages() methods).
this.memoryContext.setBytes(BUFFER_SIZE);
this.memoryContext.setBytes((long) SpillFile.BUFFER_SIZE * spillPaths.size());
this.fileSystemErrorHandler = requireNonNull(fileSystemErrorHandler, "filesystemErrorHandler is null");
try {
this.targetFile = closer.register(new FileHolder(Files.createTempFile(spillPath, SPILL_FILE_PREFIX, SPILL_FILE_SUFFIX)));
ImmutableList.Builder<SpillFile> builder = ImmutableList.builderWithExpectedSize(spillPaths.size());
for (Path path : spillPaths) {
builder.add(closer.register(new SpillFile(Files.createTempFile(path, SPILL_FILE_PREFIX, SPILL_FILE_SUFFIX))));
}
this.spillFiles = builder.build();
}
catch (IOException e) {
this.fileSystemErrorHandler.run();
@@ -137,61 +142,136 @@ public long getSpilledPagesInMemorySize()
public Iterator<Page> getSpilledPages()
{
checkNoSpillInProgress();
return readPages();
checkState(writable.getAndSet(false), "Repeated reads are disallowed to prevent potential resource leaks");

try {
Optional<SecretKey> encryptionKey = this.encryptionKey;
checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded");

PageDeserializer deserializer = serdeFactory.createDeserializer(encryptionKey);
this.encryptionKey = Optional.empty();

int fileCount = spillFiles.size();
List<Iterator<Page>> iterators = new ArrayList<>(fileCount);
for (SpillFile file : spillFiles) {
iterators.add(readFilePages(deserializer, file, closer));
}

return new AbstractIterator<>()
{
int fileIndex;

@Override
protected Page computeNext()
{
Iterator<Page> iterator = iterators.get(fileIndex);
if (!iterator.hasNext()) {
checkAllIteratorsExhausted(iterators);
return endOfData();
}

Page page = iterator.next();
fileIndex = (fileIndex + 1) % fileCount;
return page;
}
};
}
catch (IOException e) {
fileSystemErrorHandler.run();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);
}
}

@Override
public ListenableFuture<List<Page>> getAllSpilledPages()
{
return executor.submit(() -> ImmutableList.copyOf(getSpilledPages()));
checkNoSpillInProgress();
checkState(writable.getAndSet(false), "Repeated reads are disallowed to prevent potential resource leaks");

Optional<SecretKey> encryptionKey = this.encryptionKey;
checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded");

this.encryptionKey = Optional.empty();

List<ListenableFuture<List<Page>>> futures = new ArrayList<>();
for (SpillFile file : spillFiles) {
futures.add(executor.submit(() -> {
PageDeserializer deserializer = serdeFactory.createDeserializer(encryptionKey);
ImmutableList.Builder<Page> pages = ImmutableList.builder();
try (Closer closer = Closer.create()) {
readFilePages(deserializer, file, closer).forEachRemaining(pages::add);
}
return pages.build();
}));
}

// Combine pages from all spill files according to the round-robin order.
return Futures.transform(Futures.allAsList(futures), pagesPerFile -> {
ImmutableList.Builder<Page> builder = ImmutableList.builderWithExpectedSize(pagesPerFile.stream().mapToInt(List::size).sum());
int fileCount = spillFiles.size();

List<Iterator<Page>> iterators = new ArrayList<>(fileCount);
for (List<Page> pages : pagesPerFile) {
iterators.add(pages.iterator());
}

int fileIndex = 0;
while (iterators.get(fileIndex).hasNext()) {
builder.add(iterators.get(fileIndex).next());
fileIndex = (fileIndex + 1) % fileCount;
}
checkAllIteratorsExhausted(iterators);
return builder.build();
}, executor);
}

private static void checkAllIteratorsExhausted(List<Iterator<Page>> iterators)
{
iterators.forEach(iterator -> checkState(!iterator.hasNext(), "spill file iterator not fully consumed"));
}

private DataSize writePages(Iterator<Page> pageIterator)
private DataSize writePages(Iterator<Page> pages)
{
checkState(writable.get(), "Spilling no longer allowed. The spiller has been made non-writable on first read for subsequent reads to be consistent");

Optional<SecretKey> encryptionKey = this.encryptionKey;
checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded");
PageSerializer serializer = serdeFactory.createSerializer(encryptionKey);

long spilledPagesBytes = 0;
try (SliceOutput output = new OutputStreamSliceOutput(targetFile.newOutputStream(APPEND), BUFFER_SIZE)) {
while (pageIterator.hasNext()) {
Page page = pageIterator.next();
int fileIndex = currentFileIndex;
int fileCount = spillFiles.size();

try {
while (pages.hasNext()) {
Page page = pages.next();
long pageSizeInBytes = page.getSizeInBytes();
Slice serialized = serializer.serialize(page);
long serializedPageSize = serialized.length();

spillFiles.get(fileIndex).writeBytes(serialized);

spilledPagesBytes += pageSizeInBytes;

spilledPagesInMemorySize.addAndGet(pageSizeInBytes);
Slice serializedPage = serializer.serialize(page);
long pageSize = serializedPage.length();
localSpillContext.updateBytes(pageSize);
spillerStats.addToTotalSpilledBytes(pageSize);
output.writeBytes(serializedPage);
localSpillContext.updateBytes(serializedPageSize);
spillerStats.addToTotalSpilledBytes(serializedPageSize);

fileIndex = (fileIndex + 1) % fileCount;
}

currentFileIndex = fileIndex;

for (SpillFile file : spillFiles) {
file.closeOutput();
}
}
catch (UncheckedIOException | IOException e) {
fileSystemErrorHandler.run();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to spill pages", e);
}
return DataSize.ofBytes(spilledPagesBytes);
}

private Iterator<Page> readPages()
{
checkState(writable.getAndSet(false), "Repeated reads are disallowed to prevent potential resource leaks");

try {
Optional<SecretKey> encryptionKey = this.encryptionKey;
checkState(encrypted == encryptionKey.isPresent(), "encryptionKey has been discarded");
PageDeserializer deserializer = serdeFactory.createDeserializer(encryptionKey);
// encryption key is safe to discard since it now belongs to the PageDeserializer and repeated reads are disallowed
this.encryptionKey = Optional.empty();
InputStream input = closer.register(targetFile.newInputStream());
Iterator<Page> pages = PagesSerdeUtil.readPages(deserializer, input);
return closeWhenExhausted(pages, input);
}
catch (IOException e) {
fileSystemErrorHandler.run();
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);
}
return DataSize.ofBytes(spilledPagesBytes);
}

@Override
@@ -215,6 +295,17 @@ private void checkNoSpillInProgress()
checkState(spillInProgress.isDone(), "spill in progress");
}

/**
* Returns an iterator that exposes all pages stored in the given file.
* Pages are lazily deserialized as the iterator is consumed.
*/
private Iterator<Page> readFilePages(PageDeserializer deserializer, SpillFile file, Closer closer)
throws IOException
{
InputStream input = closer.register(file.newInputStream());
return transform(closeWhenExhausted(PagesSerdeUtil.readSerializedPages(input), input), deserializer::deserialize);
}

private static <T> Iterator<T> closeWhenExhausted(Iterator<T> iterator, Closeable resource)
{
requireNonNull(iterator, "iterator is null");
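Taken together, writePages() stripes serialized pages across the spill files in round-robin order and carries currentFileIndex across batches, while getSpilledPages() and getAllSpilledPages() interleave reads starting from file 0, so pages appear to come back in exactly the order they were spilled (the FIFO question raised in the review comment below). A self-contained sketch of that interleaving, using plain integer lists as stand-in spill files rather than the Trino classes:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative only: lists stand in for spill files, integers for pages.
public class RoundRobinSpillSketch
{
    public static void main(String[] args)
    {
        int fileCount = 3;
        List<List<Integer>> files = new ArrayList<>();
        for (int i = 0; i < fileCount; i++) {
            files.add(new ArrayList<>());
        }

        // "Write" pages 0..9 round-robin across the files, as writePages() does.
        int fileIndex = 0;
        for (int page = 0; page < 10; page++) {
            files.get(fileIndex).add(page);
            fileIndex = (fileIndex + 1) % fileCount;
        }

        // "Read" them back round-robin starting from file 0, as getSpilledPages() does.
        List<Iterator<Integer>> iterators = new ArrayList<>();
        for (List<Integer> file : files) {
            iterators.add(file.iterator());
        }
        List<Integer> readBack = new ArrayList<>();
        int readIndex = 0;
        while (iterators.get(readIndex).hasNext()) {
            readBack.add(iterators.get(readIndex).next());
            readIndex = (readIndex + 1) % fileCount;
        }

        // The read order matches the write order, so FIFO is preserved.
        System.out.println(readBack); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```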
FileSingleStreamSpillerFactory.java
@@ -22,6 +22,7 @@
import io.airlift.log.Logger;
import io.trino.FeaturesConfig;
import io.trino.cache.NonKeyEvictableLoadingCache;
import io.trino.execution.TaskManagerConfig;
import io.trino.execution.buffer.CompressionCodec;
import io.trino.execution.buffer.PagesSerdeFactory;
import io.trino.memory.context.LocalMemoryContext;
@@ -80,11 +81,12 @@ public class FileSingleStreamSpillerFactory
private final SpillerStats spillerStats;
private final double maxUsedSpaceThreshold;
private final boolean spillEncryptionEnabled;
private final int spillFileCount;
private int roundRobinIndex;
private final NonKeyEvictableLoadingCache<Path, Boolean> spillPathHealthCache;

@Inject
public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, SpillerStats spillerStats, FeaturesConfig featuresConfig, NodeSpillConfig nodeSpillConfig)
public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, SpillerStats spillerStats, FeaturesConfig featuresConfig, NodeSpillConfig nodeSpillConfig, TaskManagerConfig taskManagerConfig)
{
this(
listeningDecorator(newFixedThreadPool(
@@ -93,6 +95,7 @@ public FileSingleStreamSpillerFactory(BlockEncodingSerde blockEncodingSerde, Spi
requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"),
spillerStats,
featuresConfig.getSpillerSpillPaths(),
Math.min(featuresConfig.getSpillerThreads(), taskManagerConfig.getTaskConcurrency()),
featuresConfig.getSpillMaxUsedSpaceThreshold(),
nodeSpillConfig.getSpillCompressionCodec(),
nodeSpillConfig.isSpillEncryptionEnabled());
@@ -104,6 +107,7 @@ public FileSingleStreamSpillerFactory(
BlockEncodingSerde blockEncodingSerde,
SpillerStats spillerStats,
List<Path> spillPaths,
int spillFileCount,
double maxUsedSpaceThreshold,
CompressionCodec compressionCodec,
boolean spillEncryptionEnabled)
@@ -124,6 +128,7 @@ public FileSingleStreamSpillerFactory(
throw new IllegalArgumentException(format("spill path %s is not accessible, it must be +rwx; adjust %s config property or filesystem permissions", path, SPILLER_SPILL_PATH));
}
});
this.spillFileCount = spillFileCount;
this.maxUsedSpaceThreshold = maxUsedSpaceThreshold;
this.spillEncryptionEnabled = spillEncryptionEnabled;
this.roundRobinIndex = 0;
@@ -165,14 +170,19 @@ private static void cleanupOldSpillFiles(Path path)
}

@Override
public SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext)
public SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext, boolean parallelSpill)
{
Optional<SecretKey> encryptionKey = spillEncryptionEnabled ? Optional.of(createRandomAesEncryptionKey()) : Optional.empty();
int spillFileCount = parallelSpill ? this.spillFileCount : 1;
ImmutableList.Builder<Path> paths = ImmutableList.builderWithExpectedSize(spillFileCount);
for (int i = 0; i < spillFileCount; i++) {
paths.add(getNextSpillPath());
}
return new FileSingleStreamSpiller(
serdeFactory,
encryptionKey,
executor,
getNextSpillPath(),
paths.build(),
spillerStats,
spillContext,
memoryContext,
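On the factory side, a spiller created with parallelSpill gets Math.min(spiller threads, task concurrency) spill files, each placed on the next spill path chosen round-robin, while a regular spiller keeps a single file. A minimal sketch of that selection, with illustrative names in place of the Trino config classes and without the spill-path health checks the real factory tracks in spillPathHealthCache:

```java
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the path-selection behavior; not the Trino implementation.
public class SpillPathSelector
{
    private final List<Path> spillPaths;
    private final int spillFileCount;
    private int roundRobinIndex;

    SpillPathSelector(List<Path> spillPaths, int spillerThreads, int taskConcurrency)
    {
        this.spillPaths = List.copyOf(spillPaths);
        // Cap the per-spiller file count, as in the factory constructor above.
        this.spillFileCount = Math.min(spillerThreads, taskConcurrency);
    }

    List<Path> pathsForNewSpiller(boolean parallelSpill)
    {
        int fileCount = parallelSpill ? spillFileCount : 1;
        List<Path> selected = new ArrayList<>(fileCount);
        for (int i = 0; i < fileCount; i++) {
            // Round-robin over the configured spill directories, like getNextSpillPath().
            selected.add(spillPaths.get(roundRobinIndex));
            roundRobinIndex = (roundRobinIndex + 1) % spillPaths.size();
        }
        return selected;
    }

    public static void main(String[] args)
    {
        SpillPathSelector selector = new SpillPathSelector(
                List.of(Path.of("/tmp/spill-a"), Path.of("/tmp/spill-b")),
                4,   // spiller threads (illustrative value)
                16); // task concurrency (illustrative value)
        System.out.println(selector.pathsForNewSpiller(true));  // [/tmp/spill-a, /tmp/spill-b, /tmp/spill-a, /tmp/spill-b]
        System.out.println(selector.pathsForNewSpiller(false)); // [/tmp/spill-a]
    }
}
```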
SingleStreamSpillerFactory.java
@@ -21,11 +21,16 @@

public interface SingleStreamSpillerFactory
{
SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext);
default SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext)
{
return create(types, spillContext, memoryContext, false);
}

SingleStreamSpiller create(List<Type> types, SpillContext spillContext, LocalMemoryContext memoryContext, boolean parallelSpill);
Review comment from a project member on the new create overload:

    SingleStreamSpiller maintains the order of data being spilled and read back (FIFO).
    Is that still the case when the spiller is created with parallelSpill?

    If yes -- we should deprecate the old method and use the new one.
    If no -- the SingleStreamSpiller interface needs updating.

static SingleStreamSpillerFactory unsupportedSingleStreamSpillerFactory()
{
return (types, spillContext, memoryContext) -> {
return (types, spillContext, memoryContext, parallelSpill) -> {
throw new UnsupportedOperationException();
};
}
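The default method above keeps existing three-argument callers compiling and routes them to the new overload with parallelSpill set to false. The same interface-evolution pattern in a self-contained form, with toy types standing in for the Trino interfaces:

```java
import java.util.List;

public class DefaultMethodSketch
{
    interface SpillerFactory
    {
        // Old entry point: existing callers keep compiling and get the single-file behavior.
        default Object create(List<String> types)
        {
            return create(types, false);
        }

        // New entry point carrying the parallelSpill flag.
        Object create(List<String> types, boolean parallelSpill);
    }

    public static void main(String[] args)
    {
        SpillerFactory factory = (types, parallelSpill) ->
                (parallelSpill ? "multi-file spiller" : "single-file spiller") + " for " + types;

        System.out.println(factory.create(List.of("bigint")));       // single-file spiller for [bigint]
        System.out.println(factory.create(List.of("bigint"), true)); // multi-file spiller for [bigint]
    }
}
```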