Conversation

@RussellSpitzer (Member) commented Jul 27, 2021:

Spark 3.1 gives users the ability to push down pruning of nested structs to data sources. This is enabled by default and causes an issue in Iceberg when a user reads only some elements of a nested struct.

For example, given the schema (a int, b int, c struct<x int, y int>):

SELECT c.x FROM table
-- pushes down c.x to the data source for pruning
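To reproduce the shape of the problem, here is a minimal sketch of a session that exercises the pushdown (the table name and setup are hypothetical; the config is Spark's and setting it is redundant since it already defaults to true in 3.1):

```java
import org.apache.spark.sql.SparkSession;

// Selecting only c.x sends a pruned struct down to the Iceberg source.
public class NestedPruneRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("nested-prune-repro")
        // explicit here for illustration; true is already the 3.1 default
        .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
        .getOrCreate();

    spark.sql("SELECT c.x FROM db.tbl").show(); // only c.x reaches the data source
  }
}
```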

Our issue stems from the code in RowDataReader that builds an UnsafeProjection from our Avro records into the Spark rows needed to fulfill the query. By default, we generate this projection by first determining what an unpruned read would look like to Spark, then selecting out the expressions that match the fields required by the pruned request.

Since this logic only operated on top-level elements, we would end up building incorrect projections for nested struct elements: every projection would require and transform every element of the struct, regardless of whether it was used. This wasn't a problem when the data source was unable to push down nested struct pruning, since we always read the entire nested struct. Now that there is a pushdown, the source of our projection may not have all of the nested struct's elements, and the destination may not have all of them either.

SELECT c.x FROM table
// generates projection
Project (c(x, y)) -> (c(x, y))
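To make the failure mode concrete, here is a hedged sketch (a hypothetical helper, not the PR's code) of top-level-only field matching over Iceberg schemas; a struct column such as c resolves as a whole, so its full unpruned type leaks into the projection even when only c.x was requested:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

// Hypothetical sketch: match only the top-level fields of the expected
// (pruned) schema against the read schema.
class TopLevelMatchSketch {
  static List<Types.NestedField> matchTopLevel(Schema expected, Schema read) {
    List<Types.NestedField> matched = new ArrayList<>();
    for (Types.NestedField field : expected.columns()) {
      // findField resolves by field id; for a pruned struct "c" this still
      // returns the read schema's full struct type c<x, y>
      matched.add(read.findField(field.fieldId()));
    }
    return matched;
  }
}
```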

Previously we would only prune top-level schema elements, which wasn't a problem because Spark could not prune nested schemas. In Spark 3.1 this is now possible, so we need to adjust our projection code to correctly make bindings when structs are pruned.
@RussellSpitzer force-pushed the FixNestedStructPruning branch from b99de57 to fab6091 on July 27, 2021 16:36
@RussellSpitzer (Member Author) commented:

Solves #2783

@RussellSpitzer (Member Author) commented:

@szehon-ho + @aokolnychyi + @rdblue + @cwsteinbach - If you have some time I would be grateful if you could check this out


private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
-  StructType struct = SparkSchemaUtil.convert(readSchema);
+  StructType readStruct = SparkSchemaUtil.convert(readSchema);
@RussellSpitzer (Member Author) commented:

Renamed these variables because too many things were called "struct" or "ref" and I was getting confused about which was which.

A contributor commented:

Are these changes still needed?

@RussellSpitzer force-pushed the FixNestedStructPruning branch from fab6091 to 883d4c0 on July 27, 2021 16:42
.collectAsList();

Assert.assertEquals("Should have a single entry", 1, actual.size());
Assert.assertEquals("Should only have file_path", 0, actual.get(0).fieldIndex("file_path"));
@szehon-ho (Member) commented Jul 27, 2021:

Nit/opt: not sure this message matches the assert logic.

Maybe (expected value first, per the JUnit argument order):

assertEquals("Should select one field", 1, actual.get(0).schema().fieldNames().length);
assertEquals("Selected field should be file_path", "file_path", actual.get(0).schema().fieldNames()[0]);

@RussellSpitzer (Member Author) replied:

SGTM, I really just wanted a check to make sure this didn't crash. Maybe I should add some additional pruning tests?

@szehon-ho (Member) left a comment:

Thanks for the time spent on this issue. It mostly looks fine to me, but could you put a bit of explanation on the PR as well?

-      exprs.add(refs.get(indexInReadSchema));
+      int indexInReadSchema = readStruct.fieldIndex(field.name());
+      if (field.type().isStructType()) {
+        // We may need to prune this attribute to only refer to our expected schema
A member commented:

Curious question: we probably don't have this in our case, but if we have another layer of nesting, does it capture it?

@RussellSpitzer (Member Author) replied:

Yep, this basically creates the target expression from the attribute we are pruning, but uses the pruned data type instead of the original read data type.
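For illustration, a hedged sketch of the recursive idea (a hypothetical helper over Spark types, not the PR's exact code): when building the target type for a struct attribute, substitute the pruned type at every level, so an extra layer of nesting is handled by the same recursion:

```java
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical sketch: intersect a read struct type with the pruned
// (expected) struct type, recursing so structs nested inside structs are
// pruned the same way.
class PruneTypeSketch {
  static DataType prune(DataType readType, DataType expectedType) {
    if (readType instanceof StructType && expectedType instanceof StructType) {
      StructType read = (StructType) readType;
      StructType expected = (StructType) expectedType;
      StructField[] pruned = new StructField[expected.fields().length];
      for (int i = 0; i < expected.fields().length; i++) {
        StructField expectedField = expected.fields()[i];
        StructField readField = read.fields()[read.fieldIndex(expectedField.name())];
        // recurse so deeper struct levels are pruned to the expected shape
        pruned[i] = new StructField(readField.name(),
            prune(readField.dataType(), expectedField.dataType()),
            readField.nullable(), readField.metadata());
      }
      return new StructType(pruned);
    }
    return readType; // primitives (and, in this sketch, maps/lists) pass through
  }
}
```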

@rdblue (Contributor) commented Jul 27, 2021:

@RussellSpitzer, can you update the description with a high-level summary of what you're changing?

@RussellSpitzer changed the title from "Fix nested struct pruning" to "Spark: Fix nested struct pruning" on Jul 27, 2021
@RussellSpitzer (Member Author) replied:

> @RussellSpitzer, can you update the description with a high-level summary of what you're changing?

Added! Sorry, I have been posting notes in a lot of different places. The other option for the solution is to figure out our read schema correctly the first time, based on what is being pruned, rather than just using the table schema, but I think this is a bit simpler.

@RussellSpitzer (Member Author) commented Jul 27, 2021:

I added another test and found an issue with my fix. We create a StructInternalRow with a "type" and its data "struct". When we actually prune, the projected record's "type" does not match the struct layout; this is an issue for map, array, and struct types.

For example, if we make a BaseFile with a projection, we end up with a mapping of ordinal to physical entry in the row, but the StructInternalRow does not do the same mapping when looking up types. So although my fix works for all non-parameterized lookups, it fails on parameterized types, since those are looked up based on the original layout and not the pruned one.

I.e., StructInternalRow(TableLayout) stores:

Types = (int, string, int, list)
data = ([1, 2], "hello") with Map (0 -> 3, 2 -> 1)

Since the type lookup doesn't know about the projection, it is incorrect... trying to figure out a fix without breaking everything.

This isn't an issue for setters or getters, which know their type, since they never touch the "Types" struct.
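A toy stand-in (plain Java, not Iceberg's class) for the mismatch being described: data positions are remapped by the projection, but the type lookup still indexes the original table layout, so parameterized types (struct/map/list) resolve against the wrong slot:

```java
import java.util.Map;

// Hypothetical sketch of the StructInternalRow problem: getters consult the
// ordinal-to-position map, but type lookups do not.
class ProjectedRowSketch {
  private final String[] tableTypes;          // original layout, e.g. {"int", "string", "int", "list"}
  private final Object[] projectedData;       // pruned layout, e.g. {42, "hello"}
  private final Map<Integer, Integer> posMap; // table ordinal -> projected position

  ProjectedRowSketch(String[] tableTypes, Object[] projectedData, Map<Integer, Integer> posMap) {
    this.tableTypes = tableTypes;
    this.projectedData = projectedData;
    this.posMap = posMap;
  }

  Object get(int tableOrdinal) {
    // getters know the mapping, so data lookups stay correct
    return projectedData[posMap.get(tableOrdinal)];
  }

  String typeAt(int projectedPos) {
    // the bug: types are indexed by the original layout, so the type reported
    // for a projected position can disagree with the data stored there
    return tableTypes[projectedPos];
  }
}
```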

@RussellSpitzer (Member Author) commented Jul 28, 2021:

OK, so trying to fix this from the source side: the issue here for the entries table is that although it reports a schema of

status, snapshot_id, sequence_number, data_file <Struct with 15 fields>

the manifest reader is allowed to project within data_file, which means the actual GenericManifestFiles it creates have a schema of

status, snapshot_id, sequence_number, data_file < pruned columns>

This means the table schema as set in the read tasks is incorrect and does not match what is actually in the read data.

Creating GenericManifestFile with a projection of the data_file column in the reader, creating structs with pruned columns and projections:

return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
file -> (GenericManifestEntry<DataFile>) file);

Creating the Spark StructInternalRow representation using the incorrect schema (the full table schema, not the projected schema used in GenericManifestFile), with no pruned columns or projections:

StructInternalRow row = new StructInternalRow(tableSchema.asStruct());

The underlying issue we were trying to solve is that ManifestEntryTable is allowed to prune columns from the underlying manifest entries that it reads, but it does not expose that it has done so in the table schema. Only the manifest entries themselves know they have been pruned, and because of this we have no way of recovering this information at scan time. To fix this, we add the ability for DataTasks to expose a pruned schema which can be used by the various engines to generate proper projections.
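A minimal sketch of that interim API shape (the default-method form is my assumption; only the idea of a task-exposed pruned schema comes from the commit message above):

```java
import org.apache.iceberg.Schema;

// Sketch: let a task report the (possibly pruned) schema of the rows it
// actually produces; null means "same as the table schema".
interface PrunedSchemaTaskSketch {
  default Schema schema() {
    return null; // engines fall back to the table schema, as in the Spark diff below
  }
}
```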
@RussellSpitzer (Member Author) commented:

@rdblue Attempted to fix this from the other direction. I don't like this because we have to muck about with the DataTask API, but our underlying issue is that a DataTask may read an element with a struct layout different from the table it originated from, based on scan-time projection. In this example, the entries table, when scanned, produces tasks with a different schema (because of data_file pruning) than it originally reports as a table itself.

If you have another approach I'm all ears, but I think we will run into this again if we want to allow other metadata tables, like the pure files table, to prune in a similar way.

@rdblue (Contributor) commented Jul 29, 2021:

Thanks, I'll take a look at this as soon as I can.

private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
-  StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+  Schema taskSchema = task.schema() == null ? tableSchema : task.schema();
+  StructInternalRow row = new StructInternalRow(taskSchema.asStruct());
A contributor commented:

I think I see what's going on. For the entries table, Spark will push the projection into the scan and because we are reading manifests as the data files, we actually apply that projection when reading in the data task (the data_file schema is passed into each ManifestReadTask).

In theory, we should be able to use expectedSchema here instead of tableSchema to handle this because the expected schema should match the schema that gets pushed down by Spark. But in practice there are two problems:

  1. ManifestReader will only accept a file projection because it needs to return live entries and so it always projects all fields of manifest_entry
  2. Some tables use this Spark projection to avoid needing to project rows in Iceberg. For example, rows in the history table are never projected because we didn't want to implement a projection in Iceberg when it was built.

I see how this is a reasonable work-around, but I think we should fix some of the debt instead of moving ahead with it. We should make sure that tasks produce the expectedSchema instead of trying to figure out what schema the task produces.

I would solve this by using StructProjection to project rows in tables like the history table that return full rows. And I would also use it to prune out the additional top-level fields of manifest_entry. I think if you do that, then there will be no need to add a task-specific schema. And, we should be able to remove the Spark projection here, which exists because of the history table setup. Now that we have an Iceberg projection there is no need to continue doing that.

Does that make sense?
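A hedged sketch of that direction, assuming Iceberg's StructProjection utility: project each row onto the expected schema inside the task, so engines never have to guess what schema a task produces:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.util.StructProjection;

// Sketch: task-side projection of full rows onto the expected (pruned) schema.
class TaskSideProjectionSketch {
  static List<StructLike> project(Schema tableSchema, Schema expectedSchema,
                                  Iterable<StructLike> rows) {
    List<StructLike> projected = new ArrayList<>();
    for (StructLike row : rows) {
      // a fresh projection per row keeps this sketch correct; real code would
      // reuse one instance with a streaming transform instead of a list
      projected.add(StructProjection.create(tableSchema, expectedSchema).wrap(row));
    }
    return projected;
  }
}
```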

@RussellSpitzer (Member Author) replied:

I think that's a good solution as well. I was worried about redoing the whole setup for every data task here, but that makes sense to me. Basically we will only be projecting within our original row construction and not doing the projection in Spark.

Previously, DataTasks would return full schemas for some tables and pruned schemas for others, relying on the framework to do the actual projection. This moves projection and pruning into the core responsibility of the task.
case MAP:
case LIST:
  throw new IllegalArgumentException(
      String.format("Cannot project list or map field: %s", projectedField));
  // TODO Figure this out
A contributor commented:

What about allowing the projection if the fields are primitives or if the entire struct is projected? That would cover the cases that are currently supported and avoid introducing a new pruning bug to replace the one you're fixing (where nested structs don't match the requested struct schema).


-static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
+static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform,
+    Schema original, Schema projected) {
A contributor commented:

Nit: I'm not sure if it's just me, but I'd normally place lambda function arguments at the end of the list. Since this is internal, we can move these just after InputFile.

Also, is original always the table schema? If so, maybe we should use tableSchema instead?


boolean keyProjectable = !projectedMap.keyType().isStructType() ||
projectedMap.keyType().equals(originalMap.keyType());
boolean valueProjectable = !projectedMap.valueType().isStructType() ||
A contributor commented:

It looks like this will support things like map<string, map<string, int>>. I don't think that will be a problem.

@RussellSpitzer (Member Author) replied:

Swapped to !isNestedType.
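For reference, the resulting check might look like this sketch (a hypothetical shape that applies the !isNestedType swap to the map case shown above): a map key or value is projectable when it is not a nested type, or when it is projected whole:

```java
import org.apache.iceberg.types.Types;

// Sketch of the map projectability rule under discussion: nested key/value
// types must be projected in full; non-nested types always pass through.
class MapProjectableSketch {
  static boolean isProjectable(Types.MapType originalMap, Types.MapType projectedMap) {
    boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
        projectedMap.keyType().equals(originalMap.keyType());
    boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
        projectedMap.valueType().equals(originalMap.valueType());
    return keyProjectable && valueProjectable;
  }
}
```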

projectedList.elementType().equals(originalList.elementType());

Preconditions.checkArgument(elementProjectable,
    "Cannot perform a projection of a list unless its element is a primitive or a struct which is " +
A contributor commented:

What about something shorter, like "Cannot project a partial list element struct: %s from %s"?

TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
}


A contributor commented:

Nit: unnecessary newline.

@RussellSpitzer merged commit 6809103 into apache:master on Aug 5, 2021
@RussellSpitzer deleted the FixNestedStructPruning branch on August 5, 2021 12:23
@RussellSpitzer (Member Author) commented:

Closes #2783

@aokolnychyi (Contributor) commented:

Awesome work, @RussellSpitzer!

@RussellSpitzer (Member Author) commented:

Thanks @rdblue for the discussion and review :) Onto the next one, #1744

@rdblue (Contributor) commented Aug 5, 2021:

Awesome work here! It's great to get rid of that ugly reflection call to create a Spark projection!

@szehon-ho (Member) commented:

Yeah, great job @RussellSpitzer and @rdblue, thanks again!

szehon-ho added a commit to szehon-ho/iceberg that referenced this pull request Aug 6, 2021
chenjunjiedada pushed a commit to chenjunjiedada/incubator-iceberg that referenced this pull request Oct 20, 2021
Merge remote-tracking branch 'upstream/merge-master-20210816' into master
## What does this MR mainly address?

Merges upstream/master to bring in recent bug fixes and optimizations.

## What changes does this MR make?

Key PRs of interest:
> Predicate pushdown support, https://github.com/apache/iceberg/pull/2358, https://github.com/apache/iceberg/pull/2926, https://github.com/apache/iceberg/pull/2777/files
> Spark: fix the error when writing an empty dataset; simply skip the write, apache#2960
> Flink: add a uidPrefix to operators to make tracking multiple Iceberg sink jobs easier, apache#288
> Spark: fix nested struct pruning, apache#2877
> Support creating v2 format tables via table properties, apache#2887
> Add the SortRewriteStrategy framework, incrementally supporting different rewrite strategies, apache#2609 (WIP: apache#2829)
> Spark: support configuring Hadoop properties per catalog, apache#2792
> Spark: read/write support for timestamps without timezone, apache#2757
> Spark MicroBatch: support a config property to skip delete snapshots, apache#2752
> Spark: V2 RewriteDatafilesAction support
> Core: add validation for row-level deletes with rewrites, apache#2865
> Schema time travel related: add schema-id (Core: add schema id to snapshot)
> Spark extensions: support identifier fields operations, apache#2560
> Parquet: update to 1.12.0, apache#2441
> Hive: vectorized ORC reads for Hive, apache#2613
> Spark: add an action to remove all referenced files, apache#2415

## How was this MR tested?

Unit tests (UT).