From da74a63da03107cb58cdb85f4b11059649c4079f Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Sun, 31 Mar 2019 06:39:22 +0000 Subject: [PATCH 1/7] fix custom exception handling --- src/engine/threaded_engine.cc | 4 ++++ src/operator/custom/custom-inl.h | 31 ++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index b5897a1ca9cd..fb8706db4258 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -31,6 +31,7 @@ #include #include "./threaded_engine.h" #include "../common/cuda_utils.h" +#include "../operator/custom/custom-inl.h" namespace mxnet { namespace engine { @@ -373,10 +374,12 @@ void ThreadedEngine::DeleteVariable(SyncFn delete_fn, } void ThreadedEngine::WaitForVar(VarHandle var) { + using mxnet::op::custom::CustomOperator; BulkFlush(); ThreadedVar* threaded_var = ThreadedVar::CastFromBase(var); if (threaded_var->ready_to_read()) { ThrowException(threaded_var); + CustomOperator::Get()->ThrowException(); return; } if (engine_info_) { @@ -407,6 +410,7 @@ void ThreadedEngine::WaitForVar(VarHandle var) { } ThrowException(threaded_var); + CustomOperator::Get()->ThrowException(); } void ThreadedEngine::WaitForAll() { diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index c5eaea13661e..30da2db4ed6c 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -48,6 +48,11 @@ namespace mxnet { namespace op { namespace custom { +struct CustomTask { + std::function fn; + mxnet::engine::CallbackOnComplete on_complete; +}; + class CustomOperator { public: void Register(const std::string &op_type, CustomOpPropCreator creator) { @@ -92,7 +97,7 @@ class CustomOperator { return; } std::unique_lock lock(mutex_); - q_.push([=]() mutable { + q_.push({[=]() mutable { bool prev_recording = Imperative::Get()->set_is_recording(recording); bool prev_training = Imperative::Get()->set_is_training(training); @@ -129,7 +134,7 @@ class CustomOperator { }, ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0, "CustomOperator"); - }); + }, ctx.async_on_complete}); // increase num_threads if there is not enough threads to execute custom operator if (q_.size() > num_free_threads) CreateThreads(q_.size() - num_free_threads); @@ -145,6 +150,7 @@ class CustomOperator { num_free_threads = 0; destructing_ = false; naive_engine_ = true; + exception_ = nullptr; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { naive_engine_ = false; } @@ -162,6 +168,14 @@ class CustomOperator { workers_.clear(); } + inline void ThrowException() { + if (exception_ && *exception_) { + std::exception_ptr tmp = *exception_; + exception_ = nullptr; + std::rethrow_exception(tmp); + } + } + private: CustomOperator() { this->Start(); @@ -172,10 +186,16 @@ class CustomOperator { cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); while (!q_.empty()) { --num_free_threads; - auto fn = q_.front(); + auto task = q_.front(); q_.pop(); lock.unlock(); - fn(); + try { + task.fn(); + } catch (dmlc::Error& e) { + exception_ = + std::make_shared(std::current_exception()); + task.on_complete(); + } ++num_free_threads; lock.lock(); } @@ -197,7 +217,8 @@ class CustomOperator { std::condition_variable cv_; std::vector workers_; std::atomic num_free_threads; - std::queue > q_; + std::queue q_; + std::shared_ptr exception_; bool naive_engine_; bool destructing_; }; From 2b1da9e1e053d09855f16f1088d0f30ebf02ba05 Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Sun, 31 Mar 2019 12:18:39 +0000 Subject: [PATCH 2/7] add test --- tests/python/unittest/test_operator.py | 56 ++++++++++++++++---------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/tests/python/unittest/test_operator.py b/tests/python/unittest/test_operator.py index c9498ecb0bd2..4f124dfb930a 100644 --- a/tests/python/unittest/test_operator.py +++ b/tests/python/unittest/test_operator.py @@ -29,6 +29,7 @@ from mxnet.test_utils import * from mxnet.base import py_str, MXNetError, _as_list from common import setup_module, with_seed, teardown, assert_raises_cudnn_not_satisfied, assertRaises +from nose.tools import assert_raises import unittest import os @@ -5200,29 +5201,29 @@ def create_operator(self, ctx, shapes, dtypes): # test custom operator fork # see https://github.com/apache/incubator-mxnet/issues/14396 - if not sys.platform.startswith('win'): # no fork in windows - class AdditionOP(mx.operator.CustomOp): - def __init__(self): - super(AdditionOP, self).__init__() - def forward(self, is_train, req, in_data, out_data, aux): - out_data[0][:] = in_data[0] + in_data[1] - def backward(self, req, out_grad, in_data, out_data, in_grad, aux): - in_grad[0][:] = out_grad[0] - in_grad[1][:] = out_grad[0] - - @mx.operator.register("AdditionOP") - class AdditionOPProp(mx.operator.CustomOpProp): - def __init__(self): - super(AdditionOPProp, self).__init__() - def list_arguments(self): - return ['a', 'b'] - def list_outputs(self): - return ['output'] - def infer_shape(self, in_shape): - return in_shape, [in_shape[0]] - def create_operator(self, ctx, shapes, dtypes): - return AdditionOP() + class AdditionOP(mx.operator.CustomOp): + def __init__(self): + super(AdditionOP, self).__init__() + def forward(self, is_train, req, in_data, out_data, aux): + out_data[0][:] = in_data[0] + in_data[1] + def backward(self, req, out_grad, in_data, out_data, in_grad, aux): + in_grad[0][:] = out_grad[0] + in_grad[1][:] = out_grad[0] + @mx.operator.register("AdditionOP") + class AdditionOPProp(mx.operator.CustomOpProp): + def __init__(self): + super(AdditionOPProp, self).__init__() + def list_arguments(self): + return ['a', 'b'] + def list_outputs(self): + return ['output'] + def infer_shape(self, in_shape): + return in_shape, [in_shape[0]] + def create_operator(self, ctx, shapes, dtypes): + return AdditionOP() + + if not sys.platform.startswith('win'): # no fork in windows def custom_add(): a = mx.nd.array([1, 2, 3]) b = mx.nd.array([4, 5, 6]) @@ -5237,6 +5238,17 @@ def custom_add(): p.join(5) assert not p.is_alive(), "deadlock may exist in custom operator" + # test except handling + # see https://github.com/apache/incubator-mxnet/pull/14575 + def custom_add_exc(): + a = mx.nd.array([1, 2, 3]) + b = mx.nd.array([4, 5]) + c = mx.nd.Custom(a, b, op_type='AdditionOP') + c.wait_to_read() + + assert_raises(MXNetError, custom_add_exc) + + @with_seed() def test_psroipooling(): for num_rois in [1, 2]: From e43f13db7afcc7ad5334ffff4cc0236b4b76184e Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Thu, 4 Apr 2019 16:23:01 +0000 Subject: [PATCH 3/7] simplify catch exception --- src/operator/custom/custom-inl.h | 30 +++++++++++++----------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index 30da2db4ed6c..a9d4afb3821d 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -48,11 +48,6 @@ namespace mxnet { namespace op { namespace custom { -struct CustomTask { - std::function fn; - mxnet::engine::CallbackOnComplete on_complete; -}; - class CustomOperator { public: void Register(const std::string &op_type, CustomOpPropCreator creator) { @@ -97,11 +92,18 @@ class CustomOperator { return; } std::unique_lock lock(mutex_); - q_.push({[=]() mutable { + q_.push([=]() mutable { bool prev_recording = Imperative::Get()->set_is_recording(recording); bool prev_training = Imperative::Get()->set_is_training(training); - func(); + try { + func(); + } catch (dmlc::Error& e) { + exception_ = + std::make_shared(std::current_exception()); + ctx.async_on_complete(); + return; + } Imperative::Get()->set_is_training(prev_training); Imperative::Get()->set_is_recording(prev_recording); @@ -134,7 +136,7 @@ class CustomOperator { }, ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0, "CustomOperator"); - }, ctx.async_on_complete}); + }); // increase num_threads if there is not enough threads to execute custom operator if (q_.size() > num_free_threads) CreateThreads(q_.size() - num_free_threads); @@ -186,16 +188,10 @@ class CustomOperator { cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); while (!q_.empty()) { --num_free_threads; - auto task = q_.front(); + auto fn = q_.front(); q_.pop(); lock.unlock(); - try { - task.fn(); - } catch (dmlc::Error& e) { - exception_ = - std::make_shared(std::current_exception()); - task.on_complete(); - } + fn(); ++num_free_threads; lock.lock(); } @@ -217,7 +213,7 @@ class CustomOperator { std::condition_variable cv_; std::vector workers_; std::atomic num_free_threads; - std::queue q_; + std::queue > q_; std::shared_ptr exception_; bool naive_engine_; bool destructing_; From 5bce27efdd1e50297c28a603002397c55aaa7884 Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Fri, 5 Apr 2019 00:47:34 +0800 Subject: [PATCH 4/7] add comment --- tests/python/unittest/test_operator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/python/unittest/test_operator.py b/tests/python/unittest/test_operator.py index 4f124dfb930a..65cffc14dee0 100644 --- a/tests/python/unittest/test_operator.py +++ b/tests/python/unittest/test_operator.py @@ -5243,6 +5243,7 @@ def custom_add(): def custom_add_exc(): a = mx.nd.array([1, 2, 3]) b = mx.nd.array([4, 5]) + # trigger exception by providing unmatched operand shapes c = mx.nd.Custom(a, b, op_type='AdditionOP') c.wait_to_read() From 23aee98d1f8bc89eebcb31920ef1f10d36f35e71 Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Fri, 5 Apr 2019 16:23:35 +0000 Subject: [PATCH 5/7] remove custom from engine --- src/engine/threaded_engine.cc | 4 ---- src/operator/custom/custom-inl.h | 9 +++++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index fb8706db4258..b5897a1ca9cd 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -31,7 +31,6 @@ #include #include "./threaded_engine.h" #include "../common/cuda_utils.h" -#include "../operator/custom/custom-inl.h" namespace mxnet { namespace engine { @@ -374,12 +373,10 @@ void ThreadedEngine::DeleteVariable(SyncFn delete_fn, } void ThreadedEngine::WaitForVar(VarHandle var) { - using mxnet::op::custom::CustomOperator; BulkFlush(); ThreadedVar* threaded_var = ThreadedVar::CastFromBase(var); if (threaded_var->ready_to_read()) { ThrowException(threaded_var); - CustomOperator::Get()->ThrowException(); return; } if (engine_info_) { @@ -410,7 +407,6 @@ void ThreadedEngine::WaitForVar(VarHandle var) { } ThrowException(threaded_var); - CustomOperator::Get()->ThrowException(); } void ThreadedEngine::WaitForAll() { diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index a9d4afb3821d..bc6493efceed 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -101,8 +101,6 @@ class CustomOperator { } catch (dmlc::Error& e) { exception_ = std::make_shared(std::current_exception()); - ctx.async_on_complete(); - return; } Imperative::Get()->set_is_training(prev_training); @@ -123,6 +121,13 @@ class CustomOperator { Engine::Get()->PushSync( [=](RunContext rctx) { + try { + ThrowException(); + } catch(dmlc::Error& err) { + ctx.async_on_complete(&err); + return; + } + for (size_t i = 0, out_idx = 0; i < arrs.size(); i++) { if (arrs[i].storage_type() == kDefaultStorage || arrs[i].storage_type() == kUndefinedStorage) From 963be55c9fd68049d8519b828bf7905decce969d Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Sat, 6 Apr 2019 15:37:33 +0000 Subject: [PATCH 6/7] add sync func --- src/operator/custom/custom-inl.h | 71 +++++++++++--------------------- 1 file changed, 25 insertions(+), 46 deletions(-) diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index bc6493efceed..e954f4b89464 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -98,49 +98,38 @@ class CustomOperator { try { func(); - } catch (dmlc::Error& e) { - exception_ = - std::make_shared(std::current_exception()); + + size_t idx = 0; + for (const auto& i : arrs) { + i.WaitToRead(); + if (output_tags.count(tags[idx]) > 0) { + if (i.storage_type() == kDefaultStorage || + i.storage_type() == kUndefinedStorage) + continue; + i.WaitToWrite(); + idx++; + } + } + } catch (dmlc::Error& err) { + Imperative::Get()->set_is_training(prev_training); + Imperative::Get()->set_is_recording(prev_recording); + ctx.async_on_complete(&err); + return; } Imperative::Get()->set_is_training(prev_training); Imperative::Get()->set_is_recording(prev_recording); - std::vector vars, vars2; - size_t idx = 0; - for (const auto& i : arrs) { - vars.push_back(i.var()); - if (output_tags.count(tags[idx]) > 0) { - if (i.storage_type() == kDefaultStorage || - i.storage_type() == kUndefinedStorage) - continue; - vars2.push_back(i.var()); - idx++; + for (size_t i = 0, out_idx = 0; i < arrs.size(); i++) { + if (arrs[i].storage_type() == kDefaultStorage || + arrs[i].storage_type() == kUndefinedStorage) + continue; + if (output_tags.count(tags[i]) > 0) { + outputs[out_idx].SparseUpdateChunk(arrs[i]); + out_idx++; } } - - Engine::Get()->PushSync( - [=](RunContext rctx) { - try { - ThrowException(); - } catch(dmlc::Error& err) { - ctx.async_on_complete(&err); - return; - } - - for (size_t i = 0, out_idx = 0; i < arrs.size(); i++) { - if (arrs[i].storage_type() == kDefaultStorage || - arrs[i].storage_type() == kUndefinedStorage) - continue; - if (output_tags.count(tags[i]) > 0) { - outputs[out_idx].SparseUpdateChunk(arrs[i]); - out_idx++; - } - } - ctx.async_on_complete(); - }, - ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0, - "CustomOperator"); + ctx.async_on_complete(); }); // increase num_threads if there is not enough threads to execute custom operator if (q_.size() > num_free_threads) @@ -157,7 +146,6 @@ class CustomOperator { num_free_threads = 0; destructing_ = false; naive_engine_ = true; - exception_ = nullptr; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { naive_engine_ = false; } @@ -175,14 +163,6 @@ class CustomOperator { workers_.clear(); } - inline void ThrowException() { - if (exception_ && *exception_) { - std::exception_ptr tmp = *exception_; - exception_ = nullptr; - std::rethrow_exception(tmp); - } - } - private: CustomOperator() { this->Start(); @@ -219,7 +199,6 @@ class CustomOperator { std::vector workers_; std::atomic num_free_threads; std::queue > q_; - std::shared_ptr exception_; bool naive_engine_; bool destructing_; }; From 931aca9d8de7331ad5505292b789c354839f7814 Mon Sep 17 00:00:00 2001 From: Wang Jiajun Date: Sat, 13 Apr 2019 07:31:19 +0000 Subject: [PATCH 7/7] unlimited custom thread --- docs/faq/env_var.md | 3 --- src/operator/custom/custom-inl.h | 15 +++++++-------- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/docs/faq/env_var.md b/docs/faq/env_var.md index 2768f644c066..095c214e66b3 100644 --- a/docs/faq/env_var.md +++ b/docs/faq/env_var.md @@ -60,9 +60,6 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0 * MXNET_MP_OPENCV_NUM_THREADS - Values: Int ```(default=0)``` - The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows). -* MXNET_CUSTOM_OP_NUM_THREADS - - Values: Int ```(default=16)``` - - The maximum number of threads given to custom operators. ## Memory Options diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h index e954f4b89464..9539402a2d88 100644 --- a/src/operator/custom/custom-inl.h +++ b/src/operator/custom/custom-inl.h @@ -132,8 +132,8 @@ class CustomOperator { ctx.async_on_complete(); }); // increase num_threads if there is not enough threads to execute custom operator - if (q_.size() > num_free_threads) - CreateThreads(q_.size() - num_free_threads); + if (q_.size() > num_free_threads_) + CreateThreads(q_.size() - num_free_threads_); cv_.notify_all(); } @@ -143,7 +143,7 @@ class CustomOperator { } void Start() { - num_free_threads = 0; + num_free_threads_ = 0; destructing_ = false; naive_engine_ = true; if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) { @@ -172,21 +172,20 @@ class CustomOperator { while (!q_.empty() || !destructing_) { cv_.wait(lock, [&] {return !q_.empty() || destructing_;}); while (!q_.empty()) { - --num_free_threads; + --num_free_threads_; auto fn = q_.front(); q_.pop(); lock.unlock(); fn(); - ++num_free_threads; + ++num_free_threads_; lock.lock(); } } } void SetNumThreads(int num_threads) { - num_threads = std::min(dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 16), num_threads); for (int i = workers_.size(); i < num_threads; ++i) { workers_.emplace_back(std::thread([this]{this->ThreadTarget();})); - ++num_free_threads; + ++num_free_threads_; } } void CreateThreads(int num_new_threads) { @@ -197,7 +196,7 @@ class CustomOperator { // async worker std::condition_variable cv_; std::vector workers_; - std::atomic num_free_threads; + std::atomic num_free_threads_; std::queue > q_; bool naive_engine_; bool destructing_;