Skip to content

Commit 3413c22

Browse files
oneonestarebyhr
authored andcommitted
Add Iceberg metadata management properties
1 parent b049555 commit 3413c22

File tree

9 files changed

+225
-7
lines changed

9 files changed

+225
-7
lines changed

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,8 @@
271271
import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
272272
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
273273
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
274+
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY;
275+
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY;
274276
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
275277
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
276278
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
@@ -359,6 +361,8 @@
359361
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
360362
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
361363
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
364+
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
365+
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
362366
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
363367
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
364368
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
@@ -383,6 +387,8 @@ public class IcebergMetadata
383387
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
384388
.add(DATA_LOCATION_PROPERTY)
385389
.add(PARTITIONING_PROPERTY)
390+
.add(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
391+
.add(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
386392
.add(SORTED_BY_PROPERTY)
387393
.build();
388394

@@ -2177,6 +2183,17 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
21772183
if (!objectStoreEnabled) {
21782184
throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store layout is enabled");
21792185
}
2186+
if (properties.containsKey(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)) {
2187+
boolean commitEnabled = (boolean) properties.get(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
2188+
.orElseThrow(() -> new IllegalArgumentException("The metadata_delete_after_commit_enabled property cannot be empty"));
2189+
updateProperties.set(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(commitEnabled));
2190+
}
2191+
2192+
if (properties.containsKey(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)) {
2193+
int metadataPerviousVersionMax = (int) properties.get(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
2194+
.orElseThrow(() -> new IllegalArgumentException("The metadata_previous_versions_max property cannot be empty"));
2195+
updateProperties.set(METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(metadataPerviousVersionMax));
2196+
}
21802197
updateProperties.set(WRITE_DATA_LOCATION, dataLocation);
21812198
}
21822199

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ public class IcebergTableProperties
6161
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
6262
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
6363
public static final String DATA_LOCATION_PROPERTY = "data_location";
64+
public static final String METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY = "metadata_delete_after_commit_enabled";
65+
public static final String METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY = "metadata_previous_versions_max";
6466
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
6567

6668
public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
@@ -73,6 +75,8 @@ public class IcebergTableProperties
7375
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
7476
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
7577
.add(DATA_LOCATION_PROPERTY)
78+
.add(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
79+
.add(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
7680
.add(EXTRA_PROPERTIES_PROPERTY)
7781
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
7882
.build();
@@ -190,6 +194,15 @@ public IcebergTableProperties(
190194
"File system location URI for the table's data files",
191195
null,
192196
false))
197+
.add(booleanProperty(
198+
METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY,
199+
"Delete old tracked metadata files after each table commit",
200+
null,
201+
false))
202+
.add(integerProperty(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY,
203+
"The number of old metadata files to keep",
204+
null,
205+
false))
193206
.build();
194207

195208
checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@
123123
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
124124
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
125125
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
126+
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY;
127+
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY;
126128
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
127129
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
128130
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
@@ -174,6 +176,8 @@
174176
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
175177
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
176178
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
179+
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
180+
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
177181
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
178182
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
179183
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
@@ -343,6 +347,15 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
343347
Optional<String> dataLocation = Optional.ofNullable(icebergTable.properties().get(WRITE_DATA_LOCATION));
344348
dataLocation.ifPresent(location -> properties.put(DATA_LOCATION_PROPERTY, location));
345349

350+
String metadataDeleteAfterCommitEnabled = icebergTable.properties().get(METADATA_DELETE_AFTER_COMMIT_ENABLED);
351+
if (metadataDeleteAfterCommitEnabled != null) {
352+
properties.put(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY, Boolean.parseBoolean(metadataDeleteAfterCommitEnabled));
353+
}
354+
String metadataPreviousVersionsMax = icebergTable.properties().get(METADATA_PREVIOUS_VERSIONS_MAX);
355+
if (metadataPreviousVersionsMax != null) {
356+
properties.put(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY, Integer.parseInt(metadataPreviousVersionsMax));
357+
}
358+
346359
return properties.buildOrThrow();
347360
}
348361

@@ -882,6 +895,13 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
882895
}
883896
}
884897

898+
if (tableMetadata.getProperties().containsKey(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)) {
899+
propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, ((Boolean) tableMetadata.getProperties().get(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)).toString());
900+
}
901+
if (tableMetadata.getProperties().containsKey(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)) {
902+
propertiesBuilder.put(METADATA_PREVIOUS_VERSIONS_MAX, ((Integer) tableMetadata.getProperties().get(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)).toString());
903+
}
904+
885905
if (tableMetadata.getComment().isPresent()) {
886906
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
887907
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/file/FileMetastoreTableOperations.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
3636
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
3737
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
38+
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
3839

3940
@NotThreadSafe
4041
public class FileMetastoreTableOperations
@@ -56,7 +57,7 @@ public FileMetastoreTableOperations(
5657
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
5758
{
5859
Table currentTable = getTable();
59-
commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table)
60+
commitTableUpdate(currentTable, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
6061
.apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation)))
6162
.build());
6263
}
@@ -65,12 +66,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
6566
protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata)
6667
{
6768
Table materializedView = getTable(database, tableNameFrom(tableName));
68-
commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table)
69+
commitTableUpdate(materializedView, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
6970
.apply(builder -> builder.setParameter(METADATA_LOCATION_PROP, newMetadataLocation).setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation))
7071
.build());
7172
}
7273

