diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
index bc1172f387f0c..02af247e3d076 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/HoodieDatasetBulkInsertHelper.java
@@ -23,7 +23,6 @@
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -87,18 +86,15 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlConte
 
     BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);
     Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath;
-    if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())) {
+    if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())
+        || (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) && !config.isHiveStylePartitioningEnabled())) {
       // for non partitioned, set partition path to empty.
+      // or if SimpleKeyGenerator and not Hive style partition, use withColumn to improve performance
+      Column partitionColumn = keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())
+          ? functions.lit("").cast(DataTypes.StringType)
+          : functions.col(partitionPathFields).cast(DataTypes.StringType);
       rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));
-    } else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName())
-        || (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",")
-        && (!partitionPathFields.contains("timestamp")))) { // incase of ComplexKeyGen, check partition path type.
-      // simple fields for both record key and partition path: can directly use withColumn
-      String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields :
-          partitionPathFields.substring(partitionPathFields.indexOf(":") + 1);
-      rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType));
+          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionColumn);
     } else {
       // use udf
       String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
index 6b617ca208185..9a4598240a838 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieDatasetBulkInsertHelper.java
@@ -97,9 +97,15 @@ private void init() throws IOException {
   public void testBulkInsertHelperConcurrently() {
     IntStream.range(0, 2).parallel().forEach(i -> {
       if (i % 2 == 0) {
-        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "_row_key");
+        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(),
+            "_row_key",
+            getProps("_row_key", SimpleKeyGenerator.class.getName())
+        );
       } else {
-        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "ts");
+        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(),
+            "ts",
+            getProps("ts", SimpleKeyGenerator.class.getName())
+        );
       }
     });
   }
@@ -114,10 +120,18 @@ private static Stream<Arguments> provideKeyGenArgs() {
   @ParameterizedTest
   @MethodSource("provideKeyGenArgs")
   public void testBulkInsertHelper(String keyGenClass) {
-    testBulkInsertHelperFor(keyGenClass, "_row_key");
+    testBulkInsertHelperFor(keyGenClass, "_row_key", getProps("_row_key", keyGenClass));
   }
 
-  private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
+  @ParameterizedTest
+  @MethodSource("provideKeyGenArgs")
+  public void testBulkInsertWithHiveStylePartition(String keyGenClass) {
+    Map<String, String> props = getProps("_row_key", keyGenClass);
+    props.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true");
+    testBulkInsertHelperFor(keyGenClass, "_row_key", props);
+  }
+
+  private Map<String, String> getProps(String recordKey, String keyGenClass) {
     Map<String, String> props = null;
     if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
       props = getPropsAllSet(recordKey);
@@ -126,6 +140,10 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
     } else { // NonPartitioned key gen
       props = getPropsForNonPartitionedKeyGen(recordKey);
     }
+    return props;
+  }
+
+  private void testBulkInsertHelperFor(String keyGenClass, String recordKey, Map<String, String> props) {
     HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
@@ -141,12 +159,25 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
     }
 
     boolean isNonPartitioned = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
+    boolean isComplexKeyGenerator = keyGenClass.equals(ComplexKeyGenerator.class.getName());
+    boolean hiveStylePartition = config.isHiveStylePartitioningEnabled();
     result.toJavaRDD().foreach(entry -> {
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(isNonPartitioned ? "" : entry.getAs("partition")));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
+      String keyShouldBe = isComplexKeyGenerator ? recordKey + ":" + entry.getAs(recordKey).toString() : entry.getAs(recordKey).toString();
+      assertEquals(keyShouldBe, entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString());
+
+      String partitionShouldBe;
+      if (isNonPartitioned) {
+        partitionShouldBe = "";
+      } else if (hiveStylePartition) {
+        partitionShouldBe = "partition=" + entry.getAs("partition");
+      } else {
+        partitionShouldBe = entry.getAs("partition").toString();
+      }
+      assertEquals(partitionShouldBe, entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)));
+
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)));
     });
 
     Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
@@ -277,7 +308,7 @@ private Map<String, String> getPropsForComplexKeyGen(String recordKey) {
     Map<String, String> props = new HashMap<>();
     props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName());
    props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
-    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "simple:partition");
+    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
     props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
     return props;
   }
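
Note: to make the intent of the fast path above concrete, here is a minimal, self-contained sketch (not the PR's code) of filling the Hudi meta columns with plain Spark column expressions instead of a per-row key-generator UDF. The column names "_row_key" and "partition" and the class/method names are illustrative; the meta field names mirror HoodieRecord.RECORD_KEY_METADATA_FIELD and HoodieRecord.PARTITION_PATH_METADATA_FIELD.

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;

public class MetaColumnSketch {
  // Adds the record-key and partition-path meta columns to an input Dataset<Row>.
  static Dataset<Row> addMetaColumns(Dataset<Row> rows, boolean nonPartitioned) {
    // Fast path: the partition path is either a constant ("" for non-partitioned tables)
    // or a plain cast of the partition column, so Catalyst evaluates it directly
    // without routing every row through a key-generator UDF.
    Column partitionColumn = nonPartitioned
        ? functions.lit("").cast(DataTypes.StringType)
        : functions.col("partition").cast(DataTypes.StringType);
    return rows
        .withColumn("_hoodie_record_key", functions.col("_row_key").cast(DataTypes.StringType))
        .withColumn("_hoodie_partition_path", partitionColumn);
  }
}

Because both expressions stay in Spark's column/Catalyst layer, this avoids the per-row UDF invocation used in the "else" branch of prepareHoodieDatasetForBulkInsert, which is where the performance gain noted in the added comment comes from.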
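
Note: the updated test assertions encode three expectations for the meta fields. A small hypothetical helper spelling them out (not part of the patch; names are illustrative):

public class ExpectedMetaValues {
  // Value expected in _hoodie_partition_path for a single partition-path field.
  static String expectedPartitionPath(String field, String value, boolean hiveStyle, boolean nonPartitioned) {
    if (nonPartitioned) {
      return "";                             // NonpartitionedKeyGenerator: empty partition path
    }
    return hiveStyle ? field + "=" + value   // hive-style, e.g. "partition=2016/03/15"
                     : value;                // plain, e.g. "2016/03/15"
  }

  // Value expected in _hoodie_record_key; ComplexKeyGenerator prefixes the field name.
  static String expectedRecordKey(String field, String value, boolean complexKeyGen) {
    return complexKeyGen ? field + ":" + value : value;  // e.g. "_row_key:uuid-1" vs "uuid-1"
  }
}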