diff --git a/src/engine/naive_engine.cc b/src/engine/naive_engine.cc
index c321e48d562d..9f44b55cfcaf 100644
--- a/src/engine/naive_engine.cc
+++ b/src/engine/naive_engine.cc
@@ -29,6 +29,7 @@
 #include "../profiler/profiler.h"
 #include "./openmp.h"
 #include "../common/object_pool.h"
+#include "../profiler/custom_op_profiler.h"
 
 namespace mxnet {
 namespace engine {
@@ -160,15 +161,19 @@ class NaiveEngine final : public Engine {
     profiler::Profiler *profiler = profiler::Profiler::Get();
     NaiveOpr *opr = nullptr;
     const bool profiling = opr_name && profiler->IsProfiling(profiler::Profiler::kImperative);
+    // GenerateDisplayName() will return a pointer to the correct name of the operator
+    const char* display_name = profiling ?
+                               profiler::CustomOpProfiler::Get()->GenerateDisplayName(opr_name) :
+                               opr_name;
     if (profiling) {
       opr = NewOperator(exec_fun, const_vars, mutable_vars,
-                        prop, opr_name)->Cast<NaiveOpr>();
+                        prop, display_name)->Cast<NaiveOpr>();
       opr->profiling = profiling;
       std::unique_ptr<profiler::ProfileOperator::Attributes> attrs;
       if (profiler->AggregateEnabled()) {
         attrs.reset(new profiler::ProfileOperator::Attributes());
       }
-      opr->opr_profile.reset(new profiler::ProfileOperator(opr->opr_name, attrs.release()));
+      opr->opr_profile.reset(new profiler::ProfileOperator(display_name, attrs.release()));
       opr->opr_profile->start(exec_ctx.dev_type, exec_ctx.dev_id);
     }
     if (exec_ctx.dev_mask() == gpu::kDevMask) {
diff --git a/src/engine/threaded_engine.cc b/src/engine/threaded_engine.cc
index 38311908bdcd..4375149c729c 100644
--- a/src/engine/threaded_engine.cc
+++ b/src/engine/threaded_engine.cc
@@ -287,8 +287,12 @@ void ThreadedEngine::DeleteOperator(OprHandle op) {
 
 void ThreadedEngine::Push(OprHandle op, Context exec_ctx, int priority, bool profiling) {
   BulkFlush();
   ThreadedOpr* threaded_opr = ThreadedOpr::CastFromBase(op);
+  if (profiling) {
+    threaded_opr->opr_name =
+        profiler::CustomOpProfiler::Get()->GenerateDisplayName(threaded_opr->opr_name);
+  }
   OprBlock* opr_block = OprBlock::New();
   opr_block->opr = threaded_opr;
@@ -333,9 +336,10 @@ void ThreadedEngine::PushAsync(AsyncFn fn, Context exec_ctx,
                                << device_count_;
   }
 #endif
-  ThreadedOpr *opr = NewOperator(std::move(fn), const_vars, mutable_vars, prop, opr_name, wait);
-  opr->temporary = true;
   const bool profiling = profiler_->IsProfiling(profiler::Profiler::kImperative);
+  ThreadedOpr *opr = NewOperator(std::move(fn), const_vars, mutable_vars,
+                                 prop, opr_name, wait);
+  opr->temporary = true;
   Push(opr, exec_ctx, priority, profiling);
 }
diff --git a/src/engine/threaded_engine.h b/src/engine/threaded_engine.h
index c39f322596ae..3d56b7c79b90 100644
--- a/src/engine/threaded_engine.h
+++ b/src/engine/threaded_engine.h
@@ -43,6 +43,7 @@
 #include "../profiler/profiler.h"
 #include "./openmp.h"
 #include "../common/object_pool.h"
+#include "../profiler/custom_op_profiler.h"
 
 namespace mxnet {
 namespace engine {
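
Both engine paths above now route operator names through CustomOpProfiler::GenerateDisplayName() before the operator is queued, so sub-operators launched from a custom operator's Python callback are renamed consistently under either ThreadedEngine or NaiveEngine. To exercise the NaiveEngine path from Python, the engine has to be selected before mxnet is first imported; a minimal sketch (the new tests below achieve the same thing via run_in_spawned_process):

    import os
    # Assumption: MXNET_ENGINE_TYPE is read once, when the engine singleton is
    # created during the first mxnet import, so it must be set beforehand.
    os.environ['MXNET_ENGINE_TYPE'] = 'NaiveEngine'
    import mxnet as mx
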
diff --git a/src/operator/custom/custom-inl.h b/src/operator/custom/custom-inl.h
index 3bf63b75cfdb..1173b14eb2fe 100644
--- a/src/operator/custom/custom-inl.h
+++ b/src/operator/custom/custom-inl.h
@@ -43,6 +43,7 @@
 #include <condition_variable>
 #include <queue>
 #include "../operator_common.h"
+#include "../../profiler/custom_op_profiler.h"
 
 namespace mxnet {
 namespace op {
@@ -76,9 +77,16 @@ class CustomOperator {
             bool training, const std::vector<NDArray>& arrs,
             const std::vector<int>& tags,
             const std::unordered_set<int>& output_tags,
-            const std::vector<NDArray>& outputs) {
+            const std::vector<NDArray>& outputs,
+            const std::string op_type = "") {
     if (naive_engine_) {
-      func();
+      if (profiler::Profiler::Get()->IsProfiling(profiler::Profiler::kImperative)) {
+        profiler::CustomOpProfiler::Get()->OnCustomBegin(op_type);
+        func();
+        profiler::CustomOpProfiler::Get()->OnCustomEnd();
+      } else {
+        func();
+      }
       for (size_t i = 0, out_idx = 0; i < arrs.size(); i++) {
         if (arrs[i].storage_type() == kDefaultStorage ||
             arrs[i].storage_type() == kUndefinedStorage)
@@ -97,7 +105,13 @@ class CustomOperator {
       bool prev_training = Imperative::Get()->set_is_training(training);
 
       try {
-        func();
+        if (profiler::Profiler::Get()->IsProfiling(profiler::Profiler::kImperative)) {
+          profiler::CustomOpProfiler::Get()->OnCustomBegin(op_type);
+          func();
+          profiler::CustomOpProfiler::Get()->OnCustomEnd();
+        } else {
+          func();
+        }
       } catch (dmlc::Error& e) {
         exception_ =
             std::make_shared<std::exception_ptr>(std::current_exception());
@@ -143,8 +157,7 @@ class CustomOperator {
               ctx.async_on_complete();
             },
-            ctx.run_ctx.ctx, vars, vars2, FnProperty::kNoSkip, 0,
-            "CustomOperator");
+            ctx.run_ctx.ctx, vars, vars2, FnProperty::kNoSkip, 0, "CustomOperatorWait");
       });
       // increase num_threads if there is not enough threads to execute custom operator
       if (q_.size() > num_free_threads_)
diff --git a/src/operator/custom/custom.cc b/src/operator/custom/custom.cc
index 77fe2e6e4b1c..5d2c284be46f 100644
--- a/src/operator/custom/custom.cc
+++ b/src/operator/custom/custom.cc
@@ -345,7 +345,7 @@ void ForwardEx(const OpStatePtr& state, const OpContext& ctx,
                            static_cast<int>(ctx.is_train),
                            params.info->contexts[kCustomOpForward]));
       },
-      ctx, false, ctx.is_train, cpys, tags, output_tags, outputs);
+      ctx, false, ctx.is_train, cpys, tags, output_tags, outputs, params.op_type);
 }
 
 void BackwardEx(const OpStatePtr& state, const OpContext& ctx,
@@ -415,7 +415,7 @@ void BackwardEx(const OpStatePtr& state, const OpContext& ctx,
         ptrs.size(), const_cast<NDArray**>(ptrs.data()), const_cast<int*>(tags.data()),
         reinterpret_cast<const int*>(req.data()), static_cast<int>(ctx.is_train),
         params.info->contexts[kCustomOpBackward]));
-  }, ctx, false, ctx.is_train, cpys, tags, output_tags, outputs);
+  }, ctx, false, ctx.is_train, cpys, tags, output_tags, outputs, "_backward_" + params.op_type);
 }
 
 // infer storage backward function for custom op which assigns kDefaultStorage for
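
With op_type now plumbed from ForwardEx()/BackwardEx() into CustomOperator::Push(), the time spent inside the Python forward()/backward() callbacks is recorded as '<op_type>::pure_python' and '_backward_<op_type>::pure_python' respectively. A hedged sketch of how this surfaces on the user side, assuming a custom op registered as 'MyOp' (a placeholder name) has already run while profiling with aggregate stats enabled:

    import json
    from mxnet import profiler

    stats = json.loads(profiler.dumps(format='json'))
    custom = stats['Time']['Custom Operator']
    assert 'MyOp::pure_python' in custom             # forward() callback time
    assert '_backward_MyOp::pure_python' in custom   # backward() callback time
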
diff --git a/src/profiler/custom_op_profiler.h b/src/profiler/custom_op_profiler.h
new file mode 100644
index 000000000000..9449e5b84a3a
--- /dev/null
+++ b/src/profiler/custom_op_profiler.h
@@ -0,0 +1,125 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+#ifndef MXNET_PROFILER_CUSTOM_OP_PROFILER_H_
+#define MXNET_PROFILER_CUSTOM_OP_PROFILER_H_
+
+#include <string>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include "./profiler.h"
+
+namespace mxnet {
+namespace profiler {
+
+using Tid = std::thread::id;
+using TaskPtr = std::unique_ptr<ProfileTask>;
+
+/*!
+ * \brief Singleton class to assist profiling the python callbacks of custom operators
+ * and to assist linking sub-operators to their custom operators
+ */
+class CustomOpProfiler {
+ public:
+  static CustomOpProfiler* Get() {
+    static std::mutex mtx;
+    static std::unique_ptr<CustomOpProfiler> prof = nullptr;
+    if (!prof) {
+      std::unique_lock<std::mutex> lk(mtx);
+      if (!prof)
+        prof = std::make_unique<CustomOpProfiler>();
+    }
+    return prof.get();
+  }
+  /*!
+   * \brief Called before the callback of a custom operator to start a profile task for
+   * python code execution time
+   * \param op_type The registered name of the custom operator
+   */
+  void OnCustomBegin(const std::string& op_type) {
+    const Tid tid = std::this_thread::get_id();
+    const std::string task_name = MakePythonCodeName(op_type);
+    std::lock_guard<std::mutex> lock(mutex_);
+    tid_to_op_type_[tid] = op_type;
+    tasks_[tid] = std::make_unique<ProfileTask>(task_name.c_str(), &custom_op_domain);
+    tasks_[tid]->start();
+  }
+
+  /*!
+   * \brief Called after the callback of a custom operator to stop the profile task for
+   * python code execution time
+   */
+  void OnCustomEnd() {
+    const Tid tid = std::this_thread::get_id();
+    std::lock_guard<std::mutex> lock(mutex_);
+    tid_to_op_type_.erase(tid);
+    // this should never fail
+    CHECK(tasks_.find(tid) != tasks_.end()) << "thread_id not found. "
+        << "Please use OnCustomBegin() and OnCustomEnd() in pairs.";
+    tasks_[tid]->stop();
+    tasks_.erase(tid);
+  }
+
+  /*!
+   * \brief Generate a display name for a sub-operator, which is the name used for the
+   * OprBlock and later by the profiler, and store it in an unordered_set so that it
+   * can be referenced in the future.
+   * Note that if the operator is not a sub-operator, the same char pointer is returned.
+   * \param op_type The registered name of the operator
+   * \return Returns a pointer to the display name generated
+   */
+  const char* GenerateDisplayName(const char* op_type) {
+    if (!op_type) {
+      return nullptr;
+    }
+    Tid tid = std::this_thread::get_id();
+    std::lock_guard<std::mutex> lock(mutex_);
+    if (tid_to_op_type_.find(tid) == tid_to_op_type_.end()) {
+      return op_type;
+    }
+    std::string name = MakeSubOperatorName(tid, op_type);
+    return display_names_.insert(name).first->c_str();
+  }
+
+ private:
+  /*! \brief make the display name for sub-operators */
+  inline std::string MakeSubOperatorName(const Tid& tid, const char* op_type) {
+    return tid_to_op_type_[tid] + "::" + std::string(op_type);
+  }
+  /*! \brief make the display name for the pure python callback function, i.e.
+   * forward() or backward() in the custom operator definition
+   */
+  inline std::string MakePythonCodeName(const std::string& op_type) {
+    return op_type + "::pure_python";
+  }
+  /*! \brief class mutex */
+  std::mutex mutex_;
+  /*! \brief display names for sub-operators in custom ops */
+  std::unordered_set<std::string> display_names_;
+  /*! \brief profiling tasks for pure python code in custom operators */
+  std::unordered_map<Tid, TaskPtr> tasks_;
+  /*! \brief the mapping from thread id to the registered name of the custom operator
+   * that is running on that thread
+   */
+  std::unordered_map<Tid, std::string> tid_to_op_type_;
+};
+}  // namespace profiler
+}  // namespace mxnet
+
+#endif  // MXNET_PROFILER_CUSTOM_OP_PROFILER_H_
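
The naming rule implemented by CustomOpProfiler is simple enough to restate: an operator pushed from the thread that is currently running a custom operator's callback gets that callback's registered name as a '::'-separated prefix; any other operator keeps its plain name. An illustrative Python restatement of that rule (a sketch of the logic only, not the actual C++ API):

    def display_name(op_type, enclosing_custom_op=None):
        # Mirrors CustomOpProfiler::GenerateDisplayName(): prefix sub-operators
        # with the registered name of the enclosing custom operator.
        if op_type is None:
            return None
        if enclosing_custom_op is None:   # not inside a custom-op callback
            return op_type
        return enclosing_custom_op + '::' + op_type

    assert display_name('_zeros') == '_zeros'
    assert display_name('_zeros', 'MySigmoid') == 'MySigmoid::_zeros'
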
diff --git a/src/profiler/profiler.h b/src/profiler/profiler.h
index a6d9ecf06fee..49ef99da5bba 100644
--- a/src/profiler/profiler.h
+++ b/src/profiler/profiler.h
@@ -790,6 +790,13 @@ struct ProfileTask : public ProfileDuration {
     NVTX_ONLY_CODE(nvtx_duration_.reset(new nvtx::NVTXDuration(name)));
   }
 
+  /*!
+   * \brief Set the domain
+   */
+  void setDomain(ProfileDomain* domain) {
+    domain_ = domain;
+  }
+
   /*!
    * \brief Start the profiling scope
    */
@@ -1111,6 +1118,8 @@ struct ProfileMarker {
   VTUNE_ONLY_CODE(std::unique_ptr<vtune::VTuneInstantMarker> vtune_instant_marker_);
 };
 
+static ProfileDomain custom_op_domain("Custom Operator");
+
 /*!
 * \brief Operator profiler object. Logs as both an independent event and a task in
 * the operator domain
@@ -1162,10 +1171,16 @@ struct ProfileOperator : public ProfileEvent {
       : ProfileEvent(name)
       , as_task_(name, &domain_)
       , name_(name)
-      , attributes_(attributes) {
+      , attributes_(attributes)
+      , profiling_(!IsDeprecatedOperator(name)) {
+    if (IsSubOperatorOfCustom(name)) {
+      as_task_.setDomain(&custom_op_domain);
+      SetCategories(custom_op_domain.name());
+    } else {
+      SetCategories(domain_.name());
+    }
     // make as_task_ not to add stat to AggregateStats; otherwise we will add twice
     as_task_.enableAggregateStats(false);
-    SetCategories(domain_.name());
   }
   /*!
@@ -1175,15 +1190,19 @@ struct ProfileOperator : public ProfileEvent {
   * \brief Start the profiling scope
   */
  void start(mxnet::Context::DeviceType dev_type, uint32_t dev_id) {
    dev_type_ = dev_type;
    dev_id_ = dev_id;
-    ProfileEvent::start();
-    as_task_.start();
+    if (profiling_) {
+      ProfileEvent::start();
+      as_task_.start();
+    }
  }
  /*!
   * \brief Stop the profiling scope
   */
  void stop() override {
-    as_task_.stop();
-    ProfileEvent::stop();
+    if (profiling_) {
+      as_task_.stop();
+      ProfileEvent::stop();
+    }
  }
@@ -1208,7 +1227,11 @@ struct ProfileOperator : public ProfileEvent {
       if (attributes) {
         name_.append(attributes->to_string().c_str());
       }
-      categories_.set("operator");
+      if (IsSubOperatorOfCustom(name)) {
+        categories_.set(custom_op_domain.name());
+      } else {
+        categories_.set("operator");
+      }
       items_[kStart].timestamp_ = start_time;
       items_[kStop].timestamp_ = stop_time;
     }
@@ -1228,6 +1251,20 @@ struct ProfileOperator : public ProfileEvent {
                           start_time_, ProfileStat::NowInMicrosec(),
                           attributes_.get());
   }
+  /*!
+   * \brief Check if this operator is no longer profiled
+   * Notice that this operator may still be used for, e.g., synchronization
+   */
+  inline static bool IsDeprecatedOperator(const char* name) {
+    return strcmp(name, "CustomOperatorWait") == 0 ||
+           strcmp(name, "Custom") == 0 || strcmp(name, "_backward_Custom") == 0;
+  }
+  /*!
+   * \brief Check if this operator is a sub-operator of a custom operator
+   */
+  inline static bool IsSubOperatorOfCustom(const char* name) {
+    return strstr(name, "::") != nullptr;
+  }
   /*! \brief Also log the operator as a task in the operator domain */
   ProfileTask as_task_;
   /* !\brief Operator name */
@@ -1240,6 +1277,8 @@ struct ProfileOperator : public ProfileEvent {
   static ProfileDomain domain_;
   /*! \brief Optional operator attributes */
   std::unique_ptr<Attributes> attributes_;
+  /*! \brief Whether to profile or not */
+  const bool profiling_;
 };
 
 /*
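
Two effects of the profiler.h changes are worth calling out: operator names containing '::' are re-homed from the default operator domain into the new 'Custom Operator' domain, and the bookkeeping entries 'Custom', '_backward_Custom' and 'CustomOperatorWait' are no longer profiled at all (the underlying operators still exist for synchronization). A hedged sketch of what a dump should therefore contain after running a custom op under the profiler:

    import json
    from mxnet import profiler

    stats = json.loads(profiler.dumps(format='json'))
    # suppressed bookkeeping operators should not appear in the operator domain
    for name in ('Custom', '_backward_Custom', 'CustomOperatorWait'):
        assert name not in stats['Time'].get('operator', {})
    # every entry in the custom-op domain carries an '<op_type>::' prefix
    assert all('::' in name for name in stats['Time'].get('Custom Operator', {}))
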
diff --git a/tests/python/unittest/test_profiler.py b/tests/python/unittest/test_profiler.py
index b76bfbc82d12..ab7d29f104ca 100644
--- a/tests/python/unittest/test_profiler.py
+++ b/tests/python/unittest/test_profiler.py
@@ -23,6 +23,7 @@
 import os
 import json
 from collections import OrderedDict
+from common import run_in_spawned_process
 
 def enable_profiler(profile_filename, run=True, continuous_dump=False, aggregate_stats=False):
     profiler.set_config(profile_symbolic=True,
@@ -291,6 +292,164 @@ def test_aggregate_duplication():
     assert target_dict['Time']['operator']['_plus_scalar']['Count'] == 2
     profiler.set_state('stop')
 
+def test_custom_operator_profiling(seed=None, file_name=None):
+    class Sigmoid(mx.operator.CustomOp):
+        def forward(self, is_train, req, in_data, out_data, aux):
+            import numpy as np
+            x = in_data[0].asnumpy()
+            y = 1.0 / (1.0 + np.exp(-x))
+            self.assign(out_data[0], req[0], mx.nd.array(y))
+            # Create a dummy matrix using nd.zeros. Test if 'MySigmoid::_zeros' is in dump file
+            dummy = mx.nd.zeros(shape=(100, 100))
+
+        def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
+            y = out_data[0].asnumpy()
+            dy = out_grad[0].asnumpy()
+            dx = dy * (1.0 - y) * y
+            self.assign(in_grad[0], req[0], mx.nd.array(dx))
+
+    @mx.operator.register('MySigmoid')
+    class SigmoidProp(mx.operator.CustomOpProp):
+        def __init__(self):
+            super(SigmoidProp, self).__init__(True)
+
+        def list_arguments(self):
+            return ['data']
+
+        def list_outputs(self):
+            return ['output']
+
+        def infer_shape(self, in_shapes):
+            data_shape = in_shapes[0]
+            output_shape = data_shape
+            return (data_shape,), (output_shape,), ()
+
+        def create_operator(self, ctx, in_shapes, in_dtypes):
+            return Sigmoid()
+
+    if file_name is None:
+        file_name = 'test_custom_operator_profiling.json'
+    enable_profiler(profile_filename=file_name, run=True, continuous_dump=True,
+                    aggregate_stats=True)
+    x = mx.nd.array([0, 1, 2, 3])
+    x.attach_grad()
+    with mx.autograd.record():
+        y = mx.nd.Custom(x, op_type='MySigmoid')
+    y.backward()
+    mx.nd.waitall()
+    profiler.dump(False)
+    debug_str = profiler.dumps(format='json')
+    target_dict = json.loads(debug_str)
+    assert 'Time' in target_dict and 'Custom Operator' in target_dict['Time'] \
+        and 'MySigmoid::pure_python' in target_dict['Time']['Custom Operator'] \
+        and '_backward_MySigmoid::pure_python' in target_dict['Time']['Custom Operator'] \
+        and 'MySigmoid::_zeros' in target_dict['Time']['Custom Operator']
+    profiler.set_state('stop')
+
+def test_custom_operator_profiling_multiple_custom_ops_imperative(seed=None,
+                                                                  mode='imperative',
+                                                                  file_name=None):
+    class MyAdd(mx.operator.CustomOp):
+        def forward(self, is_train, req, in_data, out_data, aux):
+            self.assign(out_data[0], req[0], in_data[0] + 1)
+
+        def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
+            self.assign(in_grad[0], req[0], out_grad[0])
+
+    @mx.operator.register('MyAdd1')
+    class MyAdd1Prop(mx.operator.CustomOpProp):
+        def __init__(self):
+            super(MyAdd1Prop, self).__init__(need_top_grad=True)
+
+        def list_arguments(self):
+            return ['data']
+
+        def list_outputs(self):
+            return ['output']
+
+        def infer_shape(self, in_shape):
+            # inputs, outputs, aux
+            return [in_shape[0]], [in_shape[0]], []
+
+        def create_operator(self, ctx, shapes, dtypes):
+            return MyAdd()
+
+    @mx.operator.register('MyAdd2')
+    class MyAdd2Prop(mx.operator.CustomOpProp):
+        def __init__(self):
+            super(MyAdd2Prop, self).__init__(need_top_grad=True)
+
+        def list_arguments(self):
+            return ['data']
+
+        def list_outputs(self):
+            return ['output']
+
+        def infer_shape(self, in_shape):
+            # inputs, outputs, aux
+            return [in_shape[0]], [in_shape[0]], []
+
+        def create_operator(self, ctx, shapes, dtypes):
+            return MyAdd()
+
+    if file_name is None:
+        file_name = 'test_custom_operator_profiling_multiple_custom_ops_imperative.json'
+    enable_profiler(profile_filename=file_name, run=True, continuous_dump=True,
+                    aggregate_stats=True)
+    inp = mx.nd.zeros(shape=(100, 100))
+    if mode == 'imperative':
+        x = inp + 1
+        y = mx.nd.Custom(inp, op_type='MyAdd1')
+        z = mx.nd.Custom(inp, op_type='MyAdd2')
+    elif mode == 'symbolic':
+        a = mx.symbol.Variable('a')
+        b = a + 1
+        c = mx.symbol.Custom(data=a, op_type='MyAdd1')
+        d = mx.symbol.Custom(data=a, op_type='MyAdd2')
+        b.bind(mx.cpu(), {'a': inp}).forward()
+        c.bind(mx.cpu(), {'a': inp}).forward()
+        d.bind(mx.cpu(), {'a': inp}).forward()
+    mx.nd.waitall()
+    profiler.dump(False)
+    debug_str = profiler.dumps(format='json')
+    target_dict = json.loads(debug_str)
+    '''
+    We are calling _plus_scalar within MyAdd1 and MyAdd2 as well as outside both custom
+    operators, so in the aggregate stats we should have three different kinds of
+    _plus_scalar under the domains "Custom Operator" and "operator"
+    '''
+    assert 'Time' in target_dict and 'Custom Operator' in target_dict['Time'] \
+        and 'MyAdd1::pure_python' in target_dict['Time']['Custom Operator'] \
+        and 'MyAdd2::pure_python' in target_dict['Time']['Custom Operator'] \
+        and 'MyAdd1::_plus_scalar' in target_dict['Time']['Custom Operator'] \
+        and 'MyAdd2::_plus_scalar' in target_dict['Time']['Custom Operator'] \
+        and '_plus_scalar' not in target_dict['Time']['Custom Operator'] \
+        and 'operator' in target_dict['Time'] \
+        and '_plus_scalar' in target_dict['Time']['operator']
+    profiler.set_state('stop')
+
+def test_custom_operator_profiling_multiple_custom_ops_symbolic():
+    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
+                           {'MXNET_EXEC_BULK_EXEC_INFERENCE': 0,
+                            'MXNET_EXEC_BULK_EXEC_TRAIN': 0},
+                           'symbolic',
+                           'test_custom_operator_profiling_multiple_custom_ops_symbolic.json')
+
+def test_custom_operator_profiling_naive_engine():
+    # run the three tests above using the NaiveEngine
+    run_in_spawned_process(test_custom_operator_profiling,
+                           {'MXNET_ENGINE_TYPE': "NaiveEngine"},
+                           'test_custom_operator_profiling_naive.json')
+    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
+                           {'MXNET_ENGINE_TYPE': "NaiveEngine"},
+                           'imperative',
+                           'test_custom_operator_profiling_multiple_custom_ops_imperative_naive.json')
+    run_in_spawned_process(test_custom_operator_profiling_multiple_custom_ops_imperative,
+                           {'MXNET_ENGINE_TYPE': "NaiveEngine",
+                            'MXNET_EXEC_BULK_EXEC_INFERENCE': 0,
+                            'MXNET_EXEC_BULK_EXEC_TRAIN': 0},
+                           'symbolic',
+                           'test_custom_operator_profiling_multiple_custom_ops_symbolic_naive.json')
+
 if __name__ == '__main__':
     import nose
     nose.runmodule()
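
Taken together, the feature can be exercised end to end with a short script; this sketch mirrors what the new tests assert, with the operator name 'MyIdentity' and the output file name chosen as placeholders:

    import json
    import mxnet as mx
    from mxnet import profiler

    class Identity(mx.operator.CustomOp):
        def forward(self, is_train, req, in_data, out_data, aux):
            # the scalar add below should show up as 'MyIdentity::_plus_scalar'
            self.assign(out_data[0], req[0], in_data[0] + 0)

        def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
            self.assign(in_grad[0], req[0], out_grad[0])

    @mx.operator.register('MyIdentity')
    class IdentityProp(mx.operator.CustomOpProp):
        def list_arguments(self):
            return ['data']

        def list_outputs(self):
            return ['output']

        def infer_shape(self, in_shape):
            return [in_shape[0]], [in_shape[0]], []

        def create_operator(self, ctx, shapes, dtypes):
            return Identity()

    profiler.set_config(profile_imperative=True, aggregate_stats=True,
                        continuous_dump=True, filename='custom_op_profile.json')
    profiler.set_state('run')
    y = mx.nd.Custom(mx.nd.zeros((10, 10)), op_type='MyIdentity')
    mx.nd.waitall()
    profiler.dump(False)
    print(json.loads(profiler.dumps(format='json'))['Time']['Custom Operator'])
    profiler.set_state('stop')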