[HUDI-5685] Fixing deduplication in Bulk Insert row-writing path #7825
alexeykudinkin wants to merge 11 commits into apache:master
Conversation
96d711d to 247062c
PRECOMBINE_FIELD.key -> preCombineField,
PARTITIONPATH_FIELD.key -> partitionFieldsStr,
PAYLOAD_CLASS_NAME.key -> payloadClassName,
HoodieWriteConfig.COMBINE_BEFORE_INSERT.key -> String.valueOf(hasPrecombineColumn),
I guess the intent here was to automatically infer the COMBINE_BEFORE_INSERT config. With this change, it is not enough for the user to just configure the precombine field; they also need to enable COMBINE_BEFORE_INSERT if they want to deduplicate. Isn't it?
Is there validation in the code which checks that, if COMBINE_BEFORE_INSERT is enabled, a precombine field is also configured? If not, it would be better to add one as part of the configs improvement story.
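To make the inference being discussed concrete, here is a minimal sketch of the behavior the quoted diff encodes. The helper name `inferredWriteConfigs` is hypothetical and this is not the actual Hudi code, though the two config keys are real Hudi configs:

```scala
// Hypothetical helper (not the actual Hudi code) sketching the inference the
// review discusses: COMBINE_BEFORE_INSERT defaults to whether a precombine
// field is configured.
def inferredWriteConfigs(preCombineField: Option[String]): Map[String, String] = {
  val hasPrecombineColumn = preCombineField.exists(_.nonEmpty)
  Map(
    "hoodie.datasource.write.precombine.field" -> preCombineField.getOrElse(""),
    "hoodie.combine.before.insert" -> String.valueOf(hasPrecombineColumn)
  )
}
```

Under this sketch, configuring a precombine field alone is enough to turn de-duplication on, which is the coupling the removal in this PR undoes.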
Can you clarify why it is being removed in this PR, though?
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, HoodieDataTypeUtils, HoodieInternalRowUtils, SparkSession}
nit: optimize imports (HoodieInternalRowUtils is not used).
*
* For more details check out HUDI-5685
*/
private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,
I understand the benefit but have we tested it?
Yes, this has been tested in our benchmarking runs.
def hasMetaFields(structType: StructType): Boolean =
  structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined

// TODO scala-doc
val partitionPath = if (isPartitioned) row.getUTF8String(partitionPathMetaFieldOrd) else UTF8String.EMPTY_UTF8
val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)

((partitionPath, recordKey), row)
Not needed anymore (the subsequent shuffling spares us the need to copy).
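For readers following the thread, the keying shown in the quoted diff feeds a de-duplication step. A Spark-free sketch of that step, using plain collections as a stand-in for the RDD `reduceByKey` (the `Row` case class and field names are illustrative, not the PR's code):

```scala
// Collections stand-in for the RDD de-duplication: rows are keyed by
// (partitionPath, recordKey) and reduced to one row per key, keeping the
// row with the highest ordering (precombine) value.
case class Row(partitionPath: String, recordKey: String, orderingVal: Int)

def dedupRows(rows: Seq[Row]): Seq[Row] =
  rows
    .map(r => ((r.partitionPath, r.recordKey), r)) // key by (partition-path, record-key)
    .groupBy(_._1)
    .values
    .map(_.map(_._2).maxBy(_.orderingVal))         // keep "latest" record per key
    .toSeq
```

In the real path this reduction runs as a `reduceByKey` over the RDD, which is where the partitioner choice discussed in this PR matters.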
* For more details check out HUDI-5685
*/
private case class TablePartitioningAwarePartitioner(override val numPartitions: Int,
                                                     val isPartitioned: Boolean) extends Partitioner {
We don't need an additional flag to tell whether the table is partitioned or not; can we just check nonEmpty(partitionPath)?
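A Spark-free sketch of the routing such a table-partitioning-aware partitioner performs (assumed shape, not the PR's exact code): the key is the `(partitionPath, recordKey)` tuple produced earlier, but only the partition-path drives the placement, so rows of the same table partition land in the same Spark partition.

```scala
// Routes a (partitionPath, recordKey) key by the partition-path alone.
def getPartition(key: Any, numPartitions: Int): Int = key match {
  case null => 0
  case (partitionPath, _) =>
    val h = if (partitionPath == null) 0 else partitionPath.hashCode
    ((h % numPartitions) + numPartitions) % numPartitions // non-negative modulo
}
```

Two records with the same partition-path always map to the same Spark partition, regardless of their record keys.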
def hasMetaFields(structType: StructType): Boolean =
  structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined

// TODO scala-doc
def addMetaFields(schema: StructType): StructType = {
This is more like ensuring meta fields are placed first in the schema, so the name could be more accurate.
This is a relocated method; keeping the name for compatibility.
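The behavior under discussion can be sketched with a list of field names standing in for Spark's StructType (the sketch is illustrative, not the relocated method itself; the meta-field names below are Hudi's actual meta columns):

```scala
// addMetaFields as discussed above: ensure Hudi meta fields come first in
// the schema, deduplicating any that the input schema already carries.
val metaFields = Seq("_hoodie_commit_time", "_hoodie_commit_seqno",
  "_hoodie_record_key", "_hoodie_partition_path", "_hoodie_file_name")

def addMetaFields(fields: Seq[String]): Seq[String] =
  metaFields ++ fields.filterNot(metaFields.contains)
```

This matches the reviewer's observation: the method is less about adding fields than about guaranteeing the meta fields occupy the leading positions.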
…ields are not provided
… relying on native `UTF8String`); Made sure `reduceByKey` doesn't coalesce incoming RDD (preserving incoming # of partitions)
…e partitioning records in a way that is aware of the table partitioning
f526dc5 to 9b2ab5d
codope left a comment
Thanks for addressing the comments.
val prependedRdd: RDD[InternalRow] =
  df.queryExecution.toRdd.mapPartitions { iter =>
val sourceRdd = df.queryExecution.toRdd
val populatedRdd: RDD[InternalRow] = if (hasMetaFields(schema)) {
Is this for the clustering row-writer code path?
override def getPartition(key: Any): Int = {
  key match {
    case null => 0
    case (partitionPath, recordKey) =>
Won't this result in data skew? If one of the Hudi partitions has a lot of data, the respective Spark partition will skew the total time for de-dup, right?
This was one of the reasons why we did not go with this approach: to avoid data skew.
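A standalone illustration of the skew concern raised here (names and data are illustrative): when placement is driven by the table partition-path alone, every row of a "hot" partition is sent to the same Spark partition, so one task carries nearly all the de-dup work.

```scala
// Partition-path-only routing, as in the partitioner above (sketch).
def partitionFor(partitionPath: String, numPartitions: Int): Int =
  ((partitionPath.hashCode % numPartitions) + numPartitions) % numPartitions

// One hot table partition with 1000 rows, two cold ones with 1 row each.
val rows = Seq.fill(1000)("2023/01/01") ++ Seq("2023/01/02", "2023/01/03")
val sizes = rows.groupBy(partitionFor(_, 8)).values.map(_.size)
// All 1000 hot rows land in a single Spark partition.
```

Spark's default HashPartitioner over the full (partition-path, record-key) tuple spreads the same hot partition across all tasks, which is the trade-off this thread weighs.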
* however assuming that meta-fields should either be omitted or specified in full
*/
def hasMetaFields(structType: StructType): Boolean =
  structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
Minor: should we check for the partition path as well?
@alexeykudinkin: can you close this if it is not valid anymore?
This is still a valid scenario when someone uses NONE as the partitioner.
yihua left a comment
Closing this PR, as the bulk insert behavior is intended to work this way, and the changes may be suboptimal in particular cases where there is data skew.
Change Logs

Currently, in case the flag hoodie.combine.before.insert is set to true and hoodie.bulkinsert.sort.mode is set to NONE, Bulk Insert Row Writing performance will considerably degrade due to the following circumstances:

- During de-duplication (dedupRows), records in the incoming RDD would be reshuffled (by Spark's default HashPartitioner) based on (partition-path, record-key) into N partitions
- Since BulkInsertSortMode.NONE is used as the partitioner, no re-partitioning will be performed afterwards, and therefore each Spark task might be writing into M table partitions

This PR addresses the performance gap by introducing TablePartitioningAwarePartitioner to partition records during de-duplication in a way that is aware of the table's partitioning.

Impact
This considerably improves writing performance for the Bulk Insert Row Writing path with de-duplication enabled
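For reference, the write options that reproduce the scenario this PR targets (the option keys are real Hudi configs; the values are illustrative):

```scala
// De-duplication is on, but the bulk-insert sort mode performs no
// re-partitioning before writing.
val writeOptions = Map(
  "hoodie.combine.before.insert" -> "true",
  "hoodie.bulkinsert.sort.mode" -> "NONE"
)
```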
Risk level (write none, low medium or high below)
Low
Documentation Update
N/A
Contributor's checklist