Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -639,9 +639,9 @@ public <T> String predicate(UnboundPredicate<T> pred) {
case NOT_EQ:
return pred.ref().name() + " != " + sqlString(pred.literal());
case STARTS_WITH:
return pred.ref().name() + " LIKE '" + pred.literal() + "%'";
return pred.ref().name() + " LIKE '" + pred.literal().value() + "%'";
Comment thread
RussellSpitzer marked this conversation as resolved.
case NOT_STARTS_WITH:
return pred.ref().name() + " NOT LIKE '" + pred.literal() + "%'";
return pred.ref().name() + " NOT LIKE '" + pred.literal().value() + "%'";
case IN:
return pred.ref().name() + " IN (" + sqlString(pred.literals()) + ")";
case NOT_IN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ExpressionUtil;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -106,41 +106,41 @@ public SparkScanBuilder caseSensitive(boolean isCaseSensitive) {
@Override
public Filter[] pushFilters(Filter[] filters) {
List<Expression> expressions = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> pushed = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> pushableFilters = Lists.newArrayListWithExpectedSize(filters.length);
List<Filter> postScanFilters = Lists.newArrayListWithExpectedSize(filters.length);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may just me but I would mark this as "SparkFilters"
and "pushableFilters" as "IcebergFilters" to be clear which filters were being preformed by which system?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me, icebergFilters would imply it is an Iceberg filter while it is actually a Spark filter that can be pushed down to Iceberg. I picked these names from Spark based on what it uses in its code and in the Javadoc to this method.

[Test worker] INFO org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown - 
Pushing operators to testhadoop.default.table
Pushed Filters: IsNotNull(dep), IsNotNull(id), EqualTo(dep,d1), EqualTo(id,1)
Post-Scan Filters: (id#22 = 1)

I agree it is hard to navigate without context. In order to make it a bit clear, I added a comment above. Could you check if it's any better?

@RussellSpitzer RussellSpitzer Jan 5, 2023

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that's fine, I don't really like the Spark nomenclature here but I think your comment does a good job defining it. I think I would add that. (1) and (2) are placed in "PushableFilters" and (2) and (3) are returned to spark in "postScanFilters"

Really I just think
IcebergPushedFilters and
SparkPostScanFilters would also be nice but I think your comment provides enough context for a non-spark expert to get what's going on


for (Filter filter : filters) {
Expression expr = null;
try {
expr = SparkFilters.convert(filter);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a little hard for me since everything is in the try catch here, maybe instead we keep the older pattern of

if (expr != null) {
  try {
   bind
   pushdown to iceberg
   If (not fully pushable) {
     add to spark
   }
  } catch {
    leave for spark
  }
} else {
  leave for spark
}

But i'm pretty sure this is correct either way

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also have to wrap the code that converts the filter. It is unlikely to throw an exception but we have to make sure it does not fail the query. There are 3 calls that can throw an exception now. I did not want to have nested try-catch because it looked to complicated with added logic.

Github renders it in a way that's really hard to read. It does not seem to be that bad in IDE.

try {
  Expression expr = SparkFilters.convert(filter);

  if (expr != null) {
    // try binding the expression to ensure it can be pushed down
    Binder.bind(schema.asStruct(), expr, caseSensitive);
    expressions.add(expr);
    pushableFilters.add(filter);
  }

  if (expr == null || requiresSparkFiltering(expr)) {
    postScanFilters.add(filter);
  } else {
    LOG.info("Evaluating completely on Iceberg side: {}", filter);
  }

} catch (Exception e) {
  LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());
  postScanFilters.add(filter);
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it was just thinking about the different failure positions, I didn't know SparkFilters.convert could also throw, I thought it was just the binding.

@aokolnychyi aokolnychyi Jan 5, 2023

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had some bugs when SparkFilters.convert threw an exception. That's why there was a separate try-catch around it. Let me think a bit more here.

} catch (IllegalArgumentException e) {
// converting to Iceberg Expression failed, so this expression cannot be pushed down
LOG.info(
"Failed to convert filter to Iceberg expression, skipping push down for this expression: {}. {}",
filter,
e.getMessage());
}
Expression expr = SparkFilters.convert(filter);

if (expr != null) {
try {
if (expr != null) {
// try binding the expression to ensure it can be pushed down
Comment thread
RussellSpitzer marked this conversation as resolved.
Binder.bind(schema.asStruct(), expr, caseSensitive);

expressions.add(expr);
pushed.add(filter);
} catch (ValidationException e) {
// binding to the table schema failed, so this expression cannot be pushed down
LOG.info(
"Failed to bind expression to table schema, skipping push down for this expression: {}. {}",
filter,
e.getMessage());
pushableFilters.add(filter);
}

if (expr == null || requiresRecordLevelFiltering(expr)) {
postScanFilters.add(filter);
}
} catch (Exception e) {
LOG.warn("Failed to check if {} can be pushed down: {}", filter, e.getMessage());

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is now a Warn instead of an info

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not sure about this. If there is an exception anywhere in this path, it indicates something went wrong. Seems like something we should warn the user about? I can revert it too. What do you think?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure the user can really do anything about the exceptions in this path. It's really only something a dev can fix when working on the Iceberg library correct?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to be more specific about the error here. Logging the ValidationError at INFO is correct because it's okay for some filters to not be convertible -- we want to catch those and support them in the future, but there is no problem for the job. I'm not sure what other errors filter through here now and whether WARN is appropriate for those. But I'd prefer to have them broken out more specifically.

postScanFilters.add(filter);
}
}

this.filterExpressions = expressions;
this.pushedFilters = pushed.toArray(new Filter[0]);
this.pushedFilters = pushableFilters.toArray(new Filter[0]);

// all unsupported filters and filters that require record-level filtering
// must be reported back and handled on the Spark side
return postScanFilters.toArray(new Filter[0]);
}

// Spark doesn't support residuals per task, so return all filters
// to get Spark to handle record-level filtering
return filters;
private boolean requiresRecordLevelFiltering(Expression expr) {
Comment thread
aokolnychyi marked this conversation as resolved.
Outdated
return table.specs().values().stream()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For now, we simply check each spec in the table. In the future, we may optimize this to only look at selected specs but that won't be trivial. I think it is a reasonable start.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think another optimization maybe worth doing would be to group all expressions based on bound column an operation. In the bad case we are considering we would end up checking wether or not we can filter a "column = literal" for a ton of different literal values.

.anyMatch(spec -> !ExpressionUtil.selectsPartitions(expr, spec, caseSensitive));
Comment thread
RussellSpitzer marked this conversation as resolved.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -229,6 +230,16 @@ protected void withUnavailableLocations(Iterable<String> locations, Action actio
}
}

protected void withDefaultTimeZone(String zoneId, Action action) {
TimeZone currentZone = TimeZone.getDefault();
try {
TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
action.invoke();
} finally {
TimeZone.setDefault(currentZone);
}
}

protected void withSQLConf(Map<String, String> conf, Action action) {
SQLConf sqlConf = SQLConf.get();

Expand Down
Loading