
Conversation

@xiarixiaoyao (Contributor) commented Sep 15, 2021


What is the purpose of the pull request

Support full schema evolution for Hudi:

1) Support Spark 3 DDL, including:
alter statement:
ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
The following type promotions are supported:

int => long/float/double/string
long => float/double/string
float => double/string
double => string/decimal
decimal => decimal/string
string => date/decimal
date => string

ALTER TABLE table1 ALTER COLUMN a.b.c SET NOT NULL
ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
add statement:
ALTER TABLE table1 ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
rename:
ALTER TABLE table1 RENAME COLUMN a.b.c TO x
drop:
ALTER TABLE table1 DROP COLUMN a.b.c
ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
set/unset Properties:
ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value');
ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');

2) Support MOR (incremental/realtime/read-optimized) read/write
3) Support COW (incremental/realtime) read/write
4) Support MOR compaction

Brief change log

(for example:)

  • Modify AnnotationLocation checkstyle rule in checkstyle.xml

Verify this pull request

(Please pick either of the following options)

This pull request is a trivial rework / code cleanup without any test coverage.

(or)

This pull request is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end.
  • Added HoodieClientWriteTest to verify the change.
  • Manually verified the change by running a job locally.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

@xiarixiaoyao (Contributor, Author)

Kindly pinging @bvaradar @leesf: could you help me review this code? Thanks.

@yanghua (Contributor) commented Sep 15, 2021

Is this PR about RFC-33? Have we reached an agreement on the design?

Contributor

What is this dependency used for?

Contributor Author

It is used to cache the history schemas. Caffeine performs better than the Guava cache.
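For context, a minimal sketch of how a Caffeine cache for history schemas could look (class and method names here are illustrative assumptions, not the PR's actual code):

  import com.github.benmanes.caffeine.cache.Cache;
  import com.github.benmanes.caffeine.cache.Caffeine;

  // Illustrative sketch only: names are hypothetical, not the PR's classes.
  public class HistorySchemaCache {
    // Bounded cache keyed by commit instant time, holding the serialized schema for that instant.
    private final Cache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(1_000)
        .build();

    public String getOrLoad(String instantTime) {
      // Loads from storage only on a cache miss; later lookups are served from memory.
      return cache.get(instantTime, this::loadFromStorage);
    }

    private String loadFromStorage(String instantTime) {
      // Placeholder: read the persisted historical schema for this instant.
      throw new UnsupportedOperationException("storage lookup not shown in this sketch");
    }
  }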

@leesf (Contributor) Sep 15, 2021

Can PositionType be an enum?

Contributor Author

OK, will change it.
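For reference, a minimal sketch of such an enum (the constant names are illustrative and may differ from the PR's actual values):

  // Illustrative only: the constants mirror the positions the ALTER COLUMN DDL can express.
  public enum PositionType {
    FIRST,        // ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
    AFTER,        // ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
    NO_OPERATION  // position left unchanged
  }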

@xiarixiaoyao (Contributor, Author)

@yanghua yes, we have reached a preliminary consensus.

@vinothchandar changed the title from "[HUDI-2429][WIP] Full schema evolution" to "[RFC-33] [HUDI-2429][WIP] Full schema evolution" on Sep 15, 2021
@vinothchandar (Member)

cc @codope @bvaradar let's use this as a basis, evolve our design, and move forward.

@yanghua we are close, I think. A few open items remain that can hopefully be resolved soon.
You can follow along the discussion in the comments here:
https://cwiki.apache.org/confluence/display/HUDI/RFC+-+33++Hudi+supports+more+comprehensive+Schema+Evolution

@codope also has a bunch of these working on Presto and Trino already. Flink would be a good thing to tackle as well. Does Flink use the Hive record readers? cc @danny0405 as well.

Contributor

What does parentName mean here?

Contributor Author

Suppose we have a nested column: the parentName of a.b.c is a.b, the parentName of a.b is a, and the parentName of a is the empty string "".
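A tiny helper capturing that convention (hypothetical illustration, not the PR's code):

  // "a.b.c" -> "a.b", "a.b" -> "a", "a" -> ""
  static String parentName(String fullName) {
    int lastDot = fullName.lastIndexOf('.');
    return lastDot < 0 ? "" : fullName.substring(0, lastDot);
  }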

@danny0405 (Contributor)

> Does Flink use the Hive record readers? cc @danny0405 as well.

Flink does not use the Hive record readers now ~

@xiarixiaoyao (Contributor, Author)

@danny0405 I hope you can help us complete full schema evolution for Flink, thanks.

@xiarixiaoyao force-pushed the schema branch 2 times, most recently from ba407cc to 1a0b7d8, on September 16, 2021
@codope (Member) left a comment

@xiarixiaoyao This is great! Let's work on this together. I've taken one pass but have yet to look at the tests. A few high-level comments:

  1. Let's add docs for all the public classes and APIs.
  2. Does the merge schema action handle evolution of non-leaf fields in nested fields? For example, if a.b.c is renamed to a.d.c.
  3. IIUC, the patch has not yet handled old or existing schema compatibility as mentioned in the RFC, right?
  4. Since schema history is being changed not only at write time but also at read time, we need to think about both writer and reader concurrency.

Member

What about thread-safety? If there are concurrent readers, should construction of the TreeMap be synchronized?
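For illustration, one way to avoid exposing a partially constructed map to concurrent readers (a sketch with assumed types, not the PR's code) is to build the TreeMap fully and then publish an immutable view through a volatile field:

  import java.util.Collections;
  import java.util.Map;
  import java.util.TreeMap;

  class VersionedSchemas {
    // Readers only ever see a fully built, unmodifiable map; the volatile write publishes it safely.
    private volatile Map<Long, String> schemasByVersion = Collections.emptyMap();

    void reload(Map<Long, String> loaded) {
      schemasByVersion = Collections.unmodifiableMap(new TreeMap<>(loaded));
    }

    Map<Long, String> current() {
      return schemasByVersion;
    }
  }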

Member

This else block can be simplified to return fileSchema.findfullName(nameId), as we're doing the same thing in both cases, right?

Contributor Author

Yes, it can be done, but then the algorithm becomes hard to read. I chose not to simplify this code.

Member

We need to call this for all table and query types, not just the MOR snapshot relation. So, basically at the following places:

DefaultSource#getBaseFileOnlyView
IncrementalRelation
MergeOnReadIncrementalRelation

@xiarixiaoyao (Contributor, Author) Sep 17, 2021

Yes, we need to do that. But I think this PR is more like a prototype of the id-schema, and I hope we can reach a basic agreement on it here. Full Spark adaptation will come in another PR.

Contributor

@xiarixiaoyao: It would be helpful for review if you could list the gaps remaining in the current PR before it reaches full Spark support.

Contributor Author

OK, the COW incremental/snapshot views and the MOR incremental/read-optimized views will be added.

@xiarixiaoyao (Contributor, Author) commented Sep 17, 2021

@codope thanks for your review. Let me answer some of your questions first; please forgive me for being busy today, I need to modify something for the Z-order PR.

> Let's add docs for all the public classes and APIs.

OK, I will add them.

> Does the merge schema action handle evolution of non-leaf fields in nested fields? For example, if a.b.c is renamed to a.d.c.

That is an unusual requirement: if you want to change a.b.c to a.d.c, why not use spark.sql(s"alter table ${tableName} rename column a.b to d")? We do support handling all the non-leaf fields.

> IIUC, the patch has not yet handled old or existing schema compatibility as mentioned in the RFC, right?

This situation is already handled. Please see TestSpark3DDL: we first create a table and insert some data into it, so no id-schema is produced yet; then we perform a schema change and the id-schema is produced. The test result confirms this.

> Since schema history is being changed not only at write time but also at read time, we need to think about both writer and reader concurrency.

I am a little doubtful here: Hudi is snapshot isolated, so maybe we only need to deal with concurrency between writers. If I am wrong, please correct me, thanks.

Now for the most important question: I currently tend to use the metadata table to store historical schemas. As we know, the metadata table uses HFile to store data, and HFile has very good point-query performance. What do you suggest?

@xiarixiaoyao (Contributor, Author)

@leesf @codope @bvaradar I have updated the code, added docs to all public methods, and resolved most comments. Please help review the code again.

@bvaradar (Contributor) left a comment

@xiarixiaoyao: Thanks for opening the PR. Great start!

I have left comments. Regarding COW, I did not see changes in HoodieMergeHandle for when Hudi reads the old parquet file, applies the merge, and writes a new version of the file. I guess this is still WIP.

We can try to make this PR functionally complete.

Contributor

instead of timeline, use metadata.getActiveTimeline()

Contributor

Similar logic also needs to be present in BaseCommitExecutor.commit.

Contributor

We need to introduce a config to enable/disable schema evolution. Only if the config is enabled should we let any side effects of schema evolution take effect.

Contributor Author

agree

Contributor Author

> We need to introduce a config to enable/disable schema evolution. Only if the config is enabled should we let any side effects of schema evolution take effect.

Maybe there is no need to introduce a config: schema evolution only takes effect when the user explicitly executes an ALTER SQL statement or the alter API; otherwise, everything goes through the original process.

Contributor

I think for implicit schema changes we would want to control whether to keep the current behavior or introduce the new one. The same is true for the reader side. It is better to simply have a gatekeeper config to control the entire feature; when the feature matures, we can turn it on by default and deprecate the config later.

Contributor Author

Agree, will deal with it. Thanks.
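A sketch of what such a gatekeeper config could look like, using the hoodie.schema.evolution.enable key that appears later in this thread (the ConfigProperty placement and wording are assumptions, not the PR's final code):

  import org.apache.hudi.common.config.ConfigProperty;

  // Assumed sketch: when false, both writer- and reader-side schema evolution paths are bypassed.
  public static final ConfigProperty<Boolean> SCHEMA_EVOLUTION_ENABLE = ConfigProperty
      .key("hoodie.schema.evolution.enable")
      .defaultValue(false)
      .withDocumentation("Gatekeeper flag for the full schema evolution feature; "
          + "disabled by default until the feature matures.");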

Contributor

When will this case happen? SCHEMA_KEY is passed but not LATESTSCHEMA?

Contributor Author

It happens when we have not modified the table schema explicitly. LATESTSCHEMA is only produced when we run an ALTER SQL statement or the alter API.

Contributor

High level: instead of storing historical schemas in the commit file, it is better to store them as separate files under a new directory under .hoodie. Something like .hoodie/schema/...

Contributor Author

OK, but first we need to reach agreement on this issue; vinothchandar suggested we save that information into the metadata table.
@vinothchandar what is your opinion on this?

@bvaradar (Contributor) Sep 28, 2021

@xiarixiaoyao: The synchronous metadata table support is still in the works and is getting ready. It's better to decouple the two projects and, at the same time, avoid adding historical schemas to commit files. As an interim measure, let's write them as separate files under .hoodie/schema, and then we can do a follow-up PR whenever the synchronous metadata table is done. cc @vinothchandar @codope

Contributor Author

ok, will do it. thanks

Contributor

For the many conversions we are doing (Avro, InternalSchema, Spark, Parquet, ...), instead of implementing these methods in helper/utils classes, let's rename the classes as "xxxxConverter" and make sure only the conversion functions are defined there.

Contributor Author

ok

Contributor

Add docs to all public APIs

Contributor

Looks like this class performs both serialization and deserialization. If that is the case, let's move those methods to a SerDeHelper class.

@xiarixiaoyao (Contributor, Author) Sep 28, 2021

Done: docs added, and InternalSchemaParser renamed to SerDeHelper.

Contributor

Consider using the Visitor design pattern with classes instead of static methods.

Contributor Author

ok
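For illustration, a minimal class-based visitor over a toy schema tree (all names are hypothetical, not the PR's API); it assigns the kind of name-to-id mapping previously produced by static helper methods:

  import java.util.ArrayList;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  // Toy field tree: a field has a name and optional nested children.
  class Field {
    final String name;
    final List<Field> children = new ArrayList<>();
    Field(String name) { this.name = name; }
  }

  // Visitor contract: called once per field with its fully qualified name.
  interface SchemaVisitor {
    void visit(String fullName, Field field);
  }

  // One concrete visitor: hands out ids in traversal order.
  class NameToIdVisitor implements SchemaVisitor {
    final Map<String, Integer> nameToId = new LinkedHashMap<>();
    @Override
    public void visit(String fullName, Field field) {
      nameToId.put(fullName, nameToId.size());
    }
  }

  // Traversal is kept separate from the work performed at each node.
  final class SchemaWalker {
    static void walk(Field field, String prefix, SchemaVisitor visitor) {
      String fullName = prefix.isEmpty() ? field.name : prefix + "." + field.name;
      visitor.visit(fullName, field);
      for (Field child : field.children) {
        walk(child, fullName, visitor);
      }
    }
  }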

Contributor

Same comment as the one in HoodieUnMergedLogRecordScanner

Contributor Author

Spark MOR/COW does not use HoodieUnMergedLogRecordScanner; only Hive/Presto use it. This PR only covers the Spark adaptation.

Contributor

nit: You can simplify with something like
schemaUtil.getTableAvroSchema, internalSchemaOpt.orElse(null)

Contributor Author

done

@xiarixiaoyao (Contributor, Author)

@bvaradar thanks for your review. I will try to solve these problems. One small question: do we need to add all the Spark engine adaptations in this PR? If needed, full adaptation of the Spark engine (both MOR and COW) will be added.

@bvaradar (Contributor)

Thanks @xiarixiaoyao. Yes, it makes sense to add all the Spark adaptation related changes to this PR.

@xiarixiaoyao force-pushed the schema branch 2 times, most recently from b2d8b3f to b5e0b22, on September 27, 2021
@bvaradar (Contributor)

@xiarixiaoyao: Can you add commits to this PR instead of squashing? It makes it easier for us to find the delta changes. We can do a final squash before landing the PR.

@xiarixiaoyao (Contributor, Author)

@bvaradar @codope @leesf could you please help review this PR again? Thanks.
Code changes:

  1. Support MOR (incremental/realtime/read-optimized) read/write.
  2. Support COW (incremental/realtime) read/write.
  3. Support Spark 3 DDL, including:
    alter statement:
    • ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint
    • ALTER TABLE table1 ALTER COLUMN a.b.c SET NOT NULL
    • ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
    • ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
    • ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
    • ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
      add statement:
    • ALTER TABLE table1 ADD COLUMNS (col_name data_type [COMMENT col_comment], ...);
      rename:
    • ALTER TABLE table1 RENAME COLUMN a.b.c TO x
      drop:
    • ALTER TABLE table1 DROP COLUMN a.b.c
    • ALTER TABLE table1 DROP COLUMNS a.b.c, x, y
      set/unset Properties:
    • ALTER TABLE table SET TBLPROPERTIES ('table_property' = 'property_value');
    • ALTER TABLE table UNSET TBLPROPERTIES [IF EXISTS] ('comment', 'key');
  4. Support Spark 2 DDL.
  5. Add FileBaseInternalSchemasManager to manage history schemas and save the historySchemas under ".hoodie/.schema"; we no longer need to save historySchemas into the commit file.
  6. Add a segment lock to TableInternalSchemaUtils to support concurrent reads and writes of the cache (a sketch follows this list).
  7. Rename mergeSchemaAction to SchemaMerger; move the helper methods out of TableChanges into a helper class, so TableChanges is now clean; use the visitor pattern to produce nameToId for InternalSchema; plus other small fixes.
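As referenced in item 6, an illustrative segment-lock sketch (assumed names, not the actual TableInternalSchemaUtils code): the cache is split into N segments, each guarded by its own read/write lock, so lookups for different tables do not all contend on one global lock.

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  class SegmentLocks {
    private final ReentrantReadWriteLock[] locks;

    SegmentLocks(int segments) {
      locks = new ReentrantReadWriteLock[segments];
      for (int i = 0; i < segments; i++) {
        locks[i] = new ReentrantReadWriteLock();
      }
    }

    // Pick the segment by hashing the table base path.
    ReentrantReadWriteLock lockFor(String tablePath) {
      return locks[Math.floorMod(tablePath.hashCode(), locks.length)];
    }
  }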

Remaining problems: add more UTs for this PR, and add support for bootstrap tables.

@bvaradar forgive me, this change was too large, so I still squashed it. Subsequent modifications will be added as separate commits.

@xiarixiaoyao force-pushed the schema branch 2 times, most recently from 0fe3038 to a423a63, on September 29, 2021
@xiarixiaoyao (Contributor, Author)

@hudi-bot run azure

@xiarixiaoyao (Contributor, Author)

@hudi-bot run azure

@xiarixiaoyao (Contributor, Author)

@bvaradar I have rebased the code and added a config to control schema evolution. The test failure is not related to this PR.

@codope (Member) commented Oct 11, 2021

@xiarixiaoyao It would really help if you could share a gist showing the schema evolution steps. For example, earlier I tried this and add/drop/reorder was working but rename was not. Now, after the latest push, when I try the same but with the spark-sql command below, I don't see the schema directory being created.

./bin/spark-sql --jars ${JARS} \
  --packages org.apache.spark:spark-avro_2.11:2.4.7,com.github.ben-manes.caffeine:caffeine:2.9.1 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.kryoserializer.buffer.max=1024m' --conf "spark.memory.storageFraction=0.8" \
  --conf "hoodie.schema.evolution.enable=true" \
  --conf spark.rdd.compress=true --driver-memory 2g --conf "spark.memory.fraction=0.8"

@xiarixiaoyao (Contributor, Author)

@codope thanks for trying this PR. Note that we should not pass --conf "hoodie.schema.evolution.enable=true" on the command line: this conf does not start with spark/hadoop/hive, so Spark will ignore it. See the test case; we can execute a set command to enable schema evolution, i.e. sql("set hoodie.schema.evolution.enable=true"). Another thing: it is very strange that rename does not work, since TestSpark3DDL already contains a rename example, and Spark 2 has no equivalent test case. Maybe we can discuss this problem on Slack, thanks.

@rubenssoto

This feature is amazing, long-awaited functionality.

@vinothchandar added the status:in-progress (Work in progress) label on Oct 26, 2021
@hudi-bot (Collaborator) commented Nov 5, 2021

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

@xiarixiaoyao changed the title from "[RFC-33] [HUDI-2429][WIP] Full schema evolution" to "[RFC-33] [HUDI-2429][WIP] Support full Schema evolution for Spark/Hive" on Feb 7, 2022
@xiarixiaoyao (Contributor, Author)

Closing this PR, as it is too old and we now have a new PR for this feature.
