Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.TableScanNode;
import com.facebook.presto.spi.plan.ValuesNode;
import com.facebook.presto.sql.planner.assertions.BasePlanTest;
Expand All @@ -36,9 +37,9 @@
import java.nio.file.Paths;
import java.util.stream.Stream;

import static com.facebook.presto.spi.plan.JoinDistributionType.REPLICATED;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.sql.Optimizer.PlanStage.OPTIMIZED_AND_VALIDATED;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.REPLICATED;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.testing.TestngUtils.toDataProvider;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
Expand Down Expand Up @@ -157,7 +158,7 @@ public String result()
@Override
public Void visitJoin(JoinNode node, Integer indent)
{
JoinNode.DistributionType distributionType = node.getDistributionType()
JoinDistributionType distributionType = node.getDistributionType()
.orElseThrow(() -> new VerifyException("Expected distribution type to be set"));
if (node.isCrossJoin()) {
checkState(node.getType() == INNER && distributionType == REPLICATED, "Expected CROSS JOIN to be INNER REPLICATED");
Expand Down
47 changes: 47 additions & 0 deletions presto-docs/src/main/sphinx/connector/iceberg.rst
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,9 @@ Property Name Description

``iceberg.enable-merge-on-read-mode`` Enable reading base tables that use merge-on-read for ``true``
updates.

``iceberg.delete-as-join-rewrite-enabled`` When enabled, equality delete row filtering is applied ``true``
as a join with the data of the equality delete files.
================================================== ============================================================= ============

Table Properties
Expand Down Expand Up @@ -269,6 +272,18 @@ and a file system location of ``s3://test_bucket/test_schema/test_table``:
location = 's3://test_bucket/test_schema/test_table')
)

Session Properties
-------------------

Session properties set behavior changes for queries executed within the given session.

============================================= ======================================================================
Property Name Description
Comment thread
steveburnett marked this conversation as resolved.
Outdated
============================================= ======================================================================
``iceberg.delete_as_join_rewrite_enabled`` Overrides the behavior of the connector property
``iceberg.delete-as-join-rewrite-enabled`` in the current session.
============================================= ======================================================================

Caching Support
----------------

Expand Down Expand Up @@ -373,6 +388,38 @@ Metastore cache only caches schema and table names. Other metadata would be fetc
hive.metastore-refresh-interval=3d
hive.metastore-cache-maximum-size=10000000

Extra Hidden Metadata Columns
Comment thread
steveburnett marked this conversation as resolved.
Outdated
----------------------------

The Iceberg connector exposes extra hidden metadata columns. You can query these
as part of a SQL query by including them in your SELECT statement.

``$path`` column
^^^^^^^^^^^^^^^^
* ``$path``: Full file system path name of the file for this row
.. code-block:: sql

SELECT "$path", regionkey FROM "ctas_nation";

.. code-block:: text

$path | regionkey
---------------------------------+-----------
/full/path/to/file/file.parquet | 2

``$data_sequence_number`` column
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
* ``$data_sequence_number``: The Iceberg data sequence number in which this row was added
.. code-block:: sql

SELECT "$data_sequence_number", regionkey FROM "ctas_nation";

.. code-block:: text

$data_sequence_number | regionkey
----------------------------------+------------
2 | 3

Extra Hidden Metadata Tables
----------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.execution.SqlQueryManager;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.plan.AggregationNode;
import com.facebook.presto.spi.plan.JoinDistributionType;
import com.facebook.presto.spi.plan.ProjectNode;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.sql.planner.Plan;
Expand Down Expand Up @@ -131,7 +132,7 @@ public void testBroadcastJoin()
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.PARTITIONED))
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinDistributionType.PARTITIONED))
.findFirst()
.isPresent());

Expand All @@ -146,7 +147,7 @@ public void testBroadcastJoin()
"(SELECT * FROM test_orders where ds = '2020-09-02' and substr(CAST(custkey AS VARCHAR), 1, 3) = '370') t2 ON t1.orderkey = t2.orderkey", defaultSession());

