Merged
56 changes: 54 additions & 2 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
@@ -76,8 +76,8 @@ The default is ``GZIP``.
``iceberg.catalog.type``
^^^^^^^^^^^^^^^^^^^^^^^^

The catalog type for Iceberg tables. The available values are ``hive``
and ``hadoop``, corresponding to the catalogs in the Iceberg.
The catalog type for Iceberg tables. The available values are ``hive``, ``hadoop``, and
``nessie``, corresponding to the catalogs in Iceberg.

The default is ``hive``.

@@ -114,6 +114,58 @@ The maximum number of partitions handled per writer.

The default is 100.

Nessie catalog
^^^^^^^^^^^^^^

To use a Nessie catalog, configure the catalog type with
``iceberg.catalog.type=nessie`` and provide further details with the following
properties:

==================================================== ============================================================
Property Name                                        Description
==================================================== ============================================================
``iceberg.nessie.ref``                               The branch/tag to use for Nessie, defaults to ``main``.

``iceberg.nessie.uri``                               Nessie API endpoint URI (required).
                                                     Example: ``https://localhost:19120/api/v1``

``iceberg.nessie.auth.type``                         The authentication type to use.
                                                     Available values are ``BASIC`` or ``BEARER``.
                                                     Example: ``BEARER``

``iceberg.nessie.auth.basic.username``               The username to use with ``BASIC`` authentication.
                                                     Example: ``test_user``

``iceberg.nessie.auth.basic.password``               The password to use with ``BASIC`` authentication.
                                                     Example: ``my$ecretPass``

``iceberg.nessie.auth.bearer.token``                 The token to use with ``BEARER`` authentication.
                                                     Example: ``SXVLUXUhIExFQ0tFUiEK``

``iceberg.nessie.read-timeout-ms``                   The read timeout in milliseconds for requests
                                                     to the Nessie server.
                                                     Example: ``5000``

``iceberg.nessie.connect-timeout-ms``                The connection timeout in milliseconds for connection
                                                     requests to the Nessie server.
                                                     Example: ``10000``

``iceberg.nessie.compression-enabled``               Whether compression should be enabled for requests
                                                     to the Nessie server, defaults to ``true``.

``iceberg.nessie.client-builder-impl``               The custom ``ClientBuilder`` implementation class to use.
==================================================== ============================================================

.. code-block:: none

    connector.name=iceberg
    iceberg.catalog.type=nessie
    iceberg.catalog.warehouse=/tmp
    iceberg.nessie.uri=https://localhost:19120/api/v1

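The session properties introduced in this change make it possible to pin reads to a
particular Nessie branch or commit at query time. A minimal usage sketch — the catalog
name ``iceberg`` and the hash value are placeholders, assuming the catalog file is named
``iceberg.properties``:

.. code-block:: sql

    -- Read from the "dev" branch for the rest of the session
    SET SESSION iceberg.nessie_reference_name = 'dev';
    -- Optionally pin reads to an exact Nessie commit hash
    SET SESSION iceberg.nessie_reference_hash = 'abc1234';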

Schema Evolution
------------------------

36 changes: 36 additions & 0 deletions presto-iceberg/pom.xml
@@ -13,6 +13,7 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<dep.iceberg.version>0.13.1</dep.iceberg.version>
<dep.nessie.version>0.30.0</dep.nessie.version>
</properties>

<dependencies>
@@ -321,6 +322,30 @@
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-nessie</artifactId>
<version>${dep.iceberg.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.projectnessie</groupId>
<artifactId>nessie-model</artifactId>
<version>${dep.nessie.version}</version>
</dependency>

<dependency>
<groupId>org.projectnessie</groupId>
<artifactId>nessie-client</artifactId>
<version>${dep.nessie.version}</version>
</dependency>

<dependency>
<groupId>org.weakref</groupId>
<artifactId>jmxutils</artifactId>
@@ -414,6 +439,17 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-testing-docker</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -15,11 +15,13 @@

import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.nessie.NessieCatalog;

public enum CatalogType
{
HADOOP(HadoopCatalog.class.getName()),
HIVE(HiveCatalog.class.getName()),
NESSIE(NessieCatalog.class.getName()),

/**/;

@@ -56,7 +56,8 @@ public ConnectorMetadata create()
{
switch (catalogType) {
case HADOOP:
return new IcebergHadoopMetadata(resourceFactory, typeManager, commitTaskCodec);
case NESSIE:
Member: Nit: to be more consistent, we could put all the cases in alphabetical order.

Contributor Author: That's what I had initially, but IntelliJ would then complain due to duplicate case branches. So I figured that it's better to merge HADOOP + NESSIE together.

Contributor Author: @beinan let me know if you would like me to still change this and introduce duplicate case branches.
return new IcebergNativeMetadata(resourceFactory, typeManager, commitTaskCodec, catalogType);
case HIVE:
return new IcebergHiveMetadata(metastore, hdfsEnvironment, typeManager, commitTaskCodec);
}
@@ -44,6 +44,7 @@
import com.facebook.presto.hive.metastore.HivePartitionMutator;
import com.facebook.presto.hive.metastore.MetastoreCacheStats;
import com.facebook.presto.hive.metastore.MetastoreConfig;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.iceberg.optimizer.IcebergPlanOptimizer;
import com.facebook.presto.orc.CachingStripeMetadataSource;
import com.facebook.presto.orc.DwrfAwareStripeMetadataSourceFactory;
@@ -132,6 +133,7 @@ public void configure(Binder binder)
configBinder(binder).bindConfig(HiveClientConfig.class);
configBinder(binder).bindConfig(IcebergConfig.class);
configBinder(binder).bindConfig(MetastoreConfig.class);
configBinder(binder).bindConfig(NessieConfig.class);

binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON);
binder.bind(IcebergTableProperties.class).in(Scopes.SINGLETON);
@@ -55,7 +55,7 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getPartitioning;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getHadoopIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getNativeIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
@@ -75,21 +75,24 @@
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;

public class IcebergHadoopMetadata
public class IcebergNativeMetadata
extends IcebergAbstractMetadata
{
private static final String INFORMATION_SCHEMA = "information_schema";
private static final String TABLE_COMMENT = "comment";

private final IcebergResourceFactory resourceFactory;
private final CatalogType catalogType;

public IcebergHadoopMetadata(
public IcebergNativeMetadata(
IcebergResourceFactory resourceFactory,
TypeManager typeManager,
JsonCodec<CommitTaskData> commitTaskCodec)
JsonCodec<CommitTaskData> commitTaskCodec,
CatalogType catalogType)
{
super(typeManager, commitTaskCodec);
this.resourceFactory = requireNonNull(resourceFactory, "resourceFactory is null");
this.catalogType = requireNonNull(catalogType, "catalogType is null");
}

@Override
@@ -169,7 +172,7 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = getHadoopIcebergTable(resourceFactory, session, table.getSchemaTableName());
Table icebergTable = getNativeIcebergTable(resourceFactory, session, table.getSchemaTableName());
return getColumns(icebergTable.schema(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, identity()));
}
@@ -208,7 +211,7 @@ public void dropSchema(ConnectorSession session, String schemaName)
@Override
public void renameSchema(ConnectorSession session, String source, String target)
{
throw new PrestoException(NOT_SUPPORTED, "Iceberg hadoop catalog does not support rename namespace");
throw new PrestoException(NOT_SUPPORTED, format("Iceberg %s catalog does not support rename namespace", catalogType.name()));
}

@Override
@@ -257,7 +260,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
public ConnectorInsertTableHandle beginInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle table = (IcebergTableHandle) tableHandle;
Table icebergTable = getHadoopIcebergTable(resourceFactory, session, table.getSchemaTableName());
Table icebergTable = getNativeIcebergTable(resourceFactory, session, table.getSchemaTableName());

return beginIcebergTableInsert(table, icebergTable);
}
@@ -308,7 +311,7 @@ protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, Sche
{
Table icebergTable;
try {
icebergTable = getHadoopIcebergTable(resourceFactory, session, table);
icebergTable = getNativeIcebergTable(resourceFactory, session, table);
}
catch (NoSuchTableException e) {
throw new TableNotFoundException(table);
@@ -323,7 +326,7 @@ protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, Sche
public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTableHandle tableHandle, Optional<ConnectorTableLayoutHandle> tableLayoutHandle, List<ColumnHandle> columnHandles, Constraint<ColumnHandle> constraint)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
Table icebergTable = getHadoopIcebergTable(resourceFactory, session, handle.getSchemaTableName());
Table icebergTable = getNativeIcebergTable(resourceFactory, session, handle.getSchemaTableName());
return TableStatisticsMaker.getTableStatistics(typeManager, constraint, handle, icebergTable);
}
}
@@ -13,6 +13,7 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.security.ConnectorIdentity;
@@ -32,6 +33,11 @@
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static com.facebook.presto.iceberg.CatalogType.NESSIE;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceHash;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getNessieReferenceName;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BASIC;
import static com.facebook.presto.iceberg.nessie.AuthenticationType.BEARER;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.base.Throwables.throwIfUnchecked;
@@ -49,15 +55,17 @@ public class IcebergResourceFactory
private final CatalogType catalogType;
private final String catalogWarehouse;
private final List<String> hadoopConfigResources;
private final NessieConfig nessieConfig;

@Inject
public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName)
public IcebergResourceFactory(IcebergConfig config, IcebergCatalogName catalogName, NessieConfig nessieConfig)
{
this.catalogName = requireNonNull(catalogName, "catalogName is null").getCatalogName();
requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.hadoopConfigResources = config.getHadoopConfigResources();
this.nessieConfig = requireNonNull(nessieConfig, "nessieConfig is null");
catalogCache = CacheBuilder.newBuilder()
.maximumSize(config.getCatalogCacheSize())
.build();
@@ -108,6 +116,12 @@ private String getCatalogCacheKey(ConnectorSession session)
});
}

if (catalogType == NESSIE) {
sb.append(getNessieReferenceName(session));
sb.append("@");
sb.append(getNessieReferenceHash(session));
}

return sb.toString();
}

@@ -134,6 +148,32 @@ public Map<String, String> getCatalogProperties(ConnectorSession session)
if (catalogWarehouse != null) {
properties.put(WAREHOUSE_LOCATION, catalogWarehouse);
}
if (catalogType == NESSIE) {
properties.put("ref", getNessieReferenceName(session));
properties.put("uri", nessieConfig.getServerUri().orElseThrow(() -> new IllegalStateException("iceberg.nessie.uri must be set for Nessie")));
String hash = getNessieReferenceHash(session);
if (hash != null) {
properties.put("ref.hash", hash);
}
nessieConfig.getReadTimeoutMillis().ifPresent(val -> properties.put("transport.read-timeout", val.toString()));
nessieConfig.getConnectTimeoutMillis().ifPresent(val -> properties.put("transport.connect-timeout", val.toString()));
nessieConfig.getClientBuilderImpl().ifPresent(val -> properties.put("client-builder-impl", val));
nessieConfig.getAuthenticationType().ifPresent(type -> {
if (type == BASIC) {
properties.put("authentication.username", nessieConfig.getUsername()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.username must be set with BASIC authentication")));
properties.put("authentication.password", nessieConfig.getPassword()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.basic.password must be set with BASIC authentication")));
}
else if (type == BEARER) {
properties.put("authentication.token", nessieConfig.getBearerToken()
.orElseThrow(() -> new IllegalStateException("iceberg.nessie.auth.bearer.token must be set with BEARER authentication")));
}
});
if (nessieConfig.isCompressionDisabled()) {
properties.put("transport.disable-compression", "true");
}
}
return properties;
}
}
@@ -18,6 +18,7 @@
import com.facebook.presto.hive.HiveCompressionCodec;
import com.facebook.presto.hive.OrcFileWriterConfig;
import com.facebook.presto.hive.ParquetFileWriterConfig;
import com.facebook.presto.iceberg.nessie.NessieConfig;
import com.facebook.presto.orc.OrcWriteValidation;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
@@ -71,6 +72,8 @@ public final class IcebergSessionProperties
private static final String ORC_COMPRESSION_CODEC = "orc_compression_codec";
private static final String CACHE_ENABLED = "cache_enabled";
private static final String NODE_SELECTION_STRATEGY = "node_selection_strategy";
private static final String NESSIE_REFERENCE_NAME = "nessie_reference_name";
private static final String NESSIE_REFERENCE_HASH = "nessie_reference_hash";
private final List<PropertyMetadata<?>> sessionProperties;

@Inject
@@ -79,7 +82,8 @@ public IcebergSessionProperties(
HiveClientConfig hiveClientConfig,
ParquetFileWriterConfig parquetFileWriterConfig,
OrcFileWriterConfig orcFileWriterConfig,
CacheConfig cacheConfig)
CacheConfig cacheConfig,
NessieConfig nessieConfig)
{
sessionProperties = ImmutableList.of(
new PropertyMetadata<>(
@@ -240,6 +244,16 @@ public IcebergSessionProperties(
CACHE_ENABLED,
"Enable cache for Iceberg",
cacheConfig.isCachingEnabled(),
false),
stringProperty(
NESSIE_REFERENCE_NAME,
"Nessie reference name to use",
nessieConfig.getDefaultReferenceName(),
false),
stringProperty(
NESSIE_REFERENCE_HASH,
"Nessie reference hash to use",
Collaborator: Why don't we have a config for NESSIE_REFERENCE_HASH?

Contributor Author: Because this is generally something a user would only specify at runtime when actually reading data at a particular hash. Having a config option for that would just be confusing to users, because they would have to know the hash upfront.

Collaborator: So does it mean different users would usually use different hashes and not share them, and thus we don't need to set a default value at the cluster level?
null,
false));
}

@@ -387,4 +401,14 @@ public static NodeSelectionStrategy getNodeSelectionStrategy(ConnectorSession se
{
return session.getProperty(NODE_SELECTION_STRATEGY, NodeSelectionStrategy.class);
}

public static String getNessieReferenceName(ConnectorSession session)
{
return session.getProperty(NESSIE_REFERENCE_NAME, String.class);
}

public static String getNessieReferenceHash(ConnectorSession session)
{
return session.getProperty(NESSIE_REFERENCE_HASH, String.class);
}
}