diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java index a119a2494884..f9f5ecfaa8e4 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSplitManager.java @@ -50,6 +50,8 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static io.airlift.slice.Slices.utf8Slice; +import static io.trino.plugin.deltalake.DeltaLakeColumnHandle.pathColumnHandle; import static io.trino.plugin.deltalake.DeltaLakeMetadata.createStatisticsPredicate; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getMaxInitialSplitSize; @@ -131,6 +133,7 @@ private Stream getSplits( List validDataFiles = metastore.getValidDataFiles(tableHandle.getSchemaTableName(), session); TupleDomain enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint(); TupleDomain nonPartitionConstraint = tableHandle.getNonPartitionConstraint(); + Domain pathDomain = getPathDomain(nonPartitionConstraint); // Delta Lake handles updates and deletes by copying entire data files, minus updates/deletes. Because of this we can only have one Split/UpdatablePageSource // per file. @@ -157,6 +160,11 @@ private Stream getSplits( return Stream.empty(); } + String splitPath = buildSplitPath(tableLocation, addAction); + if (!pathMatchesPredicate(pathDomain, splitPath)) { + return Stream.empty(); + } + if (filesModifiedAfter.isPresent() && addAction.getModificationTime() <= filesModifiedAfter.get().toEpochMilli()) { return Stream.empty(); } @@ -194,7 +202,7 @@ private Stream getSplits( return splitsForFile( session, addAction, - tableLocation, + splitPath, addAction.getCanonicalPartitionValues(), statisticsPredicate, splittable, @@ -215,17 +223,28 @@ public static boolean partitionMatchesPredicate(Map> pa return true; } + private static Domain getPathDomain(TupleDomain effectivePredicate) + { + return effectivePredicate.getDomains() + .flatMap(domains -> Optional.ofNullable(domains.get(pathColumnHandle()))) + .orElse(Domain.all(pathColumnHandle().getType())); + } + + private static boolean pathMatchesPredicate(Domain pathDomain, String path) + { + return pathDomain.includesNullableValue(utf8Slice(path)); + } + private List splitsForFile( ConnectorSession session, AddFileEntry addFileEntry, - String tableLocation, + String splitPath, Map> partitionKeys, TupleDomain statisticsPredicate, boolean splittable, AtomicInteger remainingInitialSplits) { long fileSize = addFileEntry.getSize(); - String splitPath = buildSplitPath(tableLocation, addFileEntry); if (!splittable) { // remainingInitialSplits is not used when !splittable diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java index 2ed0132afdd6..2becfdf47756 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeMinioConnectorTest.java @@ -494,6 +494,24 @@ public void testTargetMaxFileSize() } } + @Test + public void testPathColumn() + { + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_path_column", "(x VARCHAR)")) { + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1); + String firstFilePath = (String) computeScalar("SELECT \"$path\" FROM " + table.getName()); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'second'", 1); + String secondFilePath = (String) computeScalar("SELECT \"$path\" FROM " + table.getName() + " WHERE x = 'second'"); + + // Verify predicate correctness on $path column + assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" = '" + firstFilePath + "'", "VALUES 'first'"); + assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" <> '" + firstFilePath + "'", "VALUES 'second'"); + assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" IN ('" + firstFilePath + "', '" + secondFilePath + "')", "VALUES ('first'), ('second')"); + assertQuery("SELECT x FROM " + table.getName() + " WHERE \"$path\" IS NOT NULL", "VALUES ('first'), ('second')"); + assertQueryReturnsEmptyResult("SELECT x FROM " + table.getName() + " WHERE \"$path\" IS NULL"); + } + } + @Override protected String createSchemaSql(String schemaName) {