diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java index 0a2bc13de0d99..e8baf9ea5f5d1 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitManager.java @@ -45,6 +45,8 @@ import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG; import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES; import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable; +import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints; +import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints; import static java.util.Objects.requireNonNull; public class IcebergSplitManager @@ -81,7 +83,8 @@ public ConnectorSplitSource getSplits( return new FixedSplitSource(ImmutableList.of()); } - TupleDomain predicate = layoutHandle.getValidPredicate(); + TupleDomain predicate = getNonMetadataColumnConstraints(layoutHandle + .getValidPredicate()); Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName()); if (table.getIcebergTableName().getTableType() == CHANGELOG) { @@ -114,7 +117,8 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) { session, tableScan, TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()), - getMinimumAssignedSplitWeight(session)); + getMinimumAssignedSplitWeight(session), + getMetadataColumnConstraints(layoutHandle.getValidPredicate())); return splitSource; } } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java index 2f260950d59e7..d6e97230ae6fd 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergSplitSource.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.iceberg; +import com.facebook.presto.common.predicate.TupleDomain; import com.facebook.presto.iceberg.delete.DeleteFile; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; @@ -40,6 +41,7 @@ import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat; import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber; import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys; +import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates; import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Iterators.limit; @@ -56,16 +58,20 @@ public class IcebergSplitSource private final double minimumAssignedSplitWeight; private final ConnectorSession session; + private final TupleDomain metadataColumnConstraints; + public IcebergSplitSource( ConnectorSession session, TableScan tableScan, CloseableIterable fileScanTaskIterable, - double minimumAssignedSplitWeight) + double minimumAssignedSplitWeight, + TupleDomain metadataColumnConstraints) { this.session = requireNonNull(session, "session is null"); this.tableScan = requireNonNull(tableScan, "tableScan is null"); this.fileScanTaskIterator = fileScanTaskIterable.iterator(); this.minimumAssignedSplitWeight = minimumAssignedSplitWeight; + this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null"); closer.register(fileScanTaskIterator); } @@ -77,7 +83,10 @@ public CompletableFuture getNextBatch(ConnectorPartitionHan Iterator iterator = limit(fileScanTaskIterator, maxSize); while (iterator.hasNext()) { FileScanTask task = iterator.next(); - splits.add(toIcebergSplit(task)); + IcebergSplit icebergSplit = (IcebergSplit) toIcebergSplit(task); + if (metadataColumnsMatchPredicates(metadataColumnConstraints, icebergSplit.getPath(), icebergSplit.getDataSequenceNumber())) { + splits.add(icebergSplit); + } } return completedFuture(new ConnectorSplitBatch(splits, isFinished())); } diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java index b079c89f14f33..03617b265a5d0 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergUtil.java @@ -127,10 +127,13 @@ import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES; import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent; import static com.facebook.presto.iceberg.FileFormat.PARQUET; +import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE; +import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID; import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP; +import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId; import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY; import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec; import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled; @@ -515,6 +518,38 @@ private static NullableValue parsePartitionValue( return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue); } + // Strip the constraints on metadata columns like "$path", "$data_sequence_number" from the list. + public static TupleDomain getNonMetadataColumnConstraints(TupleDomain allConstraints) + { + return allConstraints.transform(c -> isMetadataColumnId(((IcebergColumnHandle) c).getId()) ? null : (IcebergColumnHandle) c); + } + + public static TupleDomain getMetadataColumnConstraints(TupleDomain allConstraints) + { + return allConstraints.transform(c -> isMetadataColumnId(((IcebergColumnHandle) c).getId()) ? (IcebergColumnHandle) c : null); + } + + public static boolean metadataColumnsMatchPredicates(TupleDomain constraints, String path, long dataSequenceNumber) + { + if (constraints.isAll()) { + return true; + } + + boolean matches = true; + if (constraints.getDomains().isPresent()) { + for (Map.Entry constraint : constraints.getDomains().get().entrySet()) { + if (constraint.getKey() == PATH_COLUMN_HANDLE) { + matches &= constraint.getValue().includesNullableValue(utf8Slice(path)); + } + else if (constraint.getKey() == DATA_SEQUENCE_NUMBER_COLUMN_HANDLE) { + matches &= constraint.getValue().includesNullableValue(dataSequenceNumber); + } + } + } + + return matches; + } + public static List getPartitions( TypeManager typeManager, ConnectorTableHandle tableHandle, @@ -531,7 +566,9 @@ public static List getPartitions( } TableScan tableScan = icebergTable.newScan() - .filter(toIcebergExpression(constraint.getSummary().simplify().transform(IcebergColumnHandle.class::cast))) + .filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint + .getSummary() + .simplify()))) .useSnapshot(snapshotId.get()); Set partitions = new HashSet<>(); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java index 98a0549c8557e..2b9175e74a62b 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/optimizer/IcebergEqualityDeleteAsJoin.java @@ -27,6 +27,7 @@ import com.facebook.presto.iceberg.IcebergTableName; import com.facebook.presto.iceberg.IcebergTableType; import com.facebook.presto.iceberg.IcebergTransactionManager; +import com.facebook.presto.iceberg.IcebergUtil; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPlanOptimizer; import com.facebook.presto.spi.ConnectorPlanRewriter; @@ -180,6 +181,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext context) TupleDomain predicate = icebergTableLayoutHandle .map(IcebergTableLayoutHandle::getValidPredicate) + .map(IcebergUtil::getNonMetadataColumnConstraints) .orElse(TupleDomain.all()); // Collect info about each unique delete schema to join by diff --git a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java index 08ffd944f7fe9..bd36b1bc117ae 100644 --- a/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java +++ b/presto-iceberg/src/test/java/com/facebook/presto/iceberg/IcebergDistributedTestBase.java @@ -1609,6 +1609,61 @@ public void testExpireSnapshotWithDeletedEntries() } } + private void testPathHiddenColumn() + { + assertEquals(computeActual("SELECT \"$path\", * FROM test_hidden_columns").getRowCount(), 2); + + // Fetch one of the file paths and use it in a filter + String filePath = (String) computeActual("SELECT \"$path\" from test_hidden_columns LIMIT 1").getOnlyValue(); + assertEquals( + computeActual(format("SELECT * from test_hidden_columns WHERE \"$path\"='%s'", filePath)).getRowCount(), + 1); + + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", filePath)) + .getOnlyValue(), + 1L); + + // Filter for $path that doesn't exist. + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", "non-existent-path")) + .getOnlyValue(), + 0L); + } + + private void testDataSequenceNumberHiddenColumn() + { + assertEquals(computeActual("SELECT \"$data_sequence_number\", * FROM test_hidden_columns").getRowCount(), 2); + + // Fetch one of the data sequence numbers and use it in a filter + Long dataSequenceNumber = (Long) computeActual("SELECT \"$data_sequence_number\" from test_hidden_columns LIMIT 1").getOnlyValue(); + assertEquals( + computeActual(format("SELECT * from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber)).getRowCount(), + 1); + + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber)) + .getOnlyValue(), + 1L); + + // Filter for $data_sequence_number that doesn't exist. + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", 1000)) + .getOnlyValue(), + 0L); + } + + @Test + public void testHiddenColumns() + { + assertUpdate("DROP TABLE IF EXISTS test_hidden_columns"); + assertUpdate("CREATE TABLE test_hidden_columns AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1); + assertUpdate("INSERT INTO test_hidden_columns SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1); + + testPathHiddenColumn(); + testDataSequenceNumberHiddenColumn(); + } + private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List expectedFileContent) { // check delete file list diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp index 78d88e85fd1d7..faa4f93eaf67a 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxConnector.cpp @@ -1354,6 +1354,12 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( deletes.emplace_back(icebergDeleteFile); } + std::unordered_map metadataColumns; + metadataColumns.reserve(1); + metadataColumns.insert( + {"$data_sequence_number", + std::to_string(icebergSplit->dataSequenceNumber)}); + return std::make_unique( catalogId, icebergSplit->path, @@ -1364,7 +1370,8 @@ IcebergPrestoToVeloxConnector::toVeloxSplit( std::nullopt, customSplitInfo, nullptr, - deletes); + deletes, + metadataColumns); } std::unique_ptr diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java index 4c5f805a1b3b4..64d9d9851b839 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java @@ -109,20 +109,19 @@ public static void createLineitem(QueryRunner queryRunner) public static void createLineitem(QueryRunner queryRunner, boolean castDateToVarchar) { - if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "lineitem")) { - String shipDate = castDateToVarchar ? "cast(shipdate as varchar) as shipdate" : "shipdate"; - String commitDate = castDateToVarchar ? "cast(commitdate as varchar) as commitdate" : "commitdate"; - String receiptDate = castDateToVarchar ? "cast(receiptdate as varchar) as receiptdate" : "receiptdate"; - queryRunner.execute("CREATE TABLE lineitem AS " + - "SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " + - " returnflag, linestatus, " + shipDate + ", " + commitDate + ", " + receiptDate + ", " + - " shipinstruct, shipmode, comment, " + - " linestatus = 'O' as is_open, returnflag = 'R' as is_returned, " + - " cast(tax as real) as tax_as_real, cast(discount as real) as discount_as_real, " + - " cast(linenumber as smallint) as linenumber_as_smallint, " + - " cast(linenumber as tinyint) as linenumber_as_tinyint " + - "FROM tpch.tiny.lineitem"); - } + queryRunner.execute("DROP TABLE IF EXISTS lineitem"); + String shipDate = castDateToVarchar ? "cast(shipdate as varchar) as shipdate" : "shipdate"; + String commitDate = castDateToVarchar ? "cast(commitdate as varchar) as commitdate" : "commitdate"; + String receiptDate = castDateToVarchar ? "cast(receiptdate as varchar) as receiptdate" : "receiptdate"; + queryRunner.execute("CREATE TABLE lineitem AS " + + "SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " + + " returnflag, linestatus, " + shipDate + ", " + commitDate + ", " + receiptDate + ", " + + " shipinstruct, shipmode, comment, " + + " linestatus = 'O' as is_open, returnflag = 'R' as is_returned, " + + " cast(tax as real) as tax_as_real, cast(discount as real) as discount_as_real, " + + " cast(linenumber as smallint) as linenumber_as_smallint, " + + " cast(linenumber as tinyint) as linenumber_as_tinyint " + + "FROM tpch.tiny.lineitem"); } public static void createLineitemForIceberg(QueryRunner queryRunner) @@ -143,13 +142,12 @@ public static void createOrders(QueryRunner queryRunner) public static void createOrders(QueryRunner queryRunner, boolean castDateToVarchar) { - if (!queryRunner.tableExists(queryRunner.getDefaultSession(), "orders")) { - String orderDate = castDateToVarchar ? "cast(orderdate as varchar) as orderdate" : "orderdate"; - queryRunner.execute("CREATE TABLE orders AS " + - "SELECT orderkey, custkey, orderstatus, totalprice, " + orderDate + ", " + - " orderpriority, clerk, shippriority, comment " + - "FROM tpch.tiny.orders"); - } + queryRunner.execute("DROP TABLE IF EXISTS orders"); + String orderDate = castDateToVarchar ? "cast(orderdate as varchar) as orderdate" : "orderdate"; + queryRunner.execute("CREATE TABLE orders AS " + + "SELECT orderkey, custkey, orderstatus, totalprice, " + orderDate + ", " + + " orderpriority, clerk, shippriority, comment " + + "FROM tpch.tiny.orders"); } public static void createOrdersEx(QueryRunner queryRunner) diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeIcebergGeneralQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeIcebergGeneralQueries.java new file mode 100644 index 0000000000000..45f6613f41e95 --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeIcebergGeneralQueries.java @@ -0,0 +1,97 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.testing.ExpectedQueryRunner; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import org.testng.annotations.Test; + +import static java.lang.String.format; +import static org.testng.Assert.assertEquals; + +public class TestPrestoNativeIcebergGeneralQueries + extends AbstractTestQueryFramework +{ + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createNativeIcebergQueryRunner(false, true); + } + + @Override + protected ExpectedQueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.createJavaIcebergQueryRunner(true); + } + + @Override + protected void createTables() + { + createTableToTestHiddenColumns(); + } + + private void createTableToTestHiddenColumns() + { + QueryRunner javaQueryRunner = ((QueryRunner) getExpectedQueryRunner()); + if (!javaQueryRunner.tableExists(getSession(), "test_hidden_columns")) { + javaQueryRunner.execute("CREATE TABLE test_hidden_columns AS SELECT * FROM tpch.tiny.region WHERE regionkey=0"); + javaQueryRunner.execute("INSERT INTO test_hidden_columns SELECT * FROM tpch.tiny.region WHERE regionkey=1"); + } + } + + @Test + public void testPathHiddenColumn() + { + assertQuery("SELECT \"$path\", * FROM test_hidden_columns"); + + // Fetch one of the file paths and use it in a filter + String filePath = (String) computeActual("SELECT \"$path\" from test_hidden_columns LIMIT 1").getOnlyValue(); + assertQuery(format("SELECT * from test_hidden_columns WHERE \"$path\"='%s'", filePath)); + + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", filePath)) + .getOnlyValue(), + 1L); + + // Filter for $path that doesn't exist. + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", "non-existent-path")) + .getOnlyValue(), + 0L); + } + + @Test + public void testDataSequenceNumberHiddenColumn() + { + assertQuery("SELECT \"$data_sequence_number\", * FROM test_hidden_columns"); + + // Fetch one of the data sequence numbers and use it in a filter + Long dataSequenceNumber = (Long) computeActual("SELECT \"$data_sequence_number\" from test_hidden_columns LIMIT 1").getOnlyValue(); + assertQuery(format("SELECT * from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber)); + + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber)) + .getOnlyValue(), + 1L); + + // Filter for $data_sequence_number that doesn't exist. + assertEquals( + (Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", 1000)) + .getOnlyValue(), + 0L); + } +}