Push down partition filter to Spark when Importing File Based Tables #3745
Conversation
.sameType(DataTypes.CalendarIntervalType)) {
  filterExpressions.add(new EqualTo(ref,
      org.apache.spark.sql.catalyst.expressions.Literal.create(entry.getValue(),
      DataTypes.CalendarIntervalType)));
I only tested the Integer and String data types, using the existing test suite TestAddFilesProcedure. If this PR looks OK, I will also test all the other data types.
For this large if-else-if chain, you might want to look into this lookup-map pattern used here:
iceberg/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkFilters.java
Lines 80 to 220 in 466073b
private static final Map<Class<? extends Filter>, Operation> FILTERS = ImmutableMap
    .<Class<? extends Filter>, Operation>builder()
    .put(AlwaysTrue.class, Operation.TRUE)
    .put(AlwaysTrue$.class, Operation.TRUE)
    .put(AlwaysFalse$.class, Operation.FALSE)
    .put(AlwaysFalse.class, Operation.FALSE)
    .put(EqualTo.class, Operation.EQ)
    .put(EqualNullSafe.class, Operation.EQ)
    .put(GreaterThan.class, Operation.GT)
    .put(GreaterThanOrEqual.class, Operation.GT_EQ)
    .put(LessThan.class, Operation.LT)
    .put(LessThanOrEqual.class, Operation.LT_EQ)
    .put(In.class, Operation.IN)
    .put(IsNull.class, Operation.IS_NULL)
    .put(IsNotNull.class, Operation.NOT_NULL)
    .put(And.class, Operation.AND)
    .put(Or.class, Operation.OR)
    .put(Not.class, Operation.NOT)
    .put(StringStartsWith.class, Operation.STARTS_WITH)
    .build();

public static Expression convert(Filter[] filters) {
  Expression expression = Expressions.alwaysTrue();
  for (Filter filter : filters) {
    Expression converted = convert(filter);
    Preconditions.checkArgument(converted != null, "Cannot convert filter to Iceberg: %s", filter);
    expression = Expressions.and(expression, converted);
  }
  return expression;
}

public static Expression convert(Filter filter) {
  // avoid using a chain of if instanceof statements by mapping to the expression enum.
  Operation op = FILTERS.get(filter.getClass());
  if (op != null) {
    switch (op) {
      case TRUE:
        return Expressions.alwaysTrue();
      case FALSE:
        return Expressions.alwaysFalse();
      case IS_NULL:
        IsNull isNullFilter = (IsNull) filter;
        return isNull(unquote(isNullFilter.attribute()));
      case NOT_NULL:
        IsNotNull notNullFilter = (IsNotNull) filter;
        return notNull(unquote(notNullFilter.attribute()));
      case LT:
        LessThan lt = (LessThan) filter;
        return lessThan(unquote(lt.attribute()), convertLiteral(lt.value()));
      case LT_EQ:
        LessThanOrEqual ltEq = (LessThanOrEqual) filter;
        return lessThanOrEqual(unquote(ltEq.attribute()), convertLiteral(ltEq.value()));
      case GT:
        GreaterThan gt = (GreaterThan) filter;
        return greaterThan(unquote(gt.attribute()), convertLiteral(gt.value()));
      case GT_EQ:
        GreaterThanOrEqual gtEq = (GreaterThanOrEqual) filter;
        return greaterThanOrEqual(unquote(gtEq.attribute()), convertLiteral(gtEq.value()));
      case EQ: // used for both eq and null-safe-eq
        if (filter instanceof EqualTo) {
          EqualTo eq = (EqualTo) filter;
          // comparison with null in normal equality is always null. this is probably a mistake.
          Preconditions.checkNotNull(eq.value(),
              "Expression is always false (eq is not null-safe): %s", filter);
          return handleEqual(unquote(eq.attribute()), eq.value());
        } else {
          EqualNullSafe eq = (EqualNullSafe) filter;
          if (eq.value() == null) {
            return isNull(unquote(eq.attribute()));
          } else {
            return handleEqual(unquote(eq.attribute()), eq.value());
          }
        }
      case IN:
        In inFilter = (In) filter;
        return in(unquote(inFilter.attribute()),
            Stream.of(inFilter.values())
                .filter(Objects::nonNull)
                .map(SparkFilters::convertLiteral)
                .collect(Collectors.toList()));
      case NOT:
        Not notFilter = (Not) filter;
        Filter childFilter = notFilter.child();
        Operation childOp = FILTERS.get(childFilter.getClass());
        if (childOp == Operation.IN) {
          // infer an extra notNull predicate for Spark NOT IN filters
          // as Iceberg expressions don't follow the 3-value SQL boolean logic
          // col NOT IN (1, 2) in Spark is equivalent to notNull(col) && notIn(col, 1, 2) in Iceberg
          In childInFilter = (In) childFilter;
          Expression notIn = notIn(unquote(childInFilter.attribute()),
              Stream.of(childInFilter.values())
                  .map(SparkFilters::convertLiteral)
                  .collect(Collectors.toList()));
          return and(notNull(childInFilter.attribute()), notIn);
        } else if (hasNoInFilter(childFilter)) {
          Expression child = convert(childFilter);
          if (child != null) {
            return not(child);
          }
        }
        return null;
      case AND: {
        And andFilter = (And) filter;
        Expression left = convert(andFilter.left());
        Expression right = convert(andFilter.right());
        if (left != null && right != null) {
          return and(left, right);
        }
        return null;
      }
      case OR: {
        Or orFilter = (Or) filter;
        Expression left = convert(orFilter.left());
        Expression right = convert(orFilter.right());
        if (left != null && right != null) {
          return or(left, right);
        }
        return null;
      }
      case STARTS_WITH: {
        StringStartsWith stringStartsWith = (StringStartsWith) filter;
        return startsWith(unquote(stringStartsWith.attribute()), stringStartsWith.value());
      }
    }
  }
  return null;
}
I'm not sure if a lookup map can be used here because of the use of the sameType function, but it might be worth looking into 😄
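If it helps, here is a minimal sketch of what such a lookup map could look like for this case, keyed by the partition column's DataType rather than by Filter class. The PartitionValueParsers name, the parser map, and the set of handled types are hypothetical illustrations, not code from this PR:

import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.catalyst.expressions.BoundReference;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;

public class PartitionValueParsers {
  // map each supported partition column type to a parser for its string value,
  // replacing the if-else-if chain over DataTypes
  private static final Map<DataType, Function<String, Object>> PARSERS =
      ImmutableMap.<DataType, Function<String, Object>>builder()
          .put(DataTypes.IntegerType, Integer::parseInt)
          .put(DataTypes.LongType, Long::parseLong)
          .put(DataTypes.StringType, value -> value)
          .build();

  static Expression equalTo(BoundReference ref, DataType type, String value) {
    Function<String, Object> parser = PARSERS.get(type);
    if (parser == null) {
      throw new IllegalArgumentException("Unsupported partition column type: " + type);
    }
    // EqualTo(column reference, typed literal) is the partition filter pushed down to Spark
    return new EqualTo(ref, Literal.create(parser.apply(value), type));
  }
}

Keying on the concrete DataType singletons sidesteps sameType for simple types, though it would not cover parameterized types such as decimals.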
List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions = new java.util.ArrayList<>();
for (Map.Entry<String, String> entry : partitionFilter.entrySet()) {
  try {
    // IllegalArgumentException is thrown if schema doesn't contain this entry,
Just wondering, how does Spark parse these? Is it just in the ParseExpressions code and then fed directly into here? I know in RewriteDataFilesProcedure we end up using the parser to do a parseExpression, since then we don't have to deal with all of these internal transforms and such. I am wondering if a similar approach could work here?
Spark doesn't wrap this with its internal exceptions. It throws this java.lang.IllegalArgumentException directly if the schema doesn't contain the column.
Sorry, I meant this whole conversion into Spark Expression classes.
I know in Spark this probably happens in the parser, and I was wondering if we could do the same thing here, like
spark.parser.parseExpression("x = y")
I guess somewhere it does some conversion into strings, or from strings into the proper types? Or maybe it doesn't...
I tried spark.parser.parseExpression("dept = hr"), and we get an EqualTo filter whose left side is the UnresolvedAttribute dept and whose right side is the UnresolvedAttribute hr. I think in RewriteDataFilesProcedure we execute the query, so the UnresolvedAttribute gets resolved. But in the case of listFiles, it doesn't go through the name resolution code, so the UnresolvedAttribute remains unresolved and causes problems.
I think I will have to manually construct this EqualTo filter. I will check more to see if I can find a better solution.
I tried spark.parser.parseExpression("value"), hoping Spark could turn this into the right type, but it didn't work. I think I will have to construct the Literal and Filter Expression myself.
@huaxingao: As someone who recently worked in a similar area: in my experience, it is better to avoid a new implementation, as it needs a lot of testing (all data types × all expressions) and the chance of introducing issues is high.
For the unresolved expression issue with spark.parser.parseExpression("value"), I recently found a way to get the resolved expression by adding a prefix and collecting the resolved expression from the plan in #3757.
Maybe you can see if it can help you as well?
@RussellSpitzer
As @ajantha-bhat suggested, we can put the filter "x = y" inside a simple query, let Spark execute the query, and then collect the resolved filter Expression. This solution sounds good to me. WDYT?
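Roughly, the idea would be something like the sketch below — illustrative only, with hypothetical names; it is not the helper this PR ends up calling, just the "run the filter through the analyzer and pull the resolved condition back out" shape:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.plans.logical.Filter;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.collection.JavaConverters;

public class ResolvedFilterSketch {
  // Wrap the filter string in a query on the table, let the analyzer resolve it,
  // then pull the resolved condition back out of the plan's Filter node.
  static Expression resolveFilter(SparkSession spark, String tableName, String filter) {
    LogicalPlan analyzed = spark.sql("SELECT * FROM " + tableName + " WHERE " + filter)
        .queryExecution()
        .analyzed();
    return findFilterCondition(analyzed);
  }

  private static Expression findFilterCondition(LogicalPlan plan) {
    if (plan instanceof Filter) {
      return ((Filter) plan).condition();
    }
    for (LogicalPlan child : JavaConverters.seqAsJavaListConverter(plan.children()).asJava()) {
      Expression condition = findFilterCondition(child);
      if (condition != null) {
        return condition;
      }
    }
    return null;
  }
}

Since each filter goes through a full analysis pass, this is heavier than constructing the expressions by hand.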
I like it, but do we have to change all the types then? I know Spark gets upset if our types don't match (column vs. literal), and we have all string literals here :/
Sorry for the late reply. I was off last Friday and yesterday. We don't need to change the types because Spark takes care of the casting.
kbendick left a comment
Thanks for this patch @huaxingao! Some minor comments on code style conventions and things to look into.
I'm still digesting this PR but thought I'd give you those in the interim.
org.apache.spark.sql.execution.datasources.PartitionSpec spec = fileIndex.partitionSpec();
StructType schema = spec.partitionColumns();
if (schema.isEmpty()) {
  return new ArrayList<>();
Nit: Can you use Lists.newArrayList() or ImmutableList.empty() here?
For Lists, we use the repackaged internal guava version from org.apache.iceberg.relocated.com.google.common.collect.Lists. The same is true for ImmutableList, which is already imported.
You could also use Collections.emptyList() to be similar to the emptyMap above.
Also, these can be fixed later once it's determined this PR looks OK. I noticed a comment below about holding off on things until the PR is determined to look OK, and this can definitely be done then 😄
List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions =
    getPartitionFilterExpressions(schema, partitionFilter);

List<org.apache.spark.sql.catalyst.expressions.Expression> dataFilters = new java.util.ArrayList<>();
Nit: Same note about avoiding new ArrayList<> in favor of one of the ones mentioned above 👍
@kbendick Thanks a lot for reviewing! I will address your comments later because I might need to change other parts of the code too. I will do all the changes in one commit.
}).collect(Collectors.toList());
}

private static List getPartitionFilterExpressions(SparkSession spark, String tableName,
I think we should use List<org.apache.spark.sql.catalyst.expressions.Expression> instead of List to avoid IDE warnings.
    SparkExpressionConverter.collectResolvedSparkExpression(spark, tableName, filter);
filterExpressions.add(expression);
} catch (AnalysisException e) {
  // ignore if filter cannot be converted to Spark expression
I think we can also add "PartitionFilter map is already validated in the caller", and we should log the error?
I guess maybe follow what you have here https://github.com/apache/iceberg/blob/master/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RewriteDataFilesProcedure.java#L124 and throw IllegalArgumentException?
Yeah, we can do that.
    SparkSession spark, String tableName, Map<String, String> partitionFilter) {
  List<org.apache.spark.sql.catalyst.expressions.Expression> filterExpressions = Lists.newArrayList();
  for (Map.Entry<String, String> entry : partitionFilter.entrySet()) {
    String filter = entry.getKey() + " = '" + entry.getValue() + "'";
Why do we have quotes ('') for the value?
Example: if id is an integer column, then we need id = 3 in the query instead of id = '3'?
Have we tested both string and non-string partition columns?
I was also thinking: instead of a map, can we expose the where clause in the call procedure (similar to rewrite_data_files), so the user can give filters other than equals as well?
I was also thinking: instead of a map, can we expose the where clause in the call procedure (similar to rewrite_data_files), so the user can give filters other than equals as well?
But that would be a breaking API change, I guess. Let's see what others think about this too.
If the filter is on a String column, e.g. a dept column with value hr, we want the filter to be dept = 'hr'.
For non-String columns, such as id = 3, the filter id = '3' still works OK because Spark starts with a Literal holding the String value, and then casts it to a Literal with the Int value once it knows the column type.
TestAddFilesProcedure has both String and int partition columns, so these two are tested. We probably should test other types, e.g. Timestamp, just to make sure.
@RussellSpitzer @kbendick
kbendick left a comment
This LGTM.
One question / corner case about what happens when a partitionFilter doesn't match any partitions, but it's more of a user-experience issue about the error message and not a correctness concern.
Preconditions.checkArgument(!partitions.isEmpty(),
    "Cannot find any partitions in table %s", partitions);
Nit / corner case:
Should we update the precondition message to indicate that it's possible that the filter didn't match any partitions? The current error message might be kind of confusing to users if the file-based table is partitioned. Maybe just Cannot find any matching partitions in table %s?
Fixed. Thanks!
Seq<org.apache.spark.sql.catalyst.expressions.Expression> scalaPartitionFilters =
    JavaConverters.asScalaBufferConverter(filterExpressions).asScala().toSeq();
Seq<org.apache.spark.sql.catalyst.expressions.Expression> scalaDataFilters =
    JavaConverters.asScalaBufferConverter(dataFilters).asScala().toSeq();
Is there an easier way to construct an empty sequence? Also, since this is always empty, can you put the dataFilters definition and this line next to one another? The line to create scalaPartitionFilters can be next to the line above that creates filterExpressions.
});
return new SparkPartition(values, partition.path().toString(), format);
FileStatus fileStatus =
    scala.collection.JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
scala.collection.JavaConverters is imported. Can you remove the fully-qualified name?
}).collect(Collectors.toList());
}

private static List<org.apache.spark.sql.catalyst.expressions.Expression> getPartitionFilterExpressions(
Is it possible to move this to a separate util class to avoid the conflict with the connector Expression? Maybe SparkPartitionUtil or something?
    throw new IllegalArgumentException("filter " + filter + " cannot be converted to Spark expression");
  }
}
return filterExpressions;
Nit: missing whitespace between the control flow block and the following statement.
    SparkExpressionConverter.collectResolvedSparkExpression(spark, tableName, filter);
filterExpressions.add(expression);
} catch (AnalysisException e) {
  throw new IllegalArgumentException("filter " + filter + " cannot be converted to Spark expression");
Minor: The exception message should follow the conventions for errors:
- Use sentence case. That is, capitalize the first word of the message.
- State what went wrong first: "Cannot convert filter to Spark"
- Next, give context after a :, which in this case is the filter
- Never swallow cause exceptions

This should be throw new IllegalArgumentException("Cannot convert filter to Spark: " + filter, e)
String filter = entry.getKey() + " = '" + entry.getValue() + "'";
try {
  org.apache.spark.sql.catalyst.expressions.Expression expression =
      SparkExpressionConverter.collectResolvedSparkExpression(spark, tableName, filter);
collectResolvedSparkExpression is really expensive. Why does this need to call it?
Can't this produce Expression instances directly rather than building strings and converting with fake plans?
@rdblue Thank you very much for reviewing my PR on the weekend!
Do you mean constructing a filter Expression directly instead of letting Spark generate the Expression? I initially generated the Expression like this:
BoundReference ref = new BoundReference(index, dataType, true);
switch (dataType.typeName()) {
  case "integer":
    filterExpressions.add(new EqualTo(ref,
        org.apache.spark.sql.catalyst.expressions.Literal.create(Integer.parseInt(entry.getValue()),
        DataTypes.IntegerType)));
    break;
There were some concerns from the reviewers because we would need to test each of the data types, so I changed the code to call collectResolvedSparkExpression instead.
I will address all the other comments after I find out what to do for this one, so I can fix all the problems in one commit.
Yes, I think you should construct the filter expression directly rather than calling collectResolvedSparkExpression.
@rdblue I changed the code to construct the filter expression directly. Could you please take one more look? Thank you very much!
I have manually checked that the filter expression is created correctly for all the numeric types, Date, and Timestamp. There are tests for String and int partition filters in TestAddFilesProcedure, and I added a test for a Date partition filter. There is a bug with the Timestamp partition filter in the current code; I will fix it in a separate PR.
The build failure doesn't seem to be related to my changes.
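For reference, the direct-construction approach amounts to something like the sketch below. The helper name, the handled types, and the parsing are illustrative assumptions, not the exact code merged in this PR:

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.expressions.BoundReference;
import org.apache.spark.sql.catalyst.expressions.EqualTo;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.catalyst.expressions.Literal;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructType;

public class PartitionFilterSketch {
  // Build one EqualTo(partition column, typed literal) per entry of the
  // partition filter map, parsing the string value into the column's type first.
  static List<Expression> buildPartitionFilters(StructType schema, Map<String, String> partitionFilter) {
    List<Expression> expressions = Lists.newArrayList();
    for (Map.Entry<String, String> entry : partitionFilter.entrySet()) {
      int index = schema.fieldIndex(entry.getKey());
      DataType dataType = schema.fields()[index].dataType();
      BoundReference ref = new BoundReference(index, dataType, true);
      Object value;
      switch (dataType.typeName()) {
        case "integer":
          value = Integer.parseInt(entry.getValue());
          break;
        case "long":
          value = Long.parseLong(entry.getValue());
          break;
        case "date":
          value = java.sql.Date.valueOf(entry.getValue());
          break;
        default:
          // string partition values pass through unchanged; other types omitted in this sketch
          value = entry.getValue();
      }
      expressions.add(new EqualTo(ref, Literal.create(value, dataType)));
    }
    return expressions;
  }
}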
return new SparkPartition(values, partition.path().toString(), format);

FileStatus fileStatus =
    JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
Why does this use partition.files() instead of partition.path()?
Because here partition is a PartitionDirectory:
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
listFiles returns a Seq of PartitionDirectory:
def listFiles(
    partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory]
Before my change, partition was a PartitionPath:
case class PartitionPath(values: InternalRow, path: Path)
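(For completeness, a small sketch of how a partition's location can be recovered from a PartitionDirectory, by taking the parent directory of its first file. This is an assumption about how the surrounding code uses it, not a quote of the PR:)

import org.apache.hadoop.fs.FileStatus;
import org.apache.spark.sql.execution.datasources.PartitionDirectory;
import scala.collection.JavaConverters;

public class PartitionPathSketch {
  // PartitionDirectory carries the partition's files rather than its path, so the
  // partition location is derived from the parent directory of the first file.
  static String partitionUri(PartitionDirectory partition) {
    FileStatus firstFile =
        JavaConverters.seqAsJavaListConverter(partition.files()).asJava().get(0);
    return firstFile.getPath().getParent().toString();
  }
}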
Great, thanks for the context! I assumed that it would use the same values.
 * and value is the specific value to be filtered on the column.
 * @return a List of filters in the format of Spark Expression.
 */
public static List getSparkFilterExpressions(StructType schema,
Iceberg doesn't use get in method names because it tends to either be filler or prevent us from having more specific methods. Here, I think a more specific name is partitionMapToExpression.
Fixed. Thanks!
 * and value is the specific value to be filtered on the column.
 * @return a List of filters in the format of Spark Expression.
 */
public static List getSparkFilterExpressions(StructType schema,
List is not parameterized. Could you fix types?
Fixed. Thanks!
Looks great. Thanks, @huaxingao!
Thank you all very much for helping me on this PR!
When getting files from Spark, we want to push down partition filters to Spark, so only the partitions that match the filters will be returned.
Basically, I am using this Spark API to prune partitions
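For reviewers, a minimal sketch of how that pruning call fits together, using the listFiles signature quoted earlier in the review thread; the wrapper class and variable names are mine, and the exact wiring in the PR may differ:

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.catalyst.expressions.Expression;
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex;
import org.apache.spark.sql.execution.datasources.PartitionDirectory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class ListFilesSketch {
  // Convert the partition filter expressions to Scala Seqs and ask the file index
  // to list only the partition directories that match them.
  static List<PartitionDirectory> prunedPartitions(
      InMemoryFileIndex fileIndex, List<Expression> partitionFilters) {
    Seq<Expression> scalaPartitionFilters =
        JavaConverters.asScalaBufferConverter(partitionFilters).asScala().toSeq();
    Seq<Expression> scalaDataFilters =
        JavaConverters.asScalaBufferConverter(Lists.<Expression>newArrayList()).asScala().toSeq();
    return Lists.newArrayList(
        JavaConverters.seqAsJavaListConverter(
            fileIndex.listFiles(scalaPartitionFilters, scalaDataFilters)).asJava());
  }
}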