Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -865,12 +865,23 @@ private static void cleanExtraOutputFiles(TrinoFileSystem fileSystem, String que
log.info("Found %s files to delete and %s to retain in location %s for query %s", filesToDelete.size(), fileNamesToKeep.size(), location, queryId);
ImmutableList.Builder<String> deletedFilesBuilder = ImmutableList.builder();
Iterator<String> filesToDeleteIterator = filesToDelete.iterator();
List<String> deleteBatch = new ArrayList<>();
while (filesToDeleteIterator.hasNext()) {
String fileName = filesToDeleteIterator.next();
log.debug("Deleting failed attempt file %s/%s for query %s", location, fileName, queryId);
fileSystem.deleteFile(location + "/" + fileName);
deletedFilesBuilder.add(fileName);
filesToDeleteIterator.remove();

deleteBatch.add(location + "/" + fileName);
if (deleteBatch.size() >= DELETE_BATCH_SIZE) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep the deleteBatch.add(location + "/" + fileName); and if (deleteBatch.size() >= DELETE_BATCH_SIZE) lines together

log.debug("Deleting failed attempt files %s for query %s", deleteBatch, queryId);
fileSystem.deleteFiles(deleteBatch);
deleteBatch.clear();
}
}

if (!deleteBatch.isEmpty()) {
log.debug("Deleting failed attempt files %s for query %s", deleteBatch, queryId);
fileSystem.deleteFiles(deleteBatch);
}

List<String> deletedFiles = deletedFilesBuilder.build();
Expand Down Expand Up @@ -1341,18 +1352,27 @@ private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table
private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, long expireTimestamp, Set<String> validFiles, String subfolder)
{
try {
List<String> filesToDelete = new ArrayList<>();
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
FileIterator allFiles = fileSystem.listFiles(table.location() + "/" + subfolder);
while (allFiles.hasNext()) {
FileEntry entry = allFiles.next();
if (entry.lastModified() < expireTimestamp && !validFiles.contains(fileName(entry.path()))) {
log.debug("Deleting %s file while removing orphan files %s", entry.path(), schemaTableName.getTableName());
fileSystem.deleteFile(entry.path());
filesToDelete.add(entry.path());
if (filesToDelete.size() >= DELETE_BATCH_SIZE) {
log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, filesToDelete);
fileSystem.deleteFiles(filesToDelete);
filesToDelete.clear();
}
}
else {
log.debug("%s file retained while removing orphan files %s", entry.path(), schemaTableName.getTableName());
}
}
if (!filesToDelete.isEmpty()) {
log.debug("Deleting files while removing orphan files for table %s %s", schemaTableName, filesToDelete);
fileSystem.deleteFiles(filesToDelete);
}
}
catch (IOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
Expand Down Expand Up @@ -1880,11 +1900,10 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
if (!fullyDeletedFiles.isEmpty()) {
try {
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
for (List<CommitTaskData> commitTasksToCleanUp : fullyDeletedFiles.values()) {
for (CommitTaskData commitTaskData : commitTasksToCleanUp) {
fileSystem.deleteFile(commitTaskData.getPath());
}
}
fileSystem.deleteFiles(fullyDeletedFiles.values().stream()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may want to make the multi-file deletion in batches of DELETE_BATCH_SIZE size.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add it, but IMO it's not really necessary. In remove_orphan_files or expire_snapshots it's a good idea because we're loading the list of files to delete incrementally, and batching there prevents us from materializing the full list in memory. Here, we already have the list in memory, so we can rely on the file system to batch in sizes appropriate for the implementation.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the way you did it here is not helpful. Actually, I expected you would partition it at a higher level —

fullyDeletedFiles.values().stream()
                        .flatMap(Collection::stream)
                        .map(CommitTaskData::getPath)

without collecting it into a list first. But now, after learning that Lists.partition() is not lazy, I am not sure whether this can be done...

.flatMap(Collection::stream)
.map(CommitTaskData::getPath)
.collect(toImmutableSet()));
}
catch (IOException e) {
log.warn(e, "Failed to clean up uncommitted position delete files");
Expand Down