Skip to content

Commit 70e1d5e

Browse files
j-sundhantangwangd
authored andcommitted
RuntimeStatsMetrics Reporter implementation
1 parent 0acd038 commit 70e1d5e

File tree

11 files changed

+296
-31
lines changed

11 files changed

+296
-31
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/FilesTable.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.iceberg;
1515

1616
import com.facebook.presto.common.Page;
17+
import com.facebook.presto.common.RuntimeStats;
1718
import com.facebook.presto.common.predicate.TupleDomain;
1819
import com.facebook.presto.common.type.ArrayType;
1920
import com.facebook.presto.common.type.StandardTypes;
@@ -117,13 +118,14 @@ public ConnectorTableMetadata getTableMetadata()
117118
@Override
118119
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
119120
{
120-
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId));
121+
return new FixedPageSource(buildPages(tableMetadata, icebergTable, snapshotId, session));
121122
}
122123

123-
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId)
124+
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, Table icebergTable, Optional<Long> snapshotId, ConnectorSession session)
124125
{
125126
PageListBuilder pagesBuilder = forTable(tableMetadata);
126-
TableScan tableScan = getTableScan(TupleDomain.all(), snapshotId, icebergTable).includeColumnStats();
127+
RuntimeStats runtimeStats = session.getRuntimeStats();
128+
TableScan tableScan = getTableScan(TupleDomain.all(), snapshotId, icebergTable, runtimeStats).includeColumnStats();
127129
Map<Integer, Type> idToTypeMap = getIdToTypeMap(icebergTable.schema());
128130

129131
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergAbstractMetadata.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.json.JsonCodec;
1717
import com.facebook.airlift.log.Logger;
18+
import com.facebook.presto.common.RuntimeStats;
1819
import com.facebook.presto.common.Subfield;
1920
import com.facebook.presto.common.predicate.TupleDomain;
2021
import com.facebook.presto.common.type.BigintType;
@@ -291,12 +292,14 @@ public ConnectorTableLayoutResult getTableLayoutForConstraint(
291292
partitions = ImmutableList.of(new HivePartition(handle.getSchemaTableName()));
292293
}
293294
else {
295+
RuntimeStats runtimeStats = session.getRuntimeStats();
294296
partitions = getPartitions(
295297
typeManager,
296298
handle,
297299
icebergTable,
298300
constraint,
299-
partitionColumns);
301+
partitionColumns,
302+
runtimeStats);
300303
}
301304

302305
ConnectorTableLayout layout = getTableLayout(

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public ConnectorSplitSource getSplits(
9191
long toSnapshot = table.getIcebergTableName().getChangelogEndSnapshot()
9292
.orElseGet(icebergTable.currentSnapshot()::snapshotId);
9393
IncrementalChangelogScan scan = icebergTable.newIncrementalChangelogScan()
94+
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
9495
.fromSnapshotExclusive(fromSnapshot)
9596
.toSnapshot(toSnapshot);
9697
return new ChangelogSplitSource(session, typeManager, icebergTable, scan);
@@ -100,12 +101,14 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
100101
table.getIcebergTableName().getSnapshotId().get(),
101102
predicate,
102103
table.getPartitionSpecId(),
103-
table.getEqualityFieldIds());
104+
table.getEqualityFieldIds(),
105+
session.getRuntimeStats());
104106

105107
return new EqualityDeletesSplitSource(session, icebergTable, deleteFiles);
106108
}
107109
else {
108110
TableScan tableScan = icebergTable.newScan()
111+
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
109112
.filter(toIcebergExpression(predicate))
110113
.useSnapshot(table.getIcebergTableName().getSnapshotId().get())
111114
.planWith(executor);

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.log.Logger;
1717
import com.facebook.presto.common.GenericInternalException;
18+
import com.facebook.presto.common.RuntimeStats;
1819
import com.facebook.presto.common.predicate.Domain;
1920
import com.facebook.presto.common.predicate.NullableValue;
2021
import com.facebook.presto.common.predicate.TupleDomain;
@@ -410,10 +411,13 @@ public static Optional<String> getViewComment(View view)
410411
return Optional.ofNullable(view.properties().get(TABLE_COMMENT));
411412
}
412413

