
Commit 3166d31

clean up
1 parent c00e091 commit 3166d31

7 files changed: +37 additions, −109 deletions


python/pyspark/streaming/context.py

Lines changed: 5 additions & 4 deletions
@@ -142,17 +142,18 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

     def _testInputStream(self, test_inputs, numSlices=None):
         """
-        This function is only for test.
-        This implementation is inspired by QueStream implementation.
-        Give list of RDD to generate DStream which contains the RDD.
+        This function is only for unit tests.
+        It requires a sequence as input, and returns the i_th element
+        at the i_th batch under manual clock.
         """
         test_rdds = list()
         test_rdd_deserializers = list()
         for test_input in test_inputs:
             test_rdd = self._sc.parallelize(test_input, numSlices)
             test_rdds.append(test_rdd._jrdd)
             test_rdd_deserializers.append(test_rdd._jrdd_deserializer)
-
+        # All deserializers have to be the same.
+        # TODO: add deserializer validation
         jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
         jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

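For illustration, a sketch of how a test might exercise this path, combining _testInputStream with the _test_output helper from dstream.py below. It assumes a StreamingContext `ssc` running under a manual clock (see the spark.streaming.clock note further down); the clock-advancement step is summarized in a comment because that mechanism is JVM-side.

    # Hypothetical test sketch; only _testInputStream and _test_output are
    # from this commit, everything else is illustrative.
    test_inputs = [[1, 2, 3], [4, 5, 6]]   # batch i serves the i_th list
    result = []

    input_stream = ssc._testInputStream(test_inputs)
    input_stream.map(lambda x: x * 2)._test_output(result)

    ssc.start()
    # ... advance the manual clock by two batch intervals, then expect:
    # result == [[2, 4, 6], [8, 10, 12]]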
python/pyspark/streaming/dstream.py

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -283,23 +283,6 @@ def func(iterator):
             yield list(iterator)
         return self.mapPartitions(func)

-    #def transform(self, func): - TD
-    #    from utils import RDDFunction
-    #    wrapped_func = RDDFunction(self.ctx, self._jrdd_deserializer, func)
-    #    jdstream = self.ctx._jvm.PythonTransformedDStream(self._jdstream.dstream(), wrapped_func).toJavaDStream
-    #    return DStream(jdstream, self._ssc, ...) ## DO NOT KNOW HOW
-
-    def _test_output(self, result):
-        """
-        This function is only for test case.
-        Store data in a DStream to result to verify the result in test case
-        """
-        def get_output(rdd, time):
-            taken = rdd.collect()
-            result.append(taken)
-
-        self.foreachRDD(get_output)
-
     def cache(self):
         """
         Persist this DStream with the default storage level (C{MEMORY_ONLY_SER}).
@@ -404,6 +387,17 @@ def saveAsTextFile(rdd, time):

         return self.foreachRDD(saveAsTextFile)

+    def _test_output(self, result):
+        """
+        This function is only for test cases.
+        It stores the data in a DStream into result to verify it in tests.
+        """
+        def get_output(rdd, time):
+            collected = rdd.collect()
+            result.append(collected)
+
+        self.foreachRDD(get_output)
+

# TODO: implement updateStateByKey
# TODO: implement slice

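The relocated _test_output is just a foreachRDD callback that appends each batch's collected contents to a caller-owned list. The same pattern, shown standalone on a plain RDD so it runs without a streaming context (a sketch, not part of the commit):

    from pyspark import SparkContext

    sc = SparkContext("local[2]", "collect-pattern-demo")
    result = []

    def get_output(rdd, time):
        # Same shape as _test_output's inner function: materialize the
        # batch and record it for later assertions.
        result.append(rdd.collect())

    # In streaming this would be dstream.foreachRDD(get_output); here the
    # callback is invoked directly on one RDD to show the data flow.
    get_output(sc.parallelize([1, 2, 3]), 0)
    assert result == [[1, 2, 3]]
    sc.stop()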
python/pyspark/streaming/jtime.py

Lines changed: 2 additions & 1 deletion
@@ -19,10 +19,11 @@
 from pyspark.streaming.duration import Duration

 """
-The name of this file, time is not good naming for python
+The name of this file, time, is not a good name for python
 because if we do import time when we want to use native python time package, it does
 not import python time package.
 """
+# TODO: add doctest


 class Time(object):

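The naming concern in that docstring is a real Python 2 pitfall: implicit relative imports let a sibling module shadow the standard library. A minimal illustration of why time.py would be a bad file name here (hypothetical layout, not part of the commit):

    # If pyspark/streaming/ contained a module named time.py, then inside
    # this package (under Python 2's implicit relative imports):
    #
    #     import time        # resolves to pyspark/streaming/time.py
    #     time.sleep(1)      # AttributeError: the local module has no sleep()
    #
    # Naming the file jtime.py keeps the stdlib import unambiguous:
    import time
    assert hasattr(time, "sleep")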
python/pyspark/streaming/utils.py

Lines changed: 9 additions & 3 deletions
@@ -19,6 +19,9 @@


 class RDDFunction():
+    """
+    This class is for the py4j callback; the JVM calls it through py4j.
+    """
     def __init__(self, ctx, jrdd_deserializer, func):
         self.ctx = ctx
         self.deserializer = jrdd_deserializer
@@ -38,6 +41,7 @@ class Java:


 def msDurationToString(ms):
+    # TODO: add doctest
     """
     Returns a human-readable string representing a duration such as "35ms"
     """
@@ -54,8 +58,10 @@ def msDurationToString(ms):
     else:
         return "%.2f h" % (float(ms) / hour)

+
 def rddToFileName(prefix, suffix, time):
-    if suffix is not None:
-        return prefix + "-" + str(time) + "." + suffix
-    else:
+    # TODO: add doctest
+    if suffix is None:
         return prefix + "-" + str(time)
+    else:
+        return prefix + "-" + str(time) + "." + suffix

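The new TODOs ask for doctests; for rddToFileName one plausible sketch, matching the behavior in the hunk above (the docstring and examples are suggestions, not part of the commit):

    def rddToFileName(prefix, suffix, time):
        """
        Build a per-batch file name from a prefix, an optional suffix, and
        a batch time in milliseconds.

        >>> rddToFileName("output", None, 1400000000000)
        'output-1400000000000'
        >>> rddToFileName("output", "txt", 1400000000000)
        'output-1400000000000.txt'
        """
        if suffix is None:
            return prefix + "-" + str(time)
        else:
            return prefix + "-" + str(time) + "." + suffix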
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 6 additions & 42 deletions
@@ -18,10 +18,8 @@
 package org.apache.spark.streaming.api.python

 import java.io._
-import java.io.{ObjectInputStream, IOException}
-import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
+import java.util.{List => JList, ArrayList => JArrayList, Map => JMap}

-import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 import scala.collection.JavaConversions._

@@ -55,7 +53,9 @@ class PythonDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
     parent.getOrCompute(validTime) match{
       case Some(rdd) =>
-        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes, preservePartitoning, pythonExec, broadcastVars, accumulator)
+        // create PythonRDD to compute Python functions.
+        val pythonRDD = new PythonRDD(rdd, command, envVars, pythonIncludes,
+          preservePartitoning, pythonExec, broadcastVars, accumulator)
         Some(pythonRDD.asJavaRDD.rdd)
       case None => None
     }
@@ -135,8 +135,8 @@ DStream[Array[Byte]](prev.ssc){
       case Some(rdd)=>Some(rdd)
         val pairwiseRDD = new PairwiseRDD(rdd)
         /*
-         * Since python operation is executed by Scala after StreamingContext.start.
-         * What PythonPairwiseDStream does is equivalent to python code in pySpark.
+         * Since the python function is executed by Scala after StreamingContext.start,
+         * what PythonPairwiseDStream does is equivalent to the following python code in pyspark.
          *
          * with _JavaStackTrace(self.context) as st:
          *     pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -154,23 +154,6 @@ DStream[Array[Byte]](prev.ssc){
 }


-class PythonTestInputStream3(ssc_ : JavaStreamingContext)
-  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
-
-  def start() {}
-
-  def stop() {}
-
-  def compute(validTime: Time): Option[RDD[Any]] = {
-    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
-    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
-    val rdd :RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
-    Some(rdd)
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-}
-
 class PythonForeachDStream(
   prev: DStream[Array[Byte]],
   foreachFunction: PythonRDDFunction
@@ -184,30 +167,11 @@ class PythonForeachDStream(
   this.register()
 }

-class PythonTransformedDStream(
-  prev: DStream[Array[Byte]],
-  transformFunction: PythonRDDFunction
-  ) extends DStream[Array[Byte]](prev.ssc) {
-
-  override def dependencies = List(prev)
-
-  override def slideDuration: Duration = prev.slideDuration
-
-  override def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
-    prev.getOrCompute(validTime).map(rdd => {
-      transformFunction.call(rdd.toJavaRDD(), validTime.milliseconds).rdd
-    })
-  }
-
-  val asJavaDStream = JavaDStream.fromDStream(this)
-  //val asJavaPairDStream : JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
-}

 /**
  * This is a input stream just for the unitest. This is equivalent to a checkpointable,
  * replayable, reliable message queue like Kafka. It requires a sequence as input, and
  * returns the i_th element at the i_th batch under manual clock.
- * This implementation is inspired by QueStream
  */

 class PythonTestInputStream(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])

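PythonTestInputStream only delivers batch i at batch i if the streaming clock is advanced manually. Spark's Scala streaming tests select that clock via the spark.streaming.clock system property; a Python test would presumably need the same property set before the streaming context starts (a sketch under that assumption):

    from pyspark.context import SparkContext

    # Use Spark's ManualClock so test batches fire only when the clock is
    # advanced explicitly. The property name and class are the ones Spark's
    # own streaming test suites use; setting it from PySpark like this is
    # an assumption, not something this commit demonstrates.
    SparkContext.setSystemProperty(
        "spark.streaming.clock",
        "org.apache.spark.streaming.util.ManualClock")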
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonRDDFunction.java

Lines changed: 4 additions & 0 deletions
@@ -3,6 +3,10 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.streaming.Time;

+/*
+ * Interface for the py4j callback function.
+ * This function is called by pyspark.streaming.dstream.DStream.foreachRDD.
+ */
 public interface PythonRDDFunction {
   JavaRDD<byte[]> call(JavaRDD<byte[]> rdd, long time);
 }

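On the Python side this interface is satisfied through py4j's callback mechanism: a class declares the interfaces it implements in a nested class Java. A condensed sketch of the RDDFunction wrapper from utils.py above (the call body is schematic, reconstructed from the fields shown in the diff):

    from pyspark.rdd import RDD

    class RDDFunction(object):
        def __init__(self, ctx, jrdd_deserializer, func):
            self.ctx = ctx
            self.deserializer = jrdd_deserializer
            self.func = func

        def call(self, jrdd, milliseconds):
            # Wrap the incoming JavaRDD, apply the user function, and hand
            # the resulting JavaRDD back to the JVM.
            rdd = RDD(jrdd, self.ctx, self.deserializer)
            return self.func(rdd, milliseconds)._jrdd

        class Java:
            implements = [
                'org.apache.spark.streaming.api.python.PythonRDDFunction']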
streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonTransformedDStream.scala

Lines changed: 0 additions & 42 deletions
This file was deleted.