assertTrue(PlanNodeSearcher.searchFrom(plan.getRoot())
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinNode.DistributionType.REPLICATED))
.where(node -> node instanceof JoinNode && ((JoinNode) node).getDistributionType().get().equals(JoinDistributionType.REPLICATED))
.findFirst()
.isPresent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.StorageFormat.fromHiveStorageFormat;
import static com.facebook.presto.parquet.ParquetTypeUtils.pushdownColumnNameForSubfield;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.sql.analyzer.TypeSignatureProvider.fromTypes;
import static com.facebook.presto.sql.planner.assertions.MatchResult.NO_MATCH;
import static com.facebook.presto.sql.planner.assertions.MatchResult.match;
Expand All @@ -133,7 +134,6 @@
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.GATHER;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.collect.ImmutableList.toImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import static com.facebook.presto.hive.TestHiveLogicalPlanner.replicateHiveMetastore;
import static com.facebook.presto.hive.TestHiveLogicalPlanner.utf8Slices;
import static com.facebook.presto.spi.plan.AggregationNode.Step.SINGLE;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.spi.plan.JoinType.LEFT;
import static com.facebook.presto.sql.analyzer.FeaturesConfig.JoinReorderingStrategy.ELIMINATE_CROSS_JOINS;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.aggregation;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
Expand All @@ -71,8 +73,6 @@
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.singleGroupingSet;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.unnest;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.values;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
import static com.facebook.presto.testing.TestingAccessControlManager.TestingPrivilegeType.INSERT_TABLE;
import static com.facebook.presto.testing.TestingAccessControlManager.TestingPrivilegeType.SELECT_COLUMN;
import static com.facebook.presto.testing.TestingAccessControlManager.privilege;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
package com.facebook.presto.hive;

import com.facebook.presto.Session;
import com.facebook.presto.spi.plan.JoinType;
import com.facebook.presto.sql.planner.assertions.PlanMatchPattern;
import com.facebook.presto.sql.planner.plan.JoinNode.Type;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.google.common.collect.ImmutableList;
Expand All @@ -29,15 +29,15 @@
import static com.facebook.presto.SystemSessionProperties.PREFER_MERGE_JOIN_FOR_SORTED_INPUTS;
import static com.facebook.presto.hive.HiveQueryRunner.HIVE_CATALOG;
import static com.facebook.presto.hive.HiveSessionProperties.ORDER_BASED_EXECUTION_ENABLED;
import static com.facebook.presto.spi.plan.JoinDistributionType.PARTITIONED;
import static com.facebook.presto.spi.plan.JoinType.FULL;
import static com.facebook.presto.spi.plan.JoinType.INNER;
import static com.facebook.presto.spi.plan.JoinType.LEFT;
import static com.facebook.presto.spi.plan.JoinType.RIGHT;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.anyTree;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.equiJoinClause;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.join;
import static com.facebook.presto.sql.planner.assertions.PlanMatchPattern.mergeJoin;
import static com.facebook.presto.sql.planner.plan.JoinNode.DistributionType.PARTITIONED;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.FULL;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.INNER;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.LEFT;
import static com.facebook.presto.sql.planner.plan.JoinNode.Type.RIGHT;
import static io.airlift.tpch.TpchTable.CUSTOMER;
import static io.airlift.tpch.TpchTable.LINE_ITEM;
import static io.airlift.tpch.TpchTable.NATION;
Expand Down Expand Up @@ -316,7 +316,7 @@ private Session mergeJoinEnabled()
.build();
}

