Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.TableProperties.DEFAULT_WRITE_METRICS_MODE;
import static org.apache.iceberg.io.DeleteSchemaUtil.pathPosSchema;
import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert;

public class IcebergFileWriterFactory
{
private static final Schema POSITION_DELETE_SCHEMA = pathPosSchema();
private static final MetricsConfig FULL_METRICS_CONFIG = MetricsConfig.fromProperties(ImmutableMap.of(DEFAULT_WRITE_METRICS_MODE, "full"));
private static final Splitter COLUMN_NAMES_SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings();

private final TypeManager typeManager;
Expand Down Expand Up @@ -151,8 +149,8 @@ public IcebergFileWriter createPositionDeleteWriter(
Map<String, String> storageProperties)
{
return switch (fileFormat) {
case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case PARQUET -> createParquetWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
case ORC -> createOrcWriter(MetricsConfig.forPositionDelete(), fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, storageProperties);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3632,7 +3632,7 @@ private void finishWrite(ConnectorSession session, IcebergTableHandle table, Col
switch (task.content()) {
case DATA -> dataTasks.add(task);
case POSITION_DELETES -> deleteTasks.add(task);
case EQUALITY_DELETES -> throw new UnsupportedOperationException("Unsupported task content: " + task.content());
case EQUALITY_DELETES, DATA_MANIFEST, DELETE_MANIFEST -> throw new UnsupportedOperationException("Unsupported task content: " + task.content());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ private synchronized Iterator<FileScanTaskWithDomain> prepareFileTasksIterator(L
}
yield isUnconstrainedPathAndTimeDomain();
}
case DATA -> throw new IllegalStateException("Unexpected delete file: " + deleteFile);
case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new IllegalStateException("Unexpected delete file: " + deleteFile);
})
.collect(toImmutableList());
scannedFiles.add(new DataFileWithDeleteFiles(wholeFileTask.file(), fullyAppliedDeletes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.HTTPClient;
import org.apache.iceberg.rest.RESTCatalogProperties;
import org.apache.iceberg.rest.RESTSessionCatalog;
import org.apache.iceberg.rest.RESTUtil;

Expand Down Expand Up @@ -129,7 +130,7 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
properties.put(CatalogProperties.URI, serverUri.toString());
warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location));
prefix.ifPresent(prefix -> properties.put("prefix", prefix));
properties.put("view-endpoints-supported", Boolean.toString(viewEndpointsEnabled));
properties.put(RESTCatalogProperties.VIEW_ENDPOINTS_SUPPORTED, Boolean.toString(viewEndpointsEnabled));
properties.put("trino-version", trinoVersion);
properties.put(AUTH_SESSION_TIMEOUT_MS, String.valueOf(sessionTimeout.toMillis()));
connectionTimeout.ifPresent(duration -> properties.put("rest.client.connection-timeout-ms", String.valueOf(duration.toMillis())));
Expand All @@ -148,8 +149,13 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity)
(context, config) -> {
Comment thread
kaveti marked this conversation as resolved.
ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the changes from apache/iceberg#12591 disregarded the fact that the ioBuilder is not receiving now the storageCredentials.

@nastra was it intentional to skip it during the implementation phase?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@findinpath this was not intentional. I believe Trino is the only engine that goes through that path. Can you please open a PR with a fix?

Copy link
Copy Markdown
Contributor Author

@kaveti kaveti Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

opened PR: apache/iceberg#15752 , cc @nastra

? ((ConnectorIdentity) context.wrappedIdentity())
: ConnectorIdentity.ofUser("fake");
return fileIoFactory.create(fileSystemFactory.create(currentIdentity, config), true, config);
: ConnectorIdentity.ofUser("trino");
return fileIoFactory.create(
fileSystemFactory.create(currentIdentity, config),
true,
config,
fileSystemFactory,
currentIdentity);
});
icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public Optional<RowPredicate> getDeletePredicate(
}
}
case EQUALITY_DELETES -> equalityDeleteFiles.add(deleteFile);
case DATA -> throw new VerifyException("DATA is not delete file type");
case DATA, DATA_MANIFEST, DELETE_MANIFEST -> throw new VerifyException("DATA is not delete file type");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,30 @@
package io.trino.plugin.iceberg.fileio;

import com.google.common.annotations.VisibleForTesting;
Comment thread
kaveti marked this conversation as resolved.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Comment thread
kaveti marked this conversation as resolved.
import com.google.common.collect.Iterables;
import com.google.common.collect.Streams;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.IcebergFileSystemFactory;
import io.trino.spi.TrinoException;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestListFile;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.StorageCredential;
Comment thread
kaveti marked this conversation as resolved.
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.io.SupportsStorageCredentials;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand All @@ -45,7 +53,7 @@
import static java.util.stream.Collectors.joining;

