Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import static com.facebook.presto.iceberg.IcebergTableType.CHANGELOG;
import static com.facebook.presto.iceberg.IcebergTableType.EQUALITY_DELETES;
import static com.facebook.presto.iceberg.IcebergUtil.getIcebergTable;
import static com.facebook.presto.iceberg.IcebergUtil.getMetadataColumnConstraints;
import static com.facebook.presto.iceberg.IcebergUtil.getNonMetadataColumnConstraints;
import static java.util.Objects.requireNonNull;

public class IcebergSplitManager
Expand Down Expand Up @@ -81,7 +83,8 @@ public ConnectorSplitSource getSplits(
return new FixedSplitSource(ImmutableList.of());
}

TupleDomain<IcebergColumnHandle> predicate = layoutHandle.getValidPredicate();
TupleDomain<IcebergColumnHandle> predicate = getNonMetadataColumnConstraints(layoutHandle
.getValidPredicate());
Table icebergTable = getIcebergTable(transactionManager.get(transaction), session, table.getSchemaTableName());

if (table.getIcebergTableName().getTableType() == CHANGELOG) {
Expand Down Expand Up @@ -114,7 +117,8 @@ else if (table.getIcebergTableName().getTableType() == EQUALITY_DELETES) {
session,
tableScan,
TableScanUtil.splitFiles(tableScan.planFiles(), tableScan.targetSplitSize()),
getMinimumAssignedSplitWeight(session));
getMinimumAssignedSplitWeight(session),
getMetadataColumnConstraints(layoutHandle.getValidPredicate()));
return splitSource;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.iceberg;

import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.iceberg.delete.DeleteFile;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
Expand Down Expand Up @@ -40,6 +41,7 @@
import static com.facebook.presto.iceberg.FileFormat.fromIcebergFileFormat;
import static com.facebook.presto.iceberg.IcebergUtil.getDataSequenceNumber;
import static com.facebook.presto.iceberg.IcebergUtil.getPartitionKeys;
import static com.facebook.presto.iceberg.IcebergUtil.metadataColumnsMatchPredicates;
import static com.facebook.presto.iceberg.IcebergUtil.partitionDataFromStructLike;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.Iterators.limit;
Expand All @@ -56,16 +58,20 @@ public class IcebergSplitSource
private final double minimumAssignedSplitWeight;
private final ConnectorSession session;

private final TupleDomain<IcebergColumnHandle> metadataColumnConstraints;

/**
 * Creates a split source over the given table scan.
 *
 * @param session the connector session (used by downstream split construction)
 * @param tableScan the Iceberg scan whose planned files back this source
 * @param fileScanTaskIterable iterable of file-scan tasks; its iterator is registered
 *        with the closer so it is released when this source is closed
 * @param minimumAssignedSplitWeight lower bound for the weight assigned to each split
 * @param metadataColumnConstraints constraints on metadata columns ("$path",
 *        "$data_sequence_number") used to prune splits in getNextBatch
 */
public IcebergSplitSource(
        ConnectorSession session,
        TableScan tableScan,
        CloseableIterable<FileScanTask> fileScanTaskIterable,
        double minimumAssignedSplitWeight,
        TupleDomain<IcebergColumnHandle> metadataColumnConstraints)
{
    this.session = requireNonNull(session, "session is null");
    this.tableScan = requireNonNull(tableScan, "tableScan is null");
    this.fileScanTaskIterator = fileScanTaskIterable.iterator();
    this.minimumAssignedSplitWeight = minimumAssignedSplitWeight;
    this.metadataColumnConstraints = requireNonNull(metadataColumnConstraints, "metadataColumnConstraints is null");
    closer.register(fileScanTaskIterator);
}

