diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml index 87be6398dba3..59c2a765d377 100644 --- a/plugin/trino-iceberg/pom.xml +++ b/plugin/trino-iceberg/pom.xml @@ -73,12 +73,6 @@ trino-plugin-toolkit - - - io.trino.hadoop - hadoop-apache - - io.trino.hive hive-apache @@ -175,6 +169,11 @@ jjwt-jackson + + javax.annotation + javax.annotation-api + + javax.inject javax.inject @@ -249,6 +248,12 @@ runtime + + io.trino.hadoop + hadoop-apache + runtime + + io.airlift concurrent @@ -261,7 +266,6 @@ runtime - io.airlift node @@ -283,14 +287,14 @@ org.apache.httpcomponents.client5 httpclient5 - 5.1 + 5.2.1 runtime org.apache.httpcomponents.core5 httpcore5 - 5.1.1 + 5.2.1 runtime @@ -538,16 +542,6 @@ - - org.apache.maven.plugins - maven-dependency-plugin - - - io.trino.hadoop:hadoop-apache - - - - org.basepom.maven duplicate-finder-maven-plugin diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java index f80c0192d8ba..bb980218ae35 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSplitSource.java @@ -193,8 +193,6 @@ public CompletableFuture getNextBatch(int maxSize) closer.register(fileScanTaskIterable); this.fileScanTaskIterator = fileScanTaskIterable.iterator(); closer.register(fileScanTaskIterator); - // TODO: Remove when NPE check has been released: https://github.com/trinodb/trino/issues/15372 - isFinished(); } TupleDomain dynamicFilterPredicate = dynamicFilter.getCurrentPredicate() diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java index 4a2023dee212..86078c414963 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/jdbc/TrinoJdbcCatalogFactory.java @@ -20,14 +20,16 @@ import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.spi.security.ConnectorIdentity; import io.trino.spi.type.TypeManager; import org.apache.iceberg.jdbc.JdbcCatalog; +import org.apache.iceberg.jdbc.JdbcClientPool; -import javax.annotation.concurrent.GuardedBy; +import javax.annotation.PreDestroy; import javax.inject.Inject; -import java.util.Optional; +import java.util.Map; import static java.util.Objects.requireNonNull; import static org.apache.iceberg.CatalogProperties.URI; @@ -43,14 +45,10 @@ public class TrinoJdbcCatalogFactory private final TrinoFileSystemFactory fileSystemFactory; private final IcebergJdbcClient jdbcClient; private final String jdbcCatalogName; - private final String connectionUrl; - private final Optional connectionUser; - private final Optional connectionPassword; private final String defaultWarehouseDir; private final boolean isUniqueTableLocation; - - @GuardedBy("this") - private JdbcCatalog icebergCatalog; + private final Map catalogProperties; + private final JdbcClientPool clientPool; @Inject public TrinoJdbcCatalogFactory( @@ -69,39 +67,42 @@ public TrinoJdbcCatalogFactory( this.isUniqueTableLocation = requireNonNull(icebergConfig, "icebergConfig is null").isUniqueTableLocation(); this.jdbcClient = requireNonNull(jdbcClient, "jdbcClient is null"); this.jdbcCatalogName = jdbcConfig.getCatalogName(); - this.connectionUrl = jdbcConfig.getConnectionUrl(); - this.connectionUser = jdbcConfig.getConnectionUser(); - this.connectionPassword = jdbcConfig.getConnectionPassword(); this.defaultWarehouseDir = jdbcConfig.getDefaultWarehouseDir(); + + ImmutableMap.Builder properties = ImmutableMap.builder(); + properties.put(URI, jdbcConfig.getConnectionUrl()); + properties.put(WAREHOUSE_LOCATION, defaultWarehouseDir); + jdbcConfig.getConnectionUser().ifPresent(user -> properties.put(PROPERTY_PREFIX + "user", user)); + jdbcConfig.getConnectionPassword().ifPresent(password -> properties.put(PROPERTY_PREFIX + "password", password)); + this.catalogProperties = properties.buildOrThrow(); + + this.clientPool = new JdbcClientPool(jdbcConfig.getConnectionUrl(), catalogProperties); + } + + @PreDestroy + public void shutdown() + { + clientPool.close(); } @Override - public synchronized TrinoCatalog create(ConnectorIdentity identity) + public TrinoCatalog create(ConnectorIdentity identity) { - // Reuse JdbcCatalog instance to avoid JDBC connection leaks - if (icebergCatalog == null) { - icebergCatalog = createJdbcCatalog(); - } + JdbcCatalog jdbcCatalog = new JdbcCatalog( + config -> new ForwardingFileIo(fileSystemFactory.create(identity)), + config -> clientPool, + false); + + jdbcCatalog.initialize(jdbcCatalogName, catalogProperties); + return new TrinoJdbcCatalog( catalogName, typeManager, tableOperationsProvider, - icebergCatalog, + jdbcCatalog, jdbcClient, fileSystemFactory, isUniqueTableLocation, defaultWarehouseDir); } - - private JdbcCatalog createJdbcCatalog() - { - JdbcCatalog jdbcCatalog = new JdbcCatalog(); - ImmutableMap.Builder properties = ImmutableMap.builder(); - properties.put(URI, connectionUrl); - properties.put(WAREHOUSE_LOCATION, defaultWarehouseDir); - connectionUser.ifPresent(user -> properties.put(PROPERTY_PREFIX + "user", user)); - connectionPassword.ifPresent(password -> properties.put(PROPERTY_PREFIX + "password", password)); - jdbcCatalog.initialize(jdbcCatalogName, properties.buildOrThrow()); - return jdbcCatalog; - } } diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java index 0a6b9e54f533..02a990feaf99 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoIcebergRestCatalogFactory.java @@ -14,15 +14,17 @@ package io.trino.plugin.iceberg.catalog.rest; import com.google.common.collect.ImmutableMap; -import io.trino.hdfs.ConfigurationUtils; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.base.CatalogName; import io.trino.plugin.hive.NodeVersion; import io.trino.plugin.iceberg.IcebergConfig; import io.trino.plugin.iceberg.catalog.TrinoCatalog; import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory; import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogConfig.SessionType; +import io.trino.plugin.iceberg.fileio.ForwardingFileIo; import io.trino.spi.security.ConnectorIdentity; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.rest.HTTPClient; import org.apache.iceberg.rest.RESTSessionCatalog; import javax.annotation.concurrent.GuardedBy; @@ -36,6 +38,7 @@ public class TrinoIcebergRestCatalogFactory implements TrinoCatalogFactory { + private final TrinoFileSystemFactory fileSystemFactory; private final CatalogName catalogName; private final String trinoVersion; private final URI serverUri; @@ -49,12 +52,14 @@ public class TrinoIcebergRestCatalogFactory @Inject public TrinoIcebergRestCatalogFactory( + TrinoFileSystemFactory fileSystemFactory, CatalogName catalogName, IcebergRestCatalogConfig restConfig, SecurityProperties securityProperties, IcebergConfig icebergConfig, NodeVersion nodeVersion) { + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.catalogName = requireNonNull(catalogName, "catalogName is null"); this.trinoVersion = requireNonNull(nodeVersion, "nodeVersion is null").toString(); requireNonNull(restConfig, "restConfig is null"); @@ -77,8 +82,14 @@ public synchronized TrinoCatalog create(ConnectorIdentity identity) warehouse.ifPresent(location -> properties.put(CatalogProperties.WAREHOUSE_LOCATION, location)); properties.put("trino-version", trinoVersion); properties.putAll(securityProperties.get()); - RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog(); - icebergCatalogInstance.setConf(ConfigurationUtils.getInitialConfiguration()); + RESTSessionCatalog icebergCatalogInstance = new RESTSessionCatalog( + config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build(), + (context, config) -> { + ConnectorIdentity currentIdentity = (context.wrappedIdentity() != null) + ? ((ConnectorIdentity) context.wrappedIdentity()) + : ConnectorIdentity.ofUser("fake"); + return new ForwardingFileIo(fileSystemFactory.create(currentIdentity)); + }); icebergCatalogInstance.initialize(catalogName.toString(), properties.buildOrThrow()); icebergCatalog = icebergCatalogInstance; diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java index bc108eb9fcbc..afbff20eba1a 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/rest/TrinoRestCatalog.java @@ -402,7 +402,7 @@ public void updateViewColumnComment(ConnectorSession session, SchemaTableName sc private SessionCatalog.SessionContext convert(ConnectorSession session) { return switch (sessionType) { - case NONE -> SessionCatalog.SessionContext.createEmpty(); + case NONE -> new SessionContext(randomUUID().toString(), null, null, ImmutableMap.of(), session.getIdentity()); case USER -> { String sessionId = format("%s-%s", session.getUser(), session.getSource().orElse("default")); @@ -429,7 +429,7 @@ private SessionCatalog.SessionContext convert(ConnectorSession session) .put(OAuth2Properties.JWT_TOKEN_TYPE, subjectJwt) .buildOrThrow(); - yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties); + yield new SessionCatalog.SessionContext(sessionId, session.getUser(), credentials, properties, session.getIdentity()); } }; } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index c5ae2c480369..c994e282bb9d 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -5591,7 +5591,7 @@ public void testExpireSnapshots() .matches("VALUES (BIGINT '3', VARCHAR 'one two')"); List updatedFiles = getAllMetadataFilesFromTableDirectory(tableLocation); List updatedSnapshots = getSnapshotIds(tableName); - assertThat(updatedFiles.size()).isEqualTo(initialFiles.size() - 1); + assertThat(updatedFiles.size()).isEqualTo(initialFiles.size() - 2); assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size()); assertThat(updatedSnapshots.size()).isEqualTo(1); assertThat(initialSnapshots).containsAll(updatedSnapshots); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java index 450083e11e0e..769c88989c0a 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergMinioConnectorSmokeTest.java @@ -205,7 +205,7 @@ public void testExpireSnapshotsBatchDeletes() assertThat(query("SELECT * FROM " + tableName)) .matches("VALUES (VARCHAR 'one', 1), (VARCHAR 'two', 2)"); - assertThat(events).hasSize(2); + assertThat(events).hasSize(3); // if files were deleted in batch there should be only one request id because there was one request only assertThat(events.stream() .map(event -> event.responseElements().get("x-amz-request-id")) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java index 9c1db9802d0e..249f5e740dba 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/DataFileRecord.java @@ -34,6 +34,7 @@ public class DataFileRecord private final Map lowerBounds; private final Map upperBounds; + @SuppressWarnings("unchecked") public static DataFileRecord toDataFileRecord(MaterializedRow row) { assertEquals(row.getFieldCount(), 14); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java index 057cab8c10e4..683acfbc9aa3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetadataFileOperations.java @@ -140,11 +140,11 @@ public void testCreateTableAsSelect() ImmutableMultiset.builder() .addCopies(new FileOperation(MANIFEST, OUTPUT_FILE_CREATE_OR_OVERWRITE), 1) .addCopies(new FileOperation(MANIFEST, OUTPUT_FILE_LOCATION), 1) - .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(METADATA_JSON, INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(METADATA_JSON, OUTPUT_FILE_CREATE), 2) // TODO (https://github.com/trinodb/trino/issues/15439): it would be good to publish data and stats in one commit .addCopies(new FileOperation(METADATA_JSON, OUTPUT_FILE_LOCATION), 2) - .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 2) - .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 2) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_GET_LENGTH), 1) + .addCopies(new FileOperation(SNAPSHOT, INPUT_FILE_NEW_STREAM), 1) .addCopies(new FileOperation(SNAPSHOT, OUTPUT_FILE_CREATE_OR_OVERWRITE), 1) .addCopies(new FileOperation(SNAPSHOT, OUTPUT_FILE_LOCATION), 2) .addCopies(new FileOperation(STATS, OUTPUT_FILE_CREATE), 1) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java index eb392bdd2891..ffa3fd210cb5 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergMetastoreAccessOperations.java @@ -116,7 +116,7 @@ public void testCreateTableAsSelect() ImmutableMultiset.builder() .add(GET_DATABASE) .add(CREATE_TABLE) - .addCopies(GET_TABLE, 5) + .addCopies(GET_TABLE, 4) .add(REPLACE_TABLE) .build()); } diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java index cc0961b86804..09c5a7c40721 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/rest/RestCatalogTestUtils.java @@ -51,7 +51,7 @@ public static Catalog backendCatalog(File warehouseLocation) HdfsContext context = new HdfsContext(connectorSession); JdbcCatalog catalog = new JdbcCatalog(); - catalog.setConf(hdfsEnvironment.getConfiguration(context, new Path(warehouseLocation.getAbsolutePath()))); + catalog.setConf((Object) hdfsEnvironment.getConfiguration(context, new Path(warehouseLocation.getAbsolutePath()))); catalog.initialize("backend_jdbc", properties.buildOrThrow()); return catalog; diff --git a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java index b56069650e10..c0f1994b277f 100644 --- a/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java +++ b/plugin/trino-iceberg/src/test/java/org/apache/iceberg/rest/DelegatingRestSessionCatalog.java @@ -36,7 +36,7 @@ private DelegatingRestSessionCatalog() {} DelegatingRestSessionCatalog(RESTCatalogAdapter adapter, Catalog delegate) { - super(properties -> adapter); + super(properties -> adapter, null); this.adapter = requireNonNull(adapter, "adapter is null"); this.delegate = requireNonNull(delegate, "delegate catalog is null"); } diff --git a/pom.xml b/pom.xml index 9d9972572713..54ca87eb8204 100644 --- a/pom.xml +++ b/pom.xml @@ -67,7 +67,7 @@ 3.3.2 4.14.0 8.4.5 - 1.1.0 + 1.2.1 3.22.2 4.5.0 4.1.79.Final diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java index 41a1503fd694..3c875b08f7a8 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergRest.java @@ -43,7 +43,7 @@ public class EnvSinglenodeSparkIcebergRest private static final int REST_SERVER_PORT = 8181; private static final String SPARK_CONTAINER_NAME = "spark"; private static final String REST_CONTAINER_NAME = "iceberg-with-rest"; - private static final String REST_SERVER_IMAGE = "tabulario/iceberg-rest:0.3.0"; + private static final String REST_SERVER_IMAGE = "tabulario/iceberg-rest:0.4.0"; private static final String CATALOG_WAREHOUSE = "hdfs://hadoop-master:9000/user/hive/warehouse"; private final DockerFiles dockerFiles;