Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,21 @@ import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.index.SparkHoodieIndexFactory
import org.apache.hudi.keygen.{BuiltinKeyGenerator, SparkKeyGeneratorInterface}
import org.apache.hudi.table.action.commit.{BulkInsertDataInternalWriterHelper, ParallelismHelper}
import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
import org.apache.spark.Partitioner
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.HoodieDataTypeUtils.{addMetaFields, hasMetaFields}
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String

Expand All @@ -62,23 +64,18 @@ object HoodieDatasetBulkInsertHelper
partitioner: BulkInsertPartitioner[Dataset[Row]],
shouldDropPartitionColumns: Boolean): Dataset[Row] = {
val populateMetaFields = config.populateMetaFields()
val schema = df.schema

val metaFields = Seq(
StructField(HoodieRecord.COMMIT_TIME_METADATA_FIELD, StringType),
StructField(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD, StringType),
StructField(HoodieRecord.RECORD_KEY_METADATA_FIELD, StringType),
StructField(HoodieRecord.PARTITION_PATH_METADATA_FIELD, StringType),
StructField(HoodieRecord.FILENAME_METADATA_FIELD, StringType))

val updatedSchema = StructType(metaFields ++ schema.fields)
val schema = df.schema
val populatedSchema = addMetaFields(schema)

val updatedDF = if (populateMetaFields) {
val keyGeneratorClassName = config.getStringOrThrow(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME,
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
df.queryExecution.toRdd.mapPartitions { iter =>
val sourceRdd = df.queryExecution.toRdd
val populatedRdd: RDD[InternalRow] = if (hasMetaFields(schema)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this for clustering row writer code path ?

sourceRdd
} else {
sourceRdd.mapPartitions { iter =>
val keyGenerator =
ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps))
.asInstanceOf[SparkKeyGeneratorInterface]
Expand All @@ -91,23 +88,26 @@ object HoodieDatasetBulkInsertHelper
val filename = UTF8String.EMPTY_UTF8

// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename,
row, false)
}
}
}

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
dedupRows(populatedRdd, populatedSchema, config.getPreCombineField)
} else {
prependedRdd
populatedRdd
}

HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, updatedSchema)
HoodieUnsafeUtils.createDataFrameFromRDD(df.sparkSession, dedupedRdd, populatedSchema)
} else {
// NOTE: In cases when we're not populating meta-fields we actually don't
// need access to the [[InternalRow]] and therefore can avoid the need
// to dereference [[DataFrame]] into [[RDD]]
val query = df.queryExecution.logical
val metaFieldsStubs = metaFields.map(f => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), f.name)())
val metaFieldsStubs = HoodieRecord.HOODIE_META_COLUMNS.asScala
.map(metaFieldName => Alias(Literal(UTF8String.EMPTY_UTF8, dataType = StringType), metaFieldName)())
val prependedQuery = Project(metaFieldsStubs ++ query.output, query)

HoodieUnsafeUtils.createDataFrameFrom(df.sparkSession, prependedQuery)
Expand Down Expand Up @@ -171,36 +171,30 @@ object HoodieDatasetBulkInsertHelper
table.getContext.parallelize(writeStatuses.toList.asJava)
}

private def dedupeRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String, isGlobalIndex: Boolean): RDD[InternalRow] = {
private def dedupRows(rdd: RDD[InternalRow], schema: StructType, preCombineFieldRef: String): RDD[InternalRow] = {
val recordKeyMetaFieldOrd = schema.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)
val partitionPathMetaFieldOrd = schema.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)
// NOTE: Pre-combine field could be a nested field
val preCombineFieldPath = composeNestedFieldPath(schema, preCombineFieldRef)
.getOrElse(throw new HoodieException(s"Pre-combine field $preCombineFieldRef is missing in $schema"))

rdd.map { row =>
val rowKey = if (isGlobalIndex) {
row.getString(recordKeyMetaFieldOrd)
val partitionPath = row.getUTF8String(partitionPathMetaFieldOrd)
val recordKey = row.getUTF8String(recordKeyMetaFieldOrd)

((partitionPath, recordKey), row)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not copying the row here?

Copy link
Copy Markdown
Contributor Author

@alexeykudinkin alexeykudinkin Feb 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed anymore (we're doing subsequent shuffling which is sparing us a need to copy)

}
.reduceByKey(TablePartitioningAwarePartitioner(rdd.getNumPartitions),
(oneRow, otherRow) => {
val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
oneRow
} else {
val partitionPath = row.getString(partitionPathMetaFieldOrd)
val recordKey = row.getString(recordKeyMetaFieldOrd)
s"$partitionPath:$recordKey"
otherRow
}
// NOTE: It's critical whenever we keep the reference to the row, to make a copy
// since Spark might be providing us with a mutable copy (updated during the iteration)
(rowKey, row.copy())
}
.reduceByKey {
(oneRow, otherRow) =>
val onePreCombineVal = getNestedInternalRowValue(oneRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
val otherPreCombineVal = getNestedInternalRowValue(otherRow, preCombineFieldPath).asInstanceOf[Comparable[AnyRef]]
if (onePreCombineVal.compareTo(otherPreCombineVal.asInstanceOf[AnyRef]) >= 0) {
oneRow
} else {
otherRow
}
}
.values
})
.values
}

