@@ -23,7 +23,6 @@
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BuiltinKeyGenerator;
-import org.apache.hudi.keygen.ComplexKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedKeyGenerator;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
@@ -87,18 +86,15 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlConte
     BuiltinKeyGenerator keyGenerator = (BuiltinKeyGenerator) ReflectionUtils.loadClass(keyGeneratorClass, properties);

     Dataset<Row> rowDatasetWithRecordKeysAndPartitionPath;
-    if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())) {
+    if (keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())
+        || (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) && !config.isHiveStylePartitioningEnabled())) {
Contributor: if hive style partitioning is enabled, we are falling back to the udf flow, is it? Guess the intent was to use udf-based key gen only for non-simple use-cases. Can we honor the same even w/ hive style partitioning enabled, please?

Contributor (Author): yeah, this PR will fall back to UDF if hive style partitioning is enabled, same logic as 0.10.

I think PR #6049 is a better fix, which can improve performance by using withColumn.

       // for non partitioned, set partition path to empty.
+      // or if SimpleKeyGenerator and not Hive style partition, use withColumn to improve performance
+      Column partitionColumn = keyGeneratorClass.equals(NonpartitionedKeyGenerator.class.getName())
+          ? functions.lit("").cast(DataTypes.StringType)
+          : functions.col(partitionPathFields).cast(DataTypes.StringType);
       rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.lit("").cast(DataTypes.StringType));
-    } else if (keyGeneratorClass.equals(SimpleKeyGenerator.class.getName())
-        || (keyGeneratorClass.equals(ComplexKeyGenerator.class.getName()) && !recordKeyFields.contains(",") && !partitionPathFields.contains(",")
-        && (!partitionPathFields.contains("timestamp")))) { // incase of ComplexKeyGen, check partition path type.
-      // simple fields for both record key and partition path: can directly use withColumn
-      String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields :
-          partitionPathFields.substring(partitionPathFields.indexOf(":") + 1);
-      rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
-          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType));
+          .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionColumn);
     } else {
       // use udf
       String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key());
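To the reviewer's question above: the fast withColumn path could, in principle, also cover hive-style partitioning by prefixing the partition value with the field name. A hypothetical sketch, not part of this PR (partitionPathFields, config, rows, recordKeyFields as in the surrounding method; assumes a single partition path field and no null partition values):

// Hypothetical sketch, not in this PR: keep the withColumn fast path even with
// hive-style partitioning by building "field=value" from column expressions.
// Assumes one partition path field and no null partition values.
Column partitionValue = functions.col(partitionPathFields).cast(DataTypes.StringType);
Column partitionColumn = config.isHiveStylePartitioningEnabled()
    ? functions.concat(functions.lit(partitionPathFields + "="), partitionValue)
    : partitionValue;
rowDatasetWithRecordKeysAndPartitionPath = rows
    .withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType))
    .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, partitionColumn);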
@@ -97,9 +97,15 @@ private void init() throws IOException {
   public void testBulkInsertHelperConcurrently() {
     IntStream.range(0, 2).parallel().forEach(i -> {
       if (i % 2 == 0) {
-        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "_row_key");
+        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(),
+            "_row_key",
+            getProps("_row_key", SimpleKeyGenerator.class.getName())
+        );
       } else {
-        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(), "ts");
+        testBulkInsertHelperFor(SimpleKeyGenerator.class.getName(),
+            "ts",
+            getProps("ts", SimpleKeyGenerator.class.getName())
+        );
       }
     });
   }
@@ -114,10 +120,18 @@ private static Stream<Arguments> provideKeyGenArgs() {
   @ParameterizedTest
   @MethodSource("provideKeyGenArgs")
   public void testBulkInsertHelper(String keyGenClass) {
-    testBulkInsertHelperFor(keyGenClass, "_row_key");
+    testBulkInsertHelperFor(keyGenClass, "_row_key", getProps("_row_key", keyGenClass));
   }

-  private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
+  @ParameterizedTest
+  @MethodSource("provideKeyGenArgs")
+  public void testBulkInsertWithHiveStylePartition(String keyGenClass) {
+    Map<String, String> props = getProps("_row_key", keyGenClass);
+    props.put(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), "true");
+    testBulkInsertHelperFor(keyGenClass, "_row_key", props);
+  }
+
+  private Map<String, String> getProps(String recordKey, String keyGenClass) {
     Map<String, String> props = null;
     if (keyGenClass.equals(SimpleKeyGenerator.class.getName())) {
       props = getPropsAllSet(recordKey);
@@ -126,6 +140,10 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
     } else { // NonPartitioned key gen
       props = getPropsForNonPartitionedKeyGen(recordKey);
     }
+    return props;
+  }
+
+  private void testBulkInsertHelperFor(String keyGenClass, String recordKey, Map<String, String> props) {
     HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
     List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
     Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
@@ -141,12 +159,25 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKey) {
     }

     boolean isNonPartitioned = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
+    boolean isComplexKeyGenerator = keyGenClass.equals(ComplexKeyGenerator.class.getName());
+    boolean hiveStylePartition = config.isHiveStylePartitioningEnabled();
     result.toJavaRDD().foreach(entry -> {
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).equals(entry.getAs(recordKey).toString()));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)).equals(isNonPartitioned ? "" : entry.getAs("partition")));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)).equals(""));
-      assertTrue(entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)).equals(""));
+      String keyShouldBe = isComplexKeyGenerator ? recordKey + ":" + entry.getAs(recordKey).toString() : entry.getAs(recordKey).toString();
+      assertEquals(keyShouldBe, entry.get(resultSchema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)).toString());
+
+      String partitionShouldBe;
+      if (isNonPartitioned) {
+        partitionShouldBe = "";
+      } else if (hiveStylePartition) {
+        partitionShouldBe = "partition=" + entry.getAs("partition");
+      } else {
+        partitionShouldBe = entry.getAs("partition").toString();
+      }
+      assertEquals(partitionShouldBe, entry.get(resultSchema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)));
+
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.COMMIT_TIME_METADATA_FIELD)));
+      assertEquals("", entry.get(resultSchema.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD)));
     });

     Dataset<Row> trimmedOutput = result.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
@@ -277,7 +308,7 @@ private Map<String, String> getPropsForComplexKeyGen(String recordKey) {
     Map<String, String> props = new HashMap<>();
     props.put(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), ComplexKeyGenerator.class.getName());
     props.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), recordKey);
-    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "simple:partition");
+    props.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition");
     props.put(HoodieWriteConfig.TBL_NAME.key(), recordKey + "_table");
     return props;
   }
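For context, the hive-style behavior exercised by the new testBulkInsertWithHiveStylePartition corresponds to a datasource write configured roughly as below. A minimal usage sketch, not from this PR: df, basePath, and the table name are placeholders; the option keys are the string forms of the DataSourceWriteOptions / HoodieWriteConfig configs used in the tests.

// Minimal usage sketch: row-writer bulk insert with hive-style partitioning
// enabled, so partition paths materialize as "partition=<value>".
df.write().format("hudi")
    .option("hoodie.table.name", "test_table")
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.datasource.write.row.writer.enable", "true")
    .option("hoodie.datasource.write.recordkey.field", "_row_key")
    .option("hoodie.datasource.write.partitionpath.field", "partition")
    .option("hoodie.datasource.write.hive_style_partitioning", "true")
    .mode(SaveMode.Overwrite)
    .save(basePath);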