diff --git a/python/pyspark/tests/test_worker.py b/python/pyspark/tests/test_worker.py index a4f108f18e17..5193cfeceac8 100644 --- a/python/pyspark/tests/test_worker.py +++ b/python/pyspark/tests/test_worker.py @@ -19,6 +19,12 @@ import tempfile import threading import time +import unittest +has_resource_module = True +try: + import resource +except ImportError: + has_resource_module = False from py4j.protocol import Py4JJavaError @@ -155,6 +161,28 @@ def test_reuse_worker_of_parallelize_xrange(self): self.assertTrue(pid in previous_pids) +@unittest.skipIf( + not has_resource_module, + "Memory limit feature in Python worker is dependent on " + "Python's 'resource' module; however, not found.") +class WorkerMemoryTest(PySparkTestCase): + + def test_memory_limit(self): + self.sc._conf.set("spark.executor.pyspark.memory", "1m") + rdd = self.sc.parallelize(xrange(1), 1) + + def getrlimit(): + import resource + return resource.getrlimit(resource.RLIMIT_AS) + + actual = rdd.map(lambda _: getrlimit()).collect() + self.assertTrue(len(actual) == 1) + self.assertTrue(len(actual[0]) == 2) + [(soft_limit, hard_limit)] = actual + self.assertEqual(soft_limit, 1024 * 1024) + self.assertEqual(hard_limit, 1024 * 1024) + + if __name__ == "__main__": import unittest from pyspark.tests.test_worker import *