-
Notifications
You must be signed in to change notification settings - Fork 2.5k
[HUDI-4365] Fixing URL-encoding in Bulk Insert row-writing path #6049
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,11 +24,11 @@ | |
| import org.apache.hudi.config.HoodieWriteConfig; | ||
| import org.apache.hudi.keygen.BuiltinKeyGenerator; | ||
| import org.apache.hudi.keygen.ComplexKeyGenerator; | ||
| import org.apache.hudi.keygen.KeyGenUtils; | ||
| import org.apache.hudi.keygen.NonpartitionedKeyGenerator; | ||
| import org.apache.hudi.keygen.SimpleKeyGenerator; | ||
| import org.apache.hudi.keygen.constant.KeyGeneratorOptions; | ||
| import org.apache.hudi.table.BulkInsertPartitioner; | ||
|
|
||
| import org.apache.log4j.LogManager; | ||
| import org.apache.log4j.Logger; | ||
| import org.apache.spark.sql.Column; | ||
|
|
@@ -38,15 +38,14 @@ | |
| import org.apache.spark.sql.api.java.UDF1; | ||
| import org.apache.spark.sql.functions; | ||
| import org.apache.spark.sql.types.DataTypes; | ||
| import scala.collection.JavaConverters; | ||
|
|
||
| import java.util.ArrayList; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import scala.collection.JavaConverters; | ||
|
|
||
| import static org.apache.spark.sql.functions.callUDF; | ||
|
|
||
| /** | ||
|
|
@@ -97,8 +96,31 @@ public static Dataset<Row> prepareHoodieDatasetForBulkInsert(SQLContext sqlConte | |
| // simple fields for both record key and partition path: can directly use withColumn | ||
| String partitionPathField = keyGeneratorClass.equals(SimpleKeyGenerator.class.getName()) ? partitionPathFields : | ||
| partitionPathFields.substring(partitionPathFields.indexOf(":") + 1); | ||
| rowDatasetWithRecordKeysAndPartitionPath = rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) | ||
| .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType)); | ||
|
|
||
| // TODO(HUDI-3993) cleanup duplication | ||
| String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); | ||
| String partitionPathDecorationUDFName = PARTITION_PATH_UDF_FN + tableName; | ||
|
|
||
| boolean shouldURLEncodePartitionPath = config.shouldURLEncodePartitionPath(); | ||
| boolean isHiveStylePartitioned = config.isHiveStylePartitioningEnabled(); | ||
|
|
||
| if (shouldURLEncodePartitionPath || isHiveStylePartitioned) { | ||
| sqlContext.udf().register( | ||
| partitionPathDecorationUDFName, | ||
| (UDF1<String, String>) partitionPathValue -> | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So, I assume this UDF registration would be gone after HUDI-3993?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct |
||
| KeyGenUtils.handlePartitionPathDecoration(partitionPathField, partitionPathValue, | ||
| shouldURLEncodePartitionPath, isHiveStylePartitioned), | ||
| DataTypes.StringType); | ||
|
|
||
| rowDatasetWithRecordKeysAndPartitionPath = | ||
| rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In As I understand here, we use
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @TengHuo That's a good point. It can be a problem if you mix the write operation type or switch row-writing config for a table. I would suggest filing another JIRA ticket to keep it consistent across. I don't deem it to be a blocker but would be good to keep it consistent.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. okay, got it then think it will have duplicate data issue if user upgrade from 0.10 or older version when they only setup one column as record key and use |
||
| .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, | ||
| callUDF(partitionPathDecorationUDFName, functions.col(partitionPathField).cast(DataTypes.StringType))); | ||
| } else { | ||
| rowDatasetWithRecordKeysAndPartitionPath = | ||
| rows.withColumn(HoodieRecord.RECORD_KEY_METADATA_FIELD, functions.col(recordKeyFields).cast(DataTypes.StringType)) | ||
| .withColumn(HoodieRecord.PARTITION_PATH_METADATA_FIELD, functions.col(partitionPathField).cast(DataTypes.StringType)); | ||
| } | ||
| } else { | ||
| // use udf | ||
| String tableName = properties.getString(HoodieWriteConfig.TBL_NAME.key()); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,17 +23,16 @@ | |
| import org.apache.hudi.common.config.TypedProperties; | ||
| import org.apache.hudi.common.model.HoodieRecord; | ||
| import org.apache.hudi.common.util.Option; | ||
| import org.apache.hudi.common.util.PartitionPathEncodeUtils; | ||
| import org.apache.hudi.config.HoodieWriteConfig; | ||
| import org.apache.hudi.exception.HoodieIOException; | ||
| import org.apache.hudi.io.storage.row.HoodieRowCreateHandle; | ||
| import org.apache.hudi.io.storage.row.HoodieRowCreateHandleWithoutMetaFields; | ||
| import org.apache.hudi.keygen.BuiltinKeyGenerator; | ||
| import org.apache.hudi.keygen.KeyGenUtils; | ||
| import org.apache.hudi.keygen.NonpartitionedKeyGenerator; | ||
| import org.apache.hudi.keygen.SimpleKeyGenerator; | ||
| import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; | ||
| import org.apache.hudi.table.HoodieTable; | ||
|
|
||
| import org.apache.log4j.LogManager; | ||
| import org.apache.log4j.Logger; | ||
| import org.apache.spark.sql.catalyst.InternalRow; | ||
|
|
@@ -126,14 +125,17 @@ public void write(InternalRow record) throws IOException { | |
| if (populateMetaFields) { // usual path where meta fields are pre populated in prep step. | ||
| partitionPath = String.valueOf(record.getUTF8String(HoodieRecord.PARTITION_PATH_META_FIELD_POS)); | ||
| } else { // if meta columns are disabled. | ||
| // TODO(HUDI-3993) remove duplication, unify with HoodieDatasetBulkInsertHelper | ||
| if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen | ||
| partitionPath = ""; | ||
| } else if (simpleKeyGen) { // SimpleKeyGen | ||
| Object parititionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); | ||
| partitionPath = parititionPathValue != null ? parititionPathValue.toString() : PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH; | ||
| if (writeConfig.isHiveStylePartitioningEnabled()) { | ||
| partitionPath = (keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath; | ||
| } | ||
| Object partitionPathValue = record.get(simplePartitionFieldIndex, simplePartitionFieldDataType); | ||
| String partitionPathField = keyGeneratorOpt.get().getPartitionPathFields().get(0); | ||
| boolean shouldURLEncodePartitionPath = writeConfig.shouldURLEncodePartitionPath(); | ||
| boolean hiveStylePartitioningEnabled = writeConfig.isHiveStylePartitioningEnabled(); | ||
|
|
||
| partitionPath = KeyGenUtils.handlePartitionPathDecoration(partitionPathField, | ||
| partitionPathValue == null ? null : partitionPathValue.toString(), shouldURLEncodePartitionPath, hiveStylePartitioningEnabled); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider passing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, done that initially but then decided to optimize it out given that this is a hot-path |
||
| } else { | ||
| // only BuiltIn key generators are supported if meta fields are disabled. | ||
| partitionPath = keyGeneratorOpt.get().getPartitionPath(record, structType); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1