Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,52 @@ Property Name Description
======================================================= ============================================================= ============
``iceberg.catalog.warehouse`` The catalog warehouse root path for Iceberg tables.

The Hadoop catalog requires a file system that supports
an atomic rename operation, such as HDFS, to maintain
metadata files in order to implement an atomic transaction
commit.

Example: ``hdfs://nn:8020/warehouse/path``

Do not set ``iceberg.catalog.warehouse`` to a path in object
stores or local file systems in the production environment.

This property is required if the ``iceberg.catalog.type`` is
``hadoop``. Otherwise, it will be ignored.

``iceberg.catalog.hadoop.warehouse.datadir`` The catalog warehouse root data path for Iceberg tables.
It is only supported with the Hadoop catalog.

Example: ``s3://iceberg_bucket/warehouse``.

This optional property can be set to a path in object
stores or HDFS.
If set, all tables in this Hadoop catalog default to saving
their data and delete files in the specified root
data directory.

``iceberg.catalog.cached-catalog-num`` The number of Iceberg catalogs to cache. This property is ``10``
required if the ``iceberg.catalog.type`` is ``hadoop``.
Otherwise, it will be ignored.
======================================================= ============================================================= ============

Configure the `Amazon S3 <https://prestodb.io/docs/current/connector/hive.html#amazon-s3-configuration>`_
properties to specify an S3 location as the warehouse data directory for the Hadoop catalog. This way,
the data and delete files of Iceberg tables are stored in S3. An example configuration includes:

.. code-block:: none

connector.name=iceberg
iceberg.catalog.type=hadoop
iceberg.catalog.warehouse=hdfs://nn:8020/warehouse/path
iceberg.catalog.hadoop.warehouse.datadir=s3://iceberg_bucket/warehouse

hive.s3.use-instance-credentials=false
hive.s3.aws-access-key=accesskey
hive.s3.aws-secret-key=secretkey
hive.s3.endpoint=http://192.168.0.103:9878
hive.s3.path-style-access=true

Configuration Properties
------------------------

Expand Down Expand Up @@ -370,6 +407,12 @@ Property Name Description
``location`` Optionally specifies the file system location URI for
the table.

``write.data.path`` Optionally specifies the file system location URI for
storing the data and delete files of the table. This only
applies to files written after this property is set. Files
previously written aren't relocated to reflect this
parameter.

``format_version`` Optionally specifies the format version of the Iceberg ``2``
specification to use for new tables, either ``1`` or ``2``.

Expand Down
22 changes: 22 additions & 0 deletions presto-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,16 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down Expand Up @@ -598,6 +608,18 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import static com.facebook.presto.hive.MetadataUtils.getDiscretePredicates;
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
import static com.facebook.presto.hive.metastore.MetastoreUtil.TABLE_COMMENT;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
Expand All @@ -146,21 +147,30 @@
import static com.facebook.presto.iceberg.IcebergTableProperties.METRICS_MAX_INFERRED_COLUMN;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.getCommitRetries;
import static com.facebook.presto.iceberg.IcebergTableProperties.getFormatVersion;
import static com.facebook.presto.iceberg.IcebergTableProperties.getWriteDataLocation;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.MIN_FORMAT_VERSION_FOR_DELETE;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getDeleteMode;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataPreviousVersionsMax;
import static com.facebook.presto.iceberg.IcebergUtil.getMetricsMaxInferredColumn;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionFields;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionSpecsIncludingValidData;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitions;
import static com.facebook.presto.iceberg.IcebergUtil.getSnapshotIdTimeOperator;
import static com.facebook.presto.iceberg.IcebergUtil.getSortFields;
import static com.facebook.presto.iceberg.IcebergUtil.getSplitSize;
import static com.facebook.presto.iceberg.IcebergUtil.getTableComment;
import static com.facebook.presto.iceberg.IcebergUtil.getUpdateMode;
import static com.facebook.presto.iceberg.IcebergUtil.getViewComment;
import static com.facebook.presto.iceberg.IcebergUtil.isMetadataDeleteAfterCommit;
import static com.facebook.presto.iceberg.IcebergUtil.parseFormatVersion;
import static com.facebook.presto.iceberg.IcebergUtil.resolveSnapshotIdByName;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetLocation;
Expand Down Expand Up @@ -188,6 +198,7 @@
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardWarningCode.SORT_COLUMN_TRANSFORM_NOT_SUPPORTED_WARNING;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -200,10 +211,17 @@
import static org.apache.iceberg.SnapshotSummary.DELETED_RECORDS_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_EQ_DELETES_PROP;
import static org.apache.iceberg.SnapshotSummary.REMOVED_POS_DELETES_PROP;
import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL_DEFAULT;
import static org.apache.iceberg.TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED;
import static org.apache.iceberg.TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS;
import static org.apache.iceberg.TableProperties.ORC_COMPRESSION;
import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -716,12 +734,17 @@ protected ImmutableMap<String, Object> createMetadataProperties(Table icebergTab
properties.put(LOCATION_PROPERTY, icebergTable.location());
}

