Skip to content

Commit aaa888b

Browse files
committed
Use icebergScanExecutor for reading manifests in IcebergMetadata
Reduces bottleneck on the default iceberg worker pool
1 parent 1298ab2 commit aaa888b

File tree

1 file changed

+11
-5
lines changed

1 file changed

+11
-5
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,6 +2029,7 @@ private void finishOptimize(ConnectorSession session, IcebergTableExecuteHandle
20292029
// Set dataSequenceNumber to avoid contention between OPTIMIZE and concurrent writing of equality deletes
20302030
rewriteFiles.dataSequenceNumber(snapshot.sequenceNumber());
20312031
rewriteFiles.validateFromSnapshot(snapshot.snapshotId());
2032+
rewriteFiles.scanManifestsWith(icebergScanExecutor);
20322033
commitUpdateAndTransaction(rewriteFiles, session, transaction, "optimize");
20332034

20342035
// TODO (https://github.com/trinodb/trino/issues/15439) this may not exactly be the snapshot we committed, if there is another writer
@@ -2129,11 +2130,14 @@ private void executeOptimizeManifests(ConnectorSession session, IcebergTableExec
21292130

21302131
beginTransaction(icebergTable);
21312132
RewriteManifests rewriteManifests = transaction.rewriteManifests();
2132-
rewriteManifests.clusterBy(file -> {
2133-
// Use the first partition field as the clustering key
2134-
StructLike partition = file.partition();
2135-
return partition.size() > 1 ? Optional.ofNullable(partition.get(0, Object.class)) : partition;
2136-
}).commit();
2133+
rewriteManifests
2134+
.clusterBy(file -> {
2135+
// Use the first partition field as the clustering key
2136+
StructLike partition = file.partition();
2137+
return partition.size() > 1 ? Optional.ofNullable(partition.get(0, Object.class)) : partition;
2138+
})
2139+
.scanManifestsWith(icebergScanExecutor)
2140+
.commit();
21372141
commitTransaction(transaction, "optimize manifests");
21382142
transaction = null;
21392143
}
@@ -2198,6 +2202,7 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21982202
table.expireSnapshots()
21992203
.expireOlderThan(expireTimestampMillis)
22002204
.deleteWith(deleteFunction)
2205+
.planWith(icebergScanExecutor)
22012206
.commit();
22022207

22032208
fileSystem.deleteFiles(pathsToDelete);
@@ -3154,6 +3159,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
31543159
// Ensure a row that is updated by this commit was not deleted by a separate commit
31553160
rowDelta.validateDeletedFiles();
31563161
rowDelta.validateNoConflictingDeleteFiles();
3162+
rowDelta.scanManifestsWith(icebergScanExecutor);
31573163

31583164
ImmutableSet.Builder<String> writtenFiles = ImmutableSet.builder();
31593165
ImmutableSet.Builder<String> referencedDataFiles = ImmutableSet.builder();

0 commit comments

Comments
 (0)