Commit f0ea311

clean up code
1 parent 171edeb commit f0ea311

4 files changed: +63 -108 lines

python/pyspark/streaming/context.py

3 additions & 8 deletions
@@ -72,21 +72,19 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
         # The callback server is needed only by Spark Streaming; therefore the
         # callback server is started in StreamingContext.
         SparkContext._gateway.restart_callback_server()
-        self._clean_up_trigger()
+        self._set_clean_up_trigger()
         self._jvm = self._sc._jvm
         self._jssc = self._initialize_context(self._sc._jsc, duration._jduration)

     # Initialize StreamingContext in a function to allow subclass-specific initialization
     def _initialize_context(self, jspark_context, jduration):
         return self._jvm.JavaStreamingContext(jspark_context, jduration)

-    def _clean_up_trigger(self):
+    def _set_clean_up_trigger(self):
         """Kill the py4j callback server properly using the signal lib"""

         def clean_up_handler(*args):
             # Make sure the callback server is stopped.
-            # This needs improvement: how to terminate the callback server properly.
-            SparkContext._gateway._shutdown_callback_server()
             SparkContext._gateway.shutdown()
             sys.exit(0)

@@ -132,18 +130,15 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):
         Stop the execution of the streams immediately (does not wait for all received data
         to be processed).
         """
-
         try:
             self._jssc.stop(stopSparkContext, stopGraceFully)
         finally:
-            # Stop Callback server
-            SparkContext._gateway._shutdown_callback_server()
             SparkContext._gateway.shutdown()

     def _testInputStream(self, test_inputs, numSlices=None):
         """
         This function is only for unittest.
-        It requires a sequence as input, and returns the i_th element at the i_th batch
+        It requires a list as input, and returns the i_th element at the i_th batch
         under manual clock.
         """
         test_rdds = list()
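Both hunks above drop the explicit SparkContext._gateway._shutdown_callback_server() call and rely on SparkContext._gateway.shutdown() alone, and the renamed _set_clean_up_trigger() installs a handler so the gateway is shut down when the process is interrupted. The diff shows only the handler body; how it gets registered lies outside the hunk. A minimal sketch of that pattern, assuming Python's standard signal module, a plain gateway argument in place of SparkContext._gateway, and SIGINT/SIGTERM as the signals of interest (function name and signal choice are illustrative, not taken from the repository):

import signal
import sys


def set_clean_up_trigger(gateway):
    """Install signal handlers that shut down a py4j gateway before exiting."""

    def clean_up_handler(*args):
        # Mirrors the handler body in the diff: shut the gateway down and exit;
        # the separate _shutdown_callback_server() call is no longer made.
        gateway.shutdown()
        sys.exit(0)

    # Assumed registration: cover Ctrl-C (SIGINT) and a plain kill (SIGTERM).
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, clean_up_handler)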

python/pyspark/streaming/dstream.py

20 additions & 12 deletions
@@ -201,7 +201,7 @@ def _defaultReducePartitions(self):
         """
         Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
         If spark.default.parallelism is set, then we'll use the value from SparkContext
-        defaultParallelism, otherwise we'll use the number of partitions in this RDD.
+        defaultParallelism, otherwise we'll use the number of partitions in this RDD

         This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
         the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
@@ -216,7 +216,8 @@ def getNumPartitions(self):
         """
         Return the number of partitions in the RDD
         """
-        # TODO: remove hardcoding. RDD has NumPartitions but DStream does not have.
+        # TODO: remove hardcoding. RDD has NumPartitions; how do we get the number
+        # of partitions through a DStream?
         return 2

     def foreachRDD(self, func):
@@ -236,6 +237,10 @@ def pyprint(self):
         operator, so this DStream will be registered as an output stream and there materialized.
         """
         def takeAndPrint(rdd, time):
+            """
+            Closure to take elements from the RDD and print the first 10 of them.
+            This closure is called by the py4j callback server.
+            """
             taken = rdd.take(11)
             print "-------------------------------------------"
             print "Time: %s" % (str(time))
@@ -300,17 +305,11 @@ def checkpoint(self, interval):
         Mark this DStream for checkpointing. It will be saved to a file inside the
         checkpoint directory set with L{SparkContext.setCheckpointDir()}

-        I am not sure this part in DStream
-        and
-        all references to its parent RDDs will be removed. This function must
-        be called before any job has been executed on this RDD. It is strongly
-        recommended that this RDD is persisted in memory, otherwise saving it
-        on a file will require recomputation.
-
-        interval must be pysprak.streaming.duration
+        @param interval: Time interval after which the generated RDD will be checkpointed;
+               interval has to be a pyspark.streaming.duration.Duration
         """
         self.is_checkpointed = True
-        self._jdstream.checkpoint(interval)
+        self._jdstream.checkpoint(interval._jduration)
         return self

     def groupByKey(self, numPartitions=None):
@@ -363,6 +362,10 @@ def saveAsTextFiles(self, prefix, suffix=None):
         """

         def saveAsTextFile(rdd, time):
+            """
+            Closure to save the elements of each RDD in the DStream as a text file.
+            This closure is called by the py4j callback server.
+            """
             path = rddToFileName(prefix, suffix, time)
             rdd.saveAsTextFile(path)

@@ -376,6 +379,10 @@ def saveAsPickleFiles(self, prefix, suffix=None):
         """

         def saveAsPickleFile(rdd, time):
+            """
+            Closure to save the elements of each RDD in the DStream as pickled data in a file.
+            This closure is called by the py4j callback server.
+            """
             path = rddToFileName(prefix, suffix, time)
             rdd.saveAsPickleFile(path)

@@ -404,9 +411,10 @@ def get_output(rdd, time):
     # TODO: implement countByWindow
     # TODO: implement reduceByWindow

-    # Following operation has dependency to transform
+    # transform operation
     # TODO: implement transform
     # TODO: implement transformWith
+    # The following operations have a dependency on transform
     # TODO: implement union
     # TODO: implement repartition
    # TODO: implement cogroup
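The checkpoint() hunk changes more than the docstring: the wrapper now unwraps the Duration itself (interval._jduration) before calling into the JVM DStream, so callers pass a pyspark.streaming.duration.Duration rather than a py4j handle. A small usage sketch under assumptions, namely that Duration takes milliseconds as in the Scala API and that some_dstream stands for an existing pyspark.streaming.dstream.DStream created elsewhere:

from pyspark.streaming.duration import Duration


def enable_checkpointing(some_dstream):
    # Hypothetical helper for illustration only.
    interval = Duration(10 * 1000)  # checkpoint generated RDDs every 10 seconds (assumed ms)
    # checkpoint() now expects a Duration and passes interval._jduration
    # through to the JVM DStream itself.
    return some_dstream.checkpoint(interval)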

python/pyspark/streaming/pyprint.py

0 additions & 54 deletions (this file was deleted)
