@@ -55,6 +55,95 @@ class PythonDStream[T: ClassTag](
       case None => None
     }
   }
 
   val asJavaDStream = JavaDStream.fromDStream(this)
 
+  /**
+   * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an
+   * output operator, so this PythonDStream will be registered as an output stream and
+   * materialized there. Since the serialized Python objects are only readable by Python,
+   * pyprint writes the binary data to a temporary file and runs a Python script that
+   * deserializes and prints the first ten elements.
+   */
+  private[streaming] def pyprint() {
+    def foreachFunc = (rdd: RDD[Array[Byte]], time: Time) => {
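+      // Taking 11 elements (one more than printed) presumably lets the Python side detect
+      // that more data exists, mirroring DStream.print's take(11)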
+      val iter = rdd.take(11).iterator
+
+      // Make a temporary file
+      val prefix = "spark"
+      val suffix = ".tmp"
+      val tempFile = File.createTempFile(prefix, suffix)
+      val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
+      // Write out the serialized Python objects
+      PythonRDD.writeIteratorToStream(iter, tempFileStream)
+      tempFileStream.close()
+
+      // TODO: this value has to be passed in from Python rather than read from the environment
+      val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
+      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
+      // Note: new ProcessBuilder(Seq(...)) does not compile because ProcessBuilder takes
+      // String varargs or a java.util.List[String], not a Scala Seq
+      // TODO: the absolute path to the script has to change once we stop using the separate
+      // pysparkstreaming package
+      val pb = new ProcessBuilder(pythonExec,
+        sparkHome + "/python/pysparkstreaming/streaming/pyprint.py", tempFile.getAbsolutePath)
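+      // pyprint.py is expected to take the temp file path as its argument and deserialize
+      // the contents (an assumption about the Python side, which is not shown in this diff)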
+      val workerEnv = pb.environment()
+
+      // envVars also needs to be passed in
+      // workerEnv.putAll(envVars)
+      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
+      workerEnv.put("PYTHONPATH", pythonPath)
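+      // PYTHONPATH must include Spark's python/ directory so the child interpreter can
+      // import the pyspark modules the script depends on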
+      val worker = pb.start()
+      val is = worker.getInputStream()
+      val isr = new InputStreamReader(is)
+      val br = new BufferedReader(isr)
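+      // Note: stderr is never drained here, so a script that writes heavily to stderr
+      // could block once the pipe buffer fills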
+
+      println("-------------------------------------------")
+      println("Time: " + time)
+      println("-------------------------------------------")
+
+      // Print the values from the Python script's stdout
+      var line = ""
+      breakable {
+        while (true) {
+          line = br.readLine()
+          if (line == null) break()
+          println(line)
+        }
+      }
+      // Delete the temporary file
+      tempFile.delete()
+      println()
+    }
+    new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
+  }
+}
+
+
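+// PairwiseRDD, as in PySpark's batch API, turns the stream of serialized elements into
+// (Long, Array[Byte]) pairs, the Long being a key hash computed on the Python side, so
+// that each batch can be partitioned for shuffle-based work such as the planned reduceByKey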
+private class PairwiseDStream(prev: DStream[Array[Byte]]) extends
+  DStream[(Long, Array[Byte])](prev.ssc) {
+  override def dependencies = List(prev)
+
+  override def slideDuration: Duration = prev.slideDuration
+
+  override def compute(validTime: Time): Option[RDD[(Long, Array[Byte])]] = {
+    prev.getOrCompute(validTime) match {
+      case Some(rdd) =>
+        val pairwiseRDD = new PairwiseRDD(rdd)
+        Some(pairwiseRDD.asJavaPairRDD.rdd)
+      case None => None
+    }
+  }
+  val asJavaPairDStream: JavaPairDStream[Long, Array[Byte]] = JavaPairDStream.fromJavaDStream(this)
+}
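
A minimal standalone sketch of the temp-file-plus-subprocess pattern that pyprint relies on,
with the POSIX "cat" command standing in for pyprint.py (the object name and the use of "cat"
are illustrative assumptions; the flow itself mirrors the method above: write bytes to a
temporary file, start a process on it, and echo the process's stdout):

import java.io.{BufferedReader, File, FileOutputStream, InputStreamReader}

object SubprocessEchoSketch {
  def main(args: Array[String]): Unit = {
    // Write some bytes to a temporary file, as pyprint does with the serialized elements
    val tempFile = File.createTempFile("spark", ".tmp")
    val out = new FileOutputStream(tempFile)
    out.write("first element\nsecond element\n".getBytes("UTF-8"))
    out.close()

    // Launch an external process on the file ("cat" stands in for the Python script)
    val pb = new ProcessBuilder("cat", tempFile.getAbsolutePath)
    val worker = pb.start()

    // Echo the child's stdout line by line, stopping at end-of-stream
    val br = new BufferedReader(new InputStreamReader(worker.getInputStream()))
    var line = br.readLine()
    while (line != null) {
      println(line)
      line = br.readLine()
    }
    worker.waitFor()
    tempFile.delete()
  }
}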