Skip to content

Commit

Permalink
Refactor LibraryInitializer so it's thread safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
larroy committed Aug 8, 2019
1 parent b77f524 commit b2ae2d2
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 108 deletions.
10 changes: 8 additions & 2 deletions docs/faq/env_var.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0

## Set the Number of Threads

* MXNET_OMP_MAX_THREADS
- Values: Int ```(default=Number of processors / Number of processors * 2 in X86)```
- Maximum number of threads to use in individual operators through OpenMP. If not set, OMP_NUM_THREADS is considered after.
* MXNET_GPU_WORKER_NTHREADS
- Values: Int ```(default=2)```
- The maximum number of threads to use on each GPU. This parameter is used to parallelize the computation within a single GPU card.
Expand All @@ -47,7 +50,7 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- The maximum number of concurrent threads that do the memory copy job on each GPU.
* MXNET_CPU_WORKER_NTHREADS
- Values: Int ```(default=1)```
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `OMP_NUM_THREADS` instead.
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `MXNET_OMP_MAX_THREADS` instead.
* MXNET_CPU_PRIORITY_NTHREADS
- Values: Int ```(default=4)```
- The number of threads given to prioritized CPU jobs.
Expand All @@ -56,10 +59,13 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- The number of threads used for NNPACK. NNPACK package aims to provide high-performance implementations of some layers for multi-core CPUs. Checkout [NNPACK](http://mxnet.io/faq/nnpack.html) to know more about it.
* MXNET_MP_WORKER_NTHREADS
- Values: Int ```(default=1)```
- The number of scheduling threads on CPU given to multiprocess workers. Enlarge this number allows more operators to run in parallel in individual workers but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
- The number of scheduling threads on CPU given to multiprocess workers (after fork). Enlarge this number allows more operators to run in parallel in individual workers but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
* MXNET_MP_OPENCV_NUM_THREADS
- Values: Int ```(default=0)```
- The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
* MXNET_GPU_COPY_NTHREADS
- Values:: Int ```(default=2)```
- Number of threads for copying data from CPU to GPU.

## Memory Options

Expand Down
27 changes: 0 additions & 27 deletions src/common/library.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,33 +24,6 @@
* and accessing its functions
*/

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
#include <windows.h>
#else
#include <dlfcn.h>
#endif

#include <string>
#include "library.h"

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
/*!
* \brief Retrieve the system error message for the last-error code
* \param err string that gets the error message
*/
void win_err(char **err) {
uint32_t dw = GetLastError();
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
dw,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
reinterpret_cast<char*>(err),
0, NULL);
}
#endif


/*!
Expand Down
19 changes: 1 addition & 18 deletions src/common/library.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,11 @@
#include <string>
#include "dmlc/io.h"

// map of libraries loaded
static std::map<std::string, void*> loaded_libs;

void* load_lib(const char* path);
void close_lib(void* handle);
void get_sym(void* handle, void** func, char* name);

/*!
* \brief a templated function that fetches from the library
* a function pointer of any given datatype and name
* \param T a template parameter for data type of function pointer
* \param lib library handle
* \param func_name function name to search for in the library
* \return func a function pointer
*/
template<typename T>
T get_func(void *lib, char *func_name) {
T func;
get_sym(lib, reinterpret_cast<void**>(&func), func_name);
if (!func)
LOG(FATAL) << "Unable to get function '" << func_name << "' from library";
return func;
}


#endif // MXNET_COMMON_LIBRARY_H_
13 changes: 13 additions & 0 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,22 @@
#include "../operator/nn/mkldnn/mkldnn_base-inl.h"
#endif

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
#include <windows.h>
#else
#include <unistd.h>
#include <cstdint>
#endif


