Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.facebook.presto.spi.UpdatablePageSource;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.split.EmptySplit;
Expand Down Expand Up @@ -247,13 +248,7 @@ private Page processColumnSource()
CursorProcessorOutput output = cursorProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, cursor, pageBuilder);
pageSourceMemoryContext.setBytes(cursor.getSystemMemoryUsage());

long bytesProcessed = cursor.getCompletedBytes() - completedBytes;
long elapsedNanos = cursor.getReadTimeNanos() - readTimeNanos;
operatorContext.recordRawInputWithTiming(bytesProcessed, output.getProcessedRows(), elapsedNanos);
// TODO: derive better values for cursors
operatorContext.recordProcessedInput(bytesProcessed, output.getProcessedRows());
completedBytes = cursor.getCompletedBytes();
readTimeNanos = cursor.getReadTimeNanos();
recordCursorInputStats(output.getProcessedRows());
if (output.isNoMoreRows()) {
finishing = true;
mergingOutput.finish();
Expand All @@ -280,16 +275,8 @@ private Page processPageSource()
pageSourceMemoryContext.setBytes(pageSource.getSystemMemoryUsage());

if (page != null) {
page = recordProcessedInput(page);

// update operator stats
long endCompletedBytes = pageSource.getCompletedBytes();
long endCompletedPositions = pageSource.getCompletedPositions();
long endReadTimeNanos = pageSource.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
page = recordProcessedInput(page);

Iterator<Optional<Page>> output = pageProcessor.process(operatorContext.getSession().toConnectorSession(), yieldSignal, pageProcessorMemoryContext, page);
mergingOutput.addInput(output);
Expand All @@ -305,26 +292,72 @@ private Page processPageSource()
return result;
}

private final class RecordingLazyBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private LazyBlock delegateLazyBlock;

private RecordingLazyBlockLoader(LazyBlock delegateLazyBlock)
{
this.delegateLazyBlock = requireNonNull(delegateLazyBlock, "delegateLazyBlock is null");
}

@Override
public void load(LazyBlock block)
{
checkState(delegateLazyBlock != null, "delegateLazyBlock already loaded");
Block loadedBlock = delegateLazyBlock.getLoadedBlock();
delegateLazyBlock = null;
// Position count already recorded for lazy blocks, input bytes are not
operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0);
recordPageSourceRawInputStats();
block.setBlock(loadedBlock);
}
}

private void recordCursorInputStats(long positionCount)
{
checkState(cursor != null, "cursor is null");
long endCompletedBytes = cursor.getCompletedBytes();
long endReadTimeNanos = cursor.getReadTimeNanos();
long inputBytes = endCompletedBytes - completedBytes;
operatorContext.recordProcessedInput(inputBytes, positionCount);
operatorContext.recordRawInputWithTiming(inputBytes, positionCount, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
readTimeNanos = endReadTimeNanos;
}

private void recordPageSourceRawInputStats()
{
checkState(pageSource != null, "pageSource is null");
long endCompletedBytes = pageSource.getCompletedBytes();
long endCompletedPositions = pageSource.getCompletedPositions();
long endReadTimeNanos = pageSource.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
}

private Page recordProcessedInput(Page page)
{
operatorContext.recordProcessedInput(0, page.getPositionCount());
// account processed bytes from lazy blocks only when they are loaded
long blockSizeSum = 0L;
Block[] blocks = new Block[page.getChannelCount()];
for (int i = 0; i < page.getChannelCount(); ++i) {
for (int i = 0; i < blocks.length; ++i) {
Block block = page.getBlock(i);
// account processed bytes from lazy blocks only when they are loaded
if (block instanceof LazyBlock) {
LazyBlock delegateLazyBlock = (LazyBlock) block;
blocks[i] = new LazyBlock(page.getPositionCount(), lazyBlock -> {
Block loadedBlock = delegateLazyBlock.getLoadedBlock();
operatorContext.recordProcessedInput(loadedBlock.getSizeInBytes(), 0L);
lazyBlock.setBlock(loadedBlock);
});
blocks[i] = new LazyBlock(page.getPositionCount(), new RecordingLazyBlockLoader((LazyBlock) block));
}
else {
operatorContext.recordProcessedInput(block.getSizeInBytes(), 0L);
blockSizeSum += block.getSizeInBytes();
blocks[i] = block;
}
}
// stats update
operatorContext.recordProcessedInput(blockSizeSum, page.getPositionCount());
recordPageSourceRawInputStats();

return new Page(page.getPositionCount(), blocks);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,19 +254,26 @@ public Page getOutput()
page = page.getLoadedPage();

// update operator stats
long endCompletedBytes = source.getCompletedBytes();
long endCompletedPositions = source.getCompletedPositions();
long endReadTimeNanos = source.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
operatorContext.recordProcessedInput(page.getSizeInBytes(), page.getPositionCount());
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
recordSourceRawInputStats();
}

// updating system memory usage should happen after page is loaded.
systemMemoryContext.setBytes(source.getSystemMemoryUsage());

return page;
}

private void recordSourceRawInputStats()
{
checkState(source != null, "source must not be null");
// update operator stats
long endCompletedBytes = source.getCompletedBytes();
long endCompletedPositions = source.getCompletedPositions();
long endReadTimeNanos = source.getReadTimeNanos();
operatorContext.recordRawInputWithTiming(endCompletedBytes - completedBytes, endCompletedPositions - completedPositions, endReadTimeNanos - readTimeNanos);
completedBytes = endCompletedBytes;
completedPositions = endCompletedPositions;
readTimeNanos = endReadTimeNanos;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.block.Block;
import com.facebook.presto.spi.block.LazyBlock;
import com.facebook.presto.spi.block.LazyBlockLoader;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.plan.PlanNodeId;
import com.facebook.presto.spi.relation.RowExpression;
Expand All @@ -44,9 +45,11 @@
import com.facebook.presto.testing.TestingSplit;
import com.facebook.presto.testing.TestingTransactionHandle;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import io.airlift.units.DataSize;
import org.testng.annotations.Test;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -71,8 +74,10 @@
import static com.facebook.presto.sql.relational.Expressions.field;
import static com.facebook.presto.testing.TestingTaskContext.createTaskContext;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.google.common.base.Preconditions.checkState;
import static io.airlift.units.DataSize.Unit.BYTE;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -222,6 +227,54 @@ public void testPageSourceLazyLoad()
assertEquals(actual, expected);
}

@Test
public void testPageSourceLazyBlock()
{
// Tests that a page containing a LazyBlock is loaded and its bytes are counted by the operator.
DriverContext driverContext = newDriverContext();
List<RowExpression> projections = ImmutableList.of(field(0, BIGINT));
Supplier<CursorProcessor> cursorProcessor = expressionCompiler.compileCursorProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections, "key");
Supplier<PageProcessor> pageProcessor = expressionCompiler.compilePageProcessor(driverContext.getSession().getSqlFunctionProperties(), Optional.empty(), projections);

// This Block will be wrapped in a LazyBlock inside the operator on call to getNextPage().
Block inputBlock = BlockAssertions.createLongSequenceBlock(0, 10);

CountingLazyPageSource pageSource = new CountingLazyPageSource(ImmutableList.of(new Page(inputBlock)));

ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory factory = new ScanFilterAndProjectOperator.ScanFilterAndProjectOperatorFactory(
0,
new PlanNodeId("test"),
new PlanNodeId("0"),
(session, split, table, columns) -> pageSource,
cursorProcessor,
pageProcessor,
TESTING_TABLE_HANDLE,
ImmutableList.of(),
ImmutableList.of(BIGINT),
new DataSize(0, BYTE),
0);

SourceOperator operator = factory.createOperator(driverContext);
operator.addSplit(new Split(new ConnectorId("test"), TestingTransactionHandle.create(), TestingSplit.createLocalSplit()));
operator.noMoreSplits();

MaterializedResult expected = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), ImmutableList.of(new Page(inputBlock)));
Page expectedPage = expected.toPage();
MaterializedResult actual = toMaterializedResult(driverContext.getSession(), ImmutableList.of(BIGINT), toPages(operator));
Page actualPage = actual.toPage();

