
Conversation

@jackye1995 (Contributor)

fixes #2528

Allow callers of the schema constructor to pass in identifier information and refresh the identifier field IDs when necessary.

This mostly applies to schema-update-related code paths. The following two cases will continue to use the existing constructor without identifiers:

  1. callers that are converting from a file format schema or an engine schema (except Flink), because those schemas do not have a primary-key-like concept.
    1. I assume @openinx will make all the necessary changes for Flink through Flink: Support SQL primary key #2410.
    2. I will do another PR for Spark once the SQL extension is added.
  2. callers that are converting a schema for query transformation purposes, such as select, join, or file projection, because identifiers are not well-defined for those transformed schemas.
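
For illustration, here is a minimal sketch of the identifier-aware construction this enables, assuming the Schema constructor overload that accepts a set of identifier field IDs; the column names and IDs are hypothetical:

```java
import java.util.List;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class IdentifierSchemaExample {
  public static void main(String[] args) {
    // Build a schema and declare field 1 ("id") as an identifier field by
    // passing the identifier field IDs alongside the columns.
    Schema schema = new Schema(
        List.of(
            required(1, "id", Types.LongType.get()),
            required(2, "data", Types.StringType.get())),
        Set.of(1));

    System.out.println(schema.identifierFieldIds()); // prints [1]
  }
}
```

Carrying the identifier field IDs on the schema itself is what lets the TypeUtil methods discussed below refresh them whenever field IDs change.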

Diff excerpt from TypeUtil.assignFreshIds:

```java
    .visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId))
    .asNestedType()
    .fields());
Types.StructType freshSchemaStruct = TypeUtil
```
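
Read together with the reassignIds change further below, the new method body presumably looks something like the following sketch; refreshIdentifierFields (visible in the later diff) re-resolves the input schema's identifier fields against the freshly assigned struct. The exact code here is an inference from the diff, not the verbatim PR:

```java
// Sketch inferred from the diff: assign fresh field IDs, then rebuild the
// identifier field ID set against the new struct before constructing the Schema.
public static Schema assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) {
  Types.StructType freshSchemaStruct = TypeUtil
      .visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId))
      .asStructType();
  return new Schema(
      freshSchemaStruct.fields(),
      refreshIdentifierFields(freshSchemaStruct, schema));
}
```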
Member

Besides this assignFreshIds, other methods with the same method name should also refresh their identifier field ID lists, right? I also think we will need more unit tests to cover these changes.

Member

Another thing (unrelated to this PR, but I think it's important): when we reconstruct the Schema in assignFreshIds, it looks like we've ignored the Map<String, Integer> aliases, which doesn't seem like the correct behavior, right? I mean, we should use the existing aliases from the old schema to construct the refreshed new schema.

Contributor Author

Sorry, I forgot those methods; updated. So far I don't see a place where those methods are called and need to rely on the aliases. I will continue to look; if there is an exception, I will put it in another PR.

@openinx (Member), May 7, 2021

In theory, a schema is composed of schemaId, fields, aliases, and identifierFieldIds. We need to maintain all of those members when refreshing or reassigning field IDs based on an old schema; by default, we should carry the old schema's info over to the fresh schema when the caller doesn't provide it.

Getting this work into a new PR looks good to me.
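
For reference, those four members correspond to accessors on org.apache.iceberg.Schema; a small sketch reading them back (accessor names are assumptions based on the Schema class, and schemaId() assumes a schema-ID-aware version):

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class SchemaMembers {
  // Print the four members listed above: schemaId, fields, aliases, identifierFieldIds.
  static void printMembers(Schema schema) {
    int schemaId = schema.schemaId();                        // schema ID
    List<Types.NestedField> fields = schema.columns();       // fields
    Map<String, Integer> aliases = schema.getAliases();      // alias map (null if none provided)
    Set<Integer> identifiers = schema.identifierFieldIds();  // identifier field IDs
    System.out.printf("schemaId=%d fields=%s aliases=%s identifierFieldIds=%s%n",
        schemaId, fields, aliases, identifiers);
  }
}
```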

Contributor

I think aliases are a bit different here: they are used mostly for integration purposes, when converting a file schema to an Iceberg schema, to make lookups easier (i.e., as a form of helper method). They aren't part of the Iceberg schema itself and aren't written to table metadata, so I think they may not be strictly required when constructing new schemas from this class.

Contributor Author

Yes, practically speaking the aliases are not used in any related code path, which is why I did not handle them when making the change. But from a correctness perspective I agree with @openinx that if the aliases exist, we should do the conversion, in case they are somehow used for that purpose in the future.

What I am trying out is changing the AssignFreshIds visitor so that it can update the IDs along the way for both aliases and identifiers. I will update the PR after completing the implementation.

Contributor Author

@openinx, after reading the code a bit more, I think it does not make sense to convert aliases in these methods. The reason is that, as the documentation suggests:

> Alias maps are created when translating an external schema, like an Avro Schema, to this format. The original column names can be provided in a Map when constructing this Schema.

So this happens in methods such as AvroSchemaUtil.toIceberg, ParquetSchemaUtil.convert, etc. However, these aliases are never persisted in the actual table metadata. As evidence, TypeUtil.assignFreshIds is called for every table metadata replacement, but the aliases are never passed in. So changing the method to pass in the aliases would be a change of behavior, and we should not do that. I think the current implementation should be good enough; I will add a few tests based on what you and Yan suggested.
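
To make that concrete, here is a small sketch of the alias mechanism the quoted documentation describes, using the constructor overload that takes an alias map; the names and IDs are hypothetical:

```java
import java.util.List;
import java.util.Map;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class AliasExample {
  public static void main(String[] args) {
    // An alias map records the original (external) column name for each field
    // ID; it is a lookup helper only and is never written to table metadata.
    Schema converted = new Schema(
        List.of(required(1, "id", Types.LongType.get())),
        Map.of("original_id", 1));

    System.out.println(converted.aliasToId("original_id")); // prints 1
  }
}
```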

```diff
 public static Schema reassignIds(Schema schema, Schema idSourceSchema) {
   Types.StructType struct = visit(schema, new ReassignIds(idSourceSchema)).asStructType();
-  return new Schema(struct.fields());
+  return new Schema(struct.fields(), refreshIdentifierFields(struct, schema));
```
Contributor

Since the schema passed into this method is almost always one that was just constructed (and thus has to call this method to assign the right IDs), I think refreshIdentifierFields(struct, schema) here will almost always be a no-op?

I wonder if we want to use idSourceSchema to get the identifier fields, although that has a different problem: the input schema could be a subset of idSourceSchema and thus not include all identifier fields. In that case the identifier fields won't be useful and we may skip them; we could visit the two schemas and verify they have the same number of columns to detect this case. I'm not sure about the use case of this method and whether we really need to assign identifier fields here, though.
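
To make the no-op concern concrete, here is a sketch assuming the refreshed identifier fields are drawn from the input schema, as the diff above shows; the schemas and IDs are hypothetical:

```java
import java.util.List;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.required;

public class ReassignIdsNoOpExample {
  public static void main(String[] args) {
    // The ID source schema carries an identifier field...
    Schema idSourceSchema = new Schema(
        List.of(required(10, "id", Types.LongType.get())),
        Set.of(10));
    // ...but a freshly constructed input schema usually declares none.
    Schema justConstructed = new Schema(
        List.of(required(1, "id", Types.LongType.get())));

    Schema reassigned = TypeUtil.reassignIds(justConstructed, idSourceSchema);
    // refreshIdentifierFields(struct, schema) draws from the input schema,
    // which has no identifier fields, so the refresh is effectively a no-op.
    System.out.println(reassigned.identifierFieldIds()); // prints []
  }
}
```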

Member

I would prefer to call refreshIdentifierFields here to get a consistent view of the identifier field ID list for the schema, without caring about idSourceSchema's own identifier field ID list. This makes the reassignIds method look more general.

```java
    Assert.assertEquals(sourceSchema.asStruct(), actualSchema.asStruct());
    Assert.assertEquals(sourceSchema.identifierFieldIds(), actualSchema.identifierFieldIds());
  }
```

Contributor

Do we want to add a case that calls assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) where schema doesn't have identifier fields but baseSchema does, and the output schema ends up with identifier fields? I think it's an existing use case we have in TableMetadata that may be worth explicit testing.
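
A sketch of that suggested test follows; names and IDs are hypothetical, and the printed value is exactly the behavior to pin down: {1} if identifier fields are inherited from baseSchema, empty if they are refreshed from the input schema only.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.junit.Test;

import static org.apache.iceberg.types.Types.NestedField.required;

public class TestAssignFreshIdsWithBaseSchema {
  @Test
  public void testIdentifierFieldsWhenOnlyBaseHasThem() {
    // The input schema declares no identifier fields...
    Schema schema = new Schema(
        List.of(
            required(100, "id", Types.LongType.get()),
            required(101, "data", Types.StringType.get())));
    // ...but the base schema marks "id" (field 1) as an identifier field.
    Schema baseSchema = new Schema(
        List.of(
            required(1, "id", Types.LongType.get()),
            required(2, "data", Types.StringType.get())),
        Set.of(1));

    AtomicInteger lastId = new AtomicInteger(baseSchema.highestFieldId());
    Schema fresh = TypeUtil.assignFreshIds(schema, baseSchema, lastId::incrementAndGet);

    // Matching names reuse the base IDs; whether the identifier set follows
    // is the behavior this test should assert once the question is settled.
    System.out.println(fresh.identifierFieldIds());
  }
}
```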

@openinx (Member) commented May 10, 2021

Looks like we are hitting a flaky unit test. Let's retry the Travis CI!

org.apache.iceberg.flink.TestFlinkTableSink > testHashDistributeMode[catalogName=testhive, baseNamespace=, format=AVRO, isStreaming=true] FAILED
    java.lang.AssertionError: There should be only 1 data file in partition 'aaa' expected:<1> but was:<2>
        at org.junit.Assert.fail(Assert.java:88)
        at org.junit.Assert.failNotEquals(Assert.java:834)
        at org.junit.Assert.assertEquals(Assert.java:645)
        at org.apache.iceberg.flink.TestFlinkTableSink.testHashDistributeMode(TestFlinkTableSink.java:274)

    org.apache.flink.table.api.ValidationException: Could not execute DROP DATABASE IF EXISTS  testhive.db RESTRICT
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:989)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
        at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:91)
        at org.apache.iceberg.flink.FlinkTestBase.exec(FlinkTestBase.java:95)
        at org.apache.iceberg.flink.FlinkTestBase.sql(FlinkTestBase.java:99)
        at org.apache.iceberg.flink.TestFlinkTableSink.clean(TestFlinkTableSink.java:126)

        Caused by:
        org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException: Database db in catalog testhive is not empty.
            at org.apache.iceberg.flink.FlinkCatalog.dropDatabase(FlinkCatalog.java:240)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:983)
            ... 5 more

            Caused by:
            org.apache.iceberg.exceptions.NamespaceNotEmptyException: Namespace db is not empty. One or more tables exist.
                at org.apache.iceberg.hive.HiveCatalog.dropNamespace(HiveCatalog.java:307)
                at org.apache.iceberg.flink.FlinkCatalog.dropDatabase(FlinkCatalog.java:231)
                ... 6 more

                Caused by:
                InvalidOperationException(message:Database db is not empty. One or more tables exist.)
                    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result$drop_database_resultStandardScheme.read(ThriftHiveMetastore.java:28714)
                    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result$drop_database_resultStandardScheme.read(ThriftHiveMetastore.java:28691)
                    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$drop_database_result.read(ThriftHiveMetastore.java:28625)
                    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:86)
                    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_drop_database(ThriftHiveMetastore.java:813)
                    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.drop_database(ThriftHiveMetastore.java:798)
                    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.dropDatabase(HiveMetaStoreClient.java:868)
                    at org.apache.iceberg.hive.HiveCatalog.lambda$dropNamespace$9(HiveCatalog.java:296)
                    at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51)
                    at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:77)
                    at org.apache.iceberg.hive.HiveCatalog.dropNamespace(HiveCatalog.java:295)
                    ... 7 more

@openinx closed this May 10, 2021
@openinx reopened this May 10, 2021
@openinx (Member) commented May 11, 2021

Looks good to me now. I will get this merged!

@openinx merged commit 20feb92 into apache:master May 11, 2021

Closes: Should fresh IdentifierFieldIds (Follow-up for PR2465)