diff --git a/docs/src/main/sphinx/connector/iceberg.rst b/docs/src/main/sphinx/connector/iceberg.rst
index 5d4304786678..12390251a893 100644
--- a/docs/src/main/sphinx/connector/iceberg.rst
+++ b/docs/src/main/sphinx/connector/iceberg.rst
@@ -38,7 +38,7 @@ To use Iceberg, you need:
* Network access from the Trino coordinator and workers to the distributed
object storage.
-* Access to a Hive metastore service (HMS) or AWS Glue.
+* Access to a Hive metastore service (HMS), AWS Glue, or a `Nessie server <https://projectnessie.org>`_.
* Network access from the Trino coordinator to the HMS. Hive
metastore access with the Thrift protocol defaults to using port 9083.
@@ -133,9 +133,9 @@ contents of a Trino catalog file that uses the Iceberg connector to
configure different Iceberg metadata catalogs.
The connector supports multiple Iceberg catalog types; you may use
-either a Hive metastore service (HMS), AWS Glue, or a REST catalog. The catalog
+either a Hive metastore service (HMS), AWS Glue, a JDBC catalog, a REST catalog, or Nessie. The catalog
type is determined by the ``iceberg.catalog.type`` property. It can be set to
-``HIVE_METASTORE``, ``GLUE``, ``JDBC``, or ``REST``.
+``HIVE_METASTORE``, ``GLUE``, ``JDBC``, ``REST``, or ``NESSIE``.
.. _iceberg-hive-catalog:
@@ -225,6 +225,36 @@ properties:
REST catalog does not support :doc:`views` or
:doc:`materialized views`.
+.. _iceberg-nessie-catalog:
+
+Nessie catalog
+^^^^^^^^^^^^^^
+
+To use a Nessie catalog, configure the catalog type with
+``iceberg.catalog.type=nessie`` and provide further details with the following
+properties:
+
+==================================================== ============================================================
+Property Name Description
+==================================================== ============================================================
+``iceberg.nessie-catalog.uri`` Nessie API endpoint URI (required).
+ Example: ``https://localhost:19120/api/v1``
+
+``iceberg.nessie-catalog.ref`` The branch/tag to use for Nessie, defaults to ``main``.
+
+``iceberg.nessie-catalog.default-warehouse-dir`` Default warehouse directory for schemas created without an
+ explicit ``location`` property.
+ Example: ``/tmp``
+==================================================== ============================================================
+
+.. code-block:: text
+
+ connector.name=iceberg
+ iceberg.catalog.type=nessie
+ iceberg.nessie-catalog.uri=https://localhost:19120/api/v1
+ iceberg.nessie-catalog.default-warehouse-dir=/tmp
+
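+The default warehouse directory only applies to schemas that are created
+without an explicit location. A minimal sketch, assuming a catalog named
+``example`` and made-up paths:
+
+.. code-block:: sql
+
+    -- data lands under /tmp/schema_one, derived from default-warehouse-dir
+    CREATE SCHEMA example.schema_one;
+
+    -- data lands under the explicitly chosen location instead
+    CREATE SCHEMA example.schema_two WITH (location = '/tmp/custom/schema_two');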
+
.. _iceberg-jdbc-catalog:
JDBC catalog
diff --git a/plugin/trino-iceberg/pom.xml b/plugin/trino-iceberg/pom.xml
index 5918f565a5a5..0d40e3e7b819 100644
--- a/plugin/trino-iceberg/pom.xml
+++ b/plugin/trino-iceberg/pom.xml
@@ -24,6 +24,8 @@
TODO (https://github.com/trinodb/trino/issues/11294) remove when we upgrade to surefire with https://issues.apache.org/jira/browse/SUREFIRE-1967
-->
        <air.test.parallel>instances</air.test.parallel>
+
+        <dep.nessie.version>0.51.1</dep.nessie.version>
@@ -216,6 +218,12 @@
            <artifactId>iceberg-core</artifactId>
        </dependency>

+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-nessie</artifactId>
+            <version>${dep.iceberg.version}</version>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-orc</artifactId>
@@ -231,6 +239,18 @@
            <artifactId>jdbi3-core</artifactId>
        </dependency>

+        <dependency>
+            <groupId>org.projectnessie.nessie</groupId>
+            <artifactId>nessie-client</artifactId>
+            <version>${dep.nessie.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectnessie.nessie</groupId>
+            <artifactId>nessie-model</artifactId>
+            <version>${dep.nessie.version}</version>
+        </dependency>
+
        <dependency>
            <groupId>org.roaringbitmap</groupId>
            <artifactId>RoaringBitmap</artifactId>
@@ -523,7 +543,6 @@
                <groupId>org.antlr</groupId>
                <artifactId>antlr4-maven-plugin</artifactId>
-
                <groupId>org.basepom.maven</groupId>
                <artifactId>duplicate-finder-maven-plugin</artifactId>
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
index ed2ffb761051..857aa1a8e6c7 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/CatalogType.java
@@ -20,5 +20,6 @@ public enum CatalogType
GLUE,
REST,
JDBC,
+ NESSIE,
/**/;
}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
index a2cf660043fc..e9440c4bebdd 100644
--- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/IcebergCatalogModule.java
@@ -22,12 +22,14 @@
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule;
+import io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogModule;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogModule;
import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.trino.plugin.iceberg.CatalogType.GLUE;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.CatalogType.JDBC;
+import static io.trino.plugin.iceberg.CatalogType.NESSIE;
import static io.trino.plugin.iceberg.CatalogType.REST;
import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE;
@@ -42,6 +44,7 @@ protected void setup(Binder binder)
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
bindCatalogModule(REST, new IcebergRestCatalogModule());
bindCatalogModule(JDBC, new IcebergJdbcCatalogModule());
+ bindCatalogModule(NESSIE, new IcebergNessieCatalogModule());
}
private void bindCatalogModule(CatalogType catalogType, Module module)
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogConfig.java
new file mode 100644
index 000000000000..dff42a527442
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
+import java.net.URI;
+
+public class IcebergNessieCatalogConfig
+{
+ private String defaultReferenceName = "main";
+ private String defaultWarehouseDir;
+ private URI serverUri;
+
+ @NotNull
+ public String getDefaultReferenceName()
+ {
+ return defaultReferenceName;
+ }
+
+ @Config("iceberg.nessie-catalog.ref")
+ @ConfigDescription("The default Nessie reference to work on")
+ public IcebergNessieCatalogConfig setDefaultReferenceName(String defaultReferenceName)
+ {
+ this.defaultReferenceName = defaultReferenceName;
+ return this;
+ }
+
+ @NotNull
+ public URI getServerUri()
+ {
+ return serverUri;
+ }
+
+ @Config("iceberg.nessie-catalog.uri")
+ @ConfigDescription("The URI to connect to the Nessie server")
+ public IcebergNessieCatalogConfig setServerUri(URI serverUri)
+ {
+ this.serverUri = serverUri;
+ return this;
+ }
+
+ @NotEmpty
+ public String getDefaultWarehouseDir()
+ {
+ return defaultWarehouseDir;
+ }
+
+ @Config("iceberg.nessie-catalog.default-warehouse-dir")
+ @ConfigDescription("The default warehouse to use for Nessie")
+ public IcebergNessieCatalogConfig setDefaultWarehouseDir(String defaultWarehouseDir)
+ {
+ this.defaultWarehouseDir = defaultWarehouseDir;
+ return this;
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java
new file mode 100644
index 000000000000..f0e5de3ec30a
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieCatalogModule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.http.HttpClientBuilder;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+public class IcebergNessieCatalogModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ configBinder(binder).bindConfig(IcebergNessieCatalogConfig.class);
+ binder.bind(IcebergTableOperationsProvider.class).to(IcebergNessieTableOperationsProvider.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
+ binder.bind(TrinoCatalogFactory.class).to(TrinoNessieCatalogFactory.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
+ }
+
+ @Provides
+ @Singleton
+ public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
+ {
+ return new NessieIcebergClient(
+ HttpClientBuilder.builder()
+ .withUri(icebergNessieCatalogConfig.getServerUri())
+ .build(NessieApiV1.class),
+ icebergNessieCatalogConfig.getDefaultReferenceName(),
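+ // the reference hash is null, so the client follows the tip of the configured reference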
+ null,
+ ImmutableMap.of());
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java
new file mode 100644
index 000000000000..75be83f8293d
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperations.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.connector.TableNotFoundException;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.Namespace;
+
+import java.util.Optional;
+
+import static com.google.common.base.Verify.verify;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
+import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
+import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class IcebergNessieTableOperations
+ extends AbstractIcebergTableOperations
+{
+ private final NessieIcebergClient nessieClient;
+ private IcebergTable table;
+
+ protected IcebergNessieTableOperations(
+ NessieIcebergClient nessieClient,
+ FileIO fileIo,
+ ConnectorSession session,
+ String database,
+ String table,
+ Optional<String> owner,
+ Optional<String> location)
+ {
+ super(fileIo, session, database, table, owner, location);
+ this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
+ }
+
+ @Override
+ public TableMetadata refresh()
+ {
+ refreshNessieClient();
+ return super.refresh();
+ }
+
+ private void refreshNessieClient()
+ {
+ try {
+ nessieClient.refresh();
+ }
+ catch (NessieNotFoundException e) {
+ throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to refresh as ref '%s' is no longer valid.", nessieClient.refName()), e);
+ }
+ }
+
+ @Override
+ public TableMetadata refresh(boolean invalidateCaches)
+ {
+ refreshNessieClient();
+ return super.refresh(invalidateCaches);
+ }
+
+ @Override
+ protected String getRefreshedLocation(boolean invalidateCaches)
+ {
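+ // remember the Nessie table entry; commitTable passes it back as the expected content of the commit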
+ table = nessieClient.table(toIdentifier(new SchemaTableName(database, tableName)));
+
+ if (table == null) {
+ throw new TableNotFoundException(getSchemaTableName());
+ }
+
+ return table.getMetadataLocation();
+ }
+
+ @Override
+ protected void commitNewTable(TableMetadata metadata)
+ {
+ verify(version.isEmpty(), "commitNewTable called on a table which already exists");
+ try {
+ nessieClient.commitTable(null, metadata, writeNewMetadata(metadata, 0), table, toKey(new SchemaTableName(database, this.tableName)));
+ }
+ catch (NessieNotFoundException e) {
+ throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
+ }
+ catch (NessieConflictException e) {
+ // CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
+ throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
+ }
+ shouldRefresh = true;
+ }
+
+ @Override
+ protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
+ {
+ verify(version.orElseThrow() >= 0, "commitToExistingTable called on a new table");
+ try {
+ nessieClient.commitTable(base, metadata, writeNewMetadata(metadata, version.getAsInt() + 1), table, toKey(new SchemaTableName(database, this.tableName)));
+ }
+ catch (NessieNotFoundException e) {
+ throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
+ }
+ catch (NessieConflictException e) {
+ // CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
+ throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
+ }
+ shouldRefresh = true;
+ }
+
+ private static ContentKey toKey(SchemaTableName tableName)
+ {
+ return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName());
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperationsProvider.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperationsProvider.java
new file mode 100644
index 000000000000..e5da8b25a514
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieTableOperationsProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperations;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.fileio.ForwardingFileIo;
+import io.trino.spi.connector.ConnectorSession;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+
+import javax.inject.Inject;
+
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+
+public class IcebergNessieTableOperationsProvider
+ implements IcebergTableOperationsProvider
+{
+ private final TrinoFileSystemFactory fileSystemFactory;
+ private final NessieIcebergClient nessieClient;
+
+ @Inject
+ public IcebergNessieTableOperationsProvider(TrinoFileSystemFactory fileSystemFactory, NessieIcebergClient nessieClient)
+ {
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
+ }
+
+ @Override
+ public IcebergTableOperations createTableOperations(
+ TrinoCatalog catalog,
+ ConnectorSession session,
+ String database,
+ String table,
+ Optional<String> owner,
+ Optional<String> location)
+ {
+ return new IcebergNessieTableOperations(
+ nessieClient,
+ new ForwardingFileIo(fileSystemFactory.create(session)),
+ session,
+ database,
+ table,
+ owner,
+ location);
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieUtil.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieUtil.java
new file mode 100644
index 000000000000..c39f5de74fde
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/IcebergNessieUtil.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import io.trino.spi.connector.SchemaTableName;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+final class IcebergNessieUtil
+{
+ private IcebergNessieUtil() {}
+
+ static TableIdentifier toIdentifier(SchemaTableName schemaTableName)
+ {
+ return TableIdentifier.of(schemaTableName.getSchemaName(), schemaTableName.getTableName());
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
new file mode 100644
index 000000000000..e3889c119ea5
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalog.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.base.CatalogName;
+import io.trino.plugin.iceberg.catalog.AbstractTrinoCatalog;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.CatalogSchemaTableName;
+import io.trino.spi.connector.ConnectorMaterializedViewDefinition;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorViewDefinition;
+import io.trino.spi.connector.SchemaNotFoundException;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.filesystem.Locations.appendPath;
+import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
+import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
+import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
+import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.connector.SchemaTableName.schemaTableName;
+import static java.util.Objects.requireNonNull;
+
+public class TrinoNessieCatalog
+ extends AbstractTrinoCatalog
+{
+ private final String warehouseLocation;
+ private final NessieIcebergClient nessieClient;
+ private final Map<SchemaTableName, TableMetadata> tableMetadataCache = new ConcurrentHashMap<>();
+ private final TrinoFileSystemFactory fileSystemFactory;
+
+ public TrinoNessieCatalog(
+ CatalogName catalogName,
+ TypeManager typeManager,
+ TrinoFileSystemFactory fileSystemFactory,
+ IcebergTableOperationsProvider tableOperationsProvider,
+ NessieIcebergClient nessieClient,
+ String warehouseLocation,
+ boolean useUniqueTableLocation)
+ {
+ super(catalogName, typeManager, tableOperationsProvider, useUniqueTableLocation);
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.warehouseLocation = requireNonNull(warehouseLocation, "warehouseLocation is null");
+ this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
+ }
+
+ @Override
+ public boolean namespaceExists(ConnectorSession session, String namespace)
+ {
+ try {
+ return nessieClient.loadNamespaceMetadata(Namespace.of(namespace)) != null;
+ }
+ catch (Exception e) {
+ return false;
+ }
+ }
+
+ @Override
+ public List<String> listNamespaces(ConnectorSession session)
+ {
+ return nessieClient.listNamespaces(Namespace.empty()).stream()
+ .map(Namespace::toString)
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public void dropNamespace(ConnectorSession session, String namespace)
+ {
+ nessieClient.dropNamespace(Namespace.of(namespace));
+ }
+
+ @Override
+ public Map<String, Object> loadNamespaceMetadata(ConnectorSession session, String namespace)
+ {
+ try {
+ return ImmutableMap.copyOf(nessieClient.loadNamespaceMetadata(Namespace.of(namespace)));
+ }
+ catch (NoSuchNamespaceException e) {
+ throw new SchemaNotFoundException(namespace);
+ }
+ }
+
+ @Override
+ public Optional<TrinoPrincipal> getNamespacePrincipal(ConnectorSession session, String namespace)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ public void createNamespace(ConnectorSession session, String namespace, Map<String, Object> properties, TrinoPrincipal owner)
+ {
+ nessieClient.createNamespace(Namespace.of(namespace), Maps.transformValues(properties, property -> {
+ if (property instanceof String stringProperty) {
+ return stringProperty;
+ }
+ throw new TrinoException(NOT_SUPPORTED, "Non-string properties are not support for Iceberg Nessie catalogs");
+ }));
+ }
+
+ @Override
+ public void setNamespacePrincipal(ConnectorSession session, String namespace, TrinoPrincipal principal)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "setNamespacePrincipal is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void renameNamespace(ConnectorSession session, String source, String target)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "renameNamespace is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public List<SchemaTableName> listTables(ConnectorSession session, Optional<String> namespace)
+ {
+ return nessieClient.listTables(namespace.isEmpty() ? Namespace.empty() : Namespace.of(namespace.get()))
+ .stream()
+ .map(id -> schemaTableName(id.namespace().toString(), id.name()))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public Table loadTable(ConnectorSession session, SchemaTableName table)
+ {
+ TableMetadata metadata = tableMetadataCache.computeIfAbsent(
+ table,
+ ignore -> {
+ TableOperations operations = tableOperationsProvider.createTableOperations(
+ this,
+ session,
+ table.getSchemaName(),
+ table.getTableName(),
+ Optional.empty(),
+ Optional.empty());
+ return new BaseTable(operations, quotedTableName(table)).operations().current();
+ });
+
+ return getIcebergTableWithMetadata(
+ this,
+ tableOperationsProvider,
+ session,
+ table,
+ metadata);
+ }
+
+ @Override
+ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ BaseTable table = (BaseTable) loadTable(session, schemaTableName);
+ validateTableCanBeDropped(table);
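+ // remove the table from Nessie, then delete the data directory ourselves, since the Nessie client does not purge files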
+ nessieClient.dropTable(toIdentifier(schemaTableName), true);
+ deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
+ }
+
+ @Override
+ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "Cannot drop corrupted table %s from Iceberg Nessie catalog".formatted(schemaTableName));
+ }
+
+ @Override
+ public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
+ {
+ nessieClient.renameTable(toIdentifier(from), toIdentifier(to));
+ }
+
+ @Override
+ public Transaction newCreateTableTransaction(
+ ConnectorSession session,
+ SchemaTableName schemaTableName,
+ Schema schema,
+ PartitionSpec partitionSpec,
+ SortOrder sortOrder,
+ String location,
+ Map<String, String> properties)
+ {
+ return newCreateTableTransaction(
+ session,
+ schemaTableName,
+ schema,
+ partitionSpec,
+ sortOrder,
+ location,
+ properties,
+ Optional.of(session.getUser()));
+ }
+
+ @Override
+ public void registerTable(ConnectorSession session, SchemaTableName tableName, String tableLocation, String metadataLocation)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "registerTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "unregisterTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public String defaultTableLocation(ConnectorSession session, SchemaTableName schemaTableName)
+ {
+ Optional<String> databaseLocation = Optional.empty();
+ if (namespaceExists(session, schemaTableName.getSchemaName())) {
+ databaseLocation = Optional.ofNullable((String) loadNamespaceMetadata(session, schemaTableName.getSchemaName()).get(LOCATION_PROPERTY));
+ }
+
+ String schemaLocation = databaseLocation.orElseGet(() ->
+ appendPath(warehouseLocation, schemaTableName.getSchemaName()));
+
+ return appendPath(schemaLocation, createNewTableName(schemaTableName.getTableName()));
+ }
+
+ @Override
+ public void setTablePrincipal(ConnectorSession session, SchemaTableName schemaTableName, TrinoPrincipal principal)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "setTablePrincipal is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void createView(ConnectorSession session, SchemaTableName schemaViewName, ConnectorViewDefinition definition, boolean replace)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "createView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "renameView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional<String> comment)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "updateViewComment is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "updateViewColumnComment is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaViewName, TrinoPrincipal principal)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "setViewPrincipal is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "dropView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public List<SchemaTableName> listViews(ConnectorSession session, Optional<String> namespace)
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession session, Optional<String> namespace)
+ {
+ return ImmutableMap.of();
+ }
+
+ @Override
+ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewIdentifier)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ public List<SchemaTableName> listMaterializedViews(ConnectorSession session, Optional<String> namespace)
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public void createMaterializedView(
+ ConnectorSession session,
+ SchemaTableName schemaViewName,
+ ConnectorMaterializedViewDefinition definition,
+ boolean replace,
+ boolean ignoreExisting)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "createMaterializedView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void dropMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "dropMaterializedView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public Optional<ConnectorMaterializedViewDefinition> getMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ protected Optional<ConnectorMaterializedViewDefinition> doGetMaterializedView(ConnectorSession session, SchemaTableName schemaViewName)
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ public void renameMaterializedView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "renameMaterializedView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session, SchemaTableName tableName)
+ {
+ return Optional.empty();
+ }
+}
diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java
new file mode 100644
index 000000000000..8bb3377794e1
--- /dev/null
+++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/nessie/TrinoNessieCatalogFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.plugin.base.CatalogName;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
+import io.trino.spi.security.ConnectorIdentity;
+import io.trino.spi.type.TypeManager;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+
+import javax.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+public class TrinoNessieCatalogFactory
+ implements TrinoCatalogFactory
+{
+ private final IcebergTableOperationsProvider tableOperationsProvider;
+ private final String warehouseLocation;
+ private final NessieIcebergClient nessieClient;
+ private final boolean isUniqueTableLocation;
+ private final CatalogName catalogName;
+ private final TypeManager typeManager;
+ private final TrinoFileSystemFactory fileSystemFactory;
+
+ @Inject
+ public TrinoNessieCatalogFactory(
+ CatalogName catalogName,
+ TypeManager typeManager,
+ TrinoFileSystemFactory fileSystemFactory,
+ IcebergTableOperationsProvider tableOperationsProvider,
+ NessieIcebergClient nessieClient,
+ IcebergNessieCatalogConfig icebergNessieCatalogConfig,
+ IcebergConfig icebergConfig)
+ {
+ this.catalogName = requireNonNull(catalogName, "catalogName is null");
+ this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
+ this.typeManager = requireNonNull(typeManager, "typeManager is null");
+ this.tableOperationsProvider = requireNonNull(tableOperationsProvider, "tableOperationsProvider is null");
+ this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
+ this.warehouseLocation = icebergNessieCatalogConfig.getDefaultWarehouseDir();
+ this.isUniqueTableLocation = icebergConfig.isUniqueTableLocation();
+ }
+
+ @Override
+ public TrinoCatalog create(ConnectorIdentity identity)
+ {
+ return new TrinoNessieCatalog(catalogName, typeManager, fileSystemFactory, tableOperationsProvider, nessieClient, warehouseLocation, isUniqueTableLocation);
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
index f048fd8f787c..8395130d2174 100644
--- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergPlugin.java
@@ -143,6 +143,16 @@ public void testRecordingMetastore()
"hive.metastore-recording-path", "/tmp"),
new TestingConnectorContext()))
.hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used");
+
+ // recording with nessie
+ assertThatThrownBy(() -> factory.create(
+ "test",
+ Map.of(
+ "iceberg.catalog.type", "nessie",
+ "hive.metastore.nessie.region", "us-east-2",
+ "hive.metastore-recording-path", "/tmp"),
+ new TestingConnectorContext()))
+ .hasMessageContaining("Configuration property 'hive.metastore-recording-path' was not used");
}
@Test
@@ -263,6 +273,21 @@ public void testJdbcCatalog()
.shutdown();
}
+ @Test
+ public void testNessieCatalog()
+ {
+ ConnectorFactory factory = getConnectorFactory();
+
+ factory.create(
+ "test",
+ Map.of(
+ "iceberg.catalog.type", "nessie",
+ "iceberg.nessie-catalog.default-warehouse-dir", "/tmp",
+ "iceberg.nessie-catalog.uri", "http://foo:1234"),
+ new TestingConnectorContext())
+ .shutdown();
+ }
+
private static ConnectorFactory getConnectorFactory()
{
return getOnlyElement(new IcebergPlugin().getConnectorFactories());
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConfig.java
new file mode 100644
index 000000000000..a247b5bfdcd0
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConfig.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.Test;
+
+import java.net.URI;
+import java.util.Map;
+
+import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
+import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
+import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
+
+public class TestIcebergNessieCatalogConfig
+{
+ @Test
+ public void testDefaults()
+ {
+ assertRecordedDefaults(recordDefaults(IcebergNessieCatalogConfig.class)
+ .setDefaultWarehouseDir(null)
+ .setServerUri(null)
+ .setDefaultReferenceName("main"));
+ }
+
+ @Test
+ public void testExplicitPropertyMapping()
+ {
+ Map<String, String> properties = ImmutableMap.<String, String>builder()
+ .put("iceberg.nessie-catalog.default-warehouse-dir", "/tmp")
+ .put("iceberg.nessie-catalog.uri", "http://localhost:xxx/api/v1")
+ .put("iceberg.nessie-catalog.ref", "someRef")
+ .buildOrThrow();
+
+ IcebergNessieCatalogConfig expected = new IcebergNessieCatalogConfig()
+ .setDefaultWarehouseDir("/tmp")
+ .setServerUri(URI.create("http://localhost:xxx/api/v1"))
+ .setDefaultReferenceName("someRef");
+
+ assertFullMapping(properties, expected);
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java
new file mode 100644
index 000000000000..c05ce302798f
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestIcebergNessieCatalogConnectorSmokeTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.Location;
+import io.trino.plugin.iceberg.BaseIcebergConnectorSmokeTest;
+import io.trino.plugin.iceberg.IcebergConfig;
+import io.trino.plugin.iceberg.IcebergQueryRunner;
+import io.trino.plugin.iceberg.SchemaInitializer;
+import io.trino.plugin.iceberg.containers.NessieContainer;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.tpch.TpchTable;
+import org.testng.SkipException;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Optional;
+
+import static com.google.common.io.MoreFiles.deleteRecursively;
+import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_FACTORY;
+import static io.trino.plugin.iceberg.IcebergTestUtils.checkOrcFileSorting;
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class TestIcebergNessieCatalogConnectorSmokeTest
+ extends BaseIcebergConnectorSmokeTest
+{
+ private static NessieContainer nessieContainer;
+ private static Path tempDir;
+
+ public TestIcebergNessieCatalogConnectorSmokeTest()
+ {
+ super(new IcebergConfig().getFileFormat().toIceberg());
+ }
+
+ @BeforeClass
+ @Override
+ public void init()
+ throws Exception
+ {
+ nessieContainer = NessieContainer.builder().build();
+ nessieContainer.start();
+ tempDir = Files.createTempDirectory("test_trino_nessie_catalog");
+ super.init();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void teardown()
+ throws IOException
+ {
+ deleteRecursively(tempDir, ALLOW_INSECURE);
+ if (nessieContainer != null) {
+ nessieContainer.close();
+ }
+ }
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ return IcebergQueryRunner.builder()
+ .setBaseDataDir(Optional.of(tempDir))
+ .setIcebergProperties(
+ ImmutableMap.of(
+ "iceberg.file-format", format.name(),
+ "iceberg.catalog.type", "nessie",
+ "iceberg.nessie-catalog.uri", nessieContainer.getRestApiUri(),
+ "iceberg.nessie-catalog.default-warehouse-dir", tempDir.toString(),
+ "iceberg.writer-sort-buffer-size", "1MB"))
+ .setSchemaInitializer(
+ SchemaInitializer.builder()
+ .withClonedTpchTables(ImmutableList.<TpchTable<?>>builder()
+ .addAll(REQUIRED_TPCH_TABLES)
+ .build())
+ .build())
+ .build();
+ }
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior)
+ {
+ return switch (connectorBehavior) {
+ case SUPPORTS_CREATE_VIEW, SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_RENAME_SCHEMA -> false;
+ default -> super.hasBehavior(connectorBehavior);
+ };
+ }
+
+ @Override
+ public void testView()
+ {
+ assertThatThrownBy(super::testView)
+ .hasStackTraceContaining("createView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testMaterializedView()
+ {
+ assertThatThrownBy(super::testMaterializedView)
+ .hasStackTraceContaining("createMaterializedView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testRenameSchema()
+ {
+ assertThatThrownBy(super::testRenameSchema)
+ .hasStackTraceContaining("renameNamespace is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testDeleteRowsConcurrently()
+ {
+ throw new SkipException("skipped for now due to flakiness");
+ }
+
+ @Override
+ protected void dropTableFromMetastore(String tableName)
+ {
+ // used when registering a table, which is not supported by the Nessie catalog
+ }
+
+ @Override
+ protected String getMetadataLocation(String tableName)
+ {
+ // used when registering a table, which is not supported by the Nessie catalog
+ throw new UnsupportedOperationException("metadata location for register_table is not supported");
+ }
+
+ @Override
+ public void testRegisterTableWithTableLocation()
+ {
+ assertThatThrownBy(super::testRegisterTableWithTableLocation)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithComments()
+ {
+ assertThatThrownBy(super::testRegisterTableWithComments)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithShowCreateTable()
+ {
+ assertThatThrownBy(super::testRegisterTableWithShowCreateTable)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithReInsert()
+ {
+ assertThatThrownBy(super::testRegisterTableWithReInsert)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithDroppedTable()
+ {
+ assertThatThrownBy(super::testRegisterTableWithDroppedTable)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithDifferentTableName()
+ {
+ assertThatThrownBy(super::testRegisterTableWithDifferentTableName)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testRegisterTableWithMetadataFile()
+ {
+ assertThatThrownBy(super::testRegisterTableWithMetadataFile)
+ .hasMessageContaining("metadata location for register_table is not supported");
+ }
+
+ @Override
+ public void testRegisterTableWithTrailingSpaceInLocation()
+ {
+ assertThatThrownBy(super::testRegisterTableWithTrailingSpaceInLocation)
+ .hasMessageContaining("register_table procedure is disabled");
+ }
+
+ @Override
+ public void testUnregisterTable()
+ {
+ assertThatThrownBy(super::testUnregisterTable)
+ .hasStackTraceContaining("unregisterTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testUnregisterBrokenTable()
+ {
+ assertThatThrownBy(super::testUnregisterBrokenTable)
+ .hasStackTraceContaining("unregisterTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testUnregisterTableNotExistingTable()
+ {
+ assertThatThrownBy(super::testUnregisterTableNotExistingTable)
+ .hasStackTraceContaining("unregisterTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testRepeatUnregisterTable()
+ {
+ assertThatThrownBy(super::testRepeatUnregisterTable)
+ .hasStackTraceContaining("unregisterTable is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Override
+ public void testDropTableWithMissingMetadataFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingMetadataFile)
+ .hasMessageMatching("metadata location for register_table is not supported");
+ }
+
+ @Override
+ public void testDropTableWithMissingSnapshotFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingSnapshotFile)
+ .hasMessageMatching("metadata location for register_table is not supported");
+ }
+
+ @Override
+ public void testDropTableWithMissingManifestListFile()
+ {
+ assertThatThrownBy(super::testDropTableWithMissingManifestListFile)
+ .hasMessageContaining("metadata location for register_table is not supported");
+ }
+
+ @Override
+ public void testDropTableWithNonExistentTableLocation()
+ {
+ assertThatThrownBy(super::testDropTableWithNonExistentTableLocation)
+ .hasMessageMatching("Cannot drop corrupted table (.*)");
+ }
+
+ @Override
+ protected boolean isFileSorted(Location path, String sortColumnName)
+ {
+ return checkOrcFileSorting(HDFS_FILE_SYSTEM_FACTORY, path, sortColumnName);
+ }
+
+ @Override
+ protected void deleteDirectory(String location)
+ {
+ // used when unregistering a table, which is not supported by the Nessie catalog
+ }
+
+ @Override
+ protected String schemaPath()
+ {
+ return format("%s/%s", tempDir, getSession().getSchema().orElseThrow());
+ }
+
+ @Override
+ protected boolean locationExists(String location)
+ {
+ return Files.exists(Path.of(location));
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
new file mode 100644
index 000000000000..00a1119840f4
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/catalog/nessie/TestTrinoNessieCatalog.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.catalog.nessie;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
+import io.trino.plugin.base.CatalogName;
+import io.trino.plugin.hive.NodeVersion;
+import io.trino.plugin.iceberg.CommitTaskData;
+import io.trino.plugin.iceberg.IcebergMetadata;
+import io.trino.plugin.iceberg.TableStatisticsWriter;
+import io.trino.plugin.iceberg.catalog.BaseTrinoCatalogTest;
+import io.trino.plugin.iceberg.catalog.TrinoCatalog;
+import io.trino.plugin.iceberg.containers.NessieContainer;
+import io.trino.spi.connector.ConnectorMetadata;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.security.PrincipalType;
+import io.trino.spi.security.TrinoPrincipal;
+import io.trino.spi.type.TestingTypeManager;
+import org.apache.iceberg.nessie.NessieIcebergClient;
+import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.http.HttpClientBuilder;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Path;
+import java.util.Map;
+
+import static io.airlift.json.JsonCodec.jsonCodec;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
+import static io.trino.sql.planner.TestingPlannerContext.PLANNER_CONTEXT;
+import static io.trino.testing.TestingConnectorSession.SESSION;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.nio.file.Files.createTempDirectory;
+import static java.util.Locale.ENGLISH;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.fail;
+
+public class TestTrinoNessieCatalog
+ extends BaseTrinoCatalogTest
+{
+ private NessieContainer nessieContainer;
+
+ @BeforeClass
+ public void setupServer()
+ {
+ nessieContainer = NessieContainer.builder().build();
+ nessieContainer.start();
+ }
+
+ @AfterClass(alwaysRun = true)
+ public void teardownServer()
+ {
+ if (nessieContainer != null) {
+ nessieContainer.close();
+ }
+ }
+
+ @Override
+ protected TrinoCatalog createTrinoCatalog(boolean useUniqueTableLocations)
+ {
+ Path tmpDirectory = null;
+ try {
+ tmpDirectory = createTempDirectory("test_nessie_catalog_warehouse_dir_");
+ }
+ catch (IOException e) {
+ fail(e.getMessage());
+ }
+ TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS);
+ IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
+ .setServerUri(URI.create(nessieContainer.getRestApiUri()));
+ NessieApiV1 nessieApi = HttpClientBuilder.builder()
+ .withUri(nessieContainer.getRestApiUri())
+ .build(NessieApiV1.class);
+ NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
+ return new TrinoNessieCatalog(
+ new CatalogName("catalog_name"),
+ new TestingTypeManager(),
+ fileSystemFactory,
+ new IcebergNessieTableOperationsProvider(fileSystemFactory, nessieClient),
+ nessieClient,
+ tmpDirectory.toAbsolutePath().toString(),
+ useUniqueTableLocations);
+ }
+
+ @Test
+ public void testDefaultLocation()
+ throws IOException
+ {
+ Path tmpDirectory = createTempDirectory("test_nessie_catalog_default_location_");
+ tmpDirectory.toFile().deleteOnExit();
+ TrinoFileSystemFactory fileSystemFactory = new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS);
+ IcebergNessieCatalogConfig icebergNessieCatalogConfig = new IcebergNessieCatalogConfig()
+ .setDefaultWarehouseDir(tmpDirectory.toAbsolutePath().toString())
+ .setServerUri(URI.create(nessieContainer.getRestApiUri()));
+ NessieApiV1 nessieApi = HttpClientBuilder.builder()
+ .withUri(nessieContainer.getRestApiUri())
+ .build(NessieApiV1.class);
+ NessieIcebergClient nessieClient = new NessieIcebergClient(nessieApi, icebergNessieCatalogConfig.getDefaultReferenceName(), null, ImmutableMap.of());
+ TrinoCatalog catalogWithDefaultLocation = new TrinoNessieCatalog(
+ new CatalogName("catalog_name"),
+ new TestingTypeManager(),
+ fileSystemFactory,
+ new IcebergNessieTableOperationsProvider(fileSystemFactory, nessieClient),
+ nessieClient,
+ icebergNessieCatalogConfig.getDefaultWarehouseDir(),
+ false);
+
+ String namespace = "test_default_location_" + randomNameSuffix();
+ String table = "tableName";
+ SchemaTableName schemaTableName = new SchemaTableName(namespace, table);
+ catalogWithDefaultLocation.createNamespace(SESSION, namespace, ImmutableMap.of(),
+ new TrinoPrincipal(PrincipalType.USER, SESSION.getUser()));
+ try {
+ File expectedSchemaDirectory = new File(tmpDirectory.toFile(), namespace);
+ File expectedTableDirectory = new File(expectedSchemaDirectory, schemaTableName.getTableName());
+ assertThat(catalogWithDefaultLocation.defaultTableLocation(SESSION, schemaTableName))
+ .isEqualTo(expectedTableDirectory.toPath().toAbsolutePath().toString());
+ }
+ finally {
+ catalogWithDefaultLocation.dropNamespace(SESSION, namespace);
+ }
+ }
+
+ @Test
+ @Override
+ public void testView()
+ {
+ assertThatThrownBy(super::testView)
+ .hasMessageContaining("createView is not supported for Iceberg Nessie catalogs");
+ }
+
+ @Test
+ @Override
+ public void testNonLowercaseNamespace()
+ {
+ TrinoCatalog catalog = createTrinoCatalog(false);
+
+ String namespace = "testNonLowercaseNamespace" + randomNameSuffix();
+ String schema = namespace.toLowerCase(ENGLISH);
+
+ // Currently this is actually stored in lowercase by all Catalogs
+ catalog.createNamespace(SESSION, namespace, Map.of(), new TrinoPrincipal(PrincipalType.USER, SESSION.getUser()));
+ try {
+ assertThat(catalog.namespaceExists(SESSION, namespace)).as("catalog.namespaceExists(namespace)")
+ .isTrue();
+ assertThat(catalog.namespaceExists(SESSION, schema)).as("catalog.namespaceExists(schema)")
+ .isFalse();
+ assertThat(catalog.listNamespaces(SESSION)).as("catalog.listNamespaces")
+ // Catalog listNamespaces may be used as a default implementation for ConnectorMetadata.schemaExists
+ .doesNotContain(schema)
+ .contains(namespace);
+
+ // Test with IcebergMetadata, should the ConnectorMetadata implementation behavior depend on that class
+ ConnectorMetadata icebergMetadata = new IcebergMetadata(
+ PLANNER_CONTEXT.getTypeManager(),
+ jsonCodec(CommitTaskData.class),
+ catalog,
+ connectorIdentity -> {
+ throw new UnsupportedOperationException();
+ },
+ new TableStatisticsWriter(new NodeVersion("test-version")));
+ assertThat(icebergMetadata.schemaExists(SESSION, namespace)).as("icebergMetadata.schemaExists(namespace)")
+ .isTrue();
+ assertThat(icebergMetadata.schemaExists(SESSION, schema)).as("icebergMetadata.schemaExists(schema)")
+ .isFalse();
+ assertThat(icebergMetadata.listSchemaNames(SESSION)).as("icebergMetadata.listSchemaNames")
+ .doesNotContain(schema)
+ .contains(namespace);
+ }
+ finally {
+ catalog.dropNamespace(SESSION, namespace);
+ }
+ }
+}
diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/containers/NessieContainer.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/containers/NessieContainer.java
new file mode 100644
index 000000000000..95dcb11f061a
--- /dev/null
+++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/containers/NessieContainer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.iceberg.containers;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.airlift.log.Logger;
+import io.trino.testing.containers.BaseTestContainer;
+import org.testcontainers.containers.Network;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public class NessieContainer
+ extends BaseTestContainer
+{
+ private static final Logger log = Logger.get(NessieContainer.class);
+
+ public static final String DEFAULT_IMAGE = "projectnessie/nessie:0.51.1";
+ public static final String DEFAULT_HOST_NAME = "nessie";
+ public static final String VERSION_STORE_TYPE = "INMEMORY";
+
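+ // Nessie's default HTTP port is 19120; this container listens on 19121 instead, set via QUARKUS_HTTP_PORT in the builder below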
+ public static final int PORT = 19121;
+
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ private NessieContainer(String image, String hostName, Set<Integer> exposePorts, Map<String, String> filesToMount, Map<String, String> envVars, Optional<Network> network, int retryLimit)
+ {
+ super(image, hostName, exposePorts, filesToMount, envVars, network, retryLimit);
+ }
+
+ @Override
+ public void start()
+ {
+ super.start();
+ log.info("Nessie server container started with address for REST API: %s", getRestApiUri());
+ }
+
+ public String getRestApiUri()
+ {
+ return "http://" + getMappedHostAndPortForExposedPort(PORT) + "/api/v1";
+ }
+
+ public static class Builder
+ extends BaseTestContainer.Builder<NessieContainer.Builder, NessieContainer>
+ {
+ private Builder()
+ {
+ this.image = DEFAULT_IMAGE;
+ this.hostName = DEFAULT_HOST_NAME;
+ this.exposePorts = ImmutableSet.of(PORT);
+ this.envVars = ImmutableMap.of("QUARKUS_HTTP_PORT", String.valueOf(PORT), "NESSIE_VERSION_STORE_TYPE", VERSION_STORE_TYPE);
+ }
+
+ @Override
+ public NessieContainer build()
+ {
+ return new NessieContainer(image, hostName, exposePorts, filesToMount, envVars, network, startupRetryLimit);
+ }
+ }
+}
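
A minimal usage sketch for this container; builder(), start(), and getRestApiUri() come from the class above, while the surrounding test wiring is hypothetical:

    // Hypothetical test setup
    NessieContainer nessie = NessieContainer.builder().build();
    nessie.start();
    // Pass the mapped endpoint to iceberg.nessie-catalog.uri / IcebergNessieCatalogConfig
    String uri = nessie.getRestApiUri(); // e.g. http://localhost:<mapped-port>/api/v1
    // Stop the container in test teardown using the lifecycle inherited from BaseTestContainer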
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergNessie.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergNessie.java
new file mode 100644
index 000000000000..521612a57e58
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvSinglenodeSparkIcebergNessie.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.launcher.env.environment;
+
+import com.google.common.collect.ImmutableList;
+import io.trino.tests.product.launcher.docker.DockerFiles;
+import io.trino.tests.product.launcher.env.DockerContainer;
+import io.trino.tests.product.launcher.env.Environment;
+import io.trino.tests.product.launcher.env.EnvironmentConfig;
+import io.trino.tests.product.launcher.env.EnvironmentProvider;
+import io.trino.tests.product.launcher.env.common.Hadoop;
+import io.trino.tests.product.launcher.env.common.Standard;
+import io.trino.tests.product.launcher.env.common.TestsEnvironment;
+import io.trino.tests.product.launcher.testcontainers.PortBinder;
+import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy;
+
+import javax.inject.Inject;
+
+import static io.trino.tests.product.launcher.docker.ContainerUtil.forSelectedPorts;
+import static io.trino.tests.product.launcher.env.EnvironmentContainers.HADOOP;
+import static java.util.Objects.requireNonNull;
+import static org.testcontainers.utility.MountableFile.forHostPath;
+
+@TestsEnvironment
+public class EnvSinglenodeSparkIcebergNessie
+ extends EnvironmentProvider
+{
+ private static final int SPARK_THRIFT_PORT = 10213;
+ private static final int NESSIE_PORT = 19120;
+ private static final String NESSIE_VERSION = "0.51.1";
+ private static final String SPARK = "spark";
+
+ private final DockerFiles dockerFiles;
+ private final PortBinder portBinder;
+ private final String hadoopImagesVersion;
+
+ @Inject
+ public EnvSinglenodeSparkIcebergNessie(Standard standard, Hadoop hadoop, DockerFiles dockerFiles, EnvironmentConfig config, PortBinder portBinder)
+ {
+ super(ImmutableList.of(standard, hadoop));
+ this.dockerFiles = requireNonNull(dockerFiles, "dockerFiles is null");
+ this.portBinder = requireNonNull(portBinder, "portBinder is null");
+ this.hadoopImagesVersion = requireNonNull(config, "config is null").getHadoopImagesVersion();
+ }
+
+ @Override
+ public void extendEnvironment(Environment.Builder builder)
+ {
+ builder.addContainer(createNessieContainer());
+ builder.addConnector("iceberg", forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-nessie/iceberg.properties")));
+
+ builder.addContainer(createSparkContainer()).containerDependsOn(SPARK, HADOOP);
+ }
+
+ @SuppressWarnings("resource")
+ private DockerContainer createSparkContainer()
+ {
+ DockerContainer container = new DockerContainer("ghcr.io/trinodb/testing/spark3-iceberg:" + hadoopImagesVersion, SPARK)
+ .withEnv("HADOOP_USER_NAME", "hive")
+ .withCopyFileToContainer(
+ forHostPath(dockerFiles.getDockerFilesHostPath("conf/environment/singlenode-spark-iceberg-nessie/spark-defaults.conf")),
+ "/spark/conf/spark-defaults.conf")
+ .withCommand(
+ "spark-submit",
+ "--master", "local[*]",
+ "--class", "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
+ "--name", "Thrift JDBC/ODBC Server",
+ "--conf", "spark.hive.server2.thrift.port=" + SPARK_THRIFT_PORT,
+ "spark-internal")
+ .withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
+ .waitingFor(forSelectedPorts(SPARK_THRIFT_PORT));
+
+ portBinder.exposePort(container, SPARK_THRIFT_PORT);
+ return container;
+ }
+
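+ // The "nessie-server" host name and the default Nessie port 19120 match the endpoints configured in iceberg.properties and spark-defaults.conf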
+ private DockerContainer createNessieContainer()
+ {
+ DockerContainer container = new DockerContainer("projectnessie/nessie:" + NESSIE_VERSION, "nessie-server")
+ .withEnv("NESSIE_VERSION_STORE_TYPE", "INMEMORY")
+ .withEnv("QUARKUS_HTTP_PORT", Integer.valueOf(NESSIE_PORT).toString())
+ .withStartupCheckStrategy(new IsRunningStartupCheckStrategy())
+ .waitingFor(forSelectedPorts(NESSIE_PORT));
+
+ portBinder.exposePort(container, NESSIE_PORT);
+ return container;
+ }
+}
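
With the environment provider in place, the setup can be exercised locally through the product-test launcher; assuming the usual ptl workflow, a command along the lines of "bin/ptl test run --environment singlenode-spark-iceberg-nessie -- -g iceberg_nessie" starts the Nessie, Hadoop, and Spark containers and runs the matching test group.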
diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java
index ed1331a67660..0a84358dadc8 100644
--- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java
+++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteIceberg.java
@@ -19,6 +19,7 @@
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeHiveIcebergRedirections;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIceberg;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergJdbcCatalog;
+import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergNessie;
import io.trino.tests.product.launcher.env.environment.EnvSinglenodeSparkIcebergRest;
import io.trino.tests.product.launcher.suite.Suite;
import io.trino.tests.product.launcher.suite.SuiteTestRun;
@@ -49,6 +50,9 @@ public List<SuiteTestRun> getTestRuns(EnvironmentConfig config)
.build(),
testOnEnvironment(EnvSinglenodeSparkIcebergJdbcCatalog.class)
.withGroups("configured_features", "iceberg_jdbc")
+ .build(),
+ testOnEnvironment(EnvSinglenodeSparkIcebergNessie.class)
+ .withGroups("configured_features", "iceberg_nessie")
.build());
}
}
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/iceberg.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/iceberg.properties
new file mode 100644
index 000000000000..f087f9cec4a0
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/iceberg.properties
@@ -0,0 +1,4 @@
+connector.name=iceberg
+iceberg.catalog.type=nessie
+iceberg.nessie-catalog.uri=http://nessie-server:19120/api/v1
+iceberg.nessie-catalog.default-warehouse-dir=hdfs://hadoop-master:9000/user/hive/warehouse
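
Note that the default warehouse directory above matches spark.sql.catalog.iceberg_test.warehouse in the Spark configuration that follows, so Trino and Spark resolve tables to the same HDFS locations.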
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/spark-defaults.conf b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/spark-defaults.conf
new file mode 100644
index 000000000000..41e3e2b6bb2d
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/singlenode-spark-iceberg-nessie/spark-defaults.conf
@@ -0,0 +1,10 @@
+spark.sql.catalog.iceberg_test=org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.iceberg_test.catalog-impl=org.apache.iceberg.nessie.NessieCatalog
+spark.sql.catalog.iceberg_test.uri=http://nessie-server:19120/api/v1
+spark.sql.catalog.iceberg_test.authentication.type=NONE
+spark.sql.catalog.iceberg_test.warehouse=hdfs://hadoop-master:9000/user/hive/warehouse
+# Disabling caching allows us to run Spark queries interchangeably with Trino's
+spark.sql.catalog.iceberg_test.cache-enabled=false
+spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
+
+spark.hadoop.fs.defaultFS=hdfs://hadoop-master:9000
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
index 9ef635ebaf83..524576ce1773 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/TestGroups.java
@@ -74,6 +74,7 @@ public final class TestGroups
public static final String ICEBERG_FORMAT_VERSION_COMPATIBILITY = "iceberg_format_version_compatibility";
public static final String ICEBERG_REST = "iceberg_rest";
public static final String ICEBERG_JDBC = "iceberg_jdbc";
+ public static final String ICEBERG_NESSIE = "iceberg_nessie";
public static final String AVRO = "avro";
public static final String PHOENIX = "phoenix";
public static final String CLICKHOUSE = "clickhouse";
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
index 7e8040916b84..327eae922e97 100644
--- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
@@ -65,6 +65,7 @@
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.tests.product.TestGroups.ICEBERG;
import static io.trino.tests.product.TestGroups.ICEBERG_JDBC;
+import static io.trino.tests.product.TestGroups.ICEBERG_NESSIE;
import static io.trino.tests.product.TestGroups.ICEBERG_REST;
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
import static io.trino.tests.product.iceberg.TestIcebergSparkCompatibility.CreateMode.CREATE_TABLE_AND_INSERT;
@@ -124,7 +125,7 @@ public void setUp()
onTrino().executeQuery(format("CREATE SCHEMA IF NOT EXISTS %s.%s", TRINO_CATALOG, TEST_SCHEMA_NAME));
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reading_primitive_types_" + storageFormat);
@@ -221,7 +222,7 @@ public void testTrinoReadingSparkData(StorageFormat storageFormat, int specVersi
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "testSparkReadingTrinoDataDataProvider")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "testSparkReadingTrinoDataDataProvider")
public void testSparkReadingTrinoData(StorageFormat storageFormat, CreateMode createMode)
{
String baseTableName = toLowerCase("test_spark_reading_primitive_types_" + storageFormat + "_" + createMode);
@@ -349,7 +350,7 @@ public Object[][] testSparkReadingTrinoDataDataProvider()
.toArray(Object[][]::new);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormats")
public void testSparkReadTrinoUuid(StorageFormat storageFormat)
{
String tableName = toLowerCase("test_spark_read_trino_uuid_" + storageFormat);
@@ -371,7 +372,7 @@ public void testSparkReadTrinoUuid(StorageFormat storageFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "specVersions")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "specVersions")
public void testSparkCreatesTrinoDrops(int specVersion)
{
String baseTableName = "test_spark_creates_trino_drops";
@@ -379,7 +380,7 @@ public void testSparkCreatesTrinoDrops(int specVersion)
onTrino().executeQuery("DROP TABLE " + trinoTableName(baseTableName));
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC})
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE})
public void testTrinoCreatesSparkDrops()
{
String baseTableName = "test_trino_creates_spark_drops";
@@ -387,7 +388,7 @@ public void testTrinoCreatesSparkDrops()
onSpark().executeQuery("DROP TABLE " + sparkTableName(baseTableName));
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormats")
public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat)
{
String baseTableName = toLowerCase("test_spark_reads_trino_partitioned_table_" + storageFormat);
@@ -415,7 +416,7 @@ public void testSparkReadsTrinoPartitionedTable(StorageFormat storageFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadsSparkPartitionedTable(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reads_spark_partitioned_table_" + storageFormat);
@@ -536,7 +537,7 @@ public void testPartitionedByNestedField()
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reading_spark_composites_" + storageFormat);
@@ -564,7 +565,7 @@ public void testTrinoReadingCompositeSparkData(StorageFormat storageFormat, int
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormats")
public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat)
{
String baseTableName = toLowerCase("test_spark_reading_trino_composites_" + storageFormat);
@@ -591,7 +592,7 @@ public void testSparkReadingCompositeTrinoData(StorageFormat storageFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reading_spark_iceberg_table_properties_" + storageFormat);
@@ -619,7 +620,7 @@ public void testTrinoReadingSparkIcebergTablePropertiesData(StorageFormat storag
onSpark().executeQuery("DROP TABLE IF EXISTS " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_trino_reading_nested_spark_data_" + storageFormat);
@@ -676,7 +677,7 @@ public void testTrinoReadingNestedSparkData(StorageFormat storageFormat, int spe
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormats")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormats")
public void testSparkReadingNestedTrinoData(StorageFormat storageFormat)
{
String baseTableName = toLowerCase("test_spark_reading_nested_trino_data_" + storageFormat);
@@ -733,7 +734,7 @@ public void testSparkReadingNestedTrinoData(StorageFormat storageFormat)
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_schema_evolution_for_nested_fields_" + storageFormat);
@@ -851,7 +852,7 @@ public void testIdBasedFieldMapping(StorageFormat storageFormat, int specVersion
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testReadAfterPartitionEvolution(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_read_after_partition_evolution_" + storageFormat);
@@ -987,7 +988,7 @@ public void testCreateAndDropTableWithSameLocationFailsOnTrino(int specVersion)
onTrino().executeQuery(format("DROP TABLE %s", trinoTableName(tableSameLocation2)));
}
- @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_object_storage_location_provider_" + storageFormat);
@@ -1017,7 +1018,7 @@ public void testTrinoWritingDataWithObjectStorageLocationProvider(StorageFormat
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS}, dataProvider = "storageFormatsWithSpecVersion")
+ @Test(groups = {ICEBERG, ICEBERG_JDBC, PROFILE_SPECIFIC_TESTS, ICEBERG_NESSIE}, dataProvider = "storageFormatsWithSpecVersion")
public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageFormat, int specVersion)
{
String baseTableName = toLowerCase("test_writer_data_path_" + storageFormat);
@@ -1084,7 +1085,7 @@ public void testTrinoWritingDataWithWriterDataPathSet(StorageFormat storageForma
Streams.mapWithIndex(SPECIAL_CHARACTER_VALUES.stream(), ((value, index) -> row((int) index, value)))
.collect(toImmutableList());
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC})
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE})
public void testStringPartitioningWithSpecialCharactersCtasInTrino()
{
String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino";
@@ -1102,7 +1103,7 @@ public void testStringPartitioningWithSpecialCharactersCtasInTrino()
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC})
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE})
public void testStringPartitioningWithSpecialCharactersInsertInTrino()
{
String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_trino";
@@ -1118,7 +1119,7 @@ public void testStringPartitioningWithSpecialCharactersInsertInTrino()
onTrino().executeQuery("DROP TABLE " + trinoTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC})
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE})
public void testStringPartitioningWithSpecialCharactersInsertInSpark()
{
String baseTableName = "test_string_partitioning_with_special_chars_ctas_in_spark";
@@ -1373,7 +1374,7 @@ public void testTrinoSparkConcurrentInsert()
}
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsAndCompressionCodecs")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsAndCompressionCodecs")
public void testTrinoReadingSparkCompressedData(StorageFormat storageFormat, String compressionCodec)
{
String baseTableName = toLowerCase("test_spark_compression" +
@@ -1438,7 +1439,7 @@ else if ("ZSTD".equals(compressionCodec)) {
onSpark().executeQuery("DROP TABLE " + sparkTableName);
}
- @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC}, dataProvider = "storageFormatsAndCompressionCodecs")
+ @Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS, ICEBERG_REST, ICEBERG_JDBC, ICEBERG_NESSIE}, dataProvider = "storageFormatsAndCompressionCodecs")
public void testSparkReadingTrinoCompressedData(StorageFormat storageFormat, String compressionCodec)
{
String baseTableName = toLowerCase("test_trino_compression" +