// Assert expected page and actual page are equal.
assertPageEquals(actual.getTypes(), actualPage, expectedPage);

// PageSource counting isn't flawed, assert on the test implementation
assertEquals(pageSource.getCompletedBytes(), expectedPage.getSizeInBytes());
assertEquals(pageSource.getCompletedPositions(), expectedPage.getPositionCount());

// Assert operator stats match the expected values
assertEquals(operator.getOperatorContext().getOperatorStats().getRawInputDataSize().toBytes(), expectedPage.getSizeInBytes());
assertEquals(operator.getOperatorContext().getOperatorStats().getInputPositions(), expected.getRowCount());
}

@Test
public void testRecordCursorSource()
{
Expand Down Expand Up @@ -467,4 +520,97 @@ public Page getNextPage()
return page;
}
}

private static class CountingLazyPageSource
implements ConnectorPageSource
{
private Iterator<Page> pages;
private long completedBytes;
private long completedPositions;

public CountingLazyPageSource(List<Page> pages)
{
this.pages = requireNonNull(pages, "pages is null").iterator();
}

@Override
public void close()
{
pages = Iterators.forArray();
}

@Override
public long getReadTimeNanos()
{
return 0;
}

@Override
public long getSystemMemoryUsage()
{
return 0;
}

@Override
public boolean isFinished()
{
return !pages.hasNext();
}

@Override
public Page getNextPage()
{
if (isFinished()) {
return null;
}

Page page = pages.next();
int channelCount = page.getChannelCount();
Block[] blocks = new Block[channelCount];

for (int i = 0; i < channelCount; ++i) {
Block block = page.getBlock(i);
// Wrap current Block in a LazyBlock.
blocks[i] = new LazyBlock(block.getPositionCount(), new CountingLazyBlockLoader(block));
}
completedPositions += page.getPositionCount();

return new Page(page.getPositionCount(), blocks);
}

@Override
public long getCompletedBytes()
{
return completedBytes;
}

@Override
public long getCompletedPositions()
{
return completedPositions;
}

private final class CountingLazyBlockLoader
implements LazyBlockLoader<LazyBlock>
{
private Block loaderBlock;

public CountingLazyBlockLoader(Block block)
{
loaderBlock = block;
}

@Override
public final void load(LazyBlock lazyBlock)
{
checkState(loaderBlock != null, "loaderBlock already loaded");

Block loadedBlock = loaderBlock.getLoadedBlock();
// Increment completed/read bytes for the page source.
completedBytes += loadedBlock.getSizeInBytes();
loaderBlock = null;
lazyBlock.setBlock(loadedBlock);
}
}
}
}