diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java index 195565a0557d..9ea5d21faa37 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverContext.java @@ -331,7 +331,6 @@ public DriverStats getDriverStats() DataSize internalNetworkInputDataSize; long internalNetworkInputPositions; - Duration internalNetworkInputReadTime; DataSize rawInputDataSize; long rawInputPositions; @@ -346,11 +345,10 @@ public DriverStats getDriverStats() if (inputOperator != null) { physicalInputDataSize = inputOperator.getPhysicalInputDataSize(); physicalInputPositions = inputOperator.getPhysicalInputPositions(); - physicalInputReadTime = inputOperator.getAddInputWall(); + physicalInputReadTime = inputOperator.getPhysicalInputReadTime(); internalNetworkInputDataSize = inputOperator.getInternalNetworkInputDataSize(); internalNetworkInputPositions = inputOperator.getInternalNetworkInputPositions(); - internalNetworkInputReadTime = inputOperator.getAddInputWall(); rawInputDataSize = inputOperator.getRawInputDataSize(); rawInputPositions = inputOperator.getInputPositions(); @@ -374,7 +372,6 @@ public DriverStats getDriverStats() internalNetworkInputDataSize = DataSize.ofBytes(0); internalNetworkInputPositions = 0; - internalNetworkInputReadTime = new Duration(0, MILLISECONDS); rawInputDataSize = DataSize.ofBytes(0); rawInputPositions = 0; @@ -419,7 +416,6 @@ public DriverStats getDriverStats() physicalInputReadTime, internalNetworkInputDataSize.succinct(), internalNetworkInputPositions, - internalNetworkInputReadTime, rawInputDataSize.succinct(), rawInputPositions, rawInputReadTime, diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java index b3f9f37bbead..816b3ee4fed1 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverStats.java @@ -59,7 +59,6 @@ public class DriverStats private final DataSize internalNetworkInputDataSize; private final long internalNetworkInputPositions; - private final Duration internalNetworkInputReadTime; private final DataSize rawInputDataSize; private final long rawInputPositions; @@ -104,7 +103,6 @@ public DriverStats() this.internalNetworkInputDataSize = DataSize.ofBytes(0); this.internalNetworkInputPositions = 0; - this.internalNetworkInputReadTime = new Duration(0, MILLISECONDS); this.rawInputDataSize = DataSize.ofBytes(0); this.rawInputPositions = 0; @@ -150,7 +148,6 @@ public DriverStats( @JsonProperty("internalNetworkInputDataSize") DataSize internalNetworkInputDataSize, @JsonProperty("internalNetworkInputPositions") long internalNetworkInputPositions, - @JsonProperty("internalNetworkInputReadTime") Duration internalNetworkInputReadTime, @JsonProperty("rawInputDataSize") DataSize rawInputDataSize, @JsonProperty("rawInputPositions") long rawInputPositions, @@ -195,7 +192,6 @@ public DriverStats( this.internalNetworkInputDataSize = requireNonNull(internalNetworkInputDataSize, "internalNetworkInputDataSize is null"); checkArgument(internalNetworkInputPositions >= 0, "internalNetworkInputPositions is negative"); this.internalNetworkInputPositions = internalNetworkInputPositions; - this.internalNetworkInputReadTime = requireNonNull(internalNetworkInputReadTime, "internalNetworkInputReadTime is null"); this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null"); checkArgument(rawInputPositions >= 0, "rawInputPositions is negative"); @@ -329,12 +325,6 @@ public long getInternalNetworkInputPositions() return internalNetworkInputPositions; } - @JsonProperty - public Duration getInternalNetworkInputReadTime() - { - return internalNetworkInputReadTime; - } - @JsonProperty public DataSize getRawInputDataSize() { diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java index f21b1f81b4f9..2d54a91f8f5c 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorContext.java @@ -543,6 +543,7 @@ private OperatorStats getOperatorStats() new Duration(addInputTiming.getCpuNanos(), NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(physicalInputDataSize.getTotalCount()), physicalInputPositions.getTotalCount(), + new Duration(physicalInputReadTimeNanos.get(), NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(internalNetworkInputDataSize.getTotalCount()), internalNetworkPositions.getTotalCount(), DataSize.ofBytes(physicalInputDataSize.getTotalCount() + internalNetworkInputDataSize.getTotalCount()), diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java index 58b55dd40cb2..2908b0ea3929 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorStats.java @@ -49,6 +49,7 @@ public class OperatorStats private final Duration addInputCpu; private final DataSize physicalInputDataSize; private final long physicalInputPositions; + private final Duration physicalInputReadTime; private final DataSize internalNetworkInputDataSize; private final long internalNetworkInputPositions; private final DataSize rawInputDataSize; @@ -102,6 +103,7 @@ public OperatorStats( @JsonProperty("addInputCpu") Duration addInputCpu, @JsonProperty("physicalInputDataSize") DataSize physicalInputDataSize, @JsonProperty("physicalInputPositions") long physicalInputPositions, + @JsonProperty("physicalInputReadTime") Duration physicalInputReadTime, @JsonProperty("internalNetworkInputDataSize") DataSize internalNetworkInputDataSize, @JsonProperty("internalNetworkInputPositions") long internalNetworkInputPositions, @JsonProperty("rawInputDataSize") DataSize rawInputDataSize, @@ -155,6 +157,7 @@ public OperatorStats( this.addInputCpu = requireNonNull(addInputCpu, "addInputCpu is null"); this.physicalInputDataSize = requireNonNull(physicalInputDataSize, "physicalInputDataSize is null"); this.physicalInputPositions = physicalInputPositions; + this.physicalInputReadTime = requireNonNull(physicalInputReadTime, "physicalInputReadTime is null"); this.internalNetworkInputDataSize = requireNonNull(internalNetworkInputDataSize, "internalNetworkInputDataSize is null"); this.internalNetworkInputPositions = internalNetworkInputPositions; this.rawInputDataSize = requireNonNull(rawInputDataSize, "rawInputDataSize is null"); @@ -262,6 +265,12 @@ public long getPhysicalInputPositions() return physicalInputPositions; } + @JsonProperty + public Duration getPhysicalInputReadTime() + { + return physicalInputReadTime; + } + @JsonProperty public DataSize getInternalNetworkInputDataSize() { @@ -439,6 +448,7 @@ public OperatorStats add(Iterable operators) long addInputCpu = this.addInputCpu.roundTo(NANOSECONDS); long physicalInputDataSize = this.physicalInputDataSize.toBytes(); long physicalInputPositions = this.physicalInputPositions; + long physicalInputReadTimeNanos = this.physicalInputReadTime.roundTo(NANOSECONDS); long internalNetworkInputDataSize = this.internalNetworkInputDataSize.toBytes(); long internalNetworkInputPositions = this.internalNetworkInputPositions; long rawInputDataSize = this.rawInputDataSize.toBytes(); @@ -486,6 +496,7 @@ public OperatorStats add(Iterable operators) addInputCpu += operator.getAddInputCpu().roundTo(NANOSECONDS); physicalInputDataSize += operator.getPhysicalInputDataSize().toBytes(); physicalInputPositions += operator.getPhysicalInputPositions(); + physicalInputReadTimeNanos += operator.getPhysicalInputReadTime().roundTo(NANOSECONDS); internalNetworkInputDataSize += operator.getInternalNetworkInputDataSize().toBytes(); internalNetworkInputPositions += operator.getInternalNetworkInputPositions(); rawInputDataSize += operator.getRawInputDataSize().toBytes(); @@ -545,6 +556,7 @@ public OperatorStats add(Iterable operators) new Duration(addInputCpu, NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(physicalInputDataSize), physicalInputPositions, + new Duration(physicalInputReadTimeNanos, NANOSECONDS).convertToMostSuccinctTimeUnit(), DataSize.ofBytes(internalNetworkInputDataSize), internalNetworkInputPositions, DataSize.ofBytes(rawInputDataSize), @@ -617,6 +629,7 @@ public OperatorStats summarize() addInputCpu, physicalInputDataSize, physicalInputPositions, + physicalInputReadTime, internalNetworkInputDataSize, internalNetworkInputPositions, rawInputDataSize, diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java index 7de696f8bc3c..cb27cbabdf7b 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorPipelineSourceOperator.java @@ -324,6 +324,7 @@ private List getNestedOperatorStats() succinctBytes(context.physicalInputDataSize.get()), context.physicalInputPositions.get(), + new Duration(context.operatorTiming.getWallNanos(), NANOSECONDS), succinctBytes(context.internalNetworkInputDataSize.get()), context.internalNetworkInputPositions.get(), diff --git a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java index 5c01cd7ef797..354da70578d2 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestQueryStats.java @@ -52,6 +52,7 @@ public class TestQueryStats new Duration(17, NANOSECONDS), succinctBytes(181L), 1811, + new Duration(18, NANOSECONDS), succinctBytes(182L), 1822, succinctBytes(18L), @@ -91,6 +92,7 @@ public class TestQueryStats new Duration(27, NANOSECONDS), succinctBytes(281L), 2811, + new Duration(28, NANOSECONDS), succinctBytes(282L), 2822, succinctBytes(28L), @@ -130,6 +132,7 @@ public class TestQueryStats new Duration(37, NANOSECONDS), succinctBytes(381L), 3811, + new Duration(38, NANOSECONDS), succinctBytes(382L), 3822, succinctBytes(38L), diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java index 81bd9a172df2..3e1827a14023 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDriverStats.java @@ -54,7 +54,6 @@ public class TestDriverStats DataSize.ofBytes(132), 142, - new Duration(152, NANOSECONDS), DataSize.ofBytes(13), 14, @@ -108,7 +107,6 @@ public static void assertExpectedDriverStats(DriverStats actual) assertEquals(actual.getInternalNetworkInputDataSize(), DataSize.ofBytes(132)); assertEquals(actual.getInternalNetworkInputPositions(), 142); - assertEquals(actual.getInternalNetworkInputReadTime(), new Duration(152, NANOSECONDS)); assertEquals(actual.getRawInputDataSize(), DataSize.ofBytes(13)); assertEquals(actual.getRawInputPositions(), 14); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java index c5ab25564946..87516a5f3e6c 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestOperatorStats.java @@ -50,6 +50,7 @@ public class TestOperatorStats new Duration(4, NANOSECONDS), DataSize.ofBytes(51), 511, + new Duration(5, NANOSECONDS), DataSize.ofBytes(52), 522, DataSize.ofBytes(5), @@ -97,6 +98,7 @@ public class TestOperatorStats new Duration(4, NANOSECONDS), DataSize.ofBytes(51), 511, + new Duration(5, NANOSECONDS), DataSize.ofBytes(52), 522, DataSize.ofBytes(5), @@ -153,6 +155,7 @@ public static void assertExpectedOperatorStats(OperatorStats actual) assertEquals(actual.getAddInputCpu(), new Duration(4, NANOSECONDS)); assertEquals(actual.getPhysicalInputDataSize(), DataSize.ofBytes(51)); assertEquals(actual.getPhysicalInputPositions(), 511); + assertEquals(actual.getPhysicalInputReadTime(), new Duration(5, NANOSECONDS)); assertEquals(actual.getInternalNetworkInputDataSize(), DataSize.ofBytes(52)); assertEquals(actual.getInternalNetworkInputPositions(), 522); assertEquals(actual.getRawInputDataSize(), DataSize.ofBytes(5)); @@ -203,6 +206,7 @@ public void testAdd() assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); assertEquals(actual.getPhysicalInputDataSize(), DataSize.ofBytes(3 * 51)); assertEquals(actual.getPhysicalInputPositions(), 3 * 511); + assertEquals(actual.getPhysicalInputReadTime(), new Duration(3 * 5, NANOSECONDS)); assertEquals(actual.getInternalNetworkInputDataSize(), DataSize.ofBytes(3 * 52)); assertEquals(actual.getInternalNetworkInputPositions(), 3 * 522); assertEquals(actual.getRawInputDataSize(), DataSize.ofBytes(3 * 5)); @@ -251,6 +255,7 @@ public void testAddMergeable() assertEquals(actual.getAddInputCpu(), new Duration(3 * 4, NANOSECONDS)); assertEquals(actual.getPhysicalInputDataSize(), DataSize.ofBytes(3 * 51)); assertEquals(actual.getPhysicalInputPositions(), 3 * 511); + assertEquals(actual.getPhysicalInputReadTime(), new Duration(3 * 5, NANOSECONDS)); assertEquals(actual.getInternalNetworkInputDataSize(), DataSize.ofBytes(3 * 52)); assertEquals(actual.getInternalNetworkInputPositions(), 3 * 522); assertEquals(actual.getRawInputDataSize(), DataSize.ofBytes(3 * 5)); diff --git a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java index 3b26d5f7a672..089b37faf1d5 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestWorkProcessorPipelineSourceOperator.java @@ -245,7 +245,8 @@ public void testWorkProcessorPipelineSourceOperator() assertEquals(sourceOperatorStats.getInputDataSize(), pipelineStats.getProcessedInputDataSize()); assertEquals(sourceOperatorStats.getInputPositions(), pipelineStats.getProcessedInputPositions()); - assertEquals(sourceOperatorStats.getAddInputWall(), pipelineStats.getPhysicalInputReadTime()); + assertThat(sourceOperatorStats.getPhysicalInputReadTime().convertToMostSuccinctTimeUnit()) + .isEqualTo(pipelineStats.getPhysicalInputReadTime().convertToMostSuccinctTimeUnit()); // assert pipeline metrics List operatorSummaries = pipelineStats.getOperatorSummaries();