@@ -47,7 +47,7 @@ private[spark] abstract class StreamFileInputFormat[T]
   def setMinPartitions(sc: SparkContext, context: JobContext, minPartitions: Int) {
     val defaultMaxSplitBytes = sc.getConf.get(config.FILES_MAX_PARTITION_BYTES)
     val openCostInBytes = sc.getConf.get(config.FILES_OPEN_COST_IN_BYTES)
-    val defaultParallelism = sc.defaultParallelism
+    val defaultParallelism = Math.max(sc.defaultParallelism, minPartitions)
Contributor:
If sc.defaultParallelism < 2, and minPartitions is not set in BinaryFileRDD, then previously defaultParallelism would be the same as sc.defaultParallelism, but after this change it will be 2. Have you considered this case, and do you think this behavior change is the right one to make?

Contributor Author:

You need to pass minPartitions in to use this method; what do you mean by minPartitions not being set?

Contributor:

I mentioned BinaryFileRDD, not this method; you can check the code to see how it handles the default value.

Contributor Author:

BinaryFileRDD always sets minPartitions: it is either defaultMinPartitions or the value you pass via the binaryFiles(path, minPartitions) method. Eventually, this minPartitions value is passed to the setMinPartitions() method.
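
For reference, the chain looks roughly like this. This is a simplified sketch paraphrased from Spark's source, with bodies elided, so exact signatures may differ slightly:

// SparkContext: when the caller omits minPartitions, it defaults to
// defaultMinPartitions, i.e. math.min(defaultParallelism, 2).
def binaryFiles(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = {
  // ...
  new BinaryFileRDD(this, classOf[StreamInputFormat], classOf[String],
    classOf[PortableDataStream], conf, minPartitions)
}

// BinaryFileRDD.getPartitions: the value is forwarded to setMinPartitions
// before the input format computes the splits.
override def getPartitions: Array[Partition] = {
  // ...
  inputFormat.setMinPartitions(sc, jobContext, minPartitions)
  val rawSplits = inputFormat.getSplits(jobContext).toArray
  // ...
}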

Member:

We should have a test case; otherwise, we could hit the same issue again.

Member:

BTW, it is easy to add such a test case. We can even test the behavior of the boundary cases. cc @srowen @HyukjinKwon @MaxGekk @jiangxb1987

Member:
I think it's hard to test, technically, because setMinPartitions is only a hint. In the case of binaryFiles we know it will put a hard limit on the number of partitions, but that isn't true of other implementations. We can still write a simple test for all of these; it just may be asserting behavior that could change in Hadoop in the future, though I strongly doubt it would.

Contributor Author:
I agree it is hard to test. I would appreciate it if anyone could give me some hints on how to do this (how to verify the behavior, and where to put my test cases).

Member:
Would you mind following up with a test that just asserts that asking for, say, 20 partitions results in 20 partitions? This is technically too specific as a test, but is probably fine for now.

Contributor Author:
From the code, you can see the calculation is just an intermediate result, and this method does not return a value. Checking the split size does not make sense for this test case, because it depends on multiple variables and this is just one of them.

Member:
// Zero out the per-file open cost and pin default parallelism to 1, so that
// minPartitions is the only remaining input to the split-size calculation.
sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
  .set(config.FILES_OPEN_COST_IN_BYTES.key, "0")
  .set("spark.default.parallelism", "1"))

println(sc.binaryFiles(dirpath1, minPartitions = 50).getNumPartitions)
println(sc.binaryFiles(dirpath1, minPartitions = 1).getNumPartitions)

It is not hard to verify whether the parameter minPartitions takes effect. Currently, the description of this parameter is not clear. We need to document clearly which factors impact the actual number of partitions; otherwise, users will not understand how to use it.
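
To make that concrete, a self-contained check might look like the following. This is a hedged sketch, not the test the PR actually adds: the object name, temp-directory setup, file count, and file sizes are assumptions, chosen so that a larger minPartitions visibly increases the partition count.

import java.io.{File, FileOutputStream}
import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

object BinaryFilesMinPartitionsCheck {
  def main(args: Array[String]): Unit = {
    // Write eight small files into a temp directory (count and sizes arbitrary).
    val dir = Files.createTempDirectory("binary-files-test").toFile
    (1 to 8).foreach { i =>
      val out = new FileOutputStream(new File(dir, s"part-$i.bin"))
      out.write(Array.fill(16)(0.toByte))
      out.close()
    }

    // Zero open cost and parallelism of 1, as in the snippet above.
    val sc = new SparkContext(new SparkConf()
      .setAppName("test").setMaster("local")
      .set("spark.files.openCostInBytes", "0")
      .set("spark.default.parallelism", "1"))
    try {
      val many = sc.binaryFiles(dir.getAbsolutePath, minPartitions = 50).getNumPartitions
      val few = sc.binaryFiles(dir.getAbsolutePath, minPartitions = 1).getNumPartitions
      // minPartitions is only a hint: with 8 files the first call is capped at
      // 8 partitions (binaryFiles never splits a file), but it should still
      // yield strictly more partitions than the minPartitions = 1 call.
      assert(many > few, s"expected minPartitions = 50 to yield more partitions: $many vs $few")
    } finally {
      sc.stop()
    }
  }
}

Before this patch, both calls should collapse to a single partition under spark.default.parallelism = 1; with the patch, the minPartitions = 50 call should produce one partition per file.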

    val files = listStatus(context).asScala
    val totalBytes = files.filterNot(_.isDirectory).map(_.getLen + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
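
To see why the one-line change makes minPartitions take effect, a worked example with assumed numbers may help. It also assumes the tail of the method, collapsed in this view, clamps bytesPerCore to maxSplitSize = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) before handing it to the combine input format:

// Assumed inputs, for illustration only: 1000 files x 1 MiB,
// openCostInBytes = 0, defaultMaxSplitBytes = 128 MiB,
// sc.defaultParallelism = 2, minPartitions = 50.

// Before the fix: defaultParallelism = 2
//   bytesPerCore = 1000 MiB / 2  = 500 MiB
//   maxSplitSize = min(128 MiB, max(0, 500 MiB)) = 128 MiB
//   -> files combine into ~8 splits; minPartitions = 50 is silently ignored.

// After the fix: defaultParallelism = max(2, 50) = 50
//   bytesPerCore = 1000 MiB / 50 = 20 MiB
//   maxSplitSize = min(128 MiB, max(0, 20 MiB)) = 20 MiB
//   -> ~50 splits, so the hint is honored (capped by the number of files).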