From 4ef626791288aa8e0e5171f314caa3222342434e Mon Sep 17 00:00:00 2001
From: Mayank Vadariya
Date: Mon, 23 Jun 2025 10:39:22 -0400
Subject: [PATCH] Revert "Cleanup previous snapshot files during materialized view refresh"

This reverts commit 48135782c4c3cfc68032219fb08e0e75fab25067.
---
 .../trino/plugin/iceberg/IcebergMetadata.java | 68 ++++++-------
 .../BaseIcebergMaterializedViewTest.java      | 96 -------------------
 2 files changed, 27 insertions(+), 137 deletions(-)

diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index 0aff6bd5de08..c4fc8e518d5c 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -2178,7 +2178,33 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecuteHandle executeHandle)
                 IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION);
 
         long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis();
-        executeExpireSnapshots(table, session, expireTimestampMillis);
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), table.io().properties());
+        List<Location> pathsToDelete = new ArrayList<>();
+        // deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
+        Consumer<String> deleteFunction = path -> {
+            pathsToDelete.add(Location.of(path));
+            if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
+                try {
+                    fileSystem.deleteFiles(pathsToDelete);
+                    pathsToDelete.clear();
+                }
+                catch (IOException e) {
+                    throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
+                }
+            }
+        };
+
+        try {
+            table.expireSnapshots()
+                    .expireOlderThan(expireTimestampMillis)
+                    .deleteWith(deleteFunction)
+                    .commit();
+
+            fileSystem.deleteFiles(pathsToDelete);
+        }
+        catch (IOException e) {
+            throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
+        }
     }
 
     private static void validateTableExecuteParameters(
@@ -3782,15 +3808,6 @@ public Optional<ConnectorOutputMetadata> finishRefreshMaterializedView(
         commitUpdateAndTransaction(appendFiles, session, transaction, "refresh materialized view");
         transaction = null;
         fromSnapshotForRefresh = Optional.empty();
-
-        // cleanup old snapshots
-        try {
-            executeExpireSnapshots(icebergTable, session, System.currentTimeMillis());
-        }
-        catch (Exception e) {
-            log.error(e, "Failed to delete old snapshot files during materialized view refresh");
-        }
-
         return Optional.of(new HiveWrittenPartitions(commitTasks.stream()
                 .map(CommitTaskData::path)
                 .collect(toImmutableList())));
@@ -4075,37 +4092,6 @@ private void beginTransaction(Table icebergTable)
         transaction = catalog.newTransaction(icebergTable);
     }
 
-    private void executeExpireSnapshots(Table icebergTable, ConnectorSession session, long expireTimestampMillis)
-    {
-        TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), icebergTable.io().properties());
-        List<Location> pathsToDelete = new ArrayList<>();
-        // deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used
-        Consumer<String> deleteFunction = path -> {
-            pathsToDelete.add(Location.of(path));
-            if (pathsToDelete.size() == DELETE_BATCH_SIZE) {
-                try {
-                    fileSystem.deleteFiles(pathsToDelete);
-                    pathsToDelete.clear();
-                }
-                catch (IOException | UncheckedIOException e) {
-                    throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
-                }
-            }
-        };
-
-        try {
-            icebergTable.expireSnapshots()
-                    .expireOlderThan(expireTimestampMillis)
-                    .deleteWith(deleteFunction)
-                    .commit();
-
-            fileSystem.deleteFiles(pathsToDelete);
-        }
-        catch (IOException | UncheckedIOException e) {
-            throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e);
-        }
-    }
-
     private static IcebergTableHandle checkValidTableHandle(ConnectorTableHandle tableHandle)
     {
         requireNonNull(tableHandle, "tableHandle is null");
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java
index 519e4a2d35f1..db7a8dffedf4 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMaterializedViewTest.java
@@ -19,8 +19,6 @@
 import io.trino.Session;
 import io.trino.connector.MockConnectorFactory;
 import io.trino.connector.MockConnectorPlugin;
-import io.trino.filesystem.FileEntry;
-import io.trino.filesystem.FileIterator;
 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
 import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
@@ -53,7 +51,6 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
-import java.io.IOException;
 import java.time.ZonedDateTime;
 import java.util.List;
 import java.util.Map;
@@ -1111,99 +1108,6 @@ public void testRefreshWithCompaction()
         assertUpdate("DROP TABLE %s".formatted(sourceTableName));
     }
 
-    @Test
-    void testPreviousSnapshotCleanupDuringRefresh()
-            throws IOException
-    {
-        String sourceTableName = "source_table" + randomNameSuffix();
-        String materializedViewName = "test_materialized_view" + randomNameSuffix();
-
-        // create source table and an MV
-        assertUpdate("CREATE TABLE " + sourceTableName + " (a int, b varchar)");
-        assertUpdate("INSERT INTO " + sourceTableName + " VALUES (1, 'abc'), (2, 'def')", 2);
-        assertUpdate("CREATE MATERIALIZED VIEW " + materializedViewName + " AS SELECT a, b FROM " + sourceTableName + " WHERE a < 3 OR a > 5");
-        // Until first MV refresh no data files are created hence perform first MV refresh to get data files created for the MV
-        assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
-
-        TrinoFileSystem fileSystemFactory = getFileSystemFactory(getQueryRunner()).create(ConnectorIdentity.ofUser("test"));
-
-        // Identify different types of files containing in an MV
-        Location metadataLocation = Location.of(getStorageMetadataLocation(materializedViewName));
-        FileIterator tableFiles = fileSystemFactory.listFiles(Location.of(metadataLocation.toString().substring(0, metadataLocation.toString().indexOf("/metadata"))));
-        ImmutableSet.Builder<FileEntry> previousDataFiles = ImmutableSet.builder();
-        ImmutableSet.Builder<FileEntry> previousMetadataFiles = ImmutableSet.builder();
-        ImmutableSet.Builder<FileEntry> previousManifestsFiles = ImmutableSet.builder();
-        while (tableFiles.hasNext()) {
-            FileEntry file = tableFiles.next();
-            String location = file.location().toString();
-            if (location.contains("/data/")) {
-                previousDataFiles.add(file);
-            }
-            else if (location.contains("/metadata/") && location.endsWith(".json")) {
-                previousMetadataFiles.add(file);
-            }
-            else if (location.contains("/metadata") && !location.contains("snap-") && location.endsWith(".avro")) {
-                previousManifestsFiles.add(file);
-            }
-        }
-
-        // Execute MV refresh after deleting existing records and inserting new records in source table
-        assertUpdate("DELETE FROM " + sourceTableName + " WHERE a = 1 OR a = 2", 2);
-        assertQueryReturnsEmptyResult("SELECT * FROM " + sourceTableName);
-        assertUpdate("INSERT INTO " + sourceTableName + " VALUES (7, 'pqr'), (8, 'xyz')", 2);
-        assertUpdate("REFRESH MATERIALIZED VIEW " + materializedViewName, 2);
-        assertThat(query("SELECT * FROM " + materializedViewName))
-                .matches("VALUES (7, VARCHAR 'pqr'), (8, VARCHAR 'xyz')");
-
-        // Identify different types of files containing in an MV after MV refresh
-        Location latestMetadataLocation = Location.of(getStorageMetadataLocation(materializedViewName));
-        FileIterator latestTableFiles = fileSystemFactory.listFiles(Location.of(latestMetadataLocation.toString().substring(0, latestMetadataLocation.toString().indexOf("/metadata"))));
-        ImmutableSet.Builder<FileEntry> currentDataFiles = ImmutableSet.builder();
-        ImmutableSet.Builder<FileEntry> currentMetadataFiles = ImmutableSet.builder();
-        ImmutableSet.Builder<FileEntry> currentManifestsFiles = ImmutableSet.builder();
-        while (latestTableFiles.hasNext()) {
-            FileEntry file = latestTableFiles.next();
-            String location = file.location().toString();
-            if (location.contains("/data/")) {
-                currentDataFiles.add(file);
-            }
-            else if (location.contains("/metadata/") && location.endsWith(".json")) {
-                currentMetadataFiles.add(file);
-            }
-            else if (location.contains("/metadata") && !location.contains("snap-") && location.endsWith(".avro")) {
-                currentManifestsFiles.add(file);
-            }
-        }
-
-        // data files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
-        assertThat(previousDataFiles.build())
-                .isNotEmpty()
-                .satisfies(dataFilesBeforeMvRefresh ->
-                        assertThat(currentDataFiles.build())
-                                .isNotEmpty()
-                                .doesNotContainAnyElementsOf(dataFilesBeforeMvRefresh));
-
-        // metadata files from previous snapshot are still present in latest MV snapshot as those are not cleaned up after MV refresh
-        assertThat(previousMetadataFiles.build())
-                .isNotEmpty()
-                .satisfies(metadataFilesBeforeMvRefresh ->
-                        assertThat(currentMetadataFiles.build())
-                                .isNotEmpty()
-                                .containsAll(metadataFilesBeforeMvRefresh));
-
-        // manifests files from previous snapshot are absent in latest MV snapshot as those are cleaned up after MV refresh
-        assertThat(previousManifestsFiles.build())
-                .isNotEmpty()
-                .satisfies(manifestsBeforeMvRefresh ->
-                        assertThat(currentManifestsFiles.build())
-                                .isNotEmpty()
-                                .doesNotContainAnyElementsOf(manifestsBeforeMvRefresh));
-
-        // cleanup
-        assertUpdate("DROP MATERIALIZED VIEW " + materializedViewName);
-        assertUpdate("DROP TABLE " + sourceTableName);
-    }
-
     protected String getColumnComment(String tableName, String columnName)
     {
         return (String) computeScalar("SELECT comment FROM information_schema.columns WHERE table_schema = '" + getSession().getSchema().orElseThrow() + "' AND table_name = '" + tableName + "' AND column_name = '" + columnName + "'");
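
Note on the code path restored by the + hunk in IcebergMetadata.java: it wires Iceberg's ExpireSnapshots.deleteWith(Consumer<String>) callback to a small buffer, so unreachable files are removed with one bulk deleteFiles() call per DELETE_BATCH_SIZE paths rather than one round trip per file, and a final deleteFiles() after commit() flushes the last partial batch. The following is a minimal standalone sketch of that batching pattern; BatchingDeleter, BATCH_SIZE, and deleteBatch() are illustrative stand-ins, not Trino or Iceberg APIs.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Consumer;

    public class BatchingDeleter
    {
        private static final int BATCH_SIZE = 1000; // stand-in for DELETE_BATCH_SIZE

        private final List<String> buffer = new ArrayList<>();

        // Iceberg invokes the deleteWith() consumer once per unreachable file;
        // buffering lets the file system issue one bulk delete per BATCH_SIZE
        // paths instead of one delete per file.
        public Consumer<String> deleteFunction()
        {
            return path -> {
                buffer.add(path);
                if (buffer.size() == BATCH_SIZE) {
                    deleteBatch(buffer);
                    buffer.clear();
                }
            };
        }

        // Call after commit() so paths left in a partial batch are also removed,
        // mirroring the trailing fileSystem.deleteFiles(pathsToDelete) in the hunk.
        public void flush()
        {
            deleteBatch(buffer);
            buffer.clear();
        }

        private void deleteBatch(List<String> paths)
        {
            // stand-in for a bulk delete such as TrinoFileSystem.deleteFiles(...)
            paths.forEach(path -> System.out.println("deleting " + path));
        }
    }

The consumer mutates its buffer without synchronization, which is why the restored comment notes that deleteFunction is only safe while .executeDeleteWith() is not used.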