Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions python/pyspark/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@
import tempfile
import threading
import time
import unittest
has_resource_module = True
try:
import resource
except ImportError:
has_resource_module = False

from py4j.protocol import Py4JJavaError

Expand Down Expand Up @@ -155,6 +161,28 @@ def test_reuse_worker_of_parallelize_xrange(self):
self.assertTrue(pid in previous_pids)


@unittest.skipIf(
not has_resource_module,
"Memory limit feature in Python worker is dependent on "
"Python's 'resource' module; however, not found.")
class WorkerMemoryTest(PySparkTestCase):

def test_memory_limit(self):
self.sc._conf.set("spark.executor.pyspark.memory", "1m")
rdd = self.sc.parallelize(xrange(1), 1)

def getrlimit():
import resource
return resource.getrlimit(resource.RLIMIT_AS)

actual = rdd.map(lambda _: getrlimit()).collect()
self.assertTrue(len(actual) == 1)
self.assertTrue(len(actual[0]) == 2)
[(soft_limit, hard_limit)] = actual
self.assertEqual(soft_limit, 1024 * 1024)
self.assertEqual(hard_limit, 1024 * 1024)


if __name__ == "__main__":
import unittest
from pyspark.tests.test_worker import *
Expand Down