diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index 7fcb9a20402e..c1eb81efc07c 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -25,6 +25,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.collect.Streams; +import io.airlift.concurrent.MoreFutures; import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.airlift.slice.Slice; @@ -217,6 +218,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -457,6 +459,7 @@ public class IcebergMetadata private final ExecutorService icebergScanExecutor; private final Executor metadataFetchingExecutor; private final ExecutorService icebergPlanningExecutor; + private final ExecutorService icebergFileDeleteExecutor; private final Map> tableStatisticsCache = new ConcurrentHashMap<>(); private Transaction transaction; @@ -474,7 +477,8 @@ public IcebergMetadata( Predicate allowedExtraProperties, ExecutorService icebergScanExecutor, Executor metadataFetchingExecutor, - ExecutorService icebergPlanningExecutor) + ExecutorService icebergPlanningExecutor, + ExecutorService icebergFileDeleteExecutor) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null"); @@ -488,6 +492,7 @@ public IcebergMetadata( this.icebergScanExecutor = requireNonNull(icebergScanExecutor, "icebergScanExecutor is null"); this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null"); this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null"); + this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null"); } @Override @@ -2280,7 +2285,7 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl validMetadataFileNames.add(fileName(manifest.path())); try (ManifestReader> manifestReader = readerForManifest(table, manifest)) { - for (ContentFile contentFile : manifestReader) { + for (ContentFile contentFile : manifestReader.select(ImmutableList.of("file_path"))) { validDataFileNames.add(fileName(contentFile.location())); } } @@ -2347,8 +2352,9 @@ private static ManifestReader> readerForManifest(Table private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set validFiles, String subfolder, Map fileIoProperties) { + List> deleteFutures = new ArrayList<>(); try { - List filesToDelete = new ArrayList<>(); + List filesToDelete = new ArrayList<>(DELETE_BATCH_SIZE); TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), fileIoProperties); FileIterator allFiles = fileSystem.listFiles(Location.of(table.location()).appendPath(subfolder)); while (allFiles.hasNext()) { @@ -2356,9 +2362,9 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc if (entry.lastModified().isBefore(expiration) && !validFiles.contains(entry.location().fileName())) { filesToDelete.add(entry.location()); if (filesToDelete.size() >= DELETE_BATCH_SIZE) { - log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, filesToDelete); - fileSystem.deleteFiles(filesToDelete); - filesToDelete.clear(); + List finalFilesToDelete = filesToDelete; + deleteFutures.add(icebergFileDeleteExecutor.submit(() -> deleteFiles(finalFilesToDelete, schemaTableName, fileSystem))); + filesToDelete = new ArrayList<>(DELETE_BATCH_SIZE); } } else { @@ -2369,6 +2375,22 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc log.debug("Deleting files while removing orphan files for table %s %s", schemaTableName, filesToDelete); fileSystem.deleteFiles(filesToDelete); } + + deleteFutures.forEach(MoreFutures::getFutureValue); + } + catch (IOException | UncheckedIOException e) { + throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e); + } + finally { + deleteFutures.forEach(future -> future.cancel(true)); + } + } + + private void deleteFiles(List files, SchemaTableName schemaTableName, TrinoFileSystem fileSystem) + { + log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, files); + try { + fileSystem.deleteFiles(files); } catch (IOException | UncheckedIOException e) { throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e); diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java index 9ba7b888cab6..44c594a71feb 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadataFactory.java @@ -47,6 +47,7 @@ public class IcebergMetadataFactory private final ExecutorService icebergScanExecutor; private final Executor metadataFetchingExecutor; private final ExecutorService icebergPlanningExecutor; + private final ExecutorService icebergFileDeleteExecutor; @Inject public IcebergMetadataFactory( @@ -60,6 +61,7 @@ public IcebergMetadataFactory( @ForIcebergSplitManager ExecutorService icebergScanExecutor, @ForIcebergMetadata ExecutorService metadataExecutorService, @ForIcebergPlanning ExecutorService icebergPlanningExecutor, + @ForIcebergFileDelete ExecutorService icebergFileDeleteExecutor, IcebergConfig config) { this.typeManager = requireNonNull(typeManager, "typeManager is null"); @@ -85,6 +87,7 @@ public IcebergMetadataFactory( this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism()); } this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null"); + this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null"); } public IcebergMetadata create(ConnectorIdentity identity) @@ -101,6 +104,7 @@ public IcebergMetadata create(ConnectorIdentity identity) allowedExtraProperties, icebergScanExecutor, metadataFetchingExecutor, - icebergPlanningExecutor); + icebergPlanningExecutor, + icebergFileDeleteExecutor); } } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java index 273921e747e0..cb85ee67e344 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/BaseTrinoCatalogTest.java @@ -152,6 +152,7 @@ public void testNonLowercaseNamespace() _ -> false, newDirectExecutorService(), directExecutor(), + newDirectExecutorService(), newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isFalse(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java index a65dacc3d4e5..24accc0fbcf0 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/glue/TestTrinoGlueCatalog.java @@ -142,6 +142,7 @@ public void testNonLowercaseGlueDatabase() _ -> false, newDirectExecutorService(), directExecutor(), + newDirectExecutorService(), newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)") .isFalse(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java index a2bcf3ae1312..29f0bf4b2a15 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java @@ -196,6 +196,7 @@ public void testNonLowercaseNamespace() _ -> false, newDirectExecutorService(), directExecutor(), + newDirectExecutorService(), newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java index 2c631fad39d8..594732b473f3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/TestTrinoRestCatalog.java @@ -126,6 +126,7 @@ public void testNonLowercaseNamespace() _ -> false, newDirectExecutorService(), directExecutor(), + newDirectExecutorService(), newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue(); diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java index 73cdc219c0bb..1e8b948a7b01 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/snowflake/TestTrinoSnowflakeCatalog.java @@ -229,6 +229,7 @@ public void testNonLowercaseNamespace() _ -> false, newDirectExecutorService(), directExecutor(), + newDirectExecutorService(), newDirectExecutorService()); assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)") .isTrue();