diff --git a/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java index e1c405bcdc47f..86104ca472b63 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/ScanFilterAndProjectOperator.java @@ -29,6 +29,7 @@ import com.facebook.presto.spi.UpdatablePageSource; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.LazyBlock; +import com.facebook.presto.spi.block.LazyBlockLoader; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.type.Type; import com.facebook.presto.split.EmptySplit; @@ -247,13 +248,7 @@ private Page processColumnSource() CursorProcessorOutput output = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, cursor, pageBuilder); pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage()); - long bytesProcessed = cursor.getCompletedBytes() - completedBytes; - long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos; - operatorContext.recordRawInputWithTiming(bytesProcessed, output.getProcessedRows(), elapsedNanos); - // TODO: derive better values for cursors - operatorContext.recordProcessedInput(bytesProcessed, output.getProcessedRows()); - completedBytes = cursor.getCompletedBytes(); - readTimeNanos = cursor.getReadTimeNanos(); + recordCursorInputStats(output.getProcessedRows()); if (output.isNoMoreRows()) { finishing = true; mergingOutput.finish(); @@ -280,16 +275,8 @@ private Page processPageSource() pageSourceMemoryContext.setBytes(pageSource.getSystemMemoryUsage()); if (page != null) { - page = recordProcessedInput(page); - // update operator stats - long endCompletedBytes = pageSource.getCompletedBytes(); - long endCompletedPositions = pageSource.getCompletedPositions(); - long endReadTimeNanos = pageSource.getReadTimeNanos(); - operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos); - completedBytes = endCompletedBytes; - completedPositions = endCompletedPositions; - readTimeNanos = endReadTimeNanos; + page = recordProcessedInput(page); Iterator> output = pageProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, pageProcessorMemoryContext, page); mergingOutput.addInput(output); @@ -305,26 +292,72 @@ private Page processPageSource() return result; } + private final class RecordingLazyBlockLoader + implements LazyBlockLoader + { + private LazyBlock delegateLazyBlock; + + private RecordingLazyBlockLoader(LazyBlock delegateLazyBlock) + { + this.delegateLazyBlock = requireNonNull(delegateLazyBlock, "delegateLazyBlock is null"); + } + + @Override + public void load(LazyBlock block) + { + checkState(delegateLazyBlock != null, "delegateLazyBlock already loaded"); + Block loadedBlock = delegateLazyBlock.getLoadedBlock(); + delegateLazyBlock = null; + // Position count already recorded for lazy blocks, input bytes are not + operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0); + recordPageSourceRawInputStats(); + block.setBlock(loadedBlock); + } + } + + private void recordCursorInputStats(long positionCount) + { + checkState(cursor != null, "cursor is null"); + long endCompletedBytes = cursor.getCompletedBytes(); + long endReadTimeNanos = cursor.getReadTimeNanos(); + long inputBytes = endCompletedBytes - completedBytes; + operatorContext.recordProcessedInput(inputBytes, positionCount); + operatorContext.recordRawInputWithTiming(inputBytes, positionCount, endReadTimeNanos - readTimeNanos); + completedBytes = endCompletedBytes; + readTimeNanos = endReadTimeNanos; + } + + private void recordPageSourceRawInputStats() + { + checkState(pageSource != null, "pageSource is null"); + long endCompletedBytes = pageSource.getCompletedBytes(); + long endCompletedPositions = pageSource.getCompletedPositions(); + long endReadTimeNanos = pageSource.getReadTimeNanos(); + operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos); + completedBytes = endCompletedBytes; + completedPositions = endCompletedPositions; + readTimeNanos = endReadTimeNanos; + } + private Page recordProcessedInput(Page page) { - operatorContext.recordProcessedInput(0, page.getPositionCount()); - // account processed bytes from lazy blocks only when they are loaded + long blockSizeSum = 0L; Block[] blocks = new Block[page.getChannelCount()]; - for (int i = 0; i < page.getChannelCount(); ++i) { + for (int i = 0; i < blocks.length; ++i) { Block block = page.getBlock(i); + // account processed bytes from lazy blocks only when they are loaded if (block instanceof LazyBlock) { - LazyBlock delegateLazyBlock = (LazyBlock) block; - blocks[i] = new LazyBlock(page.getPositionCount(), lazyBlock -> { - Block loadedBlock = delegateLazyBlock.getLoadedBlock(); - operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0L); - lazyBlock.setBlock(loadedBlock); - }); + blocks[i] = new LazyBlock(page.getPositionCount(), new RecordingLazyBlockLoader((LazyBlock) block)); } else { - operatorContext.recordProcessedInput(block.getSizeInBytes(), 0L); + blockSizeSum += block.getSizeInBytes(); blocks[i] = block; } } + // stats update + operatorContext.recordProcessedInput(blockSizeSum, page.getPositionCount()); + recordPageSourceRawInputStats(); + return new Page(page.getPositionCount(), blocks); } diff --git a/presto-main/src/main/java/com/facebook/presto/operator/TableScanOperator.java b/presto-main/src/main/java/com/facebook/presto/operator/TableScanOperator.java index ba8775dc6994f..6ecebfd2fcf15 100644 --- a/presto-main/src/main/java/com/facebook/presto/operator/TableScanOperator.java +++ b/presto-main/src/main/java/com/facebook/presto/operator/TableScanOperator.java @@ -254,14 +254,8 @@ public Page getOutput() page = page.getLoadedPage(); // update operator stats - long endCompletedBytes = source.getCompletedBytes(); - long endCompletedPositions = source.getCompletedPositions(); - long endReadTimeNanos = source.getReadTimeNanos(); - operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos); operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount()); - completedBytes = endCompletedBytes; - completedPositions = endCompletedPositions; - readTimeNanos = endReadTimeNanos; + recordSourceRawInputStats(); } // updating system memory usage should happen after page is loaded. @@ -269,4 +263,17 @@ public Page getOutput() return page; } + + private void recordSourceRawInputStats() + { + checkState(source != null, "source must not be null"); + // update operator stats + long endCompletedBytes = source.getCompletedBytes(); + long endCompletedPositions = source.getCompletedPositions(); + long endReadTimeNanos = source.getReadTimeNanos(); + operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos); + completedBytes = endCompletedBytes; + completedPositions = endCompletedPositions; + readTimeNanos = endReadTimeNanos; + } } diff --git a/presto-main/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java b/presto-main/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java index adcde12bebd34..d76f466b37b42 100644 --- a/presto-main/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java +++ b/presto-main/src/test/java/com/facebook/presto/operator/TestScanFilterAndProjectOperator.java @@ -35,6 +35,7 @@ import com.facebook.presto.spi.TableHandle; import com.facebook.presto.spi.block.Block; import com.facebook.presto.spi.block.LazyBlock; +import com.facebook.presto.spi.block.LazyBlockLoader; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.plan.PlanNodeId; import com.facebook.presto.spi.relation.RowExpression; @@ -44,9 +45,11 @@ import com.facebook.presto.testing.TestingSplit; import com.facebook.presto.testing.TestingTransactionHandle; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterators; import io.airlift.units.DataSize; import org.testng.annotations.Test; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -71,8 +74,10 @@ import static com.facebook.presto.sql.relational.Expressions.field; import static com.facebook.presto.testing.TestingTaskContext.createTaskContext; import static com.facebook.presto.testing.assertions.Assert.assertEquals; +import static com.google.common.base.Preconditions.checkState; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.KILOBYTE; +import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newCachedThreadPool; import static java.util.concurrent.Executors.newScheduledThreadPool; import static java.util.concurrent.TimeUnit.SECONDS; @@ -222,6 +227,54 @@ public void testPageSourceLazyLoad() assertEquals(actual, expected); } + @Test + public void testPageSourceLazyBlock() + { + // Tests that a page containing a LazyBlock is loaded and its bytes are counted by the operator. + DriverContext driverContext = newDriverContext(); + List projections = ImmutableList.of(field(0, BIGINT)); + Supplier cursorProcessor = expressionCompiler.compileCursorProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections, "key"); + Supplier pageProcessor = expressionCompiler.compilePageProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections); + + // This Block will be wrapped in a LazyBlock inside the operator on call to getNextPage(). + Block inputBlock = BlockAssertions.createLongSequenceBlock(0, 10); + + CountingLazyPageSource pageSource = new CountingLazyPageSource(ImmutableList.of(new Page(inputBlock))); + + ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory factory = new ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory( + 0, + new PlanNodeId("test"), + new PlanNodeId("0"), + (session, split, table, columns) -> pageSource, + cursorProcessor, + pageProcessor, + TESTING_TABLE_HANDLE, + ImmutableList.of(), + ImmutableList.of(BIGINT), + new DataSize(0, BYTE), + 0); + + SourceOperator operator = factory.createOperator(driverContext); + operator.addSplit(new Split(new ConnectorId("test"), TestingTransactionHandle.create(), TestingSplit.createLocalSplit())); + operator.noMoreSplits(); + + MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), ImmutableList.of(new Page(inputBlock))); + Page expectedPage = expected.toPage(); + MaterializedResult actual = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), toPages(operator)); + Page actualPage = actual.toPage(); + + // Assert expected page and actual page are equal. + assertPageEquals(actual.getTypes(), actualPage, expectedPage); + + // PageSource counting isn't flawed, assert on the test implementation + assertEquals(pageSource.getCompletedBytes(), expectedPage.getSizeInBytes()); + assertEquals(pageSource.getCompletedPositions(), expectedPage.getPositionCount()); + + // Assert operator stats match the expected values + assertEquals(operator.getOperatorContext().getOperatorStats().getRawInputDataSize().toBytes(), expectedPage.getSizeInBytes()); + assertEquals(operator.getOperatorContext().getOperatorStats().getInputPositions(), expected.getRowCount()); + } + @Test public void testRecordCursorSource() { @@ -467,4 +520,97 @@ public Page getNextPage() return page; } } + + private static class CountingLazyPageSource + implements ConnectorPageSource + { + private Iterator pages; + private long completedBytes; + private long completedPositions; + + public CountingLazyPageSource(List pages) + { + this.pages = requireNonNull(pages, "pages is null").iterator(); + } + + @Override + public void close() + { + pages = Iterators.forArray(); + } + + @Override + public long getReadTimeNanos() + { + return 0; + } + + @Override + public long getSystemMemoryUsage() + { + return 0; + } + + @Override + public boolean isFinished() + { + return !pages.hasNext(); + } + + @Override + public Page getNextPage() + { + if (isFinished()) { + return null; + } + + Page page = pages.next(); + int channelCount = page.getChannelCount(); + Block[] blocks = new Block[channelCount]; + + for (int i = 0; i < channelCount; ++i) { + Block block = page.getBlock(i); + // Wrap current Block in a LazyBlock. + blocks[i] = new LazyBlock(block.getPositionCount(), new CountingLazyBlockLoader(block)); + } + completedPositions += page.getPositionCount(); + + return new Page(page.getPositionCount(), blocks); + } + + @Override + public long getCompletedBytes() + { + return completedBytes; + } + + @Override + public long getCompletedPositions() + { + return completedPositions; + } + + private final class CountingLazyBlockLoader + implements LazyBlockLoader + { + private Block loaderBlock; + + public CountingLazyBlockLoader(Block block) + { + loaderBlock = block; + } + + @Override + public final void load(LazyBlock lazyBlock) + { + checkState(loaderBlock != null, "loaderBlock already loaded"); + + Block loadedBlock = loaderBlock.getLoadedBlock(); + // Increment completed/read bytes for the page source. + completedBytes += loadedBlock.getSizeInBytes(); + loaderBlock = null; + lazyBlock.setBlock(loadedBlock); + } + } + } }