Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
Expand Down Expand Up @@ -359,6 +361,8 @@
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
Expand All @@ -383,6 +387,8 @@ public class IcebergMetadata
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(PARTITIONING_PROPERTY)
.add(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
.add(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
.add(SORTED_BY_PROPERTY)
.build();

Expand Down Expand Up @@ -2177,6 +2183,17 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
if (!objectStoreEnabled) {
throw new TrinoException(INVALID_TABLE_PROPERTY, "Data location can only be set when object store layout is enabled");
}
if (properties.containsKey(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)) {
boolean commitEnabled = (boolean) properties.get(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The metadata_delete_after_commit_enabled property cannot be empty"));
updateProperties.set(METADATA_DELETE_AFTER_COMMIT_ENABLED, String.valueOf(commitEnabled));
}

if (properties.containsKey(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)) {
int metadataPerviousVersionMax = (int) properties.get(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
.orElseThrow(() -> new IllegalArgumentException("The metadata_previous_versions_max property cannot be empty"));
updateProperties.set(METADATA_PREVIOUS_VERSIONS_MAX, Integer.toString(metadataPerviousVersionMax));
}
updateProperties.set(WRITE_DATA_LOCATION, dataLocation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class IcebergTableProperties
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY = "metadata_delete_after_commit_enabled";
public static final String METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY = "metadata_previous_versions_max";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";

public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
Expand All @@ -73,6 +75,8 @@ public class IcebergTableProperties
.add(ORC_BLOOM_FILTER_FPP_PROPERTY)
.add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
.add(DATA_LOCATION_PROPERTY)
.add(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)
.add(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)
.add(EXTRA_PROPERTIES_PROPERTY)
.add(PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY)
.build();
Expand Down Expand Up @@ -190,6 +194,15 @@ public IcebergTableProperties(
"File system location URI for the table's data files",
null,
false))
.add(booleanProperty(
METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY,
"Delete old tracked metadata files after each table commit",
null,
false))
.add(integerProperty(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY,
"The number of old metadata files to keep",
null,
false))
.build();

checkState(SUPPORTED_PROPERTIES.containsAll(tableProperties.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.OBJECT_STORE_LAYOUT_ENABLED_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_COLUMNS_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
Expand Down Expand Up @@ -174,6 +176,8 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METADATA_PREVIOUS_VERSIONS_MAX;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.ORC_BLOOM_FILTER_COLUMNS;
Expand Down Expand Up @@ -343,6 +347,15 @@ public static Map<String, Object> getIcebergTableProperties(Table icebergTable)
Optional<String> dataLocation = Optional.ofNullable(icebergTable.properties().get(WRITE_DATA_LOCATION));
dataLocation.ifPresent(location -> properties.put(DATA_LOCATION_PROPERTY, location));

String metadataDeleteAfterCommitEnabled = icebergTable.properties().get(METADATA_DELETE_AFTER_COMMIT_ENABLED);
if (metadataDeleteAfterCommitEnabled != null) {
properties.put(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY, Boolean.parseBoolean(metadataDeleteAfterCommitEnabled));
}
String metadataPreviousVersionsMax = icebergTable.properties().get(METADATA_PREVIOUS_VERSIONS_MAX);
if (metadataPreviousVersionsMax != null) {
properties.put(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY, Integer.parseInt(metadataPreviousVersionsMax));
}

return properties.buildOrThrow();
}

Expand Down Expand Up @@ -882,6 +895,13 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
}
}

if (tableMetadata.getProperties().containsKey(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)) {
propertiesBuilder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED, ((Boolean) tableMetadata.getProperties().get(METADATA_DELETE_AFTER_COMMIT_ENABLED_PROPERTY)).toString());
}
if (tableMetadata.getProperties().containsKey(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)) {
propertiesBuilder.put(METADATA_PREVIOUS_VERSIONS_MAX, ((Integer) tableMetadata.getProperties().get(METADATA_PREVIOUS_VERSIONS_MAX_PROPERTY)).toString());
}

if (tableMetadata.getComment().isPresent()) {
propertiesBuilder.put(TABLE_COMMENT, tableMetadata.getComment().get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;

@NotThreadSafe
public class FileMetastoreTableOperations
Expand All @@ -56,7 +57,7 @@ public FileMetastoreTableOperations(
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
Table currentTable = getTable();
commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table)
commitTableUpdate(currentTable, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
.apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation)))
.build());
}
Expand All @@ -65,12 +66,12 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata)
{
Table materializedView = getTable(database, tableNameFrom(tableName));
commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table)
commitTableUpdate(materializedView, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
.apply(builder -> builder.setParameter(METADATA_LOCATION_PROP, newMetadataLocation).setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation))
.build());
}