413-
public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable)
414+
public static TableScan getTableScan(TupleDomain<IcebergColumnHandle> predicates, Optional<Long> snapshotId, Table icebergTable, RuntimeStats runtimeStats)
414415
{
415416
Expression expression = ExpressionConverter.toIcebergExpression(predicates);
416-
TableScan tableScan = icebergTable.newScan().filter(expression);
417+
TableScan tableScan = icebergTable
418+
.newScan()
419+
.metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats))
420+
.filter(expression);
417421
return snapshotId
418422
.map(id -> isSnapshot(icebergTable, id) ? tableScan.useSnapshot(id) : tableScan.asOfTime(id))
419423
.orElse(tableScan);
@@ -434,9 +438,11 @@ public static LocationProvider getLocationProvider(SchemaTableName schemaTableNa
434438
return locationsFor(tableLocation, storageProperties);
435439
}
436440

437-
public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType)
441+
public static TableScan buildTableScan(Table icebergTable, MetadataTableType metadataTableType, RuntimeStats runtimeStats)
438442
{
439-
return createMetadataTableInstance(icebergTable, metadataTableType).newScan();
443+
return createMetadataTableInstance(icebergTable, metadataTableType)
444+
.newScan()
445+
.metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats));
440446
}
441447

442448
public static Map<String, Integer> columnNameToPositionInSchema(Schema schema)
@@ -593,7 +599,8 @@ public static List<HivePartition> getPartitions(
593599
ConnectorTableHandle tableHandle,
594600
Table icebergTable,
595601
Constraint<ColumnHandle> constraint,
596-
List<IcebergColumnHandle> partitionColumns)
602+
List<IcebergColumnHandle> partitionColumns,
603+
RuntimeStats runtimeStats)
597604
{
598605
IcebergTableName name = ((IcebergTableHandle) tableHandle).getIcebergTableName();
599606
FileFormat fileFormat = getFileFormat(icebergTable);
@@ -603,7 +610,9 @@ public static List<HivePartition> getPartitions(
603610
return ImmutableList.of();
604611
}
605612

606-
TableScan tableScan = icebergTable.newScan()
613+
TableScan tableScan = icebergTable
614+
.newScan()
615+
.metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats))
607616
.filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint
608617
.getSummary()
609618
.simplify())))
@@ -878,10 +887,16 @@ public static CloseableIterable<DeleteFile> getDeleteFiles(Table table,
878887
long snapshot,
879888
TupleDomain<IcebergColumnHandle> filter,
880889
Optional<Set<Integer>> requestedPartitionSpec,
881-
Optional<Set<Integer>> requestedSchema)
890+
Optional<Set<Integer>> requestedSchema,
891+
RuntimeStats runtimeStats)
882892
{
883893
Expression filterExpression = toIcebergExpression(filter);
884-
CloseableIterable<FileScanTask> fileTasks = table.newScan().useSnapshot(snapshot).filter(filterExpression).planFiles();
894+
CloseableIterable<FileScanTask> fileTasks = table
895+
.newScan()
896+
.metricsReporter(new RuntimeStatsMetricsReporter(runtimeStats))
897+
.useSnapshot(snapshot)
898+
.filter(filterExpression)
899+
.planFiles();
885900

886901
return new CloseableIterable<DeleteFile>()
887902
{

presto-iceberg/src/main/java/com/facebook/presto/iceberg/PartitionTable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
162162
return new InMemoryRecordSet(resultTypes, ImmutableList.of()).cursor();
163163
}
164164
TableScan tableScan = icebergTable.newScan()
165+
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
165166
.useSnapshot(snapshotId.get())
166167
.includeColumnStats();
167168
return buildRecordCursor(getPartitions(tableScan), icebergTable.spec().fields());
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.common.RuntimeStats;
17+
import com.facebook.presto.common.RuntimeUnit;
18+
import org.apache.iceberg.metrics.MetricsReport;
19+
import org.apache.iceberg.metrics.MetricsReporter;
20+
import org.apache.iceberg.metrics.ScanReport;
21+
22+
/**
23+
* A MetricsReporter implementation for reporting
24+
* Iceberg scan metrics to Presto's RuntimeStats.
25+
*/
26+
27+
public final class RuntimeStatsMetricsReporter
28+
implements MetricsReporter
29+
{
30+
/**
31+
* RuntimeStats variable used for storing scan metrics from Iceberg reports.
32+
*/
33+
private final RuntimeStats runtimeStats;
34+
35+
/**
36+
* Constructs a RuntimeStatsMetricsReporter.
37+
*
38+
* @param runtimeStat the RuntimeStats instance to report metrics to
39+
*/
40+
public RuntimeStatsMetricsReporter(final RuntimeStats runtimeStat)
41+
{
42+
this.runtimeStats = runtimeStat;
43+
}
44+
45+
@Override
46+
public void report(final MetricsReport report)
47+
{
48+
if (!(report instanceof ScanReport)) {
49+
return;
50+
}
51+
52+
ScanReport scanReport = (ScanReport) report;
53+
String tableName = scanReport.tableName();
54+
55+
if (scanReport.scanMetrics().totalPlanningDuration() != null) {
56+
runtimeStats.addMetricValue(
57+
tableScanString(tableName, "totalPlanningDuration"),
58+
RuntimeUnit.NANO,
59+
scanReport.scanMetrics().totalPlanningDuration()
60+
.totalDuration().toNanos());
61+
}
62+
63+
if (scanReport.scanMetrics().resultDataFiles() != null) {
64+
runtimeStats.addMetricValue(
65+
tableScanString(tableName, "resultDataFiles"),
66+
RuntimeUnit.NONE,
67+
scanReport.scanMetrics().resultDataFiles().value());
68+
}
69+
70+
if (scanReport.scanMetrics().resultDeleteFiles() != null) {
71+
runtimeStats.addMetricValue(
72+
tableScanString(tableName, "resultDeleteFiles"),
73+
RuntimeUnit.NONE,
74+
scanReport.scanMetrics().resultDeleteFiles().value());
75+
}
76+
77+
if (scanReport.scanMetrics().totalDataManifests() != null) {
78+
runtimeStats.addMetricValue(
79+
tableScanString(tableName, "totalDataManifests"),
80+
RuntimeUnit.NONE,
81+
scanReport.scanMetrics().totalDataManifests().value());
82+
}
83+
84+
if (scanReport.scanMetrics().totalDeleteManifests() != null) {
85+
runtimeStats.addMetricValue(
86+
tableScanString(tableName, "totalDeleteManifests"),
87+
RuntimeUnit.NONE,
88+
scanReport.scanMetrics().totalDeleteManifests().value());
89+
}
90+
91+
if (scanReport.scanMetrics().scannedDataManifests() != null) {
92+
runtimeStats.addMetricValue(
93+
tableScanString(tableName, "scannedDataManifests"),
94+
RuntimeUnit.NONE,
95+
scanReport.scanMetrics().scannedDataManifests().value());
96+
}
97+
98+
if (scanReport.scanMetrics().skippedDataManifests() != null) {
99+
runtimeStats.addMetricValue(
100+
tableScanString(tableName, "skippedDataManifests"),
101+
RuntimeUnit.NONE,
102+
scanReport.scanMetrics().skippedDataManifests().value());
103+
}
104+
105+
if (scanReport.scanMetrics().totalFileSizeInBytes() != null) {
106+
runtimeStats.addMetricValue(
107+
tableScanString(tableName, "totalFileSizeInBytes"),
108+
RuntimeUnit.BYTE,
109+
scanReport.scanMetrics().totalFileSizeInBytes()
110+
.value());
111+
}
112+
113+
if (scanReport.scanMetrics().totalDeleteFileSizeInBytes() != null) {
114+
runtimeStats.addMetricValue(
115+
tableScanString(tableName, "totalDeleteFileSizeInBytes"),
116+
RuntimeUnit.BYTE,
117+
scanReport.scanMetrics().totalDeleteFileSizeInBytes()
118+
.value());
119+
}
120+
121+
if (scanReport.scanMetrics().skippedDataFiles() != null) {
122+
runtimeStats.addMetricValue(
123+
tableScanString(tableName, "skippedDataFiles"),
124+
RuntimeUnit.NONE,
125+
scanReport.scanMetrics().skippedDataFiles()
126+
.value());
127+
}
128+
129+
if (scanReport.scanMetrics().skippedDeleteFiles() != null) {
130+
runtimeStats.addMetricValue(
131+
tableScanString(tableName, "skippedDeleteFiles"),
132+
RuntimeUnit.NONE,
133+
scanReport.scanMetrics().skippedDeleteFiles().value());
134+
}
135+
136+
if (scanReport.scanMetrics().scannedDeleteManifests() != null) {
137+
runtimeStats.addMetricValue(
138+
tableScanString(tableName, "scannedDeleteManifests"),
139+
RuntimeUnit.NONE,
140+
scanReport.scanMetrics().scannedDeleteManifests().value());
141+
}
142+
143+
if (scanReport.scanMetrics().skippedDeleteManifests() != null) {
144+
runtimeStats.addMetricValue(
145+
tableScanString(tableName, "skippedDeleteManifests"),
146+
RuntimeUnit.NONE,
147+
scanReport.scanMetrics().skippedDeleteManifests().value());
148+
}
149+
150+
if (scanReport.scanMetrics().indexedDeleteFiles() != null) {
151+
runtimeStats.addMetricValue(
152+
tableScanString(tableName, "indexedDeleteFiles"),
153+
RuntimeUnit.NONE,
154+
scanReport.scanMetrics().indexedDeleteFiles().value());
155+
}
156+
157+
if (scanReport.scanMetrics().equalityDeleteFiles() != null) {
158+
runtimeStats.addMetricValue(
159+
tableScanString(tableName, "equalityDeleteFiles"),
160+
RuntimeUnit.NONE,
161+
scanReport.scanMetrics().equalityDeleteFiles().value());
162+
}
163+
164+
if (scanReport.scanMetrics().positionalDeleteFiles() != null) {
165+
runtimeStats.addMetricValue(
166+
tableScanString(tableName, "positionalDeleteFiles"),
167+
RuntimeUnit.NONE,
168+
scanReport.scanMetrics().positionalDeleteFiles().value());
169+
}
170+
}
171+
172+
/**
173+
* Helper method to construct the full metric name for a table scan.
174+
*
175+
* @param tableName the name of the table
176+
* @param metricName the name of the metric
177+
* @return the composed metric name in the format: table.scan.metric
178+
*/
179+
private static String tableScanString(final String tableName, final String metricName)
180+
{
181+
return tableName + ".scan." + metricName;
182+
}
183+
}

presto-iceberg/src/main/java/com/facebook/presto/iceberg/SnapshotsTable.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package com.facebook.presto.iceberg;
1515

1616
import com.facebook.presto.common.Page;
17+
import com.facebook.presto.common.RuntimeStats;
1718
import com.facebook.presto.common.predicate.TupleDomain;
1819
import com.facebook.presto.common.type.StandardTypes;
1920
import com.facebook.presto.common.type.TimeZoneKey;
@@ -101,7 +102,8 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
101102
private static List<Page> buildPages(ConnectorTableMetadata tableMetadata, ConnectorSession session, Table icebergTable)
102103
{
103104
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
104-
TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS);
105+
RuntimeStats runtimeStats = session.getRuntimeStats();
106+
TableScan tableScan = buildTableScan(icebergTable, SNAPSHOTS, runtimeStats);
105107
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
106108

107109
Map<String, Integer> columnNameToPosition = columnNameToPositionInSchema(tableScan.schema());

presto-iceberg/src/main/java/com/facebook/presto/iceberg/TableStatisticsMaker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ private Partition getDataTableSummary(IcebergTableHandle tableHandle,
282282
List<PartitionField> partitionFields)
283283
{
284284
TableScan tableScan = icebergTable.newScan()
285+
.metricsReporter(new RuntimeStatsMetricsReporter(session.getRuntimeStats()))
285286
.filter(toIcebergExpression(intersection))
286287
.select(selectedColumns.stream().map(IcebergColumnHandle::getName).collect(Collectors.toList()))
287288
.useSnapshot(tableHandle.getIcebergTableName().getSnapshotId().get())
@@ -301,7 +302,8 @@ private Partition getEqualityDeleteTableSummary(IcebergTableHandle tableHandle,
301302
tableHandle.getIcebergTableName().getSnapshotId().get(),
302303
intersection,
303304
tableHandle.getPartitionSpecId(),
304-
tableHandle.getEqualityFieldIds());
305+
tableHandle.getEqualityFieldIds(),
306+
session.getRuntimeStats());
305307
CloseableIterable<ContentFile<?>> files = CloseableIterable.transform(deleteFiles, deleteFile -> deleteFile);
306308
return getSummaryFromFiles(files, idToTypeMapping, nonPartitionPrimitiveColumns, partitionFields);
307309
}

0 commit comments

Comments
 (0)