From 8aed7eaeac8d0e95386f748a0e7f0a5fbe792feb Mon Sep 17 00:00:00 2001 From: "Joshua Z. Zhang" Date: Tue, 4 Dec 2018 13:29:08 -0800 Subject: [PATCH 1/2] fix pool release --- python/mxnet/gluon/data/dataloader.py | 6 ++++++ tests/python/unittest/test_gluon_data.py | 11 +++++++++++ 2 files changed, 17 insertions(+) diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py index ad0f534d16dd..2f20b06787bd 100644 --- a/python/mxnet/gluon/data/dataloader.py +++ b/python/mxnet/gluon/data/dataloader.py @@ -556,3 +556,9 @@ def same_process_iter(): def __len__(self): return len(self._batch_sampler) + + def __del__(): + if self._worker_pool: + # manually terminate due to a bug that pool is not automatically terminated on linux + assert isinstance(self._worker_pool, multiprocessing.pool.Pool) + self._worker_pool.terminate() diff --git a/tests/python/unittest/test_gluon_data.py b/tests/python/unittest/test_gluon_data.py index d043a7c6b802..2542a023f8ff 100644 --- a/tests/python/unittest/test_gluon_data.py +++ b/tests/python/unittest/test_gluon_data.py @@ -245,6 +245,17 @@ def test_multi_worker_forked_data_loader(): for i, data in enumerate(loader): pass +@with_seed() +def test_multi_worker_dataloader_release_pool(): + # will trigger too many open file if pool is not released properly + for _ in range(100): + A = np.random.rand(999, 2000) + D = mx.gluon.data.DataLoader(A, batch_size=8, num_workers=8) + the_iter = iter(D) + next(the_iter) + del the_iter + del D + if __name__ == '__main__': import nose nose.runmodule() From d6278cbd9a24470c367abd183c8f1a0806ea05e0 Mon Sep 17 00:00:00 2001 From: "Joshua Z. Zhang" Date: Tue, 4 Dec 2018 13:39:54 -0800 Subject: [PATCH 2/2] fix --- python/mxnet/gluon/data/dataloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/mxnet/gluon/data/dataloader.py b/python/mxnet/gluon/data/dataloader.py index 2f20b06787bd..586e620470d3 100644 --- a/python/mxnet/gluon/data/dataloader.py +++ b/python/mxnet/gluon/data/dataloader.py @@ -557,7 +557,7 @@ def same_process_iter(): def __len__(self): return len(self._batch_sampler) - def __del__(): + def __del__(self): if self._worker_pool: # manually terminate due to a bug that pool is not automatically terminated on linux assert isinstance(self._worker_pool, multiprocessing.pool.Pool)