Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,24 @@
* PartitionId refers to spark's partition Id.
* RowId refers to the row index within the spark partition.
*/
public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator {
public class AutoRecordGenWrapperAvroKeyGenerator extends BaseKeyGenerator implements AutoRecordKeyGeneratorWrapper {

private final BaseKeyGenerator keyGenerator;
private final int partitionId;
private final String instantTime;
private Integer partitionId;
private String instantTime;
private int rowId;

public AutoRecordGenWrapperAvroKeyGenerator(TypedProperties config, BaseKeyGenerator keyGenerator) {
super(config);
this.keyGenerator = keyGenerator;
this.rowId = 0;
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
partitionId = null;
instantTime = null;
}

@Override
public String getRecordKey(GenericRecord record) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
Expand All @@ -80,4 +80,19 @@ public List<String> getPartitionPathFields() {
public boolean isConsistentLogicalTimestampEnabled() {
return keyGenerator.isConsistentLogicalTimestampEnabled();
}

@Override
public BaseKeyGenerator getPartitionKeyGenerator() {
return keyGenerator;
}

private String generateSequenceId(long recordIndex) {
if (partitionId == null) {
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
}
if (instantTime == null) {
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
}
return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.keygen;

/**
* Interface for {@link KeyGenerator} implementations that
* generate a unique record key internally.
*/
public interface AutoRecordKeyGeneratorWrapper {

/**
* @returns the underlying key generator used for the partition path.
*/
BaseKeyGenerator getPartitionKeyGenerator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,62 +47,76 @@
* PartitionId refers to spark's partition Id.
* RowId refers to the row index within the spark partition.
*/
public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator {
public class AutoRecordGenWrapperKeyGenerator extends BuiltinKeyGenerator implements AutoRecordKeyGeneratorWrapper {

private final BuiltinKeyGenerator builtinKeyGenerator;
private final int partitionId;
private final String instantTime;
private final BuiltinKeyGenerator keyGenerator;
private Integer partitionId;
private String instantTime;
private int rowId;

public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator builtinKeyGenerator) {
public AutoRecordGenWrapperKeyGenerator(TypedProperties config, BuiltinKeyGenerator keyGenerator) {
super(config);
this.builtinKeyGenerator = builtinKeyGenerator;
this.keyGenerator = keyGenerator;
this.rowId = 0;
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
partitionId = null;
instantTime = null;
}

@Override
public String getRecordKey(GenericRecord record) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
public String getPartitionPath(GenericRecord record) {
return builtinKeyGenerator.getPartitionPath(record);
return keyGenerator.getPartitionPath(record);
}

@Override
public String getRecordKey(Row row) {
return HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++);
return generateSequenceId(rowId++);
}

@Override
public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
return UTF8String.fromString(HoodieRecord.generateSequenceId(instantTime, partitionId, rowId++));
return UTF8String.fromString(generateSequenceId(rowId++));
}

@Override
public String getPartitionPath(Row row) {
return builtinKeyGenerator.getPartitionPath(row);
return keyGenerator.getPartitionPath(row);
}

@Override
public UTF8String getPartitionPath(InternalRow internalRow, StructType schema) {
return builtinKeyGenerator.getPartitionPath(internalRow, schema);
return keyGenerator.getPartitionPath(internalRow, schema);
}

@Override
public List<String> getRecordKeyFieldNames() {
return builtinKeyGenerator.getRecordKeyFieldNames();
return keyGenerator.getRecordKeyFieldNames();
}

public List<String> getPartitionPathFields() {
return builtinKeyGenerator.getPartitionPathFields();
return keyGenerator.getPartitionPathFields();
}

public boolean isConsistentLogicalTimestampEnabled() {
return builtinKeyGenerator.isConsistentLogicalTimestampEnabled();
return keyGenerator.isConsistentLogicalTimestampEnabled();
}

@Override
public BuiltinKeyGenerator getPartitionKeyGenerator() {
return keyGenerator;
}

private String generateSequenceId(long recordIndex) {
if (partitionId == null) {
this.partitionId = config.getInteger(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG);
}
if (instantTime == null) {
this.instantTime = config.getString(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG);
}
return HoodieRecord.generateSequenceId(instantTime, partitionId, recordIndex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.common.util.ValidationUtils.checkArgument
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator, SimpleKeyGenerator}
import org.apache.hudi.keygen.{AutoRecordKeyGeneratorWrapper, AutoRecordGenWrapperKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, GlobalAvroDeleteKeyGenerator, GlobalDeleteKeyGenerator, KeyGenerator, NonpartitionedAvroKeyGenerator, NonpartitionedKeyGenerator}
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory.getKeyGeneratorClassName

import scala.collection.JavaConverters._

object SparkKeyGenUtils {

Expand All @@ -34,26 +31,34 @@ object SparkKeyGenUtils {
* @return partition columns
*/
def getPartitionColumns(props: TypedProperties): String = {
val keyGeneratorClass = getKeyGeneratorClassName(props)
getPartitionColumns(keyGeneratorClass, props)
val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
getPartitionColumns(keyGenerator, props)
}

/**
* @param keyGen key generator class name
* @return partition columns
*/
def getPartitionColumns(keyGenClass: String, typedProperties: TypedProperties): String = {
def getPartitionColumns(keyGenClass: KeyGenerator, typedProperties: TypedProperties): String = {
// For {@link AutoRecordGenWrapperKeyGenerator} or {@link AutoRecordGenWrapperAvroKeyGenerator},
// get the base key generator for the partition paths
var baseKeyGen = keyGenClass match {
case autoRecordKeyGenerator: AutoRecordKeyGeneratorWrapper =>
autoRecordKeyGenerator.getPartitionKeyGenerator
case _ => keyGenClass
}

// For CustomKeyGenerator and CustomAvroKeyGenerator, the partition path filed format
// is: "field_name: field_type", we extract the field_name from the partition path field.
if (keyGenClass.equals(classOf[CustomKeyGenerator].getCanonicalName) || keyGenClass.equals(classOf[CustomAvroKeyGenerator].getCanonicalName)) {
if (baseKeyGen.isInstanceOf[CustomKeyGenerator] || baseKeyGen.isInstanceOf[CustomAvroKeyGenerator]) {
typedProperties.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())
.split(",").map(pathField => {
pathField.split(CustomAvroKeyGenerator.SPLIT_REGEX)
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${keyGenClass}")}).mkString(",")
} else if (keyGenClass.equals(classOf[NonpartitionedKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[NonpartitionedAvroKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalDeleteKeyGenerator].getCanonicalName)
|| keyGenClass.equals(classOf[GlobalAvroDeleteKeyGenerator].getCanonicalName)) {
.headOption.getOrElse(s"Illegal partition path field format: '$pathField' for ${baseKeyGen}")}).mkString(",")
} else if (baseKeyGen.isInstanceOf[NonpartitionedKeyGenerator]
|| baseKeyGen.isInstanceOf[NonpartitionedAvroKeyGenerator]
|| baseKeyGen.isInstanceOf[GlobalDeleteKeyGenerator]
|| baseKeyGen.isInstanceOf[GlobalAvroDeleteKeyGenerator]) {
StringUtils.EMPTY_STRING
} else {
checkArgument(typedProperties.containsKey(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()), "Partition path needs to be set")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,14 @@ class HoodieSparkSqlWriterInternal {
}
}

val keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(hoodieConfig.getProps))
if (mode == SaveMode.Ignore && tableExists) {
log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
(false, common.util.Option.empty(), common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
} else {
// Handle various save modes
handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(getKeyGeneratorClassName(new TypedProperties(hoodieConfig.getProps)),
toProperties(parameters))
val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
val timelineTimeZone = HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))
val tableMetaClient = if (tableExists) {
HoodieInstantTimeGenerator.setCommitTimeZone(timelineTimeZone)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ object HoodieWriterUtils {
}

val datasourcePartitionFields = params.getOrElse(PARTITIONPATH_FIELD.key(), null)
val currentPartitionFields = if (datasourcePartitionFields == null) null else SparkKeyGenUtils.getPartitionColumns(TypedProperties.fromMap(params))
val tableConfigPartitionFields = tableConfig.getString(HoodieTableConfig.PARTITION_FIELDS)
if (null != datasourcePartitionFields && null != tableConfigPartitionFields
&& datasourcePartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$datasourcePartitionFields\t$tableConfigPartitionFields\n")
&& currentPartitionFields != tableConfigPartitionFields) {
diffConfigs.append(s"PartitionPath:\t$currentPartitionFields\t$tableConfigPartitionFields\n")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class TestHoodieSparkSqlWriter {
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)

// try write to Hudi
assertThrows[IllegalArgumentException] {
assertThrows[IOException] {
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, tableOpts - DataSourceWriteOptions.PARTITIONPATH_FIELD.key, df)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1001,8 +1001,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
writer.save(basePath)
fail("should fail when invalid PartitionKeyType is provided!")
} catch {
case e: Exception =>
assertTrue(e.getCause.getMessage.contains("No enum constant org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType.DUMMY"))
case e: Exception => assertTrue(e.getCause.getMessage.contains("Unable to instantiate class org.apache.hudi.keygen.CustomKeyGenerator"))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,16 @@ public void testKafkaConnectCheckpointProvider() throws IOException {

@Test
public void testPropsWithInvalidKeyGenerator() {
Exception e = assertThrows(IllegalArgumentException.class, () -> {
Exception e = assertThrows(IOException.class, () -> {
String tableBasePath = basePath + "/test_table_invalid_key_gen";
HoodieDeltaStreamer deltaStreamer =
new HoodieDeltaStreamer(TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()), PROPS_FILENAME_TEST_INVALID, false), jsc);
deltaStreamer.sync();
}, "Should error out when setting the key generator class property to an invalid value");
// expected
LOG.debug("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("No KeyGeneratorType found for class name"));
LOG.warn("Expected error during getting the key generator", e);
assertTrue(e.getMessage().contains("Could not load key generator class invalid"));
}

private static Stream<Arguments> provideInferKeyGenArgs() {
Expand Down