Merged
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/Schema.java
@@ -209,6 +209,16 @@ public Set<Integer> identifierFieldIds() {
return lazyIdentifierFieldIdSet();
}

/**
* Returns the set of identifier field names.
*/
public Set<String> identifierFieldNames() {
return identifierFieldIds()
.stream()
.map(id -> findField(id).name())
.collect(Collectors.toSet());
}

/**
* Returns the {@link Type} of a sub-field identified by the field name.
*
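The new `Schema#identifierFieldNames` helper above is just an id-to-name projection over the identifier field ids. A stand-alone sketch of the same logic using only `java.util` collections, with a plain map standing in for `Schema.findField(id).name()` (the class and variable names here are illustrative, not from the Iceberg API):

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class IdentifierFieldNamesSketch {
    // idToName stands in for Schema.findField(id).name(); the real method
    // resolves fields through the schema's lazily built index.
    static Set<String> identifierFieldNames(Set<Integer> identifierFieldIds,
                                            Map<Integer, String> idToName) {
        return identifierFieldIds.stream()
            .map(idToName::get)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // schema has fields "id" (1) and "data" (2); only "id" is an identifier
        Map<Integer, String> idToName = Map.of(1, "id", 2, "data");
        System.out.println(identifierFieldNames(Set.of(1), idToName)); // prints [id]
    }
}
```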
26 changes: 21 additions & 5 deletions api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -22,10 +22,12 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -181,10 +183,24 @@ public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId)
* @return a structurally identical schema with new ids assigned by the nextId function
*/
public static Schema assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) {
return new Schema(TypeUtil
.visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId))
.asNestedType()
.fields());
Types.StructType freshSchemaStruct = TypeUtil
Member:

Besides this assignFreshIds, the other methods with the same method name should also refresh their identifier field id lists, right? I also think we will need more unit tests to cover these changes.

Member:

Another thing (unrelated to this PR, but I think it's important): when we reconstruct the Schema in assignFreshIds, it looks like we've ignored the Map<String, Integer> aliases. That does not seem like the correct behavior, right? I mean we should use the existing aliases from the old schema to construct the refreshed new schema.

Contributor Author:

Sorry, I forgot those methods; updated. So far I don't see a place where those methods are called and need to rely on the aliases. I will continue to look; if there is an exception I will put it in another PR.

Member (@openinx, May 7, 2021):

In theory, a schema is composed of schemaId, fields, aliases, and identifierFieldIds. We will need to maintain all of those members when we refresh or reassign field ids based on the old schema; by default we should carry the old schema's info over to the fresh schema if people don't provide the necessary info to fill in.

Getting this work into a new PR looks good to me.

Contributor:

I think the aliases are a bit different here: they are used mostly for integration purposes, when converting a file schema to an Iceberg schema, for easier lookup (i.e. as a form of helper), and they aren't part of the Iceberg schema itself and aren't written to table metadata. So I think they may not be strictly required when constructing new schemas from this class.

Contributor Author:

Yes, practically speaking the aliases are not used in any related code path, which is why I did not handle them when making this change. But from a correctness perspective I agree with @openinx that if the aliases exist, we should do the conversion, in case they are somehow used for that purpose somewhere in the future.

What I am trying out is to change the AssignFreshIds visitor so that it can update the ids along the way for both the aliases and the identifier fields. I will update the PR after completing the implementation.

Contributor Author:

@openinx, after reading the code a bit more, I think it does not make sense to convert the aliases in these methods. The reason is that, as the documentation suggests:

"Alias maps are created when translating an external schema, like an Avro Schema, to this format. The original column names can be provided in a Map when constructing this Schema."

So this happens in methods such as AvroSchemaUtil.toIceberg, ParquetSchemaUtil.convert, etc. However, these aliases are never persisted in the actual table metadata. As proof, TypeUtil.assignFreshIds is called for every table metadata replacement, but the aliases are never passed in. So changing the method to pass in the aliases would be a change of behavior, and we should not do that. I think the current implementation should be good enough; I will add a few tests based on what you and Yan suggested.

.visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId))
.asStructType();
return new Schema(freshSchemaStruct.fields(), refreshIdentifierFields(freshSchemaStruct, schema));
}

/**
 * Get the identifier fields in the fresh schema based on the identifier fields in the base schema.
 *
 * @param freshSchema fresh schema
 * @param baseSchema base schema
 * @return identifier fields in the fresh schema
 */
public static Set<Integer> refreshIdentifierFields(Types.StructType freshSchema, Schema baseSchema) {
Map<String, Integer> nameToId = TypeUtil.indexByName(freshSchema);
return baseSchema.identifierFieldNames().stream()
.map(nameToId::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
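The helper above is purely name-based: each identifier field id in the base schema is resolved to its name, then looked up in the fresh schema's name-to-id index, and fields that no longer exist are silently dropped. A minimal stand-alone sketch of that logic using plain `java.util` maps in place of the Iceberg Schema and `TypeUtil.indexByName` (class and variable names here are illustrative, not from the Iceberg API):

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class RefreshIdentifierFieldsSketch {
    // Simplified model: baseIdentifierNames plays the role of
    // baseSchema.identifierFieldNames(), and freshNameToId plays the role of
    // TypeUtil.indexByName(freshSchema). Fields missing from the fresh schema
    // map to null and are filtered out, mirroring filter(Objects::nonNull).
    static Set<Integer> refreshIdentifierFields(Set<String> baseIdentifierNames,
                                                Map<String, Integer> freshNameToId) {
        return baseIdentifierNames.stream()
            .map(freshNameToId::get)
            .filter(Objects::nonNull)
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // the base schema declared "id" as its identifier field;
        // the fresh schema reassigned "id" -> 4 and "data" -> 5
        Map<String, Integer> freshNameToId = Map.of("id", 4, "data", 5);
        Set<Integer> refreshed = refreshIdentifierFields(Set.of("id"), freshNameToId);
        System.out.println(refreshed); // prints [4]
    }
}
```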

/**
@@ -213,7 +229,7 @@ public static Schema assignIncreasingFreshIds(Schema schema) {
*/
public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
return new Schema(struct.fields());
return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
Contributor:

Since the schema passed in to this method is almost always one that was just constructed (and thus has to call this method to assign the right ids), I think refreshIdentifierFields(struct, schema) here will almost always be a no-op?

I wonder if we want to use idSourceSchema to get the identifier fields, although that has a different problem: the input schema could be a subset of idSourceSchema and thus not include all identifier fields. Though in that case the identifier fields won't be useful and we may skip them, and we can visit the two schemas and verify they have the same number of columns to detect this case. I'm not sure about the use case of this method and whether we really need to assign identifier fields here, though.

Member:

I would prefer to call refreshIdentifierFields here to get a consistent view of the identifier field id list for the schema, without caring about idSourceSchema's own identifier field id list. This makes the reassignIds method look more general.

}

public static Type find(Schema schema, Predicate<Type> predicate) {
21 changes: 21 additions & 0 deletions api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -21,6 +21,8 @@
package org.apache.iceberg.types;

import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

@@ -42,6 +44,25 @@ public void testReassignIdsDuplicateColumns() {
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
}

@Test
public void testReassignIdsWithIdentifier() {
Schema schema = new Schema(
Lists.newArrayList(
required(0, "a", Types.IntegerType.get()),
required(1, "A", Types.IntegerType.get())),
Sets.newHashSet(0)
);
Schema sourceSchema = new Schema(
Lists.newArrayList(
required(1, "a", Types.IntegerType.get()),
required(2, "A", Types.IntegerType.get())),
Sets.newHashSet(1)
);
final Schema actualSchema = TypeUtil.reassignIds(schema, sourceSchema);
Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
Assert.assertEquals(sourceSchema.identifierFieldIds(), actualSchema.identifierFieldIds());
}

Contributor:

Do we want to add a case that calls assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) where schema doesn't have identifier fields but baseSchema does, and verifies that the output schema has identifier fields? I think it's an existing use case we have in TableMetadata that may be worth explicit testing.

@Test(expected = IllegalArgumentException.class)
public void testReassignIdsIllegalArgumentException() {
Schema schema = new Schema(
5 changes: 1 addition & 4 deletions core/src/main/java/org/apache/iceberg/SchemaUpdate.java
@@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
@@ -85,9 +84,7 @@ private SchemaUpdate(TableOperations ops, TableMetadata base, Schema schema, int
this.schema = schema;
this.lastColumnId = lastColumnId;
this.idToParent = Maps.newHashMap(TypeUtil.indexParents(schema.asStruct()));
this.identifierFieldNames = schema.identifierFieldIds().stream()
.map(id -> schema.findField(id).name())
.collect(Collectors.toSet());
this.identifierFieldNames = schema.identifierFieldNames();
}

@Override
@@ -158,6 +158,32 @@ public void testSchemaUpdateComplexType() throws Exception {
Assert.assertEquals("Should contain 0 Avro manifest files", 0, manifests.size());
}

@Test
public void testSchemaUpdateIdentifierFields() throws Exception {
Assert.assertTrue("Should create v1 metadata",
version(1).exists() && version(1).isFile());
Assert.assertFalse("Should not create v2 or newer versions",
version(2).exists());

Schema updatedSchema = new Schema(Lists.newArrayList(
required(1, "id", Types.IntegerType.get(), "unique ID"),
required(2, "data", Types.StringType.get())
), Sets.newHashSet(1));

table.updateSchema()
.setIdentifierFields("id")
.commit();

Assert.assertTrue("Should create v2 for the update",
version(2).exists() && version(2).isFile());
Assert.assertEquals("Should write the current version to the hint file",
2, readVersionHint());
Assert.assertEquals("Table schema should match schema with reassigned ids",
updatedSchema.asStruct(), table.schema().asStruct());
Assert.assertEquals("Identifier fields should match schema with reassigned ids",
updatedSchema.identifierFieldIds(), table.schema().identifierFieldIds());
}

@Test
public void testFailedCommit() throws Exception {
// apply the change to metadata without committing