1+ #
2+ # Licensed to the Apache Software Foundation (ASF) under one or more
3+ # contributor license agreements. See the NOTICE file distributed with
4+ # this work for additional information regarding copyright ownership.
5+ # The ASF licenses this file to You under the Apache License, Version 2.0
6+ # (the "License"); you may not use this file except in compliance with
7+ # the License. You may obtain a copy of the License at
8+ #
9+ # http://www.apache.org/licenses/LICENSE-2.0
10+ #
11+ # Unless required by applicable law or agreed to in writing, software
12+ # distributed under the License is distributed on an "AS IS" BASIS,
13+ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ # See the License for the specific language governing permissions and
15+ # limitations under the License.
16+ #
17+
118from collections import defaultdict
219from itertools import chain , ifilter , imap
320import operator
@@ -20,11 +37,13 @@ def __init__(self, jdstream, ssc, jrdd_deserializer):
2037
2138 def count (self ):
2239 """
40+ Return a new DStream which contains the number of elements in this DStream.
2341 """
2442 return self ._mapPartitions (lambda i : [sum (1 for _ in i )])._sum ()
2543
2644 def _sum (self ):
2745 """
46+ Add up the elements in this DStream.
2847 """
2948 return self ._mapPartitions (lambda x : [sum (x )]).reduce (operator .add )
3049
@@ -41,7 +60,7 @@ def print_(self):
4160
4261 def filter (self , f ):
4362 """
44- Return DStream containing only the elements that satisfy predicate.
63+ Return a new DStream containing only the elements that satisfy predicate.
4564 """
4665 def func (iterator ): return ifilter (f , iterator )
4766 return self ._mapPartitions (func )
@@ -56,7 +75,7 @@ def func(s, iterator): return chain.from_iterable(imap(f, iterator))
5675
5776 def map (self , f ):
5877 """
59- Return DStream by applying a function to each element of DStream.
78+ Return a new DStream by applying a function to each element of DStream.
6079 """
6180 def func (iterator ): return imap (f , iterator )
6281 return self ._mapPartitions (func )
@@ -71,12 +90,14 @@ def func(s, iterator): return f(iterator)
7190 def _mapPartitionsWithIndex (self , f , preservesPartitioning = False ):
7291 """
7392 Return a new DStream by applying a function to each partition of this DStream,
74- While tracking the index of the original partition.
93+ while tracking the index of the original partition.
7594 """
7695 return PipelinedDStream (self , f , preservesPartitioning )
7796
7897 def reduce (self , func ):
7998 """
99+ Return a new DStream by reduceing the elements of this RDD using the specified
100+ commutative and associative binary operator.
80101 """
81102 return self .map (lambda x : (None , x )).reduceByKey (func , 1 ).map (lambda x : x [1 ])
82103
@@ -267,4 +288,3 @@ def _jdstream(self):
267288
268289 def _is_pipelinable (self ):
269290 return not (self .is_cached )
270-
0 commit comments