From 84350f376236853b306c8db0588ba4a16bd261d4 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 16 Sep 2015 22:13:06 -0700 Subject: [PATCH 1/2] Add finalize --- include/mxnet/base.h | 6 +++++ include/mxnet/c_api.h | 5 ++++ include/mxnet/engine.h | 8 ++++++ include/mxnet/kvstore.h | 11 +++++--- include/mxnet/resource.h | 12 ++++++++- include/mxnet/storage.h | 10 +++++++ python/mxnet/base.py | 7 +++++ python/mxnet/kvstore.py | 2 +- src/c_api.cc | 12 ++++----- src/engine/naive_engine.cc | 6 +++++ src/engine/stream_manager.h | 9 ++++--- src/engine/threaded_engine.cc | 6 +++++ src/engine/threaded_engine.h | 1 + src/engine/threaded_engine_perdevice.cc | 20 +++++++++----- src/engine/threaded_engine_pooled.cc | 17 ++++++++---- src/kvstore/kvstore.cc | 2 +- src/kvstore/kvstore_local.h | 3 +-- src/resource.cc | 36 ++++++++++++++++--------- src/storage/storage.cc | 4 +++ 19 files changed, 134 insertions(+), 43 deletions(-) diff --git a/include/mxnet/base.h b/include/mxnet/base.h index e6add64d9bad..6ce524b0c4a2 100644 --- a/include/mxnet/base.h +++ b/include/mxnet/base.h @@ -64,6 +64,12 @@ typedef mshadow::TShape TShape; /*! \brief storage container type */ typedef mshadow::TBlob TBlob; +/*! + * \brief Finalize and shutdown all related modules of mxnet. + * Call this function at end of program to ensure correct shutdown. + */ +void Finalize(); + /*! \brief Context information about the execution enviroment */ struct Context { /*! \brief the device type we run the op can be cpu::kDevMask or gpu::kDevMask */ diff --git a/include/mxnet/c_api.h b/include/mxnet/c_api.h index c954cabadac0..c5634cddd942 100644 --- a/include/mxnet/c_api.h +++ b/include/mxnet/c_api.h @@ -62,6 +62,11 @@ MXNET_DLL const char *MXGetLastError(); * \return 0 when success, -1 when failure happens. */ MXNET_DLL int MXRandomSeed(int seed); +/*! + * \brief Finalize and shutdown all related modules of mxnet. + * Call this function at end of program to ensure correct shutdown. + */ +MXNET_DLL int MXFinalize(); //------------------------------------- // Part 1: NDArray creation and deletion //------------------------------------- diff --git a/include/mxnet/engine.h b/include/mxnet/engine.h index 72a4456f592a..5c9c707f6752 100644 --- a/include/mxnet/engine.h +++ b/include/mxnet/engine.h @@ -199,6 +199,14 @@ class Engine { ret.param_ = param; return ret; } + // friend function + friend void ::mxnet::Finalize(); + /*! + * \brief Idempotent Finalize function. + * This function will signal engine to release all resources. + * It is safe to call this function multiple times. + */ + virtual void Finalize() = 0; }; // class Engine #endif // DMLC_USE_CXX11 } // namespace mxnet diff --git a/include/mxnet/kvstore.h b/include/mxnet/kvstore.h index dfdb6d874102..d4c422a2f8ce 100644 --- a/include/mxnet/kvstore.h +++ b/include/mxnet/kvstore.h @@ -36,12 +36,12 @@ class KVStore { virtual void Start(); /** - * \brief Stop + * \brief Finalize the KVStore * * clear all key-value pairs stored, updater, and devices binded */ - virtual void Stop() { - if (impl_) { impl_->Stop(); delete impl_; impl_ = NULL; } + virtual void Finalize() { + if (impl_) { impl_->Finalize(); delete impl_; impl_ = NULL; } } /** @@ -178,7 +178,10 @@ class KVStore { protected: KVStore() : impl_(NULL) { } - virtual ~KVStore() { delete impl_; impl_ = NULL; } + + virtual ~KVStore() { + delete impl_; impl_ = NULL; + } private: inline KVStore* get_impl() const { diff --git a/include/mxnet/resource.h b/include/mxnet/resource.h index d85299960511..e6cf7f092702 100644 --- a/include/mxnet/resource.h +++ b/include/mxnet/resource.h @@ -110,11 +110,21 @@ class ResourceManager { */ virtual void SeedRandom(uint32_t seed) = 0; /*! \brief virtual destructor */ - virtual ~ResourceManager() {} + virtual ~ResourceManager() DMLC_THROW_EXCEPTION {} /*! * \return Resource manager singleton. */ static ResourceManager *Get(); + + protected: + // friend function + friend void ::mxnet::Finalize(); + /*! + * \brief Idempotent Finalize function. + * This function will signal resource manager to release all resources. + * It is safe to call this function multiple times. + */ + virtual void Finalize() = 0; }; } // namespace mxnet #endif // MXNET_RESOURCE_H_ diff --git a/include/mxnet/storage.h b/include/mxnet/storage.h index 71d303ff01f3..c892798e696d 100644 --- a/include/mxnet/storage.h +++ b/include/mxnet/storage.h @@ -63,6 +63,16 @@ class Storage { */ static std::shared_ptr _GetSharedRef(); + protected: + // friend function + friend void ::mxnet::Finalize(); + /*! + * \brief Idempotent Finalize function. + * This function will signal engine to release all resources. + * It is safe to call this function multiple times. + */ + void Finalize(); + private: /*! * \brief Hidden constructors. diff --git a/python/mxnet/base.py b/python/mxnet/base.py index 969456aa2a3d..613c49ab7765 100644 --- a/python/mxnet/base.py +++ b/python/mxnet/base.py @@ -8,6 +8,7 @@ import ctypes import platform import numpy as np +import atexit __all__ = ['MXNetError'] #---------------------------- @@ -180,3 +181,9 @@ def ctypes2numpy_shared(cptr, shape): return np.frombuffer(dbuffer, dtype=np.float32).reshape(shape) +def stop_all(): + """Stop All the components in mxnet.""" + check_call(_LIB.MXFinalize()) + + +atexit.register(stop_all) diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index 86e39d112ee7..d4c55a4154f4 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -215,9 +215,9 @@ def set_updater(updater): def stop(): """ Stop the kvstore """ - check_call(_LIB.MXKVStoreStop()) # need to clear _updater_func before _LIB global _updater_func _updater_func = None + atexit.register(stop) diff --git a/src/c_api.cc b/src/c_api.cc index c453f743aba3..f60ae272e076 100644 --- a/src/c_api.cc +++ b/src/c_api.cc @@ -187,6 +187,12 @@ int MXRandomSeed(int seed) { API_END(); } +int MXFinalize() { + API_BEGIN(); + mxnet::Finalize(); + API_END(); +} + int MXNDArrayCreateNone(NDArrayHandle *out) { API_BEGIN(); *out = new NDArray(); @@ -891,12 +897,6 @@ int MXKVStoreStart() { API_END(); } -int MXKVStoreStop() { - API_BEGIN(); - KVStore::Get()->Stop(); - API_END(); -} - int MXKVStoreSetUpdater(MXKVStoreUpdater updater) { API_BEGIN(); auto updt = [updater](int key, const NDArray& recv, NDArray* local) { diff --git a/src/engine/naive_engine.cc b/src/engine/naive_engine.cc index a8c7db598319..7b92c7b04fef 100644 --- a/src/engine/naive_engine.cc +++ b/src/engine/naive_engine.cc @@ -15,14 +15,20 @@ class NaiveEngine final : public Engine { } // virtual destructor virtual ~NaiveEngine() { + Finalize(); + } + + void Finalize() override { #if MXNET_USE_CUDA for (size_t i = 0; i < streams_.size(); ++i) { if (streams_[i] != nullptr) { mshadow::DeleteStream(streams_[i]); + streams_[i] = nullptr; } } #endif } + // new variables VarHandle NewVariable() override { return nullptr; diff --git a/src/engine/stream_manager.h b/src/engine/stream_manager.h index 3c668788c20c..80d3124f2bf6 100644 --- a/src/engine/stream_manager.h +++ b/src/engine/stream_manager.h @@ -24,10 +24,12 @@ template class StreamManager { public: StreamManager(); - ~StreamManager(); + ~StreamManager() { + Finalize(); + } RunContext GetRunContext(Context const& ctx); RunContext GetIORunContext(Context const& ctx); - + void Finalize(); private: std::mutex m_; #if MXNET_USE_CUDA @@ -111,13 +113,14 @@ StreamManager::StreamManager() { } template -StreamManager::~StreamManager() { +void StreamManager::Finalize() { #if MXNET_USE_CUDA for (std::size_t i = 0; i < kNumGpus; ++i) { if (gpu_cnt_.at(i) != -1) { for (auto&& j : gpu_streams_.at(i)) { mshadow::DeleteStream(j); } + gpu_cnt_.at(i) = -1; } } #endif // MXNET_USE_CUDA diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index 1a3144e783ec..74314479cbe2 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -291,6 +291,12 @@ void ThreadedEngine::WaitForAll() { finished_cv_.wait(lock, [this]() { return pending_.load() == 0; }); } +void ThreadedEngine::Finalize() { + // unlock all threads + pending_.store(0); + finished_cv_.notify_all(); +} + inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) { // Mark complete for read variables for (auto&& i : threaded_opr->const_vars) { diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h index fa29939d291f..33914bbeafcc 100644 --- a/src/engine/threaded_engine.h +++ b/src/engine/threaded_engine.h @@ -249,6 +249,7 @@ class ThreadedEngine : public Engine { threaded_opr->fn(run_ctx, callback); OprBlock::Delete(opr_block); } + void Finalize() override; private: /*! diff --git a/src/engine/threaded_engine_perdevice.cc b/src/engine/threaded_engine_perdevice.cc index fada76801250..4c650c202a1d 100644 --- a/src/engine/threaded_engine_perdevice.cc +++ b/src/engine/threaded_engine_perdevice.cc @@ -28,18 +28,17 @@ class ThreadedEnginePerDevice : public ThreadedEngine { cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 2); gpu_worker_nthreads_ = dmlc::GetEnv("MXNET_GPU_WORKER_NTHREADS", 2); gpu_copy_nthreads_ = dmlc::GetEnv("MXNET_GPU_COPY_NTHREADS", 1); - // create CPU task - auto *cpu_queue = &(cpu_worker_.task_queue); - cpu_worker_.pool.reset(new ThreadPool( + cpu_worker_.reset(new ThreadWorkerBlock()); + auto *cpu_queue = &(cpu_worker_->task_queue); + cpu_worker_->pool.reset(new ThreadPool( cpu_worker_nthreads_, [this, cpu_queue] { this->CPUWorker(cpu_queue); })); // GPU tasks will be created lazily } ~ThreadedEnginePerDevice() noexcept(false) { - // wait until all the tasks are completed. - this->WaitForAll(); + Finalize(); } protected: @@ -56,7 +55,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine { this->ExecuteOprBlock(run_ctx, opr_block); } else { if (ctx.dev_mask == cpu::kDevMask) { - cpu_worker_.task_queue.Push(opr_block); + cpu_worker_->task_queue.Push(opr_block); } else { CHECK_EQ(ctx.dev_mask, gpu::kDevMask); ThreadWorkerBlock* block = this->GetGPUWorkerBlock( @@ -65,6 +64,13 @@ class ThreadedEnginePerDevice : public ThreadedEngine { } } } + // finalize the internal resources + void Finalize() override { + gpu_normal_workers_.Clear(); + gpu_copy_workers_.Clear(); + cpu_worker_.reset(nullptr); + ThreadedEngine::Finalize(); + } private: // working unit for each of the task. @@ -85,7 +91,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine { /*! \brief number of concurrent thread each gpu copy worker uses */ int gpu_copy_nthreads_; // cpu worker - ThreadWorkerBlock cpu_worker_; + std::unique_ptr cpu_worker_; // workers doing normal works on GPU common::LazyAllocArray gpu_normal_workers_; // workers doing copy works from/to GPU diff --git a/src/engine/threaded_engine_pooled.cc b/src/engine/threaded_engine_pooled.cc index 8ab7092dfce9..8b8055a6b33f 100644 --- a/src/engine/threaded_engine_pooled.cc +++ b/src/engine/threaded_engine_pooled.cc @@ -27,11 +27,7 @@ class ThreadedEnginePooled : public ThreadedEngine { io_thread_pool_(1, [this]() { ThreadWorker(&io_task_queue_); }) {} ~ThreadedEnginePooled() noexcept(false) { - // wait until all the tasks are completed. - // TODO(hotpxl) think if this is the correct thing to do - this->WaitForAll(); - task_queue_.SignalForKill(); - io_task_queue_.SignalForKill(); + Finalize(); } protected: @@ -42,6 +38,17 @@ class ThreadedEnginePooled : public ThreadedEngine { DoPushToQueue(opr_block); } } + // finalize the internal resources + void Finalize() override { + // wait until all the tasks are completed. + // TODO(hotpxl) think if this is the correct thing to do + this->WaitForAll(); + streams_.Finalize(); + task_queue_.SignalForKill(); + io_task_queue_.SignalForKill(); + ThreadedEngine::Finalize(); + } + private: /*! \brief Concurrency for thread pool */ diff --git a/src/kvstore/kvstore.cc b/src/kvstore/kvstore.cc index bcf2ca2b7741..fe6d1c85786a 100644 --- a/src/kvstore/kvstore.cc +++ b/src/kvstore/kvstore.cc @@ -11,7 +11,7 @@ namespace mxnet { void KVStore::Start() { - if (impl_ != NULL) Stop(); + if (impl_ != NULL) Finalize(); char* num_worker = getenv("DMLC_NUM_WORKER"); if (num_worker == NULL || atoi(num_worker) == 1) { impl_ = new KVStoreLocal(); diff --git a/src/kvstore/kvstore_local.h b/src/kvstore/kvstore_local.h index e781fc35da70..678539c44b9e 100644 --- a/src/kvstore/kvstore_local.h +++ b/src/kvstore/kvstore_local.h @@ -31,8 +31,7 @@ class KVStoreLocal : public KVStore { virtual ~KVStoreLocal() { Clear(); } virtual void Start() { } - - virtual void Stop() { Clear(); } + virtual void Finalize() { Clear(); } virtual void set_updater(const Updater& updater) { updater_ = updater; diff --git a/src/resource.cc b/src/resource.cc index e3d1771f6d32..ad2f3dc4b803 100644 --- a/src/resource.cc +++ b/src/resource.cc @@ -23,19 +23,13 @@ class ResourceManagerImpl : public ResourceManager { cpu_temp_space_copy_ = dmlc::GetEnv("MXNET_CPU_TEMP_COPY", 16); gpu_temp_space_copy_ = dmlc::GetEnv("MXNET_GPU_TEMP_COPY", 4); engine_ref_ = Engine::_GetSharedRef(); - cpu_rand_ = new ResourceRandom( - Context(cpu::kDevMask, 0), global_seed_); - cpu_space_ = new ResourceTempSpace( - Context(cpu::kDevMask, 0), cpu_temp_space_copy_); + cpu_rand_.reset(new ResourceRandom( + Context(cpu::kDevMask, 0), global_seed_)); + cpu_space_.reset(new ResourceTempSpace( + Context(cpu::kDevMask, 0), cpu_temp_space_copy_)); } ~ResourceManagerImpl() { - // need explicit delete, before engine get killed - delete cpu_rand_; -#if MXNET_USE_CUDA - gpu_rand_.Clear(); -#endif - // release the reference to engine. - engine_ref_ = nullptr; + Finalize(); } // request resources @@ -80,6 +74,22 @@ class ResourceManagerImpl : public ResourceManager { #endif } + protected: + void Finalize() override { + // need explicit delete, before engine get killed + cpu_rand_.reset(nullptr); + cpu_space_.reset(nullptr); +#if MXNET_USE_CUDA + gpu_rand_.Clear(); + gpu_space_.Clear(); +#endif + if (engine_ref_ != nullptr) { + engine_ref_->WaitForAll(); + // release the reference to engine. + engine_ref_ = nullptr; + } + } + private: /*! \brief Maximum number of GPUs */ static constexpr std::size_t kMaxNumGPUs = 16; @@ -169,9 +179,9 @@ class ResourceManagerImpl : public ResourceManager { /*! \brief internal seed to the random number generator */ uint32_t global_seed_; /*! \brief CPU random number resources */ - ResourceRandom *cpu_rand_; + std::unique_ptr > cpu_rand_; /*! \brief CPU temp space resources */ - ResourceTempSpace *cpu_space_; + std::unique_ptr > cpu_space_; #if MXNET_USE_CUDA /*! \brief random number generator for GPU */ common::LazyAllocArray > gpu_rand_; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index cdcdbc15840b..729ecb46c6b4 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -107,4 +107,8 @@ Storage* Storage::Get() { Storage::Storage() : impl_{new Impl{}} {} +void Storage::Finalize() { + impl_.reset(nullptr); +} + } // namespace mxnet From fbb1418d3290453f69983831a87023f12d40e0a5 Mon Sep 17 00:00:00 2001 From: tqchen Date: Wed, 16 Sep 2015 22:28:32 -0700 Subject: [PATCH 2/2] more on finalize --- python/mxnet/__init__.py | 14 ++++++++++++++ python/mxnet/base.py | 8 -------- python/mxnet/kvstore.py | 9 +++------ src/global.cc | 21 +++++++++++++++++++++ tests/python/unittest/test_kvstore.py | 8 -------- 5 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 src/global.cc diff --git a/python/mxnet/__init__.py b/python/mxnet/__init__.py index 8ccf04519e22..649fc225f369 100644 --- a/python/mxnet/__init__.py +++ b/python/mxnet/__init__.py @@ -10,6 +10,7 @@ from .context import Context, current_context, cpu, gpu from .base import MXNetError +from . import base from . import ndarray from . import symbol from . import kvstore as kv @@ -17,5 +18,18 @@ # use mx.nd as short for mx.ndarray from . import ndarray as nd from . import random +import atexit __version__ = "0.1.0" + +def finalize(): + """Stop all the components in mxnet. + + There is no need to call this function. + This function will be automatically called at module exit. + """ + # pylint: disable=protected-access + base.check_call(base._LIB.MXFinalize()) + kv._cleanup() + +atexit.register(finalize) diff --git a/python/mxnet/base.py b/python/mxnet/base.py index 613c49ab7765..2f7d15919681 100644 --- a/python/mxnet/base.py +++ b/python/mxnet/base.py @@ -8,7 +8,6 @@ import ctypes import platform import numpy as np -import atexit __all__ = ['MXNetError'] #---------------------------- @@ -180,10 +179,3 @@ def ctypes2numpy_shared(cptr, shape): dbuffer = (mx_float * size).from_address(ctypes.addressof(cptr.contents)) return np.frombuffer(dbuffer, dtype=np.float32).reshape(shape) - -def stop_all(): - """Stop All the components in mxnet.""" - check_call(_LIB.MXFinalize()) - - -atexit.register(stop_all) diff --git a/python/mxnet/kvstore.py b/python/mxnet/kvstore.py index d4c55a4154f4..07f83f1363d2 100644 --- a/python/mxnet/kvstore.py +++ b/python/mxnet/kvstore.py @@ -6,9 +6,8 @@ from .ndarray import NDArray from .base import _LIB from .base import check_call, c_array, NDArrayHandle -import atexit -__all__ = ['start', 'init', 'push', 'pull', 'stop', 'set_updater'] +__all__ = ['start', 'init', 'push', 'pull', 'set_updater'] def _ctype_key_value(keys, vals): """ @@ -213,11 +212,9 @@ def set_updater(updater): _updater_func = _updater_proto(_updater_wrapper(updater)) check_call(_LIB.MXKVStoreSetUpdater(_updater_func)) -def stop(): - """ Stop the kvstore """ +def _cleanup(): + """ cleanup callbacks """ # need to clear _updater_func before _LIB global _updater_func _updater_func = None - -atexit.register(stop) diff --git a/src/global.cc b/src/global.cc new file mode 100644 index 000000000000..f9a32e6dc58f --- /dev/null +++ b/src/global.cc @@ -0,0 +1,21 @@ +/*! + * Copyright (c) 2015 by Contributors + * \file global.cc + * \brief Implementation of project global related functions. + */ +#include +#include +#include +#include +#include + +namespace mxnet { +// finalize the mxnet modules +void Finalize() { + ResourceManager::Get()->Finalize(); + KVStore::Get()->Finalize(); + Engine::Get()->WaitForAll(); + Engine::Get()->Finalize(); + Storage::Get()->Finalize(); +} +} // namespace mxnet diff --git a/tests/python/unittest/test_kvstore.py b/tests/python/unittest/test_kvstore.py index 4d86f5512ce5..57554c06a3db 100644 --- a/tests/python/unittest/test_kvstore.py +++ b/tests/python/unittest/test_kvstore.py @@ -12,10 +12,6 @@ def init_kv(): # list mx.kv.init(keys, [mx.nd.zeros(shape)] * len(keys)) -def stop_kv(): - """stop kv """ - mx.kv.stop() - def check_diff_to_scalar(A, x): """ assert A == x""" assert(np.sum(np.abs((A - x).asnumpy())) == 0) @@ -30,7 +26,6 @@ def test_single_kv_pair(): mx.kv.pull(3, out = val) check_diff_to_scalar(val, 1) - stop_kv() def test_list_kv_pair(): """list key-value pair push & pull""" @@ -43,7 +38,6 @@ def test_list_kv_pair(): for v in val: check_diff_to_scalar(v, 4) - stop_kv() def test_aggregator(): """aggregate value on muliple devices""" @@ -72,7 +66,6 @@ def test_aggregator(): for v in vv: check_diff_to_scalar(v, num_devs * 2.0) - stop_kv() def updater(key, recv, local): """use updater: +=""" @@ -110,7 +103,6 @@ def test_updater(dev = 'cpu'): for v in vv: check_diff_to_scalar(v, num_devs * num_push) - stop_kv() if __name__ == '__main__': test_single_kv_pair()