Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator;
import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory;
import io.trino.spi.Page;
import io.trino.spi.metrics.Metrics;
import io.trino.sql.planner.plan.PlanNodeId;

import java.util.Optional;
Expand Down Expand Up @@ -145,4 +146,10 @@ public void close()
{
operator.close();
}

@Override
public Metrics getMetrics()
{
    // Metrics are produced by the wrapped operator; this adapter only forwards them.
    Metrics delegateMetrics = operator.getMetrics();
    return delegateMetrics;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory;
import io.trino.operator.project.PageProcessor;
import io.trino.operator.project.PageProcessorMetrics;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.metrics.Metrics;
import io.trino.spi.type.Type;
import io.trino.sql.planner.plan.PlanNodeId;

Expand All @@ -39,6 +41,7 @@ public class FilterAndProjectOperator
implements WorkProcessorOperator
{
private final WorkProcessor<Page> pages;
private final PageProcessorMetrics metrics = new PageProcessorMetrics();

private FilterAndProjectOperator(
Session session,
Expand All @@ -60,6 +63,7 @@ private FilterAndProjectOperator(
connectorSession,
yieldSignal,
outputMemoryContext,
metrics,
page,
avoidPageMaterialization))
.transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext))
Expand All @@ -72,6 +76,12 @@ public WorkProcessor<Page> getOutputPages()
return pages;
}

@Override
public Metrics getMetrics()
{
    // Report the filter/projection CPU timings accumulated by the
    // PageProcessorMetrics instance shared with the page processor.
    Metrics currentMetrics = metrics.getMetrics();
    return currentMetrics;
}

public static OperatorFactory createOperatorFactory(
int operatorId,
PlanNodeId planNodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.trino.operator.project.CursorProcessor;
import io.trino.operator.project.CursorProcessorOutput;
import io.trino.operator.project.PageProcessor;
import io.trino.operator.project.PageProcessorMetrics;
import io.trino.spi.Page;
import io.trino.spi.PageBuilder;
import io.trino.spi.connector.ColumnHandle;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class ScanFilterAndProjectOperator
implements WorkProcessorSourceOperator
{
private final WorkProcessor<Page> pages;
private final PageProcessorMetrics pageProcessorMetrics = new PageProcessorMetrics();

@Nullable
private RecordCursor cursor;
Expand Down Expand Up @@ -170,6 +172,15 @@ public Metrics getConnectorMetrics()
return metrics;
}

@Override
public Metrics getMetrics()
{
    // NOTE(review): when a RecordCursor is in use, processing does not go
    // through PageProcessor (pageProcessorMetrics is only wired into the
    // page-source path), so there are no filter/projection timings to report.
    if (cursor != null) {
        return Metrics.EMPTY;
    }
    return pageProcessorMetrics.getMetrics();
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down Expand Up @@ -294,6 +305,7 @@ WorkProcessor<Page> processPageSource()
connectorSession,
yieldSignal,
outputMemoryContext,
pageProcessorMetrics,
page,
avoidPageMaterialization))
.transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,26 @@ public PageProcessor(Optional<PageFilter> filter, List<? extends PageProjection>
this(filter, projections, OptionalInt.of(1));
}

@VisibleForTesting
public Iterator<Optional<Page>> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page)
{
    // Convenience overload: delegates with avoidPageMaterialization = false,
    // i.e. output pages are fully materialized.
    return process(session, yieldSignal, memoryContext, page, false);
}

public Iterator<Optional<Page>> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization)
@VisibleForTesting
Iterator<Optional<Page>> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization)
{
WorkProcessor<Page> processor = createWorkProcessor(session, yieldSignal, memoryContext, page, avoidPageMaterialization);
WorkProcessor<Page> processor = createWorkProcessor(session, yieldSignal, memoryContext, new PageProcessorMetrics(), page, avoidPageMaterialization);
return processor.yieldingIterator();
}

public WorkProcessor<Page> createWorkProcessor(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization)
public WorkProcessor<Page> createWorkProcessor(
ConnectorSession session,
DriverYieldSignal yieldSignal,
LocalMemoryContext memoryContext,
PageProcessorMetrics metrics,
Page page,
boolean avoidPageMaterialization)
{
// limit the scope of the dictionary ids to just one page
dictionarySourceIdFunction.reset();
Expand All @@ -116,7 +124,10 @@ public WorkProcessor<Page> createWorkProcessor(ConnectorSession session, DriverY
}

if (filter.isPresent()) {
SelectedPositions selectedPositions = filter.get().filter(session, filter.get().getInputChannels().getInputChannels(page));
Page inputPage = filter.get().getInputChannels().getInputChannels(page);
long start = System.nanoTime();
SelectedPositions selectedPositions = filter.get().filter(session, inputPage);
metrics.recordFilterTimeSince(start);
if (selectedPositions.isEmpty()) {
return WorkProcessor.of();
}
Expand All @@ -127,15 +138,15 @@ public WorkProcessor<Page> createWorkProcessor(ConnectorSession session, DriverY
}

if (selectedPositions.size() != page.getPositionCount()) {
return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, page, selectedPositions, avoidPageMaterialization));
return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, metrics, page, selectedPositions, avoidPageMaterialization));
}
}
else if (projections.isEmpty()) {
// retained memory for empty page is negligible
return WorkProcessor.of(new Page(page.getPositionCount()));
}

return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, page, positionsRange(0, page.getPositionCount()), avoidPageMaterialization));
return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, metrics, page, positionsRange(0, page.getPositionCount()), avoidPageMaterialization));
}

