diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
index c7e5df97e649..10cc286c2731 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java
@@ -865,12 +865,23 @@ private static void cleanExtraOutputFiles(TrinoFileSystem fileSystem, String que
             log.info("Found %s files to delete and %s to retain in location %s for query %s", filesToDelete.size(), fileNamesToKeep.size(), location, queryId);
             ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
             Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
+            List<String> deleteBatch = new ArrayList<>();
             while (filesToDeleteIterator.hasNext()) {
                 String fileName = filesToDeleteIterator.next();
-                log.debug("Deleting failed attempt file %s/%s for query %s", location, fileName, queryId);
-                fileSystem.deleteFile(location + "/" + fileName);
                 deletedFilesBuilder.add(fileName);
                 filesToDeleteIterator.remove();
+
+                deleteBatch.add(location + "/" + fileName);
+                if (deleteBatch.size() >= DELETE_BATCH_SIZE) {
+                    log.debug("Deleting failed attempt files %s for query %s", deleteBatch, queryId);
+                    fileSystem.deleteFiles(deleteBatch);
+                    deleteBatch.clear();
+                }
+            }
+
+            if (!deleteBatch.isEmpty()) {
+                log.debug("Deleting failed attempt files %s for query %s", deleteBatch, queryId);
+                fileSystem.deleteFiles(deleteBatch);
             }
 
             List<String> deletedFiles = deletedFilesBuilder.build();
@@ -1341,18 +1352,27 @@ private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table
     private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp, Set<String> validFiles, String subfolder)
     {
         try {
+            List<String> filesToDelete = new ArrayList<>();
             TrinoFileSystem fileSystem = fileSystemFactory.create(session);
             FileIterator allFiles = fileSystem.listFiles(table.location() + "/" + subfolder);
             while (allFiles.hasNext()) {
                 FileEntry entry = allFiles.next();
                 if (entry.lastModified() < expireTimestamp && !validFiles.contains(fileName(entry.path()))) {
-                    log.debug("Deleting %s file while removing orphan files %s", entry.path(), schemaTableName.getTableName());
-                    fileSystem.deleteFile(entry.path());
+                    filesToDelete.add(entry.path());
+                    if (filesToDelete.size() >= DELETE_BATCH_SIZE) {
+                        log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, filesToDelete);
+                        fileSystem.deleteFiles(filesToDelete);
+                        filesToDelete.clear();
+                    }
                 }
                 else {
                     log.debug("%s file retained while removing orphan files %s", entry.path(), schemaTableName.getTableName());
                 }
             }
+            if (!filesToDelete.isEmpty()) {
+                log.debug("Deleting files while removing orphan files for table %s %s", schemaTableName, filesToDelete);
+                fileSystem.deleteFiles(filesToDelete);
+            }
         }
         catch (IOException e) {
             throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
@@ -1880,11 +1900,10 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
         if (!fullyDeletedFiles.isEmpty()) {
             try {
                 TrinoFileSystem fileSystem = fileSystemFactory.create(session);
-                for (List<CommitTaskData> commitTasksToCleanUp : fullyDeletedFiles.values()) {
-                    for (CommitTaskData commitTaskData : commitTasksToCleanUp) {
-                        fileSystem.deleteFile(commitTaskData.getPath());
-                    }
-                }
+                fileSystem.deleteFiles(fullyDeletedFiles.values().stream()
+                        .flatMap(Collection::stream)
+                        .map(CommitTaskData::getPath)
+                        .collect(toImmutableSet()));
             }
             catch (IOException e) {
                 log.warn(e, "Failed to clean up uncommitted position delete files");