```diff
@@ -45,7 +45,8 @@ private[spark] abstract class StreamFileInputFormat[T]
    * which is set through setMaxSplitSize
    */
   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
-    val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
+    val defaultMaxSplitBytes = Math.max(
+      sc.getConf.get(config.FILES_MAX_PARTITION_BYTES), minPartitions)
     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
     val defaultParallelism = sc.defaultParallelism
     val files = listStatus(context).asScala
```
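For context, the remainder of `setMinPartitions` (below this hunk) turns these values into an actual split size. The following is a minimal, runnable sketch of that computation, paraphrased rather than copied from the file; the input values are hypothetical stand-ins for the config reads and file listing above:

```scala
// Runnable sketch of how setMinPartitions derives maxSplitSize,
// paraphrased from the rest of the method; all inputs here are
// hypothetical stand-ins for the config reads and listStatus above.
object MaxSplitSizeSketch extends App {
  val defaultMaxSplitBytes = 128L * 1024 * 1024 // FILES_MAX_PARTITION_BYTES default (128MB)
  val openCostInBytes = 4L * 1024 * 1024        // FILES_OPEN_COST_IN_BYTES default (4MB)
  val defaultParallelism = 8L                   // sc.defaultParallelism
  val fileLengths = Seq(512L, 2048L, 64L).map(_ * 1024 * 1024) // lengths from listStatus

  val totalBytes = fileLengths.map(_ + openCostInBytes).sum
  val bytesPerCore = totalBytes / defaultParallelism
  // The result is capped at defaultMaxSplitBytes, the value this patch modifies.
  val maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
  println(s"maxSplitSize = $maxSplitSize bytes") // 134217728 (the 128MB cap applies)
}
```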
Member:
hmm, shouldn't minPartitions be used like this?

```scala
val defaultParallelism = Math.max(sc.defaultParallelism, if (minPartitions == 0) 1 else minPartitions)
```
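To illustrate the suggestion with hypothetical numbers: folding minPartitions into the parallelism term lowers bytesPerCore, which lowers maxSplitSize and so yields more, smaller partitions. A sketch using the same paraphrased formula as above:

```scala
// Hypothetical numbers showing the effect of feeding minPartitions into
// the parallelism term rather than into the byte-size cap.
object ParallelismSuggestionSketch extends App {
  val filesMaxPartitionBytes = 128L * 1024 * 1024 // 128MB default
  val openCostInBytes = 4L * 1024 * 1024          // 4MB default
  val totalBytes = 10L * 1024 * 1024 * 1024       // assume 10GB of input overall

  def maxSplitSize(parallelism: Long): Long =
    Math.min(filesMaxPartitionBytes, Math.max(openCostInBytes, totalBytes / parallelism))

  println(maxSplitSize(8L))                  // cores only: 134217728 (128MB cap)
  println(maxSplitSize(Math.max(8L, 1000L))) // with minPartitions = 1000: ~10MB splits
}
```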

Member:

Could you describe the use case where minPartitions needs to be taken into account? By default, FILES_MAX_PARTITION_BYTES is 128MB. Let's say it is even set to 1000, and minPartitions equals 10 000. What is the reason to set the max split size in bytes to the minimum number of partitions? Why should a bigger number of partitions require a bigger split size? Could you add more details to the PR description, please?
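To make the scenario in this comment concrete, here is a sketch plugging the reviewer's hypothetical values into the patched line:

```scala
// The reviewer's numbers applied to the patched line: a partition
// count is folded into a byte size, so the split-size cap grows from
// 1000 to 10000 bytes. Larger splits mean fewer partitions, the
// opposite of what a minimum partition count should produce.
val filesMaxPartitionBytes = 1000L // FILES_MAX_PARTITION_BYTES, set unusually low
val minPartitions = 10000
val defaultMaxSplitBytes = Math.max(filesMaxPartitionBytes, minPartitions) // = 10000
```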
