diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala index a6488b07b5130..6702ec3e11af8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala @@ -26,19 +26,21 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.util.ReflectionUtils import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieException -import org.apache.hudi.index.SparkHoodieIndexFactory import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface} import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ParallelismHelper} import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable} import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked +import org.apache.spark.Partitioner import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD +import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection +import org.apache.spark.sql.HoodieDataTypeUtils.{addMetaFields, hasMetaFields} import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue} import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Alias, Literal} import org.apache.spark.sql.catalyst.plans.logical.Project -import org.apache.spark.sql.types.{StringType, StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row} import org.apache.spark.unsafe.types.UTF8String @@ -62,23 +64,18 @@ object HoodieDatasetBulkInsertHelper partitioner: 
BulkInsertPartitioner[Dataset[Row]], shouldDropPartitionColumns: Boolean): Dataset[Row] = { val populateMetaFields = config.populateMetaFields() - val schema = df.schema - - val metaFields = Seq( - StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType), - StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType), - StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType), - StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType), - StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType)) - val updatedSchema = StructType(metaFields ++ schema.fields) + val schema = df.schema + val populatedSchema = addMetaFields(schema) val updatedDF = if (populateMetaFields) { val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME, "Key-generator class name is required") - - val prependedRdd: RDD[InternalRow] = - df.queryExecution.toRdd.mapPartitions { iter => + val sourceRdd = df.queryExecution.toRdd + val populatedRdd: RDD[InternalRow] = if (hasMetaFields(schema)) { + sourceRdd + } else { + sourceRdd.mapPartitions { iter => val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)) .asInstanceOf[SparkKeyGeneratorInterface] @@ -91,23 +88,26 @@ object HoodieDatasetBulkInsertHelper val filename = UTF8String.EMPTY_UTF8 // TODO use mutable row, avoid re-allocating - new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false) + new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, + row, false) } } + } val dedupedRdd = if (config.shouldCombineBeforeInsert) { - dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config)) + dedupRows(populatedRdd, populatedSchema, config.getPreCombineField) } else { - prependedRdd + populatedRdd } - HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema) + 
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, populatedSchema) } else { // NOTE: In cases when we're not populating meta-fields we actually don't // need access to the [[InternalRow]] and therefore can avoid the need // to dereference [[DataFrame]] into [[RDD]] val query = df.queryExecution.logical - val metaFieldsStubs = metaFields.map(f => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)()) + val metaFieldsStubs = HoodieRecord.HOODIE_META_COLUMNS.asScala + .map(metaFieldName => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), metaFieldName)()) val prependedQuery = Project(metaFieldsStubs ++ query.output, query) HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery) @@ -171,7 +171,7 @@ object HoodieDatasetBulkInsertHelper table.getContext.parallelize(writeStatuses.toList.asJava) } - private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = { + private def dedupRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String): RDD[InternalRow] = { val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD) val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD) // NOTE: Pre-combine field could be a nested field @@ -179,28 +179,22 @@ object HoodieDatasetBulkInsertHelper .getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema")) rdd.map { row => - val rowKey = if (isGlobalIndex) { - row.getString(recordKeyMetaFieldOrd) + val partitionPath = row.getUTF8String(partitionPathMetaFieldOrd) + val recordKey = row.getUTF8String(recordKeyMetaFieldOrd) + + ((partitionPath, recordKey), row) + } + .reduceByKey(TablePartitioningAwarePartitioner(rdd.getNumPartitions), + (oneRow, otherRow) => { + val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]] + val 
otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]] + if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) { + oneRow } else { - val partitionPath = row.getString(partitionPathMetaFieldOrd) - val recordKey = row.getString(recordKeyMetaFieldOrd) - s"$partitionPath:$recordKey" + otherRow } - // NOTE: It's critical whenever we keep the reference to the row, to make a copy - // since Spark might be providing us with a mutable copy (updated during the iteration) - (rowKey, row.copy()) - } - .reduceByKey { - (oneRow, otherRow) => - val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]] - val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]] - if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) { - oneRow - } else { - otherRow - } - } - .values + }) + .values } private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = { @@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator] keyGenerator.getPartitionPathFields.asScala } + + /** + * We use a custom Spark [[Partitioner]] that is aware of the target table's partitioning + * so that during the inevitable shuffling required for de-duplication, we also assign records + * into individual Spark partitions in a way aligned with the target table's physical partitioning + * (i.e., records from the same table partition will be co-located in the same Spark partition) + * + * This would allow us to + *