@@ -25,6 +25,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import io.airlift.concurrent.MoreFutures;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
@@ -217,6 +218,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
@@ -457,6 +459,7 @@ public class IcebergMetadata
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;
private final ExecutorService icebergPlanningExecutor;
private final ExecutorService icebergFileDeleteExecutor;
private final Map<IcebergTableHandle, AtomicReference<TableStatistics>> tableStatisticsCache = new ConcurrentHashMap<>();

private Transaction transaction;
@@ -474,7 +477,8 @@ public IcebergMetadata(
Predicate<String> allowedExtraProperties,
ExecutorService icebergScanExecutor,
Executor metadataFetchingExecutor,
- ExecutorService icebergPlanningExecutor)
+ ExecutorService icebergPlanningExecutor,
+ ExecutorService icebergFileDeleteExecutor)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.trinoCatalogHandle = requireNonNull(trinoCatalogHandle, "trinoCatalogHandle is null");
@@ -488,6 +492,7 @@
this.icebergScanExecutor = requireNonNull(icebergScanExecutor, "icebergScanExecutor is null");
this.metadataFetchingExecutor = requireNonNull(metadataFetchingExecutor, "metadataFetchingExecutor is null");
this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null");
}

@Override
@@ -2280,7 +2285,7 @@ private void removeOrphanFiles(Table table, ConnectorSession session, SchemaTabl

validMetadataFileNames.add(fileName(manifest.path()));
try (ManifestReader<? extends ContentFile<?>> manifestReader = readerForManifest(table, manifest)) {
- for (ContentFile<?> contentFile : manifestReader) {
+ for (ContentFile<?> contentFile : manifestReader.select(ImmutableList.of("file_path"))) {
validDataFileNames.add(fileName(contentFile.location()));
}
}
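The change above switches the manifest scan to `manifestReader.select(ImmutableList.of("file_path"))`, projecting only the file-path column so per-file stats and partition data are not deserialized just to collect file names. A minimal sketch of the same projection against the Iceberg API, assuming `ManifestFiles.read` for constructing the reader; `ManifestPathScan` and `dataFilePaths` are illustrative names, not code from this PR:

```java
import com.google.common.collect.ImmutableList;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.io.FileIO;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class ManifestPathScan
{
    // Collect data file paths from a manifest, projecting only the file_path
    // column so the reader skips deserializing stats and other per-file metadata.
    static List<String> dataFilePaths(ManifestFile manifest, FileIO io)
            throws IOException
    {
        List<String> paths = new ArrayList<>();
        try (ManifestReader<? extends ContentFile<?>> reader =
                ManifestFiles.read(manifest, io).select(ImmutableList.of("file_path"))) {
            for (ContentFile<?> file : reader) {
                paths.add(file.location());
            }
        }
        return paths;
    }
}
```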
@@ -2347,18 +2352,19 @@ private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table

private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, SchemaTableName schemaTableName, Instant expiration, Set<String> validFiles, String subfolder, Map<String, String> fileIoProperties)
{
List<Future<?>> deleteFutures = new ArrayList<>();
try {
- List<Location> filesToDelete = new ArrayList<>();
+ List<Location> filesToDelete = new ArrayList<>(DELETE_BATCH_SIZE);
TrinoFileSystem fileSystem = fileSystemFactory.create(session.getIdentity(), fileIoProperties);
FileIterator allFiles = fileSystem.listFiles(Location.of(table.location()).appendPath(subfolder));
while (allFiles.hasNext()) {
FileEntry entry = allFiles.next();
if (entry.lastModified().isBefore(expiration) && !validFiles.contains(entry.location().fileName())) {
filesToDelete.add(entry.location());
if (filesToDelete.size() >= DELETE_BATCH_SIZE) {
- log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, filesToDelete);
- fileSystem.deleteFiles(filesToDelete);
- filesToDelete.clear();
+ List<Location> finalFilesToDelete = filesToDelete;
+ deleteFutures.add(icebergFileDeleteExecutor.submit(() -> deleteFiles(finalFilesToDelete, schemaTableName, fileSystem)));
@findinpath (Contributor) commented on Aug 3, 2025:

It is worth seeing whether in production use cases the object store provider will choose to return 429 Too Many Requests. Is the filesystem layer able to cope with that backpressure? I'm looking at https://github.com/trinodb/trino/blob/master/lib/trino-filesystem-s3/src/main/java/io/trino/filesystem/s3/S3FileSystem.java and I don't see any Failsafe logic built in.

cc @wendigo @ebyhr

The PR author (Member) replied:

Overall parallelism is bounded by iceberg.file-delete-threads. In general, either the SDK or the filesystem layer is expected to handle backoff and retry on throttling errors.
+ filesToDelete = new ArrayList<>(DELETE_BATCH_SIZE);
}
}
else {
@@ -2369,6 +2375,22 @@ private void scanAndDeleteInvalidFiles(Table table, ConnectorSession session, Sc
log.debug("Deleting files while removing orphan files for table %s %s", schemaTableName, filesToDelete);
fileSystem.deleteFiles(filesToDelete);
}

deleteFutures.forEach(MoreFutures::getFutureValue);
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
}
finally {
deleteFutures.forEach(future -> future.cancel(true));
}
}

private void deleteFiles(List<Location> files, SchemaTableName schemaTableName, TrinoFileSystem fileSystem)
{
log.debug("Deleting files while removing orphan files for table %s [%s]", schemaTableName, files);
try {
fileSystem.deleteFiles(files);
}
catch (IOException | UncheckedIOException e) {
throw new TrinoException(ICEBERG_FILESYSTEM_ERROR, "Failed accessing data for table: " + schemaTableName, e);
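Taken together, the hunk above turns orphan-file deletion from synchronous per-batch deletes into batches handed off to a dedicated executor, with a final wait on all futures and cancellation in a finally block. A self-contained sketch of that pattern under assumed names (`ParallelBatchDelete`, `BatchDeleter`, and the batch-size value are illustrative; the PR's own `DELETE_BATCH_SIZE` constant is not shown in this diff):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelBatchDelete
{
    // Illustrative batch size only.
    private static final int DELETE_BATCH_SIZE = 1000;

    interface BatchDeleter
    {
        void deleteBatch(List<String> paths) throws Exception; // stand-in for TrinoFileSystem.deleteFiles
    }

    public static void deleteAll(List<String> candidates, BatchDeleter deleter, ExecutorService executor)
    {
        List<Future<?>> deleteFutures = new ArrayList<>();
        try {
            List<String> batch = new ArrayList<>(DELETE_BATCH_SIZE);
            for (String path : candidates) {
                batch.add(path);
                if (batch.size() >= DELETE_BATCH_SIZE) {
                    // Hand the full batch to the executor and start a fresh list.
                    // Reusing the same list with clear() would race with the submitted task.
                    List<String> finalBatch = batch;
                    deleteFutures.add(executor.submit(() -> {
                        deleter.deleteBatch(finalBatch);
                        return null;
                    }));
                    batch = new ArrayList<>(DELETE_BATCH_SIZE);
                }
            }
            if (!batch.isEmpty()) {
                // Tail batch is deleted inline, mirroring the diff.
                deleter.deleteBatch(batch);
            }
            // Block until every submitted batch finishes; the first failure propagates.
            for (Future<?> future : deleteFutures) {
                future.get();
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Failed deleting files", e);
        }
        finally {
            // On failure or interrupt, cancel whatever is still queued or running.
            deleteFutures.forEach(future -> future.cancel(true));
        }
    }

    public static void main(String[] args)
    {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> paths = new ArrayList<>();
            for (int i = 0; i < 2500; i++) {
                paths.add("s3://bucket/table/data/file-" + i + ".parquet");
            }
            deleteAll(paths, batch -> System.out.println("deleting " + batch.size() + " files"), executor);
        }
        finally {
            executor.shutdown();
        }
    }
}
```

Two details of the diff are worth noting: a fresh list is allocated for each batch instead of calling clear(), because the old list is now owned by the submitted task; and retry on throttling (the 429 case raised in the review) is assumed to live in the SDK or filesystem layer rather than here, per the author's reply.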
@@ -47,6 +47,7 @@ public class IcebergMetadataFactory
private final ExecutorService icebergScanExecutor;
private final Executor metadataFetchingExecutor;
private final ExecutorService icebergPlanningExecutor;
private final ExecutorService icebergFileDeleteExecutor;

@Inject
public IcebergMetadataFactory(
@@ -60,6 +61,7 @@ public IcebergMetadataFactory(
@ForIcebergSplitManager ExecutorService icebergScanExecutor,
@ForIcebergMetadata ExecutorService metadataExecutorService,
@ForIcebergPlanning ExecutorService icebergPlanningExecutor,
@ForIcebergFileDelete ExecutorService icebergFileDeleteExecutor,
IcebergConfig config)
{
this.typeManager = requireNonNull(typeManager, "typeManager is null");
@@ -85,6 +87,7 @@ public IcebergMetadataFactory(
this.metadataFetchingExecutor = new BoundedExecutor(metadataExecutorService, config.getMetadataParallelism());
}
this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
this.icebergFileDeleteExecutor = requireNonNull(icebergFileDeleteExecutor, "icebergFileDeleteExecutor is null");
}

public IcebergMetadata create(ConnectorIdentity identity)
@@ -101,6 +104,7 @@ public IcebergMetadata create(ConnectorIdentity identity)
allowedExtraProperties,
icebergScanExecutor,
metadataFetchingExecutor,
- icebergPlanningExecutor);
+ icebergPlanningExecutor,
+ icebergFileDeleteExecutor);
}
}
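The factory receives the new executor through the `@ForIcebergFileDelete` qualifier; the binding itself is outside this diff. A plausible construction, assuming Airlift's `daemonThreadsNamed` thread-factory helper and a configured `iceberg.file-delete-threads` value (the property name comes from the author's reply above; `FileDeleteExecutorModule`, `createFileDeleteExecutor`, and the thread-name format are illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;

class FileDeleteExecutorModule
{
    // Sketch only: a fixed pool caps delete parallelism at the configured thread
    // count, which is the bound the author cites when discussing backpressure.
    static ExecutorService createFileDeleteExecutor(int fileDeleteThreads)
    {
        return Executors.newFixedThreadPool(
                fileDeleteThreads, // hypothetical iceberg.file-delete-threads value
                daemonThreadsNamed("iceberg-file-delete-%s"));
    }
}
```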
@@ -152,6 +152,7 @@ public void testNonLowercaseNamespace()
_ -> false,
newDirectExecutorService(),
directExecutor(),
newDirectExecutorService(),
newDirectExecutorService());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isFalse();
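The test updates here and in the files below are mechanical: the constructor gained one executor parameter, and the tests pass another `newDirectExecutorService()` so delete tasks run synchronously on the calling thread. A small sketch of why that keeps tests deterministic, assuming Guava's `MoreExecutors`:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;

class DirectExecutorDemo
{
    public static void main(String[] args)
            throws Exception
    {
        ExecutorService direct = newDirectExecutorService();
        // submit() runs the task inline before returning, so the work is already
        // finished by the time the future is handed back: no waiting, no races.
        Future<String> future = direct.submit(() -> Thread.currentThread().getName());
        System.out.println(future.get()); // prints "main"
    }
}
```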
@@ -142,6 +142,7 @@ public void testNonLowercaseGlueDatabase()
_ -> false,
newDirectExecutorService(),
directExecutor(),
newDirectExecutorService(),
newDirectExecutorService());
assertThat(icebergMetadata.schemaExists(SESSION, databaseName)).as("icebergMetadata.schemaExists(databaseName)")
.isFalse();
@@ -196,6 +196,7 @@ public void testNonLowercaseNamespace()
_ -> false,
newDirectExecutorService(),
directExecutor(),
newDirectExecutorService(),
newDirectExecutorService());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
@@ -126,6 +126,7 @@ public void testNonLowercaseNamespace()
_ -> false,
newDirectExecutorService(),
directExecutor(),
newDirectExecutorService(),
newDirectExecutorService());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();
@@ -229,6 +229,7 @@ public void testNonLowercaseNamespace()
_ -> false,
newDirectExecutorService(),
directExecutor(),
newDirectExecutorService(),
newDirectExecutorService());
assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
.isTrue();