From 9e86fa33ebc17631cfa1d4c48f8158316e45510e Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Thu, 29 Sep 2022 16:33:49 -0400 Subject: [PATCH 1/6] Test metadata files access counts for Iceberg remove_orphan_files --- .../TestIcebergMetadataFileOperations.java | 34 ++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index 41c877fe1768..815bc8f0451e 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -49,6 +49,7 @@ import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; import static io.trino.testing.QueryAssertions.copyTpchTables; import static io.trino.testing.TestingSession.testSessionBuilder; +import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; import static java.util.Collections.nCopies; import static java.util.Objects.requireNonNull; @@ -384,10 +385,41 @@ public void testPredicateWithVarcharCastToDate() assertUpdate("DROP TABLE test_varchar_as_date_predicate"); } + @Test + public void testRemoveOrphanFiles() + { + String tableName = "test_remove_orphan_files_" + randomTableSuffix(); + Session sessionWithShortRetentionUnlocked = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "remove_orphan_files_min_retention", "0s") + .build(); + assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2), ('three', 3)", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 1); + + assertFileSystemAccesses( + sessionWithShortRetentionUnlocked, + "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')", + ImmutableMultiset.builder() + .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 4) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 4) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 24) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 24) + .build()); + + assertUpdate("DROP TABLE " + tableName); + } + private void assertFileSystemAccesses(@Language("SQL") String query, Multiset expectedAccesses) + { + assertFileSystemAccesses(TEST_SESSION, query, expectedAccesses); + } + + private void assertFileSystemAccesses(Session session, @Language("SQL") String query, Multiset expectedAccesses) { resetCounts(); - getDistributedQueryRunner().executeWithQueryId(TEST_SESSION, query); + getDistributedQueryRunner().executeWithQueryId(session, query); assertThat(ImmutableMultiset.copyOf(getOperations())).containsExactlyInAnyOrderElementsOf(expectedAccesses); } From 64c86c2422581dc03a755371823227c6b14c2da3 Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Fri, 30 Sep 2022 11:06:01 -0400 Subject: [PATCH 2/6] Improve Iceberg remove_orphan_files test Add testing for deletes and ensure the operation does not corrupt the table. --- .../java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 9b7678fc4785..703ccdc9952d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -5260,11 +5260,14 @@ public void testRemoveOrphanFiles() Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)"); assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2), ('three', 3)", 2); + assertUpdate("DELETE FROM " + tableName + " WHERE key = 'two'", 1); String location = getTableLocation(tableName); Path orphanFile = Files.createFile(Path.of(getIcebergTableDataPath(location).toString(), "invalidData." + format)); List initialDataFiles = getAllDataFilesFromTableDirectory(tableName); assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE REMOVE_ORPHAN_FILES (retention_threshold => '0s')"); + assertQuery("SELECT * FROM " + tableName, "VALUES ('one', 1), ('three', 3)"); List updatedDataFiles = getAllDataFilesFromTableDirectory(tableName); assertThat(updatedDataFiles.size()).isLessThan(initialDataFiles.size()); From ee36f8561db111c7269d1221af88bfea9e40bf26 Mon Sep 17 00:00:00 2001 From: Alex Jo Date: Fri, 30 Sep 2022 11:07:16 -0400 Subject: [PATCH 3/6] Improve Iceberg remove_orphan_files performance Avoid reading the Avro manifest files multiple times while building the set of live files. --- .../trino/plugin/iceberg/IcebergMetadata.java | 76 +++++++++++-------- .../TestIcebergMetadataFileOperations.java | 4 +- 2 files changed, 47 insertions(+), 33 deletions(-) diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index b86f8c0f3049..27c6f38ecc0f 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -98,6 +98,7 @@ import org.apache.datasketches.theta.CompactSketch; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.BaseTable; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataFiles; import org.apache.iceberg.DeleteFile; @@ -107,11 +108,12 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.IsolationLevel; import org.apache.iceberg.ManifestFile; +import org.apache.iceberg.ManifestFiles; +import org.apache.iceberg.ManifestReader; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.PartitionSpecParser; -import org.apache.iceberg.ReachableFileUtil; import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.RowDelta; import org.apache.iceberg.Schema; @@ -155,7 +157,6 @@ import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.stream.Stream; import static com.google.common.base.Preconditions.checkArgument; @@ -168,9 +169,6 @@ import static com.google.common.collect.Iterables.getLast; import static com.google.common.collect.Maps.transformValues; import static com.google.common.collect.Sets.difference; -import static com.google.common.collect.Sets.union; -import static com.google.common.collect.Streams.concat; -import static com.google.common.collect.Streams.stream; import static io.trino.plugin.base.util.Procedures.checkProcedureArgument; import static io.trino.plugin.hive.HiveApplyProjectionUtil.extractSupportedProjectedColumns; import static io.trino.plugin.hive.HiveApplyProjectionUtil.replaceWithNewVariables; @@ -1242,39 +1240,55 @@ public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecu long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis(); removeOrphanFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis); - removeOrphanMetadataFiles(table, session, executeHandle.getSchemaTableName(), expireTimestampMillis); } private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp) { - Set validDataFilePaths = stream(table.snapshots()) - .map(Snapshot::snapshotId) - .flatMap(snapshotId -> stream(table.newScan().useSnapshot(snapshotId).planFiles())) - .map(fileScanTask -> fileName(fileScanTask.file().path().toString())) - .collect(toImmutableSet()); - Set validDeleteFilePaths = stream(table.snapshots()) - .map(Snapshot::snapshotId) - .flatMap(snapshotId -> stream(table.newScan().useSnapshot(snapshotId).planFiles())) - .flatMap(fileScanTask -> fileScanTask.deletes().stream().map(file -> fileName(file.path().toString()))) - .collect(Collectors.toUnmodifiableSet()); - scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, union(validDataFilePaths, validDeleteFilePaths), "/data"); + Set processedManifestFilePaths = new HashSet<>(); + // Similarly to issues like https://github.com/trinodb/trino/issues/13759, equivalent paths may have different String + // representations due to things like double slashes. Using file names may result in retaining files which could be removed. + // However, in practice Iceberg metadata and data files have UUIDs in their names which makes this unlikely. + ImmutableSet.Builder validMetadataFileNames = ImmutableSet.builder(); + ImmutableSet.Builder validDataFileNames = ImmutableSet.builder(); + + for (Snapshot snapshot : table.snapshots()) { + if (snapshot.manifestListLocation() != null) { + validMetadataFileNames.add(fileName(snapshot.manifestListLocation())); + } + + for (ManifestFile manifest : snapshot.allManifests(table.io())) { + if (!processedManifestFilePaths.add(manifest.path())) { + // Already read this manifest + continue; + } + + validMetadataFileNames.add(fileName(manifest.path())); + try (ManifestReader> manifestReader = readerForManifest(table, manifest)) { + for (ContentFile contentFile : manifestReader) { + validDataFileNames.add(fileName(contentFile.path().toString())); + } + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Unable to list manifest file content from " + manifest.path(), e); + } + } + } + + metadataFileLocations(table, false).stream() + .map(IcebergUtil::fileName) + .forEach(validMetadataFileNames::add); + validMetadataFileNames.add(fileName(versionHintLocation(table))); + + scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validDataFileNames.build(), "data"); + scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validMetadataFileNames.build(), "metadata"); } - private void removeOrphanMetadataFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp) + private static ManifestReader> readerForManifest(Table table, ManifestFile manifest) { - ImmutableSet manifests = stream(table.snapshots()) - .flatMap(snapshot -> snapshot.allManifests(table.io()).stream()) - .map(ManifestFile::path) - .collect(toImmutableSet()); - List manifestLists = ReachableFileUtil.manifestListLocations(table); - List otherMetadataFiles = concat( - metadataFileLocations(table, false).stream(), - Stream.of(versionHintLocation(table))) - .collect(toImmutableList()); - Set validMetadataFiles = concat(manifests.stream(), manifestLists.stream(), otherMetadataFiles.stream()) - .map(IcebergUtil::fileName) - .collect(toImmutableSet()); - scanAndDeleteInvalidFiles(table, session, schemaTableName, expireTimestamp, validMetadataFiles, "metadata"); + return switch (manifest.content()) { + case DATA -> ManifestFiles.read(manifest, table.io()); + case DELETES -> ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs()); + }; } private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp, Set validFiles, String subfolder) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index 815bc8f0451e..ddf22d85ff65 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -404,8 +404,8 @@ public void testRemoveOrphanFiles() .add(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM)) .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 4) .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 4) - .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 24) - .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 24) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_GET_LENGTH), 6) + .addCopies(new FileOperation(MANIFEST, INPUT_FILE_NEW_STREAM), 6) .build()); assertUpdate("DROP TABLE " + tableName); From 0323ebe046ea67ff26302b599ded518eb92f9230 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 12 Oct 2022 14:59:29 +0200 Subject: [PATCH 4/6] empty From 48d7095cb66ebb1e96560ffe5000a3f82252fe5f Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Thu, 13 Oct 2022 14:57:22 +0200 Subject: [PATCH 5/6] empty From 9acbd5e874c1368461e52907aac8a440cd2d27da Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Fri, 14 Oct 2022 12:58:54 +0200 Subject: [PATCH 6/6] empty