Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ To use Iceberg, you need:

* Network access from the Trino coordinator and workers to the distributed
object storage.
* Access to a Hive metastore service (HMS) or AWS Glue.
* Access to a Hive metastore service (HMS), AWS Glue or a `Nessie server <https://projectnessie.org/>`_.
* Network access from the Trino coordinator to the HMS. Hive
metastore access with the Thrift protocol defaults to using port 9083.

Expand Down Expand Up @@ -133,9 +133,9 @@ contents of a Trino catalog file that uses the the Iceberg connector to
configures different Iceberg metadata catalogs.

The connector supports multiple Iceberg catalog types; you may use
either a Hive metastore service (HMS), AWS Glue, or a REST catalog. The catalog
either a Hive metastore service (HMS), AWS Glue, a REST catalog, or Nessie. The catalog
type is determined by the ``iceberg.catalog.type`` property. It can be set to
``HIVE_METASTORE``, ``GLUE``, ``JDBC``, or ``REST``.
``HIVE_METASTORE``, ``GLUE``, ``JDBC``, ``REST``, or ``NESSIE``.

.. _iceberg-hive-catalog:

Expand Down Expand Up @@ -225,6 +225,36 @@ properties:
REST catalog does not support :doc:`views</sql/create-view>` or
:doc:`materialized views</sql/create-materialized-view>`.

.. _iceberg-nessie-catalog:

Comment thread
nastra marked this conversation as resolved.
Nessie catalog
^^^^^^^^^^^^^^

In order to use a Nessie catalog, ensure to configure the catalog type with
``iceberg.catalog.type=nessie`` and provide further details with the following
properties:

==================================================== ============================================================
Property Name Description
==================================================== ============================================================
``iceberg.nessie-catalog.uri`` Nessie API endpoint URI (required).
Example: ``https://localhost:19120/api/v1``

``iceberg.nessie-catalog.ref`` The branch/tag to use for Nessie, defaults to ``main``.

``iceberg.nessie-catalog.default-warehouse-dir`` Default warehouse directory for schemas created without an
explicit ``location`` property.
Example: ``/tmp``
==================================================== ============================================================

.. code-block:: text

connector.name=iceberg
iceberg.catalog.type=nessie
iceberg.nessie-catalog.uri=https://localhost:19120/api/v1
iceberg.nessie-catalog.default-warehouse-dir=/tmp


.. _iceberg-jdbc-catalog:

