Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
Minor refactor: prevent string copying, shared_ptr by ref (#8872)
Browse files Browse the repository at this point in the history
  • Loading branch information
larroy authored and piiswrong committed Jan 31, 2018
1 parent 134c584 commit 376d47f
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 66 deletions.
12 changes: 6 additions & 6 deletions include/mxnet/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
/*!
* Copyright (c) 2015 by Contributors
* \file base.h
* \brief configuation of mxnet as well as basic data structure.
* \brief configuration of MXNet as well as basic data structure.
*/
#ifndef MXNET_BASE_H_
#define MXNET_BASE_H_
Expand Down Expand Up @@ -243,7 +243,7 @@ struct Context {
* \param str the string pattern
* \return Context
*/
inline static Context FromString(std::string str);
inline static Context FromString(const std::string& str);
};

/*!
Expand Down Expand Up @@ -316,15 +316,15 @@ inline Context Context::GPU(int32_t dev_id) {
return Create(kGPU, dev_id);
}

inline Context Context::FromString(std::string str) {
inline Context Context::FromString(const std::string& str) {
Context ret;
try {
std::string::size_type l = str.find('(');
const std::string::size_type l = str.find('(');
CHECK_NE(l, std::string::npos);
std::string::size_type r = str.find(')');
const std::string::size_type r = str.find(')');
CHECK_EQ(r, str.length()-1);

std::string type = str.substr(0, l);
const std::string type = str.substr(0, l);
int id = std::stoi(str.substr(l+1, r-l-1));
if (type == "cpu") {
ret = CPU(id);
Expand Down
4 changes: 2 additions & 2 deletions src/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ inline void LogStorageFallback(const nnvm::NodeAttrs& attrs,
}

// heuristic to dermine number of threads per GPU
inline int GetNumThreadPerGPU() {
inline int GetNumThreadsPerGPU() {
// This is resource efficient option.
return dmlc::GetEnv("MXNET_GPU_WORKER_NTHREADS", 2);
}
Expand All @@ -449,7 +449,7 @@ inline int GetNumThreadPerGPU() {
inline int GetExecNumMatchColor() {
// This is resource efficient option.
int num_match_color = dmlc::GetEnv("MXNET_EXEC_NUM_TEMP", 1);
return std::min(num_match_color, GetNumThreadPerGPU());
return std::min(num_match_color, GetNumThreadsPerGPU());
}

template<typename T, typename V>
Expand Down
6 changes: 3 additions & 3 deletions src/engine/stream_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class StreamManager {
RunContext GetIORunContext(Context const& ctx);
void Finalize();
private:
std::mutex m_;
std::mutex mutex_;
#if MXNET_USE_CUDA
std::array<std::array<mshadow::Stream<gpu>*, kStreams>, kNumGpus>
gpu_streams_;
Expand All @@ -74,7 +74,7 @@ RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
std::size_t use_counter;
CUDA_CALL(cudaSetDevice(ctx.dev_id));
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
auto&& counter = gpu_cnt_.at(ctx.dev_id);
if (counter == -1) {
for (auto&& i : gpu_streams_.at(ctx.dev_id)) {
Expand Down Expand Up @@ -109,7 +109,7 @@ RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
#if MXNET_USE_CUDA
CUDA_CALL(cudaSetDevice(ctx.dev_id));
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
if (gpu_io_streams_.at(ctx.dev_id) == nullptr) {
gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream<gpu>(false, false, ctx.dev_id);
}
Expand Down
20 changes: 11 additions & 9 deletions src/engine/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class ThreadPool {

/*! \brief Signal event upon destruction, even for exceptions (RAII) */
struct SetReadyOnDestroy {
explicit inline SetReadyOnDestroy(std::shared_ptr<SimpleEvent> *event)
: event_(*event) {
explicit inline SetReadyOnDestroy(const std::shared_ptr<SimpleEvent>& event)
: event_(event) {
}
inline ~SetReadyOnDestroy() {
if (event_) {
Expand All @@ -82,18 +82,20 @@ class ThreadPool {
*/
explicit ThreadPool(size_t size, std::function<void()> func)
: worker_threads_(size) {
CHECK_GT(size, 0);
for (auto& i : worker_threads_) {
i = std::thread(func);
}
}
explicit ThreadPool(size_t size,
std::function<void(std::shared_ptr<SimpleEvent> ready)> func,
std::function<void(const std::shared_ptr<SimpleEvent> ready)> func,
const bool wait)
: worker_threads_(size) {
CHECK_GT(size, 0);
ready_events_.reserve(size);
for (auto& i : worker_threads_) {
std::shared_ptr<SimpleEvent> ptr = std::make_shared<SimpleEvent>();
ready_events_.emplace_back(ptr);
i = std::thread(func, ptr);
ready_events_.emplace_back(std::make_shared<SimpleEvent>());
i = std::thread(func, ready_events_.back());
}
if (wait) {
WaitForReady();
Expand All @@ -110,8 +112,8 @@ class ThreadPool {
* \brief Wait for all started threads to signal that they're ready
*/
void WaitForReady() {
for (std::shared_ptr<SimpleEvent> ptr : ready_events_) {
ptr->wait();
for (const std::shared_ptr<SimpleEvent>& event : ready_events_) {
event->wait();
}
}

Expand All @@ -122,7 +124,7 @@ class ThreadPool {
/*!
* \brief Startup synchronization objects
*/
std::list<std::shared_ptr<SimpleEvent>> ready_events_;
std::vector<std::shared_ptr<SimpleEvent> > ready_events_;
/*!
* \brief Disallow default construction.
*/
Expand Down
24 changes: 12 additions & 12 deletions src/engine/threaded_engine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ThreadedVar::ThreadedVar(VersionedVarBlock* head) : head_{head} {
}

inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
if (pending_write_ == nullptr) {
// invariant: is_ready_to_read()
CHECK_GE(num_pending_reads_, 0);
Expand All @@ -71,7 +71,7 @@ inline void ThreadedVar::AppendReadDependency(OprBlock* opr_block) {

inline void ThreadedVar::AppendWriteDependency(OprBlock* opr_block) {
auto&& new_var_block = VersionedVarBlock::New();
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
// invariant.
assert(head_->next == nullptr);
assert(head_->trigger == nullptr);
Expand Down Expand Up @@ -102,7 +102,7 @@ inline void ThreadedVar::CompleteReadDependency(Dispatcher dispatcher) {
OprBlock *trigger = nullptr;
{
// this is lock scope
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
CHECK_GT(num_pending_reads_, 0);

if (--num_pending_reads_ == 0) {
Expand All @@ -124,7 +124,7 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
VersionedVarBlock *old_pending_write, *end_of_read_chain;
OprBlock* trigger_write = nullptr;
{
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
// invariants
assert(head_->next == nullptr);
assert(pending_write_ != nullptr);
Expand Down Expand Up @@ -187,12 +187,12 @@ inline bool ThreadedVar::CompleteWriteDependency(Dispatcher dispatcher) {
}

inline void ThreadedVar::SetToDelete() {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
to_delete_ = true;
}

inline bool ThreadedVar::ready_to_read() {
std::lock_guard<std::mutex> lock{m_};
std::lock_guard<std::mutex> lock{mutex_};
return this->is_ready_to_read();
}

Expand Down Expand Up @@ -228,8 +228,8 @@ void ThreadedEngine::CheckDuplicate(std::vector<VarHandle> const& const_vars,
// Check for duplicates.
auto use = const_vars;
auto mutate = mutable_vars;
auto use_size = use.size();
auto mutate_size = mutate.size();
const size_t use_size = use.size();
const size_t mutate_size = mutate.size();
std::sort(use.begin(), use.end());
std::sort(mutate.begin(), mutate.end());
for (std::size_t i = 0; i < use_size; ++i) {
Expand Down Expand Up @@ -381,7 +381,7 @@ void ThreadedEngine::WaitForVar(VarHandle var) {
std::unique_lock<std::mutex> lock{finished_m_};
finished_cv_.wait(lock, [this, &done]() {
return done.load() || kill_.load();
});
});
}
}

Expand All @@ -403,11 +403,11 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
}
// Mark complete for write variables.
for (auto&& i : threaded_opr->mutable_vars) {
bool debug_info = (engine_info_ && debug_wait_var_ == i);
const bool debug_info = (engine_info_ && debug_wait_var_ == i);
if (debug_info) {
LOG(INFO) << "Complete write dep for " << i;
}
bool to_delete = i->CompleteWriteDependency(
const bool to_delete = i->CompleteWriteDependency(
[this, debug_info](OprBlock* opr) {
if (debug_info) {
LOG(INFO) << "PushToExecute " << opr;
Expand All @@ -426,7 +426,7 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) {
// could execute right after we mark all vars as complete, so if
// threaded_opr is not temporary, its value is not reliable
// anymore start from here.
int npending;
int npending = 0;
{
std::unique_lock<std::mutex> lock{finished_m_};
npending = --pending_;
Expand Down
12 changes: 6 additions & 6 deletions src/engine/threaded_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ struct OprBlock : public common::ObjectPoolAllocatable<OprBlock> {
* \return the wait counter after the decreasement.
*/
inline int decr_wait() {
// chack invariant, avoid over trigger
int ret = --wait;
// check invariant, avoid over trigger
const int ret = --wait;
CHECK_GE(ret, 0);
return ret;
}
Expand All @@ -112,8 +112,8 @@ struct VersionedVarBlock
* \brief Variable implementation.
* Each ThreadedVar is a linked list(queue) of operations to be performed.
*/
class ThreadedVar final : public Var,
public common::ObjectPoolAllocatable<ThreadedVar> {
class ThreadedVar final
: public Var, public common::ObjectPoolAllocatable<ThreadedVar> {
public:
/*!
* \brief constructor
Expand Down Expand Up @@ -180,7 +180,7 @@ class ThreadedVar final : public Var,
// TODO(hotpxl) change this to spinlock for faster runtime
// TODO(hotpxl) consider rename head
/*! \brief inetrnal mutex of the ThreadedVar */
std::mutex m_;
std::mutex mutex_;
/*!
* \brief number of pending reads operation in the variable.
* will be marked as -1 when there is a already triggered pending write.
Expand Down Expand Up @@ -446,7 +446,7 @@ class ThreadedEngine : public Engine {
if (!bulk_status.count) return;
bulk_status.count = 0;
DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars);
auto fn = std::move(bulk_status.fn);
SyncFn fn = std::move(bulk_status.fn);
this->PushAsync([fn](RunContext ctx, CallbackOnComplete on_complete) {
fn(ctx);
on_complete();
Expand Down
54 changes: 26 additions & 28 deletions src/engine/threaded_engine_perdevice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine {

void Start() override {
if (is_worker_) return;
gpu_worker_nthreads_ = common::GetNumThreadPerGPU();
gpu_worker_nthreads_ = common::GetNumThreadsPerGPU();
cpu_worker_nthreads_ = dmlc::GetEnv("MXNET_CPU_WORKER_NTHREADS", 1);
// create CPU task
int cpu_priority_nthreads = dmlc::GetEnv("MXNET_CPU_PRIORITY_NTHREADS", 4);
Expand Down Expand Up @@ -118,17 +118,15 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
if (opr_block->opr->prop == FnProperty::kCPUPrioritized) {
cpu_priority_worker_->task_queue.Push(opr_block, opr_block->priority);
} else {
int dev_id = ctx.dev_id;
int nthread = cpu_worker_nthreads_;
auto ptr =
cpu_normal_workers_.Get(dev_id, [this, ctx, nthread]() {
auto blk = new ThreadWorkerBlock<kWorkerQueue>();
blk->pool.reset(new ThreadPool(nthread,
const size_t nthreads = cpu_worker_nthreads_;
auto ptr = cpu_normal_workers_.Get(ctx.dev_id, [this, ctx, nthreads]() {
auto blk = new ThreadWorkerBlock<kWorkerQueue>();
blk->pool.reset(new ThreadPool(nthreads,
[this, ctx, blk](std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
this->CPUWorker(ctx, blk, ready_event);
}, true));
return blk;
});
return blk;
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand All @@ -140,24 +138,23 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
} else {
CHECK_EQ(ctx.dev_mask(), Context::kGPU);
// GPU execution.
FnProperty prop = opr_block->opr->prop;
bool is_copy = (prop == FnProperty::kCopyFromGPU ||
prop == FnProperty::kCopyToGPU);
int nthread = gpu_worker_nthreads_;
const FnProperty prop = opr_block->opr->prop;
const bool is_copy = (prop == FnProperty::kCopyFromGPU ||
prop == FnProperty::kCopyToGPU);
const size_t nthread = gpu_worker_nthreads_;
if (is_copy) {
auto ptr =
gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() {
auto ptr = gpu_copy_workers_.Get(ctx.dev_id, [this, ctx, is_copy, nthread]() {
// Signify to kernel that GPU is being used, so reserve cores as necessary
OpenMP::Get()->set_reserve_cores(GetReserveCoreCount(true));
auto blk = new ThreadWorkerBlock<kCopyQueue>();
blk->pool.reset(new ThreadPool(
nthread,
[this, ctx, is_copy, blk]
(std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
blk->pool.reset(new ThreadPool(
nthread,
[this, ctx, is_copy, blk]
(std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand All @@ -177,7 +174,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
this->GPUWorker(ctx, is_copy, blk, ready_event);
}, true));
return blk;
});
});
if (ptr) {
if (opr_block->opr->prop == FnProperty::kDeleteVar) {
ptr->task_queue.PushFront(opr_block, opr_block->priority);
Expand Down Expand Up @@ -207,9 +204,9 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
/*! \brief whether this is a worker thread. */
static MX_THREAD_LOCAL bool is_worker_;
/*! \brief number of concurrent thread cpu worker uses */
int cpu_worker_nthreads_;
size_t cpu_worker_nthreads_;
/*! \brief number of concurrent thread each gpu worker uses */
int gpu_worker_nthreads_;
size_t gpu_worker_nthreads_;
// cpu worker
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_;
// cpu priority worker
Expand All @@ -228,12 +225,13 @@ class ThreadedEnginePerDevice : public ThreadedEngine {
inline void GPUWorker(Context ctx,
bool is_copy_worker,
ThreadWorkerBlock<type> *block,
std::shared_ptr<ThreadPool::SimpleEvent> ready_event) {
const std::shared_ptr<ThreadPool::SimpleEvent>& ready_event) {
this->is_worker_ = true;
#if MXNET_USE_CUDA
CHECK(block != nullptr);
mshadow::Stream<gpu> *stream;
do {
ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(&ready_event);
ThreadPool::SimpleEvent::SetReadyOnDestroy setReady(ready_event);
// allocate stream
mshadow::SetDevice<gpu>(ctx.dev_id);
if (is_copy_worker) {
Expand Down

0 comments on commit 376d47f

Please sign in to comment.