public class ForwardingFileIo
implements SupportsBulkOperations
implements SupportsBulkOperations, SupportsStorageCredentials
Copy link
Copy Markdown
Contributor

@findinpath findinpath Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm looking at S3FileIO

https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L114

https://github.com/apache/iceberg/blob/main/aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java#L102

and am rather thinking that we should have a prefixed map of TrinoFileSystem actually (and probably the default one we used to have as fallback).

{
private static final int DELETE_BATCH_SIZE = 1000;
private static final int BATCH_DELETE_PATHS_MESSAGE_LIMIT = 5;
Expand All @@ -54,6 +62,11 @@ public class ForwardingFileIo
private final Map<String, String> properties;
private final boolean useFileSizeFromMetadata;
private final ExecutorService deleteExecutor;
private final IcebergFileSystemFactory storageCredentialFileSystemFactory;
private final ConnectorIdentity storageCredentialIdentity;

private volatile List<StorageCredential> storageCredentials = ImmutableList.of();
private volatile Map<String, TrinoFileSystem> prefixedFileSystems = ImmutableMap.of();

@VisibleForTesting
public ForwardingFileIo(TrinoFileSystem fileSystem, boolean useFileSizeFromMetadata)
Expand All @@ -62,22 +75,36 @@ public ForwardingFileIo(TrinoFileSystem fileSystem, boolean useFileSizeFromMetad
}

// Convenience constructor for the legacy path without storage-credential support:
// delegates with a null file-system factory and identity, so no prefix-scoped
// file systems are ever built and all I/O goes through the default file system.
public ForwardingFileIo(TrinoFileSystem fileSystem, Map<String, String> properties, boolean useFileSizeFromMetadata, ExecutorService deleteExecutor)
{
this(fileSystem, properties, useFileSizeFromMetadata, deleteExecutor, null, null);
}

/**
 * Creates a {@code FileIO} that forwards all operations to Trino file systems.
 *
 * @param fileSystem default file system used when no credential prefix matches
 * @param properties catalog-level properties exposed via {@code properties()}
 * @param useFileSizeFromMetadata whether to trust file lengths supplied by Iceberg metadata
 * @param deleteExecutor executor used for parallel batch deletes
 * @param storageCredentialFileSystemFactory nullable; when null, vended storage
 *        credentials are ignored and no prefix-scoped file systems are created
 * @param storageCredentialIdentity nullable; identity used to build prefix-scoped file systems
 */
public ForwardingFileIo(
        TrinoFileSystem fileSystem,
        Map<String, String> properties,
        boolean useFileSizeFromMetadata,
        ExecutorService deleteExecutor,
        IcebergFileSystemFactory storageCredentialFileSystemFactory,
        ConnectorIdentity storageCredentialIdentity)
{
    this.fileSystem = requireNonNull(fileSystem, "fileSystem is null");
    // fix: message previously said "executorService is null" though the parameter is deleteExecutor
    this.deleteExecutor = requireNonNull(deleteExecutor, "deleteExecutor is null");
    this.properties = ImmutableMap.copyOf(requireNonNull(properties, "properties is null"));
    this.useFileSizeFromMetadata = useFileSizeFromMetadata;
    // intentionally nullable — see the delegating constructor
    this.storageCredentialFileSystemFactory = storageCredentialFileSystemFactory;
    this.storageCredentialIdentity = storageCredentialIdentity;
}

// Opens an input file for the given path, routing through the credential-scoped
// file system when the path matches a storage-credential prefix (falls back to
// the default file system otherwise).
// Fix: the diff residue left both the old and new return statements in place,
// making the second one unreachable; keep only the prefix-aware version.
@Override
public InputFile newInputFile(String path)
{
    return new ForwardingInputFile(fileSystemForPath(path).newInputFile(Location.of(path)));
}

@Override
public InputFile newInputFile(String path, long length)
{
TrinoFileSystem fileSystem = fileSystemForPath(path);
if (!useFileSizeFromMetadata) {
return new ForwardingInputFile(fileSystem.newInputFile(Location.of(path)));
}
Expand All @@ -88,14 +115,14 @@ public InputFile newInputFile(String path, long length)
// Opens an output file for the given path, using the credential-scoped file system
// for matching prefixes (default file system otherwise).
// Fix: the diff residue left both the old and new return statements in place,
// making the second one unreachable; keep only the prefix-aware version.
@Override
public OutputFile newOutputFile(String path)
{
    return new ForwardingOutputFile(fileSystemForPath(path), Location.of(path));
}

@Override
public void deleteFile(String path)
{
try {
fileSystem.deleteFile(Location.of(path));
fileSystemForPath(path).deleteFile(Location.of(path));
}
catch (IOException e) {
throw new UncheckedIOException("Failed to delete file: " + path, e);
Expand Down Expand Up @@ -149,10 +176,24 @@ public InputFile newInputFile(DeleteFile file)
return SupportsBulkOperations.super.newInputFile(file);
}

// Delegates to the interface default implementation; presumably that default applies
// metadata known from the manifest list (e.g. file length) — TODO confirm against the
// SupportsBulkOperations/FileIO default for newInputFile(ManifestListFile).
@Override
public InputFile newInputFile(ManifestListFile manifestList)
{
return SupportsBulkOperations.super.newInputFile(manifestList);
}

private void deleteBatch(List<String> filesToDelete)
{
try {
fileSystem.deleteFiles(filesToDelete.stream().map(Location::of).toList());
Map<TrinoFileSystem, List<Location>> locationsByFileSystem = new IdentityHashMap<>();
for (String path : filesToDelete) {
TrinoFileSystem fileSystem = fileSystemForPath(path);
locationsByFileSystem.computeIfAbsent(fileSystem, ignored -> new ArrayList<>())
.add(Location.of(path));
}
for (Map.Entry<TrinoFileSystem, List<Location>> entry : locationsByFileSystem.entrySet()) {
entry.getKey().deleteFiles(entry.getValue());
}
}
catch (IOException e) {
throw new UncheckedIOException(
Expand All @@ -172,6 +213,51 @@ public Map<String, String> properties()
return properties;
}

// Stores an immutable snapshot of the credentials vended by the catalog and
// refreshes the prefix -> file-system mapping derived from them.
@Override
public void setCredentials(List<StorageCredential> credentials)
{
    requireNonNull(credentials, "credentials is null");
    storageCredentials = ImmutableList.copyOf(credentials);
    rebuildPrefixedFileSystems();
}

// Returns the storage credentials most recently supplied via setCredentials()
// (empty until the catalog vends any).
@Override
public List<StorageCredential> credentials()
{
return storageCredentials;
}

// Picks the file system whose credential prefix is the longest match for the path;
// falls back to the default file system when no prefix matches. (Two distinct
// prefixes of the same path can never have equal length, so the choice is unique.)
private TrinoFileSystem fileSystemForPath(String path)
{
    String bestPrefix = null;
    for (String candidate : prefixedFileSystems.keySet()) {
        if (path.startsWith(candidate) && (bestPrefix == null || candidate.length() > bestPrefix.length())) {
            bestPrefix = candidate;
        }
    }
    return bestPrefix == null ? fileSystem : prefixedFileSystems.get(bestPrefix);
}

// Rebuilds the prefix -> file-system mapping from the current credentials.
// Cleared entirely when credential-scoped file systems cannot be built
// (no factory/identity configured) or when no credentials were vended.
private void rebuildPrefixedFileSystems()
{
    if (storageCredentialFileSystemFactory == null || storageCredentialIdentity == null || storageCredentials.isEmpty()) {
        prefixedFileSystems = ImmutableMap.of();
        return;
    }

    ImmutableMap.Builder<String, TrinoFileSystem> fileSystemsByPrefix = ImmutableMap.builder();
    for (StorageCredential credential : storageCredentials) {
        // Credential-specific config wins over catalog-level properties on key collisions
        ImmutableMap.Builder<String, String> config = ImmutableMap.builder();
        config.putAll(properties);
        config.putAll(credential.config());
        fileSystemsByPrefix.put(
                credential.prefix(),
                storageCredentialFileSystemFactory.create(storageCredentialIdentity, config.buildKeepingLast()));
    }
    prefixedFileSystems = fileSystemsByPrefix.buildKeepingLast();
}

@Override
public void initialize(Map<String, String> properties)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import com.google.inject.Inject;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.iceberg.ForIcebergFileDelete;
import io.trino.plugin.iceberg.IcebergFileSystemFactory;
import io.trino.spi.security.ConnectorIdentity;
import org.apache.iceberg.io.FileIO;

import java.util.Map;
Expand Down Expand Up @@ -48,4 +50,20 @@ public FileIO create(TrinoFileSystem fileSystem, boolean useFileSizeFromMetadata
{
return new ForwardingFileIo(fileSystem, properties, useFileSizeFromMetadata, deleteExecutor);
}

/**
 * Creates a {@code ForwardingFileIo} that can additionally build prefix-scoped
 * file systems from storage credentials vended by the catalog.
 *
 * @param storageCredentialFileSystemFactory factory for building file systems per
 *        credential prefix — presumably non-null on this code path; TODO confirm callers
 * @param storageCredentialIdentity identity used when building those file systems
 */
public FileIO create(
TrinoFileSystem fileSystem,
boolean useFileSizeFromMetadata,
Map<String, String> properties,
IcebergFileSystemFactory storageCredentialFileSystemFactory,
ConnectorIdentity storageCredentialIdentity)
{
return new ForwardingFileIo(
fileSystem,
properties,
useFileSizeFromMetadata,
deleteExecutor,
storageCredentialFileSystemFactory,
storageCredentialIdentity);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7927,6 +7927,77 @@ public void testPreparedStatementWithParameterizedVersionedTable()
assertQueryFails(session, "DESCRIBE OUTPUT my_query", ".* DESCRIBE is not supported if a versioned table uses parameters");
}

// Runs the renamed-column time-travel scenario for both unpartitioned and partitioned tables.
@Test
public void testTimeTravelWithFilterOnRenamedColumn()
{
testTimeTravelWithFilterOnRenamedColumn(false);
testTimeTravelWithFilterOnRenamedColumn(true);
}

// Time travel to a snapshot taken before RENAME COLUMN must still allow filtering
// on the column's original name, since the versioned query resolves columns
// against that snapshot's schema.
private void testTimeTravelWithFilterOnRenamedColumn(boolean partitioned)
{
    String partitioningClause = partitioned ? "WITH (partitioning = ARRAY['part'])" : "";
    try (TestTable testTable = newTrinoTable("time_travel_with_filter_on_rename_", "(x int, y int, part int)" + partitioningClause)) {
        assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3);
        assertThat(query("SELECT * FROM " + testTable.getName()))
                .matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)");
        long snapshotBeforeRename = getCurrentSnapshotId(testTable.getName());

        assertUpdate("ALTER TABLE " + testTable.getName() + " RENAME COLUMN x TO renamed_x");

        // generate a new version
        assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 2, 3)", 1);

        // Filter uses the pre-rename column name against the old snapshot
        assertThat(query("SELECT * FROM " + testTable.getName() + " FOR VERSION AS OF " + snapshotBeforeRename + " WHERE x = 1"))
                .matches("VALUES (1, 1, 1), (1, 2, 2)");
    }
}

// Runs the dropped-column time-travel scenario for both unpartitioned and partitioned tables.
@Test
public void testTimeTravelWithFilterOnDroppedColumn()
{
testTimeTravelWithFilterOnDroppedColumn(false);
testTimeTravelWithFilterOnDroppedColumn(true);
}

// Time travel to a snapshot taken before DROP COLUMN must still allow filtering
// on the dropped column: it exists in that snapshot's schema even though it is
// gone from the current one.
private void testTimeTravelWithFilterOnDroppedColumn(boolean partitioned)
{
    String partitioningClause = partitioned ? "WITH (partitioning = ARRAY['part'])" : "";
    try (TestTable testTable = newTrinoTable("time_travel_with_filter_on_drop_", "(x int, y int, part int)" + partitioningClause)) {
        assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)", 3);
        assertThat(query("SELECT * FROM " + testTable.getName()))
                .matches("VALUES (1, 1, 1), (1, 2, 2), (2, 2, 2)");
        long snapshotBeforeDrop = getCurrentSnapshotId(testTable.getName());

        assertUpdate("ALTER TABLE " + testTable.getName() + " DROP COLUMN x");

        // generate a new version
        assertUpdate("INSERT INTO " + testTable.getName() + " VALUES (1, 2)", 1);

        // Filter references the dropped column against the old snapshot
        assertThat(query("SELECT * FROM " + testTable.getName() + " FOR VERSION AS OF " + snapshotBeforeDrop + " WHERE x = 1"))
                .matches("VALUES (1, 1, 1), (1, 2, 2)");
    }
}

// Time travel to a snapshot taken before a partition column rename must still allow
// filtering on the column's original name.
@Test
public void testTimeTravelWithFilterOnRenamedPartitionColumn()
{
    // fix: the table-name prefix was copy-pasted from the dropped-column test ("on_drop_")
    // although this test renames a partition column
    try (TestTable table = newTrinoTable("time_travel_with_filter_on_renamed_part_", "(x int, part1 int, part2 int) WITH (partitioning = ARRAY['part1', 'part2'])")) {
        assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)", 3);
        assertThat(query("SELECT * FROM " + table.getName()))
                .matches("VALUES (1, 1, 1), (1, 1, 2), (2, 2, 2)");
        long firstSnapshotId = getCurrentSnapshotId(table.getName());

        assertUpdate("ALTER TABLE " + table.getName() + " RENAME COLUMN part1 TO renamed_part");

        // generate a new version
        assertUpdate("INSERT INTO " + table.getName() + " VALUES (1, 1, 3)", 1);

        // Filter uses the pre-rename partition column name against the old snapshot
        assertThat(query("SELECT * FROM " + table.getName() + " FOR VERSION AS OF " + firstSnapshotId + " WHERE part1 = 1"))
                .matches("VALUES (1, 1, 1), (1, 1, 2)");
    }
}

@Test
public void testDeleteRetainsTableHistory()
{
Expand Down
Loading
Loading