private def dropPartitionColumns(df: DataFrame, config: HoodieWriteConfig): DataFrame = {
Expand All @@ -220,4 +214,41 @@ object HoodieDatasetBulkInsertHelper
val keyGenerator = ReflectionUtils.loadClass(keyGeneratorClassName, new TypedProperties(config.getProps)).asInstanceOf[BuiltinKeyGenerator]
keyGenerator.getPartitionPathFields.asScala
}

/**
* We use custom Spark [[Partitioner]] that is aware of the target table's partitioning
* so that during inevitable shuffling required for de-duplication, we also assign records
* into individual Spark partitions in a way affine with target table's physical partitioning
* (ie records from the same table's partition will be co-located in the same Spark's partition)
*
* This would allow us to
* <ul>
* <li>Save on additional shuffling subsequently (by [[BulkInsertPartitioner]])</li>
* <li>Avoid "small files explosion" entailed by random (hash) partitioning stemming
* from the fact that every Spark partition hosts records from many table's partitions
* resulting into every Spark task writing into their own files in these partitions (in
* case no subsequent re-partitioning is performed)
* </li>
* </ul>
*
* For more details check out HUDI-5685
*/
private case class TablePartitioningAwarePartitioner(override val numPartitions: Int) extends Partitioner {
override def getPartition(key: Any): Int = {
key match {
case null => 0
case (partitionPath, recordKey) =>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

won't this result in data skews? if one of the hudi partition has lot of data, the respective spark partition will skew the total time for de-dup right?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was one of the reason why we did not go w/ this to avoid data skews.

val targetKey = if (isPartitioned(partitionPath.asInstanceOf[UTF8String])) partitionPath else recordKey
nonNegativeMod(targetKey.hashCode(), numPartitions)
}
}

private def isPartitioned(partitionPath: UTF8String): Boolean =
partitionPath.numBytes() > 0

private def nonNegativeMod(x: Int, mod: Int): Int = {
val rawMod = x % mod
rawMod + (if (rawMod < 0) mod else 0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,29 @@

package org.apache.spark.sql

import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.types._

import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter

object HoodieDataTypeUtils {

/**
* Checks whether provided schema contains Hudi's meta-fields
*
* NOTE: This method validates the presence of just one field, [[HoodieRecord.RECORD_KEY_METADATA_FIELD]];
* it assumes that meta-fields are either omitted entirely or specified in full
*/
def hasMetaFields(structType: StructType): Boolean =
structType.getFieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD).isDefined
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor. should we check for partition path as well ?


// TODO scala-doc
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: remove todo?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

resolve TODO

def addMetaFields(schema: StructType): StructType = {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is more like ensuring meta fields placed first in schema. so the name can be more accurate.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a relocated method. Keeping the name for compatibility

val metaFieldNames = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq
val dataFields = schema.fields.filterNot(f => metaFieldNames.contains(f.name))
StructType(metaFieldNames.map(StructField(_, StringType)) ++ dataFields)
}

/**
* Parses provided [[jsonSchema]] into [[StructType]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieDatasetBulkInsertHelper.prepareForBulkInsert
import org.apache.hudi.HoodieWriterUtils._
import org.apache.hudi.avro.AvroSchemaUtils.{isCompatibleProjectionOf, isSchemaCompatible}
import org.apache.hudi.avro.HoodieAvroUtils
Expand Down Expand Up @@ -717,15 +718,15 @@ object HoodieSparkSqlWriter {
}
}

def bulkInsertAsRow(sqlContext: SQLContext,
hoodieConfig: HoodieConfig,
df: DataFrame,
tblName: String,
basePath: Path,
path: String,
instantTime: String,
writerSchema: Schema,
isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = {
private def bulkInsertAsRow(sqlContext: SQLContext,
hoodieConfig: HoodieConfig,
df: DataFrame,
tblName: String,
basePath: Path,
path: String,
instantTime: String,
writerSchema: Schema,
isTablePartitioned: Boolean): (Boolean, common.util.Option[String]) = {
if (hoodieConfig.getBoolean(INSERT_DROP_DUPS)) {
throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
}
Expand All @@ -739,19 +740,15 @@ object HoodieSparkSqlWriter {
val populateMetaFields = hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS)

val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
if (userDefinedBulkInsertPartitionerOpt.isPresent) {
userDefinedBulkInsertPartitionerOpt.get
} else {
BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned)
}
toScalaOption(DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig))
.getOrElse(BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned))
} else {
// Sort modes are not yet supported when meta fields are disabled
new NonSortPartitionerWithRows()
}

val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns)
val hoodieDF = prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, shouldDropPartitionColumns)

val optsOverrides = Map(
HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expressi
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, HoodieDataTypeUtils, HoodieInternalRowUtils, SparkSession}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: optimize imports (HoodieInternalRowUtils is not used).


import java.net.URI
import java.text.SimpleDateFormat
Expand Down Expand Up @@ -130,27 +130,13 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
}
}

private def tripAlias(plan: LogicalPlan): LogicalPlan = {
plan match {
case SubqueryAlias(_, relation: LogicalPlan) =>
tripAlias(relation)
case other =>
other
}
}

/**
* Add the hoodie meta fields to the schema.
* @param schema
* @return
*/
def addMetaFields(schema: StructType): StructType = {
val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala
// filter the meta field to avoid duplicate field.
val dataFields = schema.fields.filterNot(f => metaFields.contains(f.name))
val fields = metaFields.map(StructField(_, StringType)) ++ dataFields
StructType(fields)
}
def addMetaFields(schema: StructType): StructType =
HoodieDataTypeUtils.addMetaFields(schema)

private lazy val metaFields = HoodieRecord.HOODIE_META_COLUMNS.asScala.toSet

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import scala.Tuple2;
import scala.collection.JavaConversions;
Expand Down Expand Up @@ -77,15 +78,6 @@ public TestHoodieDatasetBulkInsertHelper() throws IOException {
init();
}

/**
* args for schema evolution test.
*/
private static Stream<Arguments> providePreCombineArgs() {
return Stream.of(
Arguments.of(false),
Arguments.of(true));
}

private void init() throws IOException {
schemaStr = FileIOUtils.readAsUTFString(getClass().getResourceAsStream("/exampleSchema.txt"));
schema = DataSourceTestUtils.getStructTypeExampleSchema();
Expand Down Expand Up @@ -125,6 +117,10 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
} else { // NonPartitioned key gen
props = getPropsForNonPartitionedKeyGen(recordKeyField);
}

boolean isNonPartitionedKeyGen = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
boolean isComplexKeyGen = keyGenClass.equals(ComplexKeyGenerator.class.getName());

HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(props).combineInput(false, false).build();
List<Row> rows = DataSourceTestUtils.generateRandomRows(10);
Dataset<Row> dataset = sqlContext.createDataFrame(rows, structType);
Expand All @@ -139,8 +135,6 @@ private void testBulkInsertHelperFor(String keyGenClass, String recordKeyField)
assertEquals(entry.getValue(), resultSchema.fieldIndex(entry.getKey()));
}

boolean isNonPartitionedKeyGen = keyGenClass.equals(NonpartitionedKeyGenerator.class.getName());
boolean isComplexKeyGen = keyGenClass.equals(ComplexKeyGenerator.class.getName());

result.toJavaRDD().foreach(entry -> {
String recordKey = isComplexKeyGen ? String.format("%s:%s", recordKeyField, entry.getAs(recordKeyField)) : entry.getAs(recordKeyField).toString();
Expand Down Expand Up @@ -192,7 +186,7 @@ public void testBulkInsertHelperNoMetaFields() {
}

@ParameterizedTest
@MethodSource("providePreCombineArgs")
@CsvSource(value = { "true", "false" })
public void testBulkInsertPreCombine(boolean enablePreCombine) {
HoodieWriteConfig config = getConfigBuilder(schemaStr).withProps(getPropsAllSet("_row_key"))
.combineInput(enablePreCombine, enablePreCombine)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import org.apache.hudi.common.model.TestHoodieRecordSerialization.{OverwriteWithLatestAvroPayloadWithEquality, cloneUsingKryo, convertToAvroRecord, toUnsafeRow}
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.spark.sql.HoodieDataTypeUtils.addMetaFields
import org.apache.spark.sql.{HoodieInternalRowUtils, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.objects.SerializerSupport
import org.apache.spark.sql.catalyst.expressions.{GenericRowWithSchema, UnsafeRow}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.addMetaFields
import org.apache.spark.sql.types.{Decimal, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.junit.jupiter.api.Assertions.assertEquals
Expand Down