Spark 3.3, 3.4: use a deterministic where condition to make rewrite_data_files… #6760
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -245,6 +245,28 @@ public void testRewriteDataFilesWithFilter() { | |
| assertEquals("Data after compaction should not change", expectedRecords, actualRecords); | ||
| } | ||
|
|
||
| @Test | ||
| public void testRewriteDataFilesWithFalseFilter() { | ||
| createTable(); | ||
| List<Object[]> expectedRecords = currentData(); | ||
| // select only 0 files for compaction | ||
| List<Object[]> output = | ||
| sql( | ||
| "CALL %s.system.rewrite_data_files(table => '%s', where => '0=1')", | ||
| catalogName, tableIdent); | ||
| assertEquals( | ||
| "Action should rewrite 0 data files and add 0 data files", | ||
| row(0, 0), | ||
| Arrays.copyOf(output.get(0), 2)); | ||
|
Member
Do we get output=0,0,0? Can we just assert all 3 values instead of the first two in this case?
Contributor
Author
I think it is because the first two values are of type Integer and the last one is of type Long.
Member
You mean the assert fails? How about row(0, 0, 0L)?
Contributor
Author
|
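For context on the Integer/Long point raised in this thread, here is a minimal standalone sketch. It assumes the procedure output row holds two boxed Integer counts followed by one boxed Long byte count, as the author describes. `row` and `sameRow` are hypothetical helpers written for this illustration; they are not the Iceberg test utilities, whose comparison logic may differ.

```java
import java.util.Arrays;

public class RowTypeComparison {
  // Hypothetical helper: packs values into an Object[] (ints box to Integer, longs to Long).
  static Object[] row(Object... values) {
    return values;
  }

  // Element-wise comparison using equals(), so boxed types must match exactly.
  static boolean sameRow(Object[] expected, Object[] actual) {
    return Arrays.equals(expected, actual);
  }

  public static void main(String[] args) {
    // Assumed shape of the procedure output: Integer, Integer, Long.
    Object[] output = row(0, 0, 0L);

    // Integer 0 is not equal to Long 0L, so an all-int expectation does not match.
    System.out.println(sameRow(row(0, 0, 0), output));

    // Spelling out the Long literal makes all three values comparable.
    System.out.println(sameRow(row(0, 0, 0L), output));

    // Comparing only the first two columns sidesteps the type question.
    System.out.println(sameRow(row(0, 0), Arrays.copyOf(output, 2)));
  }
}
```

Under these assumptions, either asserting `row(0, 0, 0L)` in one call or checking the first two columns and the Long column separately would be consistent.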
||
| // verify rewritten bytes separately | ||
|
Member
This comment seems unnecessary, since we don't assert on bytes.
Contributor
Author
Yes, let me fix it. |
||
| assertThat(output.get(0)).hasSize(3); | ||
| assertThat(output.get(0)[2]) | ||
| .isInstanceOf(Long.class) | ||
| .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP))); | ||
| List<Object[]> actualRecords = currentData(); | ||
| assertEquals("Data after compaction should not change", expectedRecords, actualRecords); | ||
| } | ||
|
|
||
| @Test | ||
| public void testRewriteDataFilesWithFilterOnPartitionTable() { | ||
| createPartitionTable(); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -37,14 +37,15 @@ object SparkExpressionConverter { | |
| } | ||
|
|
||
| @throws[AnalysisException] | ||
|
ludlows marked this conversation as resolved.
|
||
| def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = { | ||
| def collectResolvedSparkExpressionOption(session: SparkSession, | ||
| tableName: String, where: String): Option[Expression] = { | ||
| val tableAttrs = session.table(tableName).queryExecution.analyzed.output | ||
| val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where) | ||
| val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs)) | ||
| val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan | ||
| optimizedLogicalPlan.collectFirst { | ||
| case filter: Filter => filter.condition | ||
| }.getOrElse(throw new AnalysisException("Failed to find filter expression")) | ||
| case filter: Filter => Some(filter.condition) | ||
|
Contributor
Would it be fair to assume we get back an empty local table scan if the condition evaluates to false? If so, what about modifying the logic in …
Contributor
I assume we should have 3 branches:
Contributor
1st -> we have a real filter
Contributor
Author
Hi @aokolnychyi, thanks for your suggestions.
Contributor
@ludlows, I think the underlying problem we are trying to solve is that the logic in the existing … I believe we only check this now: … I think it should be something like this instead: …
Member
I think @aokolnychyi's comment is the right way: we just need to modify the method collectResolvedIcebergExpression and add the extra Scala pattern-matching cases that Anton showed. Then there is no need to modify the outside method. The convert will automatically convert the Spark true/false to Iceberg true/false.
Member
Hi @ludlows, actually, taking a look at this, why didn't we just try using @aokolnychyi's code suggestion directly? It looks like it should work. The ConstantFolding rule will probably get rid of the filter and make this match the second case (DummyRelation). And we have added test cases for alwaysTrue and alwaysFalse to catch anything we miss.
Contributor
Author
Hi @szehon-ho, yes, you and @aokolnychyi are right. The current version now uses this method to distinguish alwaysTrue, alwaysFalse, and undetermined. Thanks for your explanation. As I remember, I didn't use this method previously because I didn't understand the behavior of |
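The three-way distinction discussed in this thread (a real filter, a filter folded away to always-true, and an empty local scan for always-false) can be illustrated with a toy model. This is only a sketch under stated assumptions: the classes below are hypothetical stand-ins rather than Spark's Catalyst plan nodes, the string results stand in for expressions, and the code is not the suggestion that was posted in the review.

```java
public class FilterOutcome {
  // Hypothetical stand-ins for optimized logical plan nodes; not the real Spark classes.
  interface Plan {}

  // A filter that survived optimization and still carries a condition.
  static final class Filter implements Plan {
    final String condition;

    Filter(String condition) {
      this.condition = condition;
    }
  }

  // The bare relation: the optimizer removed the filter because it is always true.
  static final class DummyRelation implements Plan {}

  // An empty local scan: the optimizer proved that no row can match.
  static final class EmptyLocalRelation implements Plan {}

  // Maps the shape of the optimized plan to the condition it represents.
  static String resolve(Plan optimized) {
    if (optimized instanceof Filter) {
      return ((Filter) optimized).condition; // 1st branch: a real filter
    } else if (optimized instanceof DummyRelation) {
      return "true"; // 2nd branch: always true
    } else if (optimized instanceof EmptyLocalRelation) {
      return "false"; // 3rd branch: always false
    }
    throw new IllegalArgumentException("Failed to find filter expression");
  }
}
```

In this model a condition such as `0=1` would correspond to the third branch, which matches the expectation in `testRewriteDataFilesWithFalseFilter` that no files are selected.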
||
| }.getOrElse(Option.empty) | ||
| } | ||
|
|
||
| case class DummyRelation(output: Seq[Attribute]) extends LeafNode | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: select no files.
Yes, let me fix it.