[HUDI-314] Fix multi partition keys error when querying a realtime table #978
Conversation
  String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
  List<String> partitioningFields =
-     partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
+     partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
If there are multiple partition keys, they should also be separated by a ",". Can you paste an example of multiple partition fields that you noticed being passed with a "/"?
Here is the example code I used:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode
import spark.implicits._ // needed for .toDF on a Seq (pre-imported in spark-shell)

val tableName = "hudi_multi_partitions_test"
val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
val tableType = "MERGE_ON_READ"

val inputDF2 = Seq(
  ("100", "event_name_897", "2015-01-01T23:52:39.340396Z", "type1", "2015", "01", "01"),
  ("101", "event_name_236", "2015-01-01T22:14:58.597216Z", "type2", "2015", "01", "01"),
  ("104", "event_name_764", "2015-02-01T12:15:00.512679Z", "type1", "2015", "01", "01"),
  ("105", "event_name_675", "2015-02-01T13:51:42.248818Z", "type2", "2015", "01", "01"),
  ("106", "event_name_337", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("107", "event_name_452", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("108", "event_name_234", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("199", "event_name_011", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16")
).toDF("_row_key", "event_name", "timestamp", "event_type", "year", "month", "day")

inputDF2.write.format("org.apache.hudi")
  .option("hoodie.insert.shuffle.parallelism", "2")
  .option("hoodie.upsert.shuffle.parallelism", "2")
  .option(HoodieWriteConfig.TABLE_NAME, tableName)
  .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "year,month,day")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
  .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
  .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.ComplexKeyGenerator")
  .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
  .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
  .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "year,month,day")
  .mode(SaveMode.Append)
  .save(tablePath)
If you then run a query against this realtime table in Hive, it fails with:
Caused by: org.apache.avro.SchemaParseException: Illegal character in: year/month/day
at org.apache.avro.Schema.validateName(Schema.java:1083) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.Schema.access$200(Schema.java:79) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.Schema$Field.<init>(Schema.java:372) ~[avro-1.7.7.jar:1.7.7]
at org.apache.avro.Schema$Field.<init>(Schema.java:367) ~[avro-1.7.7.jar:1.7.7]
at org.apache.hudi.common.util.HoodieAvroUtils.appendNullSchemaFields(HoodieAvroUtils.java:166) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.addPartitionFields(AbstractRealtimeRecordReader.java:305) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:328) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:103) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:48) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:67) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:45) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:233) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:376) ~[hive-exec-2.3.3.jar:2.3.3]
at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:169) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270) ~[hadoop-mapreduce-client-common-2.8.4.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_212]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]
Also, in the Hive log you can see a snippet of the printed job configuration like this:
fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem, yarn.nodemanager.windows-container.memory-limit.enabled=false, yarn.nodemanager.remote-app-log-dir=/var/log/hadoop-yarn/apps, mapreduce.reduce.shuffle.retry-delay.max.ms=60000, io.map.index.interval=128, partition_columns=year/month/day
The last entry is partition_columns=year/month/day, and partitionFields is read from exactly that value via String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
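A minimal standalone Java sketch (not Hudi code; the value "year/month/day" is taken from the log line above) showing why splitting on "," fails here:

import java.util.Arrays;
import java.util.List;

public class PartitionSplitDemo {
  public static void main(String[] args) {
    // Value Hive places in the JobConf under "partition_columns"
    String partitionFields = "year/month/day";

    // Splitting on "," leaves a single element, "year/month/day", which Avro
    // then rejects as an illegal field name (see the stack trace above)
    List<String> wrong = Arrays.asList(partitionFields.split(","));
    System.out.println(wrong); // [year/month/day]

    // Splitting on "/" recovers the three partition columns
    List<String> right = Arrays.asList(partitionFields.split("/"));
    System.out.println(right); // [year, month, day]
  }
}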
Thanks @zhedoubushishi for the detailed information. This looks fine to me. Can you also add an integration test for a MOR table supporting multi-partition keys? You can look at the class ITTestHoodieSanity.java; currently, it does not have a test case for MOR tables. You can also look at HoodieJavaApp.java, which is called by ITTestHoodieSanity.java, to get an idea. Let me know if you need any help and we can help you on this.
BTW, good catch. Thanks for your contribution :)
Thanks for your suggestions. I have already worked on adding MOR tests, but the tests won't pass because of some exceptions when querying the realtime table, so I am trying to fix those errors first. (There is another related PR: #972.)
@zhedoubushishi: Added a comment in #972. Regarding this PR, if you are stuck, share the code in this PR and we can help point you in the right direction.
Balaji.V
+1
> @zhedoubushishi: Added a comment in #972. Regarding this PR, if you are stuck, share the code in this PR and we can help point you in the right direction.
> Balaji.V

Testing on a MOR table is added in this PR.
vinothchandar
left a comment
@bvaradar as well to chime in..
umehrot2
left a comment
Good catch, Wenning! Maybe we can publish tests for MOR tables in this PR itself?
Test is added.
@bvaradar please review and merge.
bvaradar
left a comment
Looks clean and great. Awesome job :)
Jira: https://jira.apache.org/jira/browse/HUDI-314
Hive uses a slash to join multiple partition keys, e.g. "year/month/day".
I also checked different versions of Hive; it seems that all of them use a slash, and I did not see any reason a comma should be used here.
Manual testing shows this fix works.
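For illustration, a self-contained sketch of the parsing logic after this change, simplified from the diff above (the class and method names here are made up for the demo; the real logic lives in Hudi's realtime record reader initialization):

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class PartitionFieldsParser {

  // Hive joins multiple partition column names with "/", not ",",
  // so the partition_columns value must be split on "/"
  static List<String> parsePartitionFields(String partitionFields) {
    return partitionFields.length() > 0
        ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
        : Collections.emptyList();
  }

  public static void main(String[] args) {
    System.out.println(parsePartitionFields("year/month/day")); // [year, month, day]
    System.out.println(parsePartitionFields(""));               // []
  }
}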