properties.put(DELETE_MODE, IcebergUtil.getDeleteMode(icebergTable));
properties.put(UPDATE_MODE, IcebergUtil.getUpdateMode(icebergTable));
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, IcebergUtil.getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, IcebergUtil.isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, IcebergUtil.getMetricsMaxInferredColumn(icebergTable));
properties.put(SPLIT_SIZE, IcebergUtil.getSplitSize(icebergTable));
String writeDataLocation = icebergTable.properties().get(WRITE_DATA_LOCATION);
if (!isNullOrEmpty(writeDataLocation)) {
properties.put(WRITE_DATA_LOCATION, writeDataLocation);
}

properties.put(DELETE_MODE, getDeleteMode(icebergTable));
properties.put(UPDATE_MODE, getUpdateMode(icebergTable));
properties.put(METADATA_PREVIOUS_VERSIONS_MAX, getMetadataPreviousVersionsMax(icebergTable));
properties.put(METADATA_DELETE_AFTER_COMMIT, isMetadataDeleteAfterCommit(icebergTable));
properties.put(METRICS_MAX_INFERRED_COLUMN, getMetricsMaxInferredColumn(icebergTable));
properties.put(SPLIT_SIZE, getSplitSize(icebergTable));

SortOrder sortOrder = icebergTable.sortOrder();
// TODO: Support sort column transforms (https://github.com/prestodb/presto/issues/24250)
Expand Down Expand Up @@ -1140,6 +1163,62 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
transaction.commitTransaction();
}

/**
 * Builds the Iceberg table-property map used when creating a new table.
 * <p>
 * An explicit {@code write.data.path} table property takes precedence; otherwise the
 * catalog-level warehouse data directory (Hadoop catalog only) is consulted via
 * {@link #getDataLocationBasedOnWarehouseDataDir(SchemaTableName)}. Row-level delete and
 * update modes are forced to copy-on-write for tables below the minimum format version
 * that supports delete files.
 *
 * @param tableMetadata connector-level metadata (properties, comment, table name) for the new table
 * @param fileFormat file format the table will be written in
 * @param session session used to resolve the compression codec
 * @return immutable map of Iceberg table properties
 */
protected Map<String, String> populateTableProperties(ConnectorTableMetadata tableMetadata, com.facebook.presto.iceberg.FileFormat fileFormat, ConnectorSession session)
{
    Map<String, Object> metadataProperties = tableMetadata.getProperties();
    ImmutableMap.Builder<String, String> builder = ImmutableMap.builderWithExpectedSize(16);

    // Prefer the explicitly supplied write.data.path; fall back to the catalog-wide default.
    String explicitDataPath = getWriteDataLocation(metadataProperties);
    if (!isNullOrEmpty(explicitDataPath)) {
        builder.put(WRITE_DATA_LOCATION, explicitDataPath);
    }
    else {
        getDataLocationBasedOnWarehouseDataDir(tableMetadata.getTable())
                .ifPresent(location -> builder.put(WRITE_DATA_LOCATION, location));
    }

    Integer commitRetries = getCommitRetries(metadataProperties);
    builder.put(DEFAULT_FILE_FORMAT, fileFormat.toString());
    builder.put(COMMIT_NUM_RETRIES, String.valueOf(commitRetries));
    // Only PARQUET and ORC carry a per-format compression property; other formats add none.
    switch (fileFormat) {
        case PARQUET:
            builder.put(PARQUET_COMPRESSION, getCompressionCodec(session).getParquetCompressionCodec().get().toString());
            break;
        case ORC:
            builder.put(ORC_COMPRESSION, getCompressionCodec(session).getOrcCompressionKind().name());
            break;
    }
    tableMetadata.getComment().ifPresent(comment -> builder.put(TABLE_COMMENT, comment));

    String formatVersion = getFormatVersion(metadataProperties);
    verify(formatVersion != null, "Format version cannot be null");
    builder.put(TableProperties.FORMAT_VERSION, formatVersion);

    if (parseFormatVersion(formatVersion) < MIN_FORMAT_VERSION_FOR_DELETE) {
        // Delete files are unsupported below the minimum format version: force copy-on-write.
        builder.put(TableProperties.DELETE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
        builder.put(TableProperties.UPDATE_MODE, RowLevelOperationMode.COPY_ON_WRITE.modeName());
    }
    else {
        // Qualified calls: IcebergUtil's getDeleteMode/getUpdateMode are statically imported already.
        RowLevelOperationMode deleteMode = IcebergTableProperties.getDeleteMode(metadataProperties);
        RowLevelOperationMode updateMode = IcebergTableProperties.getUpdateMode(metadataProperties);
        builder.put(TableProperties.DELETE_MODE, deleteMode.modeName());
        builder.put(TableProperties.UPDATE_MODE, updateMode.modeName());
    }

    builder.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
            String.valueOf(IcebergTableProperties.getMetadataPreviousVersionsMax(metadataProperties)));
    builder.put(METADATA_DELETE_AFTER_COMMIT_ENABLED,
            String.valueOf(IcebergTableProperties.isMetadataDeleteAfterCommit(metadataProperties)));
    builder.put(METRICS_MAX_INFERRED_COLUMN_DEFAULTS,
            String.valueOf(IcebergTableProperties.getMetricsMaxInferredColumn(metadataProperties)));
    builder.put(SPLIT_SIZE, String.valueOf(IcebergTableProperties.getTargetSplitSize(metadataProperties)));

    return builder.build();
}