private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
{
checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);
Expand All @@ -97,5 +98,6 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
}
throw new CommitStateUnknownException(e);
}
deleteRemovedMetadataFiles(io(), base, metadata);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;

public class GlueIcebergTableOperations
extends AbstractIcebergTableOperations
Expand Down Expand Up @@ -161,6 +162,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
commitTableUpdate(
getTable(database, tableName, false),
base,
metadata,
(table, newMetadataLocation) ->
getTableInput(
Expand All @@ -179,6 +181,7 @@ protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata m
{
commitTableUpdate(
getTable(database, tableNameFrom(tableName), false),
base,
metadata,
(table, newMetadataLocation) -> {
Map<String, String> parameters = new HashMap<>(getTableParameters(table));
Expand All @@ -193,7 +196,7 @@ protected void commitMaterializedViewRefresh(TableMetadata base, TableMetadata m
});
}

private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, TableInput> tableUpdateFunction)
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, TableInput> tableUpdateFunction)
{
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
TableInput tableInput = tableUpdateFunction.apply(table, newMetadataLocation);
Expand All @@ -218,6 +221,7 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
// regardless of the exception thrown (e.g. : timeout exception) or it actually failed
throw new CommitStateUnknownException(e);
}
deleteRemovedMetadataFiles(io(), base, metadata);
shouldRefresh = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.BaseMetastoreTableOperations.METADATA_LOCATION_PROP;
import static org.apache.iceberg.BaseMetastoreTableOperations.PREVIOUS_METADATA_LOCATION_PROP;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;

@NotThreadSafe
public class HiveMetastoreTableOperations
Expand All @@ -65,7 +66,7 @@ public HiveMetastoreTableOperations(
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
Table currentTable = getTable();
commitTableUpdate(currentTable, metadata, (table, newMetadataLocation) -> Table.builder(table)
commitTableUpdate(currentTable, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
.apply(builder -> updateMetastoreTable(builder, metadata, newMetadataLocation, Optional.of(currentMetadataLocation)))
.build());
}
Expand All @@ -74,14 +75,14 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
protected final void commitMaterializedViewRefresh(TableMetadata base, TableMetadata metadata)
{
Table materializedView = getTable(database, tableNameFrom(tableName));
commitTableUpdate(materializedView, metadata, (table, newMetadataLocation) -> Table.builder(table)
commitTableUpdate(materializedView, base, metadata, (table, newMetadataLocation) -> Table.builder(table)
.apply(builder -> builder
.setParameter(METADATA_LOCATION_PROP, newMetadataLocation)
.setParameter(PREVIOUS_METADATA_LOCATION_PROP, currentMetadataLocation))
.build());
}

private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
private void commitTableUpdate(Table table, TableMetadata base, TableMetadata metadata, BiFunction<Table, String, Table> tableUpdateFunction)
{
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
long lockId = thriftMetastore.acquireTableExclusiveLock(
Expand Down Expand Up @@ -126,6 +127,7 @@ private void commitTableUpdate(Table table, TableMetadata metadata, BiFunction<T
}
}

deleteRemovedMetadataFiles(io(), base, metadata);
shouldRefresh = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;

public class IcebergJdbcTableOperations
extends AbstractIcebergTableOperations
Expand Down Expand Up @@ -65,6 +66,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
checkState(currentMetadataLocation != null, "No current metadata location for existing table");
String newMetadataLocation = writeNewMetadata(metadata, version.orElseThrow() + 1);
jdbcClient.alterTable(database, tableName, newMetadataLocation, currentMetadataLocation);
deleteRemovedMetadataFiles(io(), base, metadata);
shouldRefresh = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.CatalogUtil.deleteRemovedMetadataFiles;

public class IcebergNessieTableOperations
extends AbstractIcebergTableOperations
Expand Down Expand Up @@ -149,6 +150,7 @@ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
// CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
}
deleteRemovedMetadataFiles(io(), base, metadata);
shouldRefresh = true;
}

Expand Down
Loading
Loading