Skip to content

Commit 11ff09a

Browse files
committed
Limit number of manifest files written by OPTIMIZE_MANIFESTS
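Capping the number of manifest clusters avoids OOM on the coordinator caused by too many open manifest writers, and prevents the creation of too many small manifest files.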
1 parent 07f6c23 commit 11ff09a

File tree

2 files changed: 39 additions (+39) and 16 deletions (-16)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,6 @@
176176
import org.apache.iceberg.SortField;
177177
import org.apache.iceberg.SortOrder;
178178
import org.apache.iceberg.StatisticsFile;
179-
import org.apache.iceberg.StructLike;
180179
import org.apache.iceberg.Table;
181180
import org.apache.iceberg.TableProperties;
182181
import org.apache.iceberg.TableScan;
@@ -200,6 +199,7 @@
200199
import org.apache.iceberg.types.Types.NestedField;
201200
import org.apache.iceberg.types.Types.StringType;
202201
import org.apache.iceberg.types.Types.StructType;
202+
import org.apache.iceberg.util.StructLikeWrapper;
203203

204204
import java.io.IOException;
205205
import java.io.UncheckedIOException;
@@ -219,6 +219,7 @@
219219
import java.util.LinkedHashMap;
220220
import java.util.List;
221221
import java.util.Map;
222+
import java.util.Objects;
222223
import java.util.Optional;
223224
import java.util.OptionalLong;
224225
import java.util.Set;
@@ -2116,17 +2117,23 @@ private void executeOptimizeManifests(ConnectorSession session, IcebergTableExec
21162117
if (manifests.isEmpty()) {
21172118
return;
21182119
}
2119-
if (manifests.size() == 1 && manifests.getFirst().length() < icebergTable.operations().current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT)) {
2120+
long manifestTargetSizeBytes = icebergTable.operations().current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT);
2121+
if (manifests.size() == 1 && manifests.getFirst().length() < manifestTargetSizeBytes) {
21202122
return;
21212123
}
2124+
long totalManifestsSize = manifests.stream().mapToLong(ManifestFile::length).sum();
2125+
// Having too many open manifest writers can potentially cause OOM on the coordinator
2126+
long targetManifestClusters = Math.min(((totalManifestsSize + manifestTargetSizeBytes - 1) / manifestTargetSizeBytes), 100);
21222127

21232128
beginTransaction(icebergTable);
21242129
RewriteManifests rewriteManifests = transaction.rewriteManifests();
2130+
Types.StructType structType = icebergTable.spec().partitionType();
21252131
rewriteManifests
21262132
.clusterBy(file -> {
2127-
// Use the first partition field as the clustering key
2128-
StructLike partition = file.partition();
2129-
return partition.size() > 1 ? Optional.ofNullable(partition.get(0, Object.class)) : partition;
2133+
// Cluster by partitions for better locality when reading data files
2134+
StructLikeWrapper partitionWrapper = StructLikeWrapper.forType(structType).set(file.partition());
2135+
// Limit the number of clustering buckets to avoid creating too many small manifest files
2136+
return Objects.hash(partitionWrapper) % targetManifestClusters;
21302137
})
21312138
.scanManifestsWith(icebergScanExecutor)
21322139
.commit();

plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/procedure/TestIcebergOptimizeManifestsProcedure.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ void testOptimizeManifestWithNullPartitions()
6666
assertUpdate("ALTER TABLE " + table.getName() + " EXECUTE optimize_manifests");
6767

6868
assertThat(manifestFiles(table.getName()))
69-
.hasSize(2)
69+
.hasSize(1)
7070
.doesNotContainAnyElementsOf(manifestFiles);
7171

7272
assertThat(query("SELECT * FROM " + table.getName()))
@@ -138,7 +138,7 @@ void testPartitionTable()
138138

139139
assertUpdate("ALTER TABLE " + table.getName() + " EXECUTE optimize_manifests");
140140
assertThat(manifestFiles(table.getName()))
141-
.hasSize(2)
141+
.hasSize(1)
142142
.doesNotContainAnyElementsOf(manifestFiles);
143143

144144
assertThat(query("SELECT * FROM " + table.getName()))
@@ -147,24 +147,40 @@ void testPartitionTable()
147147
}
148148

149149
@Test
150-
void testFirstPartitionField()
150+
void testMultiplePartitioningColumns()
151151
{
152152
try (TestTable table = newTrinoTable("test_partition", "(id int, part int, nested int) WITH (partitioning = ARRAY['part', 'nested'])")) {
153-
assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 10, 100)", 1);
154-
assertUpdate("INSERT INTO " + table.getName() + " VALUES (2, 10, 200)", 1);
155-
assertUpdate("INSERT INTO " + table.getName() + " VALUES (3, 20, 300)", 1);
156-
assertUpdate("INSERT INTO " + table.getName() + " VALUES (4, 20, 400)", 1);
153+
for (int i = 0; i < 30; i++) {
154+
assertUpdate("INSERT INTO " + table.getName() + " VALUES (%d, %d, %d)".formatted(i, i % 10, i % 3), 1);
155+
}
157156

158157
Set<String> manifestFiles = manifestFiles(table.getName());
159-
assertThat(manifestFiles).hasSize(4);
158+
assertThat(manifestFiles).hasSize(30);
160159

161160
assertUpdate("ALTER TABLE " + table.getName() + " EXECUTE optimize_manifests");
162-
assertThat(manifestFiles(table.getName()))
161+
Set<String> currentManifestFiles = manifestFiles(table.getName());
162+
assertThat(currentManifestFiles)
163+
.hasSize(1)
164+
.doesNotContainAnyElementsOf(manifestFiles);
165+
166+
assertThat(query("SELECT COUNT(*) FROM " + table.getName()))
167+
.matches("VALUES BIGINT '30'");
168+
169+
// Set small target size to force split
170+
BaseTable icebergTable = loadTable(table.getName());
171+
icebergTable.updateProperties()
172+
.set("commit.manifest.target-size-bytes", "8000")
173+
.commit();
174+
manifestFiles = currentManifestFiles;
175+
assertUpdate("ALTER TABLE " + table.getName() + " EXECUTE optimize_manifests");
176+
177+
currentManifestFiles = manifestFiles(table.getName());
178+
assertThat(currentManifestFiles)
163179
.hasSize(2)
164180
.doesNotContainAnyElementsOf(manifestFiles);
165181

166-
assertThat(query("SELECT * FROM " + table.getName()))
167-
.matches("VALUES (1, 10, 100), (2, 10, 200), (3, 20, 300), (4, 20, 400)");
182+
assertThat(query("SELECT COUNT(*) FROM " + table.getName()))
183+
.matches("VALUES BIGINT '30'");
168184
}
169185
}
170186

0 commit comments

Comments
 (0)