Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,19 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {

List<Types.NestedField> fields = Lists.newArrayList();
fields.addAll(projection.asStruct().fields());
fields.add(MetadataColumns.ROW_POSITION);

if (fields.size() != 0) {
// Empty Projection, don't add metadata columns
fields.add(MetadataColumns.ROW_POSITION);
}

switch (format) {
case AVRO:
AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
.rename("manifest_entry", GenericManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
.rename("partition_r102", PartitionData.class.getName())
.rename("r102", PartitionData.class.getName())
.rename("data_file", content.fileClass())
.rename("r2", content.fileClass())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,26 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
updatedFields.add(avroField);

} else {
Preconditions.checkArgument(
field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
Preconditions.checkArgument(fieldWillBeEmpty(field),
"Missing required field: %s", field.name());
// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField = new Schema.Field(
field.name() + "_r" + field.fieldId(),
AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
updatedFields.add(newField);
hasChange = true;
// We also need to apply any renames since the required column may have an alternative reader
if (field.isRequired() && field.type().isStructType()) {
Schema.Field newField = new Schema.Field(
field.name(),
AvroSchemaUtil.convert(field.type().asStructType(), renames.getOrDefault(field.name(), field.name())));
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
updatedFields.add(newField);
hasChange = true;
} else {
Schema.Field newField = new Schema.Field(
field.name() + "_r" + field.fieldId(),
AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
updatedFields.add(newField);
hasChange = true;
}
}
}

Expand All @@ -131,6 +140,7 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
String expectedName = expectedField.name();

this.current = expectedField.type();

try {
Schema schema = fieldResult.get();

Expand Down Expand Up @@ -256,4 +266,18 @@ public Schema primitive(Schema primitive) {
}
}

/**
 * Determines whether a field, or all of its sub-fields, would produce no actual reads from
 * the data file.
 * <p>
 * A field is considered empty when it is optional, a metadata column, or a struct whose
 * fields are all themselves empty by this definition (a struct with no fields qualifies).
 *
 * @param field a field which exists in the projection but not in the pruned Avro schema
 * @return true if the field does not represent any real read from the file
 */
private static boolean fieldWillBeEmpty(Types.NestedField field) {
  if (!field.type().isStructType()) {
    // Leaf field: safe to default only if optional or a known metadata column
    return field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId());
  }

  // Struct field: empty if optional, or if every sub-field is itself empty
  return field.isOptional() ||
      field.type().asStructType().fields().stream().allMatch(BuildAvroProjection::fieldWillBeEmpty);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Schema record(Schema record, List<String> names, List<Schema> fields) {

if (hasChange) {
return copyRecord(record, filteredFields);
} else if (filteredFields.size() == record.getFields().size()) {
} else if (record.getFields().size() != 0 && filteredFields.size() == record.getFields().size()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the second bug mentioned, If you are pruning
X : Struct { Y : Struct<>, Z: Int, ..... }
And don't request "X" or "Y"

Y Has no fields so previously filteredFields.size() = 0 = record.getFields().size() which means we return
Y as a required field.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what keeps "data_file" from being pruned on Unpartitioned tables (since partitionType would be an empty struct)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a different operation that is the opposite of GetProjectedIds, like ProjectFromIds.

TypeUtil.select uses this class, PruneColumns, but it has behavior like a SQL SELECT. If I have a schema a int, b struct<x double, y double>, c string and I select b, then everything underneath b is selected, which is what you'd expect from SELECT b FROM table.

If we were to update GetProjectedIds with the logic above, then projecting b struct<> (which you can't do by naming columns) would actually result in the full struct getting projected because of the logic here that selects all of b. This class cannot be used to reconstruct a schema using the result of GetProjectedIds.

I think that we also need a BuildProjection that does the opposite of GetProjectedIds with the update to add empty structs. Then the datum reader could use that logic to prune the Avro schema and get an exact match with the expected schema.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah I forgot about that :/ yeah we'll need an inverse sort of thing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to figure out if there was a simpler way to do this and I ended up modifying BuildAvroProjection. In my mind the real problem here was we were telling BuildAvroProjection what columns we wanted, but our pruned schema really only showed us what columns we needed that have data. This only left leaf empty-struct nodes. So during BuildAvroProjection we check for these empty-structs being requested in the expected schema and just add them back.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is a reasonable argument that BuildAvroProjection should do what your describe, since it is creating the final schema that will be requested from Avro.

If that's adding back the empty structs, then why change the behavior of PruneColumns here?

Copy link
Member Author

@RussellSpitzer RussellSpitzer Nov 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is the change to fix bug 2,

Example of Bug 2

From (A, B, C, D {foo, bar, baz {}} ) // Starting with D containing a field baz which is empty
Project (A)
Returns
(A, D{ baz {} }) // Includes D even though it was not requested

The issue being that a struct which contains no elements naturally will always be included because
record.getFields().size() == 0 and filteredFields.size() == 0, so the two sizes compare equal

Because the field is included, all parent structs which include it are also included in the projection. For example

From (A , B{ C { D { E { F{}, G }}}})
Project (A)
Returns
(A, B{ C { D { E { F{}}}}})

I think that behavior is incorrect, and this should be pruned out if it isn't needed in the projection

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I meant to ask this on the block above.

return record;
} else if (!filteredFields.isEmpty()) {
return copyRecord(record, filteredFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ protected Record writeAndRead(String desc,
Record record = super.writeAndRead(desc, writeSchema, readSchema, inputRecord);
Record projectedWithNameMapping = writeAndRead(
writeSchema, readSchema, inputRecord, MappingUtil.create(writeSchema));
Assert.assertEquals(record, projectedWithNameMapping);
// Ignore anonymous field names, we want positions and values to be the same
Assert.assertEquals(record.toString(), projectedWithNameMapping.toString());
return record;
}

Expand Down
225 changes: 225 additions & 0 deletions core/src/test/java/org/apache/iceberg/avro/TestReadProjection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -526,4 +527,228 @@ public void testListOfStructsProjection() throws IOException {
AssertHelpers.assertEmptyAvroField(projectedP2, "y");
Assert.assertNull("Should project null z", projectedP2.get("z"));
}

@Test
public void testEmptyStructProjection() throws Exception {
Schema writeSchema = new Schema(
Types.NestedField.required(0, "id", Types.LongType.get()),
Types.NestedField.optional(3, "location", Types.StructType.of(
Types.NestedField.required(1, "lat", Types.FloatType.get()),
Types.NestedField.required(2, "long", Types.FloatType.get())
))
);

Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
record.put("id", 34L);
Record location = new Record(
AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema()));
location.put("lat", 52.995143f);
location.put("long", -1.539054f);
record.put("location", location);

Schema emptyStruct = new Schema(
Types.NestedField.required(3, "location", Types.StructType.of())
);

Record projected = writeAndRead("empty_proj", writeSchema, emptyStruct, record);
AssertHelpers.assertEmptyAvroField(projected, "id");
Record result = (Record) projected.get("location");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should access the fields that are present by position as well. The expected position for this is 0, so asserting that the same record is returned would be a good check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be the same as checking if "id" is not projected? Maybe that's a better check? I can add that in too

Assert.assertEquals("location should be in the 0th position", result, projected.get(0));
Assert.assertNotNull("Should contain an empty record", result);
AssertHelpers.assertEmptyAvroField(result, "lat");
AssertHelpers.assertEmptyAvroField(result, "long");
}

@Test
public void testEmptyStructRequiredProjection() throws Exception {
  // Same as the optional case, but the struct being projected empty is required
  Schema writeSchema = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.required(3, "location", Types.StructType.of(
          Types.NestedField.required(1, "lat", Types.FloatType.get()),
          Types.NestedField.required(2, "long", Types.FloatType.get())
      ))
  );

  // Project the required struct with none of its sub-fields
  Schema emptyStruct = new Schema(
      Types.NestedField.required(3, "location", Types.StructType.of())
  );

  Record written = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
  written.put("id", 34L);
  Record loc = new Record(written.getSchema().getField("location").schema());
  loc.put("lat", 52.995143f);
  loc.put("long", -1.539054f);
  written.put("location", loc);

  Record projected = writeAndRead("empty_req_proj", writeSchema, emptyStruct, written);
  AssertHelpers.assertEmptyAvroField(projected, "id");
  Record result = (Record) projected.get("location");
  Assert.assertEquals("location should be in the 0th position", result, projected.get(0));
  Assert.assertNotNull("Should contain an empty record", result);
  AssertHelpers.assertEmptyAvroField(result, "lat");
  AssertHelpers.assertEmptyAvroField(result, "long");
}

@Test
public void testRequiredEmptyStructInRequiredStruct() throws Exception {
  // The write schema contains a required empty struct nested inside a required struct
  Schema writeSchema = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.required(3, "location", Types.StructType.of(
          Types.NestedField.required(1, "lat", Types.FloatType.get()),
          Types.NestedField.required(2, "long", Types.FloatType.get()),
          Types.NestedField.required(4, "empty", Types.StructType.of())
      ))
  );

  // Project id plus only the nested empty struct
  Schema emptyStruct = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.required(3, "location", Types.StructType.of(
          Types.NestedField.required(4, "empty", Types.StructType.of())
      ))
  );

  Record written = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
  written.put("id", 34L);
  Record loc = new Record(written.getSchema().getField("location").schema());
  loc.put("lat", 52.995143f);
  loc.put("long", -1.539054f);
  written.put("location", loc);

  Record projected = writeAndRead("req_empty_req_proj", writeSchema, emptyStruct, written);
  Assert.assertEquals("Should project id", 34L, projected.get("id"));

  Record result = (Record) projected.get("location");
  Assert.assertEquals("location should be in the 1st position", result, projected.get(1));
  Assert.assertNotNull("Should contain an empty record", result);
  AssertHelpers.assertEmptyAvroField(result, "lat");
  AssertHelpers.assertEmptyAvroField(result, "long");
  Assert.assertNotNull("Should project empty", result.getSchema().getField("empty"));
  Assert.assertNotNull("Empty should not be null", result.get("empty"));
  Assert.assertEquals("Empty should be empty", 0,
      ((Record) result.get("empty")).getSchema().getFields().size());
}

@Test
public void testEmptyNestedStructProjection() throws Exception {
  // Two levels of optional nesting; the inner struct is projected with no fields
  Schema writeSchema = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.optional(3, "outer", Types.StructType.of(
          Types.NestedField.required(1, "lat", Types.FloatType.get()),
          Types.NestedField.optional(2, "inner", Types.StructType.of(
              Types.NestedField.required(5, "lon", Types.FloatType.get())
          ))
      ))
  );

  Schema emptyStruct = new Schema(
      Types.NestedField.required(3, "outer", Types.StructType.of(
          Types.NestedField.required(2, "inner", Types.StructType.of())
      )));

  Record written = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
  written.put("id", 34L);
  Record outerRec = new Record(
      AvroSchemaUtil.fromOption(written.getSchema().getField("outer").schema()));
  Record innerRec = new Record(
      AvroSchemaUtil.fromOption(outerRec.getSchema().getField("inner").schema()));
  innerRec.put("lon", 32.14f);
  outerRec.put("lat", 52.995143f);
  outerRec.put("inner", innerRec);
  written.put("outer", outerRec);

  Record projected = writeAndRead("nested_empty_proj", writeSchema, emptyStruct, written);
  AssertHelpers.assertEmptyAvroField(projected, "id");

  Record outerResult = (Record) projected.get("outer");
  Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0));
  Assert.assertNotNull("Should contain the outer record", outerResult);
  AssertHelpers.assertEmptyAvroField(outerResult, "lat");

  Record innerResult = (Record) outerResult.get("inner");
  Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0));
  Assert.assertNotNull("Should contain the inner record", innerResult);
  AssertHelpers.assertEmptyAvroField(innerResult, "lon");
}

