7 changes: 4 additions & 3 deletions docs/configuration.md
@@ -678,9 +678,10 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.rdd.compress</code></td>
<td>false</td>
<td>
-    Whether to compress serialized RDD partitions (e.g. for
-    <code>StorageLevel.MEMORY_ONLY_SER</code>). Can save substantial space at the cost of some
-    extra CPU time.
+    Whether to compress serialized RDD partitions (e.g. for
+    <code>StorageLevel.MEMORY_ONLY_SER</code> in Java
+    and Scala or <code>StorageLevel.MEMORY_ONLY</code> in Python).
+    Can save substantial space at the cost of some extra CPU time.
</td>
</tr>
<tr>
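As an aside, a minimal PySpark sketch of toggling this option (the config key comes from the table above; the app name and session setup are illustrative):

```python
from pyspark import SparkConf, SparkContext

# Enable compression of serialized RDD partitions: saves space, costs some CPU.
conf = SparkConf().setAppName("compress-demo").set("spark.rdd.compress", "true")
sc = SparkContext(conf=conf)
```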
10 changes: 6 additions & 4 deletions docs/programming-guide.md
@@ -1196,14 +1196,14 @@ storage levels is:
partitions that don't fit on disk, and read them from there when they're needed. </td>
</tr>
<tr>
-    <td> MEMORY_ONLY_SER </td>
+    <td> MEMORY_ONLY_SER <br /> (Java and Scala) </td>
<td> Store RDD as <i>serialized</i> Java objects (one byte array per partition).
This is generally more space-efficient than deserialized objects, especially when using a
<a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
</td>
</tr>
<tr>
-    <td> MEMORY_AND_DISK_SER </td>
+    <td> MEMORY_AND_DISK_SER <br /> (Java and Scala) </td>
<td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
recomputing them on the fly each time they're needed. </td>
</tr>
@@ -1230,7 +1230,9 @@ storage levels is:
</tr>
</table>

-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, so it does not matter whether you choose a serialized level.*
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
+`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, `DISK_ONLY_2` and `OFF_HEAP`.*
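To make the Python behavior concrete, a small sketch of choosing one of the listed levels (the local session setup is illustrative):

```python
from pyspark import SparkContext, StorageLevel

sc = SparkContext("local", "persist-demo")  # illustrative setup
rdd = sc.parallelize(range(1000))

# Objects are pickled either way, so there is no separate *_SER level to pick in Python.
rdd.persist(StorageLevel.MEMORY_AND_DISK)
```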

Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.

@@ -1243,7 +1245,7 @@ efficiency. We recommend going through the following process to select one:
This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.

* If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
-  make the objects much more space-efficient, but still reasonably fast to access.
+  make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)

* Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from
8 changes: 4 additions & 4 deletions python/pyspark/rdd.py
@@ -220,18 +220,18 @@ def context(self):

def cache(self):
"""
-        Persist this RDD with the default storage level (C{MEMORY_ONLY_SER}).
+        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
return self

-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""
Set this RDD's storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified, defaults to (C{MEMORY_ONLY}).

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> rdd.persist().is_cached
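A quick sketch of what the new default means for callers (assumes an existing SparkContext `sc`, as in the doctest above):

```python
rdd = sc.parallelize(["b", "a", "c"])

# cache() now persists at MEMORY_ONLY; the records are still pickled under the hood.
rdd.cache()
assert rdd.is_cached
```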
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -371,18 +371,18 @@ def foreachPartition(self, f):

@since(1.3)
def cache(self):
""" Persists with the default storage level (C{MEMORY_ONLY_SER}).
""" Persists with the default storage level (C{MEMORY_ONLY}).
"""
self.is_cached = True
self._jdf.cache()
return self

@since(1.3)
-    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY_SER):
+    def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
"""Sets the storage level to persist its values across operations
after the first time it is computed. This can only be used to assign
a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY_SER}).
+        If no storage level is specified, defaults to (C{MEMORY_ONLY}).
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
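The DataFrame side changes the same way; a hedged usage sketch (assumes an existing SQLContext `sqlContext`):

```python
from pyspark import StorageLevel

df = sqlContext.range(100)  # illustrative DataFrame

# The default is now MEMORY_ONLY; a different level can still be passed explicitly.
df.persist(StorageLevel.MEMORY_AND_DISK)
```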
31 changes: 21 additions & 10 deletions python/pyspark/storagelevel.py
@@ -23,8 +23,10 @@ class StorageLevel(object):
"""
Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
-    in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
-    Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+    in a Java-specific serialized format, and whether to replicate the RDD partitions on multiple
+    nodes. Also contains static constants for some commonly used storage levels, such as MEMORY_ONLY.
+    Since the data is always serialized on the Python side, all the constants use the serialized
+    formats.
"""

def __init__(self, useDisk, useMemory, useOffHeap, deserialized, replication=1):
@@ -49,12 +51,21 @@ def __str__(self):

StorageLevel.DISK_ONLY = StorageLevel(True, False, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, False, 2)
-StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, True)
-StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, True, 2)
-StorageLevel.MEMORY_ONLY_SER = StorageLevel(False, True, False, False)
-StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel(False, True, False, False, 2)
-StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, True)
-StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, True, 2)
-StorageLevel.MEMORY_AND_DISK_SER = StorageLevel(True, True, False, False)
-StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel(True, True, False, False, 2)
+StorageLevel.MEMORY_ONLY = StorageLevel(False, True, False, False)

Contributor: Removing these will break backward compatibility. I'd like to deprecate them instead, and explain the difference between Python and Java (say that records will always be serialized in Python).

Member Author: Agree! Just updated the code with the deprecation notes, trying to follow the existing PySpark style. Please check whether they are good. : )

Not sure if this will be merged into 1.6; the note still says 1.6. Thank you!

Contributor: It's too late for 1.6, and this change (an API change) is good for 2.0. Sounds good?

Member Author: Sure. Just changed it. : )

+StorageLevel.MEMORY_ONLY_2 = StorageLevel(False, True, False, False, 2)
+StorageLevel.MEMORY_AND_DISK = StorageLevel(True, True, False, False)
+StorageLevel.MEMORY_AND_DISK_2 = StorageLevel(True, True, False, False, 2)
StorageLevel.OFF_HEAP = StorageLevel(False, False, True, False, 1)

"""
.. note:: The following four storage level constants are deprecated in 2.0, since the records \
will always be serialized in Python.
"""
StorageLevel.MEMORY_ONLY_SER = StorageLevel.MEMORY_ONLY
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY`` instead."""
StorageLevel.MEMORY_ONLY_SER_2 = StorageLevel.MEMORY_ONLY_2
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_ONLY_2`` instead."""
StorageLevel.MEMORY_AND_DISK_SER = StorageLevel.MEMORY_AND_DISK
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK`` instead."""
StorageLevel.MEMORY_AND_DISK_SER_2 = StorageLevel.MEMORY_AND_DISK_2
""".. note:: Deprecated in 2.0, use ``StorageLevel.MEMORY_AND_DISK_2`` instead."""
2 changes: 1 addition & 1 deletion python/pyspark/streaming/context.py
@@ -258,7 +258,7 @@ def checkpoint(self, directory):
"""
self._jssc.checkpoint(directory)

-    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input from TCP source hostname:port. Data is received using
a TCP socket, and the received bytes are interpreted as UTF8 encoded ``\\n`` delimited
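A hedged sketch of this receiver with its new default level (assumes an existing SparkContext `sc` and a text server on the named port):

```python
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, batchDuration=1)  # illustrative setup

# Received data is now stored at MEMORY_AND_DISK_2 by default.
lines = ssc.socketTextStream("localhost", 9999)
lines.pprint()
```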
4 changes: 2 additions & 2 deletions python/pyspark/streaming/dstream.py
@@ -208,10 +208,10 @@ def func(iterator):
def cache(self):
"""
Persist the RDDs of this DStream with the default storage level
-        (C{MEMORY_ONLY_SER}).
+        (C{MEMORY_ONLY}).
"""
self.is_cached = True
-        self.persist(StorageLevel.MEMORY_ONLY_SER)
+        self.persist(StorageLevel.MEMORY_ONLY)
return self

def persist(self, storageLevel):
4 changes: 2 additions & 2 deletions python/pyspark/streaming/flume.py
@@ -40,7 +40,7 @@ class FlumeUtils(object):

@staticmethod
def createStream(ssc, hostname, port,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
enableDecompression=False,
bodyDecoder=utf8_decoder):
"""
@@ -70,7 +70,7 @@ def createStream(ssc, hostname, port,

@staticmethod
def createPollingStream(ssc, addresses,
-                            storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
maxBatchSize=1000,
parallelism=5,
bodyDecoder=utf8_decoder):
2 changes: 1 addition & 1 deletion python/pyspark/streaming/kafka.py
@@ -40,7 +40,7 @@ class KafkaUtils(object):

@staticmethod
def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2,
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
"""
Create an input stream that pulls messages from a Kafka Broker.
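As with the other receivers, the default can still be overridden per call; a hedged sketch (the ZooKeeper address, group id, and topic are placeholders, and `ssc` is an existing StreamingContext):

```python
from pyspark import StorageLevel
from pyspark.streaming.kafka import KafkaUtils

# Defaults to MEMORY_AND_DISK_2 now; pass a level explicitly to override it.
stream = KafkaUtils.createStream(
    ssc, "zkhost:2181", "my-consumer-group", {"my-topic": 1},
    storageLevel=StorageLevel.MEMORY_AND_DISK)
```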
2 changes: 1 addition & 1 deletion python/pyspark/streaming/mqtt.py
@@ -28,7 +28,7 @@ class MQTTUtils(object):

@staticmethod
def createStream(ssc, brokerUrl, topic,
-                     storageLevel=StorageLevel.MEMORY_AND_DISK_SER_2):
+                     storageLevel=StorageLevel.MEMORY_AND_DISK_2):
"""
Create an input stream that pulls messages from a Mqtt Broker.
