
Conversation

@szehon-ho (Member) commented Apr 26, 2022

The Partitions metadata table's filter pushdown logic always uses the table's current partition spec rather than the original spec of each manifest file. This leads to errors when the table has data written under different partition specs and a filter is applied to the partitions table, for example:

Cannot find field 'data' in struct: struct<>
org.apache.iceberg.exceptions.ValidationException: Cannot find field 'data' in struct: struct<>
	at app//org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:50)
	at app//org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:46)
	at app//org.apache.iceberg.expressions.NamedReference.bind(NamedReference.java:27)
	at app//org.apache.iceberg.expressions.UnboundPredicate.bind(UnboundPredicate.java:106)
	at app//org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:145)
	at app//org.apache.iceberg.expressions.Binder$BindVisitor.predicate(Binder.java:104)
	at app//org.apache.iceberg.expressions.ExpressionVisitors.visit(ExpressionVisitors.java:330)
	at app//org.apache.iceberg.expressions.Binder.bind(Binder.java:62)
	at app//org.apache.iceberg.expressions.ManifestEvaluator.<init>(ManifestEvaluator.java:68)
	at app//org.apache.iceberg.expressions.ManifestEvaluator.forPartitionFilter(ManifestEvaluator.java:63)
	at app//org.apache.iceberg.ManifestGroup.lambda$entries$9(ManifestGroup.java:209)
	at app//com.github.benmanes.caffeine.cache.LocalLoadingCache.lambda$newMappingFunction$2(LocalLoadingCache.java:141)
	at app//com.github.benmanes.caffeine.cache.UnboundedLocalCache.lambda$computeIfAbsent$2(UnboundedLocalCache.java:238)
	at [email protected]/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
	at app//com.github.benmanes.caffeine.cache.UnboundedLocalCache.computeIfAbsent(UnboundedLocalCache.java:234)
	at app//com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
	at app//com.github.benmanes.caffeine.cache.LocalLoadingCache.get(LocalLoadingCache.java:54)
	at app//org.apache.iceberg.ManifestGroup.lambda$entries$10(ManifestGroup.java:222)
	at app//org.apache.iceberg.relocated.com.google.common.collect.Iterators$5.computeNext(Iterators.java:670)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at app//org.apache.iceberg.relocated.com.google.common.collect.Iterators$5.computeNext(Iterators.java:668)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at app//org.apache.iceberg.relocated.com.google.common.collect.Iterators$5.computeNext(Iterators.java:668)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.tryToComputeNext(AbstractIterator.java:146)
	at app//org.apache.iceberg.relocated.com.google.common.collect.AbstractIterator.hasNext(AbstractIterator.java:141)
	at app//org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:46)
	at app//org.apache.iceberg.relocated.com.google.common.collect.TransformedIterator.hasNext(TransformedIterator.java:46)
	at app//org.apache.iceberg.util.ParallelIterable$ParallelIterator.submitNextTask(ParallelIterable.java:130)
	at app//org.apache.iceberg.util.ParallelIterable$ParallelIterator.checkTasks(ParallelIterable.java:118)
	at app//org.apache.iceberg.util.ParallelIterable$ParallelIterator.hasNext(ParallelIterable.java:155)
	at app//org.apache.iceberg.PartitionsTable.partitions(PartitionsTable.java:106)
	at app//org.apache.iceberg.PartitionsTable.task(PartitionsTable.java:77)
	at app//org.apache.iceberg.PartitionsTable.access$400(PartitionsTable.java:36)
	at app//org.apache.iceberg.PartitionsTable$PartitionsScan.lambda$new$0(PartitionsTable.java:187)
	at app//org.apache.iceberg.StaticTableScan.doPlanFiles(StaticTableScan.java:47)
	at app//org.apache.iceberg.BaseTableScan.planFiles(BaseTableScan.java:195)
	at app//org.apache.iceberg.spark.source.SparkBatchQueryScan.files(SparkBatchQueryScan.java:114)
	at app//org.apache.iceberg.spark.source.SparkBatchQueryScan.tasks(SparkBatchQueryScan.java:128)
	at app//org.apache.iceberg.spark.source.SparkScan.toBatch(SparkScan.java:108)

This PR fixes the issue by building a cache from partition spec ID to ManifestEvaluator and using the evaluator that matches each manifest's spec during filtering. This fix is similar to #4520.
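For reference, a minimal sketch of the idea (illustrative only, not the actual diff; `specsById`, `rowFilter`, and `filterManifests` are made-up names): build one `ManifestEvaluator` per spec ID on demand, then evaluate each manifest with the evaluator for its own spec rather than the table's current spec.

```java
import java.util.List;
import java.util.Map;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

class SpecAwareManifestFilter {

  // Keep manifests that may contain rows matching the filter, evaluating each
  // manifest against its own partition spec instead of the table's current spec.
  static List<ManifestFile> filterManifests(
      Map<Integer, PartitionSpec> specsById,
      Expression rowFilter,
      boolean caseSensitive,
      Iterable<ManifestFile> manifests) {

    // Lazily build one evaluator per spec id; forRowFilter projects the row filter
    // onto that spec's partition fields before binding, so fields that never existed
    // in a given spec no longer cause "Cannot find field" validation errors.
    LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder()
        .build(specId -> ManifestEvaluator.forRowFilter(rowFilter, specsById.get(specId), caseSensitive));

    List<ManifestFile> matching = Lists.newArrayList();
    for (ManifestFile manifest : manifests) {
      if (evalCache.get(manifest.partitionSpecId()).eval(manifest)) {
        matching.add(manifest);
      }
    }

    return matching;
  }
}
```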

@szehon-ho changed the title from "Core: Fix Partitions table Filtering for Evolved Partition Specs" to "Core: Fix Partitions table filtering for evolved partition specs" on Apr 26, 2022
Comment on lines 650 to 653
if (formatVersion == 2) {
table.newRowDelta().addDeletes(delete10).commit();
table.newRowDelta().addDeletes(delete11).commit();
}
Contributor:

Nit: Since this test has an assumption that the format is v2, is the if condition needed / does it provide any benefit?

Member Author:

Good catch, removed.

@szehon-ho (Member Author) commented Apr 28, 2022

@RussellSpitzer @aokolnychyi @kbendick can you guys take a look if you have time? Thanks

}

@Test
public void testPartitionSpecEvolutionAdditiveV1() {
Member:

Do we need the separate tests for the version here?

Seems like the only difference is the Asserts?

Member Author:

The difference was the way the PartitionKey is created. I combined the tests, using format-version-specific handling for that part and for the asserts.

validateIncludesPartitionScan(tasksAndEq, 0);
}


Member:

Hello there

Member Author:

Added back the space (there was an inconsistent two-blank-line gap between the tests).

public void testPartitionTableFilterAddRemoveFields() throws ParseException {
// Create un-partitioned table
sql("CREATE TABLE %s (id bigint NOT NULL, category string, data string) USING iceberg " +
"TBLPROPERTIES ('commit.manifest-merge.enabled' 'false')", tableName);
Member:

Is the manifest-merge property important here?

@szehon-ho (Member Author) Apr 28, 2022:

Nope, removed them, good catch (it mattered more in the manifests table test).

Expression partitionFilter = Projections
.inclusive(transformSpec(scan.schema(), table.spec()), caseSensitive)
.project(scan.filter());
LoadingCache<Integer, ManifestEvaluator> evalCache = Caffeine.newBuilder().build(specId -> {
Member:

Not sure we need the full LoadingCache here, but I'm ok with it if you like. We could probably just proactively build the full set of evaluators for all specs in the metadata, since we know we will need every single evaluator. I probably should have suggested this on the other PR as well.

@szehon-ho (Member Author) Apr 28, 2022:

Discussed: at this point the table metadata keeps all partition specs, so the lazy cache saves a cycle if some spec has no manifests.
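For comparison, the eager alternative discussed above would look roughly like the sketch below (illustrative only; `buildAll` is a made-up name), building one evaluator for every spec in the table metadata up front:

```java
import java.util.Map;

import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

class EagerEvaluators {

  // Build an evaluator for every partition spec known to the table metadata,
  // whether or not the scanned snapshot has manifests written with that spec.
  static Map<Integer, ManifestEvaluator> buildAll(Table table, Expression rowFilter, boolean caseSensitive) {
    Map<Integer, ManifestEvaluator> evaluators = Maps.newHashMap();
    table.specs().forEach((specId, spec) ->
        evaluators.put(specId, ManifestEvaluator.forRowFilter(rowFilter, spec, caseSensitive)));
    return evaluators;
  }
}
```

The PR keeps the lazy LoadingCache instead, so an evaluator is only built for specs that actually appear in a manifest.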

@szehon-ho force-pushed the partition_table_filter_evolving_spec branch from 753129f to 0b926e9 on April 28, 2022 at 23:49
.build();

table.newFastAppend().appendFile(data10).commit();
table.newFastAppend().appendFile(data11).commit();
Member:

Do you need two commits here?

Member Author:

Yeah, a single commit would combine the files into one manifest, which messes up the test a little bit (the test is fairly low level and depends on how many manifest read tasks get spawned).

} else {
// V2 drops the partition field, so it is not used in planning, though the data is still filtered out later
// 1 original data/delete file written by the old spec, plus both the new data file and delete file written by the new spec
Assert.assertEquals(3, Iterables.size(tasks));
Member:

I'm not sure I understand this, I would have thought the filter would be used with the correct column and give us the same result as the v1 table?

@szehon-ho (Member Author) May 4, 2022:

I tried to clarify the comment; can you see if it makes sense?

It's a bit confusing, but the background is that this occurs when querying with a filter on a dropped partition field (data). Because this is the partitions table, the correct behavior is that only partitions of the old spec are returned; partitions of the new spec, which have no data value, should not be returned.

In V1, new files are written with a void transform for the dropped field (data=null), so predicate pushdown can filter them out early.

In V2, new files do not write any value for data, so predicate pushdown cannot filter them out early.

However, they are filtered out later by Spark's data filtering, because the partition values are normalized to Partitioning.partitionType (the union of all specs) and the old field "data" is filled in as null when returned to Spark. (That was done in #4560.)

This is shown in the new test added in TestMetadataTablesWithPartitionEvolution.
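A rough Spark SQL sketch of the scenario being described (illustrative only; the table name, catalog setup, and values are made up and assume the Iceberg Spark extensions are configured):

```java
import org.apache.spark.sql.SparkSession;

class DroppedPartitionFieldScenario {

  // Create a table partitioned by 'data', drop that partition field, then filter the
  // partitions metadata table on it.
  static void run(SparkSession spark) {
    spark.sql("CREATE TABLE db.tbl (id BIGINT, data STRING) USING iceberg PARTITIONED BY (data)");
    spark.sql("INSERT INTO db.tbl VALUES (1, 'a')");

    spark.sql("ALTER TABLE db.tbl DROP PARTITION FIELD data");
    spark.sql("INSERT INTO db.tbl VALUES (2, 'b')");

    // V1: the newer file records data=null (void transform), so manifest pruning drops it early.
    // V2: the newer file has no 'data' partition value at all, so it passes manifest pruning and
    // is only filtered out later by Spark, after partition values are normalized to the union
    // partition type and the missing field is filled in as null.
    spark.sql("SELECT * FROM db.tbl.partitions WHERE partition.data = 'a'").show();
  }
}
```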

Member:

I was just wondering if we need some kind of special filter: if you have a predicate on a column not present in the spec, just return "cannot match".

@szehon-ho (Member Author) May 4, 2022:

Yeah, I was thinking about it, but since we rely on the existing ManifestEvaluator, it seems a bit heavy to implement a special ManifestEvaluator only for this case (improving the performance of querying dropped partition fields in a metadata table). It's also a bit risky: if we cannot definitively say whether a partition value matches, I feel safer not filtering, as there have been bugs in the past: #4520
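For what it's worth, the idea being discussed could be sketched as a small pre-check like the one below (illustrative only, not implemented in this PR; `specCanMatch` is a made-up name): if the filter references a partition field that the manifest's spec never had, the manifest can be skipped without building an evaluator.

```java
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;

class DroppedFieldShortCircuit {

  // Returns false when the filter references a partition field that is absent from this
  // spec, meaning manifests written with the spec cannot contain matching partitions.
  static boolean specCanMatch(PartitionSpec spec, Set<String> filteredPartitionFieldNames) {
    Set<String> specFieldNames = spec.fields().stream()
        .map(PartitionField::name)
        .collect(Collectors.toSet());
    return specFieldNames.containsAll(filteredPartitionFieldNames);
  }
}
```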

@szehon-ho force-pushed the partition_table_filter_evolving_spec branch from 8d02e1e to 87f50c6 on May 4, 2022 at 18:41
@RussellSpitzer (Member) left a comment:

LGTM. We can push off the questions I have about partition pruning optimization until later.

@szehon-ho merged commit 4ae2002 into apache:master on May 4, 2022
@szehon-ho (Member Author):

Thanks @kbendick and @RussellSpitzer for the review

sunchao pushed a commit to sunchao/iceberg that referenced this pull request May 9, 2023