Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
1441977
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
dorx May 29, 2014
ffea61a
SPARK-1939: Refactor takeSample method in RDD
dorx May 30, 2014
7cab53a
fixed import bug in rdd.py
dorx Jun 2, 2014
e3fd6a6
Merge branch 'master' into takeSample
dorx Jun 2, 2014
9ee94ee
[SPARK-2082] stratified sampling in PairRDDFunctions that guarantees …
dorx Jun 9, 2014
1d413ce
fixed checkstyle issues
dorx Jun 9, 2014
7e1a481
changed the permission on SamplingUtil
dorx Jun 9, 2014
46f6c8c
fixed the NPE caused by closures being cleaned before being passed in…
dorx Jun 10, 2014
50581fc
added a TODO for logging in python
dorx Jun 12, 2014
7327611
merge master
dorx Jun 13, 2014
9e74ab5
Separated out most of the logic in sampleByKey
dorx Jun 17, 2014
90d94c0
merge master
dorx Jun 17, 2014
0214a76
cleanUp
dorx Jun 18, 2014
944a10c
[SPARK-2145] Add lower bound on sampling rate
dorx Jun 19, 2014
1fe1cff
Changed fractionByKey to a map to enable arg check
dorx Jun 19, 2014
bd9dc6e
unit bug and style violation fixed
dorx Jun 19, 2014
4ad516b
remove unused imports from PairRDDFunctions
dorx Jun 20, 2014
254e03c
minor fixes and Java API.
dorx Jul 3, 2014
6b5b10b
Merge branch 'master' into stratified
dorx Jul 3, 2014
ee9d260
addressed reviewer comments
dorx Jul 6, 2014
bbfb8c9
Merge branch 'master' into stratified
dorx Jul 6, 2014
9884a9f
style fix
dorx Jul 8, 2014
680b677
use mapPartitionWithIndex instead
dorx Jul 8, 2014
a2bf756
Merge branch 'master' into stratified
dorx Jul 8, 2014
a10e68d
style fix
dorx Jul 9, 2014
f4c21f3
Reviewer comments
dorx Jul 15, 2014
eecee5f
Merge branch 'master' into stratified
dorx Jul 15, 2014
b3013a4
move math3 back to test scope
mengxr Jul 25, 2014
b223529
use approx bounds for poisson
mengxr Jul 25, 2014
ea7d27f
merge master
dorx Jul 28, 2014
17a381b
fixed a merge issue and a failed unit
dorx Jul 28, 2014
eaf5771
bug fixes.
dorx Jul 28, 2014
245439e
moved minSamplingRate to getUpperBound
dorx Jul 29, 2014
616e55c
merge master
dorx Aug 9, 2014
555a3f9
separate out sampleByKeyExact as its own API
dorx Aug 9, 2014
e990325
Merge branch 'master' into stratified
dorx Aug 9, 2014
2948aae
remove unrelated changes
dorx Aug 9, 2014
0ad97b2
reviewer comments.
dorx Aug 10, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 31 additions & 37 deletions core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -133,68 +133,62 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample size that's exactly equal to the sum of
* `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
* RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
* the RDD to create a sample size that's exactly equal to the sum of
* `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
* RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
* Use Utils.random.nextLong as the default seed for the random number generator
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
exact: Boolean): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)
fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, Utils.random.nextLong)

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
* This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
* create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
* over all key values with a 99.99% confidence. When sampling without replacement, we need one
* additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
* two additional passes.
*/
def sampleByKey(withReplacement: Boolean,
@Experimental
def sampleByKeyExact(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, seed)
new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed))

/**
* Return a subset of this RDD sampled by key (via stratified sampling).
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* Produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
* simple random sampling.
* This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
* create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
* over all key values with a 99.99% confidence. When sampling without replacement, we need one
* additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
* two additional passes.
*
* Use Utils.random.nextLong as the default seed for the random number generator
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, false, Utils.random.nextLong)
@Experimental
def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)

/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
Expand Down
51 changes: 37 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -197,33 +197,56 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
* `fractions`, a key to sampling rate map.
*
* If `exact` is set to false, create the sample via simple random sampling, with one pass
* over the RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values; otherwise, use
* additional passes over the RDD to create a sample size that's exactly equal to the sum of
* math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence. When sampling
* without replacement, we need one additional pass over the RDD to guarantee sample size;
* when sampling with replacement, we need two additional passes.
* `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
* RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
* @param withReplacement whether to sample with or without replacement
* @param fractions map of specific keys to sampling rates
* @param seed seed for the random number generator
* @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
* @return RDD containing the sampled subset
*/
def sampleByKey(withReplacement: Boolean,
fractions: Map[K, Double],
exact: Boolean = false,
seed: Long = Utils.random.nextLong): RDD[(K, V)]= {
seed: Long = Utils.random.nextLong): RDD[(K, V)] = {

require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}

/**
* ::Experimental::
* Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
* math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
* This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
* create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
* over all key values with a 99.99% confidence. When sampling without replacement, we need one
* additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
* two additional passes.
*
* @param withReplacement whether to sample with or without replacement
* @param fractions map of specific keys to sampling rates
* @param seed seed for the random number generator
* @return RDD containing the sampled subset
*/
@Experimental
def sampleByKeyExact(withReplacement: Boolean,
fractions: Map[K, Double],
seed: Long = Utils.random.nextLong): RDD[(K, V)] = {

require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")

val samplingFunc = if (withReplacement) {
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, exact, seed)
StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, true, seed)
} else {
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, exact, seed)
StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, true, seed)
}
self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
}
Expand Down
20 changes: 18 additions & 2 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1239,12 +1239,28 @@ public Tuple2<Integer, Integer> call(Integer i) {
Assert.assertTrue(worCounts.size() == 2);
Assert.assertTrue(worCounts.get(0) > 0);
Assert.assertTrue(worCounts.get(1) > 0);
JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKey(true, fractions, true, 1L);
}

@Test
@SuppressWarnings("unchecked")
public void sampleByKeyExact() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) {
return new Tuple2<Integer, Integer>(i % 2, 1);
}
});
Map<Integer, Object> fractions = Maps.newHashMap();
fractions.put(0, 0.5);
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
Assert.assertTrue(wrExactCounts.size() == 2);
Assert.assertTrue(wrExactCounts.get(0) == 2);
Assert.assertTrue(wrExactCounts.get(1) == 4);
JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKey(false, fractions, true, 1L);
JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
Assert.assertTrue(worExactCounts.size() == 2);
Assert.assertTrue(worExactCounts.get(0) == 2);
Expand Down
Loading