/**
* Deletes all the files for a specific predicate
*
Expand Down Expand Up @@ -1265,4 +1344,9 @@ public void finishUpdate(ConnectorSession session, ConnectorTableHandle tableHan
handle.getSortOrder());
finishWrite(session, outputTableHandle, fragments, UPDATE_AFTER);
}

/**
 * Hook for catalog implementations that derive a default data location from a
 * catalog-wide warehouse data directory.
 *
 * @param schemaTableName name of the table being created (unused in this base implementation)
 * @return the derived data location, or {@link Optional#empty()} when the catalog
 *         provides no default (the base behavior)
 */
protected Optional<String> getDataLocationBasedOnWarehouseDataDir(SchemaTableName schemaTableName)
{
    // Base implementation: no catalog-level warehouse data directory support.
    return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static com.facebook.presto.common.Utils.checkArgument;
import static com.facebook.presto.iceberg.CatalogType.HADOOP;
import static com.facebook.presto.orc.StripeMetadataSource.CacheableRowGroupIndices;
import static com.facebook.presto.orc.StripeMetadataSource.CacheableSlice;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
Expand Down Expand Up @@ -142,6 +144,9 @@ protected void setup(Binder binder)

configBinder(binder).bindConfig(IcebergConfig.class);

IcebergConfig icebergConfig = buildConfigObject(IcebergConfig.class);
checkArgument(icebergConfig.getCatalogType().equals(HADOOP) || icebergConfig.getCatalogWarehouseDataDir() == null, "'iceberg.catalog.hadoop.warehouse.datadir' can only be specified in Hadoop catalog");

binder.bind(IcebergSessionProperties.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, IcebergNessieConfig.class); // bind optional Nessie config to IcebergSessionProperties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class IcebergConfig
private HiveCompressionCodec compressionCodec = GZIP;
private CatalogType catalogType = HIVE;
private String catalogWarehouse;
private String catalogWarehouseDataDir;
private int catalogCacheSize = 10;
private int maxPartitionsPerWriter = 100;
private List<String> hadoopConfigResources = ImmutableList.of();
Expand Down Expand Up @@ -127,6 +128,19 @@ public IcebergConfig setCatalogWarehouse(String catalogWarehouse)
return this;
}

/**
 * Returns the configured catalog warehouse root data directory, or {@code null}
 * when {@code iceberg.catalog.hadoop.warehouse.datadir} was not set.
 */
public String getCatalogWarehouseDataDir()
{
    return this.catalogWarehouseDataDir;
}

/**
 * Sets the catalog warehouse root data directory.
 *
 * @param catalogWarehouseDataDir root path under which new tables write their data files
 * @return this config instance, for fluent chaining
 */
@Config("iceberg.catalog.hadoop.warehouse.datadir")
@ConfigDescription("Iceberg catalog default root data writing directory. This is only supported with Hadoop catalog.")
public IcebergConfig setCatalogWarehouseDataDir(String catalogWarehouseDataDir)
{
    this.catalogWarehouseDataDir = catalogWarehouseDataDir;
    return this;
}

@Min(1)
public int getCatalogCacheSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getHiveIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.isIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.populateTableProperties;
import static com.facebook.presto.iceberg.IcebergUtil.toHiveColumns;
import static com.facebook.presto.iceberg.IcebergUtil.tryGetProperties;
import static com.facebook.presto.iceberg.PartitionFields.parsePartitionFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class IcebergNativeCatalogFactory
private final String catalogName;
protected final CatalogType catalogType;
private final String catalogWarehouse;
private final String catalogWarehouseDataDir;
protected final IcebergConfig icebergConfig;

private final List<String> hadoopConfigResources;
Expand All @@ -69,6 +70,7 @@ public IcebergNativeCatalogFactory(
this.icebergConfig = requireNonNull(config, "config is null");
this.catalogType = config.getCatalogType();
this.catalogWarehouse = config.getCatalogWarehouse();
this.catalogWarehouseDataDir = config.getCatalogWarehouseDataDir();
this.hadoopConfigResources = icebergConfig.getHadoopConfigResources();
this.s3ConfigurationUpdater = requireNonNull(s3ConfigurationUpdater, "s3ConfigurationUpdater is null");
this.gcsConfigurationInitialize = requireNonNull(gcsConfigurationInitialize, "gcsConfigurationInitialize is null");
Expand All @@ -90,6 +92,11 @@ public Catalog getCatalog(ConnectorSession session)
}
}

/**
 * Returns the catalog warehouse root data directory captured from
 * {@code IcebergConfig} at construction time; may be {@code null} when unset.
 */
public String getCatalogWarehouseDataDir()
{
    return catalogWarehouseDataDir;
}

public SupportsNamespaces getNamespaces(ConnectorSession session)
{
Catalog catalog = getCatalog(session);
Expand Down
Loading
Loading