Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,27 @@
*/
package io.trino.filesystem.fileio;

import com.google.common.collect.Iterables;
import io.trino.filesystem.TrinoFileSystem;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.SupportsBulkOperations;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Stream;

import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

public class ForwardingFileIo
implements FileIO
implements SupportsBulkOperations
{
private static final int DELETE_BATCH_SIZE = 1000;
private static final int BATCH_DELETE_PATHS_MESSAGE_LIMIT = 5;

private final TrinoFileSystem fileSystem;

public ForwardingFileIo(TrinoFileSystem fileSystem)
Expand Down Expand Up @@ -61,4 +69,29 @@ public void deleteFile(String path)
throw new UncheckedIOException("Failed to delete file: " + path, e);
}
}

@Override
public void deleteFiles(Iterable<String> pathsToDelete)
throws BulkDeletionFailureException
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn’t thrown by the method. Should we be using this instead?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we cannot throw BulkDeletionFailureException becauase we cannot fill BulkDeletionFailureException#numberFailedObjects

see #15981 (comment) for more

{
Iterable<List<String>> partitions = Iterables.partition(pathsToDelete, DELETE_BATCH_SIZE);
partitions.forEach(this::deleteBatch);
}

private void deleteBatch(List<String> filesToDelete)
{
try {
fileSystem.deleteFiles(filesToDelete);
}
catch (IOException e) {
throw new UncheckedIOException(
"Failed to delete some or all of files: " +
Stream.concat(
filesToDelete.stream()
.limit(BATCH_DELETE_PATHS_MESSAGE_LIMIT),
filesToDelete.size() > BATCH_DELETE_PATHS_MESSAGE_LIMIT ? Stream.of("...") : Stream.of())
.collect(joining(", ", "[", "]")),
e);
}
}
}