
Conversation

@RussellSpitzer (Member) commented Nov 9, 2020

Fix Avro Pruning Bugs with Empty Structs

Previously we would crash whenever attempting to project only non-data_file
columns from a partitioned Iceberg table. This occurred because our projection
in ManifestReader would always require the "data_file" field even if it was an
empty struct.

This was an issue because GetProjectedIds can only retrieve IDs from
struct fields. If a struct is empty, the field is ignored, leading to
a projection without the struct.

This worked on unpartitioned tables because of a second bug in the column
pruning, which would always include empty structs regardless of whether they
were requested or not. This, in concert with the bug above, allowed
unpartitioned tables to work: their data_file schema could never be
empty because it contained a field (partition) which was an empty struct,
so it would always be included.

Here we fix both bugs by properly ignoring unrequested fields and
correctly requesting the IDs of empty structs if they exist.

Example of Bug 1

From (A, B, C, D { foo, bar } )
Project (A, B, C, D {})
Returns
(A, B, C) // Missing D which was required in the projection

This bug also applies if D is projected containing optional columns which are not in the write schema. For example
D { Optional : otherColumn } or metadata columns D { MetadataColumn.FILE_POSITION }

Example of Bug 2

From (A, B, C, D {foo, bar, baz {}} ) // Starting with D containing a field baz which is empty
Project (A)
Returns
(A, D{ baz {} }) // Includes D even though it was not requested

In combination, these two bugs sometimes (roughly) negate each other.

Example of interaction of bugs

From (A, B, C, D {foo, bar, baz{}})
Project (A, B, C, D{})
Returns
(A, B, C, D{ baz{} }) // D is present although with an unrequested field but this satisfies the projection
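The examples above can be modeled with a small standalone sketch. This is purely illustrative: field names stand in for Iceberg's field IDs, and the `Field` record, `prune` helper, and `buggy` flag are inventions of this sketch, not Iceberg's actual PruneColumns API.

```java
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

// Toy model of the struct pruning described above.
public class PruneSketch {
    // children == null means a leaf field; an empty list means an empty struct
    record Field(String name, List<Field> children) {
        boolean isStruct() { return children != null; }
    }

    // Returns the pruned field, or null if nothing under it was requested.
    static Field prune(Field f, Set<String> requested, boolean buggy) {
        if (!f.isStruct()) {
            return requested.contains(f.name()) ? f : null;
        }
        List<Field> kept = f.children().stream()
            .map(c -> prune(c, requested, buggy))
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
        // Bug 2: for an empty struct, kept.size() == children.size() == 0, so the
        // struct looked "fully selected" and was always returned. The fix adds the
        // non-empty check, mirroring record.getFields().size() != 0 in the diff.
        boolean fullySelected = buggy
            ? kept.size() == f.children().size()
            : !f.children().isEmpty() && kept.size() == f.children().size();
        // The requested.contains(...) clause models the bug 1 fix: an explicitly
        // requested struct survives pruning even with no surviving subfields.
        if (fullySelected || !kept.isEmpty() || requested.contains(f.name())) {
            return new Field(f.name(), kept);
        }
        return null;
    }

    static List<String> pruneTopLevel(List<Field> schema, Set<String> requested, boolean buggy) {
        return schema.stream()
            .map(f -> prune(f, requested, buggy))
            .filter(Objects::nonNull)
            .map(Field::name)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Field d = new Field("D", List.of(
            new Field("foo", null), new Field("bar", null), new Field("baz", List.of())));
        List<Field> schema = List.of(
            new Field("A", null), new Field("B", null), new Field("C", null), d);

        // Bug 2: D sneaks back in because its empty child baz{} is "fully selected"
        List<String> buggy = pruneTopLevel(schema, Set.of("A"), true);
        if (!buggy.equals(List.of("A", "D"))) throw new AssertionError(buggy);

        // Fixed: only A survives
        List<String> fixed = pruneTopLevel(schema, Set.of("A"), false);
        if (!fixed.equals(List.of("A"))) throw new AssertionError(fixed);

        // Bug 1 fix: explicitly requesting D keeps it as an empty struct D{}
        Field projectedD = prune(d, Set.of("D"), false);
        if (projectedD == null || !projectedD.children().isEmpty()) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Running the sketch with the buggy flag reproduces the (A, D{ baz{} }) result from the bug 2 example, while the fixed path returns (A) and keeps D{} only when D is explicitly requested.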

@Test
public void testFilesTable() throws Exception {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
Member Author

Minor test cleanup here to use a constant partition spec like other tests

  if (hasChange) {
    return copyRecord(record, filteredFields);
- } else if (filteredFields.size() == record.getFields().size()) {
+ } else if (record.getFields().size() != 0 && filteredFields.size() == record.getFields().size()) {
Member Author

This is the second bug mentioned. If you are pruning
X : Struct { Y : Struct<>, Z : Int, ... }
and don't request "X" or "Y":

Y has no fields, so previously filteredFields.size() == 0 == record.getFields().size(), which means we return
Y as a required field.

Member Author

This is what keeps "data_file" from being pruned on Unpartitioned tables (since partitionType would be an empty struct)

Contributor

I think we need a different operation that is the opposite of GetProjectedIds, like ProjectFromIds.

TypeUtil.select uses this class, PruneColumns, but it has behavior like a SQL SELECT. If I have a schema a int, b struct<x double, y double>, c string and I select b, then everything underneath b is selected, which is what you'd expect from SELECT b FROM table.

If we were to update GetProjectedIds with the logic above, then projecting b struct<> (which you can't do by naming columns) would actually result in the full struct getting projected because of the logic here that selects all of b. This class cannot be used to reconstruct a schema using the result of GetProjectedIds.

I think that we also need a BuildProjection that does the opposite of GetProjectedIds with the update to add empty structs. Then the datum reader could use that logic to prune the Avro schema and get an exact match with the expected schema.
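The leaf-ID behavior being described can be sketched in isolation. The `Node` type and `collectLeafIds` helper below are invented for illustration; Iceberg's real visitor operates on Types.NestedField, but the shape of the problem is the same: only leaf IDs are emitted, so an empty struct contributes nothing.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of why a GetProjectedIds-style visitor loses empty structs: it only
// emits leaf field IDs, and an empty struct has no leaves to emit.
public class ProjectedIdsSketch {
    record Node(int id, List<Node> children) {    // children == null → leaf
        static Node leaf(int id) { return new Node(id, null); }
    }

    static void collectLeafIds(Node node, Set<Integer> ids) {
        if (node.children() == null) {
            ids.add(node.id());                    // leaf: emit its ID
        } else {
            for (Node child : node.children()) {   // struct: recurse, never emit own ID
                collectLeafIds(child, ids);
            }
        }
    }

    public static void main(String[] args) {
        // 4: location struct<5: lat, 6: long> → {5, 6}; select() expands these back to 4
        Node location = new Node(4, List.of(Node.leaf(5), Node.leaf(6)));
        Set<Integer> ids = new LinkedHashSet<>();
        collectLeafIds(location, ids);
        if (!ids.equals(Set.of(5, 6))) throw new AssertionError(ids);

        // 4: location struct<> → {} : the empty struct contributes no IDs at all,
        // so a later select() has nothing with which to reconstruct it
        Set<Integer> emptyIds = new LinkedHashSet<>();
        collectLeafIds(new Node(4, List.of()), emptyIds);
        if (!emptyIds.isEmpty()) throw new AssertionError(emptyIds);
        System.out.println("ok");
    }
}
```

This is exactly why the round-trip through getProjectedIds and select cannot reconstruct a schema containing an empty struct.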

Member Author

Oh yeah I forgot about that :/ yeah we'll need an inverse sort of thing.

Member Author

I tried to figure out if there was a simpler way to do this, and I ended up modifying BuildAvroProjection. In my mind the real problem here was that we were telling BuildAvroProjection which columns we wanted, but our pruned schema really only showed which columns with data we needed, which left out the leaf empty-struct nodes. So during BuildAvroProjection we check for these empty structs being requested in the expected schema and just add them back.

Contributor

I think there is a reasonable argument that BuildAvroProjection should do what you describe, since it is creating the final schema that will be requested from Avro.

If that's adding back the empty structs, then why change the behavior of PruneColumns here?

@RussellSpitzer (Member Author), Nov 12, 2020

This line is the change to fix bug 2:

Example of Bug 2

From (A, B, C, D {foo, bar, baz {}} ) // Starting with D containing a field baz which is empty
Project (A)
Returns
(A, D{ baz {} }) // Includes D even though it was not requested

The issue being that a struct which contains no elements will naturally always be included, because
record.getFields().size() == 0 == filteredFields.size()

Because the field is included, all parent structs which include it are also included in the projection. For example

From (A , B{ C { D { E { F{}, G }}}})
Project (A)
Returns
(A, B{ C { D { E { F{}}}}})

I think that behavior is incorrect, and this should be pruned out if it isn't needed in the projection

Contributor

Sorry, I meant to ask this on the block above.

  @Override
  public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
-   if (fieldResult == null) {
+   if (fieldResult == null || isEmptyStruct(field)) {
Member Author

The error here previously was that an empty struct has no fields; since it has no fields, we would never add an ID representing it (normally this happens by referring to subfields).

Member Author

Removed this change, leaving the API behavior exactly the same; now we instead add empty projections when doing the projection itself.

@rdblue (Contributor) commented Nov 10, 2020

For reference later, the problem here is that Avro's BuildAvroProjection can fail if the expected schema and Avro schema don't match. The problem in GetProjectedIds caused an empty data_file field to not get projected in the result of PruneColumns because no ID was returned to select it. The number of fields in the final Avro projection must match so that all column indexes align between the expected Iceberg schema and the schema passed to the Avro resolver.

@RussellSpitzer (Member Author)

Also for reference, the other issue we are dealing with is that PruneColumns, in the current implementation, will always select any record whose sub-records are empty structs, regardless of the pruning IDs.

@RussellSpitzer force-pushed the FixAvroPruning branch 2 times, most recently from 4e76fc8 to ae9c96a, on November 11, 2020
@RussellSpitzer (Member Author)

Automated tests failed on the last run even though they pass locally on my MacBook :/ but oddly enough not the direct schema projection tests, only the metadata table test... maybe it won't break this time?

@RussellSpitzer (Member Author)

It was the merge, something in master has broken my tests :/

hasChange = true;
filteredFields.add(copyField(field, fieldSchema, fieldId));
} else if (emptyStructIds.contains(fieldId)) {
// This field does not require any known sub-fields but is required in the projection so keep it without
Member Author

This is the pruning-side fix to bug 1: basically making sure we add the parents of any fields in the projection, even if none of their real (in-the-file) fields are projected.

For example without this fix

From (A, B { C } )
Project (A, B { D.Optional })
Returns
(A) 

import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class GetEmptyStructIds extends TypeUtil.SchemaVisitor<Set<Integer>> {
Member Author

This visitor is used to find the field IDs of any fields which must remain in the projection but, having no subfields, should be included as empty structs.

There are three main cases:

  1. The struct is empty.

  2. The struct contains required MetadataColumns. These are not real fields, so we cannot properly include their parents in the normal path. Currently the only one of these allowed is MetadataColumns.ROW_POSITION; see

    Preconditions.checkArgument(
        field.isOptional() || field.fieldId() == MetadataColumns.ROW_POSITION.fieldId(),
        "Missing required field: %s", field.name());
    // Create a field that will be defaulted to null. We assign a unique suffix to the field
    // to make sure that even if records in the file have the field it is not projected.

  3. The struct contains optional columns which aren't part of the file schema. These are allowed by the same code block mentioned above, but if their parents are pruned out, we will get an error when we attempt to add them back to the projection.

return false;
}
List<Types.NestedField> fields = ((Types.StructType) field.type()).fields();
return fields.stream().allMatch(f -> f.fieldId() > Integer.MAX_VALUE - 201 || f.isOptional());
Member Author

The magic number here is our block of reserved and metadata field IDs.
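As a hedged illustration of that check: the threshold below mirrors the `Integer.MAX_VALUE - 201` comparison in the diff, but the `NestedFieldStub` type and method names are invented for this sketch, not Iceberg's actual API.

```java
import java.util.List;

// Sketch of the "potentially empty" test: a struct may prune down to nothing if
// every child is either in the reserved/metadata field-ID block or optional.
public class PotentiallyEmptySketch {
    record NestedFieldStub(int fieldId, boolean optional) {}

    static boolean isMetadataId(int fieldId) {
        // the PR's magic number: the top block of field IDs is reserved
        return fieldId > Integer.MAX_VALUE - 201;
    }

    static boolean isPotentiallyEmpty(List<NestedFieldStub> structFields) {
        return structFields.stream().allMatch(f -> isMetadataId(f.fieldId()) || f.optional());
    }

    public static void main(String[] args) {
        // a required, non-metadata child pins the struct: it can never prune empty
        if (isPotentiallyEmpty(List.of(new NestedFieldStub(1, false)))) throw new AssertionError();
        // all-optional children may all be pruned away
        if (!isPotentiallyEmpty(List.of(new NestedFieldStub(1, true)))) throw new AssertionError();
        // an ID in the reserved block counts as a metadata column, even if required
        if (!isPotentiallyEmpty(List.of(new NestedFieldStub(Integer.MAX_VALUE - 2, false)))) {
            throw new AssertionError();
        }
        System.out.println("ok");
    }
}
```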


Record projected = writeAndRead("empty_proj", writeSchema, emptyStruct, record);
Assert.assertNull("Should not project data", projected.get("data"));
Record result = (Record) projected.get("location");
Contributor

I think we should access the fields that are present by position as well. The expected position for this is 0, so asserting that the same record is returned would be a good check.

Member Author

Wouldn't this be the same as checking if "id" is not projected? Maybe that's a better check? I can add that in too

}

@Test
public void testEmptyNestedStructRequiredProjection() throws Exception {
Contributor

This case looks right to me.

// This field does not require any known sub-fields but is required in the projection so keep it without
// any of its subfields
hasChange = true;
Schema empty = AvroSchemaUtil.removeFields(field);
Contributor

Why use removeFields when this class already has copyRecord? Couldn't this just call copyRecord(field.schema(), ImmutableList.of())?

Member Author

CopyRecord does not work on UnionTypes :(


@Override
public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
if (isEmptyStruct(field) || isPotentiallyEmpty(field)) {
Contributor

The potentially empty handling here seems strange to me. I think it would make more sense for the field to be added or fixed up in BuildAvroProjection instead of catching the case so that an empty struct is retained by PruneColumns.

@RussellSpitzer (Member Author), Nov 12, 2020

I think that's another alternative. We could pass the original Avro schema through to BuildAvroProjection and just fall back to copying the field and removing its subfields if a record is missing but required.

In my mind, "pruning" means removing everything that is unnecessary, but these fields are necessary, so fixing pruning seemed more natural to me than building them back in BuildAvroProjection. Although we already do add more fields in the projection code for fields that don't exist at all.

Member Author

OK, so I took a brief look at the BuildAvroProjection code to see any possible issues. The main difficulty is that we would need to simultaneously traverse the real Avro schema along with the one passed as "pruned". If we don't do this, we have no way of determining whether a required field is actually in the Avro schema or not, so we can't do proper correctness checks for whether an expected required field should actually be there.

Other than that, I think the implementation should be fairly straightforward: if we find a record that should be there but isn't, we build a new record based on the actual field, then call "visit" on that field. We will end up breaking the

Let me see how difficult this would be.

return "_x" + Integer.toHexString(character).toUpperCase();
}

public static Schema removeFields(Schema.Field field) {
Contributor

The implementation here seems very specific to a case and not useful as a generic method. Maybe we should move it into the PruneColumns class.

Member Author

Yep sure, it's not very common

@RussellSpitzer (Member Author)

@szehon-ho Let's work together to figure out a solution to this :)

@szehon-ho (Member) commented Mar 30, 2021

Yes, as we talked offline, I ended up debugging the same thing as @RussellSpitzer while hitting #1378. My thought was that changing the PruneColumns or BuildAvroProjection behaviour would be a bit involved, so I took a different approach, which I put up as reference: #2395

It's a poor workaround for the problem, adding a non-empty struct when reading the metadata (manifest) entries and all-entries tables, though it seems to work for most cases. (The bug hits nested non-empty required structs that are pruned away; I can look for those if there are more.) I just put it up as an option in case there is no other good fix.

Of course, if there is a proper fix, I'd be interested to follow/help wherever I can, though I heard from Russell that it is not very easy :)

In any case, I would love to see a solution to the bug. We were trying to write an analytics job that calculated how much data lands in a table per time period, and thought to aggregate the entries table and join it with the snapshots table (which has timestamps), but it seems this bug prevents any aggregates on the entries table without ugly workarounds.

Here we fix both bugs by properly ignoring unrequested fields and
correctly projecting empty structs when they are explicitly requested.
In order to fix all variations of empty struct projections, we introduce a second traversal
of the Avro schema which just identifies fields that need to be read as empty structs.
We use these IDs in parallel with the fields being actively included
in the pruned schema, but instead of including those fields as-is, we remove all of their
subfields before adding them. Since we are still following the normal PruneColumns logic,
all required parents for the pruned column are added automatically. In addition,
new tests are added to make sure that empty and nested-empty projections work as expected.

Previously, if a user attempted to project a column which did not exist in the file schema,
we would throw an error if it was required, or allow it if it was optional. But if this optional
column was the only column in the parent struct, the parent struct would be ignored, so
we could not add the child field.

To fix this, we add structs which contain only metadata columns or optional columns to our list
of structs to preserve, because they may be needed for child fields which don't actually exist.

Was previously replacing fields which had known subfields with empty
versions. We should only do that if there are no known subfields.
@RussellSpitzer (Member Author)

@rdblue + @szehon-ho - @karuppayya reminded me to fix this, with our hope that it would fix #2783. While I did fix this PR, it doesn't seem to have fixed the other issue. That said, I at least fixed these two issues :) I would appreciate a review so we can at least fix this portion.

@rdblue (Contributor) commented Aug 1, 2021

If I remember correctly, the main problem here was a mismatch between the expected schema and the actual schema when reading from Avro files. That was caused by some unexpected behaviors across a few helpers:

  1. TypeUtil.select (that uses PruneColumns) has behavior like SQL projection using IDs. Any field that is selected by ID is completely projected. For example, selecting {4} from the schema 3: id bigint, 4: location struct<5: lat double, 6: long double> is like SELECT location and produces 4: location struct<5: lat double, 6 long double> as the projected schema. As a result, empty structs can't be selected by this method (or in SQL).
  2. TypeUtil.getProjectedIds (that uses GetProjectedIds) is written expecting the current behavior from TypeUtil.select. When finding the IDs needed to describe a projection, it will return all leaf field IDs. Because empty structs contain no leaf fields, there are no IDs returned for an empty struct. Reusing the example above, if the requested projection were 4: location struct<> (no fields) then the result is {} rather than {4} because select would return all fields of location.
  3. Record types are position based and readers assume that the expected schema will match the read schema, even though the read schema is produced using the file schema, conversions, and calls to the two methods above. An empty struct in the requested schema will be silently removed, producing a mismatch in the field positions. When reading with id bigint, location struct<>, data string, the data produced will be id bigint, data string, resulting in an ArrayIndexOutOfBounds exception or a ClassCastException depending on the rest of the projection.
  4. There is a bug that sometimes avoids this problem by accidentally projecting empty structs, but this is not reliable because sometimes the empty struct should not be produced or else it introduces the opposite problem: projecting id bigint, data string actually produces id bigint, location struct<>, data string.
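Point 3 can be made concrete with a standalone sketch, using plain arrays as stand-ins for Avro records (the class and variable names here are illustrative, not Iceberg code):

```java
import java.util.Arrays;

// Sketch of the positional mismatch in point 3: the reader resolves "data" by
// its position in the expected schema, but the empty struct was silently dropped
// from the read schema, shifting every later field left by one.
public class PositionMismatchSketch {
    public static void main(String[] args) {
        String[] expectedSchema = {"id", "location", "data"}; // location is struct<>
        Object[] actuallyRead = {42L, "some-data"};           // location never projected

        int dataPos = Arrays.asList(expectedSchema).indexOf("data"); // 2
        try {
            Object data = actuallyRead[dataPos]; // only two fields were read
            throw new IllegalStateException("expected an out-of-bounds read: " + data);
        } catch (ArrayIndexOutOfBoundsException expected) {
            System.out.println("ArrayIndexOutOfBounds, as described");
        }

        // And the ClassCastException flavor: position 1 now holds the String "data"
        // where the reader expects the location struct (modeled here as Object[])
        try {
            Object[] location = (Object[]) actuallyRead[1];
            throw new IllegalStateException("expected a cast failure: " + Arrays.toString(location));
        } catch (ClassCastException expected) {
            System.out.println("ClassCastException, as described");
        }
    }
}
```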

Hopefully that captures the issues here.

First, I think that the select behavior is correct. This matches the behavior where a list of selected fields should be turned into a list of IDs and completely projected. Selecting a struct should select all of its fields, as expected by SELECT location. The problem is that we assume the output of getProjectedIds will be used for select, so it can't return any non-leaf field IDs.

To fix this, I propose the following:

  • Update the behavior of getProjectedIds to return an ID for empty structs. The ID should indicate that the struct must be returned without any fields, unless there are sub-fields selected by a nested field ID.
  • Add a TypeUtil.project method that is the opposite of getProjectedIds. The combination of getProjectedIds and project should always be the same schema, with empty structs.
  • Update all calls to select and avoid passing the result of getProjectedIds because of the behavior change. In some cases, this will actually fix behavior: StructProjection.create(Schema base, Schema projection) should use project to preserve empty structs, for example. But other places, like BaseTableScan.lazyColumnProjection can avoid calling getProjectedIds because it isn't needed (and would change behavior).
  • Update most calls that need to produce a schema matching the expected schema to use project instead of select so that empty structs are preserved.
  • Fix the second bug, which is unrelated but also incorrect.

I think that should fix this. To me, that seems like a couple of PRs: one to add TypeUtil.project that will produce empty structs instead of fully-selected structs (maybe just a boolean option to configure PruneColumns). Then one PR to update the behavior of getProjectedIds and fix select calls that use the result to correctly call project or avoid getProjectedIds. And finally, one PR to fix the second bug.

Does that sound reasonable?

@RussellSpitzer (Member Author)

Yep, seems close to the original plan we had here. I'll keep the new tests and work on rebuilding the original getProjectedIds/project methods.

@RussellSpitzer (Member Author)

Fixing with #2953
