@@ -329,7 +329,7 @@ private void init() throws IOException {
     // Add partitioning fields to writer schema for resulting row to contain null values for these fields
     String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
     List<String> partitioningFields =
-        partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
+        partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
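
For context: Hive hands the partition columns to the record reader as a single "/"-separated string (see the job-configuration snippet quoted further down), and String.split returns the whole input as one element when the separator never matches. A minimal standalone sketch of the two behaviors (plain JDK, not the Hudi code itself):

import java.util.Arrays;

public class SplitSeparatorDemo {
    public static void main(String[] args) {
        String partitionFields = "year/month/day"; // the value Hive passes for three partition columns
        // Old separator: "," never matches, so the whole string survives as one bogus field name.
        System.out.println(Arrays.toString(partitionFields.split(","))); // [year/month/day]
        // New separator: "/" recovers the individual partition columns.
        System.out.println(Arrays.toString(partitionFields.split("/"))); // [year, month, day]
    }
}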
Contributor:
If there are multiple partition keys, they should also be separated by a ",". Can you paste an example where you see multiple partition fields being passed with a "/"?

Contributor Author:
Here is the example code I used:

// Run in spark-shell (toDF on a Seq relies on spark.implicits._, which spark-shell pre-imports).
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

val tableName = "hudi_multi_partitions_test"
val tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
val tableType = "MERGE_ON_READ"

val inputDF2 = Seq(
  ("100", "event_name_897", "2015-01-01T23:52:39.340396Z", "type1", "2015", "01", "01"),
  ("101", "event_name_236", "2015-01-01T22:14:58.597216Z", "type2", "2015", "01", "01"),
  ("104", "event_name_764", "2015-02-01T12:15:00.512679Z", "type1", "2015", "01", "01"),
  ("105", "event_name_675", "2015-02-01T13:51:42.248818Z", "type2", "2015", "01", "01"),
  ("106", "event_name_337", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("107", "event_name_452", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("108", "event_name_234", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("199", "event_name_011", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16")
  ).toDF("_row_key", "event_name", "timestamp", "event_type", "year", "month", "day")

inputDF2.write.format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "year,month,day") // multiple partition fields, comma-separated
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.ComplexKeyGenerator")
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "year,month,day")
    .mode(SaveMode.Append)
    .save(tablePath)

If you then run a query against this table in Hive, it fails with the following exception:

Caused by: org.apache.avro.SchemaParseException: Illegal character in: year/month/day
    at org.apache.avro.Schema.validateName(Schema.java:1083) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema.access$200(Schema.java:79) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema$Field.<init>(Schema.java:372) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema$Field.<init>(Schema.java:367) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.hudi.common.util.HoodieAvroUtils.appendNullSchemaFields(HoodieAvroUtils.java:166) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.addPartitionFields(AbstractRealtimeRecordReader.java:305) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:328) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:103) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:48) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:67) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:45) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:233) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:376) ~[hive-exec-2.3.3.jar:2.3.3]
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:169) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270) ~[hadoop-mapreduce-client-common-2.8.4.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]
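
The exception comes from Avro's field-name validation: Avro names may contain only letters, digits, and underscores, so the un-split string "year/month/day" is rejected when appendNullSchemaFields tries to add it as a single field. A minimal standalone sketch of just the Avro-side failure (not Hudi code):

import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;

public class AvroFieldNameDemo {
    public static void main(String[] args) {
        // A record schema whose single field is named "year/month/day".
        String json = "{\"type\":\"record\",\"name\":\"rec\",\"fields\":"
                + "[{\"name\":\"year/month/day\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        try {
            new Schema.Parser().parse(json);
        } catch (SchemaParseException e) {
            System.out.println(e.getMessage()); // Illegal character in: year/month/day
        }
    }
}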

Also, in the Hive log you can see a snippet of the printed job configuration like this:

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem, yarn.nodemanager.windows-container.memory-limit.enabled=false, yarn.nodemanager.remote-app-log-dir=/var/log/hadoop-yarn/apps, mapreduce.reduce.shuffle.retry-delay.max.ms=60000, io.map.index.interval=128, partition_columns=year/month/day

The last entry is partition_columns=year/month/day, and partitionFields is obtained via String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
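
To make that lookup concrete, here is a small sketch that simulates the value Hive sets (hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS resolves to the partition_columns key shown above) and applies the fixed split:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.mapred.JobConf;

public class PartitionColumnsDemo {
    public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        // Simulate what Hive sets for a table partitioned by (year, month, day).
        jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "year/month/day");

        String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
        List<String> partitioningFields = partitionFields.length() > 0
                ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())
                : new ArrayList<>();
        System.out.println(partitioningFields); // [year, month, day]
    }
}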

Contributor:
Thanks @zhedoubushishi for the detailed information. This looks fine to me. Can you also add an integration test for a MOR table supporting multi-partition keys? You can look at the class ITTestHoodieSanity.java; currently it does not have a test case for MOR tables. You can also look at HoodieJavaApp.java, which is called by ITTestHoodieSanity.java, to get an idea. Let me know if you need any help and we can help you on this.

BTW, good catch. Thanks for your contribution :)

Contributor Author:
Thanks for your suggestions. I have already been working on adding MOR tests, but they won't pass because of some exceptions when querying the realtime table, so I am trying to fix those errors first. (There is another related PR: #972.)

Contributor:
@zhedoubushishi: Added a comment in #972. Regarding this PR, if you get stuck, share the code in this PR and we can help point you in the right direction.

Balaji.V

Contributor:
+1

Contributor Author (@zhedoubushishi, Nov 1, 2019):
> @zhedoubushishi: Added a comment in #972. Regarding this PR, if you get stuck, share the code in this PR and we can help point you in the right direction.
>
> Balaji.V

Testing on the MOR table has been added in this PR.

            : new ArrayList<>();
     writerSchema = addPartitionFields(writerSchema, partitioningFields);
     List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),