diff --git a/include/mxnet/base.h b/include/mxnet/base.h
index e610e2c381ad..ec6fa3257697 100644
--- a/include/mxnet/base.h
+++ b/include/mxnet/base.h
@@ -348,10 +348,10 @@ struct RunContext {
   * \brief the auxiliary stream of the device, can be nullptr or Stream* in GPU mode
   */
  void *aux_stream;
- /*!
-  * \brief indicator of whether this execution is run in bulk mode
+ /*!
+  * \brief pointer to the CUDA event pool used by the dependency engine
   */
- bool is_bulk;
+ void *event_pool = nullptr;
  /*!
   * \brief get mshadow stream from Context
   * \return the mshadow stream
diff --git a/include/mxnet/c_api.h b/include/mxnet/c_api.h
index 335ab90b3cb1..d4d0536a40b4 100644
--- a/include/mxnet/c_api.h
+++ b/include/mxnet/c_api.h
@@ -111,7 +111,7 @@ typedef const void *EngineFnPropertyHandle;
 typedef void *EngineVarHandle;
 /*! \brief Engine asynchronous operation */
-typedef void (*EngineAsyncFunc)(void*, void*, void*);
+typedef void (*EngineAsyncFunc)(void*, void*, void*, void*);
 /*! \brief Engine synchronous operation */
 typedef void (*EngineSyncFunc)(void*, void*);
 /*! \brief Callback to free the param for EngineAsyncFunc/EngineSyncFunc */
diff --git a/include/mxnet/engine.h b/include/mxnet/engine.h
index 9d6367509f79..080ead54354a 100644
--- a/include/mxnet/engine.h
+++ b/include/mxnet/engine.h
@@ -30,6 +30,7 @@
 #include
 #include
 #endif
+#include
 #include
 #include "./base.h"
@@ -40,6 +41,72 @@ class Engine;
 /*! \brief namespace of engine internal types. */
 namespace engine {
+#if MXNET_USE_CUDA
+/*! \brief Class wrapping a CUDA event with timing disabled. */
+class CUDAEvent final {
+ public:
+  explicit CUDAEvent(Context const& ctx);
+
+  CUDAEvent(CUDAEvent&& other)
+    : event_(other.event_), dev_id_(other.dev_id_) {
+    other.event_ = nullptr;
+  }
+
+  CUDAEvent(const CUDAEvent& other) = delete;
+  void operator=(const CUDAEvent& other) = delete;
+
+  ~CUDAEvent();
+
+  inline std::weak_ptr<cudaEvent_t> GetEvent() noexcept {
+    return event_;
+  }
+ private:
+  std::shared_ptr<cudaEvent_t> event_;
+  int dev_id_;
+};
+
+class CUDAEventPool final {
+ public:
+  explicit CUDAEventPool(Context const& ctx) : counter_(0) {
+    for (size_t i = 0; i < kPoolSize; ++i) {
+      events_.emplace_back(ctx);
+    }
+  }
+
+  inline std::weak_ptr<cudaEvent_t> GetEvent(size_t i) noexcept {
+    return events_.at(i).GetEvent();
+  }
+
+  inline std::pair<std::weak_ptr<cudaEvent_t>, uint64_t> GetNextEvent() noexcept {
+    uint64_t c = counter_++;
+    return {events_.at(c % kPoolSize).GetEvent(), c};
+  }
+
+  inline uint64_t GetCounterValue() noexcept {
+    return counter_.load();
+  }
+ private:
+  static constexpr size_t kPoolSize = 64;
+  std::vector<CUDAEvent> events_;
+  std::atomic<uint64_t> counter_;
+};
+
+/*! \brief full event info for the sync object.*/
+struct EventInfo {
+  std::weak_ptr<cudaEvent_t> event;
+  cudaStream_t stream;
+  uint64_t pool_index;
+};
+/*! \brief struct containing CUDA events and variables needed for the dependencies.*/
+struct SyncObject {
+  // vector can carry multiple reader events
+  std::vector<EventInfo> reader_events;
+  // vector should carry at most 1 writer event
+  std::vector<EventInfo> writer_event;
+  std::mutex mutex;
+};
+#endif
+
 /*! \brief base class of engine variables.*/
 struct Var {
   virtual size_t version() {
@@ -58,6 +125,12 @@ struct Var {
    * is modified, the version number is incremented by 1.
    */
   size_t version_{0};
+#if MXNET_USE_CUDA
+  /*!
+   * \brief struct containing CUDA events and variables needed for the dependencies.
+   */
+  SyncObject sync_object;
+#endif
 };  // struct Var
 /*! \brief Internal representation of operator. */
 struct Opr;
 typedef Var* VarHandle;
 /*!
 \brief Operator pointer type, usually hold by user.*/
 typedef Opr* OprHandle;
+/*!
+ * \brief OnStart callback to the engine,
+ *  called by AsyncFn before the action
+ */
+class CallbackOnStart {
+ public:
+  // use implicit copy and assign
+  /*! \brief invoke the callback */
+  inline void operator()(const dmlc::Error* error = nullptr) const {
+    if (callback_ != nullptr)
+      (*callback_)(engine_, param_, error);
+  }
+
+ private:
+  /*! \brief engine can see content of callback */
+  friend class ::mxnet::Engine;
+  /*! \brief the real callback */
+  void (*callback_)(Engine *, void *, const dmlc::Error *);
+  /*! \brief the engine class passed to callback */
+  Engine* engine_;
+  /*! \brief the parameter set on callback */
+  void* param_;
+};
 /*!
  * \brief OnComplete Callback to the engine,
  *  called by AsyncFn when action completes
@@ -116,12 +212,14 @@ enum class FnProperty {
  */
 class MXNET_API Engine {
  public:
+  /*! \brief callback on start*/
+  typedef engine::CallbackOnStart CallbackOnStart;
   /*! \brief callback on complete*/
   typedef engine::CallbackOnComplete CallbackOnComplete;
   /*! \brief Synchronous operation to pass to engine. */
   typedef std::function<void(RunContext)> SyncFn;
   /*! \brief Asynchronous operation to pass to engine. */
-  typedef std::function<void(RunContext, CallbackOnComplete)> AsyncFn;
+  typedef std::function<void(RunContext, CallbackOnStart, CallbackOnComplete)> AsyncFn;
   /*! \brief Variable pointer */
   typedef engine::VarHandle VarHandle;
   /*! \brief Operator pointer */
@@ -248,7 +346,7 @@ class MXNET_API Engine {
    *
    * \return A shared pointer to Engine singleton.
    */
-  static std::shared_ptr<Engine> _GetSharedRef();
+  static const std::shared_ptr<Engine> &_GetSharedRef();
   /*!
    * \brief Push an synchronous operation to the engine.
    * \param exec_fn Execution function that executes the operation.
@@ -267,12 +365,29 @@
                 FnProperty prop = FnProperty::kNormal,
                 int priority = 0,
                 const char* opr_name = nullptr) {
-    this->PushAsync([exec_fn](RunContext ctx, CallbackOnComplete on_complete) {
+    this->PushAsync([exec_fn](RunContext ctx,
+                              CallbackOnStart on_start,
+                              CallbackOnComplete on_complete) {
+        on_start();
         exec_fn(ctx);
         on_complete();
       }, exec_ctx, const_vars, mutable_vars, prop, priority, opr_name);
   }
+  /*!
+   * \brief factory function to create OnStart callback.
+   * \param callback the static callback function.
+   * \param param the parameter passed to the callback.
+   */
+  inline CallbackOnStart CreateOnStart(
+      void (*callback)(Engine *, void *, const dmlc::Error *), void *param) {
+    CallbackOnStart ret;
+    ret.callback_ = callback;
+    ret.engine_ = this;
+    ret.param_ = param;
+    return ret;
+  }
+
   /*!
    * \brief factory function to create OnComplete callback.
    * \param callback th static callback function.
diff --git a/include/mxnet/storage.h b/include/mxnet/storage.h
index 0e90d17d6747..211b43d85e30 100644
--- a/include/mxnet/storage.h
+++ b/include/mxnet/storage.h
@@ -27,6 +27,7 @@
 #include
 #include
+#include
 #include "./base.h"
 namespace mxnet {
@@ -39,6 +40,17 @@ namespace mxnet {
  */
 class Storage {
  public:
+  /*!
+   * \brief Storage sync object.
+   */
+  struct SyncObj {
+#if MXNET_USE_CUDA
+    /*!
+     * \brief All the events from the engine variable.
+     */
+    std::vector<std::weak_ptr<cudaEvent_t>> events;
+#endif
+  };
   /*!
    * \brief Storage handle.
    */
@@ -65,6 +77,11 @@ class Storage {
    */
   std::string profiler_scope{MXNET_STORAGE_DEFAULT_PROFILER_SCOPE_CSTR};
   std::string name{MXNET_STORAGE_DEFAULT_NAME_CSTR};
+  /*!
+   * \brief Used to pass events back and forth between the engine Var
+   *  and the storage manager.
+   */
+  SyncObj sync_obj;
 };
 /*!
  * \brief Allocate a new contiguous memory for a given size.
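For reference, the contract added above is: an AsyncFn now receives a CallbackOnStart in addition to the CallbackOnComplete, and is expected to invoke on_start() before touching its inputs and outputs (so the engine can resolve event-based dependencies) and on_complete() once the work has been enqueued or finished. A minimal caller-side sketch, illustrative only and not part of the patch (the operator name and variables are made up):

#include <mxnet/engine.h>

void PushExampleOp(mxnet::Context exec_ctx,
                   mxnet::Engine::VarHandle in_var,
                   mxnet::Engine::VarHandle out_var) {
  mxnet::Engine::Get()->PushAsync(
      [](mxnet::RunContext rctx,
         mxnet::Engine::CallbackOnStart on_start,
         mxnet::Engine::CallbackOnComplete on_complete) {
        on_start();     // let the engine wait on the events attached to the input/output vars
        // ... enqueue the actual work on rctx here ...
        on_complete();  // let the engine record an event / release the vars
      },
      exec_ctx, {in_var}, {out_var},
      mxnet::FnProperty::kNormal, 0, "ExampleOp");
}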
@@ -138,7 +155,7 @@ class Storage {
   *
   * \return A shared pointer to Storage singleton.
   */
-  static std::shared_ptr<Storage> _GetSharedRef();
+  static const std::shared_ptr<Storage> &_GetSharedRef();
  private:
   std::mutex cpu_mutex_;
diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc
index e35f0f4a351a..db0489f44695 100644
--- a/src/c_api/c_api.cc
+++ b/src/c_api/c_api.cc
@@ -3454,6 +3454,7 @@ int MXNDArrayCreateFromSharedMem(int shared_pid, int shared_id, const int *shape
 }
 using VarHandle = Engine::VarHandle;
+using CallbackOnStart = Engine::CallbackOnStart;
 using CallbackOnComplete = Engine::CallbackOnComplete;
 void AssertValidNumberVars(int num_const_vars, int num_mutable_vars) {
@@ -3480,16 +3481,18 @@ int MXEnginePushAsync(EngineAsyncFunc async_func, void* func_param,
   Engine::AsyncFn exec_fn;
   if (deleter == nullptr) {
     exec_fn = [async_func, func_param](RunContext rctx,
+                                       CallbackOnStart on_start,
                                        CallbackOnComplete on_complete) {
-      async_func(&rctx, &on_complete, func_param);
+      async_func(&rctx, &on_start, &on_complete, func_param);
     };
   } else {
     // Wrap func_param in a shared_ptr with deleter such that deleter
     // will be called when the lambda goes out of scope.
     std::shared_ptr<void> shared_func_param(func_param, deleter);
     exec_fn = [async_func, shared_func_param](RunContext rctx,
+                                              CallbackOnStart on_start,
                                              CallbackOnComplete on_complete) {
-      async_func(&rctx, &on_complete, shared_func_param.get());
+      async_func(&rctx, &on_start, &on_complete, shared_func_param.get());
     };
   }
diff --git a/src/common/object_pool.h b/src/common/object_pool.h
index f0a651182431..70e84db36129 100644
--- a/src/common/object_pool.h
+++ b/src/common/object_pool.h
@@ -64,7 +64,7 @@ class ObjectPool {
   * \brief Get a shared ptr of the singleton instance of pool.
   * \return Shared pointer to the Object Pool.
   */
-  static std::shared_ptr<ObjectPool> _GetSharedRef();
+  static const std::shared_ptr<ObjectPool> &_GetSharedRef();
 private:
  /*!
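On the C API side, MXEnginePushAsync now forwards four opaque pointers to the user-supplied EngineAsyncFunc (the RunContext, the CallbackOnStart, the CallbackOnComplete, and the user parameter). A sketch of a client callback, assuming the handles are cast back to the engine callback types exactly as they are passed above in c_api.cc; this is illustrative and not part of the patch:

#include <mxnet/engine.h>

void MyAsyncFunc(void* rctx_handle, void* on_start_handle,
                 void* on_complete_handle, void* param) {
  auto* on_start = static_cast<mxnet::Engine::CallbackOnStart*>(on_start_handle);
  auto* on_complete = static_cast<mxnet::Engine::CallbackOnComplete*>(on_complete_handle);
  (*on_start)();     // resolve dependencies before reading/writing the data
  // ... perform the work, e.g. using *static_cast<mxnet::RunContext*>(rctx_handle) ...
  (*on_complete)();  // notify the engine that the operation finished
}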
@@ -173,7 +173,7 @@ ObjectPool* ObjectPool::Get() { } template -std::shared_ptr > ObjectPool::_GetSharedRef() { +const std::shared_ptr > &ObjectPool::_GetSharedRef() { static std::shared_ptr > inst_ptr(new ObjectPool()); return inst_ptr; } diff --git a/src/engine/engine.cc b/src/engine/engine.cc index a33f0b2c1442..a208a81b3545 100644 --- a/src/engine/engine.cc +++ b/src/engine/engine.cc @@ -26,6 +26,7 @@ #include #include #include "./engine_impl.h" +#include "../common/cuda/utils.h" namespace mxnet { namespace engine { @@ -56,9 +57,27 @@ inline Engine* CreateEngine() { } return ret; } + +#if MXNET_USE_CUDA +CUDAEvent::CUDAEvent(Context const& ctx) : + event_(std::make_shared()), dev_id_(ctx.dev_id) { + cudaEvent_t ev; + common::cuda::DeviceStore device_store(dev_id_); + CUDA_CALL(cudaEventCreateWithFlags(&ev, cudaEventDisableTiming)); + *event_ = ev; +} + +CUDAEvent::~CUDAEvent() { + if (event_ && *event_ != nullptr) { + common::cuda::DeviceStore device_store(dev_id_); + CUDA_CALL(cudaEventSynchronize(*event_)); + CUDA_CALL(cudaEventDestroy(*event_)); + } +} +#endif } // namespace engine -std::shared_ptr Engine::_GetSharedRef() { +const std::shared_ptr &Engine::_GetSharedRef() { static std::shared_ptr sptr(engine::CreateEngine()); return sptr; } diff --git a/src/engine/naive_engine.cc b/src/engine/naive_engine.cc index 95528a934c6c..043998532229 100644 --- a/src/engine/naive_engine.cc +++ b/src/engine/naive_engine.cc @@ -122,7 +122,8 @@ class NaiveEngine final : public Engine { profiler::Profiler *profiler = profiler::Profiler::Get(); NaiveOpr *opr = op->Cast(); opr->profiling = profiling && profiler->IsProfiling(profiler::Profiler::kSymbolic); - this->PushAsync([&](RunContext ctx, CallbackOnComplete on_complete) { + this->PushAsync([&](RunContext ctx, CallbackOnStart on_start, + CallbackOnComplete on_complete) { if (opr->profiling) { std::unique_ptr attrs; if (profiler->AggregateEnabled()) { @@ -132,7 +133,7 @@ class NaiveEngine final : public Engine { attrs.release()); opr->opr_profile->startForDevice(exec_ctx.dev_type, exec_ctx.dev_id); } - opr->fn(ctx, on_complete); + opr->fn(ctx, on_start, on_complete); if (opr->profiling) { opr->opr_profile->stop(); } @@ -159,6 +160,8 @@ class NaiveEngine final : public Engine { bool wait = false) override { std::promise promise; std::future future = promise.get_future(); + CallbackOnStart on_start = CreateOnStart( + NaiveEngine::OnStart, &promise); CallbackOnComplete callback = CreateCallback( NaiveEngine::OnComplete, &promise); profiler::Profiler *profiler = profiler::Profiler::Get(); @@ -196,12 +199,14 @@ class NaiveEngine final : public Engine { streams_[dev_id] = mshadow::NewStream(true, MXNET_USE_CUDNN != 0, dev_id); aux_streams_[dev_id] = new GPUAuxStream(streams_[dev_id]); } - exec_fun(RunContext{exec_ctx, streams_[dev_id], aux_streams_[dev_id], false}, callback); + exec_fun(RunContext{exec_ctx, streams_[dev_id], aux_streams_[dev_id]}, + on_start, + callback); #else LOG(FATAL) << "GPU is not enabled"; #endif } else { - exec_fun(RunContext{exec_ctx, &cpu_stream_, nullptr, false}, callback); + exec_fun(RunContext{exec_ctx, &cpu_stream_, nullptr}, on_start, callback); } future.wait(); // increment mutable var version @@ -215,7 +220,10 @@ class NaiveEngine final : public Engine { void DeleteVariable(SyncFn delete_fn, Context exec_ctx, VarHandle var) override { NaiveVar* naive_var = NaiveVar::CastFromBase(var); - this->PushAsync([delete_fn, naive_var](RunContext ctx, CallbackOnComplete on_complete) mutable { + this->PushAsync([delete_fn, 
naive_var](RunContext ctx, + CallbackOnStart on_start, + CallbackOnComplete on_complete) mutable { + on_start(); delete_fn(ctx); NaiveVar::Delete(naive_var); on_complete(); @@ -236,6 +244,10 @@ class NaiveEngine final : public Engine { } private: + // onstart + static void OnStart(Engine *engine, void *param, + const dmlc::Error* error) { + } // callback to oncomplete static void OnComplete(Engine *engine, void *param, const dmlc::Error* error) { diff --git a/src/engine/stream_manager.h b/src/engine/stream_manager.h index da1e4bc436ab..22f98a9d3d4a 100644 --- a/src/engine/stream_manager.h +++ b/src/engine/stream_manager.h @@ -28,7 +28,9 @@ #include #include #include +#include #include +#include "./engine_impl.h" #include "../common/cuda/utils.h" namespace mxnet { @@ -59,6 +61,7 @@ class StreamManager { gpu_aux_streams_; std::array*, kNumGpus> gpu_io_streams_; std::array gpu_cnt_; + std::array, kNumGpus> event_pools_; #endif // MXNET_USE_CUDA DISALLOW_COPY_AND_ASSIGN(StreamManager); }; // class StreamManager @@ -69,11 +72,12 @@ RunContext StreamManager::GetRunContext( RunContext ret; switch (ctx.dev_mask()) { case cpu::kDevMask: - ret = RunContext{ctx, nullptr, nullptr, false}; + ret = RunContext{ctx, nullptr, nullptr}; break; case gpu::kDevMask: { #if MXNET_USE_CUDA std::size_t use_counter; + CUDAEventPool* event_pool; { std::lock_guard lock{mutex_}; auto&& counter = gpu_cnt_.at(ctx.dev_id); @@ -89,13 +93,17 @@ RunContext StreamManager::GetRunContext( } counter = 0; } + if (event_pools_.at(ctx.dev_id) == nullptr) { + event_pools_[ctx.dev_id] = std::make_unique(ctx); + } + event_pool = event_pools_.at(ctx.dev_id).get(); use_counter = counter; counter = (counter + 1) % kStreams; } ret = RunContext{ctx, gpu_streams_.at(ctx.dev_id).at(use_counter), gpu_aux_streams_.at(ctx.dev_id).at(use_counter), - false}; + event_pool}; break; #else LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; @@ -113,18 +121,23 @@ RunContext StreamManager::GetIORunContext( RunContext ret; switch (ctx.dev_mask()) { case cpu::kDevMask: - ret = RunContext{ctx, nullptr, nullptr, false}; + ret = RunContext{ctx, nullptr, nullptr}; break; case gpu::kDevMask: { #if MXNET_USE_CUDA + CUDAEventPool* event_pool; { std::lock_guard lock{mutex_}; if (gpu_io_streams_.at(ctx.dev_id) == nullptr) { mxnet::common::cuda::DeviceStore device_store(ctx.dev_id); gpu_io_streams_.at(ctx.dev_id) = mshadow::NewStream(false, false, ctx.dev_id); } + if (event_pools_.at(ctx.dev_id) == nullptr) { + event_pools_[ctx.dev_id] = std::make_unique(ctx); + } + event_pool = event_pools_.at(ctx.dev_id).get(); } - ret = RunContext{ctx, gpu_io_streams_.at(ctx.dev_id), nullptr, false}; + ret = RunContext{ctx, gpu_io_streams_.at(ctx.dev_id), nullptr, event_pool}; break; #else LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; @@ -153,6 +166,9 @@ void StreamManager::Finalize() { #if MXNET_USE_CUDA for (std::size_t i = 0; i < kNumGpus; ++i) { if (gpu_cnt_.at(i) != -1) { + if (event_pools_.at(i) != nullptr) { + event_pools_[i].reset(); + } for (auto&& primary_stream : gpu_streams_.at(i)) { // Catch exception for CUDA driver shutdown MSHADOW_CATCH_ERROR(mshadow::DeleteStream(primary_stream)); diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc index 3eda2c8712f7..b053d18b4310 100644 --- a/src/engine/threaded_engine.cc +++ b/src/engine/threaded_engine.cc @@ -278,7 +278,10 @@ void ThreadedEngine::DeleteOperator(OprHandle op) { deps.insert(deps.end(), threaded_opr->mutable_vars.begin(), threaded_opr->mutable_vars.end()); - 
this->PushAsync([threaded_opr](RunContext, CallbackOnComplete on_complete) { + this->PushAsync([threaded_opr](RunContext, + CallbackOnStart on_start, + CallbackOnComplete on_complete) { + on_start(); ThreadedOpr::Delete(threaded_opr); on_complete(); }, Context::CPU(), {}, deps, FnProperty::kDeleteVar, 0, @@ -350,7 +353,10 @@ void ThreadedEngine::PushSync(SyncFn exec_fn, Context exec_ctx, int priority, const char* opr_name) { if (!bulk_size() || prop != FnProperty::kNormal || priority) { - this->PushAsync([exec_fn](RunContext ctx, CallbackOnComplete on_complete) { + this->PushAsync([exec_fn](RunContext ctx, + CallbackOnStart on_start, + CallbackOnComplete on_complete) { + on_start(); exec_fn(ctx); on_complete(); }, exec_ctx, const_vars, mutable_vars, prop, priority, opr_name); @@ -366,9 +372,12 @@ void ThreadedEngine::DeleteVariable(SyncFn delete_fn, Context exec_ctx, VarHandle var) { ThreadedVar* threaded_var = ThreadedVar::CastFromBase(var); - this->PushAsync([delete_fn, threaded_var](RunContext ctx, CallbackOnComplete on_complete) { + this->PushAsync([delete_fn, threaded_var](RunContext ctx, + CallbackOnStart on_start, + CallbackOnComplete on_complete) { // Mark variable as orphan, // so during `ThreadedEngine::OnComplete` it could be recycled. + on_start(); threaded_var->SetToDelete(); delete_fn(ctx); on_complete(); @@ -388,7 +397,10 @@ void ThreadedEngine::WaitForVar(VarHandle var) { debug_wait_var_ = threaded_var; } std::atomic done{false}; - this->PushAsync([this, &done](RunContext, CallbackOnComplete on_complete) { + this->PushAsync([this, &done](RunContext, + CallbackOnStart on_start, + CallbackOnComplete on_complete) { + on_start(); if (engine_info_) { LOG(INFO) << "Sync is executed"; } @@ -469,6 +481,14 @@ inline void ThreadedEngine::OnComplete(ThreadedOpr* threaded_opr) { } }); if (to_delete) { +#if MXNET_USE_CUDA + auto& sync_obj = i->sync_object; + { + std::lock_guard l(sync_obj.mutex); + sync_obj.reader_events.clear(); + sync_obj.writer_event.clear(); + } +#endif ThreadedVar::Delete(i); } } @@ -523,5 +543,206 @@ void ThreadedEngine::OnCompleteStatic(Engine *engine, void *opr_block_, OprBlock::Delete(opr_block); } +void ThreadedEngine::OnStartStatic(Engine *engine, void *opr_block, + const dmlc::Error* error) { + // no-op +} + +#if MXNET_USE_CUDA +static inline void AddEventHelper( + std::unordered_map* events_per_stream, + const EventInfo& cuda_event) { + auto event_stream = cuda_event.stream; + if (events_per_stream->count(event_stream) > 0) { + if ((*events_per_stream)[event_stream].pool_index < cuda_event.pool_index) { + (*events_per_stream)[event_stream] = cuda_event; + } + } else { + (*events_per_stream).emplace(event_stream, cuda_event); + } +} + +void ThreadedEngine::OnStartCPU(Engine *engine, void *opr_block, + const dmlc::Error* error) { + static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false); + if (!use_new_dep_engine) { + return; + } + ThreadedOpr *threaded_opr = static_cast(opr_block)->opr; + std::unordered_map event_per_stream; + for (auto* read_var : threaded_opr->const_vars) { + auto &sync_obj = read_var->sync_object; + std::lock_guard l(sync_obj.mutex); + auto &reader_events = sync_obj.reader_events; + // check for expired events and delete them + reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(), + [&](const EventInfo e_i) { + return e_i.event.expired(); + }), reader_events.end()); + for (auto& cuda_event : reader_events) { + AddEventHelper(&event_per_stream, cuda_event); + } + if 
(!sync_obj.writer_event.empty()) { + if (sync_obj.writer_event[0].event.expired()) { + sync_obj.writer_event.clear(); + } else { + AddEventHelper(&event_per_stream, sync_obj.writer_event[0]); + } + } + } + + for (auto* write_var : threaded_opr->mutable_vars) { + auto &sync_obj = write_var->sync_object; + std::lock_guard l(sync_obj.mutex); + auto &reader_events = sync_obj.reader_events; + // check for expired events and delete them + reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(), + [&](const EventInfo e_i) { + return e_i.event.expired(); + }), reader_events.end()); + for (auto& cuda_event : reader_events) { + AddEventHelper(&event_per_stream, cuda_event); + } + if (!sync_obj.writer_event.empty()) { + if (sync_obj.writer_event[0].event.expired()) { + sync_obj.writer_event.clear(); + } else { + AddEventHelper(&event_per_stream, sync_obj.writer_event[0]); + } + } + } + for (auto event : event_per_stream) { + auto ev = event.second.event.lock(); + MSHADOW_CUDA_CALL(cudaEventSynchronize(*ev)); + } +} + +void ThreadedEngine::OnStartGPU(Engine *engine, void *sync_info, + const dmlc::Error* error) { + static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false); + if (!use_new_dep_engine) { + return; + } + auto *info = reinterpret_cast(sync_info); + CHECK(info->stream != nullptr); + auto *worker_stream = reinterpret_cast *>(info->stream); + ThreadedOpr *threaded_opr = static_cast(info->opr_block)->opr; + std::unordered_map event_per_stream; + for (auto* read_var : threaded_opr->const_vars) { + auto &sync_obj = read_var->sync_object; + std::lock_guard l(sync_obj.mutex); + auto &reader_events = sync_obj.reader_events; + // check for expired events and delete them + reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(), + [&](const EventInfo e_i) { + return e_i.event.expired(); + }), reader_events.end()); + for (auto& writer : sync_obj.writer_event) { + if (writer.event.expired()) { + sync_obj.writer_event.clear(); + break; + } + if (writer.stream != worker_stream->stream_) { + // if there is already a reader on the same stream as us, + // it already synced with that writer and we can rely on + // the ongoing sync + bool found = false; + for (const auto& reader : reader_events) { + if (reader.stream == worker_stream->stream_) { + found = true; + break; + } + } + if (!found) { + AddEventHelper(&event_per_stream, + writer); + } + } + } + } + for (auto* write_var : threaded_opr->mutable_vars) { + auto &sync_obj = write_var->sync_object; + std::lock_guard l(sync_obj.mutex); + // check for expired events and delete them + auto &reader_events = sync_obj.reader_events; + reader_events.erase(std::remove_if(reader_events.begin(), reader_events.end(), + [&](const EventInfo e_i) { + return e_i.event.expired(); + }), reader_events.end()); + // if there are some readers, we wait for them + for (auto& cuda_event : reader_events) { + if (worker_stream->stream_ != cuda_event.stream) { + AddEventHelper(&event_per_stream, cuda_event); + } + } + if (!sync_obj.writer_event.empty()) { + if (sync_obj.writer_event[0].event.expired()) { + sync_obj.writer_event.clear(); + } else { + if (worker_stream->stream_ != sync_obj.writer_event[0].stream) { + AddEventHelper(&event_per_stream, sync_obj.writer_event[0]); + } + } + } + } + for (auto event : event_per_stream) { + auto ev = event.second.event.lock(); + MSHADOW_CUDA_CALL(cudaStreamWaitEvent(worker_stream->stream_, *ev, 0)); + } +} + +void ThreadedEngine::OnCompleteGPU(Engine *engine, void *sync_info, + 
const dmlc::Error* error) { + auto *info = reinterpret_cast(sync_info); + CHECK(info->stream != nullptr); + + auto *worker_stream = reinterpret_cast *>(info->stream); + static bool use_new_dep_engine = dmlc::GetEnv("MXNET_ASYNC_GPU_ENGINE", false); + + if (!use_new_dep_engine) { + worker_stream->Wait(); + ThreadedEngine::OnCompleteStatic(engine, info->opr_block, error); + GPUWorkerSyncInfo::Delete(info); + return; + } + + ThreadedOpr *threaded_opr = static_cast(info->opr_block)->opr; + auto* event_pool = static_cast(info->event_pool); + auto[event, event_pool_idx] = event_pool->GetNextEvent(); + auto ev = event.lock(); + MSHADOW_CUDA_CALL(cudaEventRecord(*ev, worker_stream->stream_)); + for (auto* read_var : threaded_opr->const_vars) { + auto &sync_obj = read_var->sync_object; + std::lock_guard l(sync_obj.mutex); + // If some reader event is already recorded on the same stream, + // we want to replace ourselves by it + int i; + for (i = 0; i < sync_obj.reader_events.size(); ++i) { + auto stream = sync_obj.reader_events[i].stream; + if (stream == worker_stream->stream_) { + sync_obj.reader_events[i].event = event; + sync_obj.reader_events[i].pool_index = event_pool_idx; + break; + } + } + if (i == sync_obj.reader_events.size()) { + sync_obj.reader_events.push_back({event, worker_stream->stream_, event_pool_idx}); + } + } + + for (auto* write_var : threaded_opr->mutable_vars) { + auto &sync_obj = write_var->sync_object; + std::lock_guard l(sync_obj.mutex); + sync_obj.reader_events.clear(); + sync_obj.writer_event.clear(); + sync_obj.writer_event.push_back({event, worker_stream->stream_, event_pool_idx}); + } + + ThreadedEngine::OnCompleteStatic(engine, info->opr_block, error); + GPUWorkerSyncInfo::Delete(info); +} +#endif + + } // namespace engine } // namespace mxnet diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h index aa0e5a22fb1e..8acbcb7b1eda 100644 --- a/src/engine/threaded_engine.h +++ b/src/engine/threaded_engine.h @@ -349,7 +349,8 @@ class ThreadedEngine : public Engine { * \param run_ctx runtime context used to execute the function. * \param opr_block the opr_block to be executed and deleted. 
*/ - void ExecuteOprBlock(RunContext run_ctx, OprBlock* opr_block) { + void ExecuteOprBlock(RunContext run_ctx, OprBlock* opr_block, + CallbackOnStart on_start, CallbackOnComplete callback) { ThreadedOpr* threaded_opr = opr_block->opr; if (opr_block->profiling && threaded_opr->opr_name.size()) { std::unique_ptr attrs; @@ -361,8 +362,6 @@ class ThreadedEngine : public Engine { attrs.release())); opr_block->opr_profile->startForDevice(ctx.dev_type, ctx.dev_id); } - CallbackOnComplete callback = - this->CreateCallback(ThreadedEngine::OnCompleteStatic, opr_block); const bool debug_info = (engine_info_ && debug_push_opr_ == opr_block); if (debug_info) { LOG(INFO) << "ExecuteOprBlock " << opr_block @@ -378,11 +377,13 @@ class ThreadedEngine : public Engine { try { if ((!(threaded_opr->opr_exception && *threaded_opr->opr_exception) || threaded_opr->prop == FnProperty::kNoSkip) || threaded_opr->wait) { - threaded_opr->fn(run_ctx, callback); + threaded_opr->fn(run_ctx, on_start, callback); } else { + on_start(); callback(); } } catch (const std::exception& e) { + on_start(); threaded_opr->opr_exception = std::make_shared(std::current_exception()); callback(); @@ -407,6 +408,7 @@ class ThreadedEngine : public Engine { } } } else { + on_start(); callback(); } } @@ -427,6 +429,27 @@ class ThreadedEngine : public Engine { return bulk_size; } + protected: + static void OnStartStatic(Engine *engine, void *opr_block, + const dmlc::Error* error); + static void OnCompleteStatic(Engine *engine, void *threaded_opr, + const dmlc::Error* error); +#if MXNET_USE_CUDA + static void OnStartCPU(Engine *engine, void *opr_block, + const dmlc::Error* error); + static void OnStartGPU(Engine *engine, void *sync_info, + const dmlc::Error* error); + static void OnCompleteGPU(Engine *engine, void *sync_info, + const dmlc::Error* error); + struct GPUWorkerSyncInfo : public common::ObjectPoolAllocatable { + void *opr_block{nullptr}; + void *stream{nullptr}; + void *event_pool{nullptr}; + }; + + std::shared_ptr > objpool_gpu_sync_ref_; +#endif + private: /*! \brief structure for holding bulk execution status */ struct BulkStatus { @@ -489,8 +512,6 @@ class ThreadedEngine : public Engine { } } - static void OnCompleteStatic(Engine *engine, void *threaded_opr, - const dmlc::Error* error); /*! 
* \brief find exception in global_exception_refs and add it if missing * \param opr_exception the exception to be added to global_exception_refs @@ -531,16 +552,13 @@ class ThreadedEngine : public Engine { bulk_status.count = 0; DeduplicateVarHandle(&bulk_status.const_vars, &bulk_status.mutable_vars); auto functions = bulk_status.functions; - this->PushAsync([functions](RunContext ctx, CallbackOnComplete on_complete) { - ctx.is_bulk = true; + this->PushAsync([functions](RunContext ctx, + CallbackOnStart on_start, + CallbackOnComplete on_complete) { + on_start(); for (auto& fn : *functions) { fn(ctx); } - ctx.is_bulk = false; - bool is_gpu = ctx.ctx.dev_mask() == gpu::kDevMask; - if (is_gpu) { - ctx.get_stream()->Wait(); - } on_complete(); }, bulk_status.ctx, bulk_status.const_vars, bulk_status.mutable_vars, FnProperty::kNormal, 0, "ImperativeBulk"); diff --git a/src/engine/threaded_engine_perdevice.cc b/src/engine/threaded_engine_perdevice.cc index 4c5d1befb8b3..a93e52b3632a 100644 --- a/src/engine/threaded_engine_perdevice.cc +++ b/src/engine/threaded_engine_perdevice.cc @@ -53,8 +53,14 @@ class ThreadedEnginePerDevice : public ThreadedEngine { static auto constexpr kCopyQueue = kPriority; static auto constexpr kPriorityQueue = kPriority; static auto constexpr kWorkerQueue = kFIFO; + static int constexpr kMaxStreams = 256; ThreadedEnginePerDevice() noexcept(false) { +#if MXNET_USE_CUDA + // Make sure that the pool is not destroyed before the engine + objpool_gpu_sync_ref_ = common::ObjectPool::_GetSharedRef(); + streams_.reserve(kMaxStreams); +#endif this->Start(); } ~ThreadedEnginePerDevice() noexcept(false) override { @@ -76,6 +82,15 @@ class ThreadedEnginePerDevice : public ThreadedEngine { StopNoWait(); } +#if MXNET_USE_CUDA + void WaitForAll() override { + ThreadedEngine::WaitForAll(); + for (auto s : streams_) { + s->Wait(); + } + } +#endif + void Start() override { if (is_worker_) return; gpu_worker_nthreads_ = common::GetNumThreadsPerGPU(); @@ -103,7 +118,11 @@ class ThreadedEnginePerDevice : public ThreadedEngine { MSHADOW_CATCH_ERROR(mshadow::SetDevice(ctx.dev_id)); #endif } - this->ExecuteOprBlock(RunContext{ctx, nullptr, nullptr, false}, opr_block); + CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartStatic, + opr_block); + CallbackOnComplete callback = this->CreateCallback(ThreadedEngine::OnCompleteStatic, + opr_block); + this->ExecuteOprBlock(RunContext{ctx, nullptr, nullptr}, opr_block, on_start, callback); } else { if (ctx.dev_mask() == Context::kCPU) { // CPU execution. @@ -234,6 +253,12 @@ class ThreadedEnginePerDevice : public ThreadedEngine { common::LazyAllocArray > gpu_copy_workers_; // gpu priority workers common::LazyAllocArray > gpu_priority_workers_; +#if MXNET_USE_CUDA + std::vector*> streams_; + + std::unordered_map > cuda_event_pool_per_worker_; +#endif + /*! * \brief GPU worker that performs operations on a certain device. * \param dev_id The device id of the worker. 
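The GPU worker set up below (and the OnStartGPU/OnCompleteGPU callbacks earlier) boils down to the standard CUDA event pattern: the completing stream records a pooled event, dependent streams insert a cudaStreamWaitEvent on it, and host threads that need the result call cudaEventSynchronize. A standalone sketch of that pattern (stream and event names are illustrative, not taken from the patch):

#include <cuda_runtime.h>

void RecordAndWaitExample(cudaStream_t producer, cudaStream_t consumer) {
  cudaEvent_t ev;
  // Events are created with timing disabled, as in CUDAEvent.
  cudaEventCreateWithFlags(&ev, cudaEventDisableTiming);
  // OnCompleteGPU: mark the point where the producer's work was enqueued.
  cudaEventRecord(ev, producer);
  // OnStartGPU: make a dependent stream wait for that point without blocking the host.
  cudaStreamWaitEvent(consumer, ev, 0);
  // OnStartCPU / SyncCopyToCPU: a host thread that needs the result blocks on the event.
  cudaEventSynchronize(ev);
  cudaEventDestroy(ev);
}

Relying on events instead of full-stream synchronization is what lets the patch drop the blanket ctx.get_stream()->Wait() calls that previously serialized GPU work.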
@@ -261,9 +286,20 @@ class ThreadedEnginePerDevice : public ThreadedEngine { aux_stream = new GPUAuxStream(stream); } } while (false); + // register stream + streams_.push_back(stream); + CUDAEventPool* event_pool; + auto event_pool_it = cuda_event_pool_per_worker_.find(ctx.dev_id); + if (event_pool_it != cuda_event_pool_per_worker_.end()) { + event_pool = event_pool_it->second.get(); + } else { + auto res = cuda_event_pool_per_worker_.emplace(ctx.dev_id, + std::make_unique(ctx)); + event_pool = res.first->second.get(); + } // execute task OprBlock* opr_block; - RunContext run_ctx{ctx, stream, aux_stream, false}; + RunContext run_ctx{ctx, stream, aux_stream}; auto* task_queue = &(block->task_queue); // Don't eat up omp threads for GPU jobs. They're probably best used elsewhere, @@ -271,7 +307,13 @@ class ThreadedEnginePerDevice : public ThreadedEngine { OpenMP::Get()->on_start_worker_thread(false); while (task_queue->Pop(&opr_block)) { - this->ExecuteOprBlock(run_ctx, opr_block); + auto* info = ThreadedEngine::GPUWorkerSyncInfo::New(); + info->opr_block = opr_block; + info->stream = stream; + info->event_pool = event_pool; + CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartGPU, info); + CallbackOnComplete callback = this->CreateCallback(ThreadedEngine::OnCompleteGPU, info); + this->ExecuteOprBlock(run_ctx, opr_block, on_start, callback); } #else ready_event->signal(); @@ -287,7 +329,7 @@ class ThreadedEnginePerDevice : public ThreadedEngine { const std::shared_ptr& ready_event) { this->is_worker_ = true; auto* task_queue = &(block->task_queue); - RunContext run_ctx{ctx, nullptr, nullptr, false}; + RunContext run_ctx{ctx, nullptr, nullptr}; // execute task OprBlock* opr_block; @@ -297,7 +339,14 @@ class ThreadedEnginePerDevice : public ThreadedEngine { OpenMP::Get()->on_start_worker_thread(true); while (task_queue->Pop(&opr_block)) { - this->ExecuteOprBlock(run_ctx, opr_block); +#if MXNET_USE_CUDA + CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartCPU, opr_block); +#else + CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartStatic, opr_block); +#endif + CallbackOnComplete callback = this->CreateCallback(ThreadedEngine::OnCompleteStatic, + opr_block); + this->ExecuteOprBlock(run_ctx, opr_block, on_start, callback); } } diff --git a/src/engine/threaded_engine_pooled.cc b/src/engine/threaded_engine_pooled.cc index dde16bc8fe5d..045da0584100 100644 --- a/src/engine/threaded_engine_pooled.cc +++ b/src/engine/threaded_engine_pooled.cc @@ -48,6 +48,10 @@ namespace engine { class ThreadedEnginePooled : public ThreadedEngine { public: ThreadedEnginePooled() { +#if MXNET_USE_CUDA + // Make sure that the pool is not destroyed before the engine + objpool_gpu_sync_ref_ = common::ObjectPool::_GetSharedRef(); +#endif this->Start(); } @@ -56,13 +60,13 @@ class ThreadedEnginePooled : public ThreadedEngine { } void StopNoWait() { - streams_->Finalize(); task_queue_->SignalForKill(); io_task_queue_->SignalForKill(); task_queue_ = nullptr; io_task_queue_ = nullptr; thread_pool_ = nullptr; io_thread_pool_ = nullptr; + streams_->Finalize(); streams_ = nullptr; } @@ -150,7 +154,29 @@ class ThreadedEnginePooled : public ThreadedEngine { auto&& rctx = is_copy ? 
streams_->GetIORunContext(opr_block->ctx) : streams_->GetRunContext(opr_block->ctx); - this->ExecuteOprBlock(rctx, opr_block); + +#if MXNET_USE_CUDA + CallbackOnStart on_start; + CallbackOnComplete callback; + if (opr_block->ctx.dev_mask() == Context::kCPU) { + on_start = this->CreateOnStart(ThreadedEngine::OnStartCPU, opr_block); + callback = this->CreateCallback(ThreadedEngine::OnCompleteStatic, opr_block); + } else { + CHECK_EQ(opr_block->ctx.dev_mask(), Context::kGPU); + auto stream = rctx.get_stream(); + auto event_pool = static_cast(rctx.event_pool); + auto* info = ThreadedEngine::GPUWorkerSyncInfo::New(); + info->opr_block = opr_block; + info->stream = stream; + info->event_pool = event_pool; + on_start = this->CreateOnStart(ThreadedEngine::OnStartGPU, info); + callback = this->CreateCallback(ThreadedEngine::OnCompleteGPU, info); + } +#else // MXNET_USE_CUDA + CallbackOnStart on_start = this->CreateOnStart(ThreadedEngine::OnStartStatic, opr_block); + CallbackOnComplete callback = this->CreateCallback(ThreadedEngine::OnCompleteStatic, opr_block); +#endif // MXNET_USE_CUDA + this->ExecuteOprBlock(rctx, opr_block, on_start, callback); } /*! * \brief Push the operation to the queue. diff --git a/src/imperative/imperative_utils.h b/src/imperative/imperative_utils.h index f53b0db91b2b..5aa246401560 100644 --- a/src/imperative/imperative_utils.h +++ b/src/imperative/imperative_utils.h @@ -673,14 +673,11 @@ inline void PushFCompute(const FCompute& fn, fn(attrs, opctx, input_blobs, tmp_req, output_blobs); // post-fcompute fallback, cast to original storage type CastNonDefaultStorage(post_temp_src, post_temp_dst, opctx, is_gpu); - if (is_gpu && !rctx.is_bulk) { - rctx.get_stream()->Wait(); - } DerefInputOutputRelease(inputs, outputs); }; if (CheckIfSkipEngine(attrs)) { // execute without engine - run(RunContext{ctx, nullptr, nullptr, false}); + run(RunContext{ctx, nullptr, nullptr}); } else { Engine::Get()->PushSync( run, ctx, read_vars, write_vars, FnProperty::kNormal, @@ -712,12 +709,9 @@ inline void PushFComputeEx(const FComputeEx& fn, INVALIDATE_OUTPUTS_COND(!cross_device_copy, outputsA, req); CREATE_DEFAULT_INPUTS(!cross_device_copy, attrs, CreateDefaultInputs(&inputsA)); fn(attrs, opctx, inputsA, req, outputsA); - if (ctx.dev_mask() == gpu::kDevMask && exec_type == ExecType::kSync && !rctx.is_bulk) { - rctx.get_stream()->Wait(); - } }; if (cross_device_copy || CheckIfSkipEngine(attrs)) { - run(RunContext{ctx, nullptr, nullptr, false}); + run(RunContext{ctx, nullptr, nullptr}); } else { CHECK(exec_type == ExecType::kSync); Engine::Get()->PushSync(run, ctx, read_vars, write_vars, FnProperty::kNormal, @@ -749,6 +743,7 @@ inline void PushOperator(const OpStatePtr& state, auto fcompute_ex = common::GetFCompute(op, "FStatefulComputeEx", ctx); if (fcompute_ex != nullptr && dispatch_mode == DispatchMode::kFComputeEx) { const auto& run = [=](RunContext rctx, + engine::CallbackOnStart on_start, engine::CallbackOnComplete on_complete) { OpContext opctx{need_grad, is_train, rctx, on_complete, requested}; REDEFINE_INPUTS_OUTPUTS(inputs, outputs, inputsA, outputsA); @@ -756,21 +751,20 @@ inline void PushOperator(const OpStatePtr& state, outputsA, req); CREATE_DEFAULT_INPUTS(exec_type != ExecType::kCrossDeviceCopy && op->name != "_CachedOp", attrs, CreateDefaultInputs(&inputsA)); + on_start(); fcompute_ex(state, opctx, inputsA, req, outputsA); - if (ctx.dev_mask() == gpu::kDevMask && exec_type == ExecType::kSync - && rctx.get_stream() && !rctx.is_bulk) { - rctx.get_stream()->Wait(); - } }; // For 
operators with subgraphs, we need to invoke them in the main thread // instead of the threaded engine. if (exec_type == ExecType::kSubgraphExec || CheckIfSkipEngine(attrs)) { - RunContext rctx{ctx, nullptr, nullptr, false}; - run(rctx, engine::CallbackOnComplete()); + RunContext rctx{ctx, nullptr, nullptr}; + run(rctx, engine::CallbackOnStart(), engine::CallbackOnComplete()); } else if (exec_type == ExecType::kSync) { Engine::Get()->PushSync( - [=](RunContext rctx) { run(rctx, engine::CallbackOnComplete()); }, + [=](RunContext rctx) { run(rctx, + engine::CallbackOnStart(), + engine::CallbackOnComplete()); }, ctx, read_vars, write_vars, FnProperty::kNormal, 0, op->name.c_str()); } else { @@ -785,7 +779,9 @@ inline void PushOperator(const OpStatePtr& state, << "One of FStatefulCompute and FStatefulComputeEx must be registered " << "for stateful operator " << op->name; - const auto& run = [=](RunContext rctx, engine::CallbackOnComplete on_complete) { + const auto& run = [=](RunContext rctx, + engine::CallbackOnStart on_start, + engine::CallbackOnComplete on_complete) { OpContext opctx{need_grad, is_train, rctx, on_complete, requested}; std::vector input_blobs, output_blobs; @@ -807,20 +803,16 @@ inline void PushOperator(const OpStatePtr& state, fcompute(state, opctx, input_blobs, tmp_req, output_blobs); // post-fcompute fallback, cast to original storage type, if necessary CastNonDefaultStorage(post_temp_src, post_temp_dst, opctx, is_gpu); - if (is_gpu && exec_type == ExecType::kSync - && rctx.get_stream() && !rctx.is_bulk) { - rctx.get_stream()->Wait(); - } DerefInputOutputRelease(inputs, outputs); }; if (exec_type == ExecType::kSubgraphExec || CheckIfSkipEngine(attrs)) { - RunContext rctx{ctx, nullptr, nullptr, false}; - run(rctx, engine::CallbackOnComplete()); + RunContext rctx{ctx, nullptr}; + run(rctx, engine::CallbackOnStart(), engine::CallbackOnComplete()); } else if (exec_type == ExecType::kSync) { Engine::Get()->PushSync( [=](RunContext rctx) { - run(rctx, engine::CallbackOnComplete()); + run(rctx, engine::CallbackOnStart(), engine::CallbackOnComplete()); }, ctx, read_vars, write_vars, FnProperty::kNormal, 0, op->name.c_str()); } else { @@ -1199,7 +1191,10 @@ inline Engine::OprHandle CreateEngineOp( bool is_async = execs.size() > 1 ? false : execs[0]->exec_type() == ExecType::kAsync; auto exec_fun = [execs, is_async, is_gpu] ( - RunContext ctx, Engine::CallbackOnComplete on_complete) { + RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); if (is_async) { execs[0]->op_ctx.async_on_complete = on_complete; } @@ -1207,10 +1202,7 @@ inline Engine::OprHandle CreateEngineOp( // call on complete only if it is async op if (!is_async) { if (is_gpu) { - #if MXNET_USE_CUDA - // Wait GPU kernel to finish. 
- ctx.get_stream()->Wait(); - #else + #if !MXNET_USE_CUDA LOG(FATAL) << MXNET_GPU_NOT_ENABLED_ERROR; #endif } diff --git a/src/io/batchify.cc b/src/io/batchify.cc index 01d93f5cad8f..ed387490e652 100644 --- a/src/io/batchify.cc +++ b/src/io/batchify.cc @@ -170,7 +170,7 @@ class StackBatchify : public BatchifyFunction { // inputs[j][i].WaitToRead(); DType *ptr = (*outputs)[i].data().dptr(); auto asize = ashape.Size(); - RunContext rctx{(*outputs)[i].ctx(), nullptr, nullptr, false}; + RunContext rctx{(*outputs)[i].ctx(), nullptr, nullptr}; auto dst = TBlob( ptr + asize * j, inputs[j][i].data().shape_, cpu::kDevMask, dtype, 0); mxnet::ndarray::Copy( diff --git a/src/io/dataset.cc b/src/io/dataset.cc index 31bffed88460..e8e96fc0b041 100644 --- a/src/io/dataset.cc +++ b/src/io/dataset.cc @@ -98,7 +98,7 @@ class RecordFileDataset final : public Dataset { const size_t size = read_buff.size(); out = NDArray(TShape({static_cast(size)}), Context::CPU(), false, mshadow::kInt8); TBlob dst = out.data(); - RunContext rctx{Context::CPU(), nullptr, nullptr, false}; + RunContext rctx{Context::CPU(), nullptr, nullptr}; mxnet::ndarray::Copy( TBlob(const_cast(reinterpret_cast(buf)), out.shape(), cpu::kDevMask, out.dtype(), 0), @@ -211,7 +211,7 @@ class ImageRecordFileDataset : public Dataset { size -= sizeof(header); s += sizeof(header); NDArray label = NDArray(Context::CPU(), mshadow::default_type_flag); - RunContext rctx{Context::CPU(), nullptr, nullptr, false}; + RunContext rctx{Context::CPU(), nullptr, nullptr}; if (header.flag > 0) { auto label_shape = header.flag <= 1 ? TShape(0, 1) : TShape({header.flag}); label.ReshapeAndAlloc(label_shape); diff --git a/src/kvstore/comm.h b/src/kvstore/comm.h index b03b74c73bad..0de07a24d58f 100644 --- a/src/kvstore/comm.h +++ b/src/kvstore/comm.h @@ -165,7 +165,10 @@ class CommCPU : public Comm { } Engine::Get()->PushAsync( - [reduce, this](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [reduce, this](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); ReduceSumCPU(reduce); on_complete(); }, Context::CPU(), const_vars, {reduce[0].var()}, @@ -194,7 +197,10 @@ class CommCPU : public Comm { Resource rsc = ResourceManager::Get()->Request(buf_merged.ctx(), ResourceRequest(ResourceRequest::kTempSpace)); Engine::Get()->PushAsync( - [reduce, buf_merged, rsc, this](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [reduce, buf_merged, rsc, this](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); NDArray out = buf_merged; is_serial_push_? 
ReduceSumCPUExSerial(reduce, &out) @@ -254,7 +260,10 @@ class CommCPU : public Comm { "consider create a new NDArray buffer to store the output."); } Engine::Get()->PushAsync( - [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [=](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); const TBlob& indices = row_id.data(); NDArray temp = retained_cpu; // get rid the of const qualifier op::SparseRetainOpForwardRspImpl(rctx.get_stream(), @@ -653,7 +662,10 @@ class CommDevice : public Comm { "consider create a new NDArray buffer to store the output."); } bool is_gpu = retained_gpu.ctx().dev_mask() == gpu::kDevMask; - Engine::Get()->PushAsync([=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + Engine::Get()->PushAsync([=](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); const TBlob& indices = row_id.data(); using namespace mxnet::common; NDArray temp = retained_gpu; @@ -668,7 +680,6 @@ class CommDevice : public Comm { SparseRetainOpForwardRspWrapper(rctx.get_stream(), src, indices, kWriteTo, &temp); // wait for GPU operations to complete - rctx.get_stream()->Wait(); break; } #endif diff --git a/src/kvstore/gradient_compression.cc b/src/kvstore/gradient_compression.cc index d59035a77470..17a1af101b01 100644 --- a/src/kvstore/gradient_compression.cc +++ b/src/kvstore/gradient_compression.cc @@ -138,16 +138,12 @@ void GradientCompression::Quantize(const mxnet::NDArray &from, mxnet::NDArray *t mxnet::Engine::Get()->PushSync([from, to, residual, threshold](mxnet::RunContext ctx) { std::vector inputs = {from.data(), residual->data(), to->data()}; Quantize1BitImpl(ctx.get_stream(), inputs, threshold); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, from.ctx(), {from.var()}, {to->var(), residual->var()}, mxnet::FnProperty::kNormal, priority, "QuantizeGPU"); } else if (type_ == CompressionType::kTwoBit) { mxnet::Engine::Get()->PushSync([from, to, residual, threshold](mxnet::RunContext ctx) { std::vector inputs = {from.data(), residual->data(), to->data()}; Quantize2BitImpl(ctx.get_stream(), inputs, threshold); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, from.ctx(), {from.var()}, {to->var(), residual->var()}, mxnet::FnProperty::kNormal, priority, "QuantizeGPU"); } else { diff --git a/src/kvstore/kvstore_dist.h b/src/kvstore/kvstore_dist.h index 28bf19be561d..e2eef05f3fe4 100644 --- a/src/kvstore/kvstore_dist.h +++ b/src/kvstore/kvstore_dist.h @@ -422,7 +422,10 @@ class KVStoreDist : public KVStoreLocal { } gradient_compression_->Quantize(comm_buf, &small_buf, &res_buf, priority); auto push_to_servers = - [this, key, dtype, pskv, small_buf](RunContext rctx, Engine::CallbackOnComplete cb) { + [this, key, dtype, pskv, small_buf](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); size_t size = small_buf.shape().Size() * mshadow::mshadow_sizeof(dtype); char* data = static_cast (small_buf.data().dptr_); // do push. 
false means no delete @@ -444,7 +447,10 @@ class KVStoreDist : public KVStoreLocal { virtual void PushDefault(int key, const NDArray &send_buf, const PSKV& pskv, int priority) { auto push_to_servers = - [this, key, pskv, send_buf](RunContext rctx, Engine::CallbackOnComplete cb) { + [this, key, pskv, send_buf](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); const int dtype = send_buf.dtype(); // convert to ps keys const size_t size = send_buf.shape().Size() * mshadow::mshadow_sizeof(dtype); @@ -470,7 +476,10 @@ class KVStoreDist : public KVStoreLocal { virtual void PushRowSparse(int key, const NDArray &send_buf, int priority) { using namespace rowsparse; auto push_to_servers = [this, key, send_buf] - (RunContext rctx, Engine::CallbackOnComplete cb) { + (RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); char* data = static_cast(send_buf.data().dptr_); const int64_t num_rows = send_buf.aux_shape(kIdx)[0]; const auto offsets = send_buf.aux_data(kIdx).dptr(); @@ -500,7 +509,10 @@ class KVStoreDist : public KVStoreLocal { virtual void PullDefault(int key, const NDArray &recv_buf, int priority) { auto pull_from_servers = [this, key, recv_buf]( - RunContext rctx, Engine::CallbackOnComplete cb) { + RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); // convert to ps keys size_t size = recv_buf.shape().Size(); const int dtype = recv_buf.dtype(); @@ -534,7 +546,10 @@ class KVStoreDist : public KVStoreLocal { const NDArray& indices, int priority) { using namespace rowsparse; auto pull_from_servers = [this, key, recv_buf, indices] - (RunContext rctx, Engine::CallbackOnComplete cb) { + (RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); // allocate memory for the buffer CHECK_EQ(indices.dtype(), mshadow::kInt64); const TBlob idx_data = indices.data(); @@ -577,7 +592,10 @@ class KVStoreDist : public KVStoreLocal { virtual void PushPullDefault(int key, const NDArray &comm_buf, int priority) { auto pushpull = [this, key, comm_buf]( - RunContext rctx, Engine::CallbackOnComplete cb) { + RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); size_t size = comm_buf.shape().Size(); const int dtype = comm_buf.dtype(); const int num_bytes = mshadow::mshadow_sizeof(dtype); diff --git a/src/kvstore/kvstore_dist_server.h b/src/kvstore/kvstore_dist_server.h index 1dc222c0d7da..e9ea72ca431a 100644 --- a/src/kvstore/kvstore_dist_server.h +++ b/src/kvstore/kvstore_dist_server.h @@ -417,7 +417,10 @@ class KVStoreDistServer { // accumulate row_sparse gradients using namespace mshadow; Engine::Get()->PushAsync( - [to_merge, updateBuf, out](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [to_merge, updateBuf, out](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); op::ElemwiseBinaryOp::ComputeEx( {}, {}, {to_merge, updateBuf->merged}, {kWriteTo}, {out}); on_complete(); @@ -495,7 +498,10 @@ class KVStoreDistServer { store_[master_key] = NDArray(kRowSparseStorage, dshape, Context(), true, type.dtype); } Engine::Get()->PushAsync( - [this, recved, stored, type](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [this, recved, stored, type](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); NDArray rsp = stored; 
stored.CheckAndAlloc({mshadow::Shape1(recved.shape()[0])}); mshadow::Stream *s = ctx.get_stream(); diff --git a/src/kvstore/kvstore_local.h b/src/kvstore/kvstore_local.h index bc4e9337568b..a39ee2d7dd5e 100644 --- a/src/kvstore/kvstore_local.h +++ b/src/kvstore/kvstore_local.h @@ -499,7 +499,10 @@ class KVStoreLocal : public KVStore { // GPU requires temp resources bool is_gpu = out.ctx().dev_mask() == gpu::kDevMask; Engine::Get()->PushAsync( - [=](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [=](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); // copy data.data() to out.data() out.CheckAndAlloc({mshadow::Shape1(num_elements)}); TBlob out_data = out.data(); @@ -518,8 +521,6 @@ class KVStoreLocal : public KVStore { ndarray::Copy(data_in_ctx.data(), &out_data, ctx, ctx, rctx); UniqueImpl(&workspace, s, out); - // wait for GPU operations to complete - s->Wait(); break; } #endif diff --git a/src/kvstore/p3store_dist.h b/src/kvstore/p3store_dist.h index 3c99515e27b1..986f0534086d 100644 --- a/src/kvstore/p3store_dist.h +++ b/src/kvstore/p3store_dist.h @@ -83,7 +83,10 @@ class P3StoreDist : public KVStoreDist { void PushDefault(int key, const NDArray &send_buf, const PSKV& pskv, int priority) override { auto push_to_servers = [this, key, pskv, send_buf, priority] - (RunContext rctx, Engine::CallbackOnComplete cb) { + (RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); const int dtype = send_buf.dtype(); // convert to ps keys const size_t size = send_buf.shape().Size() * mshadow::mshadow_sizeof(dtype); @@ -126,7 +129,10 @@ class P3StoreDist : public KVStoreDist { CHECK(gradient_compression_->get_type() == CompressionType::kNone) << "Gradient compression not supported in P3StoreDist."; auto pull_from_servers = [this, key, recv_buf, priority]( - RunContext rctx, Engine::CallbackOnComplete cb) { + RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); // convert to ps keys size_t size = recv_buf.shape().Size(); const int dtype = recv_buf.dtype(); @@ -172,7 +178,10 @@ class P3StoreDist : public KVStoreDist { CHECK(gradient_compression_->get_type() == CompressionType::kNone) << "Compression not supported in P3StoreDist"; auto pushpull = [this, key, comm_buf, priority]( - RunContext rctx, Engine::CallbackOnComplete cb) { + RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); size_t size = comm_buf.shape().Size(); const int dtype = comm_buf.dtype(); const int num_bytes = mshadow::mshadow_sizeof(dtype); diff --git a/src/ndarray/ndarray.cc b/src/ndarray/ndarray.cc index bbce020232cb..2af8414edb0b 100644 --- a/src/ndarray/ndarray.cc +++ b/src/ndarray/ndarray.cc @@ -125,7 +125,22 @@ NDArray::Chunk::~Chunk() { mem.mem = this->mkl_mem_; #endif if (auto engine = engine_ref_.lock()) { - engine->DeleteVariable([mem, skip_free](RunContext s) { + engine->DeleteVariable([mem, skip_free, var = this->var](RunContext s) mutable { +#if MXNET_USE_CUDA + auto &sync_obj = var->sync_object; + Storage::SyncObj storage_sync_obj; + { + std::lock_guard l(sync_obj.mutex); + for (auto& ev : sync_obj.reader_events) { + storage_sync_obj.events.push_back(ev.event); + } + if (!sync_obj.writer_event.empty()) { + auto ev = sync_obj.writer_event[0]; + storage_sync_obj.events.push_back(ev.event); + } + } + mem.h.sync_obj = storage_sync_obj; +#endif if (skip_free == false) { #if MXNET_USE_ONEDNN == 1 if (mem.mem) { 
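The destructor change above hands the variable's outstanding events to the storage layer through Handle::sync_obj. A consumer of that field (for example a pooled storage manager) would have to make sure those events have completed before the freed GPU memory is reused; the storage-manager side is not shown in these hunks, so the following is only a sketch of such a check, with an assumed function name:

#include <cuda_runtime.h>
#include <mxnet/storage.h>

void WaitOnFreedMemoryEvents(const mxnet::Storage::SyncObj& sync_obj) {
#if MXNET_USE_CUDA
  for (const auto& weak_event : sync_obj.events) {
    if (auto event = weak_event.lock()) {  // the engine-side event may already be gone
      cudaEventSynchronize(*event);        // block until the pending work has finished
    }
  }
#endif
}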
@@ -726,7 +741,10 @@ void NDArray::Reorder2DefaultAsync() const { std::vector mutable_vars(1, this->var()); NDArray tmp = *this; Engine::Get()->PushAsync( - [tmp](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [tmp](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); tmp.ptr_->Reorder2Default(); on_complete(); }, ctx(), const_vars, mutable_vars, @@ -753,7 +771,10 @@ void NDArray::MKLDNNDataReorderAsync(const mkldnn::memory::desc &desc) const { NDArray tmp = *this; const auto version = this->version(); Engine::Get()->PushAsync( - [tmp, version, desc](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [tmp, version, desc](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); // MXNet will try to reuse NDArray from memory planning, so we need to ensure // the NDArray is still holding the original trunk data. if (tmp.version() == version) { @@ -955,8 +976,6 @@ void TernaryOp(const NDArray &lhs, Engine::Get()->PushSync([lhs, mhs, rhs, ret](RunContext ctx) { TBlob tmp = ret.data(); ndarray::Eval(lhs.data(), mhs.data(), rhs.data(), &tmp, ctx); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, lhs.ctx(), const_vars, { ret.var() }, FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); break; @@ -1033,8 +1052,6 @@ void BinaryOpKernel(const NDArray &lhs, TBlob tmp = ret.data(); mshadow::Stream* s = ctx.get_stream(); ndarray::BinaryOpKernelImpl(s, lhs.data(), rhs.data(), &tmp); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, lhs.ctx(), const_vars, {ret.var()}, FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); break; @@ -1073,8 +1090,6 @@ void BinaryOp(const NDArray &lhs, Engine::Get()->PushSync([lhs, rhs, ret](RunContext ctx) { TBlob tmp = ret.data(); ndarray::Eval(lhs.data(), rhs.data(), &tmp, ctx); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, lhs.ctx(), const_vars, {ret.var()}, FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); break; @@ -1107,8 +1122,6 @@ void SetValueOp(const real_t &rhs, NDArray *out) { } else { ndarray::Eval(ctx.get_stream(), rhs, ret); } - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); break; } #endif @@ -1156,8 +1169,6 @@ void ScalarOp(const NDArray &lhs, Engine::Get()->PushSync([lhs, rhs, ret](RunContext ctx) { TBlob tmp = ret.data(); ndarray::Eval(lhs.data(), rhs, &tmp, ctx); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, lhs.ctx(), const_vars, {ret.var()}, FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); break; @@ -1382,7 +1393,10 @@ void CopyFromTo(const NDArray& from, const NDArray& to, int priority, bool is_op if (a == cpu::kDevMask && b == cpu::kDevMask) { Engine::Get()->PushAsync( - [from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [from, to, requested](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); CopyFromToImpl(from, to, ctx, requested); on_complete(); }, from.ctx(), const_vars, mutable_vars, @@ -1391,25 +1405,31 @@ void CopyFromTo(const NDArray& from, const NDArray& to, int priority, bool is_op #if MXNET_USE_CUDA if (a == cpu::kDevMask && b == gpu::kDevMask) { Engine::Get()->PushAsync( - [from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [from, to, requested](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); CopyFromToImpl(from, to, ctx, requested); - 
ctx.get_stream()->Wait(); on_complete(); }, to.ctx(), const_vars, mutable_vars, FnProperty::kCopyToGPU, priority, "CopyCPU2GPU"); } else if (a == gpu::kDevMask && b == cpu::kDevMask) { Engine::Get()->PushAsync( - [from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [from, to, requested](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); CopyFromToImpl(from, to, ctx, requested); - ctx.get_stream()->Wait(); on_complete(); }, from.ctx(), const_vars, mutable_vars, FnProperty::kCopyFromGPU, priority, "CopyGPU2CPU"); } else if (a == gpu::kDevMask && b == gpu::kDevMask) { Engine::Get()->PushAsync( - [from, to, requested](RunContext ctx, Engine::CallbackOnComplete on_complete) { + [from, to, requested](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); CopyFromToImpl(from, to, ctx, requested); - ctx.get_stream()->Wait(); on_complete(); }, from.ctx(), const_vars, mutable_vars, from.dtype() != to.dtype() ? FnProperty::kNormal : FnProperty::kCopyFromGPU, @@ -1473,8 +1493,6 @@ void ElementwiseSum(const std::vector &source, NDArray *out, int priori } TBlob tmp = ret.data(); ndarray::ElementwiseSum(source_tblob, &tmp, ctx); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, out->ctx(), const_vars, {ret.var()}, FnProperty::kNormal, priority, "DenseElementwiseSum"); break; @@ -1497,8 +1515,6 @@ void ElementwiseSum(const std::vector &source, NDArray *out, int priori #if MXNET_USE_CUDA case gpu::kDevMask: { mxnet::ndarray::ElementwiseSum(rctx.get_stream(), rsc, source, &result); - // wait for GPU operations to complete - rctx.get_stream()->Wait(); break; } #endif @@ -1570,8 +1586,6 @@ void SampleOP(const real_t &a, Engine::Get()->PushSync([a, b, resource, ret](RunContext ctx) { TBlob tmp = ret.data(); ndarray::EvalRandom(a, b, resource, &tmp, ctx); - // Wait GPU kernel to complete - ctx.get_stream()->Wait(); }, out->ctx(), {}, {ret.var(), resource.var}, FnProperty::kNormal, 0, PROFILER_MESSAGE_FUNCNAME); break; @@ -2021,18 +2035,19 @@ void NDArray::SyncCopyFromCPU(const void *data, size_t size) const { if (this->ctx().dev_mask() == cpu::kDevMask) { this->WaitToWrite(); - RunContext rctx{this->ctx(), nullptr, nullptr, false}; + RunContext rctx{this->ctx(), nullptr, nullptr}; TBlob dst = this->data(); ndarray::Copy(src, &dst, Context::CPU(), Context::CPU(), rctx); } else { #if MXNET_USE_CUDA Engine::Get()->PushAsync( - [&](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [&](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); TBlob dst = this->data(); ndarray::Copy(src, &dst, Context::CPU(), this->ctx(), rctx); - // Wait GPU kernel to complete - rctx.get_stream()->Wait(); on_complete(); }, this->ctx(), {}, {this->var()}, FnProperty::kCopyToGPU, 0, "SyncCopyCPU2GPU"); @@ -2097,31 +2112,37 @@ void NDArray::SyncCopyFromNDArray(const NDArray& src, int i, int j) { #if MXNET_USE_CUDA if (src_dev_mask == cpu::kDevMask && dst_dev_mask == gpu::kDevMask) { Engine::Get()->PushAsync( - [&](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [&](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); const TBlob src_data = (i >= 0 ? 
src.aux_data(i) : src.data()); TBlob dst_data = get_dst_data(src_data.shape_); ndarray::Copy(src_data, &dst_data, src.ctx(), this->ctx(), rctx); - rctx.get_stream()->Wait(); on_complete(); }, this->ctx(), const_vars, {this->var()}, FnProperty::kCopyToGPU, 0, "SyncCopyFromNDArrayCPU2GPU"); } else if (src_dev_mask == gpu::kDevMask && dst_dev_mask == cpu::kDevMask) { Engine::Get()->PushAsync( - [&](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [&](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); const TBlob src_data = (i >= 0 ? src.aux_data(i) : src.data()); TBlob dst_data = get_dst_data(src_data.shape_); ndarray::Copy(src_data, &dst_data, src.ctx(), this->ctx(), rctx); - rctx.get_stream()->Wait(); on_complete(); }, src.ctx(), const_vars, {this->var()}, FnProperty::kCopyFromGPU, 0, "SyncCopyFromNDArrayGPU2CPU"); } else if (src_dev_mask == gpu::kDevMask && dst_dev_mask == gpu::kDevMask) { Engine::Get()->PushAsync( - [&](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [&](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); const TBlob src_data = (i >= 0 ? src.aux_data(i) : src.data()); TBlob dst_data = get_dst_data(src_data.shape_); ndarray::Copy(src_data, &dst_data, src.ctx(), this->ctx(), rctx); - rctx.get_stream()->Wait(); on_complete(); }, this->ctx(), const_vars, {this->var()}, src.dtype() != this->dtype() ? FnProperty::kNormal : FnProperty::kCopyFromGPU, @@ -2162,7 +2183,7 @@ void NDArray::SyncCopyToCPU(void *data, size_t size) const { this->WaitToRead(); if (this->ctx().dev_mask() == cpu::kDevMask) { - RunContext rctx{this->ctx(), nullptr, nullptr, false}; + RunContext rctx{this->ctx(), nullptr, nullptr}; NDArray src = *this; #if MXNET_USE_ONEDNN == 1 if (src.IsMKLDNNData()) @@ -2173,11 +2194,41 @@ void NDArray::SyncCopyToCPU(void *data, size_t size) const { } else { #if MXNET_USE_CUDA Engine::Get()->PushAsync( - [&](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [&](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); + { + auto var = this->var(); + auto& sync_obj = var->sync_object; + std::lock_guard lock{sync_obj.mutex}; + bool has_writer = false; + std::shared_ptr w_ev_ptr; + if (!sync_obj.writer_event.empty()) { + w_ev_ptr = sync_obj.writer_event[0].event.lock(); + has_writer = w_ev_ptr ? true : false; + } + for (auto ev : sync_obj.reader_events) { + auto event_ptr = ev.event.lock(); + if (!event_ptr) { + continue; + } + cudaEvent_t event = *event_ptr; + if (has_writer) { + auto w_ev = sync_obj.writer_event[0]; + if (w_ev.stream == ev.stream) { + event = w_ev.pool_index > ev.pool_index ? 
*w_ev_ptr : *event_ptr; + has_writer = false; + } + } + CUDA_CALL(cudaEventSynchronize(event)); + } + if (has_writer) { + CUDA_CALL(cudaEventSynchronize(*w_ev_ptr)); + } + } ndarray::Copy(this->data(), &dst, this->ctx(), Context::CPU(), rctx); - // Wait GPU kernel to complete - rctx.get_stream()->Wait(); on_complete(); }, this->ctx(), {this->var()}, {}, FnProperty::kCopyFromGPU, 0, "SyncCopyGPU2CPU"); @@ -2200,7 +2251,6 @@ void NDArray::SyncCheckFormat(const bool full_check) const { #if MXNET_USE_CUDA Engine::Get()->PushSync([&](RunContext rctx) { common::CheckFormatWrapper(rctx, *this, err_cpu, full_check); - rctx.get_stream()->Wait(); }, this->ctx(), {this->var()}, {}, FnProperty::kNormal, 0, "CheckFormat"); #else @@ -2233,8 +2283,10 @@ void NDArray::WaitToWrite() const { Imperative::DCInfo::Compute(*this); // Push an empty mutable function to flush all preceding reads to the variable. Engine::Get()->PushAsync( - [](RunContext, Engine::CallbackOnComplete on_complete) { on_complete(); }, - Context{}, {}, {ptr_->var}); + [](RunContext, Engine::CallbackOnStart on_start, Engine::CallbackOnComplete on_complete) { + on_start(); + on_complete(); + }, Context{}, {}, {ptr_->var}); Engine::Get()->WaitForVar(ptr_->var); } diff --git a/src/operator/custom/ndarray_op.cc b/src/operator/custom/ndarray_op.cc index a4bb1cf5b4e4..d07c7ee30d50 100644 --- a/src/operator/custom/ndarray_op.cc +++ b/src/operator/custom/ndarray_op.cc @@ -87,7 +87,10 @@ void NDArrayOp::Forward(const OpContext &ctx, CHECK(param_.pinfo->forward(ptrs.size(), ptrs.data(), tags.data(), param_.pinfo->p_forward)); Engine::Get()->PushAsync( - [ndcpy, ctx](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [ndcpy, ctx](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); ctx.async_on_complete(); on_complete(); }, ndctx, ndvar, {}, FnProperty::kNormal, 0, "NDArrayOpForward"); @@ -137,7 +140,10 @@ void NDArrayOp::Backward(const OpContext &ctx, CHECK(param_.pinfo->backward(ptrs.size(), ptrs.data(), tags.data(), param_.pinfo->p_backward)); Engine::Get()->PushAsync( - [ndcpy, ctx](RunContext rctx, Engine::CallbackOnComplete on_complete){ + [ndcpy, ctx](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete){ + on_start(); ctx.async_on_complete(); on_complete(); }, ndctx, ndvar, {}, FnProperty::kNormal, 0, "NDArrayOpBackward"); diff --git a/src/operator/operator_util.cc b/src/operator/operator_util.cc index bc097a5b0c1c..5c5834073053 100644 --- a/src/operator/operator_util.cc +++ b/src/operator/operator_util.cc @@ -500,11 +500,6 @@ void SimpleOpRegEntryImpl::RegisterSourceImperative() { Engine::Get()->PushSync([ret, fun, dev_mask, req, env](RunContext ctx) { TBlob tmp = ret.data(); (*fun)(env, &tmp, req, ctx); -#if MXNET_USE_CUDA - if (dev_mask == gpu::kDevMask) { - ctx.get_stream()->Wait(); - } -#endif }, ret.ctx(), {}, write_vars, FnProperty::kNormal, 0, "RegisterSourceImperative"); }; @@ -684,11 +679,6 @@ void SimpleOpRegEntryImpl::RegisterUnaryImperative() { Engine::Get()->PushSync([src, ret, fun, dev_mask, req, env](RunContext ctx) { TBlob tmp = ret.data(); (*fun)(src.data(), env, &tmp, req, ctx); -#if MXNET_USE_CUDA - if (dev_mask == gpu::kDevMask) { - ctx.get_stream()->Wait(); - } -#endif }, src.ctx(), const_vars, write_vars, FnProperty::kNormal, 0, "RegisterUnaryImperative"); }; @@ -958,11 +948,6 @@ void SimpleOpRegEntryImpl::RegisterBinaryImperative() { Engine::Get()->PushSync([lhs, rhs, ret, fun, dev_mask, req, env](RunContext 
ctx) { TBlob tmp = ret.data(); (*fun)(lhs.data(), rhs.data(), env, &tmp, req, ctx); - #if MXNET_USE_CUDA - if (dev_mask == gpu::kDevMask) { - ctx.get_stream()->Wait(); - } - #endif }, lhs.ctx(), const_vars, write_vars, FnProperty::kNormal, 0, "RegisterBinaryImperative"); }; diff --git a/src/resource.cc b/src/resource.cc index c76c7293b4a9..ce4e3deab3d0 100644 --- a/src/resource.cc +++ b/src/resource.cc @@ -256,7 +256,10 @@ class ResourceManagerImpl : public ResourceManager { inline void Seed(uint32_t seed) { mshadow::Random *r = prnd; Engine::Get()->PushAsync( - [r, seed](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [r, seed](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); r->set_stream(rctx.get_stream()); r->Seed(seed); on_complete(); @@ -324,7 +327,10 @@ class ResourceManagerImpl : public ResourceManager { uint32_t current_seed = p->ctx.dev_id + i * kMaxNumGPUs + seed * kRandMagic; Resource* r = &(p->resource[i]); Engine::Get()->PushAsync( - [r, current_seed](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [r, current_seed](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); auto state_space = static_cast(r->ptr_); mshadow::Stream* stream = rctx.get_stream(); CHECK_EQ(state_space->ctx.dev_id, stream->dev_id) @@ -418,7 +424,10 @@ class ResourceManagerImpl : public ResourceManager { inline void SeedOne(size_t i, uint32_t seed) { common::random::RandGenerator *r = sampler[i]; Engine::Get()->PushAsync( - [r, seed](RunContext rctx, Engine::CallbackOnComplete on_complete) { + [r, seed](RunContext rctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete on_complete) { + on_start(); r->Seed(rctx.get_stream(), seed); on_complete(); }, ctx, {}, {resource[i].var}, diff --git a/src/storage/gpu_device_storage.h b/src/storage/gpu_device_storage.h index 6b5df04808b7..8c1a40d3ecb3 100644 --- a/src/storage/gpu_device_storage.h +++ b/src/storage/gpu_device_storage.h @@ -62,6 +62,14 @@ inline void GPUDeviceStorage::Free(Storage::Handle handle) { #if MXNET_USE_NCCL std::lock_guard l(Storage::Get()->GetMutex(Context::kGPU)); #endif // MXNET_USE_NCCL +#if MXNET_USE_CUDA + for (auto ev : handle.sync_obj.events) { + auto valid_ev = ev.lock(); + if (valid_ev) { + MSHADOW_CUDA_CALL(cudaEventSynchronize(*valid_ev)); + } + } +#endif CUDA_CALL(cudaFree(handle.dptr)) profiler::GpuDeviceStorageProfiler::Get()->OnFree(handle); } diff --git a/src/storage/pooled_storage_manager.h b/src/storage/pooled_storage_manager.h index 194a1a2455ba..ccf5decee9d7 100644 --- a/src/storage/pooled_storage_manager.h +++ b/src/storage/pooled_storage_manager.h @@ -30,6 +30,7 @@ #include #include #include +#include #include "./storage_manager.h" #include "../profiler/storage_profiler.h" @@ -123,7 +124,9 @@ class PooledStorageManager : public StorageManager, void Free(Storage::Handle handle) override { // Insert returned memory in cache std::lock_guard lock(Storage::Get()->GetMutex(dev_type_)); - StoringMethod::InsertInCache(BucketingStrategy::get_bucket(handle.size), handle.dptr); + StoringMethod::InsertInCache(BucketingStrategy::get_bucket(handle.size), + handle.dptr, + handle.sync_obj); } void DirectFree(Storage::Handle handle) override { @@ -148,7 +151,7 @@ class PooledStorageManager : public StorageManager, UNSET_DEVICE(device_store); } - bool MemoryIsAvalable(size_t roundSize) const { + bool MemoryIsAvailable(size_t roundSize) const { const auto free = 
contextHelper_->freeMemorySize(); return free > roundSize && memory_allocation_limit_ <= free - roundSize; } @@ -172,7 +175,7 @@ void PooledStorageManager::Alloc(Storage::Hand if (!reuse_pool) { SET_DEVICE(device_store, contextHelper_, handle->ctx, true); roundSize = BucketingStrategy::RoundAllocSizeForBucket(bucket_id); - if (!MemoryIsAvalable(roundSize)) + if (!MemoryIsAvailable(roundSize)) ReleaseAllNoLock(false); void *ret = nullptr; @@ -199,7 +202,19 @@ void PooledStorageManager::Alloc(Storage::Hand handle->dptr = ret; } else { // Reusing memory - handle->dptr = reuse_pool->back(); + auto ptr_syncobj = reuse_pool->back(); + handle->dptr = ptr_syncobj.first; + if (dev_type_ == Context::kGPU) { + handle->sync_obj = ptr_syncobj.second; +#if MXNET_USE_CUDA + for (auto ev : handle->sync_obj.events) { + auto valid_ev = ev.lock(); + if (valid_ev) { + MSHADOW_CUDA_CALL(cudaEventSynchronize(*valid_ev)); + } + } +#endif + } reuse_pool->pop_back(); } #if MXNET_USE_CUDA @@ -366,9 +381,11 @@ class RoundPower2 : public RoundHelper { class UnorderedMapContainer { protected: inline void InitContainer(const RoundHelper *p) {} - inline void InsertInCache(size_t key, void *dptr) { memory_pool_[key].push_back(dptr); } + inline void InsertInCache(size_t key, void *dptr, Storage::SyncObj sync_obj) { + memory_pool_[key].emplace_back(dptr, sync_obj); + } - inline std::vector *GetMemStorage(size_t key) { + inline std::vector> *GetMemStorage(size_t key) { auto&& reuse_it = memory_pool_.find(key); return reuse_it != memory_pool_.end() && reuse_it->second.size()? &reuse_it->second : nullptr; } @@ -378,8 +395,8 @@ class UnorderedMapContainer { size_t released_memory = 0; for (auto&& i : memory_pool_) { for (auto&& j : i.second) { - contextHelper->Free(j); - GPU_PROFILER_ON_FREE(profilerGPU, j); + contextHelper->Free(j.first); + GPU_PROFILER_ON_FREE(profilerGPU, j.first); } released_memory += i.first * i.second.size(); i.second.clear(); @@ -389,7 +406,7 @@ class UnorderedMapContainer { } private: - std::unordered_map> memory_pool_; + std::unordered_map>> memory_pool_; }; // class UnorderedMapContainer /*! @@ -408,9 +425,11 @@ class VectorContainer { memory_pool_ .resize(vector_size); } - inline void InsertInCache(size_t idx, void *dptr) { memory_pool_[idx].push_back(dptr); } + inline void InsertInCache(size_t idx, void *dptr, Storage::SyncObj sync_obj) { + memory_pool_[idx].emplace_back(dptr, sync_obj); + } - std::vector *GetMemStorage(size_t idx) { + std::vector> *GetMemStorage(size_t idx) { auto &&reuse_pool = memory_pool_[idx]; return reuse_pool.size() ? 
&reuse_pool : nullptr; } @@ -423,8 +442,8 @@ class VectorContainer { continue; for (auto &j : memory_pool_[i]) { - contextHelper->Free(j); - GPU_PROFILER_ON_FREE(profilerGPU, j); + contextHelper->Free(j.first); + GPU_PROFILER_ON_FREE(profilerGPU, j.first); } released_memory += rndHelper->get_size(i) * memory_pool_[i].size(); memory_pool_[i].clear(); @@ -433,7 +452,7 @@ class VectorContainer { } private: - std::vector> memory_pool_; + std::vector>> memory_pool_; size_t first_bucket_; }; // class VectorContainer diff --git a/src/storage/storage.cc b/src/storage/storage.cc index d83860cf9fcc..e0defffa3c87 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -251,7 +251,7 @@ const std::string env_var_name(const char* dev_type, env_var_type type) { } // namespace storage -std::shared_ptr Storage::_GetSharedRef() { +const std::shared_ptr &Storage::_GetSharedRef() { #ifdef __MXNET_JS__ // dummy code needed for emscripten code to pass // do not know why, the new will be NULLPTR diff --git a/tests/cpp/engine/threaded_engine_test.cc b/tests/cpp/engine/threaded_engine_test.cc index 11ca2c94c1c0..49a5abaed6ed 100644 --- a/tests/cpp/engine/threaded_engine_test.cc +++ b/tests/cpp/engine/threaded_engine_test.cc @@ -110,8 +110,10 @@ double EvaluateWorkloads(const std::vector& workloads, if (engine == nullptr) { EvaluateWorkload(wl, data); } else { - auto func = [wl, data](RunContext ctx, Engine::CallbackOnComplete cb) { - EvaluateWorkload(wl, data); cb(); + auto func = [wl, data](RunContext ctx, + Engine::CallbackOnStart on_start, + Engine::CallbackOnComplete cb) { + on_start(); EvaluateWorkload(wl, data); cb(); }; std::vector reads; for (auto i : wl.reads) { @@ -182,7 +184,7 @@ TEST(Engine, RandSumExpr) { void Foo(mxnet::RunContext, int i) { printf("The fox says %d\n", i); } -void FooAsyncFunc(void*, void* cb_ptr, void* param) { +void FooAsyncFunc(void*, void*, void* cb_ptr, void* param) { if (param == nullptr) { LOG(INFO) << "The fox asynchronously says receiving nothing."; } else { @@ -346,7 +348,10 @@ TEST(Engine, basics) { printf("============= Test #1 ==============\n"); for (int i = 0; i < 10; ++i) { oprs.push_back(engine->NewOperator( - [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [i](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { + on_start(); Foo(ctx, i); std::this_thread::sleep_for(std::chrono::seconds{1}); cb(); @@ -368,7 +373,10 @@ TEST(Engine, basics) { oprs.clear(); for (int i = 0; i < 10; ++i) { oprs.push_back(engine->NewOperator( - [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [i](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { + on_start(); Foo(ctx, i); std::this_thread::sleep_for(std::chrono::milliseconds{500}); cb(); @@ -394,8 +402,11 @@ TEST(Engine, basics) { var = engine->NewVariable(); oprs.clear(); oprs.push_back(engine->NewOperator( - [](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { std::this_thread::sleep_for(std::chrono::seconds{2}); + on_start(); Foo(ctx, 42); cb(); }, @@ -414,7 +425,10 @@ TEST(Engine, basics) { var = engine->NewVariable(); oprs.clear(); oprs.push_back(engine->NewOperator( - [](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { + 
on_start(); Foo(ctx, 42); std::this_thread::sleep_for(std::chrono::seconds{2}); cb(); @@ -452,7 +466,10 @@ TEST(Engine, VarVersion) { EXPECT_EQ(var->version(), 0U); for (int i = 0; i < 10; ++i) { oprs.push_back(engine->NewOperator( - [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [i](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { + on_start(); Foo(ctx, i); cb(); }, @@ -473,7 +490,10 @@ TEST(Engine, VarVersion) { oprs.clear(); for (int i = 0; i < 10; ++i) { oprs.push_back(engine->NewOperator( - [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) { + [i](mxnet::RunContext ctx, + mxnet::Engine::CallbackOnStart on_start, + mxnet::Engine::CallbackOnComplete cb) { + on_start(); Foo(ctx, i); cb(); }, diff --git a/tests/python/gpu/test_gluon_gpu.py b/tests/python/gpu/test_gluon_gpu.py index 310643bc19f7..7e09acada8f0 100644 --- a/tests/python/gpu/test_gluon_gpu.py +++ b/tests/python/gpu/test_gluon_gpu.py @@ -495,90 +495,6 @@ def tensor_size(big_tensor_bytes): # Evaluate model net(data_in).asnumpy() -# isolated execution bulking test function to be invoked with different env var settings - - -def _test_bulking_in_process(seed, time_per_iteration): - # Use flip since it's a simple function with same-sized I/O unlikely to ever be fused. - class Flip(gluon.HybridBlock): - def __init__(self, **kwargs): - super(Flip, self).__init__(**kwargs) - - def hybrid_forward(self, F, x): - return F.flip(x, axis=0) - - def get_net(num_ops): - net = nn.HybridSequential() - for _ in range(num_ops): - net.add(Flip()) - return net - - data_shape = (10,) - num_ops = 1000 - num_iterations = 20 - - # build model - x = mx.ndarray.zeros(data_shape) - x.attach_grad() - dy = mx.ndarray.ones(data_shape) - net = get_net(num_ops) - net.hybridize(static_alloc=True, static_shape=True) - - # time a number of forward() and backward() executions after some warm-up iterations - warmups = 1 - for i in range(num_iterations + warmups): - with autograd.record(): - if i == warmups: - start = time.time() - y = net(x) - y.backward(dy) - x.grad.wait_to_read() - - time_per_iteration.value = (time.time() - start) / num_iterations - -def _test_bulking(test_bulking_func): - # test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training) - test_cases = [(0, 0, True), (1, 1, True), (15, 15, False), - (15, 0, True), (0, 15, True), (15, 15, True)] - times = {} - times_str = '' - for seg_sizes in test_cases: - # Create shared variable to return measured time from test process - time_per_iteration = mp.Manager().Value('d', 0.0) - - if not run_in_spawned_process(test_bulking_func, - {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD': str(seg_sizes[0]), - 'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD': str(seg_sizes[1]), - 'MXNET_EXEC_BULK_EXEC_TRAIN': str(seg_sizes[2])}, - time_per_iteration): - # skip test since the python version can't run it properly. Warning msg was logged. 
- return - times[seg_sizes] = time_per_iteration.value - times_str += \ - '\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format( - seg_sizes[0], seg_sizes[1], seg_sizes[2], 1000.0 * times[seg_sizes]) - - fastest_non_bulked_time = min(times[(0, 0, True)], times[(1, 1, True)], times[(15, 15, False)]) - slowest_half_bulked_time = max(times[(0, 15, True)], times[(15, 0, True)]) - fastest_half_bulked_time = min(times[(0, 15, True)], times[(15, 0, True)]) - fully_bulked_time = times[(15, 15, True)] - - print(times_str) - # Non-bulked times[0,0,True], times[1,1,True] and times[15,15,False] should be about the same, - # slower than both half-bulked times[0,15,True] and times[15,0,True] - assert slowest_half_bulked_time < fastest_non_bulked_time, \ - 'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \ - .format(slowest_half_bulked_time - fastest_non_bulked_time, times_str) - # The fully bulked times[15,15,True] should be faster than both half-bulked runs - assert fully_bulked_time < fastest_half_bulked_time, \ - 'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \ - .format(fully_bulked_time - fastest_half_bulked_time, times_str) - -@pytest.mark.skip(reason='skippping temporarily, tracked by https://github.com/apache/incubator-mxnet/issues/14970') -def test_bulking_gluon_gpu(): - _test_bulking(_test_bulking_in_process) - - def test_hybridblock_mix_ctx_raise(): class FooHybrid(gluon.HybridBlock): def hybrid_forward(self, F, a, b): diff --git a/tests/python/gpu/test_operator_gpu.py b/tests/python/gpu/test_operator_gpu.py index b1db89e0ef74..32b11855b8f8 100644 --- a/tests/python/gpu/test_operator_gpu.py +++ b/tests/python/gpu/test_operator_gpu.py @@ -2113,78 +2113,6 @@ def test_bilinear_sampler_versions(): if req_dict['grid'] is 'write': assert_almost_equal(exe.grad_dict['grid'], exe_list[ref_idx].grad_dict['grid'], rtol=1e-3, atol=1e-5) - -# isolated execution bulking test function to be invoked with different env var settings -def _test_bulking_in_process(seed, time_per_iteration): - data_shape = (10,) - num_ops = 1000 - num_iterations = 20 - - ctx = default_context() - # build symbol - X = mx.sym.Variable('X') - sym = mx.sym.flip(X, axis=0) - for _ in range(num_ops-1): - sym = mx.sym.flip(sym, axis=0) - x = mx.ndarray.zeros(data_shape) - dx = mx.ndarray.zeros(data_shape) - dy = mx.ndarray.ones(data_shape) - exe = sym._bind(ctx=ctx, args=[x], args_grad = {'X':dx}) - - # time a number of forward() and backward() executions after some warm-up iterations - warmups = 1 - for i in range(num_iterations+warmups): - if i == warmups: - start = time.time() - exe.forward(is_train=True) - exe.backward(dy) - dx.wait_to_read() - time_per_iteration.value = (time.time() - start) / num_iterations - - -@pytest.mark.skip(reason='skippping temporarily, tracked by https://github.com/apache/incubator-mxnet/issues/16517') -def test_bulking_operator_gpu(): - _test_bulking(_test_bulking_in_process) - - -@pytest.mark.skip(reason='skippping temporarily, tracked by https://github.com/apache/incubator-mxnet/issues/14970') -def test_bulking(): - # test case format: (max_fwd_segment_size, max_bwd_segment_size, enable_bulking_in_training) - test_cases = [(0,0,True), (1,1,True), (15,15,False), (15,0,True), (0,15,True), (15,15,True)] - times = {} - times_str = '' - for seg_sizes in test_cases: - # Create shared variable to return measured time from test process - time_per_iteration = mp.Manager().Value('d', 0.0) - if not 
run_in_spawned_process(_test_bulking_in_process, - {'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_FWD' : str(seg_sizes[0]), - 'MXNET_EXEC_BULK_EXEC_MAX_NODE_TRAIN_BWD' : str(seg_sizes[1]), - 'MXNET_EXEC_BULK_EXEC_TRAIN' : str(seg_sizes[2])}, - time_per_iteration): - # skip test since the python version can't run it properly. Warning msg was logged. - return - times[seg_sizes] = time_per_iteration.value - times_str += \ - '\n runtime of (fwd,bwd,enable) op seg setting ({},{},{}) =\t{:.1f} msec'.format( - seg_sizes[0], seg_sizes[1], seg_sizes[2], 1000.0 * times[seg_sizes]) - - fastest_non_bulked_time = min(times[(0,0,True)], times[(1,1,True)], times[(15,15,False)]) - slowest_half_bulked_time = max(times[(0,15,True)], times[(15,0,True)]) - fastest_half_bulked_time = min(times[(0,15,True)], times[(15,0,True)]) - fully_bulked_time = times[(15,15,True)] - - print(times_str) - # Non-bulked times[0,0,True], times[1,1,True] and times[15,15,False] should be about the same, - # slower than both half-bulked times[0,15,True] and times[15,0,True] - assert slowest_half_bulked_time < fastest_non_bulked_time, \ - 'A half-bulked exec time is slower than the non-bulked time by {} secs! {}' \ - .format(slowest_half_bulked_time - fastest_non_bulked_time, times_str) - # The fully bulked times[15,15,True] should be faster than both half-bulked runs - assert fully_bulked_time < fastest_half_bulked_time, \ - 'The fully-bulked exec time is slower than a half-bulked time by {} secs! {}' \ - .format(fully_bulked_time - fastest_half_bulked_time, times_str) - - @pytest.mark.serial def test_allclose_function_gpu(): allclose_function([mx.cpu(), mx.gpu(0)])
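The hunks above rewrite every engine callsite from the old (RunContext, CallbackOnComplete) functor to the new (RunContext, CallbackOnStart, CallbackOnComplete) triple, with on_start() invoked before the work begins. Below is a minimal sketch of that pattern, assuming only the engine API declared in include/mxnet/engine.h; the helper name PushExampleOp and the commented-out body are illustrative and not part of the patch.

// Illustrative sketch (not part of the patch): pushing an async operation
// with the widened AsyncFn signature used throughout this change.
#include <mxnet/engine.h>

void PushExampleOp(const mxnet::Context& ctx,
                   mxnet::Engine::VarHandle read_var,
                   mxnet::Engine::VarHandle write_var) {
  mxnet::Engine::Get()->PushAsync(
      [](mxnet::RunContext rctx,
         mxnet::Engine::CallbackOnStart on_start,
         mxnet::Engine::CallbackOnComplete on_complete) {
        on_start();      // in GPU builds, lets the engine wait on events recorded for the consumed vars
        // ... launch the actual work on rctx.get_stream<gpu>() here ...
        on_complete();   // releases the vars; the engine records the completion event
      },
      ctx, {read_var}, {write_var},
      mxnet::FnProperty::kNormal, 0, "ExampleOp");
}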
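The SyncCopyToCPU hunk in src/ndarray/ndarray.cc replaces the blanket rctx.get_stream<gpu>()->Wait() with host-side synchronization against the CUDA events stored on the variable's sync_object. The following is a condensed restatement of that logic for readability, assuming the SyncObject/EventInfo layout this patch adds to include/mxnet/engine.h; the free-standing helper name WaitVarEventsOnHost is hypothetical.

// Condensed restatement (sketch): wait on every still-live reader event of a
// variable, and on the writer event unless a newer event on the same stream
// already implies it.
#if MXNET_USE_CUDA
#include <cuda_runtime.h>
#include <memory>
#include <mutex>
#include <mxnet/engine.h>

inline void WaitVarEventsOnHost(mxnet::engine::Var* var) {
  auto& sync_obj = var->sync_object;
  std::lock_guard<std::mutex> lock{sync_obj.mutex};
  std::shared_ptr<cudaEvent_t> writer;
  if (!sync_obj.writer_event.empty())
    writer = sync_obj.writer_event[0].event.lock();
  bool has_writer = static_cast<bool>(writer);
  for (const auto& ev : sync_obj.reader_events) {
    auto reader = ev.event.lock();
    if (!reader) continue;                         // event already recycled by the pool
    cudaEvent_t event = *reader;
    if (has_writer && sync_obj.writer_event[0].stream == ev.stream) {
      // Same stream: waiting on the later of the two events covers both.
      if (sync_obj.writer_event[0].pool_index > ev.pool_index) event = *writer;
      has_writer = false;
    }
    cudaEventSynchronize(event);
  }
  if (has_writer)
    cudaEventSynchronize(*writer);
}
#endif  // MXNET_USE_CUDA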
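On the storage side, cached GPU blocks now carry the Storage::SyncObj attached to their handle, and both gpu_device_storage.h and pooled_storage_manager.h synchronize any events that are still alive before the memory is reused or returned to CUDA. A short sketch of that wait step, assuming the SyncObj added to include/mxnet/storage.h; the helper name WaitPendingEvents is illustrative only.

// Sketch: block until every engine event still attached to a cached block
// has completed, so the memory can be safely handed out again or freed.
#if MXNET_USE_CUDA
#include <cuda_runtime.h>
#include <mxnet/storage.h>

inline void WaitPendingEvents(const mxnet::Storage::SyncObj& sync_obj) {
  for (const auto& weak_ev : sync_obj.events) {
    if (auto ev = weak_ev.lock())   // expired entries were already recycled by the pool
      cudaEventSynchronize(*ev);
  }
}
#endif  // MXNET_USE_CUDA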
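The engine unit tests exercise the same callback triple through NewOperator. The sketch below shows that usage end to end, assuming the NewOperator/Push/DeleteOperator/DeleteVariable overloads already declared in include/mxnet/engine.h; the surrounding function and the empty operator body are placeholders.

// Sketch: creating, running and tearing down an operator with the updated
// (RunContext, CallbackOnStart, CallbackOnComplete) functor shape.
#include <mxnet/engine.h>

void RunExampleOperator() {
  mxnet::Engine* engine = mxnet::Engine::Get();
  mxnet::Engine::VarHandle var = engine->NewVariable();
  mxnet::Engine::OprHandle opr = engine->NewOperator(
      [](mxnet::RunContext ctx,
         mxnet::Engine::CallbackOnStart on_start,
         mxnet::Engine::CallbackOnComplete cb) {
        on_start();   // acquire dependencies before doing any work
        // ... operator body ...
        cb();         // signal completion to the engine
      },
      {}, {var}, mxnet::FnProperty::kNormal, "ExampleOperator");
  engine->Push(opr, mxnet::Context::CPU());
  engine->WaitForAll();
  engine->DeleteOperator(opr);
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context::CPU(), var);
}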