
Conversation

@prodeezy (Contributor) commented Mar 7, 2019

Addresses Issue#122

Update:
This PR should be ready for full review. I've added tests and comments where possible and addressed most PR comments.

A basic test run of this change on pre-generated Iceberg-formatted data containing struct-level metrics (run with Spark patched to push struct filters down to Iceberg):

val schema = new StructType()
  .add("age", IntegerType)
  .add("name", StringType)
  .add("friends", MapType(StringType, IntegerType))
  .add("location", new StructType().add("lat", DoubleType).add("lon", DoubleType))

val iceDf = spark.read.format("iceberg").load("iceberg-people-nestedfield-metrics")
iceDf.createOrReplaceTempView("iceberg_people_nestedfield_metrics")


// Struct filter pushed down by Spark to Iceberg Scan
scala> spark.sql("select * from iceberg_people_nestedfield_metrics where location.lat = 101.123").explain()
== Physical Plan ==
*(1) Project [age#0, name#1, friends#2, location#3]
+- *(1) Filter (isnotnull(location#3) && (location#3.lat = 101.123))
   +- *(1) ScanV2 iceberg[age#0, name#1, friends#2, location#3] (Filters: [isnotnull(location#3), (location#3.lat = 101.123)], Options: [path=iceberg-people-nestedfield-metrics,paths=[]])

// Without this PR, the following query would fail with the exception listed in Issue#122
scala> spark.sql("select * from iceberg_people_nestedfield_metrics where location.lat = 101.123").show()
+---+----+--------------------+-----------------+
|age|name|             friends|         location|
+---+----+--------------------+-----------------+
| 30|Andy|[Josh -> 10, Bisw...|[101.123, 50.324]|
+---+----+--------------------+-----------------+


scala> spark.sql("select * from iceberg_people_nestedfield_metrics where location.lat <= 101.123").show()
+---+----+--------------------+-----------------+
|age|name|             friends|         location|
+---+----+--------------------+-----------------+
| 30|Andy|[Josh -> 10, Bisw...|[101.123, 50.324]|
+---+----+--------------------+-----------------+


scala> spark.sql("select * from iceberg_people_nestedfield_metrics where location.lat >= 101.123").show()
+---+------+--------------------+-----------------+
|age|  name|             friends|         location|
+---+------+--------------------+-----------------+
| 30|  Andy|[Josh -> 10, Bisw...|[101.123, 50.324]|
| 19|Justin|[Kannan -> 75, Sa...|[175.926, 20.524]|
+---+------+--------------------+-----------------+


scala> spark.sql("select * from iceberg_people_nestedfield_metrics where location.lat > 101.123").show()

+---+------+--------------------+-----------------+
|age|  name|             friends|         location|
+---+------+--------------------+-----------------+
| 19|Justin|[Kannan -> 75, Sa...|[175.926, 20.524]|
+---+------+--------------------+-----------------+

Gist to create the data: https://gist.github.com/prodeezy/001cf155ff0675be7d307e9f842e1dac

/cc @rdblue @aokolnychyi @xabriel @fbocse

}
}
}
return null;
Contributor

Should this method be returning a null value in case none of the nested fields of this struct match by field id?
Instead of an instance of BoundReference with a null private type attribute, would it be acceptable to instead throw new ValidationException("Cannot find nested field id %d in struct: %s", fieldId, struct); ?

Contributor Author

done.

@fbocse (Contributor) commented Mar 8, 2019

Thank you @prodeezy for contributing this, it looks really helpful!

@fbocse (Contributor) commented Mar 8, 2019

A general API-design question: how do we bubble up in the API layer the fact that we support struct-field-based filtering but not filtering on other nested field types? I am not referring to the implementation details of exposing this in the API; I am just curious what a client's expectations should be and whether the API exposes this new capability in a coherent manner.

@prodeezy (Contributor, Author) commented Mar 11, 2019

A general API-design question: how do we bubble up in the API layer the fact that we support struct-field-based filtering but not filtering on other nested field types? ... I am just curious what a client's expectations should be and whether the API exposes this new capability in a coherent manner.

@fbocse Currently it would be implied. The only feedback the client gets right now is in the physical plan, which shows that the filter is pushed down to the Iceberg scan level; if the client inspects the scan (using iceTable.newScan().filter(structFilterExp).planFiles()), they should be able to see the appropriate files being skipped. AFAIK, this is consistent with how top-level field filter expressions currently communicate with the client.
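
For illustration, inspecting the planned files with a nested-field filter could look roughly like the sketch below. The table location is a placeholder and the package names are assumed to match the incubator repo layout at the time, so treat this as a sketch rather than code from the PR.

import com.netflix.iceberg.FileScanTask;
import com.netflix.iceberg.Table;
import com.netflix.iceberg.expressions.Expressions;
import com.netflix.iceberg.hadoop.HadoopTables;

public class NestedFilterScanSketch {
  public static void main(String[] args) {
    // hypothetical table location; any table with struct-level column metrics works
    Table table = new HadoopTables().load("iceberg-people-nestedfield-metrics");

    // a nested-field predicate pushed into the scan; files whose struct metrics
    // rule out the value should not appear in the planned tasks
    for (FileScanTask task : table.newScan()
        .filter(Expressions.equal("location.lat", 101.123))
        .planFiles()) {
      System.out.println(task.file().path());
    }
  }
}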

boolean isNestedFieldExp = expressionFieldPath.indexOf('.') > -1;

field = isNestedFieldExp ? findNestedField(struct, expressionFieldPath, caseSensitive) :
caseSensitive ? struct.field(ref().name()) : struct.caseInsensitiveField(ref().name());
Contributor

minor: can we reuse expressionFieldPath instead of obtaining ref().name() again?

Contributor Author

done

this.fieldId = fieldId;
this.pos = find(fieldId, struct);
this.type = struct.fields().get(pos).type();
this.pos = findTopFieldPos(fieldId, struct);
Contributor

What does it mean to have a pos = -1?

I ask because although this field is private, we do expose it via pos() and toString() methods, so we may need to populate this field with the actual position of the matched inner struct.

Contributor Author

Yeah, good point. I'm going to write some unit tests to evaluate the impact and will handle it accordingly.

Contributor Author

handled this using accessors


return subField.field(lastFieldInPath);

}
Contributor

Now that we need to recurse, I wonder if it makes sense to reuse the indexes available in TypeUtil: https://github.com/apache/incubator-iceberg/blob/0c9c63140e838875dc8cc52a57be2c8f24ad9975/api/src/main/java/com/netflix/iceberg/types/TypeUtil.java#L78-L84 so that this code simplifies to:

Integer idx = indexByName.get(expressionFieldPath);
Types.NestedField field = indexById.get(idx);

That code doesn't check for non-struct parents; we'd perhaps have to create a custom Indexer. Also, we would need to calculate the index lazily and perhaps cache it (as in the Schema, see https://github.com/apache/incubator-iceberg/blob/master/api/src/main/java/com/netflix/iceberg/Schema.java#L59-L71).

Contributor Author

Doesn't look like TypeUtil.indexByName & TypeUtil.indexById are used when reading data. They're used by the manifest reader to index manifest file statistics data.

Contributor Author

I guess you are suggesting we now start using this. I don't know what impact that would have, though. Do you see a major benefit to it? I can look into it.

Contributor

I guess you are suggesting we now start using this.

Right.

do you see a major benefit to it?

Code reuse. But it would only make sense if findNestedField() is called many times. The first call to findNestedField() would build the indexes and consult them. Then any subsequent call to findNestedField() would just consult the cached Maps.

Contributor Author

Right. It seems like it's being called many times, but by different expression evaluators. A cache to look up field name to Types.NestedField would help. Good call; will add one in.

Contributor Author

@xabriel I'm now creating an index that maps ids to accessors and then using the accessors to reach types and fields. Access to this index is lazy as well.
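
For illustration only, here is a minimal, self-contained sketch of such a lazily built id-to-accessor index; the class and method names are hypothetical and not the actual implementation in this PR.

import java.util.Map;
import java.util.function.Supplier;

// Illustrative lazily built id -> accessor index; names are made up for this sketch.
class LazyAccessorIndex<A> {
  private final Supplier<Map<Integer, A>> builder;
  private Map<Integer, A> index = null;

  LazyAccessorIndex(Supplier<Map<Integer, A>> builder) {
    this.builder = builder;
  }

  A accessorFor(int fieldId) {
    if (index == null) {
      // built once on the first lookup, then reused by every evaluator
      // that binds expressions against the same schema
      index = builder.get();
    }
    return index.get(fieldId);
  }
}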

@prodeezy (Contributor, Author)

Added struct-field-based unit tests to TestDictionaryRowGroupFilter & TestMetricsRowGroupFilter.

@prodeezy (Contributor, Author) commented Mar 18, 2019

@rdblue thoughts?

This PR should be ready for full review. I've added tests and comments where possible and addressed most PR comments.

@rdblue (Contributor) commented Mar 18, 2019

@prodeezy, thanks for working on this! I'll review it as soon as I get time this week.

@prodeezy (Contributor, Author) commented Mar 20, 2019

@aokolnychyi would be good to get your thoughts on this since you did all the upstream work to enable this :-)

@prodeezy (Contributor, Author) commented Mar 21, 2019

Looks like pull#138 introduced conflicts. Will rebase with latest on master and push.

@prodeezy (Contributor, Author)

Looks like pull#138 introduced conflicts. Will rebase with latest on master and push.

done.

@rdblue (Contributor) commented Mar 23, 2019

@prodeezy, this is looking good. I think you have the right approach, but some things are missing.

With this patch, BoundReference has the correct type and field ID. That works for InclusiveMetricsEvaluator and the Parquet filters that look up data based on ID. But it isn't enough for other evaluators that require binding:

  • Evaluator calls BoundReference.get to get a value from a StructLike. That method needs to be updated to handle struct nesting, preferably using an Accessor (that's an unfinished branch of mine working on this problem). As it is now, evaluators would fail because get will try to access position -1 for any nested field.
  • InclusiveManifestEvaluator uses BoundReference.pos to get the correct PartitionFieldSummary from a ManifestFile instance. The partition field summaries are stored in the order of partition fields, so binding to the partition struct gives the correct position in the field summaries list. I think this would actually work because all fields are top-level, but I think there should be a better signal than returning -1.

This should also bind differently. We never parse identifiers in Iceberg. Instead of parsing, we build indexes using the possible field names. That way, we never have to worry about quoting.

For example, if a user passes in an expression for "a.b.c", that could mean multiple paths: ["a", "b", "c"] or ["a.b", "c"] or ["a", "b.c"] or ["a.b.c"]. But the important thing is that there can be only one column that flattens to "a.b.c" because the columns are ambiguous otherwise. Rather than searching, we keep a map in the schema from "a.b.c" to the right field ID.

Expression binding should use an index like the ones used by Schema and then build accessors using the actual path of field names.
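
As a rough illustration of the name-index idea (package names assume the incubator layout at the time; this is not the code in the PR):

import com.netflix.iceberg.Schema;
import com.netflix.iceberg.types.Types;

public class NameIndexBindingSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.optional(1, "location", Types.StructType.of(
            Types.NestedField.optional(2, "lat", Types.DoubleType.get()),
            Types.NestedField.optional(3, "lon", Types.DoubleType.get()))));

    // The schema's name index maps the flattened name "location.lat" to exactly
    // one field, so binding never has to split or quote the identifier itself.
    Types.NestedField lat = schema.findField("location.lat");
    System.out.println(lat.fieldId() + " -> " + lat.type());
  }
}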

field = isNestedFieldExp ? findNestedField(struct, expressionFieldPath, caseSensitive) :
caseSensitive ? struct.field(expressionFieldPath) : struct.caseInsensitiveField(ref().name());
Schema schema = new Schema(struct.fields());
Types.NestedField field = schema.findField(caseSensitive? ref().name(): ref().name().toLowerCase());
Contributor

This should probably use a case insensitive version of findField instead of assuming that passing lower-case in will work.

Contributor Author

will do.

@prodeezy (Contributor, Author)

@rdblue Is there a way to test the Evaluator end to end? I haven't been able to test that part properly with my conventional gist examples.

@rdblue (Contributor) commented Mar 25, 2019

@prodeezy, IcebergGenerics is a way to read a table directly and will use Evaluator to filter records. Just make sure this also updates the unit tests for the evaluators as well as adding end-to-end tests.
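
For example, such an end-to-end check might look roughly like the sketch below; the table location is a placeholder and the package names assume the incubator layout at the time.

import com.netflix.iceberg.Table;
import com.netflix.iceberg.data.IcebergGenerics;
import com.netflix.iceberg.data.Record;
import com.netflix.iceberg.expressions.Expressions;
import com.netflix.iceberg.hadoop.HadoopTables;

public class NestedFilterGenericsSketch {
  public static void main(String[] args) {
    // hypothetical table location
    Table table = new HadoopTables().load("iceberg-people-nestedfield-metrics");

    // IcebergGenerics filters rows with Evaluator, so a nested-field predicate
    // exercises the new binding and accessor code end to end
    for (Record record : IcebergGenerics.read(table)
        .where(Expressions.equal("location.lat", 101.123))
        .build()) {
      System.out.println(record);
    }
  }
}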

@prodeezy (Contributor, Author)

Added tests for Evaluator and addressed other review comments.

@prodeezy (Contributor, Author)

@rdblue Thanks for taking a detailed look! I think I've addressed the review comments and added the pending tests.

}

public int pos() {
if (pos == -1) {
Contributor

It seems strange to me to throw a ValidationException in some cases. It is likely that code paths will not know whether the bound expression is for a or a.b, but the latter will fail here, during evaluation instead of during binding.

Could the evaluator that uses pos be updated so that this accessor could be used? Maybe PositionAccessor could support accessing summaries from a List of PartitionFieldSummary. That would be cleaner.

@prodeezy (Contributor, Author) — May 13, 2019

Need some clarification here... The evaluator in question is the InclusiveManifestEvaluator, which evaluates partition fields for matching manifests, so ref.pos() is going to be on partition fields. AFAIK, partition stats are kept separately in snapshot files and regular fields won't show up in this partition summary list. Is this an issue when filtering on a nested field in the schema? If so, is the partition source id always used to reference that field in the schema? Can I assume this for logical partitions as well?

E.g., this is a partition summary in a snapshot, which is used by the said evaluator; it is separate from the stats kept on data schema fields.

"partitions": {
    "array": [
      {
        "contains_null": true,
        "lower_bound": {
          "bytes": "\u0013\u0000\u0000\u0000"
        },
        "upper_bound": {
          "bytes": "\u001e\u0000\u0000\u0000"
        }
      }
    ]
  }

Contributor

I think you're saying that this is technically safe, and that's correct. The only code path that calls this is that evaluator, and it is always binding to a flat partition structure. That's why it can use the position: it knows that the array of partition summaries is in the same order as a tuple of partition values.

My point here is that it is brittle to bind to a struct type and use the position for something else, and also that it is a bad API to expose the position when no normal path uses the position directly. Instead, maybe that evaluator should get this position from the first accessor. That way, it validates that the partition field is not nested (should be a single position accessor). My original thought was to add a method to the accessor that can return one of the partition summaries from a list. That would work, too, but requires another accessor method so it isn't a great idea.

@prodeezy (Contributor, Author) — May 14, 2019

Thanks for clarifying. I'm now using PositionAccessor to fetch the position during manifest evaluation, throwing an error if it's a different kind of accessor. Took out the pos() API from BoundReference.
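
For illustration, a minimal, self-contained sketch of that check; PositionAccessor here is a stand-in class, not the real Iceberg accessor type.

public class ManifestPositionSketch {
  interface Accessor {}

  // stand-in for a simple accessor that reads a single top-level position
  static final class PositionAccessor implements Accessor {
    final int pos;
    PositionAccessor(int pos) { this.pos = pos; }
  }

  // Partition summaries are ordered like the partition tuple, so a top-level
  // position doubles as the index into the summaries list; anything else means
  // a nested field, which manifests do not summarize, so it is rejected.
  static int summaryPosition(Accessor accessor) {
    if (accessor instanceof PositionAccessor) {
      return ((PositionAccessor) accessor).pos;
    }
    throw new IllegalArgumentException("Cannot filter manifests by a nested field");
  }

  public static void main(String[] args) {
    System.out.println(summaryPosition(new PositionAccessor(0))); // prints 0
  }
}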

@prodeezy (Contributor, Author) — May 14, 2019

Done. This also cleared out the usage of schema in BoundReference.

@rdblue (Contributor) commented May 7, 2019

@prodeezy, are you still working on this? I'd like to get it in before we release.

@prodeezy (Contributor, Author) commented May 7, 2019

@rdblue sorry about the delay. I'll have the pending comments addressed in a day or two.

@rdblue (Contributor) commented May 7, 2019

No problem! Just let me know if you'd like me to help out.

@prodeezy (Contributor, Author) commented May 13, 2019

@rdblue addressed all but one comment. Need some clarification/guidance on evaluating the position for the bound expression reference.

@prodeezy (Contributor, Author)

I took another glance. I think I've addressed all pending comments. @rdblue

@prodeezy (Contributor, Author)

@rdblue gentle reminder. Let me know if this looks OK to you.

@rdblue (Contributor) commented May 22, 2019

Thanks for the reminder. I should have time to review it this week. Sorry for the delay.

@rdblue (Contributor) commented May 25, 2019

@prodeezy, I thought that it would be easier if I made a few minor changes since it would take longer to ask you to do them than to just move a few things around. I opened a PR against your branch: prodeezy#1

If you agree with those changes, just merge the PR and push and I'll commit this. Thanks!

@rdblue (Contributor) commented May 26, 2019

@prodeezy, the test failures are because I removed the map column from TestMetricsRowGroupFilter. I thought that wasn't used but I guess it was. If you add that column back, the tests should pass.

@prodeezy (Contributor, Author)

Fixed test.

@rdblue merged commit 81f29e2 into apache:master on May 28, 2019
@rdblue (Contributor) commented May 28, 2019

Merged! Thanks for fixing this @prodeezy!