Expand All @@ -77,7 +83,10 @@ public CompletableFuture<ConnectorSplitBatch> getNextBatch(ConnectorPartitionHan
Iterator<FileScanTask> iterator = limit(fileScanTaskIterator, maxSize);
while (iterator.hasNext()) {
FileScanTask task = iterator.next();
splits.add(toIcebergSplit(task));
IcebergSplit icebergSplit = (IcebergSplit) toIcebergSplit(task);
if (metadataColumnsMatchPredicates(metadataColumnConstraints, icebergSplit.getPath(), icebergSplit.getDataSequenceNumber())) {
splits.add(icebergSplit);
}
}
return completedFuture(new ConnectorSplitBatch(splits, isFinished()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,13 @@
import static com.facebook.presto.iceberg.FileContent.POSITION_DELETES;
import static com.facebook.presto.iceberg.FileContent.fromIcebergFileContent;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.iceberg.IcebergColumnHandle.DATA_SEQUENCE_NUMBER_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergColumnHandle.PATH_COLUMN_HANDLE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_FORMAT_VERSION;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_PARTITION_VALUE;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_SNAPSHOT_ID;
import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_INVALID_TABLE_TIMESTAMP;
import static com.facebook.presto.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static com.facebook.presto.iceberg.IcebergPartitionType.IDENTITY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.getCompressionCodec;
import static com.facebook.presto.iceberg.IcebergSessionProperties.isMergeOnReadModeEnabled;
Expand Down Expand Up @@ -515,6 +518,38 @@ private static NullableValue parsePartitionValue(
return partitionValue == null ? NullableValue.asNull(prestoType) : NullableValue.of(prestoType, partitionValue);
}

// Strip the constraints on metadata columns like "$path", "$data_sequence_number" from the list.
/**
 * Strips the constraints on metadata columns like "$path" and "$data_sequence_number",
 * keeping only the constraints that apply to regular data columns.
 *
 * @param allConstraints the full constraint set keyed by (castable-to) column handles
 * @return the constraints restricted to non-metadata columns
 */
public static <U> TupleDomain<IcebergColumnHandle> getNonMetadataColumnConstraints(TupleDomain<U> allConstraints)
{
    return allConstraints.transform(column -> {
        IcebergColumnHandle handle = (IcebergColumnHandle) column;
        // Mapping a key to null drops its domain from the resulting TupleDomain.
        return isMetadataColumnId(handle.getId()) ? null : handle;
    });
}

/**
 * Keeps only the constraints on metadata columns like "$path" and
 * "$data_sequence_number", dropping all data-column constraints.
 *
 * @param allConstraints the full constraint set keyed by (castable-to) column handles
 * @return the constraints restricted to metadata columns
 */
public static <U> TupleDomain<IcebergColumnHandle> getMetadataColumnConstraints(TupleDomain<U> allConstraints)
{
    return allConstraints.transform(column -> {
        IcebergColumnHandle handle = (IcebergColumnHandle) column;
        // Mapping a key to null drops its domain from the resulting TupleDomain.
        return isMetadataColumnId(handle.getId()) ? handle : null;
    });
}

/**
 * Checks whether a split's metadata-column values satisfy the given constraints.
 *
 * @param constraints constraints restricted to metadata columns (e.g. from
 *        {@code getMetadataColumnConstraints})
 * @param path the split's file path, matched against the "$path" domain
 * @param dataSequenceNumber the split's data sequence number, matched against
 *        the "$data_sequence_number" domain
 * @return true if every recognized metadata constraint admits the split's value;
 *         constraints on unrecognized columns are ignored
 */
public static boolean metadataColumnsMatchPredicates(TupleDomain<IcebergColumnHandle> constraints, String path, long dataSequenceNumber)
{
    if (constraints.isAll()) {
        return true;
    }

    boolean matches = true;
    if (constraints.getDomains().isPresent()) {
        for (Map.Entry<IcebergColumnHandle, Domain> constraint : constraints.getDomains().get().entrySet()) {
            // Compare with equals(), not ==: column handles may be deserialized or
            // rebuilt copies of the static constants, and reference equality would
            // silently skip the check, letting non-matching splits through.
            if (PATH_COLUMN_HANDLE.equals(constraint.getKey())) {
                matches &= constraint.getValue().includesNullableValue(utf8Slice(path));
            }
            else if (DATA_SEQUENCE_NUMBER_COLUMN_HANDLE.equals(constraint.getKey())) {
                matches &= constraint.getValue().includesNullableValue(dataSequenceNumber);
            }
        }
    }

    return matches;
}

public static List<HivePartition> getPartitions(
TypeManager typeManager,
ConnectorTableHandle tableHandle,
Expand All @@ -531,7 +566,9 @@ public static List<HivePartition> getPartitions(
}

TableScan tableScan = icebergTable.newScan()
.filter(toIcebergExpression(constraint.getSummary().simplify().transform(IcebergColumnHandle.class::cast)))
.filter(toIcebergExpression(getNonMetadataColumnConstraints(constraint
.getSummary()
.simplify())))
.useSnapshot(snapshotId.get());

Set<HivePartition> partitions = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.iceberg.IcebergTableName;
import com.facebook.presto.iceberg.IcebergTableType;
import com.facebook.presto.iceberg.IcebergTransactionManager;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPlanOptimizer;
import com.facebook.presto.spi.ConnectorPlanRewriter;
Expand Down Expand Up @@ -180,6 +181,7 @@ public PlanNode visitTableScan(TableScanNode node, RewriteContext<Void> context)

TupleDomain<IcebergColumnHandle> predicate = icebergTableLayoutHandle
.map(IcebergTableLayoutHandle::getValidPredicate)
.map(IcebergUtil::getNonMetadataColumnConstraints)
.orElse(TupleDomain.all());

// Collect info about each unique delete schema to join by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,61 @@ public void testExpireSnapshotWithDeletedEntries()
}
}

/**
 * Verifies that the "$path" hidden column can be projected and used in filters
 * against the two-row fixture table {@code test_hidden_columns}.
 */
private void testPathHiddenColumn()
{
    // Projecting "$path" alongside all columns should still yield both rows.
    assertEquals(computeActual("SELECT \"$path\", * FROM test_hidden_columns").getRowCount(), 2);

    // Fetch one of the file paths and use it in a filter
    String filePath = (String) computeActual("SELECT \"$path\" from test_hidden_columns LIMIT 1").getOnlyValue();

    String selectByPath = format("SELECT * from test_hidden_columns WHERE \"$path\"='%s'", filePath);
    assertEquals(computeActual(selectByPath).getRowCount(), 1);

    String countByPath = format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", filePath);
    assertEquals((Long) computeActual(countByPath).getOnlyValue(), 1L);

    // Filter for $path that doesn't exist.
    String countMissingPath = format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", "non-existent-path");
    assertEquals((Long) computeActual(countMissingPath).getOnlyValue(), 0L);
}

/**
 * Verifies that the "$data_sequence_number" hidden column can be projected and
 * used in filters against the two-row fixture table {@code test_hidden_columns}.
 */
private void testDataSequenceNumberHiddenColumn()
{
    // Projecting "$data_sequence_number" alongside all columns should still yield both rows.
    assertEquals(computeActual("SELECT \"$data_sequence_number\", * FROM test_hidden_columns").getRowCount(), 2);

    // Fetch one of the data sequence numbers and use it in a filter
    Long dataSequenceNumber = (Long) computeActual("SELECT \"$data_sequence_number\" from test_hidden_columns LIMIT 1").getOnlyValue();

    String selectBySequenceNumber = format("SELECT * from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber);
    assertEquals(computeActual(selectBySequenceNumber).getRowCount(), 1);

    String countBySequenceNumber = format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber);
    assertEquals((Long) computeActual(countBySequenceNumber).getOnlyValue(), 1L);

    // Filter for $data_sequence_number that doesn't exist.
    String countMissingSequenceNumber = format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", 1000);
    assertEquals((Long) computeActual(countMissingSequenceNumber).getOnlyValue(), 0L);
}

@Test
public void testHiddenColumns()
{
    // Rebuild the fixture from scratch so reruns are deterministic. Two rows are
    // written by two separate statements (CTAS, then INSERT) — presumably this puts
    // each row in its own data file/commit so the rows carry distinct "$path" and
    // "$data_sequence_number" values, which the helper tests below rely on.
    assertUpdate("DROP TABLE IF EXISTS test_hidden_columns");
    assertUpdate("CREATE TABLE test_hidden_columns AS SELECT * FROM tpch.tiny.region WHERE regionkey=0", 1);
    assertUpdate("INSERT INTO test_hidden_columns SELECT * FROM tpch.tiny.region WHERE regionkey=1", 1);

    // Exercise both metadata columns against the same fixture.
    testPathHiddenColumn();
    testDataSequenceNumberHiddenColumn();
}

private void testCheckDeleteFiles(Table icebergTable, int expectedSize, List<FileContent> expectedFileContent)
{
// check delete file list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1354,6 +1354,12 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
deletes.emplace_back(icebergDeleteFile);
}

std::unordered_map<std::string, std::string> metadataColumns;
metadataColumns.reserve(1);
metadataColumns.insert(
{"$data_sequence_number",
std::to_string(icebergSplit->dataSequenceNumber)});

return std::make_unique<connector::hive::iceberg::HiveIcebergSplit>(
catalogId,
icebergSplit->path,
Expand All @@ -1364,7 +1370,8 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
std::nullopt,
customSplitInfo,
nullptr,
deletes);
deletes,
metadataColumns);
}

