Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.trino.memory.context.LocalMemoryContext;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.operator.OperationTimer.OperationTiming;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.plugin.base.metrics.TDigestHistogram;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand Down Expand Up @@ -74,6 +75,7 @@ public class OperatorContext

private final CounterStat physicalInputDataSize = new CounterStat();
private final CounterStat physicalInputPositions = new CounterStat();
private final AtomicLong physicalInputReadTimeNanos = new AtomicLong();

private final CounterStat internalNetworkInputDataSize = new CounterStat();
private final CounterStat internalNetworkPositions = new CounterStat();
Expand Down Expand Up @@ -182,7 +184,7 @@ public void recordPhysicalInputWithTiming(long sizeInBytes, long positions, long
{
physicalInputDataSize.update(sizeInBytes);
physicalInputPositions.update(positions);
addInputTiming.record(readNanos, 0);
physicalInputReadTimeNanos.getAndAdd(readNanos);
Comment thread
sopel39 marked this conversation as resolved.
Outdated
}

/**
Expand Down Expand Up @@ -532,6 +534,16 @@ public static Metrics getOperatorMetrics(Metrics operatorMetrics, long inputPosi
return operatorMetrics.mergeWith(new Metrics(ImmutableMap.of("Input distribution", new TDigestHistogram(digest))));
}

public static Metrics getConnectorMetrics(Metrics connectorMetrics, long physicalInputReadTimeNanos)
{
if (physicalInputReadTimeNanos == 0) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not a big fan of using 0 as a special marker. Can we use OptionalLong. Or if maybe -1 so we use value which is out of domain for valid values?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will leave it as is. It's not a maker really, but rather skipping meaningless stat.
We do something similar for other things, e.g:
39f48d5

            if (node.isMaySkipOutputDuplicates()) {
                nodeOutput.appendDetailsLine("maySkipOutputDuplicates = %s", node.isMaySkipOutputDuplicates());
            }

If I used -1 then I would have to take care of it while doing addition (synchronization). Similaraly Optional would require some synchronization and locking.

return connectorMetrics;
}

return connectorMetrics.mergeWith(new Metrics(ImmutableMap.of(
"Physical input read time", new DurationTiming(new Duration(physicalInputReadTimeNanos, NANOSECONDS)))));
}

public <C, R> R accept(QueryContextVisitor<C, R> visitor, C context)
{
return visitor.visitOperatorContext(this, context);
Expand Down Expand Up @@ -573,7 +585,7 @@ private OperatorStats getOperatorStats()

dynamicFilterSplitsProcessed.get(),
getOperatorMetrics(metrics.get(), inputPositionsCount),
connectorMetrics.get(),
getConnectorMetrics(connectorMetrics.get(), physicalInputReadTimeNanos.get()),

DataSize.ofBytes(physicalWrittenDataSize.get()),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.operator.BlockedReason.WAITING_FOR_MEMORY;
import static io.trino.operator.OperatorContext.getConnectorMetrics;
import static io.trino.operator.OperatorContext.getOperatorMetrics;
import static io.trino.operator.PageUtils.recordMaterializedBytes;
import static io.trino.operator.WorkProcessor.ProcessState.Type.BLOCKED;
Expand Down Expand Up @@ -319,8 +320,7 @@ private List<OperatorStats> getNestedOperatorStats()

// WorkProcessorOperator doesn't have addInput call
0,
// source operators report read time though
new Duration(context.readTimeNanos.get(), NANOSECONDS),
new Duration(0, NANOSECONDS),
ZERO_DURATION,

succinctBytes(context.physicalInputDataSize.get()),
Expand All @@ -344,7 +344,7 @@ private List<OperatorStats> getNestedOperatorStats()

context.dynamicFilterSplitsProcessed.get(),
getOperatorMetrics(context.metrics.get(), context.inputPositions.get()),
context.connectorMetrics.get(),
getConnectorMetrics(context.connectorMetrics.get(), context.readTimeNanos.get()),

DataSize.ofBytes(0),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.trino.operator.WorkProcessor.Transformation;
import io.trino.operator.WorkProcessor.TransformationState;
import io.trino.operator.WorkProcessorAssertion.Transform;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.spi.Page;
import io.trino.spi.connector.UpdatablePageSource;
Expand Down Expand Up @@ -217,7 +218,8 @@ public void testWorkProcessorPipelineSourceOperator()
.containsEntry("testSourceClosed", new LongCount(1));
assertEquals(sourceOperatorStats.getConnectorMetrics().getMetrics(), ImmutableMap.of(
"testSourceConnectorMetric", new LongCount(2),
"testSourceConnectorClosed", new LongCount(1)));
"testSourceConnectorClosed", new LongCount(1),
"Physical input read time", new DurationTiming(new Duration(7, NANOSECONDS))));

assertEquals(sourceOperatorStats.getDynamicFilterSplitsProcessed(), 42L);

Expand All @@ -230,7 +232,7 @@ public void testWorkProcessorPipelineSourceOperator()
assertEquals(sourceOperatorStats.getInputDataSize(), DataSize.ofBytes(5));
assertEquals(sourceOperatorStats.getInputPositions(), 6);

assertEquals(sourceOperatorStats.getAddInputWall(), new Duration(7, NANOSECONDS));
assertEquals(sourceOperatorStats.getAddInputWall(), new Duration(0, NANOSECONDS));

// pipeline input stats should match source WorkProcessorOperator stats
PipelineStats pipelineStats = pipelineOperator.getOperatorContext().getDriverContext().getPipelineContext().getPipelineStats();
Expand All @@ -253,7 +255,8 @@ public void testWorkProcessorPipelineSourceOperator()
.containsEntry("testSourceClosed", new LongCount(1));
assertEquals(operatorSummaries.get(0).getConnectorMetrics().getMetrics(), ImmutableMap.of(
"testSourceConnectorMetric", new LongCount(2),
"testSourceConnectorClosed", new LongCount(1)));
"testSourceConnectorClosed", new LongCount(1),
"Physical input read time", new DurationTiming(new Duration(7, NANOSECONDS))));
assertThat(operatorSummaries.get(1).getMetrics().getMetrics())
.hasSize(2)
.containsEntry("testOperatorMetric", new LongCount(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.memory.context.MemoryTrackingContext;
import io.trino.metadata.Split;
import io.trino.operator.WorkProcessorSourceOperatorAdapter.AdapterWorkProcessorSourceOperatorFactory;
import io.trino.plugin.base.metrics.DurationTiming;
import io.trino.plugin.base.metrics.LongCount;
import io.trino.spi.Page;
import io.trino.spi.connector.UpdatablePageSource;
Expand All @@ -36,6 +38,7 @@
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.SessionTestUtils.TEST_SESSION;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;

public class TestWorkProcessorSourceOperatorAdapter
Expand Down Expand Up @@ -73,14 +76,18 @@ public void testMetrics()
assertThat(getOnlyElement(context.getNestedOperatorStats()).getMetrics().getMetrics())
.hasSize(2)
.containsEntry("testOperatorMetric", new LongCount(1));
assertThat(getOnlyElement(context.getNestedOperatorStats()).getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of("testConnectorMetric", new LongCount(2)));
assertThat(getOnlyElement(context.getNestedOperatorStats()).getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of(
"testConnectorMetric", new LongCount(2),
"Physical input read time", new DurationTiming(new Duration(7, NANOSECONDS))));

operator.getOutput();
assertThat(operator.isFinished()).isTrue();
assertThat(getOnlyElement(context.getNestedOperatorStats()).getMetrics().getMetrics())
.hasSize(2)
.containsEntry("testOperatorMetric", new LongCount(2));
assertThat(getOnlyElement(context.getNestedOperatorStats()).getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of("testConnectorMetric", new LongCount(3)));
assertThat(getOnlyElement(context.getNestedOperatorStats()).getConnectorMetrics().getMetrics()).isEqualTo(ImmutableMap.of(
"testConnectorMetric", new LongCount(3),
"Physical input read time", new DurationTiming(new Duration(7, NANOSECONDS))));
}

private static class TestWorkProcessorOperatorFactory
Expand Down Expand Up @@ -138,6 +145,12 @@ public Metrics getConnectorMetrics()
return new Metrics(ImmutableMap.of("testConnectorMetric", new LongCount(count + 1)));
}

@Override
public Duration getReadTime()
{
return new Duration(7, NANOSECONDS);
}

@Override
public WorkProcessor<Page> getOutputPages()
{
Expand Down
22 changes: 22 additions & 0 deletions core/trino-spi/src/main/java/io/trino/spi/metrics/Timing.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.metrics;

import java.time.Duration;

public interface Timing<T>
extends Metric<T>
{
Duration getDuration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.base.metrics;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.airlift.units.Duration;
import io.trino.spi.metrics.Timing;

import java.util.Objects;

import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

public class DurationTiming
implements Timing<DurationTiming>
{
// use Airlift duration for more human-friendly serialization format
// and to match duration serialization in other JSON objects
private final Duration duration;

@JsonCreator
public DurationTiming(Duration duration)
{
this.duration = requireNonNull(duration, "duration is null");
}

@JsonProperty("duration")
public Duration getAirliftDuration()
{
return duration;
}

@Override
public java.time.Duration getDuration()
Comment thread
sopel39 marked this conversation as resolved.
Outdated
{
return java.time.Duration.ofNanos(duration.roundTo(NANOSECONDS));
}

@Override
public DurationTiming mergeWith(DurationTiming other)
{
long durationNanos = duration.roundTo(NANOSECONDS);
long otherDurationNanos = other.getAirliftDuration().roundTo(NANOSECONDS);
return new DurationTiming(new Duration(durationNanos + otherDurationNanos, NANOSECONDS).convertToMostSuccinctTimeUnit());
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
DurationTiming that = (DurationTiming) o;
return duration.equals(that.duration);
}

@Override
public int hashCode()
{
return Objects.hash(duration);
}

@Override
public String toString()
{
return toStringHelper("")
.add("duration", duration)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
import com.google.common.collect.ImmutableMap;
import io.airlift.json.JsonCodec;
import io.airlift.stats.TDigest;
import io.airlift.units.Duration;
import io.trino.spi.metrics.Metric;
import io.trino.spi.metrics.Metrics;
import org.testng.annotations.Test;

import java.util.Arrays;
import java.util.Map;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.assertj.core.api.Assertions.assertThat;

public class TestMetrics
Expand Down Expand Up @@ -65,6 +67,21 @@ public void testMergeHistogram()
.matches("\\{count=3\\.00, p01=5\\.00, p05=5\\.00, p10=5\\.00, p25=5\\.00, p50=7\\.50, p75=10\\.00, p90=10\\.00, p95=10\\.00, p99=10\\.00, min=5\\.00, max=10\\.00\\}");
}

@Test
public void testDurationTiming()
{
DurationTiming d1 = new DurationTiming(new Duration(123, NANOSECONDS));
DurationTiming d2 = new DurationTiming(new Duration(1, NANOSECONDS));

Metrics m1 = new Metrics(ImmutableMap.of("a", d1));
Metrics m2 = new Metrics(ImmutableMap.of("a", d2));

DurationTiming merged = (DurationTiming) merge(m1, m2).getMetrics().get("a");

assertThat(merged.getAirliftDuration().roundTo(NANOSECONDS)).isEqualTo(124);
assertThat(merged.getDuration().toNanos()).isEqualTo(124);
}

@Test
public void testHistogramJson()
{
Expand All @@ -78,6 +95,16 @@ public void testHistogramJson()
assertThat(result.getDigest().getCount()).isEqualTo(digest.getCount());
}

@Test
public void testDurationJson()
{
JsonCodec<DurationTiming> codec = JsonCodec.jsonCodec(DurationTiming.class);
DurationTiming duration = new DurationTiming(new Duration(123, NANOSECONDS));
String json = codec.toJson(duration);
DurationTiming result = codec.fromJson(json);
assertThat(result.getAirliftDuration()).isEqualTo(duration.getAirliftDuration());
}

@Test(expectedExceptions = ClassCastException.class)
public void testFailIncompatibleTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8279,6 +8279,14 @@ public void testAutoPurgeProperty()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testExplainAnalyzePhysicalReadWallTime()
{
assertExplainAnalyze(
"EXPLAIN ANALYZE VERBOSE SELECT * FROM nation a",
"'Physical input read time' = \\{duration=.*}");
}

private static final Set<HiveStorageFormat> NAMED_COLUMN_ONLY_FORMATS = ImmutableSet.of(HiveStorageFormat.AVRO, HiveStorageFormat.JSON);

@DataProvider
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3022,12 +3022,12 @@ private void verifySplitCount(QueryId queryId, long expectedSplitCount)
OperatorStats operatorStats = getOperatorStats(queryId);
if (expectedSplitCount > 0) {
assertThat(operatorStats.getTotalDrivers()).isEqualTo(expectedSplitCount);
assertThat(operatorStats.getAddInputCalls()).isGreaterThan(0);
assertThat(operatorStats.getPhysicalInputPositions()).isGreaterThan(0);
}
else {
// expectedSplitCount == 0
assertThat(operatorStats.getTotalDrivers()).isEqualTo(1);
assertThat(operatorStats.getAddInputCalls()).isEqualTo(0);
assertThat(operatorStats.getPhysicalInputPositions()).isEqualTo(0);
}
}

Expand Down