Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
51e392d
Move Iceberg reader early exit checks to start of method
dain Nov 6, 2024
70b6801
Replace process bytes lazy load monitor with state monitor in SFP
dain Nov 4, 2024
99c022c
Add SourcePage interface for ConnectorPageSource
dain Nov 1, 2024
2f3784f
Convert non-object store connectors to SourcePage
dain Nov 2, 2024
7c1691b
Convert ORC and Parquet to SourcePage
dain Nov 9, 2024
fb9f6fb
Simplify Hive and Iceberg projectSufficientColumns
dain Nov 8, 2024
0727507
Add TransformConnectorPageSource to Hive
dain Nov 8, 2024
943ff01
Convert Iceberg to SourcePage and TransformConnectorPageSource
dain Nov 9, 2024
bb8a8a4
Move constants and static classes out of HivePageSource
dain Nov 8, 2024
21f063d
Change Delta CheckpointEntryIterator ADD to use existing base column
dain Nov 10, 2024
68409c1
Handle column dereference directly in Hive ParquetPageSourceFactory
dain Nov 8, 2024
ceedfe4
Cleanup Hive OrcPageSourceFactory
dain Nov 9, 2024
4e1311e
Handle column dereference directly in Hive OrcPageSourceFactory
dain Nov 8, 2024
6215763
Convert Hive AvroPageSourceFactory to projectColumnDereferences
dain Nov 9, 2024
5265644
Convert Hive LinePageSourceFactory to projectColumnDereferences
dain Nov 9, 2024
c59a4a9
Convert Hive RcFilePageSourceFactory to projectColumnDereferences
dain Nov 8, 2024
d08a6b8
Remove unused Hive ReaderColumns and ReaderPageSource
dain Nov 8, 2024
cc61b26
Convert Hive, Hudi, and Delta to SourcePage and TransformCPS
dain Nov 2, 2024
93a364f
Mark ConnectorPageSource.getNextPage() deprecated for removal
dain Nov 10, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.RelationType;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.security.AccessDeniedException;
import io.trino.spi.security.GrantInfo;
import io.trino.spi.security.RoleGrant;
Expand Down Expand Up @@ -168,7 +169,7 @@ public boolean isFinished()
}

@Override
public Page getNextPage()
public SourcePage getNextSourcePage()
{
if (isFinished()) {
return null;
Expand All @@ -187,7 +188,7 @@ public Page getNextPage()
memoryUsageBytes -= page.getRetainedSizeInBytes();
Page outputPage = projection.apply(page);
completedBytes += outputPage.getSizeInBytes();
return outputPage;
return SourcePage.create(outputPage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.trino.operator.project.PageProcessorMetrics;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -62,7 +63,7 @@ private FilterAndProjectOperator(
yieldSignal,
outputMemoryContext,
metrics,
page))
SourcePage.create(page)))
.transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext))
.blocking(() -> memoryTrackingContext.localUserMemoryContext().setBytes(localAggregatedMemoryContext.getBytes()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.SourcePage;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -90,11 +91,13 @@ public void addInput(Page page)
@Override
public Page getOutput()
{
Page page = pageSource.getNextPage();
if (page == null) {
SourcePage sourcePage = pageSource.getNextSourcePage();
if (sourcePage == null) {
return null;
}

Page page = sourcePage.getPage();

// update operator stats
long endCompletedBytes = pageSource.getCompletedBytes();
long endReadTimeNanos = pageSource.getReadTimeNanos();
Expand All @@ -103,9 +106,6 @@ public Page getOutput()
completedBytes = endCompletedBytes;
readTimeNanos = endReadTimeNanos;

// assure the page is in memory before handing to another operator
page = page.getLoadedPage();

return page;
}

Expand Down
43 changes: 0 additions & 43 deletions core/trino-main/src/main/java/io/trino/operator/PageUtils.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RecordPageSource;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.split.EmptySplit;
Expand All @@ -51,14 +52,15 @@
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.concurrent.MoreFutures.toListenableFuture;
import static io.trino.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext;
import static io.trino.operator.PageUtils.recordMaterializedBytes;
import static io.trino.operator.WorkProcessor.TransformationState.finished;
import static io.trino.operator.WorkProcessor.TransformationState.ofResult;
import static io.trino.operator.project.MergePages.mergePages;
Expand Down Expand Up @@ -283,12 +285,16 @@ WorkProcessor<Page> processPageSource()
return WorkProcessor
.create(new ConnectorPageSourceToPages(pageSourceMemoryContext))
.yielding(yieldSignal::isSet)
.flatMap(page -> pageProcessor.createWorkProcessor(
connectorSession,
yieldSignal,
outputMemoryContext,
pageProcessorMetrics,
page))
.flatMap(page -> {
WorkProcessor<Page> workProcessor = pageProcessor.createWorkProcessor(
connectorSession,
yieldSignal,
outputMemoryContext,
pageProcessorMetrics,
page);
// Note this is monitoring the original source page not the result page
return workProcessor.withProcessStateMonitor(new ProcessedBytesMonitor(page, bytes -> processedBytes += bytes));
})
.transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext))
.blocking(() -> memoryContext.setBytes(localAggregatedMemoryContext.getBytes()));
}
Expand Down Expand Up @@ -356,8 +362,37 @@ public ProcessState<Page> process()
}
}