73-
private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
74+
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
7475
{
7576
checkState(currentMetadataLocation != null, "No current metadata location for existing table");
7677
String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
@@ -97,5 +98,6 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
9798
}
9899
throw new CommitStateUnknownException(e);
99100
}
101+
deleteRemovedMetadataFiles(io(), base, metadata);
100102
}
101103
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/glue/GlueIcebergTableOperations.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static java.util.Objects.requireNonNull;
6363
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
6464
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
65+
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
6566

6667
public class GlueIcebergTableOperations
6768
extends AbstractIcebergTableOperations
@@ -161,6 +162,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
161162
{
162163
commitTableUpdate(
163164
getTable(database, tableName, false),
165+
base,
164166
metadata,
165167
(table, newMetadataLocation) ->
166168
getTableInput(
@@ -179,6 +181,7 @@ protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata m
179181
{
180182
commitTableUpdate(
181183
getTable(database, tableNameFrom(tableName), false),
184+
base,
182185
metadata,
183186
(table, newMetadataLocation) -> {
184187
Map<String, String> parameters = new HashMap<>(getTableParameters(table));
@@ -193,7 +196,7 @@ protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata m
193196
});
194197
}
195198

196-
private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, TableInput> tableUpdateFunction)
199+
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, TableInput> tableUpdateFunction)
197200
{
198201
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
199202
TableInput tableInput = tableUpdateFunction.apply(table, newMetadataLocation);
@@ -218,6 +221,7 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
218221
// regardless of the exception thrown (e.g. : timeout exception) or it actually failed
219222
throw new CommitStateUnknownException(e);
220223
}
224+
deleteRemovedMetadataFiles(io(), base, metadata);
221225
shouldRefresh = true;
222226
}
223227

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/hms/HiveMetastoreTableOperations.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import static java.util.Objects.requireNonNull;
4040
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
4141
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
42+
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
4243

4344
@NotThreadSafe
4445
public class HiveMetastoreTableOperations
@@ -65,7 +66,7 @@ public HiveMetastoreTableOperations(
6566
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
6667
{
6768
Table currentTable = getTable();
68-
commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table)
69+
commitTableUpdate(currentTable, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
6970
.apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation)))
7071
.build());
7172
}
@@ -74,14 +75,14 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
7475
protected final void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata)
7576
{
7677
Table materializedView = getTable(database, tableNameFrom(tableName));
77-
commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table)
78+
commitTableUpdate(materializedView, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
7879
.apply(builder -> builder
7980
.setParameter(METADATA_LOCATION_PROP, newMetadataLocation)
8081
.setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation))
8182
.build());
8283
}
8384

84-
private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
85+
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
8586
{
8687
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
8788
long lockId = thriftMetastore.acquireTableExclusiveLock(
@@ -126,6 +127,7 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
126127
}
127128
}
128129

130+
deleteRemovedMetadataFiles(io(), base, metadata);
129131
shouldRefresh = true;
130132
}
131133
}

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/IcebergJdbcTableOperations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.google.common.base.Preconditions.checkState;
2525
import static com.google.common.base.Verify.verify;
2626
import static java.util.Objects.requireNonNull;
27+
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
2728

2829
public class IcebergJdbcTableOperations
2930
extends AbstractIcebergTableOperations
@@ -65,6 +66,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
6566
checkState(currentMetadataLocation != null, "No current metadata location for existing table");
6667
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
6768
jdbcClient.alterTable(database, tableName, newMetadataLocation, currentMetadataLocation);
69+
deleteRemovedMetadataFiles(io(), base, metadata);
6870
shouldRefresh = true;
6971
}
7072

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
3939
import static java.lang.String.format;
4040
import static java.util.Objects.requireNonNull;
41+
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;
4142

4243
public class IcebergNessieTableOperations
4344
extends AbstractIcebergTableOperations
@@ -149,6 +150,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
149150
// CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
150151
throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
151152
}
153+
deleteRemovedMetadataFiles(io(), base, metadata);
152154
shouldRefresh = true;
153155
}
154156

0 commit comments

Comments
 (0)