@@ -20,7 +20,8 @@
 import operator

 from pyspark.serializers import NoOpSerializer,\
-    BatchedSerializer, CloudPickleSerializer, pack_long
+    BatchedSerializer, CloudPickleSerializer, pack_long,\
+    CompressedSerializer
 from pyspark.rdd import _JavaStackTrace
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
@@ -463,7 +464,8 @@ def _jdstream(self):
         serializer = self.ctx.serializer

         command = (self.func, self._prev_jrdd_deserializer, serializer)
-        pickled_command = CloudPickleSerializer().dumps(command)
+        ser = CompressedSerializer(CloudPickleSerializer())
+        pickled_command = ser.dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
             self.ctx._gateway._gateway_client)
@@ -472,12 +474,13 @@ def _jdstream(self):
         env = MapConverter().convert(self.ctx.environment,
                                      self.ctx._gateway._gateway_client)
         includes = ListConverter().convert(self.ctx._python_includes,
-                                          self.ctx._gateway._gateway_client)
+                                           self.ctx._gateway._gateway_client)
         python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(),
-                                                     bytearray(pickled_command),
-                                                     env, includes, self.preservesPartitioning,
-                                                     self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
-                                                     class_tag)
+                                                     bytearray(pickled_command),
+                                                     env, includes, self.preservesPartitioning,
+                                                     self.ctx.pythonExec,
+                                                     broadcast_vars, self.ctx._javaAccumulator,
+                                                     class_tag)
         self._jdstream_val = python_dstream.asJavaDStream()
         return self._jdstream_val

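Note on the change above: the serialized command is now run through `CompressedSerializer` (wrapping `CloudPickleSerializer`) before the bytes are handed to the JVM-side `PythonDStream`, presumably to shrink the payload that crosses the Py4J gateway. Below is a minimal, self-contained sketch of that wrapper pattern using only the standard library's `pickle` and `zlib`; the class names mirror PySpark's, but the implementation details here are illustrative assumptions, not PySpark's actual code.

```python
import pickle
import zlib


class PickleSerializer(object):
    """Baseline serializer: plain pickle bytes (stand-in for CloudPickleSerializer)."""

    def dumps(self, obj):
        return pickle.dumps(obj, protocol=2)

    def loads(self, data):
        return pickle.loads(data)


class CompressedSerializer(object):
    """Wraps another serializer and zlib-compresses its output.

    Illustrative sketch only; PySpark's real CompressedSerializer lives in
    pyspark.serializers and may differ in details.
    """

    def __init__(self, serializer):
        self.serializer = serializer

    def dumps(self, obj):
        # Compress after serializing: a little extra CPU for a smaller payload.
        return zlib.compress(self.serializer.dumps(obj), 1)

    def loads(self, data):
        return self.serializer.loads(zlib.decompress(data))


if __name__ == "__main__":
    # Stand-in for the (func, deserializer, serializer) command tuple above.
    command = ("some_func_placeholder", ["deserializer"], {"conf": 1})
    ser = CompressedSerializer(PickleSerializer())
    payload = ser.dumps(command)
    assert ser.loads(payload) == command
```

A wrapper like this keeps compression orthogonal to the underlying serialization format, so the same decorator can sit on top of any serializer that exposes `dumps`/`loads`.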