@Test
public void testEmptyNestedStructRequiredProjection() throws Exception {
  // Two levels of required nesting; the inner struct is projected with no fields
  Schema writeSchema = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.required(3, "outer", Types.StructType.of(
          Types.NestedField.required(1, "lat", Types.FloatType.get()),
          Types.NestedField.required(2, "inner", Types.StructType.of(
              Types.NestedField.required(5, "lon", Types.FloatType.get())
          ))
      ))
  );

  Schema emptyStruct = new Schema(
      Types.NestedField.required(3, "outer", Types.StructType.of(
          Types.NestedField.required(2, "inner", Types.StructType.of())
      )));

  Record written = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
  written.put("id", 34L);
  Record outerRec = new Record(written.getSchema().getField("outer").schema());
  Record innerRec = new Record(outerRec.getSchema().getField("inner").schema());
  innerRec.put("lon", 32.14f);
  outerRec.put("lat", 52.995143f);
  outerRec.put("inner", innerRec);
  written.put("outer", outerRec);

  Record projected = writeAndRead("nested_empty_req_proj", writeSchema, emptyStruct, written);
  AssertHelpers.assertEmptyAvroField(projected, "id");

  Record outerResult = (Record) projected.get("outer");
  Assert.assertEquals("Outer should be in the 0th position", outerResult, projected.get(0));
  Assert.assertNotNull("Should contain the outer record", outerResult);
  AssertHelpers.assertEmptyAvroField(outerResult, "lat");

  Record innerResult = (Record) outerResult.get("inner");
  Assert.assertEquals("Inner should be in the 0th position", innerResult, outerResult.get(0));
  Assert.assertNotNull("Should contain the inner record", innerResult);
  AssertHelpers.assertEmptyAvroField(innerResult, "lon");
}

