Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, getNestedInternalRowValue}
import org.apache.spark.sql.HoodieUnsafeUtils.getOutputPartitioning
import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
Expand All @@ -45,7 +45,7 @@ import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters.{asScalaBufferConverter, seqAsJavaListConverter}

object HoodieDatasetBulkInsertHelper
extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getOutputPartitioning(df).numPartitions)) with Logging {
extends ParallelismHelper[DataFrame](toJavaSerializableFunctionUnchecked(df => getNumPartitions(df))) with Logging {

/**
* Prepares [[DataFrame]] for bulk-insert into Hudi table, taking following steps:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.hudi.HoodieUnsafeRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair

Expand All @@ -38,8 +39,21 @@ object HoodieUnsafeUtils {
* but instead will just execute Spark resolution, optimization and actual execution planning stages
* returning instance of [[SparkPlan]] ready for execution
*/
def getOutputPartitioning(df: DataFrame): Partitioning =
df.queryExecution.executedPlan.outputPartitioning
def getNumPartitions(df: DataFrame): Int = {
// NOTE: In general we'd rely on [[outputPartitioning]] of the executable [[SparkPlan]] to determine
// number of partitions plan is going to be executed with.
// However in case of [[LogicalRDD]] plan's output-partitioning will be stubbed as [[UnknownPartitioning]]
// and therefore we will be falling back to determine number of partitions by looking at the RDD itself
df.queryExecution.logical match {
case LogicalRDD(_, rdd, outputPartitioning, _, _) =>
outputPartitioning match {
case _: UnknownPartitioning => rdd.getNumPartitions
case _ => outputPartitioning.numPartitions
}

case _ => df.queryExecution.executedPlan.outputPartitioning.numPartitions
}
}

/**
* Creates [[DataFrame]] from provided [[plan]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import org.apache.hudi.common.model.WriteOperationType
import org.apache.spark.sql.hudi.command.HoodieSparkValidateDuplicateKeyRecordMerger
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.exception.HoodieDuplicateKeyException
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase.getLastCommitMetadata
import org.scalatest.Inspectors.forAll

import java.io.File

Expand Down Expand Up @@ -1159,4 +1161,68 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
)
}
}

/**
* This test is to make sure that bulk insert doesn't create a bunch of tiny files if
* hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start with the partition columns
*
* NOTE: Additionally, this test serves as a smoke test making sure that all of the bulk-insert
* modes work
*/
test(s"Test Bulk Insert with all sort-modes") {
withTempDir { basePath =>
BulkInsertSortMode.values().foreach { sortMode =>
val tableName = generateTableName
// Remove these with [HUDI-5419]
spark.sessionState.conf.unsetConf("hoodie.datasource.write.operation")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.insert.drop.duplicates")
spark.sessionState.conf.unsetConf("hoodie.merge.allow.duplicate.on.inserts")
spark.sessionState.conf.unsetConf("hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled")
// Default parallelism is 200 which means in global sort, each record will end up in a different spark partition so
// 9 files would be created. Setting parallelism to 3 so that each spark partition will contain a hudi partition.
val parallelism = if (sortMode.name.equals(BulkInsertSortMode.GLOBAL_SORT.name())) {
"hoodie.bulkinsert.shuffle.parallelism = 3,"
} else {
""
}
spark.sql(
s"""
|create table $tableName (
| id int,
| name string,
| price double,
| dt string
|) using hudi
| tblproperties (
| primaryKey = 'id',
| preCombineField = 'name',
| type = 'cow',
| $parallelism
| hoodie.bulkinsert.sort.mode = '${sortMode.name}'
| )
| partitioned by (dt)
| location '${basePath.getCanonicalPath}/$tableName'
""".stripMargin)

spark.sql("set hoodie.sql.bulk.insert.enable = true")
spark.sql("set hoodie.sql.insert.mode = non-strict")

spark.sql(
s"""insert into $tableName values
|(5, 'a', 35, '2021-05-21'),
|(1, 'a', 31, '2021-01-21'),
|(3, 'a', 33, '2021-03-21'),
|(4, 'b', 16, '2021-05-21'),
|(2, 'b', 18, '2021-01-21'),
|(6, 'b', 17, '2021-03-21'),
|(8, 'a', 21, '2021-05-21'),
|(9, 'a', 22, '2021-01-21'),
|(7, 'a', 23, '2021-03-21')
|""".stripMargin)

// TODO re-enable
//assertResult(3)(spark.sql(s"select distinct _hoodie_file_name from $tableName").count())
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this has been disabled?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is failing currently

@jonvex will follow-up to enable it

}
}
}
}