diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java
index ee146c969326..76a083d00241 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractIcebergTableOperations.java
@@ -13,8 +13,11 @@
  */
 package io.trino.plugin.iceberg.catalog;
 
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import dev.failsafe.Failsafe;
 import dev.failsafe.RetryPolicy;
+import io.airlift.log.Logger;
 import io.trino.annotation.NotThreadSafe;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
@@ -28,11 +31,14 @@
 import jakarta.annotation.Nullable;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableMetadataParser;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.LocationProvider;
 import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.util.Tasks;
 
 import java.time.Duration;
 import java.util.List;
@@ -40,6 +46,7 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.Set;
 import java.util.function.Function;
 
 import static com.google.common.base.Preconditions.checkState;
@@ -70,6 +77,7 @@
 public abstract class AbstractIcebergTableOperations
         implements IcebergTableOperations
 {
+    private static final Logger log = Logger.get(AbstractIcebergTableOperations.class);
     public static final StorageFormat ICEBERG_METASTORE_STORAGE_FORMAT = StorageFormat.create(
             LAZY_SIMPLE_SERDE_CLASS,
             FILE_INPUT_FORMAT_CLASS,
@@ -155,6 +163,7 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
 
         if (isMaterializedViewStorage(tableName)) {
             commitMaterializedViewRefresh(base, metadata);
+            deleteRemovedMetadataFiles(base, metadata);
             return;
         }
 
@@ -171,6 +180,7 @@ public void commit(@Nullable TableMetadata base, TableMetadata metadata)
         }
         else {
             commitToExistingTable(base, metadata);
+            deleteRemovedMetadataFiles(base, metadata);
         }
 
         shouldRefresh = true;
@@ -303,4 +313,41 @@ public static List<Column> toHiveColumns(List<NestedField> columns)
                         Map.of()))
                 .collect(toImmutableList());
     }
+
+    private void deleteRemovedMetadataFiles(TableMetadata base, TableMetadata metadata)
+    {
+        if (base == null) {
+            return;
+        }
+
+        boolean deleteAfterCommit =
+                metadata.propertyAsBoolean(
+                        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
+                        TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT);
+
+        if (deleteAfterCommit) {
+            Set<TableMetadata.MetadataLogEntry> removedPreviousMetadataFiles =
+                    Sets.newHashSet(base.previousFiles());
+            // TableMetadata#addPreviousFile builds up the metadata log and uses
+            // TableProperties.METADATA_PREVIOUS_VERSIONS_MAX to determine how many files should stay in
+            // the log, so we don't include metadata.previousFiles() for deletion - everything else can
+            // be removed
+            removedPreviousMetadataFiles.removeAll(metadata.previousFiles());
+            if (io() instanceof SupportsBulkOperations) {
+                ((SupportsBulkOperations) io()).deleteFiles(
+                        Iterables.transform(removedPreviousMetadataFiles, TableMetadata.MetadataLogEntry::file));
+            }
+            else {
+                Tasks.foreach(removedPreviousMetadataFiles)
+                        .noRetry()
+                        .suppressFailureWhenFinished()
+                        .onFailure((file, e) -> log.warn(e, "Delete failed for previous metadata file: %s", file))
+                        .run(previousMetadataFile -> io().deleteFile(previousMetadataFile.file()));
+            }
+        }
+    }
 }
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergHiveMetastoreAutoCleanMetadataFile.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergHiveMetastoreAutoCleanMetadataFile.java
new file mode 100644
index 000000000000..4688cb61cfce
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergHiveMetastoreAutoCleanMetadataFile.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.FileEntry;
+import io.trino.filesystem.FileIterator;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.metastore.HiveMetastore;
+import io.trino.plugin.hive.TrinoViewHiveMetastore;
+import io.trino.plugin.hive.containers.HiveMinioDataLake;
+import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
+import io.trino.plugin.hive.metastore.cache.CachingHiveMetastore;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
+import io.trino.spi.catalog.CatalogName;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.type.TestingTypeManager;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorSession;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import static io.trino.plugin.hive.metastore.cache.CachingHiveMetastore.createPerTransactionCache;
+import static io.trino.plugin.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
+import static io.trino.plugin.iceberg.IcebergTestUtils.getFileSystemFactory;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static io.trino.testing.containers.Minio.MINIO_ACCESS_KEY;
+import static io.trino.testing.containers.Minio.MINIO_REGION;
+import static io.trino.testing.containers.Minio.MINIO_SECRET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public class TestIcebergHiveMetastoreAutoCleanMetadataFile
+        extends AbstractTestQueryFramework
+{
+    private static final String bucketName = "test-auto-clean-metadata" + randomNameSuffix();
+
+    private TrinoCatalog trinoCatalog;
+    private IcebergTableOperationsProvider tableOperationsProvider;
+    private TrinoFileSystem fileSystem;
+
+    // Use MinIO for storage, since HDFS is hard to get working in a unit test
+    private HiveMinioDataLake dataLake;
+    private TrinoFileSystemFactory fileSystemFactory;
+    public static final int METADATA_PREVIOUS_VERSIONS_MAX = 5;
+
+    @AfterAll
+    public void tearDown()
+            throws Exception
+    {
+        if (dataLake != null) {
+            dataLake.stop();
+            dataLake = null;
+        }
+    }
+
+    @Override
+    protected QueryRunner createQueryRunner()
+            throws Exception
+    {
+        this.dataLake = closeAfterClass(new HiveMinioDataLake(bucketName));
+        this.dataLake.start();
+
+        DistributedQueryRunner queryRunner = IcebergQueryRunner.builder()
+                .setIcebergProperties(
+                        ImmutableMap.<String, String>builder()
+                                .put("iceberg.file-format", FileFormat.PARQUET.name())
+                                .put("iceberg.catalog.type", "HIVE_METASTORE")
+                                .put(
+                                        "hive.metastore.uri",
+                                        dataLake.getHiveHadoop().getHiveMetastoreEndpoint().toString())
+                                .put(
+                                        "hive.metastore.thrift.client.read-timeout",
+                                        "1m") // read timeouts sometimes happen with the default value
+                                .put("fs.hadoop.enabled", "false")
+                                .put("fs.native-s3.enabled", "true")
+                                .put("s3.aws-access-key", MINIO_ACCESS_KEY)
+                                .put("s3.aws-secret-key", MINIO_SECRET_KEY)
+                                .put("s3.region", MINIO_REGION)
+                                .put("s3.endpoint", dataLake.getMinio().getMinioAddress())
+                                .put("s3.path-style-access", "true")
+                                .put("s3.streaming.part-size", "5MB") // minimize memory usage
+                                .put("s3.max-connections", "2") // verify no leaks
+                                .put("iceberg.register-table-procedure.enabled", "true")
+                                .put("iceberg.writer-sort-buffer-size", "1MB")
+                                .buildOrThrow())
+                .setSchemaInitializer(
+                        SchemaInitializer.builder()
+                                .withSchemaName("tpch")
+                                .withSchemaProperties(Map.of("location", "'s3://" + bucketName + "/tpch'"))
+                                .build())
+                .build();
+        HiveMetastore metastore = ((IcebergConnector) queryRunner.getCoordinator().getConnector(ICEBERG_CATALOG)).getInjector()
+                .getInstance(HiveMetastoreFactory.class)
+                .createMetastore(Optional.empty());
+
+        this.fileSystemFactory = getFileSystemFactory(queryRunner);
+        this.tableOperationsProvider = new FileMetastoreTableOperationsProvider(fileSystemFactory);
+
+        CachingHiveMetastore cachingHiveMetastore = createPerTransactionCache(metastore, 1000);
+        this.trinoCatalog = new TrinoHiveCatalog(
+                new CatalogName("hive"),
+                cachingHiveMetastore,
+                new TrinoViewHiveMetastore(cachingHiveMetastore, false, "trino-version", "test"),
+                fileSystemFactory,
+                new TestingTypeManager(),
+                tableOperationsProvider,
+                false,
+                false,
+                false,
+                new IcebergConfig().isHideMaterializedViewStorageTable());
+
+        return queryRunner;
+    }
+
+    @Test
+    public void testInsertWithAutoCleanMetadataFile()
+            throws IOException
+    {
+        assertUpdate("CREATE TABLE table_to_metadata_count (_bigint BIGINT, _varchar VARCHAR)");
+
+        Table table = IcebergUtil.loadIcebergTable(trinoCatalog, tableOperationsProvider, TestingConnectorSession.SESSION,
+                new SchemaTableName("tpch", "table_to_metadata_count"));
+        table.updateProperties()
+                .set(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, "true")
+                .set(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, String.valueOf(METADATA_PREVIOUS_VERSIONS_MAX))
+                .commit();
+        for (int i = 0; i < 10; i++) {
+            assertUpdate("INSERT INTO table_to_metadata_count VALUES (1, 'a')", 1);
+        }
+        int count = 0;
+        fileSystem = fileSystemFactory.create(SESSION);
+        FileIterator fileIterator = fileSystem.listFiles(Location.of(table.location()));
+        while (fileIterator.hasNext()) {
+            FileEntry next = fileIterator.next();
+            if (next.location().path().endsWith("metadata.json")) {
+                count++;
+            }
+        }
+        assertThat(count).isEqualTo(1 + METADATA_PREVIOUS_VERSIONS_MAX);
+    }
+}
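For reviewers, the cleanup added above is a set difference over the Iceberg metadata log: on each commit Iceberg trims metadata.previousFiles() to write.metadata.previous-versions-max entries, so subtracting it from base.previousFiles() yields exactly the entries that fell off the log, and a table ends up with at most one current plus N previous metadata.json files (the 1 + METADATA_PREVIOUS_VERSIONS_MAX asserted in the test). Below is a minimal standalone sketch of that bookkeeping, using plain strings instead of TableMetadata.MetadataLogEntry; the class and file names are illustrative only and not part of this change.

import com.google.common.collect.Sets;

import java.util.List;
import java.util.Set;

// Illustrative sketch only: mirrors the set difference computed in deleteRemovedMetadataFiles,
// with plain strings standing in for TableMetadata.MetadataLogEntry.
public class MetadataLogDiffSketch
{
    public static void main(String[] args)
    {
        // base.previousFiles() before the commit, with write.metadata.previous-versions-max = 2
        Set<String> beforeCommit = Sets.newHashSet(
                "00001-aaa.metadata.json",
                "00002-bbb.metadata.json");

        // metadata.previousFiles() after the commit: Iceberg appended the metadata file that was
        // just replaced and dropped the oldest entry to stay within the limit
        List<String> afterCommit = List.of(
                "00002-bbb.metadata.json",
                "00003-ccc.metadata.json");

        // Whatever fell off the log is no longer referenced by the table metadata and is safe to delete
        Set<String> removed = Sets.newHashSet(beforeCommit);
        removed.removeAll(afterCommit);

        System.out.println(removed); // prints [00001-aaa.metadata.json]
    }
}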