private class ProjectSelectedPositions
Expand All @@ -144,6 +155,7 @@ private class ProjectSelectedPositions
private final ConnectorSession session;
private final DriverYieldSignal yieldSignal;
private final LocalMemoryContext memoryContext;
private final PageProcessorMetrics metrics;
private final boolean avoidPageMaterialization;

private Page page;
Expand All @@ -163,6 +175,7 @@ private ProjectSelectedPositions(
ConnectorSession session,
DriverYieldSignal yieldSignal,
LocalMemoryContext memoryContext,
PageProcessorMetrics metrics,
Page page,
SelectedPositions selectedPositions,
boolean avoidPageMaterialization)
Expand All @@ -171,6 +184,7 @@ private ProjectSelectedPositions(

this.session = session;
this.yieldSignal = yieldSignal;
this.metrics = metrics;
this.page = page;
this.memoryContext = memoryContext;
this.avoidPageMaterialization = avoidPageMaterialization;
Expand Down Expand Up @@ -328,9 +342,11 @@ private ProcessBatchResult processBatch(int batchSize)
}
else {
if (pageProjectWork == null) {
Page inputPage = projection.getInputChannels().getInputChannels(page);
expressionProfiler.start();
pageProjectWork = projection.project(session, yieldSignal, projection.getInputChannels().getInputChannels(page), positionsBatch);
expressionProfiler.stop(positionsBatch.size());
pageProjectWork = projection.project(session, yieldSignal, inputPage, positionsBatch);
long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size());
metrics.recordProjectionTime(projectionTimeNanos);
Comment thread
raunaqmorarka marked this conversation as resolved.
}
if (!pageProjectWork.process()) {
return ProcessBatchResult.processBatchYield();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.operator.project;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;

import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
 * Accumulates CPU time spent in page filtering and projection and exposes the
 * totals as {@link Metrics} (reported as "Filter CPU time" and
 * "Projection CPU time" durations).
 *
 * <p>This class is not synchronized; it is presumably owned and updated by a
 * single operator instance.
 */
public class PageProcessorMetrics
{
    private static final String FILTER_TIME = "Filter CPU time";
    private static final String PROJECTION_TIME = "Projection CPU time";

    private long filterTimeNanos;
    private boolean hasFilter;
    private long projectionTimeNanos;
    private boolean hasProjection;

    /**
     * Adds the time elapsed since {@code startNanos} (a prior
     * {@link System#nanoTime()} reading) to the accumulated filter time.
     */
    public void recordFilterTimeSince(long startNanos)
    {
        long elapsedNanos = System.nanoTime() - startNanos;
        filterTimeNanos += elapsedNanos;
        hasFilter = true;
    }

    /**
     * Adds {@code projectionTimeNanos} to the accumulated projection time.
     */
    public void recordProjectionTime(long projectionTimeNanos)
    {
        this.projectionTimeNanos += projectionTimeNanos;
        hasProjection = true;
    }

    /**
     * Returns the accumulated timings; each metric is present only if the
     * corresponding record method was called at least once.
     */
    public Metrics getMetrics()
    {
        ImmutableMap.Builder<String, Metric<?>> metrics = ImmutableMap.builder();
        if (hasFilter) {
            metrics.put(FILTER_TIME, new DurationTiming(new Duration(filterTimeNanos, NANOSECONDS)));
        }
        if (hasProjection) {
            metrics.put(PROJECTION_TIME, new DurationTiming(new Duration(projectionTimeNanos, NANOSECONDS)));
        }
        return new Metrics(metrics.buildOrThrow());
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void start()
previousTimestamp = ticker.read();
}

public void stop(int batchSize)
public long stop(int batchSize)
{
verify(previousTimestamp != NOT_INITALIZED, "start() is not called");
verify(batchSize > 0, "batchSize must be positive");
Expand All @@ -67,6 +67,7 @@ public void stop(int batchSize)
isExpressionExpensive = false;
}
previousTimestamp = NOT_INITALIZED;
return delta;
}

public boolean isExpressionExpensive()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8245,6 +8245,24 @@ public void testExplainAnalyzePhysicalReadWallTime()
"'Physical input read time' = \\{duration=.*}");
}

@Test
public void testExplainAnalyzeScanFilterProjectWallTime()
{
    // Verify that filter and projection CPU-time metrics appear in verbose
    // EXPLAIN ANALYZE output for a scan + filter + project query.
    assertExplainAnalyze(
            "EXPLAIN ANALYZE VERBOSE SELECT nationkey * 2 FROM nation WHERE nationkey > 0",
            "'Filter CPU time' = \\{duration=.*}",
            "'Projection CPU time' = \\{duration=.*}");
}

@Test
public void testExplainAnalyzeFilterProjectWallTime()
{
    // Same metrics check as above, but for a filter + project stage that sits
    // on top of an aggregation rather than directly on a table scan.
    assertExplainAnalyze(
            "EXPLAIN ANALYZE VERBOSE SELECT * FROM (SELECT nationkey, count(*) cnt FROM nation GROUP BY 1) where cnt > 0",
            "'Filter CPU time' = \\{duration=.*}",
            "'Projection CPU time' = \\{duration=.*}");
}

private static final Set<HiveStorageFormat> NAMED_COLUMN_ONLY_FORMATS = ImmutableSet.of(HiveStorageFormat.AVRO, HiveStorageFormat.JSON);

@DataProvider
Expand Down