2222from pyspark .streaming .dstream import DStream
2323
2424from py4j .java_collections import ListConverter
25+ from py4j .java_gateway import java_import
2526
2627__all__ = ["StreamingContext" ]
2728
@@ -72,7 +73,7 @@ def __init__(self, sparkContext, duration):
7273 should be set, either through the named parameters here or through C{conf}.
7374
7475 @param sparkContext: L{SparkContext} object.
75- @param duration: A L{Duration} object or seconds for SparkStreaming.
76+ @param duration: seconds for SparkStreaming.
7677
7778 """
7879 self ._sc = sparkContext
@@ -89,6 +90,9 @@ def _start_callback_server(self):
8990 gw ._python_proxy_port = gw ._callback_server .port # update port with real port
9091
9192 def _initialize_context (self , sc , duration ):
93+ java_import (self ._jvm , "org.apache.spark.streaming.*" )
94+ java_import (self ._jvm , "org.apache.spark.streaming.api.java.*" )
95+ java_import (self ._jvm , "org.apache.spark.streaming.api.python.*" )
9296 return self ._jvm .JavaStreamingContext (sc ._jsc , self ._jduration (duration ))
9397
9498 def _jduration (self , seconds ):
@@ -217,7 +221,6 @@ def union(self, *dstreams):
217221 raise ValueError ("should have at least one DStream to union" )
218222 if len (dstreams ) == 1 :
219223 return dstreams [0 ]
220- self ._check_serialzers (dstreams )
221224 first = dstreams [0 ]
222225 jrest = ListConverter ().convert ([d ._jdstream for d in dstreams [1 :]],
223226 SparkContext ._gateway ._gateway_client )
0 commit comments