static class ProcessedBytesMonitor
implements Consumer<ProcessState<Page>>
{
private final SourcePage page;
private final LongConsumer processedBytesConsumer;
private long localProcessedBytes;

public ProcessedBytesMonitor(SourcePage page, LongConsumer processedBytesConsumer)
{
this.page = requireNonNull(page, "page is null");
this.processedBytesConsumer = requireNonNull(processedBytesConsumer, "processedBytesConsumer is null");
localProcessedBytes = page.getSizeInBytes();
processedBytesConsumer.accept(localProcessedBytes);
}

@Override
public void accept(ProcessState<Page> state)
{
update();
}

void update()
{
long newProcessedBytes = page.getSizeInBytes();
processedBytesConsumer.accept(newProcessedBytes - localProcessedBytes);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when the new page is smaller than the one before the update the value of processedBytesConsumer will be decreased and even get to 0 when all rows were deleted. Does that make sense or we should use something like Math.abs(newProcessedBytes - localProcessedBytes)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@shlomi-alfasi I don't follow. page.getSizeInBytes() is a value that should only increase in size. It represents the loaded size of the page, and you can't "unload" data from a page. processedBytesConsumer is an accumulator so we need deltas. If there a buggy page that reduces the value, I don't want to try to mask that over here because it could end up with an ever increasing value (think of the sequence 100, 0, 100, 0...)

localProcessedBytes = newProcessedBytes;
}
}

private class ConnectorPageSourceToPages
implements WorkProcessor.Process<Page>
implements WorkProcessor.Process<SourcePage>
{
final LocalMemoryContext pageSourceMemoryContext;

Expand All @@ -367,7 +402,7 @@ private class ConnectorPageSourceToPages
}

@Override
public ProcessState<Page> process()
public ProcessState<SourcePage> process()
{
if (pageSource.isFinished()) {
return ProcessState.finished();
Expand All @@ -378,7 +413,7 @@ public ProcessState<Page> process()
return ProcessState.blocked(asVoid(toListenableFuture(isBlocked)));
}

Page page = pageSource.getNextPage();
SourcePage page = pageSource.getNextSourcePage();
pageSourceMemoryContext.setBytes(pageSource.getMemoryUsage());

if (page == null) {
Expand All @@ -388,8 +423,6 @@ public ProcessState<Page> process()
return ProcessState.yielded();
}

recordMaterializedBytes(page, sizeInBytes -> processedBytes += sizeInBytes);

// update operator stats
processedPositions += page.getPositionCount();
physicalBytes = pageSource.getCompletedBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.SourcePage;
import io.trino.split.EmptySplit;
import io.trino.split.PageSourceProvider;
import io.trino.split.PageSourceProviderFactory;
Expand Down Expand Up @@ -265,8 +266,10 @@ public Page getOutput()
source = pageSourceProvider.createPageSource(operatorContext.getSession(), split, table, columns, dynamicFilter);
}

Page page = source.getNextPage();
if (page != null) {
SourcePage sourcePage = source.getNextSourcePage();
Page page = null;
if (sourcePage != null) {
page = sourcePage.getPage();
// assure the page is in memory before handing to another operator
page = page.getLoadedPage();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.type.BlockTypeOperators.BlockPositionEqual;

import java.util.List;
Expand Down Expand Up @@ -66,7 +67,7 @@ public InputChannels getInputChannels()
}

@Override
public SelectedPositions filter(ConnectorSession session, Page page)
public SelectedPositions filter(ConnectorSession session, SourcePage page)
{
if (selectedPositions.length < page.getPositionCount()) {
selectedPositions = new boolean[page.getPositionCount()];
Expand All @@ -79,7 +80,7 @@ public SelectedPositions filter(ConnectorSession session, Page page)
return PageFilter.positionsArrayToSelectedPositions(selectedPositions, page.getPositionCount());
}

private boolean matches(Page page, int position)
private boolean matches(SourcePage page, int position)
{
for (int channel = 0; channel < inputChannels.size(); channel++) {
BlockPositionEqual equalOperator = equalOperators.get(channel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
import io.trino.operator.CompletedWork;
import io.trino.operator.DriverYieldSignal;
import io.trino.operator.Work;
import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;
import io.trino.spi.type.Type;

import static io.trino.spi.type.TypeUtils.writeNativeValue;
Expand Down Expand Up @@ -58,7 +58,7 @@ public InputChannels getInputChannels()
}

@Override
public Work<Block> project(ConnectorSession session, DriverYieldSignal yieldSignal, Page page, SelectedPositions selectedPositions)
public Work<Block> project(ConnectorSession session, DriverYieldSignal yieldSignal, SourcePage page, SelectedPositions selectedPositions)
{
return new CompletedWork<>(RunLengthEncodedBlock.create(value, selectedPositions.size()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
*/
package io.trino.operator.project;

import io.trino.spi.Page;
import io.trino.spi.block.Block;
import io.trino.spi.block.DictionaryBlock;
import io.trino.spi.block.RunLengthEncodedBlock;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SourcePage;

import java.util.Optional;

Expand Down Expand Up @@ -54,7 +54,7 @@ public InputChannels getInputChannels()
}

@Override
public SelectedPositions filter(ConnectorSession session, Page page)
public SelectedPositions filter(ConnectorSession session, SourcePage page)
{
Block block = page.getBlock(0).getLoadedBlock();

Expand All @@ -79,7 +79,7 @@ public SelectedPositions filter(ConnectorSession session, Page page)
}
}

return filter.filter(session, new Page(block));
return filter.filter(session, SourcePage.create(block));
}

private Optional<boolean[]> processDictionary(ConnectorSession session, Block dictionary, int blockPositionsCount)
Expand All @@ -99,7 +99,7 @@ private Optional<boolean[]> processDictionary(ConnectorSession session, Block di

if (shouldProcessDictionary) {
try {
SelectedPositions selectedDictionaryPositions = filter.filter(session, new Page(dictionary));
SelectedPositions selectedDictionaryPositions = filter.filter(session, SourcePage.create(dictionary));
lastOutputDictionary = Optional.of(toPositionsMask(selectedDictionaryPositions, dictionary.getPositionCount()));
}
catch (Exception _) {
Expand Down
Loading