@@ -203,9 +203,9 @@ def cache(self):
 
     def persist(self, storageLevel):
         """
-        Set this RDD's storage level to persist its values across operations after the first time
-        it is computed. This can only be used to assign a new storage level if the RDD does not
-        have a storage level set yet.
+        Set this RDD's storage level to persist its values across operations
+        after the first time it is computed. This can only be used to assign
+        a new storage level if the RDD does not have a storage level set yet.
         """
         self.is_cached = True
         javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
@@ -214,7 +214,8 @@ def persist(self, storageLevel):
 
     def unpersist(self):
         """
-        Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+        Mark the RDD as non-persistent, and remove all blocks for it from
+        memory and disk.
         """
         self.is_cached = False
         self._jrdd.unpersist()
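
For reference, a minimal usage sketch of the persist/unpersist calls these docstrings describe (assumes an existing SparkContext named `sc`; not part of the diff):

    from pyspark import StorageLevel

    rdd = sc.parallelize(range(100))
    rdd.persist(StorageLevel.MEMORY_AND_DISK)  # a level can only be assigned while none is set yet
    rdd.count()                                # first action computes the RDD and caches its blocks
    rdd.unpersist()                            # drop the cached blocks from memory and disk
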
@@ -358,7 +359,8 @@ def sample(self, withReplacement, fraction, seed=None):
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed=None):
         """
-        Return a fixed-size sampled subset of this RDD (currently requires numpy).
+        Return a fixed-size sampled subset of this RDD (currently requires
+        numpy).
 
         >>> sc.parallelize(range(0, 10)).takeSample(True, 10, 1) #doctest: +SKIP
         [4, 2, 1, 8, 2, 7, 0, 4, 1, 4]
@@ -401,20 +403,24 @@ def takeSample(self, withReplacement, num, seed=None):
     @staticmethod
     def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
         """
-        Returns a sampling rate that guarantees a sample of size >= sampleSizeLowerBound 99.99% of
-        the time.
+        Returns a sampling rate that guarantees a sample of
+        size >= sampleSizeLowerBound 99.99% of the time.
 
         How the sampling rate is determined:
-        Let p = num / total, where num is the sample size and total is the total number of
-        datapoints in the RDD. We're trying to compute q > p such that
-          - when sampling with replacement, we're drawing each datapoint with prob_i ~ Pois(q),
-            where we want to guarantee Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
-            total), i.e. the failure rate of not having a sufficiently large sample < 0.0001.
-            Setting q = p + 5 * sqrt(p/total) is sufficient to guarantee 0.9999 success rate for
-            num > 12, but we need a slightly larger q (9 empirically determined).
-          - when sampling without replacement, we're drawing each datapoint with prob_i
-            ~ Binomial(total, fraction) and our choice of q guarantees 1-delta, or 0.9999 success
-            rate, where success rate is defined the same as in sampling with replacement.
+        Let p = num / total, where num is the sample size and total is the
+        total number of data points in the RDD. We're trying to compute
+        q > p such that
+          - when sampling with replacement, we're drawing each data point
+            with prob_i ~ Pois(q), where we want to guarantee
+            Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
+            total), i.e. the failure rate of not having a sufficiently large
+            sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
+            to guarantee 0.9999 success rate for num > 12, but we need a
+            slightly larger q (9 empirically determined).
+          - when sampling without replacement, we're drawing each data point
+            with prob_i ~ Binomial(total, fraction) and our choice of q
+            guarantees 1-delta, or 0.9999 success rate, where success rate is
+            defined the same as in sampling with replacement.
         """
         fraction = float(sampleSizeLowerBound) / total
         if withReplacement:
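
As a rough numeric illustration of the with-replacement bound quoted in the docstring above, here is the q = p + 5 * sqrt(p/total) rule spelled out in standalone Python (the helper name is ours, and this is not the method's full logic, which also handles small num and the without-replacement case):

    import math

    def oversample_fraction(num, total):
        # Illustrative only: the docstring's bound q = p + 5 * sqrt(p / total)
        p = float(num) / total
        return p + 5 * math.sqrt(p / total)

    print(oversample_fraction(100, 10000))  # p = 0.01 -> q = 0.015
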
@@ -449,8 +455,8 @@ def union(self, other):
 
     def intersection(self, other):
         """
-        Return the intersection of this RDD and another one. The output will not
-        contain any duplicate elements, even if the input RDDs did.
+        Return the intersection of this RDD and another one. The output will
+        not contain any duplicate elements, even if the input RDDs did.
 
         Note that this method performs a shuffle internally.
 
@@ -692,8 +698,8 @@ def aggregate(self, zeroValue, seqOp, combOp):
         modify C{t2}.
 
         The first function (seqOp) can return a different result type, U, than
-        the type of this RDD. Thus, we need one operation for merging a T into an U
-        and one operation for merging two U
+        the type of this RDD. Thus, we need one operation for merging a T into
+        an U and one operation for merging two U
 
         >>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
         >>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
@@ -786,8 +792,9 @@ def stdev(self):
 
     def sampleStdev(self):
         """
-        Compute the sample standard deviation of this RDD's elements (which corrects for bias in
-        estimating the standard deviation by dividing by N-1 instead of N).
+        Compute the sample standard deviation of this RDD's elements (which
+        corrects for bias in estimating the standard deviation by dividing by
+        N-1 instead of N).
 
         >>> sc.parallelize([1, 2, 3]).sampleStdev()
         1.0
@@ -796,8 +803,8 @@ def sampleStdev(self):
 
     def sampleVariance(self):
         """
-        Compute the sample variance of this RDD's elements (which corrects for bias in
-        estimating the variance by dividing by N-1 instead of N).
+        Compute the sample variance of this RDD's elements (which corrects
+        for bias in estimating the variance by dividing by N-1 instead of N).
 
         >>> sc.parallelize([1, 2, 3]).sampleVariance()
         1.0
@@ -849,8 +856,8 @@ def merge(a, b):
 
     def takeOrdered(self, num, key=None):
         """
-        Get the N elements from a RDD ordered in ascending order or as specified
-        by the optional key function.
+        Get the N elements from a RDD ordered in ascending order or as
+        specified by the optional key function.
 
         >>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
         [1, 2, 3, 4, 5, 6]
@@ -939,8 +946,9 @@ def first(self):
 
     def saveAsPickleFile(self, path, batchSize=10):
         """
-        Save this RDD as a SequenceFile of serialized objects. The serializer used is
-        L{pyspark.serializers.PickleSerializer}, default batch size is 10.
+        Save this RDD as a SequenceFile of serialized objects. The serializer
+        used is L{pyspark.serializers.PickleSerializer}, default batch size
+        is 10.
 
         >>> tmpFile = NamedTemporaryFile(delete=True)
         >>> tmpFile.close()
@@ -1208,9 +1216,10 @@ def _mergeCombiners(iterator):
 
     def foldByKey(self, zeroValue, func, numPartitions=None):
         """
-        Merge the values for each key using an associative function "func" and a neutral "zeroValue"
-        which may be added to the result an arbitrary number of times, and must not change
-        the result (e.g., 0 for addition, or 1 for multiplication.).
+        Merge the values for each key using an associative function "func"
+        and a neutral "zeroValue" which may be added to the result an
+        arbitrary number of times, and must not change the result
+        (e.g., 0 for addition, or 1 for multiplication.).
 
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> from operator import add
@@ -1227,8 +1236,8 @@ def groupByKey(self, numPartitions=None):
         Hash-partitions the resulting RDD with into numPartitions partitions.
 
         Note: If you are grouping in order to perform an aggregation (such as a
-        sum or average) over each key, using reduceByKey will provide much better
-        performance.
+        sum or average) over each key, using reduceByKey will provide much
+        better performance.
 
         >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
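
Since the note above recommends reduceByKey for aggregations, a quick sketch of the two approaches side by side (assumes an existing SparkContext `sc`; not part of the diff):

    pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])

    # groupByKey ships every value for a key to one place before you reduce it
    sums_grouped = pairs.groupByKey().mapValues(sum)

    # reduceByKey combines values map-side first, which is usually much faster
    sums_reduced = pairs.reduceByKey(lambda a, b: a + b)

    print(sorted(sums_reduced.collect()))  # [('a', 2), ('b', 1)]
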
@@ -1288,8 +1297,8 @@ def groupWith(self, other):
     def cogroup(self, other, numPartitions=None):
         """
         For each key k in C{self} or C{other}, return a resulting RDD that
-        contains a tuple with the list of values for that key in C{self} as well
-        as C{other}.
+        contains a tuple with the list of values for that key in C{self} as
+        well as C{other}.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
@@ -1300,8 +1309,8 @@ def cogroup(self, other, numPartitions=None):
 
     def subtractByKey(self, other, numPartitions=None):
         """
-        Return each (key, value) pair in C{self} that has no pair with matching key
-        in C{other}.
+        Return each (key, value) pair in C{self} that has no pair with matching
+        key in C{other}.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
         >>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -1339,10 +1348,10 @@ def repartition(self, numPartitions):
         """
         Return a new RDD that has exactly numPartitions partitions.
 
-        Can increase or decrease the level of parallelism in this RDD. Internally, this uses
-        a shuffle to redistribute data.
-        If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
-        which can avoid performing a shuffle.
+        Can increase or decrease the level of parallelism in this RDD.
+        Internally, this uses a shuffle to redistribute data.
+        If you are decreasing the number of partitions in this RDD, consider
+        using `coalesce`, which can avoid performing a shuffle.
         >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
         >>> sorted(rdd.glom().collect())
         [[1], [2, 3], [4, 5], [6, 7]]
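
To make the repartition/coalesce trade-off concrete, a small sketch (assumes an existing SparkContext `sc`; partition counts are read back via glom()):

    rdd = sc.parallelize(range(100), 8)
    fewer = rdd.coalesce(2)      # shrink to 2 partitions, avoiding a full shuffle
    more = rdd.repartition(16)   # redistribute into 16 partitions via a shuffle

    print(len(fewer.glom().collect()))  # 2
    print(len(more.glom().collect()))   # 16
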
@@ -1367,9 +1376,10 @@ def coalesce(self, numPartitions, shuffle=False):
 
     def zip(self, other):
         """
-        Zips this RDD with another one, returning key-value pairs with the first element in each RDD
-        second element in each RDD, etc. Assumes that the two RDDs have the same number of
-        partitions and the same number of elements in each partition (e.g. one was made through
+        Zips this RDD with another one, returning key-value pairs with the
+        first element in each RDD second element in each RDD, etc. Assumes
+        that the two RDDs have the same number of partitions and the same
+        number of elements in each partition (e.g. one was made through
         a map on the other).
 
         >>> x = sc.parallelize(range(0,5))