@Test
public void testNonExistentProjection() throws Exception {
  Assume.assumeFalse("Bug in pruning code will make the names not match when name mapping applied",
      this.getClass().getName().equals(TestAvroNameMapping.class.getName()));
  // TODO: Pruning code keeps records whose sub-fields have changed even if those fields are not
  // required. This means BuildAvroProjection builds a renamed "foo" because "location" is kept in
  // the pruned schema even though it should not be. Otherwise location would be missing and the
  // "foo" field would be returned since it is a sub-field of a required field being built rather
  // than the field being built.
  Schema writeSchema = new Schema(
      Types.NestedField.required(0, "id", Types.LongType.get()),
      Types.NestedField.optional(3, "location", Types.StructType.of(
          Types.NestedField.required(1, "lat", Types.FloatType.get()),
          Types.NestedField.required(2, "long", Types.FloatType.get())
      ))
  );

  Record record = new Record(AvroSchemaUtil.convert(writeSchema, "table"));
  record.put("id", 34L);
  Record location = new Record(
      AvroSchemaUtil.fromOption(record.getSchema().getField("location").schema()));
  location.put("lat", 52.995143f);
  location.put("long", -1.539054f);
  record.put("location", location);

  // Project fields (ids 10000/10001) that do not exist in the write schema; they must be
  // materialized as defaulted nulls rather than causing a failure
  Schema emptyStruct = new Schema(
      Types.NestedField.required(3, "location", Types.StructType.of(
          Types.NestedField.optional(10000, "foo", Types.StructType.of(
              Types.NestedField.optional(10001, "bar", Types.IntegerType.get())
          ))
      ))
  );

  Record projected = writeAndRead("non_existant_proj", writeSchema, emptyStruct, record);
  AssertHelpers.assertEmptyAvroField(projected, "id");
  Record result = (Record) projected.get("location");
  Assert.assertEquals("location should be in the 0th position", result, projected.get(0));
  // Fixed message grammar: "an fake" -> "a fake"
  Assert.assertNotNull("Should contain a fake optional record", result);
  AssertHelpers.assertEmptyAvroField(result, "lat");
  AssertHelpers.assertEmptyAvroField(result, "long");
  Assert.assertNotNull("Schema should contain foo", result.getSchema().getField("foo"));
  Assert.assertNull("foo should be null since it is not present in the data", result.get("foo"));
  Assert.assertNotNull("Schema should contain foo.bar",
      AvroSchemaUtil.fromOption(result.getSchema().getField("foo").schema())
          .getField("bar"));
}
}
Loading