From cf2c4665ed695a72818ca5c674f4c791013d4f2e Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sat, 12 Sep 2015 13:26:19 +0530 Subject: [PATCH 1/8] wrapped in scope --- .../main/scala/org/apache/spark/rdd/RDD.scala | 63 +++++++++---------- 1 file changed, 28 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 7dd2bc5d7cd7..bd0f34e51022 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -469,50 +469,43 @@ abstract class RDD[T: ClassTag]( * @param seed seed for the random number generator * @return sample of specified size in an array */ - // TODO: rewrite this without return statements so we can wrap it in a scope def takeSample( withReplacement: Boolean, num: Int, - seed: Long = Utils.random.nextLong): Array[T] = { + seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 if (num < 0) { throw new IllegalArgumentException("Negative number of elements requested") - } else if (num == 0) { - return new Array[T](0) - } - - val initialCount = this.count() - if (initialCount == 0) { - return new Array[T](0) - } - - val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt - if (num > maxSampleSize) { + } else if (num > (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt)) { throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " + - s"$numStDev * math.sqrt(Int.MaxValue)") - } - - val rand = new Random(seed) - if (!withReplacement && num >= initialCount) { - return Utils.randomizeInPlace(this.collect(), rand) - } - - val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, - withReplacement) - - var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - - // If the first sample didn't turn out large enough, keep trying to take samples; - // this shouldn't happen often because we use a big multiplier for the initial size - var numIters = 0 - while (samples.length < num) { - logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") - samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - numIters += 1 + s"$numStDev * math.sqrt(Int.MaxValue)") + } else { + val initialCount = this.count() + if (num == 0 || initialCount == 0 ) { + new Array[T](0) + } else { + val rand = new Random(seed) + if (!withReplacement && num >= initialCount) { + Utils.randomizeInPlace(this.collect(), rand) + } else { + val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, + withReplacement) + + var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + + // If the first sample didn't turn out large enough, keep trying to take samples; + // this shouldn't happen often because we use a big multiplier for the initial size + var numIters = 0 + while (samples.length < num) { + logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") + samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + numIters += 1 + } + Utils.randomizeInPlace(samples, rand).take(num) + } + } } - - Utils.randomizeInPlace(samples, rand).take(num) } /** From c82b4f90732c5a760499c4f7ce8fe89262cb279b Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sun, 13 Sep 2015 09:26:06 +0530 Subject: [PATCH 2/8] Added assert instead of IAE --- .../main/scala/org/apache/spark/rdd/RDD.scala | 49 +++++++++---------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bd0f34e51022..fb013556ef85 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -475,35 +475,32 @@ abstract class RDD[T: ClassTag]( seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 - if (num < 0) { - throw new IllegalArgumentException("Negative number of elements requested") - } else if (num > (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt)) { - throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " + - s"$numStDev * math.sqrt(Int.MaxValue)") + require(num > 0, "Negative number of elements requested") + require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt), + "Cannot support a sample size > Int.MaxValue - " + + s"$numStDev * math.sqrt(Int.MaxValue)") + + val initialCount = this.count() + if (num == 0 || initialCount == 0 ) { + new Array[T](0) } else { - val initialCount = this.count() - if (num == 0 || initialCount == 0 ) { - new Array[T](0) + val rand = new Random(seed) + if (!withReplacement && num >= initialCount) { + Utils.randomizeInPlace(this.collect(), rand) } else { - val rand = new Random(seed) - if (!withReplacement && num >= initialCount) { - Utils.randomizeInPlace(this.collect(), rand) - } else { - val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, - withReplacement) - - var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - - // If the first sample didn't turn out large enough, keep trying to take samples; - // this shouldn't happen often because we use a big multiplier for the initial size - var numIters = 0 - while (samples.length < num) { - logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") - samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - numIters += 1 - } - Utils.randomizeInPlace(samples, rand).take(num) + val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, + withReplacement) + var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + + // If the first sample didn't turn out large enough, keep trying to take samples; + // this shouldn't happen often because we use a big multiplier for the initial size + var numIters = 0 + while (samples.length < num) { + logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") + samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + numIters += 1 } + Utils.randomizeInPlace(samples, rand).take(num) } } } From afe9396f09353539c6fe9371522373994f8d584a Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sun, 13 Sep 2015 09:28:36 +0530 Subject: [PATCH 3/8] fixed condition --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index fb013556ef85..78b3dce548d3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -475,7 +475,7 @@ abstract class RDD[T: ClassTag]( seed: Long = Utils.random.nextLong): Array[T] = withScope { val numStDev = 10.0 - require(num > 0, "Negative number of elements requested") + require(num >= 0, "Negative number of elements requested") require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt), "Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)") From c06c6e527a58a16ac26759b2ee82b8981e4521e3 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sun, 13 Sep 2015 15:03:47 +0530 Subject: [PATCH 4/8] Removed space --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 78b3dce548d3..096e220a5e1f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -478,7 +478,7 @@ abstract class RDD[T: ClassTag]( require(num >= 0, "Negative number of elements requested") require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt), "Cannot support a sample size > Int.MaxValue - " + - s"$numStDev * math.sqrt(Int.MaxValue)") + s"$numStDev * math.sqrt(Int.MaxValue)") val initialCount = this.count() if (num == 0 || initialCount == 0 ) { From 7fa497d22debec6c8c951cd8ed66961e95ae8801 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Tue, 15 Sep 2015 06:09:06 +0530 Subject: [PATCH 5/8] Update RDD.scala --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 096e220a5e1f..8bd8bad1c7e2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -481,7 +481,7 @@ abstract class RDD[T: ClassTag]( s"$numStDev * math.sqrt(Int.MaxValue)") val initialCount = this.count() - if (num == 0 || initialCount == 0 ) { + if (num == 0 || initialCount == 0) { new Array[T](0) } else { val rand = new Random(seed) From b37778e24a8424f023f7625872157f6049cba459 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Tue, 15 Sep 2015 06:26:53 +0530 Subject: [PATCH 6/8] Fixed Review Comment --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 8bd8bad1c7e2..60a1fe1fd4db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -480,10 +480,11 @@ abstract class RDD[T: ClassTag]( "Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)") - val initialCount = this.count() - if (num == 0 || initialCount == 0) { + + if (num == 0 || this.count() == 0) { new Array[T](0) } else { + val initialCount = this.count() val rand = new Random(seed) if (!withReplacement && num >= initialCount) { Utils.randomizeInPlace(this.collect(), rand) From 4e6ce62b2dfc24e92074c618ab78b883d41d4dd6 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Tue, 15 Sep 2015 06:29:41 +0530 Subject: [PATCH 7/8] Update RDD.scala --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 60a1fe1fd4db..d98f845ae910 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -480,7 +480,6 @@ abstract class RDD[T: ClassTag]( "Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)") - if (num == 0 || this.count() == 0) { new Array[T](0) } else { From 70de2b4e23a6578adb493f97db356931184c1495 Mon Sep 17 00:00:00 2001 From: Vinod K C Date: Tue, 15 Sep 2015 11:35:44 +0530 Subject: [PATCH 8/8] Handled review comments --- .../main/scala/org/apache/spark/rdd/RDD.scala | 37 +++++++++++-------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d98f845ae910..8258ca02f9e0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -480,27 +480,32 @@ abstract class RDD[T: ClassTag]( "Cannot support a sample size > Int.MaxValue - " + s"$numStDev * math.sqrt(Int.MaxValue)") - if (num == 0 || this.count() == 0) { + if (num == 0) { new Array[T](0) } else { val initialCount = this.count() - val rand = new Random(seed) - if (!withReplacement && num >= initialCount) { - Utils.randomizeInPlace(this.collect(), rand) + if(initialCount ==0) + { + new Array[T](0) } else { - val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, - withReplacement) - var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - - // If the first sample didn't turn out large enough, keep trying to take samples; - // this shouldn't happen often because we use a big multiplier for the initial size - var numIters = 0 - while (samples.length < num) { - logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") - samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() - numIters += 1 + val rand = new Random(seed) + if (!withReplacement && num >= initialCount) { + Utils.randomizeInPlace(this.collect(), rand) + } else { + val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount, + withReplacement) + var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + + // If the first sample didn't turn out large enough, keep trying to take samples; + // this shouldn't happen often because we use a big multiplier for the initial size + var numIters = 0 + while (samples.length < num) { + logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters") + samples = this.sample(withReplacement, fraction, rand.nextInt()).collect() + numIters += 1 + } + Utils.randomizeInPlace(samples, rand).take(num) } - Utils.randomizeInPlace(samples, rand).take(num) } } }