diff --git a/presto-common/src/main/java/com/facebook/presto/common/Page.java b/presto-common/src/main/java/com/facebook/presto/common/Page.java index 36410ac686c38..83d3c90e3fe81 100644 --- a/presto-common/src/main/java/com/facebook/presto/common/Page.java +++ b/presto-common/src/main/java/com/facebook/presto/common/Page.java @@ -177,10 +177,10 @@ public Page appendColumn(Block block) return wrapBlocksWithoutCopy(positionCount, newBlocks); } - public void compact() + public Page compact() { if (getRetainedSizeInBytes() <= getSizeInBytes()) { - return; + return this; } for (int i = 0; i < blocks.length; i++) { @@ -202,6 +202,7 @@ public void compact() } updateRetainedSize(); + return this; } private Map getRelatedDictionaryBlocks() diff --git a/presto-main/src/main/java/com/facebook/presto/spiller/FileSingleStreamSpiller.java b/presto-main/src/main/java/com/facebook/presto/spiller/FileSingleStreamSpiller.java index ae57aa589d6a4..217f30277d695 100644 --- a/presto-main/src/main/java/com/facebook/presto/spiller/FileSingleStreamSpiller.java +++ b/presto-main/src/main/java/com/facebook/presto/spiller/FileSingleStreamSpiller.java @@ -50,6 +50,7 @@ import static com.facebook.presto.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_PREFIX; import static com.facebook.presto.spiller.FileSingleStreamSpillerFactory.SPILL_FILE_SUFFIX; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterators.transform; import static java.nio.file.StandardOpenOption.APPEND; import static java.util.Objects.requireNonNull; @@ -168,8 +169,9 @@ private Iterator readPages() try { InputStream input = closer.register(targetFile.newInputStream()); - Iterator pages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(input, BUFFER_SIZE)); - return closeWhenExhausted(pages, input); + Iterator deserializedPages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(input, BUFFER_SIZE)); + Iterator compactPages = transform(deserializedPages, Page::compact); + return closeWhenExhausted(compactPages, input); } catch (IOException e) { throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e); diff --git a/presto-main/src/main/java/com/facebook/presto/spiller/TempStorageSingleStreamSpiller.java b/presto-main/src/main/java/com/facebook/presto/spiller/TempStorageSingleStreamSpiller.java index e78769ef6c619..cc1bb0d6b9294 100644 --- a/presto-main/src/main/java/com/facebook/presto/spiller/TempStorageSingleStreamSpiller.java +++ b/presto-main/src/main/java/com/facebook/presto/spiller/TempStorageSingleStreamSpiller.java @@ -51,6 +51,7 @@ import static com.facebook.presto.execution.buffer.PageSplitterUtil.splitPage; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.Iterators.transform; import static java.lang.Math.toIntExact; import static java.util.Objects.requireNonNull; @@ -181,8 +182,9 @@ private Iterator readPages() tempStorageHandle = dataSink.commit(); InputStream input = closer.register(tempStorage.open(tempDataOperationContext, tempStorageHandle)); - Iterator pages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(input)); - return closeWhenExhausted(pages, input); + Iterator deserializedPages = PagesSerdeUtil.readPages(serde, new InputStreamSliceInput(input)); + Iterator compactPages = transform(deserializedPages, Page::compact); + return closeWhenExhausted(compactPages, input); } catch (IOException e) { throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);