JDBC catalog
Expand Down
21 changes: 20 additions & 1 deletion plugin/trino-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
TODO (https://github.com/trinodb/trino/issues/11294) remove when we upgrade to surefire with https://issues.apache.org/jira/browse/SUREFIRE-1967
-->
<air.test.parallel>instances</air.test.parallel>
<!-- Nessie version (matching to Iceberg release) must be bumped along with Iceberg version bump to avoid compatibility issues -->
<dep.nessie.version>0.51.1</dep.nessie.version>
Comment thread
nastra marked this conversation as resolved.
</properties>

<dependencies>
Expand Down Expand Up @@ -216,6 +218,12 @@
<artifactId>iceberg-core</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-nessie</artifactId>
<version>${dep.iceberg.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-orc</artifactId>
Expand All @@ -231,6 +239,18 @@
<artifactId>jdbi3-core</artifactId>
</dependency>

<dependency>
<groupId>org.projectnessie.nessie</groupId>
<artifactId>nessie-client</artifactId>
<version>${dep.nessie.version}</version>
</dependency>

<dependency>
<groupId>org.projectnessie.nessie</groupId>
<artifactId>nessie-model</artifactId>
<version>${dep.nessie.version}</version>
</dependency>

<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
Expand Down Expand Up @@ -523,7 +543,6 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ public enum CatalogType
GLUE,
REST,
JDBC,
NESSIE,
/**/;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.jdbc.IcebergJdbcCatalogModule;
import io.trino.plugin.iceberg.catalog.nessie.IcebergNessieCatalogModule;
import io.trino.plugin.iceberg.catalog.rest.IcebergRestCatalogModule;

import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.trino.plugin.iceberg.CatalogType.GLUE;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.CatalogType.JDBC;
import static io.trino.plugin.iceberg.CatalogType.NESSIE;
import static io.trino.plugin.iceberg.CatalogType.REST;
import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE;

Expand All @@ -42,6 +44,7 @@ protected void setup(Binder binder)
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
bindCatalogModule(REST, new IcebergRestCatalogModule());
bindCatalogModule(JDBC, new IcebergJdbcCatalogModule());
bindCatalogModule(NESSIE, new IcebergNessieCatalogModule());
}

private void bindCatalogModule(CatalogType catalogType, Module module)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.nessie;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;

import java.net.URI;

public class IcebergNessieCatalogConfig
{
private String defaultReferenceName = "main";
private String defaultWarehouseDir;
private URI serverUri;

@NotNull
public String getDefaultReferenceName()
{
return defaultReferenceName;
}

@Config("iceberg.nessie-catalog.ref")
@ConfigDescription("The default Nessie reference to work on")
public IcebergNessieCatalogConfig setDefaultReferenceName(String defaultReferenceName)
{
this.defaultReferenceName = defaultReferenceName;
return this;
}

@NotNull
public URI getServerUri()
{
return serverUri;
}

@Config("iceberg.nessie-catalog.uri")
@ConfigDescription("The URI to connect to the Nessie server")
public IcebergNessieCatalogConfig setServerUri(URI serverUri)
{
this.serverUri = serverUri;
return this;
}

@NotEmpty
public String getDefaultWarehouseDir()
{
return defaultWarehouseDir;
}

@Config("iceberg.nessie-catalog.default-warehouse-dir")
@ConfigDescription("The default warehouse to use for Nessie")
public IcebergNessieCatalogConfig setDefaultWarehouseDir(String defaultWarehouseDir)
{
this.defaultWarehouseDir = defaultWarehouseDir;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.nessie;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.http.HttpClientBuilder;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

public class IcebergNessieCatalogModule
extends AbstractConfigurationAwareModule
{
@Override
protected void setup(Binder binder)
{
configBinder(binder).bindConfig(IcebergNessieCatalogConfig.class);
binder.bind(IcebergTableOperationsProvider.class).to(IcebergNessieTableOperationsProvider.class).in(Scopes.SINGLETON);
newExporter(binder).export(IcebergTableOperationsProvider.class).withGeneratedName();
binder.bind(TrinoCatalogFactory.class).to(TrinoNessieCatalogFactory.class).in(Scopes.SINGLETON);
newExporter(binder).export(TrinoCatalogFactory.class).withGeneratedName();
}

@Provides
@Singleton
public static NessieIcebergClient createNessieIcebergClient(IcebergNessieCatalogConfig icebergNessieCatalogConfig)
{
return new NessieIcebergClient(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it intentional that the client is built in this initial phase without authentication?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

currently this is intentional. I didn't want to complicate this PR and add all the different nessie configuration settings (incl authentication). I'm planning to add this as a follow-up

HttpClientBuilder.builder()
.withUri(icebergNessieCatalogConfig.getServerUri())
.build(NessieApiV1.class),
icebergNessieCatalogConfig.getDefaultReferenceName(),
null,
ImmutableMap.of());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.nessie;

import io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.nessie.NessieIcebergClient;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.Namespace;

import java.util.Optional;

import static com.google.common.base.Verify.verify;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_CATALOG_ERROR;
import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_COMMIT_ERROR;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class IcebergNessieTableOperations
extends AbstractIcebergTableOperations
{
private final NessieIcebergClient nessieClient;
private IcebergTable table;

protected IcebergNessieTableOperations(
NessieIcebergClient nessieClient,
FileIO fileIo,
ConnectorSession session,
String database,
String table,
Optional<String> owner,
Optional<String> location)
{
super(fileIo, session, database, table, owner, location);
this.nessieClient = requireNonNull(nessieClient, "nessieClient is null");
}

@Override
public TableMetadata refresh()
{
refreshNessieClient();
return super.refresh();
}

private void refreshNessieClient()
{
try {
nessieClient.refresh();
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to refresh as ref '%s' is no longer valid.", nessieClient.refName()), e);
}
}

@Override
public TableMetadata refresh(boolean invalidateCaches)
{
refreshNessieClient();
return super.refresh(invalidateCaches);
}

@Override
protected String getRefreshedLocation(boolean invalidateCaches)
{
table = nessieClient.table(toIdentifier(new SchemaTableName(database, tableName)));

if (table == null) {
throw new TableNotFoundException(getSchemaTableName());
}

return table.getMetadataLocation();
}

@Override
protected void commitNewTable(TableMetadata metadata)
{
verify(version.isEmpty(), "commitNewTable called on a table which already exists");
try {
nessieClient.commitTable(null, metadata, writeNewMetadata(metadata, 0), table, toKey(new SchemaTableName(database, this.tableName)));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
}
catch (NessieConflictException e) {
// CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
}
shouldRefresh = true;
}

@Override
protected void commitToExistingTable(TableMetadata base, TableMetadata metadata)
{
verify(version.orElseThrow() >= 0, "commitToExistingTable called on a new table");
try {
nessieClient.commitTable(base, metadata, writeNewMetadata(metadata, version.getAsInt() + 1), table, toKey(new SchemaTableName(database, this.tableName)));
}
catch (NessieNotFoundException e) {
throw new TrinoException(ICEBERG_COMMIT_ERROR, format("Cannot commit: ref '%s' no longer exists", nessieClient.refName()), e);
}
catch (NessieConflictException e) {
// CommitFailedException is handled as a special case in the Iceberg library. This commit will automatically retry
throw new CommitFailedException(e, "Cannot commit: ref hash is out of date. Update the ref '%s' and try again", nessieClient.refName());
}
shouldRefresh = true;
}

private static ContentKey toKey(SchemaTableName tableName)
{
return ContentKey.of(Namespace.parse(tableName.getSchemaName()), tableName.getTableName());
}
}
Loading