Merged
@@ -639,9 +639,9 @@ public <T> String predicate(UnboundPredicate<T> pred) {
  case NOT_EQ:
    return pred.ref().name() + " != " + sqlString(pred.literal());
  case STARTS_WITH:
-   return pred.ref().name() + " LIKE '" + pred.literal() + "%'";
+   return pred.ref().name() + " LIKE '" + pred.literal().value() + "%'";
  case NOT_STARTS_WITH:
-   return pred.ref().name() + " NOT LIKE '" + pred.literal() + "%'";
+   return pred.ref().name() + " NOT LIKE '" + pred.literal().value() + "%'";
  case IN:
    return pred.ref().name() + " IN (" + sqlString(pred.literals()) + ")";
  case NOT_IN:
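The change above matters because concatenating `pred.literal()` invokes the literal's `toString()`, which is not guaranteed to be the raw value (string literals commonly render quoted), whereas `pred.literal().value()` yields the underlying value. A minimal, self-contained sketch of the failure mode — `StringLiteral` here is a hypothetical stand-in, not Iceberg's actual `Literal` class:

```java
// Hypothetical stand-in for an expression literal whose toString()
// differs from its raw value (string literals often render quoted).
public class LikePatternDemo {
  static final class StringLiteral {
    private final String value;

    StringLiteral(String value) {
      this.value = value;
    }

    String value() {
      return value;
    }

    @Override
    public String toString() {
      return "\"" + value + "\"";
    }
  }

  public static void main(String[] args) {
    StringLiteral lit = new StringLiteral("data");
    // Buggy: implicit toString() leaks the quotes into the LIKE pattern
    String buggy = "col LIKE '" + lit + "%'";
    // Fixed: use the raw value
    String fixed = "col LIKE '" + lit.value() + "%'";
    System.out.println(buggy); // col LIKE '"data"%'
    System.out.println(fixed); // col LIKE 'data%'
  }
}
```

With the buggy concatenation, the generated SQL pattern carries stray quote characters, so the pushed-down predicate would match nothing.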
@@ -29,9 +29,9 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -105,42 +105,52 @@ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {

@Override
public Filter[] pushFilters(Filter[] filters) {
// there are 3 kinds of filters:
// (1) filters that can be pushed down completely and don't have to be evaluated by Spark
// (e.g. filters that select entire partitions)
// (2) filters that can be pushed down partially and require record-level filtering in Spark
// (e.g. filters that may select some but not necessarily all rows in a file)
// (3) filters that can't be pushed down at all and have to be evaluated by Spark
// (e.g. unsupported filters)
// filters (1) and (2) are used to prune files during job planning in Iceberg
// filters (2) and (3) form a set of post-scan filters and must be evaluated by Spark

List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
-List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
+List<Filter> pushableFilters = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> postScanFilters = Lists.newArrayListWithExpectedSize(filters.length);
Member:

This may just be me, but I would name this "sparkFilters" and "pushableFilters" "icebergFilters", to be clear which filters are being performed by which system.

Contributor (author):

To me, icebergFilters would imply it is an Iceberg filter, while it is actually a Spark filter that can be pushed down to Iceberg. I picked these names from Spark, based on what it uses in its code and in the Javadoc for this method.

```
[Test worker] INFO org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown -
Pushing operators to testhadoop.default.table
Pushed Filters: IsNotNull(dep), IsNotNull(id), EqualTo(dep,d1), EqualTo(id,1)
Post-Scan Filters: (id#22 = 1)
```

I agree it is hard to navigate without context. To make it a bit clearer, I added a comment above. Could you check if it's any better?

Member (@RussellSpitzer, Jan 5, 2023):

I think that's fine; I don't really like the Spark nomenclature here, but I think your comment does a good job defining it. I would add that (1) and (2) are placed in pushableFilters and (2) and (3) are returned to Spark in postScanFilters.

Really, I just think IcebergPushedFilters and SparkPostScanFilters would also be nice, but I think your comment provides enough context for a non-Spark expert to get what's going on.


for (Filter filter : filters) {
Expression expr = null;
try {
expr = SparkFilters.convert(filter);
Member:

I think this is a little hard for me since everything is in the try-catch here; maybe instead we keep the older pattern of

```
if (expr != null) {
  try {
    bind
    push down to Iceberg
    if (not fully pushable) {
      add to Spark
    }
  } catch {
    leave for Spark
  }
} else {
  leave for Spark
}
```

But I'm pretty sure this is correct either way.

Contributor (author):

We also have to wrap the code that converts the filter. It is unlikely to throw an exception, but we have to make sure it does not fail the query. There are 3 calls that can throw an exception now. I did not want to have nested try-catch because it looked too complicated with the added logic.

GitHub renders it in a way that's really hard to read. It does not seem to be that bad in an IDE.

```java
try {
  Expression expr = SparkFilters.convert(filter);

  if (expr != null) {
    // try binding the expression to ensure it can be pushed down
    Binder.bind(schema.asStruct(), expr, caseSensitive);
    expressions.add(expr);
    pushableFilters.add(filter);
  }

  if (expr == null || requiresSparkFiltering(expr)) {
    postScanFilters.add(filter);
  } else {
    LOG.info("Evaluating completely on Iceberg side: {}", filter);
  }

} catch (Exception e) {
  LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());
  postScanFilters.add(filter);
}
```
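The control flow this snippet settles on — convert, bind, then classify, with any exception demoting the filter to post-scan — can be modeled standalone. The mock `convert` function and the `classify` helper below are illustrative only, not the Iceberg/Spark APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the pushdown loop: each filter is converted; failures
// at any step leave the filter for post-scan evaluation by the engine.
public class PushdownDemo {
  static List<String> classify(List<String> filters, Function<String, String> convert) {
    List<String> postScan = new ArrayList<>();
    for (String filter : filters) {
      try {
        String expr = convert.apply(filter); // may return null or throw
        if (expr == null) {
          postScan.add(filter); // unsupported: Spark must evaluate it
        }
        // else: fully pushed down (partial pushdown omitted in this toy)
      } catch (RuntimeException e) {
        postScan.add(filter); // conversion/binding failed: leave for Spark
      }
    }
    return postScan;
  }

  public static void main(String[] args) {
    Function<String, String> convert = f -> {
      if (f.equals("bad")) throw new IllegalArgumentException(f);
      return f.equals("unsupported") ? null : f;
    };
    System.out.println(classify(List.of("ok", "unsupported", "bad"), convert));
    // [unsupported, bad]
  }
}
```

The key property, mirroring the discussion above, is that no filter is ever dropped: anything that cannot be proven pushable ends up in the post-scan list rather than failing the query.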

Member:

For me it was just thinking about the different failure positions. I didn't know SparkFilters.convert could also throw; I thought it was just the binding.

Contributor (author, @aokolnychyi, Jan 5, 2023):

We had some bugs when SparkFilters.convert threw an exception. That's why there was a separate try-catch around it. Let me think a bit more here.

} catch (IllegalArgumentException e) {
// converting to Iceberg Expression failed, so this expression cannot be pushed down
LOG.info(
"Failed to convert filter to Iceberg expression, skipping push down for this expression: {}. {}",
filter,
e.getMessage());
}
Expression expr = SparkFilters.convert(filter);

if (expr != null) {
try {
if (expr != null) {
// try binding the expression to ensure it can be pushed down
Binder.bind(schema.asStruct(), expr, caseSensitive);
expressions.add(expr);
pushed.add(filter);
} catch (ValidationException e) {
// binding to the table schema failed, so this expression cannot be pushed down
LOG.info(
"Failed to bind expression to table schema, skipping push down for this expression: {}. {}",
filter,
e.getMessage());
pushableFilters.add(filter);
}

if (expr == null || requiresSparkFiltering(expr)) {
postScanFilters.add(filter);
} else {
LOG.info("Evaluating completely on Iceberg side: {}", filter);
Contributor:

Is this worth an INFO message? That seems verbose to me. How about debug?

}

} catch (Exception e) {
LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());
Member:

This is now a WARN instead of an INFO.

Contributor (author):

I was not sure about this. If there is an exception anywhere in this path, it indicates something went wrong. Seems like something we should warn the user about? I can revert it too. What do you think?

Member:

I'm not sure the user can really do anything about the exceptions in this path. It's really only something a dev can fix when working on the Iceberg library, correct?

Contributor (author):

That's true.

Contributor:

I would prefer to be more specific about the error here. Logging the ValidationException at INFO is correct because it's okay for some filters not to be convertible -- we want to catch those and support them in the future, but there is no problem for the job. I'm not sure what other errors filter through here now and whether WARN is appropriate for those, but I'd prefer to have them broken out more specifically.

postScanFilters.add(filter);
}
}

this.filterExpressions = expressions;
-this.pushedFilters = pushed.toArray(new Filter[0]);
+this.pushedFilters = pushableFilters.toArray(new Filter[0]);

return postScanFilters.toArray(new Filter[0]);
}

// Spark doesn't support residuals per task, so return all filters
// to get Spark to handle record-level filtering
return filters;
private boolean requiresSparkFiltering(Expression expr) {
return table.specs().values().stream()
Contributor (author):

For now, we simply check each spec in the table. In the future, we may optimize this to only look at selected specs but that won't be trivial. I think it is a reasonable start.

Member:

I think another optimization worth doing would be to group all expressions based on bound column and operation. In the bad case we are considering, we would end up checking whether or not we can filter a "column = literal" for a ton of different literal values.

.anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec, caseSensitive));
}
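The anyMatch over table.specs() encodes the rule from the comment block earlier: a filter needs record-level (post-scan) evaluation if there is any partition spec under which it does not select entire partitions. A toy standalone model of that check, with identity partitioning only — `Spec` and `requiresRecordLevelFiltering` are illustrative stand-ins, not Iceberg's `ExpressionUtil.selectsPartitions`:

```java
import java.util.List;
import java.util.Set;

// Toy model: an equality filter on a column selects entire partitions
// iff the column is an identity partition source in every spec.
public class ResidualDemo {
  record Spec(Set<String> identityPartitionColumns) {}

  static boolean requiresRecordLevelFiltering(String filterColumn, List<Spec> specs) {
    // Mirrors the PR's anyMatch over all specs: if any spec does not
    // partition by this column, some files may contain non-matching rows.
    return specs.stream()
        .anyMatch(spec -> !spec.identityPartitionColumns().contains(filterColumn));
  }

  public static void main(String[] args) {
    List<Spec> specs = List.of(new Spec(Set.of("dep")), new Spec(Set.of("dep", "id")));
    System.out.println(requiresRecordLevelFiltering("dep", specs)); // false
    System.out.println(requiresRecordLevelFiltering("id", specs));  // true
  }
}
```

This also shows why checking every spec is the conservative starting point the author mentions: after a partition evolution, older files written under an earlier spec may not be pruned by the new partitioning.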

@Override
@@ -22,9 +22,10 @@ package org.apache.spark.sql.execution.datasources
import org.apache.iceberg.spark.SparkFilters
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.execution.CommandExecutionMode
+import org.apache.spark.sql.catalyst.plans.logical.LeafNode

object SparkExpressionConverter {

@@ -37,15 +38,14 @@

@throws[AnalysisException]
def collectResolvedSparkExpression(session: SparkSession, tableName: String, where: String): Expression = {
var expression: Expression = null
// Add a dummy prefix linking to the table to collect the resolved spark expression from optimized plan.
val prefix = String.format("SELECT 42 from %s where ", tableName)
val logicalPlan = session.sessionState.sqlParser.parsePlan(prefix + where)
val optimizedLogicalPlan = session.sessionState.executePlan(logicalPlan, CommandExecutionMode.ALL).optimizedPlan
val tableAttrs = session.table(tableName).queryExecution.analyzed.output
Member:

👍 I like this

val unresolvedExpression = session.sessionState.sqlParser.parseExpression(where)
val filter = Filter(unresolvedExpression, DummyRelation(tableAttrs))
val optimizedLogicalPlan = session.sessionState.executePlan(filter).optimizedPlan
optimizedLogicalPlan.collectFirst {
case filter: Filter =>
expression = filter.expressions.head
}
expression
case filter: Filter => filter.condition
}.getOrElse(throw new AnalysisException("Failed to find filter expression"))
}

case class DummyRelation(output: Seq[Attribute]) extends LeafNode
}
@@ -28,6 +28,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -229,6 +230,16 @@ protected void withUnavailableLocations(Iterable<String> locations, Action action) {
}
}

protected void withDefaultTimeZone(String zoneId, Action action) {
TimeZone currentZone = TimeZone.getDefault();
try {
TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
action.invoke();
} finally {
TimeZone.setDefault(currentZone);
}
}
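The new helper follows the same save/run/restore pattern as the surrounding test utilities. A self-contained sketch of the same idea outside the test base class — this standalone version uses a plain `Runnable` instead of the test suite's `Action` interface:

```java
import java.util.TimeZone;

public class TimeZoneScope {
  // Runs the action with the given JVM default time zone, restoring the
  // previous default afterwards even if the action throws.
  static void withDefaultTimeZone(String zoneId, Runnable action) {
    TimeZone previous = TimeZone.getDefault();
    try {
      TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
      action.run();
    } finally {
      TimeZone.setDefault(previous);
    }
  }

  public static void main(String[] args) {
    withDefaultTimeZone("UTC", () ->
        System.out.println(TimeZone.getDefault().getID())); // UTC
  }
}
```

The finally block is the important part: because `TimeZone.setDefault` mutates process-wide state, a test that fails mid-action would otherwise leak its zone into every later test.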

protected void withSQLConf(Map<String, String> conf, Action action) {
SQLConf sqlConf = SQLConf.get();
