Skip to content

Commit b171ec3

Browse files
committed
fixed pep8 violation
1 parent f198d14 commit b171ec3

File tree

6 files changed

+481
-10
lines changed

6 files changed

+481
-10
lines changed

python/pyspark/streaming/context.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ class StreamingContext(object):
3333
"""
3434

3535
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
36-
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
37-
gateway=None, sparkContext=None, duration=None):
36+
environment=None, batchSize=1024, serializer=PickleSerializer(), conf=None,
37+
gateway=None, sparkContext=None, duration=None):
3838
"""
3939
Create a new StreamingContext. At least the master and app name and duration
4040
should be set, either through the named parameters here or through C{conf}.
@@ -63,8 +63,8 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
6363
if sparkContext is None:
6464
# Create the Python Sparkcontext
6565
self._sc = SparkContext(master=master, appName=appName, sparkHome=sparkHome,
66-
pyFiles=pyFiles, environment=environment, batchSize=batchSize,
67-
serializer=serializer, conf=conf, gateway=gateway)
66+
pyFiles=pyFiles, environment=environment, batchSize=batchSize,
67+
serializer=serializer, conf=conf, gateway=gateway)
6868
else:
6969
self._sc = sparkContext
7070

@@ -107,7 +107,7 @@ def awaitTermination(self, timeout=None):
107107
else:
108108
self._jssc.awaitTermination(timeout)
109109

110-
#TODO: add storageLevel
110+
# TODO: add storageLevel
111111
def socketTextStream(self, hostname, port):
112112
"""
113113
Create an input from TCP source hostname:port. Data is received using

python/pyspark/streaming/dstream.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,8 @@ def filter(self, f):
8181
"""
8282
Return a new DStream containing only the elements that satisfy predicate.
8383
"""
84-
def func(iterator): return ifilter(f, iterator)
84+
def func(iterator):
85+
return ifilter(f, iterator)
8586
return self.mapPartitions(func)
8687

8788
def flatMap(self, f, preservesPartitioning=False):
@@ -136,7 +137,7 @@ def reduceByKey(self, func, numPartitions=None):
136137
return self.combineByKey(lambda x: x, func, func, numPartitions)
137138

138139
def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
139-
numPartitions = None):
140+
numPartitions=None):
140141
"""
141142
Count the number of elements for each key, and return the result to the
142143
master as a dictionary
@@ -159,7 +160,7 @@ def combineLocally(iterator):
159160
def _mergeCombiners(iterator):
160161
combiners = {}
161162
for (k, v) in iterator:
162-
if not k in combiners:
163+
if k not in combiners:
163164
combiners[k] = v
164165
else:
165166
combiners[k] = mergeCombiners(combiners[k], v)
@@ -194,7 +195,7 @@ def add_shuffle_key(split, iterator):
194195
keyed._bypass_serializer = True
195196
with _JavaStackTrace(self.ctx) as st:
196197
partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
197-
id(partitionFunc))
198+
id(partitionFunc))
198199
jdstream = self.ctx._jvm.PythonPairwiseDStream(keyed._jdstream.dstream(),
199200
partitioner).asJavaDStream()
200201
dstream = DStream(jdstream, self._ssc, BatchedSerializer(outputSerializer))

python/pyspark/streaming/duration.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,7 @@ def _is_duration(self, instance):
333333
if not isinstance(instance, Duration):
334334
raise TypeError("This should be Duration")
335335

336+
336337
def Milliseconds(milliseconds):
337338
"""
338339
Helper function that creates instance of [[pysparkstreaming.duration]] representing
@@ -346,6 +347,7 @@ def Milliseconds(milliseconds):
346347
"""
347348
return Duration(milliseconds)
348349

350+
349351
def Seconds(seconds):
350352
"""
351353
Helper function that creates instance of [[pysparkstreaming.duration]] representing
@@ -359,6 +361,7 @@ def Seconds(seconds):
359361
"""
360362
return Duration(seconds * 1000)
361363

364+
362365
def Minutes(minutes):
363366
"""
364367
Helper function that creates instance of [[pysparkstreaming.duration]] representing

0 commit comments

Comments
 (0)