private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List<String> leftJoinKeys, List<String> rightJoinKeys, Type joinType, boolean mergeJoinEnabled)
private PlanMatchPattern joinPlan(String leftTableName, String rightTableName, List<String> leftJoinKeys, List<String> rightJoinKeys, JoinType joinType, boolean mergeJoinEnabled)
{
int suffix1 = 0;
int suffix2 = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,11 @@ public static ColumnIdentity primitiveColumnIdentity(int id, String name)

public static ColumnIdentity createColumnIdentity(Types.NestedField column)
{
int id = column.fieldId();
String name = column.name();
org.apache.iceberg.types.Type fieldType = column.type();
return createColumnIdentity(column.name(), column.fieldId(), column.type());
}

public static ColumnIdentity createColumnIdentity(String name, int id, org.apache.iceberg.types.Type fieldType)
{
if (fieldType.equals(Types.TimestampType.withZone())) {
throw new UnsupportedOperationException(format("Iceberg column type %s is not supported", fieldType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,22 @@
import static com.facebook.presto.hive.MetadataUtils.getPredicate;
import static com.facebook.presto.hive.MetadataUtils.getSubfieldPredicate;
import static com.facebook.presto.iceberg.ExpressionConverter.toIcebergExpression;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_METADATA;
import static com.facebook.presto.iceberg.IcebergColumnHandle.primitiveIcebergColumnHandle;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.DATA_SEQUENCE_NUMBER;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.FILE_PATH;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isPushdownFilterEnabled;
import static com.facebook.presto.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergTableProperties.LOCATION_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.DATA;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getColumns;
import static com.facebook.presto.iceberg.IcebergUtil.getFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeyColumnHandles;
Expand All @@ -138,7 +145,6 @@
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;

public abstract class IcebergAbstractMetadata
implements ConnectorMetadata
Expand Down Expand Up @@ -334,11 +340,16 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
protected ConnectorTableMetadata getTableMetadata(ConnectorSession session, SchemaTableName table, IcebergTableName icebergTableName)
{
Table icebergTable = getIcebergTable(session, new SchemaTableName(table.getSchemaName(), icebergTableName.getTableName()));
List<ColumnMetadata> columns = getColumnMetadatas(icebergTable);
ImmutableList.Builder<ColumnMetadata> columns = ImmutableList.builder();
columns.addAll(getColumnMetadatas(icebergTable));
if (icebergTableName.getTableType() == CHANGELOG) {
return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns);
return ChangelogUtil.getChangelogTableMeta(table, typeManager, columns.build());
}
return new ConnectorTableMetadata(table, columns, createMetadataProperties(icebergTable), getTableComment(icebergTable));
else {
columns.add(PATH_COLUMN_METADATA);
columns.add(DATA_SEQUENCE_NUMBER_COLUMN_METADATA);
}
return new ConnectorTableMetadata(table, columns.build(), createMetadataProperties(icebergTable), getTableComment(icebergTable));
}

@Override
Expand Down Expand Up @@ -632,8 +643,16 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
else {
schema = icebergTable.schema();
}
return getColumns(schema, icebergTable.spec(), typeManager).stream()
.collect(toImmutableMap(IcebergColumnHandle::getName, identity()));

ImmutableMap.Builder<String, ColumnHandle> columnHandles = ImmutableMap.builder();
for (IcebergColumnHandle columnHandle : getColumns(schema, icebergTable.spec(), typeManager)) {
columnHandles.put(columnHandle.getName(), columnHandle);
}
if (table.getIcebergTableName().getTableType() != CHANGELOG) {
columnHandles.put(FILE_PATH.getColumnName(), PATH_COLUMN_HANDLE);
columnHandles.put(DATA_SEQUENCE_NUMBER.getColumnName(), DATA_SEQUENCE_NUMBER_COLUMN_HANDLE);
}
return columnHandles.build();
}

@Override
Expand All @@ -654,7 +673,7 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName, Optional<ConnectorTableVersion> tableVersion)
{
IcebergTableName name = IcebergTableName.from(tableName.getTableName());
verify(name.getTableType() == DATA || name.getTableType() == CHANGELOG, "Wrong table type: " + name.getTableType());
verify(name.getTableType() == DATA || name.getTableType() == CHANGELOG || name.getTableType() == EQUALITY_DELETES, "Wrong table type: " + name.getTableType());

if (!tableExists(session, tableName)) {
return null;
Expand All @@ -680,7 +699,9 @@ public IcebergTableHandle getTableHandle(ConnectorSession session, SchemaTableNa
new IcebergTableName(name.getTableName(), name.getTableType(), tableSnapshotId, name.getChangelogEndSnapshot()),
name.getSnapshotId().isPresent(),
TupleDomain.all(),
tableSchemaJson);
tableSchemaJson,
Optional.empty(),
Optional.empty());
}

@Override
Expand Down
Loading