
Commit e0b228c

colinmjj and LantaoJin authored and committed
[Carmel-7244] Backport add parameters to limit the number of dynamic partitions for data source table (apache#61)
Co-authored-by: Lantao, Jin(lajin) <[email protected]>
1 parent 6476bf8 commit e0b228c

File tree

11 files changed: +365 −25 lines changed


core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala

Lines changed: 31 additions & 11 deletions
@@ -206,26 +206,46 @@ object FileCommitProtocol extends Logging {
       className: String,
       jobId: String,
       outputPath: String,
-      dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
+      dynamicPartitionOverwrite: Boolean = false,
+      restrictions: Map[String, _] = Map.empty): FileCommitProtocol = {

     logDebug(s"Creating committer $className; job $jobId; output=$outputPath;" +
       s" dynamic=$dynamicPartitionOverwrite")
     val clazz = Utils.classForName[FileCommitProtocol](className)
     // First try the constructor with arguments (jobId: String, outputPath: String,
+    // dynamicPartitionOverwrite: Boolean,
+    // restrictions: Map[String, _]).
+    // If that doesn't exist, try the one with (jobId: String, outputPath: String,
     // dynamicPartitionOverwrite: Boolean).
-    // If that doesn't exist, try the one with (jobId: string, outputPath: String).
+    // If that still doesn't exist, try the one with (jobId: string, outputPath: String).
     try {
-      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
-      logDebug("Using (String, String, Boolean) constructor")
-      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+      val ctor = clazz.getDeclaredConstructor(
+        classOf[String], classOf[String], classOf[Boolean],
+        classOf[Map[String, _]])
+      logDebug("Using (String, String, Boolean, Map[String, _]) constructor")
+      ctor.newInstance(
+        jobId,
+        outputPath,
+        dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean],
+        restrictions.asInstanceOf[Map[String, _]]
+      )
     } catch {
       case _: NoSuchMethodException =>
-        logDebug("Falling back to (String, String) constructor")
-        require(!dynamicPartitionOverwrite,
-          "Dynamic Partition Overwrite is enabled but" +
-            s" the committer ${className} does not have the appropriate constructor")
-        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-        ctor.newInstance(jobId, outputPath)
+        try {
+          val ctor = clazz.getDeclaredConstructor(
+            classOf[String], classOf[String], classOf[Boolean])
+          logDebug("Using (String, String, Boolean) constructor")
+          ctor.newInstance(jobId, outputPath,
+            dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+        } catch {
+          case _: NoSuchMethodException =>
+            logDebug("Falling back to (String, String) constructor")
+            require(!dynamicPartitionOverwrite,
+              "Dynamic Partition Overwrite is enabled but" +
+                s" the committer ${className} does not have the appropriate constructor")
+            val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+            ctor.newInstance(jobId, outputPath)
+        }
     }
   }

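Because instantiate tries the richest constructor first and then falls back, existing committers with only the (String, String, Boolean) or (String, String) constructor keep working, while a committer that wants the limits just declares the four-argument shape. A minimal sketch, assuming a hypothetical class name and an illustrative restriction key (neither is defined by this commit):

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical committer: only the constructor signature matters to instantiate's reflection.
class RestrictedCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false,
    restrictions: Map[String, _] = Map.empty)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite) {

  // "example.maxPartitions" is an illustrative key, not one introduced by this commit.
  private val maxPartitions: Int = restrictions.get("example.maxPartitions")
    .map(_.asInstanceOf[Int]).getOrElse(Int.MaxValue)
}
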
core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala

Lines changed: 9 additions & 5 deletions
@@ -83,7 +83,7 @@ class HadoopMapReduceCommitProtocol(
    * [[HadoopMapReduceCommitProtocol#path]] need not be a valid [[org.apache.hadoop.fs.Path]] for
    * committers not writing to distributed file systems.
    */
-  private val hasValidPath = Try { new Path(path) }.isSuccess
+  protected val hasValidPath = Try { new Path(path) }.isSuccess

   /**
    * Tracks files staged by this task for absolute output paths. These outputs are not managed by
@@ -106,6 +106,8 @@ class HadoopMapReduceCommitProtocol(
    */
   @transient protected lazy val stagingDir = getStagingDir(path, jobId)

+  @transient private var fileCounter: Int = _
+
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.getConstructor().newInstance()
     // If OutputFormat is Configurable, we should set conf to it.
@@ -136,7 +138,7 @@
         new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
       case _ => new Path(path)
     }
-
+    fileCounter += 1
     dir.map { d =>
       new Path(new Path(stagingDir, d), filename).toString
     }.getOrElse {
@@ -159,6 +161,7 @@
     val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString

     addedAbsPathFiles(tmpOutputPath) = absOutputPath
+    fileCounter += 1
     tmpOutputPath
   }

@@ -192,8 +195,8 @@
     committer.commitJob(jobContext)

     if (hasValidPath) {
-      val (allAbsPathFiles, allPartitionPaths) =
-        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val (allAbsPathFiles, allPartitionPaths, _) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String], Int)]).unzip3
       val fs = stagingDir.getFileSystem(jobContext.getConfiguration)

       val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
@@ -270,14 +273,15 @@
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
     partitionPaths = mutable.Set[String]()
+    fileCounter = 0
   }

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
+    new TaskCommitMessage(addedAbsPathFiles.toMap, partitionPaths.toSet, fileCounter)
   }

   /**

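The per-task commit payload changes from a pair to a triple here, so the driver-side commitJob unpacks it with unzip3 instead of unzip. A small standalone sketch of that shape, with made-up values:

// Illustrative values only; in the real protocol each tuple comes from one committed task.
val taskPayloads = Seq(
  (Map("/tmp/stage/a" -> "/out/a"), Set("dt=2024-01-01"), 3),
  (Map.empty[String, String], Set("dt=2024-01-01", "dt=2024-01-02"), 2))
val (allAbsPathFiles, allPartitionPaths, fileCounts) = taskPayloads.unzip3
assert(allPartitionPaths.reduce(_ ++ _).size == 2)  // distinct partitions across tasks
assert(fileCounts.sum == 5)                         // total files created across tasks
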
core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 5 additions & 0 deletions
@@ -2987,6 +2987,11 @@ private[spark] object Utils
     resultProps
   }

+  @tailrec
+  def findFirstCause(t: Throwable): Throwable = {
+    if (t.getCause == null) t else findFirstCause(t.getCause)
+  }
+
   /**
    * Convert a sequence of `Path`s to a metadata string. When the length of metadata string
    * exceeds `stopAppendingThreshold`, stop appending paths for saving memory.

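findFirstCause walks the cause chain to its innermost throwable, which is handy for surfacing the root error behind wrapped exceptions. A quick hedged usage sketch:

// Sketch only: Utils is private[spark], so this would have to live under an org.apache.spark package.
val root = new IllegalStateException("too many dynamic partitions")
val wrapped = new RuntimeException("stage failed", new RuntimeException("task failed", root))
assert(Utils.findFirstCause(wrapped) eq root)
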
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 30 additions & 0 deletions
@@ -3147,6 +3147,30 @@ object SQLConf {
     .checkValues(PartitionOverwriteMode.values.map(_.toString))
     .createWithDefault(PartitionOverwriteMode.STATIC.toString)

+  val DYNAMIC_PARTITION_MAX_PARTITIONS =
+    buildConf("spark.carmel.sql.dynamic.partition.maxPartitions")
+      .doc("Maximum total number of dynamic partitions allowed to be created by one DML. " +
+        s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} is set to " +
+        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
+  val DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK =
+    buildConf("spark.carmel.sql.dynamic.partition.maxPartitionsPerTask")
+      .doc("Maximum number of dynamic partitions allowed to be created per task. " +
+        s"This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} is set to " +
+        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+      .intConf
+      .createWithDefault(Int.MaxValue)
+
+  val DYNAMIC_PARTITION_MAX_CREATED_FILES =
+    buildConf("spark.carmel.sql.dynamic.partition.maxCreatedFiles")
+      .doc("Maximum total number of files allowed to be created in a dynamic partition write " +
+        s"by one DML. This only takes effect when ${FILE_COMMIT_PROTOCOL_CLASS.key} is set to " +
+        "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
   object StoreAssignmentPolicy extends Enumeration {
     val ANSI, LEGACY, STRICT = Value
   }
@@ -5199,6 +5223,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
   def partitionOverwriteMode: PartitionOverwriteMode.Value =
     PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

+  def maxDynamicPartitions: Int = getConf(DYNAMIC_PARTITION_MAX_PARTITIONS)
+
+  def maxDynamicPartitionsPerTask: Int = getConf(DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK)
+
+  def maxCreatedFilesInDynamicPartition: Long = getConf(DYNAMIC_PARTITION_MAX_CREATED_FILES)
+
   def storeAssignmentPolicy: StoreAssignmentPolicy.Value =
     StoreAssignmentPolicy.withName(getConf(STORE_ASSIGNMENT_POLICY))

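With the defaults (Int.MaxValue / Long.MaxValue) all three limits are effectively off. A hedged sketch of tightening them for a session; the values are arbitrary and a SparkSession named `spark` is assumed:

// Arbitrary example limits; the keys are the new entries defined above. As the doc strings note,
// they only take effect when the commit protocol class is SQLHadoopMapReduceCommitProtocol.
spark.conf.set("spark.carmel.sql.dynamic.partition.maxPartitions", "2000")
spark.conf.set("spark.carmel.sql.dynamic.partition.maxPartitionsPerTask", "200")
spark.conf.set("spark.carmel.sql.dynamic.partition.maxCreatedFiles", "100000")
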
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala

Lines changed: 24 additions & 1 deletion
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils

@@ -108,11 +109,33 @@
     }

     val jobId = java.util.UUID.randomUUID().toString
+    val parameters = CaseInsensitiveMap(options)
+
+    val partitionOverwriteMode = parameters.get("partitionOverwriteMode")
+      // scalastyle:off caselocale
+      .map(mode => PartitionOverwriteMode.withName(mode.toUpperCase))
+      // scalastyle:on caselocale
+      .getOrElse(sparkSession.sessionState.conf.partitionOverwriteMode)
+    val enableDynamicOverwrite = partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
+    val dynamicPartitionRestrictions: Map[String, _] = Map(
+      SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key ->
+        sparkSession.sessionState.conf.maxDynamicPartitions,
+      SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key ->
+        sparkSession.sessionState.conf.maxDynamicPartitionsPerTask,
+      SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key ->
+        sparkSession.sessionState.conf.maxCreatedFilesInDynamicPartition
+    )
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = jobId,
       outputPath = outputPath.toString,
-      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
+      dynamicPartitionOverwrite = dynamicPartitionOverwrite,
+      restrictions = dynamicPartitionRestrictions)

     val doInsertion = if (mode == SaveMode.Append) {
       true

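The restrictions map is built here for the v1 file-source write path, so they apply to dynamic-partition overwrites that go through InsertIntoHadoopFsRelationCommand. A hedged example of such a write; the data and output path are made up:

// Made-up DataFrame and output path. "partitionOverwriteMode" is the per-write option read from
// `options` above; it falls back to the session-level setting when absent.
val df = spark.range(100).selectExpr("id", "id % 10 AS dt")
df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .partitionBy("dt")
  .parquet("/tmp/demo_dynamic_partitions")
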
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala

Lines changed: 59 additions & 1 deletion
@@ -22,7 +22,9 @@ import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 import org.apache.parquet.hadoop.ParquetOutputCommitter

+import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
 import org.apache.spark.sql.internal.SQLConf

@@ -33,10 +35,33 @@ import org.apache.spark.sql.internal.SQLConf
 class SQLHadoopMapReduceCommitProtocol(
     jobId: String,
     path: String,
-    dynamicPartitionOverwrite: Boolean = false)
+    dynamicPartitionOverwrite: Boolean = false,
+    restrictions: Map[String, Object] = Map.empty)
   extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
     with Serializable with Logging {

+  private val maxDynamicPartitions = restrictions.get(
+    SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key) match {
+    case Some(value) => value.asInstanceOf[Int]
+    case None => Int.MaxValue
+  }
+
+  private val maxDynamicPartitionsPerTask = restrictions.get(
+    SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key) match {
+    case Some(value) => value.asInstanceOf[Int]
+    case None => Int.MaxValue
+  }
+
+  private val maxCreatedFilesInDynamicPartition = restrictions.get(
+    SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key) match {
+    case Some(value) => value.asInstanceOf[Long]
+    case None => Long.MaxValue
+  }
+
+  // These are only used on the driver.
+  @volatile private var totalPartitions: Set[String] = Set.empty
+  @volatile private var totalCreatedFiles: Long = 0L
+
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     var committer = super.setupCommitter(context)

@@ -85,4 +110,37 @@
     logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
     committer
   }
+
+  /**
+   * Called on the driver after a task commits. This can be used to access task commit messages
+   * before the job has finished. These same task commit messages will be passed to commitJob()
+   * if the entire job succeeds.
+   * Overridden here to check the dynamic partition limits on the driver side.
+   */
+  override def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    logDebug(s"onTaskCommit($taskCommit)")
+    if (hasValidPath) {
+      val (_, allPartitionPaths, fileCounter) =
+        taskCommit.obj.asInstanceOf[(Map[String, String], Set[String], Int)]
+      val partitionsPerTask = allPartitionPaths.size
+      if (partitionsPerTask > maxDynamicPartitionsPerTask) {
+        throw new SparkException(s"Task tried to create $partitionsPerTask dynamic partitions," +
+          s" which is more than $maxDynamicPartitionsPerTask. To solve this," +
+          s" try to increase ${SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key}")
+      }
+      totalPartitions ++= allPartitionPaths
+      val totalPartitionNum = totalPartitions.size
+      if (totalPartitionNum > maxDynamicPartitions) {
+        throw new SparkException(s"Total number of dynamic partitions created is" +
+          s" $totalPartitionNum, which is more than $maxDynamicPartitions." +
+          s" To solve this, try to increase ${SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key}")
+      }
+      totalCreatedFiles += fileCounter
+      if (totalCreatedFiles > maxCreatedFilesInDynamicPartition) {
+        throw new SparkException(s"Total number of created files now is" +
+          s" $totalCreatedFiles, which exceeds $maxCreatedFilesInDynamicPartition." +
+          s" To solve this, try to increase ${SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key}")
+      }
+    }
+  }
 }

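onTaskCommit runs on the driver as each task commits, so an oversized write fails fast with a SparkException that names the config to raise, rather than failing at commitJob. A hedged sketch of constructing this committer directly, mirroring the four-argument call that FileCommitProtocol.instantiate would make reflectively; the job id, path and limits are made up:

import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.internal.SQLConf

// Illustrative values; normally the restrictions map is built by the callers shown in this commit.
val committer = new SQLHadoopMapReduceCommitProtocol(
  jobId = "job-1234",
  path = "/tmp/demo_output",
  dynamicPartitionOverwrite = true,
  restrictions = Map(
    SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key -> Int.box(1000),
    SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key -> Int.box(100),
    SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key -> Long.box(50000L)))
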
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWrite.scala

Lines changed: 11 additions & 1 deletion
@@ -59,10 +59,20 @@ trait FileWrite extends Write {
     // Hadoop Configurations are case sensitive.
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
     val job = getJobInstance(hadoopConf, path)
+
+    val dynamicPartitionRestrictions: Map[String, _] = Map(
+      SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS.key ->
+        sparkSession.sessionState.conf.maxDynamicPartitions,
+      SQLConf.DYNAMIC_PARTITION_MAX_PARTITIONS_PER_TASK.key ->
+        sparkSession.sessionState.conf.maxDynamicPartitionsPerTask,
+      SQLConf.DYNAMIC_PARTITION_MAX_CREATED_FILES.key ->
+        sparkSession.sessionState.conf.maxCreatedFilesInDynamicPartition
+    )
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = paths.head)
+      outputPath = paths.head,
+      restrictions = dynamicPartitionRestrictions)
     lazy val description =
       createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, options.asScala.toMap)

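The v2 file write path builds the same restrictions map, so DataFrame writes that reach FileWrite are covered as well. A hedged example of a write that may take this path; the table names are made up and the actual route depends on the catalog and data source in use:

// Hypothetical source and target tables; assumes "demo.events" is a partitioned
// file-based table whose batch writes go through FileWrite.
spark.table("staging_events")
  .writeTo("demo.events")
  .overwritePartitions()
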