@@ -329,7 +329,7 @@ private void init() throws IOException {
// Add partitioning fields to writer schema for resulting row to contain null values for these fields
String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
List<String> partitioningFields =
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList())
partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList())

Contributor:
If there are multiple partition keys, they should also be separated by ",". Can you paste an example of multiple partition fields that you noticed being passed with "/"?

Contributor Author:
Here is the example code I used:

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.sql.SaveMode

var tableName = "hudi_multi_partitions_test"
var tablePath = "s3://emr-users/wenningd/hudi/tables/events/" + tableName
var tableType = "MERGE_ON_READ"

val inputDF2 = Seq(
  ("100", "event_name_897", "2015-01-01T23:52:39.340396Z", "type1", "2015", "01", "01"),
  ("101", "event_name_236", "2015-01-01T22:14:58.597216Z", "type2", "2015", "01", "01"),
  ("104", "event_name_764", "2015-02-01T12:15:00.512679Z", "type1", "2015", "01", "01"),
  ("105", "event_name_675", "2015-02-01T13:51:42.248818Z", "type2", "2015", "01", "01"),
  ("106", "event_name_337", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("107", "event_name_452", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("108", "event_name_234", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16"),
  ("199", "event_name_011", "2015-02-01T13:51:42.248818Z", "type2", "2015", "03", "16")
  ).toDF("_row_key", "event_name", "timestamp", "event_type", "year", "month", "day")

inputDF2.write.format("org.apache.hudi").option("hoodie.insert.shuffle.parallelism", "2")
    .option("hoodie.upsert.shuffle.parallelism", "2")
    .option(HoodieWriteConfig.TABLE_NAME, tableName)
    .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
    .option(DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY, DataSourceWriteOptions.MOR_STORAGE_TYPE_OPT_VAL)
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "_row_key")
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "year,month,day")
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "timestamp")
    .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY, "true")
    .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY, "org.apache.hudi.ComplexKeyGenerator")
    .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY, tableName)
    .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, "org.apache.hudi.hive.MultiPartKeysValueExtractor")
    .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY, "year,month,day")
    .mode(SaveMode.Append)
    .save(tablePath)

If you then query the realtime table in Hive, it fails with the following exception:

Caused by: org.apache.avro.SchemaParseException: Illegal character in: year/month/day
    at org.apache.avro.Schema.validateName(Schema.java:1083) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema.access$200(Schema.java:79) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema$Field.<init>(Schema.java:372) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.avro.Schema$Field.<init>(Schema.java:367) ~[avro-1.7.7.jar:1.7.7]
    at org.apache.hudi.common.util.HoodieAvroUtils.appendNullSchemaFields(HoodieAvroUtils.java:166) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.addPartitionFields(AbstractRealtimeRecordReader.java:305) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.init(AbstractRealtimeRecordReader.java:328) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader.<init>(AbstractRealtimeRecordReader.java:103) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader.<init>(RealtimeCompactedRecordReader.java:48) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.constructRecordReader(HoodieRealtimeRecordReader.java:67) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.<init>(HoodieRealtimeRecordReader.java:45) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat.getRecordReader(HoodieParquetRealtimeInputFormat.java:233) ~[hudi-hadoop-mr-bundle-0.5.1-SNAPSHOT.jar:0.5.1-SNAPSHOT]
    at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:376) ~[hive-exec-2.3.3.jar:2.3.3]
    at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.<init>(MapTask.java:169) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:432) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343) ~[hadoop-mapreduce-client-core-2.8.4.jar:?]
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:270) ~[hadoop-mapreduce-client-common-2.8.4.jar:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_212]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]
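
The root cause is Avro's field-name validation: field names must be identifier-like, so the whole string year/month/day is rejected when it is treated as a single field name. A minimal sketch that reproduces the error (hypothetical class, not Hudi code, written against Avro 1.7.7 as in the stack trace; newer Avro versions use a different Field constructor signature):

import org.apache.avro.Schema;
import org.codehaus.jackson.JsonNode;

// Hypothetical standalone class, not part of Hudi.
public class AvroFieldNameSketch {
  public static void main(String[] args) {
    // Treating "year/month/day" as one field name triggers the same validateName
    // check seen in the stack trace above:
    // org.apache.avro.SchemaParseException: Illegal character in: year/month/day
    new Schema.Field("year/month/day", Schema.create(Schema.Type.STRING), null, (JsonNode) null);
  }
}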

Also, in the Hive log you can see a snippet of the printed job configuration like this:

fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem, yarn.nodemanager.windows-container.memory-limit.enabled=false, yarn.nodemanager.remote-app-log-dir=/var/log/hadoop-yarn/apps, mapreduce.reduce.shuffle.retry-delay.max.ms=60000, io.map.index.interval=128, partition_columns=year/month/day

The last entry is partition_columns=year/month/day, and partitionFields is read from String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");, so the partition columns arrive separated by "/" rather than ",".
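
To make the difference concrete, here is a minimal sketch (hypothetical class, not Hudi code) of the two split behaviors on that value:

import java.util.Arrays;
import java.util.List;

// Hypothetical standalone class showing why splitting on "," misbehaves here.
public class PartitionSplitSketch {
  public static void main(String[] args) {
    // Value observed in the Hive job configuration above
    String partitionFields = "year/month/day";
    // Splitting on "," leaves a single bogus field name, which Avro later rejects
    List<String> commaSplit = Arrays.asList(partitionFields.split(","));  // [year/month/day]
    // Splitting on "/" recovers the individual partition columns
    List<String> slashSplit = Arrays.asList(partitionFields.split("/"));  // [year, month, day]
    System.out.println(commaSplit + " vs " + slashSplit);
  }
}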

Contributor:

Thanks @zhedoubushishi for the detailed information. This looks fine to me. Can you also add an integration test for MOR tables with multiple partition keys? You can look at ITTestHoodieSanity.java; currently it has no test case for MOR tables. You can also look at HoodieJavaApp.java, which is called by ITTestHoodieSanity.java, to get an idea. Let me know if you need any help and we can help you with this.

BTW, good catch. Thanks for your contribution :)

Contributor Author:

Thanks for your suggestions. I have already been working on adding MOR tests, but they won't pass because of some exceptions when querying the realtime table, so I am trying to fix those errors first. (Similarly, there is another related PR: #972.)

Contributor:

@zhedoubushishi: Added a comment in #972. Regarding this PR, if you are stuck, share the code in this PR and we can help point you in the right direction.

Balaji.V

Contributor:

+1

Contributor Author (@zhedoubushishi, Nov 1, 2019):

> @zhedoubushishi: Added a comment in #972. Regarding this PR, if you are stuck, share the code in this PR and we can help point you in the right direction.
>
> Balaji.V

Testing on MOR tables is added in this PR.

: new ArrayList<>();
writerSchema = addPartitionFields(writerSchema, partitioningFields);
List<String> projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR),
@@ -19,6 +19,7 @@
package org.apache.hudi.integ;

import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.model.HoodieTableType;
import org.junit.Assert;
import org.junit.Test;

@@ -39,8 +40,8 @@ enum PartitionType {
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_cow_test";
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.SINGLE_KEY_PARTITIONED);
executeHiveCommand("drop table if exists " + hiveTableName);
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@Test
@@ -51,8 +52,8 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_cow_test";
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.MULTI_KEYS_PARTITIONED);
executeHiveCommand("drop table if exists " + hiveTableName);
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@Test
@@ -63,29 +64,65 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception
*/
public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_cow_test";
testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.NON_PARTITIONED);
executeHiveCommand("drop table if exists " + hiveTableName);
testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set
* and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
* console.
*/
public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception {
String hiveTableName = "docker_hoodie_single_partition_key_mor_test";
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys
* data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query
* in hive console.
*/
public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception {
String hiveTableName = "docker_hoodie_multi_partition_key_mor_test";
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

@Test
/**
* A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie data-set and performs upserts on it.
* A basic integration test that runs HoodieJavaApp to create a sample non-partitioned MOR Hoodie data-set and
* performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive
* console.
*/
public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception {
String hiveTableName = "docker_hoodie_non_partition_key_mor_test";
testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED);
dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name());
}

/**
* A basic integration test that runs HoodieJavaApp to create a sample Hoodie data-set and performs upserts on it.
* Hive integration and upsert functionality is checked by running a count query in hive console. TODO: Add
* spark-shell test-case
*/
public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType partitionType) throws Exception {
public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType)
throws Exception {

String hdfsPath = "/" + hiveTableName;
String hdfsUrl = "hdfs://namenode" + hdfsPath;

// Drop Table if it exists
String hiveDropCmd = "drop table if exists " + hiveTableName;
try {
executeHiveCommand(hiveDropCmd);
dropHiveTables(hiveTableName, tableType);
} catch (AssertionError ex) {
// In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up
// Workaround to sleep for 5 secs and retry
Thread.sleep(5000);
executeHiveCommand(hiveDropCmd);
dropHiveTables(hiveTableName, tableType);
}

// Ensure table does not exist
@@ -96,13 +133,13 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p
String cmd;
if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --hive-table " + hiveTableName;
+ " --table-type " + tableType + " --hive-table " + hiveTableName;
} else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --hive-table " + hiveTableName + " --use-multi-partition-keys";
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys";
} else {
cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL
+ " --hive-table " + hiveTableName + " --non-partitioned";
+ " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned";
}
executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true);

@@ -115,6 +152,13 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100,
Integer.parseInt(stdOutErr.getLeft().trim()));

// If this is an MOR table, ensure the realtime table row count is 100 (without duplicates)
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName + "_rt");
Assert.assertEquals("Expecting 100 rows to be present in the realtime table,", 100,
Integer.parseInt(stdOutErr.getLeft().trim()));
}

// Make the HDFS dataset non-hoodie and run the same query
// Checks for interoperability with non-hoodie tables

@@ -126,4 +170,11 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p
Assert.assertEquals("Expecting 200 rows to be present in the new table", 200,
Integer.parseInt(stdOutErr.getLeft().trim()));
}

private void dropHiveTables(String hiveTableName, String tableType) throws Exception {
executeHiveCommand("drop table if exists " + hiveTableName);
if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) {
executeHiveCommand("drop table if exists " + hiveTableName + "_rt");
}
}
}