diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java index fda13d4fde3c2..4544b12bb2bb5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java @@ -329,7 +329,7 @@ private void init() throws IOException { // Add partitioning fields to writer schema for resulting row to contain null values for these fields String partitionFields = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""); List partitioningFields = - partitionFields.length() > 0 ? Arrays.stream(partitionFields.split(",")).collect(Collectors.toList()) + partitionFields.length() > 0 ? Arrays.stream(partitionFields.split("/")).collect(Collectors.toList()) : new ArrayList<>(); writerSchema = addPartitionFields(writerSchema, partitioningFields); List projectionFields = orderFields(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java index 2795ab78c0b7b..55b64db8d06fc 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieSanity.java @@ -19,6 +19,7 @@ package org.apache.hudi.integ; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.model.HoodieTableType; import org.junit.Assert; import org.junit.Test; @@ -39,8 +40,8 @@ enum PartitionType { */ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception { String hiveTableName = "docker_hoodie_single_partition_key_cow_test"; - testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.SINGLE_KEY_PARTITIONED); - 
executeHiveCommand("drop table if exists " + hiveTableName); + testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.SINGLE_KEY_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } @Test @@ -51,8 +52,8 @@ public void testRunHoodieJavaAppOnSinglePartitionKeyCOWTable() throws Exception */ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception { String hiveTableName = "docker_hoodie_multi_partition_key_cow_test"; - testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.MULTI_KEYS_PARTITIONED); - executeHiveCommand("drop table if exists " + hiveTableName); + testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.MULTI_KEYS_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); } @Test @@ -63,29 +64,65 @@ public void testRunHoodieJavaAppOnMultiPartitionKeysCOWTable() throws Exception */ public void testRunHoodieJavaAppOnNonPartitionedCOWTable() throws Exception { String hiveTableName = "docker_hoodie_non_partition_key_cow_test"; - testRunHoodieJavaAppOnCOWTable(hiveTableName, PartitionType.NON_PARTITIONED); - executeHiveCommand("drop table if exists " + hiveTableName); + testRunHoodieJavaApp(hiveTableName, HoodieTableType.COPY_ON_WRITE.name(), PartitionType.NON_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.COPY_ON_WRITE.name()); + } + + @Test + /** + * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with single partition key data-set + * and performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive + * console. 
+ */ + public void testRunHoodieJavaAppOnSinglePartitionKeyMORTable() throws Exception { + String hiveTableName = "docker_hoodie_single_partition_key_mor_test"; + testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.SINGLE_KEY_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); + } + + @Test + /** + * A basic integration test that runs HoodieJavaApp to create a sample MOR Hoodie with multiple partition-keys + * data-set and performs upserts on it. Hive integration and upsert functionality is checked by running a count query + * in hive console. + */ + public void testRunHoodieJavaAppOnMultiPartitionKeysMORTable() throws Exception { + String hiveTableName = "docker_hoodie_multi_partition_key_mor_test"; + testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.MULTI_KEYS_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); } + @Test /** - * A basic integration test that runs HoodieJavaApp to create a sample COW Hoodie data-set and performs upserts on it. + * A basic integration test that runs HoodieJavaApp to create a sample non-partitioned MOR Hoodie data-set and + * performs upserts on it. Hive integration and upsert functionality is checked by running a count query in hive + * console. + */ + public void testRunHoodieJavaAppOnNonPartitionedMORTable() throws Exception { + String hiveTableName = "docker_hoodie_non_partition_key_mor_test"; + testRunHoodieJavaApp(hiveTableName, HoodieTableType.MERGE_ON_READ.name(), PartitionType.NON_PARTITIONED); + dropHiveTables(hiveTableName, HoodieTableType.MERGE_ON_READ.name()); + } + + /** + * A basic integration test that runs HoodieJavaApp to create a sample Hoodie data-set and performs upserts on it. * Hive integration and upsert functionality is checked by running a count query in hive console. 
TODO: Add * spark-shell test-case */ - public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType partitionType) throws Exception { + public void testRunHoodieJavaApp(String hiveTableName, String tableType, PartitionType partitionType) + throws Exception { String hdfsPath = "/" + hiveTableName; String hdfsUrl = "hdfs://namenode" + hdfsPath; // Drop Table if it exists - String hiveDropCmd = "drop table if exists " + hiveTableName; try { - executeHiveCommand(hiveDropCmd); + dropHiveTables(hiveTableName, tableType); } catch (AssertionError ex) { // In travis, sometimes, the hivemetastore is not ready even though we wait for the port to be up // Workaround to sleep for 5 secs and retry Thread.sleep(5000); - executeHiveCommand(hiveDropCmd); + dropHiveTables(hiveTableName, tableType); } // Ensure table does not exist @@ -96,13 +133,13 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p String cmd; if (partitionType == PartitionType.SINGLE_KEY_PARTITIONED) { cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL - + " --hive-table " + hiveTableName; + + " --table-type " + tableType + " --hive-table " + hiveTableName; } else if (partitionType == PartitionType.MULTI_KEYS_PARTITIONED) { cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL - + " --hive-table " + hiveTableName + " --use-multi-partition-keys"; + + " --table-type " + tableType + " --hive-table " + hiveTableName + " --use-multi-partition-keys"; } else { cmd = HOODIE_JAVA_APP + " --hive-sync --table-path " + hdfsUrl + " --hive-url " + HIVE_SERVER_JDBC_URL - + " --hive-table " + hiveTableName + " --non-partitioned"; + + " --table-type " + tableType + " --hive-table " + hiveTableName + " --non-partitioned"; } executeCommandStringInDocker(ADHOC_1_CONTAINER, cmd, true); @@ -115,6 +152,13 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p 
Assert.assertEquals("Expecting 100 rows to be present in the new table", 100, Integer.parseInt(stdOutErr.getLeft().trim())); + // If it is a MOR table, ensure realtime table row count is 100 (without duplicates) + if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + stdOutErr = executeHiveCommand("select count(1) from " + hiveTableName + "_rt"); + Assert.assertEquals("Expecting 100 rows to be present in the realtime table", 100, + Integer.parseInt(stdOutErr.getLeft().trim())); + } + // Make the HDFS dataset non-hoodie and run the same query // Checks for interoperability with non-hoodie tables @@ -126,4 +170,11 @@ public void testRunHoodieJavaAppOnCOWTable(String hiveTableName, PartitionType p Assert.assertEquals("Expecting 200 rows to be present in the new table", 200, Integer.parseInt(stdOutErr.getLeft().trim())); } + + private void dropHiveTables(String hiveTableName, String tableType) throws Exception { + executeHiveCommand("drop table if exists " + hiveTableName); + if (tableType.equals(HoodieTableType.MERGE_ON_READ.name())) { + executeHiveCommand("drop table if exists " + hiveTableName + "_rt"); + } + } }