std::unique_ptr<velox::connector::ColumnHandle>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,19 @@ public static void createLineitem(QueryRunner queryRunner)

/**
 * (Re)creates the {@code lineitem} fixture table from tpch.tiny.lineitem,
 * adding derived columns of various types for test coverage.
 *
 * @param queryRunner runner to execute the DDL against
 * @param castDateToVarchar when true, the date columns are exposed as varchar
 *        (for connectors without native date support)
 */
public static void createLineitem(QueryRunner queryRunner, boolean castDateToVarchar)
{
    // Always drop and recreate so reruns pick up schema/format changes instead of
    // silently reusing a stale table.
    queryRunner.execute("DROP TABLE IF EXISTS lineitem");
    String shipDate = castDateToVarchar ? "cast(shipdate as varchar) as shipdate" : "shipdate";
    String commitDate = castDateToVarchar ? "cast(commitdate as varchar) as commitdate" : "commitdate";
    String receiptDate = castDateToVarchar ? "cast(receiptdate as varchar) as receiptdate" : "receiptdate";
    queryRunner.execute("CREATE TABLE lineitem AS " +
            "SELECT orderkey, partkey, suppkey, linenumber, quantity, extendedprice, discount, tax, " +
            "   returnflag, linestatus, " + shipDate + ", " + commitDate + ", " + receiptDate + ", " +
            "   shipinstruct, shipmode, comment, " +
            "   linestatus = 'O' as is_open, returnflag = 'R' as is_returned, " +
            "   cast(tax as real) as tax_as_real, cast(discount as real) as discount_as_real, " +
            "   cast(linenumber as smallint) as linenumber_as_smallint, " +
            "   cast(linenumber as tinyint) as linenumber_as_tinyint " +
            "FROM tpch.tiny.lineitem");
}

