This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Support multi-threading for Custom Operator #14363

Merged · 3 commits · Mar 15, 2019
3 changes: 3 additions & 0 deletions docs/faq/env_var.md
@@ -60,6 +60,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
* MXNET_MP_OPENCV_NUM_THREADS
- Values: Int ```(default=0)```
- The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarging this number may boost the performance of individual workers when executing underlying OpenCV functions, but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
* MXNET_CUSTOM_OP_NUM_THREADS
- Values: Int ```(default=16)```
- The maximum number of threads given to custom operators.
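
  For illustration, a minimal sketch (not MXNet source; `GetEnvInt` is a hypothetical stand-in for `dmlc::GetEnv`) of how an integer environment variable with a default caps a requested thread count, matching the pattern used in the diff below:

  ```cpp
  #include <algorithm>
  #include <cstdlib>
  #include <iostream>

  // Hypothetical helper standing in for dmlc::GetEnv<int>.
  int GetEnvInt(const char* name, int default_value) {
    const char* val = std::getenv(name);
    return val ? std::atoi(val) : default_value;
  }

  int main() {
    int requested = 32;  // threads the task queue currently calls for
    int capped = std::min(GetEnvInt("MXNET_CUSTOM_OP_NUM_THREADS", 16), requested);
    std::cout << "worker threads: " << capped << "\n";  // 16 unless the env var is set higher
  }
  ```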

## Memory Options

51 changes: 34 additions & 17 deletions src/operator/custom/custom-inl.h
@@ -31,6 +31,7 @@
#include <mxnet/operator.h>
#include <mxnet/c_api.h>
#include <mxnet/imperative.h>
#include <algorithm>
#include <map>
#include <vector>
#include <string>
@@ -129,6 +130,9 @@ class CustomOperator {
ctx.run_ctx.ctx, vars, vars2, FnProperty::kNormal, 0,
"CustomOperator");
});
// increase num_threads if there are not enough threads to execute the custom operator
if (q_.size() > num_free_threads)
CreateThreads(q_.size() - num_free_threads);
cv_.notify_all();
}

@@ -139,38 +143,51 @@
destructing_ = true;
cv_.notify_all();
}
worker_.join();
for (auto &worker : workers_)
worker.join();
**Member:** Have you evaluated this when the custom op is part of a bigger graph, and whether there is any performance impact? Since CustomOperator is static, its lifetime extends to the end of the program, and its destructor is only called at exit. This means one thread is waiting on the condition variable while the other 11 threads are trying to obtain the lock, in a blocked state. Since these are idle threads, I am not sure the impact will be significant, but it is good to verify. It will also help us come up with a good default for MXNET_CUSTOM_OP_NUM_THREADS.

**@wkcn (Member, Author) · Mar 9, 2019:** Thank you!

Sorry, I do not have a server with multiple GPUs to evaluate a big graph.

In this PR, the number of threads increases when there are not enough free threads, and the maximum is MXNET_CUSTOM_OP_NUM_THREADS.

There are always threads that get the lock and execute the operator function, so I think the idle threads do not degrade performance.

I think a good maximum for MXNET_CUSTOM_OP_NUM_THREADS is the number of CPU cores.
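
Following that suggestion, one illustrative way to pick a per-machine value for the variable is to query the hardware thread count (a sketch, not part of this PR):

```cpp
#include <iostream>
#include <thread>

int main() {
  // Number of hardware threads; a plausible upper bound for
  // MXNET_CUSTOM_OP_NUM_THREADS per the suggestion above.
  std::cout << std::thread::hardware_concurrency() << "\n";
}
```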

**Member:** Yes, it's fine during op execution, but even after the custom op finishes there is one thread waiting on the condition variable and 11 others trying to acquire the lock.

**@wkcn (Member, Author):** I see. I once considered decreasing the number of threads when they are idle; however, it is difficult to estimate the right number.

**Member:** Can you run example/reinforcement-learning/dqn, which includes a custom op, on CPU to check performance?

**@wkcn (Member, Author) · Mar 12, 2019:** It may not be possible to check performance on CPU only, since there is usually just one computational stream. I will find a server with multiple GPUs and check on it.

**Member:** Yes, this change is mainly for performance improvement on a GPU machine, but we should be careful not to impact CPU performance for inference; otherwise we should keep the default small.

**@wkcn (Member, Author):** Sorry, I hit a problem when running the RL demo:

```
(gdb) bt
#0  0x00007ffff7e1ff73 in free () at /usr/lib/libc.so.6
#1  0x00007fffbf2c9ad1 in ALEInterface::welcomeMessage[abi:cxx11]() () at /usr/lib/python2.7/site-packages/ale_python_interface/libale_c.so
#2  0x00007fffbf2c9d0a in ALEInterface::ALEInterface() () at /usr/lib/python2.7/site-packages/ale_python_interface/libale_c.so
#3  0x00007fffbf2c7fbb in ALE_new () at /usr/lib/python2.7/site-packages/ale_python_interface/libale_c.so
#4  0x00007ffff6d236d0 in ffi_call_unix64 () at /usr/lib/libffi.so.6
#5  0x00007ffff6d230a0 in ffi_call () at /usr/lib/libffi.so.6
#6  0x00007ffff6cc9d77 in _ctypes_callproc () at /usr/lib/python2.7/lib-dynload/_ctypes.so
#7  0x00007ffff6cc37d0 in  () at /usr/lib/python2.7/lib-dynload/_ctypes.so
#8  0x00007ffff7c8cb43 in PyObject_Call () at /usr/lib/libpython2.7.so.1.0
#9  0x00007ffff7c1c10e in PyEval_EvalFrameEx () at /usr/lib/libpython2.7.so.1.0
#10 0x00007ffff7c156ba in PyEval_EvalCodeEx () at /usr/lib/libpython2.7.so.1.0
```

**@wkcn (Member, Author) · Mar 13, 2019:** I'm checking it. The problem may be related to GPERFTOOLS and JEMALLOC. I disabled them and plan to rebuild MXNet.

**@wkcn (Member, Author):** In my experiment, performance does not drop in the DQN example. The FPS stays at 1500 on my laptop with only a CPU (i7-7500U, 2 cores / 4 threads).

}

static CustomOperator* Get();

private:
CustomOperator() {
CustomOperator() : num_free_threads(0) {
destructing_ = false;
naive_engine_ = true;
if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) {
naive_engine_ = false;
worker_ = std::thread(
[&]() {
std::unique_lock<std::mutex> lock(mutex_);
while (!q_.empty() || !destructing_) {
cv_.wait(lock, [&] {return !q_.empty() || destructing_;});
while (!q_.empty()) {
auto fn = q_.front();
lock.unlock();
fn();
lock.lock();
q_.pop();
}
}
});
}
}
void ThreadTarget() {
std::unique_lock<std::mutex> lock(mutex_);
while (!q_.empty() || !destructing_) {
cv_.wait(lock, [&] {return !q_.empty() || destructing_;});
while (!q_.empty()) {
--num_free_threads;
auto fn = q_.front();
q_.pop();
**Member:** Nice! Popping early will prevent the race condition.

lock.unlock();
fn();
++num_free_threads;
lock.lock();
}
}
}
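
To make the reviewer's point about popping early concrete, here is a standalone sketch (simplified, not MXNet source) of the safe ordering: each worker pops the task while still holding the lock, so no two workers can take the same `q_.front()`:

```cpp
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

std::mutex m;
std::queue<std::function<void()>> q;

void Drain() {
  for (;;) {
    std::unique_lock<std::mutex> lock(m);
    if (q.empty()) return;
    auto fn = q.front();
    q.pop();         // pop before unlocking: no other worker can see this task
    lock.unlock();   // run the task without holding the lock
    fn();
  }
}

int main() {
  for (int i = 0; i < 8; ++i)
    q.push([i] { std::cout << "task " << i << "\n"; });
  std::thread a(Drain), b(Drain);
  a.join();
  b.join();
}
```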
void SetNumThreads(int num_threads) {
num_threads = std::min(dmlc::GetEnv("MXNET_CUSTOM_OP_NUM_THREADS", 16), num_threads);
for (int i = workers_.size(); i < num_threads; ++i) {
workers_.emplace_back(std::thread([this]{this->ThreadTarget();}));
++num_free_threads;
}
}
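One design consequence worth noting: `workers_` only ever grows, so `SetNumThreads` is monotonic; requesting fewer threads than already exist is a no-op, and the environment variable only bounds further growth rather than shrinking the pool.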
void CreateThreads(int num_new_threads) {
SetNumThreads(workers_.size() + num_new_threads);
}
std::mutex mutex_;
std::map<std::string, CustomOpPropCreator> registry_;
// async worker
std::condition_variable cv_;
std::thread worker_;
std::vector<std::thread> workers_;
std::atomic<uint32_t> num_free_threads;
std::queue<std::function<void(void)> > q_;
bool naive_engine_;
bool destructing_;
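
Putting the pieces together, a minimal standalone sketch of the grow-on-demand worker pool this diff implements, assuming a simplified interface, `std::getenv` in place of `dmlc::GetEnv`, and no engine integration:

```cpp
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class WorkerPool {
 public:
  ~WorkerPool() {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      destructing_ = true;
      cv_.notify_all();
    }
    for (auto& w : workers_) w.join();
  }

  void Push(std::function<void()> fn) {
    std::unique_lock<std::mutex> lock(mutex_);
    q_.push(std::move(fn));
    // grow the pool only when the queue outgrows the free workers
    if (q_.size() > num_free_threads_)
      CreateThreads(q_.size() - num_free_threads_);
    cv_.notify_all();
  }

 private:
  void ThreadTarget() {
    std::unique_lock<std::mutex> lock(mutex_);
    while (!q_.empty() || !destructing_) {
      cv_.wait(lock, [&] { return !q_.empty() || destructing_; });
      while (!q_.empty()) {
        --num_free_threads_;
        auto fn = q_.front();
        q_.pop();  // pop while locked so no other worker takes this task
        lock.unlock();
        fn();
        ++num_free_threads_;
        lock.lock();
      }
    }
  }

  void CreateThreads(size_t n) {
    // cap the pool size with MXNET_CUSTOM_OP_NUM_THREADS (default 16)
    const char* env = std::getenv("MXNET_CUSTOM_OP_NUM_THREADS");
    size_t cap = env ? std::atoi(env) : 16;
    size_t target = std::min(workers_.size() + n, cap);
    for (size_t i = workers_.size(); i < target; ++i) {
      workers_.emplace_back([this] { ThreadTarget(); });
      ++num_free_threads_;
    }
  }

  std::mutex mutex_;
  std::condition_variable cv_;
  std::vector<std::thread> workers_;
  std::atomic<size_t> num_free_threads_{0};
  std::queue<std::function<void()>> q_;
  bool destructing_ = false;
};

int main() {
  WorkerPool pool;
  for (int i = 0; i < 32; ++i)
    pool.Push([i] { std::cout << "custom op task " << i << "\n"; });
  // destructor wakes the workers, drains the queue, and joins them
}
```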