This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Merge pull request #72 from tqchen/master
Add a new Policy in ThreadedEngine to do update per device
tqchen committed Sep 14, 2015
2 parents 4c1016f + 294b35e commit 91d420d
Showing 12 changed files with 252 additions and 67 deletions.
1 change: 1 addition & 0 deletions .travis.yml
@@ -11,6 +11,7 @@ env:
- TASK=python CXX=g++
- TASK=python3 CXX=g++
- TASK=python_naive CXX=g++
- TASK=python_perdev CXX=g++
- TASK=cpp_unittest CXX=g++

# dependent apt packages
2 changes: 1 addition & 1 deletion dmlc-core
6 changes: 4 additions & 2 deletions include/mxnet/engine.h
@@ -33,8 +33,10 @@ typedef Opr* OprHandle;
enum class FnProperty {
/*! \brief Normal operation */
kNormal,
/*! \brief Copy operation between CPU and GPU */
kCopy,
/*! \brief Copy operation from GPU to other devices */
kCopyFromGPU,
/*! \brief Copy operation from CPU to other devices */
kCopyToGPU,
/*! \brief Asynchronous function call */
kAsync
}; // enum class FnProperty
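Splitting the old kCopy property into kCopyFromGPU and kCopyToGPU lets a scheduler keep device-to-host and host-to-device transfers on their own queues so they can overlap with compute. Below is a minimal, self-contained C++ sketch of that dispatch idea; TaskQueues and RouteTask are illustrative names, not part of the MXNet engine API.

#include <cassert>

enum class FnProperty { kNormal, kCopyFromGPU, kCopyToGPU, kAsync };

struct TaskQueues {
  int normal_or_async = 0;   // counters stand in for real per-queue work lists
  int copy_from_gpu = 0;
  int copy_to_gpu = 0;
};

// Route a task by its property so GPU-to-host and host-to-GPU copies never
// wait behind compute work or behind each other.
inline void RouteTask(FnProperty prop, TaskQueues* q) {
  switch (prop) {
    case FnProperty::kCopyFromGPU: ++q->copy_from_gpu;   break;
    case FnProperty::kCopyToGPU:   ++q->copy_to_gpu;     break;
    default:                       ++q->normal_or_async; break;
  }
}

int main() {
  TaskQueues queues;
  RouteTask(FnProperty::kCopyToGPU, &queues);
  RouteTask(FnProperty::kNormal, &queues);
  assert(queues.copy_to_gpu == 1 && queues.normal_or_async == 1);
  return 0;
}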
12 changes: 10 additions & 2 deletions scripts/travis_script.sh
@@ -40,15 +40,23 @@ if [ ${TASK} == "python3" ]; then
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_naive" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=NaiveEngine
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "python_perdev" ]; then
echo "USE_CUDA=0" >> config.mk
make all || exit -1
export MXNET_ENGINE_TYPE=ThreadedEnginePerDevice
nosetests tests/python/unittest || exit -1
nosetests tests/python/train || exit -1
fi

if [ ${TASK} == "cpp_unittest" ]; then
12 changes: 8 additions & 4 deletions src/engine/engine.cc
@@ -15,14 +15,18 @@ inline Engine* CreateEngine() {
const bool default_engine = (type == nullptr);
if (type == nullptr) type = "ThreadedEngine";
std::string stype = type;

Engine *ret = nullptr;
if (stype == "ThreadedEngine") {
ret = CreateThreadedEngine();
} else if (stype == "NaiveEngine") {
if (stype == "NaiveEngine") {
ret = CreateNaiveEngine();
} else if (stype == "ThreadedEngine") {
ret = CreateThreadedEnginePooled();
} else if (stype == "ThreadedEnginePerDevice") {
ret = CreateThreadedEnginePerDevice();
}

CHECK_NE(ret, nullptr)
<< "Cannot find Eine " << type << " in registry";
<< "Cannot find Engine " << type;
if (!default_engine) {
LOG(INFO) << "MXNet start using engine: " << type;
}
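The rewritten factory keeps the public name ThreadedEngine pointing at the pooled implementation while adding the new ThreadedEnginePerDevice, all chosen through the MXNET_ENGINE_TYPE environment variable. The standalone sketch below mirrors that dispatch under stated assumptions: the stub Engine types and the CreateEngineFromEnv helper are placeholders, not the real MXNet classes and factories.

#include <cstdlib>
#include <iostream>
#include <string>

struct Engine { virtual ~Engine() = default; };
struct NaiveEngine : Engine {};
struct ThreadedEnginePooled : Engine {};
struct ThreadedEnginePerDevice : Engine {};

// Pick an engine implementation from MXNET_ENGINE_TYPE, defaulting to the
// pooled threaded engine when the variable is unset.
Engine* CreateEngineFromEnv() {
  const char* type = std::getenv("MXNET_ENGINE_TYPE");
  const bool default_engine = (type == nullptr);
  std::string stype = default_engine ? "ThreadedEngine" : type;

  Engine* ret = nullptr;
  if (stype == "NaiveEngine") {
    ret = new NaiveEngine();
  } else if (stype == "ThreadedEngine") {
    ret = new ThreadedEnginePooled();       // the pooled engine backs the default name
  } else if (stype == "ThreadedEnginePerDevice") {
    ret = new ThreadedEnginePerDevice();    // the new per-device policy
  }
  if (ret == nullptr) {
    std::cerr << "Cannot find Engine " << stype << std::endl;
    std::abort();
  }
  if (!default_engine) {
    std::cout << "MXNet start using engine: " << stype << std::endl;
  }
  return ret;
}

int main() {
  delete CreateEngineFromEnv();  // with no env var set, this builds the pooled default
  return 0;
}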
9 changes: 7 additions & 2 deletions src/engine/engine_impl.h
@@ -65,11 +65,16 @@ inline T* Opr::Cast() {
#endif
}

/*! \brief Maximum number of GPUs */
static constexpr std::size_t kMaxNumGPUs = 16;

// predeclare factory function for each type of engine
/*! \return NaiveEngine instance */
Engine *CreateNaiveEngine();
/*! \return ThreadedEngine instance */
Engine *CreateThreadedEngine();
/*! \return ThreadedEnginePooled instance */
Engine *CreateThreadedEnginePooled();
/*! \return ThreadedEnginePerDevice instance */
Engine *CreateThreadedEnginePerDevice();
} // namespace engine
} // namespace mxnet
#endif // MXNET_ENGINE_ENGINE_IMPL_H_
27 changes: 14 additions & 13 deletions src/engine/stream_manager.h
@@ -13,7 +13,6 @@
#include "../common/cuda_utils.h"

namespace mxnet {

namespace engine {

/*!
@@ -44,9 +43,9 @@ class StreamManager {
template <std::size_t kNumGpus, std::size_t kStreams>
RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
Context const& ctx) {
RunContext ret;
switch (ctx.dev_mask) {
case cpu::kDevMask:
return {nullptr};
case cpu::kDevMask: ret.stream = nullptr; break;
case gpu::kDevMask: {
#if MXNET_USE_CUDA
std::size_t use_counter;
@@ -63,21 +62,22 @@ RunContext StreamManager<kNumGpus, kStreams>::GetRunContext(
use_counter = counter;
counter = (counter + 1) % kStreams;
}
return {gpu_streams_.at(ctx.dev_id).at(use_counter)};
#else // MXNET_USE_CUDA
LOG(FATAL) << "Please compile with CUDA enabled";
ret.stream = gpu_streams_.at(ctx.dev_id).at(use_counter);
break;
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif // MXNET_USE_CUDA
}
}
return {nullptr};
return ret;
}

template <std::size_t kNumGpus, std::size_t kStreams>
RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
Context const& ctx) {
RunContext ret;
switch (ctx.dev_mask) {
case cpu::kDevMask:
return {nullptr};
case cpu::kDevMask: ret.stream = nullptr; break;
case gpu::kDevMask: {
#if MXNET_USE_CUDA
CUDA_CALL(cudaSetDevice(ctx.dev_id));
@@ -87,13 +87,14 @@ RunContext StreamManager<kNumGpus, kStreams>::GetIORunContext(
gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream<gpu>(false, false);
}
}
return {gpu_io_streams_.at(ctx.dev_id)};
#else // MXNET_USE_CUDA
LOG(FATAL) << "Please compile with CUDA enabled";
ret.stream = gpu_io_streams_.at(ctx.dev_id);
break;
#else
LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR;
#endif // MXNET_USE_CUDA
}
}
return {nullptr};
return ret;
}

template <std::size_t kNumGpus, std::size_t kStreams>
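Both GetRunContext and GetIORunContext now fill a local RunContext and fall through to a single return instead of returning from inside the switch; the compute path additionally picks a GPU stream round-robin under a mutex-guarded counter. The sketch below isolates just that counter logic; RoundRobinCounter is a hypothetical name, and the real code uses the selected index to read gpu_streams_.at(ctx.dev_id) under the same lock.

#include <cassert>
#include <cstddef>
#include <mutex>

// Round-robin slot selection, as in GetRunContext: each call hands back the
// next stream slot for a device so concurrent operations spread across the
// kStreams streams instead of serializing on one.
template <std::size_t kStreams>
class RoundRobinCounter {
 public:
  std::size_t Next() {
    std::lock_guard<std::mutex> lock(mutex_);
    std::size_t use = counter_;
    counter_ = (counter_ + 1) % kStreams;
    return use;
  }

 private:
  std::mutex mutex_;
  std::size_t counter_ = 0;
};

int main() {
  RoundRobinCounter<3> pick;
  assert(pick.Next() == 0);
  assert(pick.Next() == 1);
  assert(pick.Next() == 2);
  assert(pick.Next() == 0);  // wraps back to the first stream slot
  return 0;
}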
43 changes: 16 additions & 27 deletions src/engine/thread_pool.h
@@ -6,7 +6,7 @@

#include <dmlc/base.h>
#include <cstddef>
#include <array>
#include <vector>
#include <thread>
#include <utility>
#include "mxnet/base.h"
@@ -17,24 +17,30 @@ namespace engine {
/*!
* \brief Thread pool.
*/
template <std::size_t kSize>
class ThreadPool {
public:
/*!
* \brief Constructor takes function to run and its arguments.
* \brief Constructor takes function to run.
* \param size size of the thread pool.
* \param func the function to run on the thread pool.
*/
template <typename Function, typename... Args>
explicit ThreadPool(Function&& func, Args&&... args);
/*!
* \brief Destructor.
*/
~ThreadPool() noexcept(false);
explicit ThreadPool(size_t size, std::function<void()> func)
: worker_threads_(size) {
for (auto& i : worker_threads_) {
i = std::thread(func);
}
}
~ThreadPool() noexcept(false) {
for (auto&& i : worker_threads_) {
i.join();
}
}

private:
/*!
* \brief Worker threads.
*/
std::array<std::thread, kSize> worker_threads_;
std::vector<std::thread> worker_threads_;
/*!
* \brief Disallow default construction.
*/
@@ -44,23 +50,6 @@ class ThreadPool {
*/
DISALLOW_COPY_AND_ASSIGN(ThreadPool);
};

template <std::size_t kSize>
template <typename Function, typename... Args>
ThreadPool<kSize>::ThreadPool(Function&& func, Args&&... args) {
for (auto&& i : worker_threads_) {
i = std::thread{std::forward<Function>(func), std::forward<Args>(args)...};
}
}

template <std::size_t kSize>
ThreadPool<kSize>::~ThreadPool() noexcept(false) {
for (auto&& i : worker_threads_) {
i.join();
}
}

} // namespace engine
} // namespace mxnet

#endif // MXNET_ENGINE_THREAD_POOL_H_
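With the kSize template parameter gone, ThreadPool is sized at run time, which is what allows a per-device engine to create one pool per device with a device-specific thread count. The usage sketch below reimplements the class as shown in the diff and drives it from main; the counting lambda is only a stand-in for real workers, which would drain a task queue until shutdown.

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

class ThreadPool {
 public:
  // Spawn `size` workers that all run `func`; the destructor joins them.
  ThreadPool(std::size_t size, std::function<void()> func)
      : worker_threads_(size) {
    for (auto& t : worker_threads_) {
      t = std::thread(func);
    }
  }
  ~ThreadPool() {
    for (auto& t : worker_threads_) {
      t.join();
    }
  }

 private:
  std::vector<std::thread> worker_threads_;
};

int main() {
  std::atomic<int> tasks_run{0};
  {
    ThreadPool pool(4, [&tasks_run]() { ++tasks_run; });
  }  // leaving the scope joins all four workers
  std::cout << "workers finished: " << tasks_run << std::endl;
  return 0;
}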