diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileSystemUtils.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileSystemUtils.java new file mode 100644 index 000000000000..d02e00e8344f --- /dev/null +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/FileSystemUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.filesystem; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; + +public final class FileSystemUtils +{ + private FileSystemUtils() {} + + public static FileSystem getRawFileSystem(FileSystem fileSystem) + { + if (fileSystem instanceof FilterFileSystem) { + return getRawFileSystem(((FilterFileSystem) fileSystem).getRawFileSystem()); + } + return fileSystem; + } +} diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java index 13705742183c..4852519e2793 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/TrinoFileSystem.java @@ -16,6 +16,7 @@ import org.apache.iceberg.io.FileIO; import java.io.IOException; +import java.util.Collection; public interface TrinoFileSystem { @@ -28,6 +29,14 @@ public interface TrinoFileSystem void deleteFile(String path) throws IOException; + /** + * Delete paths in batches, it is not guaranteed to be atomic. + * @param paths collection of paths to be deleted + * @throws IOException when there is a problem with deletion of one or more specific paths + */ + void deleteFiles(Collection paths) + throws IOException; + void deleteDirectory(String path) throws IOException; diff --git a/lib/trino-filesystem/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java b/lib/trino-filesystem/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java index 8a1dd482b182..1db2dc9b004a 100644 --- a/lib/trino-filesystem/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java +++ b/lib/trino-filesystem/src/main/java/io/trino/filesystem/hdfs/HdfsFileSystem.java @@ -18,6 +18,7 @@ import io.trino.filesystem.TrinoInputFile; import io.trino.filesystem.TrinoOutputFile; import io.trino.filesystem.fileio.ForwardingFileIo; +import io.trino.hdfs.FileSystemWithBatchDelete; import io.trino.hdfs.HdfsContext; import io.trino.hdfs.HdfsEnvironment; import org.apache.hadoop.fs.FileSystem; @@ -26,10 +27,18 @@ import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import static io.trino.filesystem.FileSystemUtils.getRawFileSystem; import static io.trino.filesystem.hdfs.HadoopPaths.hadoopPath; import static java.lang.String.format; import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.groupingBy; +import static java.util.stream.Collectors.mapping; +import static java.util.stream.Collectors.toList; class HdfsFileSystem implements TrinoFileSystem @@ -75,6 +84,30 @@ public void deleteFile(String path) }); } + @Override + public void deleteFiles(Collection paths) + throws IOException + { + Map> pathsGroupedByDirectory = paths.stream().collect( + groupingBy( + path -> hadoopPath(path.replaceFirst("/[^/]*$", "")), + mapping(HadoopPaths::hadoopPath, toList()))); + for (Entry> directoryWithPaths : pathsGroupedByDirectory.entrySet()) { + FileSystem rawFileSystem = getRawFileSystem(environment.getFileSystem(context, directoryWithPaths.getKey())); + environment.doAs(context.getIdentity(), () -> { + if (rawFileSystem instanceof FileSystemWithBatchDelete fileSystemWithBatchDelete) { + fileSystemWithBatchDelete.deleteFiles(directoryWithPaths.getValue()); + } + else { + for (Path path : directoryWithPaths.getValue()) { + rawFileSystem.delete(path, false); + } + } + return null; + }); + } + } + @Override public void deleteDirectory(String path) throws IOException diff --git a/lib/trino-hdfs/src/main/java/io/trino/hdfs/FileSystemWithBatchDelete.java b/lib/trino-hdfs/src/main/java/io/trino/hdfs/FileSystemWithBatchDelete.java new file mode 100644 index 000000000000..d3dad98afa6d --- /dev/null +++ b/lib/trino-hdfs/src/main/java/io/trino/hdfs/FileSystemWithBatchDelete.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.hdfs; + +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Collection; + +public interface FileSystemWithBatchDelete +{ + void deleteFiles(Collection paths) + throws IOException; +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java index 191bf92e39e4..3a48efcc1db6 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java @@ -26,6 +26,7 @@ import org.apache.iceberg.io.FileIO; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.function.Consumer; @@ -98,6 +99,12 @@ public void deleteFile(String path) throw new UnsupportedOperationException(); } + @Override + public void deleteFiles(Collection paths) + { + throw new UnsupportedOperationException(); + } + @Override public void deleteDirectory(String path) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java index 2c7877d2c9d9..d0b9291d5c75 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/s3/TrinoS3FileSystem.java @@ -49,6 +49,8 @@ import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.CryptoConfiguration; import com.amazonaws.services.s3.model.DeleteObjectRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.GetObjectRequest; @@ -74,6 +76,7 @@ import com.google.common.base.Splitter; import com.google.common.collect.AbstractSequentialIterator; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.io.Closer; import com.google.common.net.MediaType; @@ -81,6 +84,7 @@ import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.hdfs.FSDataInputStreamTail; +import io.trino.hdfs.FileSystemWithBatchDelete; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -115,6 +119,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; +import java.util.Collection; import java.util.Date; import java.util.Iterator; import java.util.List; @@ -142,6 +147,7 @@ import static com.google.common.base.Throwables.throwIfInstanceOf; import static com.google.common.base.Throwables.throwIfUnchecked; import static com.google.common.base.Verify.verify; +import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterables.toArray; import static com.google.common.hash.Hashing.md5; import static io.airlift.concurrent.Threads.threadsNamed; @@ -169,6 +175,7 @@ public class TrinoS3FileSystem extends FileSystem + implements FileSystemWithBatchDelete { public static final String S3_USER_AGENT_PREFIX = "trino.s3.user-agent-prefix"; public static final String S3_CREDENTIALS_PROVIDER = "trino.s3.credentials-provider"; @@ -230,6 +237,7 @@ public class TrinoS3FileSystem private static final Set GLACIER_STORAGE_CLASSES = ImmutableSet.of(Glacier.toString(), DeepArchive.toString()); private static final MediaType DIRECTORY_MEDIA_TYPE = MediaType.create("application", "x-directory"); private static final String S3_DEFAULT_ROLE_SESSION_NAME = "trino-session"; + public static final int DELETE_BATCH_SIZE = 1000; private URI uri; private Path workingDirectory; @@ -643,6 +651,35 @@ private boolean deleteObject(String key) } } + @Override + public void deleteFiles(Collection paths) + throws IOException + { + try { + Iterable> partitions = Iterables.partition(paths, DELETE_BATCH_SIZE); + for (List currentBatch : partitions) { + deletePaths(currentBatch); + } + } + catch (AmazonClientException e) { + throw new IOException("Exception while batch deleting paths", e); + } + } + + private void deletePaths(List paths) + { + List keys = paths.stream() + .map(TrinoS3FileSystem::keyFromPath) + .map(KeyVersion::new) + .collect(toImmutableList()); + DeleteObjectsRequest deleteObjectsRequest = new DeleteObjectsRequest(getBucketName(uri)) + .withRequesterPays(requesterPaysEnabled) + .withKeys(keys) + .withQuiet(true); + + s3.deleteObjects(deleteObjectsRequest); + } + @Override public boolean mkdirs(Path f, FsPermission permission) { diff --git a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java index c8e4019669db..9d1df9f1b286 100644 --- a/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java +++ b/plugin/trino-hive/src/main/java/io/trino/plugin/hive/util/HiveWriteUtils.java @@ -51,7 +51,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FilterFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.viewfs.ViewFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -96,6 +95,7 @@ import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.io.BaseEncoding.base16; +import static io.trino.filesystem.FileSystemUtils.getRawFileSystem; import static io.trino.plugin.hive.HiveErrorCode.HIVE_DATABASE_LOCATION_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR; import static io.trino.plugin.hive.HiveErrorCode.HIVE_INVALID_PARTITION_VALUE; @@ -495,14 +495,6 @@ public static boolean isViewFileSystem(HdfsContext context, HdfsEnvironment hdfs } } - public static FileSystem getRawFileSystem(FileSystem fileSystem) - { - if (fileSystem instanceof FilterFileSystem) { - return getRawFileSystem(((FilterFileSystem) fileSystem).getRawFileSystem()); - } - return fileSystem; - } - private static boolean isDirectory(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path) { try { diff --git a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java index 708c4d435880..e6bd621cb73d 100644 --- a/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java +++ b/plugin/trino-hive/src/test/java/io/trino/plugin/hive/AbstractTestHiveFileSystem.java @@ -91,6 +91,7 @@ import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.trino.filesystem.FileSystemUtils.getRawFileSystem; import static io.trino.plugin.hive.AbstractTestHive.createTableProperties; import static io.trino.plugin.hive.AbstractTestHive.filterNonHiddenColumnHandles; import static io.trino.plugin.hive.AbstractTestHive.filterNonHiddenColumnMetadata; @@ -105,7 +106,6 @@ import static io.trino.plugin.hive.HiveType.HIVE_LONG; import static io.trino.plugin.hive.TestingThriftHiveMetastoreBuilder.testingThriftHiveMetastoreBuilder; import static io.trino.plugin.hive.metastore.PrincipalPrivileges.NO_PRIVILEGES; -import static io.trino.plugin.hive.util.HiveWriteUtils.getRawFileSystem; import static io.trino.spi.connector.MetadataProvider.NOOP_METADATA_PROVIDER; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.type.BigintType.BIGINT; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 0b9d29652337..6b575e699225 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -152,6 +152,7 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -269,6 +270,8 @@ public class IcebergMetadata private static final String NUMBER_OF_DISTINCT_VALUES_NAME = "NUMBER_OF_DISTINCT_VALUES"; private static final FunctionName NUMBER_OF_DISTINCT_VALUES_FUNCTION = new FunctionName(IcebergThetaSketchForStats.NAME); + private static final Integer DELETE_BATCH_SIZE = 1000; + private final TypeManager typeManager; private final JsonCodec commitTaskCodec; private final TrinoCatalog catalog; @@ -1176,10 +1179,32 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut IcebergSessionProperties.EXPIRE_SNAPSHOTS_MIN_RETENTION); long expireTimestampMillis = session.getStart().toEpochMilli() - retention.toMillis(); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + List pathsToDelete = new ArrayList<>(); + // deleteFunction is not accessed from multiple threads unless .executeDeleteWith() is used + Consumer deleteFunction = path -> { + pathsToDelete.add(path); + if (pathsToDelete.size() == DELETE_BATCH_SIZE) { + try { + fileSystem.deleteFiles(pathsToDelete); + pathsToDelete.clear(); + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e); + } + } + }; table.expireSnapshots() .expireOlderThan(expireTimestampMillis) + .deleteWith(deleteFunction) .commit(); + try { + fileSystem.deleteFiles(pathsToDelete); + } + catch (IOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete files during snapshot expiration", e); + } } private static void validateTableExecuteParameters( diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index bba5cefa3bdb..421044eb472b 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -14,6 +14,8 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableMap; +import io.minio.messages.Event; +import io.trino.Session; import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.testing.QueryRunner; import org.apache.iceberg.FileFormat; @@ -21,13 +23,17 @@ import org.testng.annotations.Test; import java.util.List; -import java.util.Locale; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.hive.containers.HiveMinioDataLake.MINIO_ACCESS_KEY; import static io.trino.plugin.hive.containers.HiveMinioDataLake.MINIO_SECRET_KEY; import static io.trino.testing.sql.TestTable.randomTableSuffix; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; import static org.assertj.core.api.Assertions.assertThat; public abstract class BaseIcebergMinioConnectorSmokeTest @@ -41,7 +47,7 @@ public abstract class BaseIcebergMinioConnectorSmokeTest public BaseIcebergMinioConnectorSmokeTest(FileFormat format) { super(format); - this.schemaName = "tpch_" + format.name().toLowerCase(Locale.ENGLISH); + this.schemaName = "tpch_" + format.name().toLowerCase(ENGLISH); this.bucketName = "test-iceberg-minio-smoke-test-" + randomTableSuffix(); } @@ -138,8 +144,66 @@ public void testMetadataLocationWithDoubleSlash() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testExpireSnapshotsBatchDeletes() + { + String tableName = "test_expiring_snapshots_" + randomTableSuffix(); + Session sessionWithShortRetentionUnlocked = prepareCleanUpSession(); + String location = "s3://%s/%s/%s/".formatted(bucketName, schemaName, tableName); + Queue events = new ConcurrentLinkedQueue<>(); + hiveMinioDataLake.getMinioClient().captureBucketNotifications(bucketName, event -> { + if (event.eventType().toString().toLowerCase(ENGLISH).contains("remove")) { + events.add(event); + } + }); + + assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer) WITH (location='" + location + "')"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1); + assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1); + assertThat(query("SELECT * FROM " + tableName)).matches("VALUES (VARCHAR 'one', 1), (VARCHAR 'two', 2)"); + + List initialMetadataFiles = hiveMinioDataLake.getMinioClient().listObjects(bucketName, "/%s/%s/metadata".formatted(schemaName, tableName)); + assertThat(initialMetadataFiles).isNotEmpty(); + + List initialSnapshots = getSnapshotIds(tableName); + assertThat(initialSnapshots).hasSizeGreaterThan(1); + + assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')"); + + List updatedMetadataFiles = hiveMinioDataLake.getMinioClient().listObjects(bucketName, "/%s/%s/metadata".formatted(schemaName, tableName)); + assertThat(updatedMetadataFiles).isNotEmpty().hasSizeLessThan(initialMetadataFiles.size()); + + List updatedSnapshots = getSnapshotIds(tableName); + assertThat(updatedSnapshots).hasSize(1); + + assertThat(query("SELECT * FROM " + tableName)) + .matches("VALUES (VARCHAR 'one', 1), (VARCHAR 'two', 2)"); + assertThat(events).hasSize(2); + // if files were deleted in batch there should be only one request id because there was one request only + assertThat(events.stream() + .map(event -> event.responseElements().get("x-amz-request-id")) + .collect(toImmutableSet())).hasSize(1); + + assertUpdate("DROP TABLE " + tableName); + } + private String onMetastore(@Language("SQL") String sql) { return hiveMinioDataLake.getHiveHadoop().runOnMetastore(sql); } + + private Session prepareCleanUpSession() + { + return Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "expire_snapshots_min_retention", "0s") + .build(); + } + + private List getSnapshotIds(String tableName) + { + return getQueryRunner().execute(format("SELECT snapshot_id FROM \"%s$snapshots\"", tableName)) + .getOnlyColumn() + .map(Long.class::cast) + .collect(toImmutableList()); + } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java index 3aa608bbc6c0..4aa41504c83d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TrackingFileSystemFactory.java @@ -29,6 +29,7 @@ import javax.annotation.concurrent.Immutable; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -133,6 +134,13 @@ public void deleteFile(String path) delegate.deleteFile(path); } + @Override + public void deleteFiles(Collection paths) + throws IOException + { + delegate.deleteFiles(paths); + } + @Override public void deleteDirectory(String path) throws IOException