diff --git a/core/trino-main/src/main/java/io/trino/operator/BasicWorkProcessorOperatorAdapter.java b/core/trino-main/src/main/java/io/trino/operator/BasicWorkProcessorOperatorAdapter.java index 0a4f532d7d6..5f5137665a7 100644 --- a/core/trino-main/src/main/java/io/trino/operator/BasicWorkProcessorOperatorAdapter.java +++ b/core/trino-main/src/main/java/io/trino/operator/BasicWorkProcessorOperatorAdapter.java @@ -16,6 +16,7 @@ import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperator; import io.trino.operator.WorkProcessorOperatorAdapter.AdapterWorkProcessorOperatorFactory; import io.trino.spi.Page; +import io.trino.spi.metrics.Metrics; import io.trino.sql.planner.plan.PlanNodeId; import java.util.Optional; @@ -145,4 +146,10 @@ public void close() { operator.close(); } + + @Override + public Metrics getMetrics() + { + return operator.getMetrics(); + } } diff --git a/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java index e30cfb8c7fd..987d64300f9 100644 --- a/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/FilterAndProjectOperator.java @@ -21,8 +21,10 @@ import io.trino.memory.context.MemoryTrackingContext; import io.trino.operator.BasicWorkProcessorOperatorAdapter.BasicAdapterWorkProcessorOperatorFactory; import io.trino.operator.project.PageProcessor; +import io.trino.operator.project.PageProcessorMetrics; import io.trino.spi.Page; import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.metrics.Metrics; import io.trino.spi.type.Type; import io.trino.sql.planner.plan.PlanNodeId; @@ -39,6 +41,7 @@ public class FilterAndProjectOperator implements WorkProcessorOperator { private final WorkProcessor pages; + private final PageProcessorMetrics metrics = new PageProcessorMetrics(); private FilterAndProjectOperator( Session session, @@ -60,6 +63,7 @@ private FilterAndProjectOperator( connectorSession, yieldSignal, outputMemoryContext, + metrics, page, avoidPageMaterialization)) .transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext)) @@ -72,6 +76,12 @@ public WorkProcessor getOutputPages() return pages; } + @Override + public Metrics getMetrics() + { + return metrics.getMetrics(); + } + public static OperatorFactory createOperatorFactory( int operatorId, PlanNodeId planNodeId, diff --git a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java index 178b7746202..b197f808bd8 100644 --- a/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/ScanFilterAndProjectOperator.java @@ -30,6 +30,7 @@ import io.trino.operator.project.CursorProcessor; import io.trino.operator.project.CursorProcessorOutput; import io.trino.operator.project.PageProcessor; +import io.trino.operator.project.PageProcessorMetrics; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.connector.ColumnHandle; @@ -70,6 +71,7 @@ public class ScanFilterAndProjectOperator implements WorkProcessorSourceOperator { private final WorkProcessor pages; + private final PageProcessorMetrics pageProcessorMetrics = new PageProcessorMetrics(); @Nullable private RecordCursor cursor; @@ -170,6 +172,15 @@ public Metrics getConnectorMetrics() return metrics; } + @Override + public Metrics getMetrics() + { + if (cursor != null) { + return Metrics.EMPTY; + } + return pageProcessorMetrics.getMetrics(); + } + @Override public WorkProcessor getOutputPages() { @@ -294,6 +305,7 @@ WorkProcessor processPageSource() connectorSession, yieldSignal, outputMemoryContext, + pageProcessorMetrics, page, avoidPageMaterialization)) .transformProcessor(processor -> mergePages(types, minOutputPageSize.toBytes(), minOutputPageRowCount, processor, localAggregatedMemoryContext)) diff --git a/core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java b/core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java index ed5cc98de9f..a08faec243a 100644 --- a/core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java +++ b/core/trino-main/src/main/java/io/trino/operator/project/PageProcessor.java @@ -95,18 +95,26 @@ public PageProcessor(Optional filter, List this(filter, projections, OptionalInt.of(1)); } + @VisibleForTesting public Iterator> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page) { return process(session, yieldSignal, memoryContext, page, false); } - public Iterator> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization) + @VisibleForTesting + Iterator> process(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization) { - WorkProcessor processor = createWorkProcessor(session, yieldSignal, memoryContext, page, avoidPageMaterialization); + WorkProcessor processor = createWorkProcessor(session, yieldSignal, memoryContext, new PageProcessorMetrics(), page, avoidPageMaterialization); return processor.yieldingIterator(); } - public WorkProcessor createWorkProcessor(ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, Page page, boolean avoidPageMaterialization) + public WorkProcessor createWorkProcessor( + ConnectorSession session, + DriverYieldSignal yieldSignal, + LocalMemoryContext memoryContext, + PageProcessorMetrics metrics, + Page page, + boolean avoidPageMaterialization) { // limit the scope of the dictionary ids to just one page dictionarySourceIdFunction.reset(); @@ -116,7 +124,10 @@ public WorkProcessor createWorkProcessor(ConnectorSession session, DriverY } if (filter.isPresent()) { - SelectedPositions selectedPositions = filter.get().filter(session, filter.get().getInputChannels().getInputChannels(page)); + Page inputPage = filter.get().getInputChannels().getInputChannels(page); + long start = System.nanoTime(); + SelectedPositions selectedPositions = filter.get().filter(session, inputPage); + metrics.recordFilterTimeSince(start); if (selectedPositions.isEmpty()) { return WorkProcessor.of(); } @@ -127,7 +138,7 @@ public WorkProcessor createWorkProcessor(ConnectorSession session, DriverY } if (selectedPositions.size() != page.getPositionCount()) { - return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, page, selectedPositions, avoidPageMaterialization)); + return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, metrics, page, selectedPositions, avoidPageMaterialization)); } } else if (projections.isEmpty()) { @@ -135,7 +146,7 @@ else if (projections.isEmpty()) { return WorkProcessor.of(new Page(page.getPositionCount())); } - return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, page, positionsRange(0, page.getPositionCount()), avoidPageMaterialization)); + return WorkProcessor.create(new ProjectSelectedPositions(session, yieldSignal, memoryContext, metrics, page, positionsRange(0, page.getPositionCount()), avoidPageMaterialization)); } private class ProjectSelectedPositions @@ -144,6 +155,7 @@ private class ProjectSelectedPositions private final ConnectorSession session; private final DriverYieldSignal yieldSignal; private final LocalMemoryContext memoryContext; + private final PageProcessorMetrics metrics; private final boolean avoidPageMaterialization; private Page page; @@ -163,6 +175,7 @@ private ProjectSelectedPositions( ConnectorSession session, DriverYieldSignal yieldSignal, LocalMemoryContext memoryContext, + PageProcessorMetrics metrics, Page page, SelectedPositions selectedPositions, boolean avoidPageMaterialization) @@ -171,6 +184,7 @@ private ProjectSelectedPositions( this.session = session; this.yieldSignal = yieldSignal; + this.metrics = metrics; this.page = page; this.memoryContext = memoryContext; this.avoidPageMaterialization = avoidPageMaterialization; @@ -328,9 +342,11 @@ private ProcessBatchResult processBatch(int batchSize) } else { if (pageProjectWork == null) { + Page inputPage = projection.getInputChannels().getInputChannels(page); expressionProfiler.start(); - pageProjectWork = projection.project(session, yieldSignal, projection.getInputChannels().getInputChannels(page), positionsBatch); - expressionProfiler.stop(positionsBatch.size()); + pageProjectWork = projection.project(session, yieldSignal, inputPage, positionsBatch); + long projectionTimeNanos = expressionProfiler.stop(positionsBatch.size()); + metrics.recordProjectionTime(projectionTimeNanos); } if (!pageProjectWork.process()) { return ProcessBatchResult.processBatchYield(); diff --git a/core/trino-main/src/main/java/io/trino/operator/project/PageProcessorMetrics.java b/core/trino-main/src/main/java/io/trino/operator/project/PageProcessorMetrics.java new file mode 100644 index 00000000000..053327fbcc6 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/operator/project/PageProcessorMetrics.java @@ -0,0 +1,57 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.operator.project; + +import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; +import io.trino.plugin.base.metrics.DurationTiming; +import io.trino.spi.metrics.Metric; +import io.trino.spi.metrics.Metrics; + +import static java.util.concurrent.TimeUnit.NANOSECONDS; + +public class PageProcessorMetrics +{ + private static final String FILTER_TIME = "Filter CPU time"; + private static final String PROJECTION_TIME = "Projection CPU time"; + + private long filterTimeNanos; + private boolean hasFilter; + private long projectionTimeNanos; + private boolean hasProjection; + + public void recordFilterTimeSince(long startNanos) + { + filterTimeNanos += System.nanoTime() - startNanos; + hasFilter = true; + } + + public void recordProjectionTime(long projectionTimeNanos) + { + this.projectionTimeNanos += projectionTimeNanos; + hasProjection = true; + } + + public Metrics getMetrics() + { + ImmutableMap.Builder> builder = ImmutableMap.builder(); + if (hasFilter) { + builder.put(FILTER_TIME, new DurationTiming(new Duration(filterTimeNanos, NANOSECONDS))); + } + if (hasProjection) { + builder.put(PROJECTION_TIME, new DurationTiming(new Duration(projectionTimeNanos, NANOSECONDS))); + } + return new Metrics(builder.buildOrThrow()); + } +} diff --git a/core/trino-main/src/main/java/io/trino/sql/gen/ExpressionProfiler.java b/core/trino-main/src/main/java/io/trino/sql/gen/ExpressionProfiler.java index 6608a3210a9..7839d49b2d3 100644 --- a/core/trino-main/src/main/java/io/trino/sql/gen/ExpressionProfiler.java +++ b/core/trino-main/src/main/java/io/trino/sql/gen/ExpressionProfiler.java @@ -54,7 +54,7 @@ public void start() previousTimestamp = ticker.read(); } - public void stop(int batchSize) + public long stop(int batchSize) { verify(previousTimestamp != NOT_INITALIZED, "start() is not called"); verify(batchSize > 0, "batchSize must be positive"); @@ -67,6 +67,7 @@ public void stop(int batchSize) isExpressionExpensive = false; } previousTimestamp = NOT_INITALIZED; + return delta; } public boolean isExpressionExpensive() diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java index f5c501d6984..e7a51afb1ee 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/BaseHiveConnectorTest.java @@ -8245,6 +8245,24 @@ public void testExplainAnalyzePhysicalReadWallTime() "'Physical input read time' = \\{duration=.*}"); } + @Test + public void testExplainAnalyzeScanFilterProjectWallTime() + { + assertExplainAnalyze( + "EXPLAIN ANALYZE VERBOSE SELECT nationkey * 2 FROM nation WHERE nationkey > 0", + "'Filter CPU time' = \\{duration=.*}", + "'Projection CPU time' = \\{duration=.*}"); + } + + @Test + public void testExplainAnalyzeFilterProjectWallTime() + { + assertExplainAnalyze( + "EXPLAIN ANALYZE VERBOSE SELECT * FROM (SELECT nationkey, count(*) cnt FROM nation GROUP BY 1) where cnt > 0", + "'Filter CPU time' = \\{duration=.*}", + "'Projection CPU time' = \\{duration=.*}"); + } + private static final Set NAMED_COLUMN_ONLY_FORMATS = ImmutableSet.of(HiveStorageFormat.AVRO, HiveStorageFormat.JSON); @DataProvider