namespace mxnet {
namespace common {

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
inline size_t current_process_id() { return ::GetCurrentProcessId(); }
#else
inline size_t current_process_id() { return getpid(); }
#endif
/*!
* \brief IndPtr should be non-negative, in non-decreasing order, start with 0
* and end with value equal with size of indices.
Expand Down
4 changes: 3 additions & 1 deletion src/engine/threaded_engine_perdevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <dmlc/parameter.h>
#include <dmlc/concurrency.h>
#include <dmlc/thread_group.h>
#include <initialize.h>
#include "./threaded_engine.h"
#include "./thread_pool.h"
#include "../common/lazy_alloc_array.h"
Expand Down Expand Up @@ -76,7 +77,8 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
void Start() override {
if (is_worker_) return;
gpu_worker_nthreads_ = common::GetNumThreadsPerGPU();
cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
// MXNET_CPU_WORKER_NTHREADS
cpu_worker_nthreads_ = LibraryInitializer::Get()->cpu_worker_nthreads_;
gpu_copy_nthreads_ = dmlc::GetEnv("MXNET_GPU_COPY_NTHREADS", 2);
// create CPU task
int cpu_priority_nthreads = dmlc::GetEnv("MXNET_CPU_PRIORITY_NTHREADS", 4);
Expand Down
169 changes: 121 additions & 48 deletions src/initialize.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -22,6 +22,7 @@
* \file initialize.cc
* \brief initialize mxnet library
*/
#include "initialize.h"
#include <signal.h>
#include <dmlc/logging.h>
#include <mxnet/engine.h>
Expand All @@ -31,8 +32,37 @@
#if MXNET_USE_OPENCV
#include <opencv2/opencv.hpp>
#endif // MXNET_USE_OPENCV
#include "common/utils.h"
#include "engine/openmp.h"

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
#include <windows.h>
#else
#include <dlfcn.h>
#endif

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
/*!
* \brief Retrieve the system error message for the last-error code
* \param err string that gets the error message
*/
void win_err(char **err) {
uint32_t dw = GetLastError();
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER |
FORMAT_MESSAGE_FROM_SYSTEM |
FORMAT_MESSAGE_IGNORE_INSERTS,
NULL,
dw,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
reinterpret_cast<char*>(err),
0, NULL);
}
#endif


namespace mxnet {

#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
static void SegfaultLogger(int sig) {
fprintf(stderr, "\nSegmentation fault: %d\n\n", sig);
Expand All @@ -41,65 +71,108 @@ static void SegfaultLogger(int sig) {
}
#endif

class LibraryInitializer {
public:
LibraryInitializer() {
dmlc::InitLogging("mxnet");
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
struct sigaction sa;
sigaction(SIGSEGV, nullptr, &sa);
if (sa.sa_handler == nullptr) {
signal(SIGSEGV, SegfaultLogger);
}
#endif
// pthread_atfork handlers, delegated to LibraryInitializer members.

// disable openmp for multithreaded workers
#ifndef _WIN32
using op::custom::CustomOperator;
pthread_atfork(
[]() {
CustomOperator::Get()->Stop();
Engine::Get()->Stop();
},
[]() {
Engine::Get()->Start();
CustomOperator::Get()->Start();
},
[]() {
// Conservative thread management for multiprocess workers
const size_t mp_worker_threads = dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1);
dmlc::SetEnv("MXNET_CPU_WORKER_NTHREADS", mp_worker_threads);
dmlc::SetEnv("OMP_NUM_THREADS", 1);
void pthread_atfork_prepare() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_prepare();
}

void pthread_atfork_parent() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_parent();
}

void pthread_atfork_child() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_child();
}

// LibraryInitializer member functions

LibraryInitializer::LibraryInitializer()
: original_pid_(common::current_process_id()),
mp_worker_nthreads_(dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1)),
cpu_worker_nthreads_(dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1)),
mp_cv_num_threads_(dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0))
{
dmlc::InitLogging("mxnet");
engine::OpenMP::Get(); // force OpenMP initialization
install_signal_handlers();
install_pthread_atfork_handlers();
}

LibraryInitializer::~LibraryInitializer() {
close_open_libs();
}

bool LibraryInitializer::was_forked() const {
return common::current_process_id() != original_pid_;
}

void LibraryInitializer::atfork_prepare() {
using op::custom::CustomOperator;
CustomOperator::Get()->Stop();
Engine::Get()->Stop();
}

void LibraryInitializer::atfork_parent() {
using op::custom::CustomOperator;
Engine::Get()->Start();
CustomOperator::Get()->Start();
}

void LibraryInitializer::atfork_child() {
using op::custom::CustomOperator;
// Conservative thread management for multiprocess workers
this->cpu_worker_nthreads_ = this->mp_cv_num_threads_;
#if MXNET_USE_OPENCV && !__APPLE__
const size_t mp_cv_num_threads = dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0);
cv::setNumThreads(mp_cv_num_threads); // disable opencv threading
cv::setNumThreads(mp_cv_num_threads_);
#endif // MXNET_USE_OPENCV
engine::OpenMP::Get()->set_enabled(false);
Engine::Get()->Start();
CustomOperator::Get()->Start();
});
engine::OpenMP::Get()->set_thread_max(1);
engine::OpenMP::Get()->set_enabled(false);
Engine::Get()->Start();
CustomOperator::Get()->Start();
}


void LibraryInitializer::install_pthread_atfork_handlers() {
#ifndef _WIN32
pthread_atfork(pthread_atfork_prepare, pthread_atfork_parent, pthread_atfork_child);
#endif
}
}

~LibraryInitializer() {
// close opened libraries
for (auto const& lib : loaded_libs) {
close_lib(lib.second);
}
void LibraryInitializer::install_signal_handlers() {
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
struct sigaction sa;
sigaction(SIGSEGV, nullptr, &sa);
if (sa.sa_handler == nullptr) {
signal(SIGSEGV, SegfaultLogger);
}
#endif
}

static LibraryInitializer* Get();
};
void LibraryInitializer::close_open_libs() {
for (auto const& lib : loaded_libs) {
close_lib(lib.second);
}
}

LibraryInitializer* LibraryInitializer::Get() {
static LibraryInitializer inst;
return &inst;
void LibraryInitializer::dynlib_defer_close(const std::string &path, void *handle) {
loaded_libs.emplace(path, handle);
}

/**
* Perform static initialization
*/
#ifdef __GNUC__
// Don't print an unused variable message since this is intentional
// In GCC we use constructor to perform initialization before any static initializer is able to run
__attribute__((constructor)) static void LibraryInitializerEntry() {
#pragma GCC diagnostic ignored "-Wunused-variable"
volatile LibraryInitializer* library_init = LibraryInitializer::Get();
}
#else
static LibraryInitializer* __library_init = LibraryInitializer::Get();
#endif

static LibraryInitializer* __library_init = LibraryInitializer::Get();
} // namespace mxnet
Loading

0 comments on commit b2ae2d2

Please sign in to comment.