public static void createLineitemForIceberg(QueryRunner queryRunner)
Expand All @@ -143,13 +142,12 @@ public static void createOrders(QueryRunner queryRunner)

/**
 * (Re)creates the {@code orders} fixture table from tpch.tiny.orders.
 *
 * @param queryRunner runner to execute the DDL against
 * @param castDateToVarchar when true, {@code orderdate} is exposed as varchar
 *        (for connectors without native date support)
 */
public static void createOrders(QueryRunner queryRunner, boolean castDateToVarchar)
{
    // Always drop and recreate so reruns pick up schema/format changes instead of
    // silently reusing a stale table.
    queryRunner.execute("DROP TABLE IF EXISTS orders");
    String orderDate = castDateToVarchar ? "cast(orderdate as varchar) as orderdate" : "orderdate";
    queryRunner.execute("CREATE TABLE orders AS " +
            "SELECT orderkey, custkey, orderstatus, totalprice, " + orderDate + ", " +
            "   orderpriority, clerk, shippriority, comment " +
            "FROM tpch.tiny.orders");
}

public static void createOrdersEx(QueryRunner queryRunner)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.nativeworker;

import com.facebook.presto.testing.ExpectedQueryRunner;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import org.testng.annotations.Test;

import static java.lang.String.format;
import static org.testng.Assert.assertEquals;

public class TestPrestoNativeIcebergGeneralQueries
extends AbstractTestQueryFramework
{
@Override
protected QueryRunner createQueryRunner()
throws Exception
{
return PrestoNativeQueryRunnerUtils.createNativeIcebergQueryRunner(false, true);
}

@Override
protected ExpectedQueryRunner createExpectedQueryRunner()
throws Exception
{
return PrestoNativeQueryRunnerUtils.createJavaIcebergQueryRunner(true);
}

@Override
protected void createTables()
{
createTableToTestHiddenColumns();
}

private void createTableToTestHiddenColumns()
{
QueryRunner javaQueryRunner = ((QueryRunner) getExpectedQueryRunner());
if (!javaQueryRunner.tableExists(getSession(), "test_hidden_columns")) {
javaQueryRunner.execute("CREATE TABLE test_hidden_columns AS SELECT * FROM tpch.tiny.region WHERE regionkey=0");
javaQueryRunner.execute("INSERT INTO test_hidden_columns SELECT * FROM tpch.tiny.region WHERE regionkey=1");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test with filter not matching? For both tests

}
}

@Test
public void testPathHiddenColumn()
Comment thread
hantangwangd marked this conversation as resolved.
{
assertQuery("SELECT \"$path\", * FROM test_hidden_columns");

// Fetch one of the file paths and use it in a filter
String filePath = (String) computeActual("SELECT \"$path\" from test_hidden_columns LIMIT 1").getOnlyValue();
assertQuery(format("SELECT * from test_hidden_columns WHERE \"$path\"='%s'", filePath));

assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", filePath))
.getOnlyValue(),
1L);

// Filter for $path that doesn't exist.
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$path\"='%s'", "non-existent-path"))
.getOnlyValue(),
0L);
}

@Test
public void testDataSequenceNumberHiddenColumn()
{
assertQuery("SELECT \"$data_sequence_number\", * FROM test_hidden_columns");

// Fetch one of the data sequence numbers and use it in a filter
Long dataSequenceNumber = (Long) computeActual("SELECT \"$data_sequence_number\" from test_hidden_columns LIMIT 1").getOnlyValue();
assertQuery(format("SELECT * from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber));

assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", dataSequenceNumber))
.getOnlyValue(),
1L);

// Filter for $data_sequence_number that doesn't exist.
assertEquals(
(Long) computeActual(format("SELECT count(*) from test_hidden_columns WHERE \"$data_sequence_number\"=%d", 1000))
.getOnlyValue(),
0L);
}
}