Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/GetProjectedIds.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,17 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

class GetProjectedIds extends TypeUtil.SchemaVisitor<Set<Integer>> {
// When true, IDs of struct-typed fields are collected in addition to IDs of primitive-typed fields.
private final boolean includeStructIds;
// Accumulates the collected field IDs; this same set is what schema() returns.
private final Set<Integer> fieldIds = Sets.newHashSet();

/**
 * Creates a visitor with {@code includeStructIds} set to {@code false}.
 */
GetProjectedIds() {
this(false);
}

/**
 * Creates a visitor.
 *
 * @param includeStructIds whether IDs of struct-typed fields should be collected alongside the IDs
 *                         of primitive-typed fields
 */
GetProjectedIds(boolean includeStructIds) {
this.includeStructIds = includeStructIds;
}

@Override
public Set<Integer> schema(Schema schema, Set<Integer> structResult) {
return fieldIds;
Expand All @@ -39,7 +48,7 @@ public Set<Integer> struct(Types.StructType struct, List<Set<Integer>> fieldResu

@Override
public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
if (fieldResult == null) {
if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType()) {
fieldIds.add(field.fieldId());
}
return fieldIds;
Expand Down
16 changes: 8 additions & 8 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,30 +115,30 @@ public static Types.StructType select(Types.StructType struct, Set<Integer> fiel
}

public static Set<Integer> getProjectedIds(Schema schema) {
return ImmutableSet.copyOf(getIdsInternal(schema.asStruct()));
return ImmutableSet.copyOf(getIdsInternal(schema.asStruct(), true));
}

public static Set<Integer> getProjectedIds(Type type) {
if (type.isPrimitiveType()) {
return ImmutableSet.of();
}
return ImmutableSet.copyOf(getIdsInternal(type));
return ImmutableSet.copyOf(getIdsInternal(type, true));
}

private static Set<Integer> getIdsInternal(Type type) {
return visit(type, new GetProjectedIds());
private static Set<Integer> getIdsInternal(Type type, boolean includeStructIds) {
return visit(type, new GetProjectedIds(includeStructIds));
}

public static Types.StructType selectNot(Types.StructType struct, Set<Integer> fieldIds) {
Set<Integer> projectedIds = getIdsInternal(struct);
Set<Integer> projectedIds = getIdsInternal(struct, false);
projectedIds.removeAll(fieldIds);
return select(struct, projectedIds);
return project(struct, projectedIds);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue here is that selectNot does not actually deselect children when a parent ID is not selected. Previously this was because getProjectedIds (behind getIdsInternal) would not return parent struct IDs, so removing a parent ID from the set of projectedIds would not do anything.

Now it will not work because removing a parent ID still leaves all of its child IDs. We could fix this, but it would be a change in behavior from the previous code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree with the decision to not change the behavior of this method, even though the opposite of "select" behavior would be to fully remove a struct when its ID is passed in fieldIds.

But I don't think that project is quite correct either. Consider the example schema 1: id bigint, 2: location struct<3: lat double, 4: long double>. Previously, selectNot(t, set(3, 4)) would produce 1: id bigint and omit the location entirely. Using project with the updated GetProjectedIds, the projected ID set will be {1, 2, 3, 4} and not {1, 3, 4}. That would result in the same call producing 1: id bigint, 2: location struct<>, which introduces a new bug because now there is an unexpected extra field.

To clean this up, I think we need a version of GetProjectedIds that doesn't select structs and uses the old behavior.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That seems like the right behavior to me? Shouldn't you be required to explicitly omit the parent if you don't want that element? Otherwise there would be no way to "selectNot" and only get back the empty struct.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote up these test cases; I'll run the full test suite to make sure this works with our other usages.

    Schema schema = new Schema(
        Lists.newArrayList(
            required(1, "id", Types.LongType.get()),
            required(2, "location", Types.StructType.of(
                required(3, "lat", Types.DoubleType.get()),
                required(4, "long", Types.DoubleType.get())
            ))));

    Schema expectedNoPrimitive = new Schema(
        Lists.newArrayList(
            required(2, "location", Types.StructType.of(
                required(3, "lat", Types.DoubleType.get()),
                required(4, "long", Types.DoubleType.get())
            ))));

    Schema actualNoPrimitve = TypeUtil.selectNot(schema, Sets.newHashSet(1));
    Assert.assertEquals(expectedNoPrimitive.asStruct(), actualNoPrimitve.asStruct());

    // Expected legacy behavior is to completely remove structs if their elements are removed
    Schema expectedNoStructElements = new Schema(required(1, "id", Types.LongType.get()));
    Schema actualNoStructElements = TypeUtil.selectNot(schema, Sets.newHashSet(3, 4));
    Assert.assertEquals(expectedNoStructElements.asStruct(), actualNoStructElements.asStruct());

    // Expected legacy behavior is to ignore selectNot on struct elements.
    Schema actualNoStruct = TypeUtil.selectNot(schema, Sets.newHashSet(2));
    Assert.assertEquals(schema.asStruct(), actualNoStruct.asStruct());
    ```

}

public static Schema selectNot(Schema schema, Set<Integer> fieldIds) {
Set<Integer> projectedIds = getIdsInternal(schema.asStruct());
Set<Integer> projectedIds = getIdsInternal(schema.asStruct(), false);
projectedIds.removeAll(fieldIds);
return select(schema, projectedIds);
return project(schema, projectedIds);
}

public static Schema join(Schema left, Schema right) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class StructProjection implements StructLike {
*/
public static StructProjection create(Schema schema, Set<Integer> ids) {
StructType structType = schema.asStruct();
return new StructProjection(structType, TypeUtil.select(structType, ids));
return new StructProjection(structType, TypeUtil.project(structType, ids));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there aren't any uses of this call, which is good. I agree that we probably want this to use project instead of select.

}

/**
Expand Down
79 changes: 43 additions & 36 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package org.apache.iceberg.types;

import java.util.Set;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -323,48 +324,24 @@ public void testProjectMap() {
}

@Test
public void testProjectList() {
public void testGetProjectedIds() {
Schema schema = new Schema(
Lists.newArrayList(
required(10, "a", Types.IntegerType.get()),
required(11, "A", Types.IntegerType.get()),
required(12, "list", Types.ListType.ofRequired(13,
Types.StructType.of(
optional(20, "foo", Types.IntegerType.get()),
required(21, "subList", Types.ListType.ofRequired(14,
Types.StructType.of(
required(15, "x", Types.IntegerType.get()),
required(16, "y", Types.IntegerType.get()),
required(17, "z", Types.IntegerType.get())))))))));


AssertHelpers.assertThrows("Cannot explicitly project List",
IllegalArgumentException.class,
() -> TypeUtil.project(schema, Sets.newHashSet(12))
);

AssertHelpers.assertThrows("Cannot explicitly project List",
IllegalArgumentException.class,
() -> TypeUtil.project(schema, Sets.newHashSet(21))
);
required(35, "emptyStruct", Types.StructType.of()),
required(12, "someStruct", Types.StructType.of(
required(13, "b", Types.IntegerType.get()),
required(14, "B", Types.IntegerType.get()),
required(15, "anotherStruct", Types.StructType.of(
required(16, "c", Types.IntegerType.get()),
required(17, "C", Types.IntegerType.get()))
)))));

Schema expectedDepthOne = new Schema(
Lists.newArrayList(
required(12, "list", Types.ListType.ofRequired(13,
Types.StructType.of()))));
Schema actualDepthOne = TypeUtil.project(schema, Sets.newHashSet(13));
Assert.assertEquals(expectedDepthOne.asStruct(), actualDepthOne.asStruct());
Set<Integer> expectedIds = Sets.newHashSet(10, 11, 35, 12, 13, 14, 15, 16, 17);
Set<Integer> actualIds = TypeUtil.getProjectedIds(schema);

Schema expectedDepthTwo = new Schema(
Lists.newArrayList(
required(10, "a", Types.IntegerType.get()),
required(12, "list", Types.ListType.ofRequired(13,
Types.StructType.of(
optional(20, "foo", Types.IntegerType.get()),
required(21, "subList", Types.ListType.ofRequired(14,
Types.StructType.of())))))));
Schema actualDepthTwo = TypeUtil.project(schema, Sets.newHashSet(10, 13, 20, 14));
Assert.assertEquals(expectedDepthTwo.asStruct(), actualDepthTwo.asStruct());
Assert.assertEquals(expectedIds, actualIds);
}

@Test
Expand Down Expand Up @@ -475,4 +452,34 @@ public void testValidateSchemaViaIndexByName() {

TypeUtil.indexByName(Types.StructType.of(nestedType));
}

@Test
public void testSelectNot() {
  // Struct type shared by the input schema and the expected result of the first case.
  Types.StructType locationType = Types.StructType.of(
      required(3, "lat", Types.DoubleType.get()),
      required(4, "long", Types.DoubleType.get()));

  Schema original = new Schema(
      Lists.newArrayList(
          required(1, "id", Types.LongType.get()),
          required(2, "location", locationType)));

  // Case 1: dropping the top-level primitive leaves only the struct column.
  Schema structOnly = new Schema(
      Lists.newArrayList(
          required(2, "location", locationType)));
  Schema withoutId = TypeUtil.selectNot(original, Sets.newHashSet(1));
  Assert.assertEquals(structOnly.asStruct(), withoutId.asStruct());

  // Case 2 (legacy behavior): dropping every child of a struct drops the struct itself.
  Schema idOnly = new Schema(required(1, "id", Types.LongType.get()));
  Schema withoutLatLong = TypeUtil.selectNot(original, Sets.newHashSet(3, 4));
  Assert.assertEquals(idOnly.asStruct(), withoutLatLong.asStruct());

  // Case 3 (legacy behavior): passing the struct's own ID is ignored, so nothing is removed.
  Schema withoutLocationId = TypeUtil.selectNot(original, Sets.newHashSet(2));
  Assert.assertEquals(original.asStruct(), withoutLocationId.asStruct());
}
}
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/BaseTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private Schema lazyColumnProjection() {
}
requiredFieldIds.addAll(selectedIds);

return TypeUtil.select(schema, requiredFieldIds);
return TypeUtil.project(schema, requiredFieldIds);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this because it is the opposite of GetProjectedIds used above.


} else if (context.projectedSchema() != null) {
return context.projectedSchema();
Expand Down
39 changes: 36 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/PruneColumns.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@

package org.apache.iceberg.avro;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.SchemaNormalization;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -81,15 +83,26 @@ public Schema record(Schema record, List<String> names, List<Schema> fields) {

Schema fieldSchema = fields.get(field.pos());
// All primitives are selected by selecting the field, but map and list
// types can be selected by projecting the keys, values, or elements.
// types can be selected by projecting the keys, values, or elements. Empty
// Structs can be selected by selecting the record itself instead of its children.
// This creates two conditions where the field should be selected: if the
// id is selected or if the result of the field is non-null. The only
// case where the converted field is non-null is when a map or list is
// selected by lower IDs.
if (selectedIds.contains(fieldId)) {
filteredFields.add(copyField(field, field.schema(), fieldId));
if (fieldSchema != null) {
hasChange = true; // Sub-fields may be different
filteredFields.add(copyField(field, fieldSchema, fieldId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that I understand the reason for this change. Is this implementing the same change as the previous PR, but in the Avro PruneColumns?

It looks like if a struct field is selected and a sub-field is selected, then the selection for the struct isn't a full selection. But if a sub-field is not selected then the selection for the struct is a full selection. That doesn't make sense to me.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I'm thinking about this more, I think that the behavior in this class should always match project. I doubt there's a case where we want select behavior, right? In that case, shouldn't the else case check whether the type is a record and create an empty record?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to set hasChange in the cases where we don't return field.schema() for the field, right?

Copy link
Member Author

@RussellSpitzer RussellSpitzer Sep 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are actually fine here unless every field is selected, because the logic for hasChange is a bit confusing.

You either

  1. Have a change (Make a new record using the filtered fields) ( Return Changed Records)
  2. Have no change and filtered fields size is the same as the original number of fields ( Return Original Record)
  3. Have no change and filtered field size is not empty (Make a new record using the filtered fields) (Return changed record)

Currently we have tests hitting 1 and 3 but not 2 :/
I'll add the "hasChange" flag

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good now.

} else {
if (isRecord(field.schema())) {
hasChange = true; // Sub-fields are now empty
filteredFields.add(copyField(field, makeEmptyCopy(field.schema()), fieldId));
} else {
filteredFields.add(copyField(field, field.schema(), fieldId));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, so in this case the field is a map, list, or primitive. Then we just follow the old behavior. Looks good to me since the Iceberg schema selection would fail.

}
}
} else if (fieldSchema != null) {
hasChange = true;
hasChange = true; // Sub-fields may be different
filteredFields.add(copyField(field, fieldSchema, fieldId));
}
}
Expand Down Expand Up @@ -259,6 +272,26 @@ private static Schema copyRecord(Schema record, List<Schema.Field> newFields) {
return copy;
}

/**
 * Tells whether the given Avro schema is a record, looking inside an option schema first.
 *
 * @param field the Avro schema to inspect
 * @return true if the schema has type {@code RECORD}; when {@code AvroSchemaUtil.isOptionSchema}
 *         holds, the check is applied to the result of {@code AvroSchemaUtil.fromOption} instead
 */
private boolean isRecord(Schema field) {
  Schema candidate = AvroSchemaUtil.isOptionSchema(field) ? AvroSchemaUtil.fromOption(field) : field;
  return candidate.getType().equals(Type.RECORD);
}

/**
 * Builds a copy of a record schema that has no fields.
 *
 * <p>Only the name, doc, namespace and error flag of the source record are passed to
 * {@code Schema.createRecord}. If the input is an option schema, the record obtained from
 * {@code AvroSchemaUtil.fromOption} is copied and the result is wrapped again with
 * {@code AvroSchemaUtil.toOption}.
 *
 * @param field a record schema, or an option schema wrapping one
 * @return the field-less record, wrapped as an option when the input was an option schema
 */
private static Schema makeEmptyCopy(Schema field) {
  boolean wrapped = AvroSchemaUtil.isOptionSchema(field);
  Schema source = wrapped ? AvroSchemaUtil.fromOption(field) : field;
  Schema stripped = Schema.createRecord(
      source.getName(), source.getDoc(), source.getNamespace(), source.isError(), Collections.emptyList());
  return wrapped ? AvroSchemaUtil.toOption(stripped) : stripped;
}

private static Schema.Field copyField(Schema.Field field, Schema newSchema, Integer fieldId) {
Schema newSchemaReordered;
// if the newSchema is an optional schema, make sure the NULL option is always the first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testDeleteFields() {
Schema del = new SchemaUpdate(SCHEMA, 19).deleteColumn(name).apply();

Assert.assertEquals("Should match projection with '" + name + "' removed",
TypeUtil.select(SCHEMA, selected).asStruct(), del.asStruct());
TypeUtil.project(SCHEMA, selected).asStruct(), del.asStruct());
}
}

Expand Down
Loading