Skip to content

Commit

Permalink
Refactor LibraryInitializer so it's thread safe.
Browse files Browse the repository at this point in the history
  • Loading branch information
larroy committed Aug 6, 2019
1 parent 3112893 commit 34be4c7
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 58 deletions.
10 changes: 8 additions & 2 deletions docs/faq/env_var.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0

## Set the Number of Threads

* MXNET_OMP_MAX_THREADS
- Values: Int ```(default=Number of processors / Number of processors * 2 in X86)```
- Maximum number of threads to use in individual operators through OpenMP. If not set, OMP_NUM_THREADS is considered after.
* MXNET_GPU_WORKER_NTHREADS
- Values: Int ```(default=2)```
- The maximum number of threads to use on each GPU. This parameter is used to parallelize the computation within a single GPU card.
Expand All @@ -47,7 +50,7 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- The maximum number of concurrent threads that do the memory copy job on each GPU.
* MXNET_CPU_WORKER_NTHREADS
- Values: Int ```(default=1)```
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `OMP_NUM_THREADS` instead.
- The maximum number of scheduling threads on CPU. It specifies how many operators can be run in parallel. Note that most CPU operators are parallelized by OpenMP. To change the number of threads used by individual operators, please set `MXNET_OMP_MAX_THREADS` instead.
* MXNET_CPU_PRIORITY_NTHREADS
- Values: Int ```(default=4)```
- The number of threads given to prioritized CPU jobs.
Expand All @@ -56,10 +59,13 @@ $env:MXNET_STORAGE_FALLBACK_LOG_VERBOSE=0
- The number of threads used for NNPACK. NNPACK package aims to provide high-performance implementations of some layers for multi-core CPUs. Checkout [NNPACK](http://mxnet.io/faq/nnpack.html) to know more about it.
* MXNET_MP_WORKER_NTHREADS
- Values: Int ```(default=1)```
- The number of scheduling threads on CPU given to multiprocess workers. Enlarge this number allows more operators to run in parallel in individual workers but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
- The number of scheduling threads on CPU given to multiprocess workers (after fork). Enlarge this number allows more operators to run in parallel in individual workers but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
* MXNET_MP_OPENCV_NUM_THREADS
- Values: Int ```(default=0)```
- The number of OpenCV execution threads given to multiprocess workers. OpenCV multithreading is disabled if `MXNET_MP_OPENCV_NUM_THREADS` < 1 (default). Enlarge this number may boost the performance of individual workers when executing underlying OpenCV functions but please consider reducing the overall `num_workers` to avoid thread contention (not available on Windows).
* MXNET_GPU_COPY_NTHREADS
- Values:: Int ```(default=2)```
- Number of threads for copying data from CPU to GPU.

## Memory Options

Expand Down
1 change: 1 addition & 0 deletions src/common/exec_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "../common/utils.h"
#include "../executor/exec_pass.h"


namespace mxnet {
namespace common {

Expand Down
13 changes: 13 additions & 0 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,22 @@
#include "../operator/nn/mkldnn/mkldnn_base-inl.h"
#endif

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
#include <windows.h>
#else
#include <unistd.h>
#include <cstdint>
#endif


namespace mxnet {
namespace common {

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
inline size_t current_process_id() { return ::GetCurrentProcessId(); }
#else
inline size_t current_process_id() { return getpid(); }
#endif
/*!
* \brief IndPtr should be non-negative, in non-decreasing order, start with 0
* and end with value equal with size of indices.
Expand Down
4 changes: 3 additions & 1 deletion src/engine/threaded_engine_perdevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include <dmlc/parameter.h>
#include <dmlc/concurrency.h>
#include <dmlc/thread_group.h>
#include <initialize.h>
#include "./threaded_engine.h"
#include "./thread_pool.h"
#include "../common/lazy_alloc_array.h"
Expand Down Expand Up @@ -76,7 +77,8 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
void Start() override {
if (is_worker_) return;
gpu_worker_nthreads_ = common::GetNumThreadsPerGPU();
cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
// MXNET_CPU_WORKER_NTHREADS
cpu_worker_nthreads_ = LibraryInitializer::Get()->cpu_worker_nthreads_;
gpu_copy_nthreads_ = dmlc::GetEnv("MXNET_GPU_COPY_NTHREADS", 2);
// create CPU task
int cpu_priority_nthreads = dmlc::GetEnv("MXNET_CPU_PRIORITY_NTHREADS", 4);
Expand Down
125 changes: 82 additions & 43 deletions src/initialize.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* \file initialize.cc
* \brief initialize mxnet library
*/
#include "initialize.h"
#include <signal.h>
#include <dmlc/logging.h>
#include <mxnet/engine.h>
Expand All @@ -30,8 +31,11 @@
#if MXNET_USE_OPENCV
#include <opencv2/opencv.hpp>
#endif // MXNET_USE_OPENCV
#include "common/utils.h"
#include "engine/openmp.h"

namespace mxnet {

#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
static void SegfaultLogger(int sig) {
fprintf(stderr, "\nSegmentation fault: %d\n\n", sig);
Expand All @@ -40,58 +44,93 @@ static void SegfaultLogger(int sig) {
}
#endif

class LibraryInitializer {
public:
LibraryInitializer() {
dmlc::InitLogging("mxnet");
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
struct sigaction sa;
sigaction(SIGSEGV, nullptr, &sa);
if (sa.sa_handler == nullptr) {
signal(SIGSEGV, SegfaultLogger);
}
#endif
// pthread_atfork handlers, delegated to LibraryInitializer members.

// disable openmp for multithreaded workers
#ifndef _WIN32
using op::custom::CustomOperator;
pthread_atfork(
[]() {
CustomOperator::Get()->Stop();
Engine::Get()->Stop();
},
[]() {
Engine::Get()->Start();
CustomOperator::Get()->Start();
},
[]() {
// Conservative thread management for multiprocess workers
const size_t mp_worker_threads = dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1);
dmlc::SetEnv("MXNET_CPU_WORKER_NTHREADS", mp_worker_threads);
dmlc::SetEnv("OMP_NUM_THREADS", 1);
void pthread_atfork_prepare() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_prepare();
}

void pthread_atfork_parent() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_parent();
}

void pthread_atfork_child() {
LibraryInitializer* library_initializer = LibraryInitializer::Get();
library_initializer->atfork_child();
}

// LibraryInitializer member functions

LibraryInitializer::LibraryInitializer()
: original_pid_(common::current_process_id()),
mp_worker_nthreads_(dmlc::GetEnv("MXNET_MP_WORKER_NTHREADS", 1)),
cpu_worker_nthreads_(dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1)),
mp_cv_num_threads_(dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0))
{
dmlc::InitLogging("mxnet");
engine::OpenMP::Get(); // force OpenMP initialization
install_signal_handlers();
install_pthread_atfork_handlers();
}

bool LibraryInitializer::was_forked() const {
return common::current_process_id() != original_pid_;
}

void LibraryInitializer::atfork_prepare() {
using op::custom::CustomOperator;
CustomOperator::Get()->Stop();
Engine::Get()->Stop();
}

void LibraryInitializer::atfork_parent() {
using op::custom::CustomOperator;
Engine::Get()->Start();
CustomOperator::Get()->Start();
}

void LibraryInitializer::atfork_child() {
using op::custom::CustomOperator;
// Conservative thread management for multiprocess workers
this->cpu_worker_nthreads_ = this->mp_cv_num_threads_;
#if MXNET_USE_OPENCV && !__APPLE__
const size_t mp_cv_num_threads = dmlc::GetEnv("MXNET_MP_OPENCV_NUM_THREADS", 0);
cv::setNumThreads(mp_cv_num_threads); // disable opencv threading
cv::setNumThreads(mp_cv_num_threads_);
#endif // MXNET_USE_OPENCV
engine::OpenMP::Get()->set_enabled(false);
Engine::Get()->Start();
CustomOperator::Get()->Start();
});
#endif
}
engine::OpenMP::Get()->set_thread_max(1);
engine::OpenMP::Get()->set_enabled(false);
Engine::Get()->Start();
CustomOperator::Get()->Start();
}

static LibraryInitializer* Get();
};
void LibraryInitializer::install_pthread_atfork_handlers() {
#ifndef _WIN32
pthread_atfork(pthread_atfork_prepare, pthread_atfork_parent, pthread_atfork_child);
#endif
}

LibraryInitializer* LibraryInitializer::Get() {
static LibraryInitializer inst;
return &inst;
void LibraryInitializer::install_signal_handlers() {
#if MXNET_USE_SIGNAL_HANDLER && DMLC_LOG_STACK_TRACE
struct sigaction sa;
sigaction(SIGSEGV, nullptr, &sa);
if (sa.sa_handler == nullptr) {
signal(SIGSEGV, SegfaultLogger);
}
#endif
}

/**
* Perform static initialization
*/
#ifdef __GNUC__
// Don't print an unused variable message since this is intentional
// In GCC we use constructor to perform initialization before any static initializer is able to run
__attribute__((constructor)) static void LibraryInitializerEntry() {
#pragma GCC diagnostic ignored "-Wunused-variable"
volatile LibraryInitializer* library_init = LibraryInitializer::Get();
}
#else
static LibraryInitializer* __library_init = LibraryInitializer::Get();
#endif

static LibraryInitializer* __library_init = LibraryInitializer::Get();
} // namespace mxnet
92 changes: 92 additions & 0 deletions src/initialize.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*!
* Copyright (c) 2019 by Contributors
* \file initialize.h
* \brief Library initialization
*/

#include <cstdlib>

#ifndef MXNET_INITIALIZE_H_
#define MXNET_INITIALIZE_H_

namespace mxnet {

void pthread_atfork_prepare();
void pthread_atfork_parent();
void pthread_atfork_child();

/**
* Perform library initialization and control multiprocessing behaviour.
*/
class LibraryInitializer {
public:
static LibraryInitializer* Get() {
static LibraryInitializer inst;
return &inst;
}

/**
* Library initialization. Called on library loading via constructor attributes or
* C++ static initialization.
*/
LibraryInitializer();

/**
* @return true if the current pid doesn't match the one that initialized the library
*/
bool was_forked() const;

/**
* Original pid of the process which first loaded and initialized the library
*/
size_t original_pid_;
size_t mp_worker_nthreads_;
size_t cpu_worker_nthreads_;
size_t omp_num_threads_;
size_t mp_cv_num_threads_;

// Actual code for the atfork handlers as member functions.

void atfork_prepare();
void atfork_parent();
void atfork_child();

private:
/**
* Pthread atfork handlers are used to reset the concurrency state of modules like CustomOperator
* and Engine when forking. When forking only the thread that forks is kept alive and memory is
* copied to the new process so state is inconsistent. This call install the handlers.
* Has no effect on Windows.
*
* https://pubs.opengroup.org/onlinepubs/009695399/functions/pthread_atfork.html
*/
void install_pthread_atfork_handlers();

/**
* Install signal handlers (UNIX). Has no effect on Windows.
*/
void install_signal_handlers();
};


} // namespace mxnet
#endif // MXNET_INITIALIZE_H_
15 changes: 3 additions & 12 deletions src/profiler/profiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,13 @@
#include "./vtune.h"
#include "./aggregate_stats.h"
#include "./nvtx.h"
#include "../common/utils.h"

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
#include <windows.h>
#else
#include <unistd.h>
#include <cstdint>
#endif

namespace mxnet {
namespace profiler {

#if defined(_WIN32) || defined(_WIN64) || defined(__WINDOWS__)
inline size_t current_process_id() { return ::GetCurrentProcessId(); }
#else
inline size_t current_process_id() { return getpid(); }
#endif


/*!
* \brief Constant-sized character array class with simple string API to avoid allocations
Expand Down Expand Up @@ -132,7 +123,7 @@ struct ProfileStat {
bool enable_aggregate_ = true;

/* !\brief Process id */
size_t process_id_ = current_process_id();
size_t process_id_ = common::current_process_id();

/*! \brief id of thread which operation run on.
*
Expand Down

0 comments on commit 34be4c7

Please sign in to comment.