Skip to content

Conversation

@huaxingao
Copy link
Contributor

@huaxingao huaxingao commented Jun 2, 2022

Co-Authored-By: Xi Chen [email protected]
Co-Authored-By: Hao Lin [email protected]
Co-Authored-By: Huaxin Gao [email protected]

This is the read path of parquet row group bloom filter. The original PR is here

Co-authored-by: Xi Chen <[email protected]>
Co-authored-by: Hao Lin <[email protected]>
Co-authored-by: Hao Lin <[email protected]>
@huaxingao
Copy link
Contributor Author

I don't have a good way to test the read path, but I have tested these changes with the write path on my local.

@huaxingao
Copy link
Contributor Author

cc @rdblue @RussellSpitzer @kbendick @chenjunjiedada @hililiwei

* under the License.
*/

package org.apache.iceberg.parquet;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to include the unit test in this PR? I think all you'd need to do is configure the bloom filter settings using the Parquet settings in a Hadoop Configuration rather than through the Iceberg write settings.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to config using ParquetOutputFormat.BLOOM_FILTER_ENABLED by replacing this line with .set(ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#_id", "true") , but it doesn't work.

Seems Iceberg only honors the Iceberg's properties (which are set by Context at here), but it doesn't really take the properties set by Parquet.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that properties will be passed through to the Hadoop Configuration automatically. Is that no longer true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rdblue Thanks for your quick reply!

Seems the properties need to be passed to InternalParquetRecordWriter through this encodingProps. We need to call the withXXX method explicitly e.g. withDictionaryPageSize to set the property to encodingPropsBuilder

So it seems to me that we have to call this withBloomFilterEnabled explicitly to set the bloom filter property toencodingPropsBuilder. Otherwise, Parquet's InternalParquetRecordWriter won't be able to take it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I set the Parquet properties to encodingPropsBuilder? If the same properties is also set by iceberg properties, I will reset to overwrite.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Is it possible to get some tests working without the write-side changes? Maybe write a Parquet file directly and use name mapping? If not then let's try to make the minimal write-side changes to get the test in.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some minimal write-side changes to get the test in. Hope this is OK.

}

public static Set<Integer> references(
StructType struct, List<Expression> exprs, boolean caseSensitive, boolean alreadyBound) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we detect that an expression is unbound? Maybe identify named refs and return? Then we could just use one method for everything.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can i just use instanceof Unbound to detect if an expression is unbound? Something like this:

  private static boolean isUnbound(Expression expr) {
    switch (expr.op()) {
      case TRUE:
        return false;
      case FALSE:
        return false;
      case NOT:
        Not not = (Not) expr;
        return isUnbound(not.child());
      case AND:
        And and = (And) expr;
        return isUnbound(and.left()) || isUnbound(and.right());
      case OR:
        Or or = (Or) expr;
        return isUnbound(or.left()) || isUnbound(or.right());
      default:
        return expr instanceof Unbound;
    }
  }

Then the method boundReferences can be

  public static Set<Integer> boundReferences(StructType struct, List<Expression> exprs, boolean caseSensitive) {
    if (exprs == null) {
      return ImmutableSet.of();
    }
    ReferenceVisitor visitor = new ReferenceVisitor();
    for (Expression expr : exprs) {
      if (isUnbound(expr)) {
        ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
      } else {
        ExpressionVisitors.visit(expr, visitor);
      }
    }
    return visitor.references;
  }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all expressions are bound or unbound. What about just trying to bind the expression and catching the exception that is thrown if it's already bound? Then you can just move on and return the references.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me!

* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
public boolean shouldRead(MessageType fileSchema, BlockMetaData rowGroup,
BloomFilterReader bloomReader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style nit: the starting point of all argument lines should be aligned. That can be at 2 indents from the start of the method OR after the opening ( to start method arguments. There should not be two different indentation levels like you have here (both after ( and 2 indents from the method definition line).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the reminder. I will pay attention to the style next time.

@github-actions github-actions bot added the core label Jun 9, 2022
try {
ExpressionVisitors.visit(bind(struct, expr, caseSensitive), visitor);
} catch (IllegalStateException e) {
if (e.getMessage().contains("Found already bound predicate")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't rely on exception messages like this.

I think instead we should just add a utility to detect whether an expression is bound. Here's an implementation:

  /**
   * Returns whether an expression is bound.
   * <p>
   * An expression is bound if all of its predicates are bound.
   *
   * @param expr an {@link Expression}
   * @return true if the expression is bound
   * @throws IllegalArgumentException if the expression has both bound and unbound predicates.
   */
  public static boolean isBound(Expression expr) {
    Boolean isBound = ExpressionVisitors.visit(expr, new IsBoundVisitor());
    return isBound != null ? isBound : false; // assume unbound if undetermined
  }

  private static class IsBoundVisitor extends ExpressionVisitors.ExpressionVisitor<Boolean> {
    @Override
    public Boolean not(Boolean result) {
      return result;
    }

    @Override
    public Boolean and(Boolean leftResult, Boolean rightResult) {
      return combineResults(leftResult, rightResult);
    }

    @Override
    public Boolean or(Boolean leftResult, Boolean rightResult) {
      return combineResults(leftResult, rightResult);
    }

    @Override
    public <T> Boolean predicate(BoundPredicate<T> pred) {
      return true;
    }

    @Override
    public <T> Boolean predicate(UnboundPredicate<T> pred) {
      return false;
    }

    private Boolean combineResults(Boolean isLeftBound, Boolean isRightBound) {
      if (isLeftBound != null) {
        Preconditions.checkArgument(isRightBound == null || isLeftBound.equals(isRightBound),
            "Found partially bound expression");
        return isLeftBound;
      } else {
        return isRightBound;
      }
    }
  }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great! Thank you very much for the implementation!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem! Thanks for all your work on this.

.build(),
metricsConfig);
.withDictionaryPageSize(dictionaryPageSize);
// Todo: The following code needs to be improved in the bloom filter write path PR.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is okay for now to get this in with tests. Thanks, @huaxingao!

@rdblue rdblue merged commit 87242c0 into apache:master Jun 13, 2022
@rdblue
Copy link
Contributor

rdblue commented Jun 13, 2022

Thanks, @huaxingao!

@huaxingao
Copy link
Contributor Author

Thank you very much @rdblue! Also thank you @RussellSpitzer @kbendick @chenjunjiedada @hililiwei @stevenzwu

@huaxingao huaxingao deleted the bf_read branch June 13, 2022 03:26
@kbendick
Copy link
Contributor

kbendick commented Jun 13, 2022

Thank you for making support for reading Parquet bloom filters happen @huaxingao! Your work is highly appreciated. This will be very valuable for a number of use cases.

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Jul 18, 2022
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Jul 18, 2022
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Jul 20, 2022
Initial-neko pushed a commit to Initial-neko/iceberg that referenced this pull request Jul 25, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants