diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 4e189aaf7e9d..4c5cd52e4d35 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -20,7 +20,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.airlift.slice.Slice; @@ -1123,7 +1122,10 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION); long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis(); - expireSnapshots(table, expireTimestampMillis, session, executeHandle.getSchemaTableName()); + + table.expireSnapshots() + .expireOlderThan(expireTimestampMillis) + .commit(); } private static void validateTableExecuteParameters( @@ -1163,44 +1165,6 @@ private static void validateTableExecuteParameters( sessionMinRetentionParameterName); } - private void expireSnapshots(Table table, long expireTimestamp, ConnectorSession session, SchemaTableName schemaTableName) - { - Set originalFiles = buildSetOfValidFiles(table); - table.expireSnapshots().expireOlderThan(expireTimestamp).cleanExpiredFiles(false).commit(); - Set validFiles = buildSetOfValidFiles(table); - try { - FileSystem fileSystem = hdfsEnvironment.getFileSystem(new HdfsEnvironment.HdfsContext(session), new Path(table.location())); - Sets.SetView filesToDelete = difference(originalFiles, validFiles); - for (String filePath : filesToDelete) { - log.debug("Deleting file %s while expiring snapshots %s", filePath, schemaTableName.getTableName()); - fileSystem.delete(new Path(filePath), false); - } - } - catch (IOException e) { - throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e); - } - } - - private static Set buildSetOfValidFiles(Table table) - { - List snapshots = ImmutableList.copyOf(table.snapshots()); - Stream dataFiles = snapshots.stream() - .map(Snapshot::snapshotId) - .flatMap(snapshotId -> stream(table.newScan().useSnapshot(snapshotId).planFiles())) - .map(fileScanTask -> fileScanTask.file().path().toString()); - Stream manifests = snapshots.stream() - .flatMap(snapshot -> snapshot.allManifests().stream()) - .map(ManifestFile::path); - Stream manifestLists = snapshots.stream() - .map(Snapshot::manifestListLocation); - Stream otherMetadataFiles = concat( - metadataFileLocations(table, false).stream(), - Stream.of(versionHintLocation(table))); - return concat(dataFiles, manifests, manifestLists, otherMetadataFiles) - .map(file -> URI.create(file).getPath()) - .collect(toImmutableSet()); - } - public void executeRemoveOrphanFiles(ConnectorSession session, IcebergTableExecuteHandle executeHandle) { IcebergRemoveOrphanFilesHandle removeOrphanFilesHandle = (IcebergRemoveOrphanFilesHandle) executeHandle.getProcedureHandle();