This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

fix custom operation in fork #14451

Merged
merged 6 commits into from
Mar 20, 2019
Changes from 2 commits
5 changes: 5 additions & 0 deletions src/initialize.cc
@@ -26,6 +26,7 @@
 #include <dmlc/logging.h>
 #include <mxnet/engine.h>
 #include "./engine/openmp.h"
+#include "./operator/custom/custom-inl.h"
 #if MXNET_USE_OPENCV
 #include <opencv2/opencv.hpp>
 #endif // MXNET_USE_OPENCV
@@ -53,11 +54,14 @@ class LibraryInitializer {

 // disable openmp for multithreaded workers
 #ifndef _WIN32
+    using op::custom::CustomOperator;
     pthread_atfork(
       []() {
+        CustomOperator::Get()->Stop();
         Engine::Get()->Stop();
Member

Can we call the engine's Stop before the custom operator's Stop? That would ensure that all ops pushed to the engine have executed before the engine is stopped.

Member

I take that back. That may not work: if the engine is stopped before the custom operator, there may be ops that the custom operator has not yet pushed to the engine by the time the engine has stopped. Another option is to add a waitall after the workers are joined in the custom operator's Stop, to make sure all pushed ops have completed.

Member Author

@arcadiaphy, Mar 17, 2019

The second option doesn't work either. By the time execution enters the Stop function, the custom worker thread has already frozen inside a Python function, so the worker join blocks. I cannot think of a simple and graceful way to complete or ignore the unfinished custom operation, other than manually adding a wait command in the Python code.

Member

Do you know why it freezes on the worker join? The worker should just push the operator to the engine and return.

Member Author

The first situation is easy to understand. The static variable is destroyed on engine exit, after the Python interpreter has already shut down, so calling a CFUNCTYPE function at that point goes wrong.
I pushed a new commit that moves the custom Stop function to MXNotifyShutdown, which forces it to be called before Python shuts down, so the first situation is now handled.

Member Author

The second situation, forking, is a mystery; I don't have a clue yet. The custom Stop function is registered in the prepare fork handler, so everything should be fine at that point, but the worker thread just freezes.

       },
       []() {
+        CustomOperator::Get()->Start();
         Engine::Get()->Start();
       },
       []() {
@@ -71,6 +75,7 @@ class LibraryInitializer {
 #endif // MXNET_USE_OPENCV
         engine::OpenMP::Get()->set_enabled(false);
         Engine::Get()->Start();
+        CustomOperator::Get()->Start();
       });
 #endif
   }
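
The handler layout in this hunk (stop background threads in the prepare handler, restart them in both the parent and the child handler) can be sketched in Python with `os.register_at_fork`, which offers the same three hooks as `pthread_atfork`. This is a minimal sketch and not MXNet code: `WorkerPool`, `install_fork_handlers`, and `fork_and_check` are hypothetical names, and it assumes a Unix platform with Python 3.7 or later.

```python
import os
import queue
import threading


class WorkerPool:
    """Hypothetical component that owns one background worker thread."""

    def __init__(self):
        self._tasks = None
        self._thread = None
        self.start()

    def start(self):
        # Build a fresh queue and thread so start() is safe after stop() or fork.
        self._tasks = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self):
        # Send the sentinel, then wait for the worker so no thread exists at fork time.
        self._tasks.put(None)
        self._thread.join()

    def submit(self, fn):
        self._tasks.put(fn)

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is None:
                return
            task()


def install_fork_handlers(pool):
    # Same roles as the prepare / parent / child arguments of pthread_atfork.
    os.register_at_fork(
        before=pool.stop,
        after_in_parent=pool.start,
        after_in_child=pool.start,
    )


def fork_and_check(pool, timeout=5.0):
    """Fork, run one task on the child's restarted worker, report success."""
    pid = os.fork()
    if pid == 0:
        code = 1
        try:
            done = threading.Event()
            pool.submit(done.set)
            code = 0 if done.wait(timeout) else 1
        finally:
            os._exit(code)  # never fall back into the parent's code path
    _, status = os.waitpid(pid, 0)
    return os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
```

After `install_fork_handlers(pool)`, `fork_and_check(pool)` should return `True` when the child's restarted worker runs the submitted task. The sketch does not reproduce the frozen-worker case discussed in the thread above; it only shows the intended stop and restart sequence.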
29 changes: 21 additions & 8 deletions src/operator/custom/custom-inl.h
@@ -137,6 +137,24 @@ class CustomOperator {
   }

   ~CustomOperator() {
+    this->Stop();
+  }
+
+  static CustomOperator* Get() {
+    static CustomOperator inst;
+    return &inst;
+  }
+
+  void Start() {
+    num_free_threads = 0;
+    destructing_ = false;
+    naive_engine_ = true;
+    if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) {
+      naive_engine_ = false;
+    }
+  }
+
+  void Stop() {
     if (naive_engine_) return;
     {
       std::unique_lock<std::mutex> lock(mutex_);
@@ -145,17 +163,12 @@ class CustomOperator {
     }
     for (auto &worker : workers_)
       worker.join();
+    workers_.clear();
   }

-  static CustomOperator* Get();
-
  private:
-  CustomOperator() : num_free_threads(0) {
-    destructing_ = false;
-    naive_engine_ = true;
-    if (std::string("NaiveEngine") != dmlc::GetEnv("MXNET_ENGINE_TYPE", std::string())) {
-      naive_engine_ = false;
-    }
+  CustomOperator() {
+    this->Start();
   }
   void ThreadTarget() {
     std::unique_lock<std::mutex> lock(mutex_);
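
The Start/Stop split above makes the pool restartable: Stop sets a flag under the mutex, wakes every worker, joins them, and clears the worker list, while Start only resets state. A rough Python analogue is sketched below under stated assumptions. `RestartablePool` and its methods are hypothetical, workers are spawned lazily on push (an assumption, since the pushing code is not part of this diff), and the sketch uses a single worker for brevity.

```python
import threading
from collections import deque


class RestartablePool:
    """Hypothetical analogue of a worker pool with Start()/Stop()."""

    def __init__(self):
        self._cv = threading.Condition()
        self._tasks = deque()
        self._workers = []
        self._stopping = False
        self.start()

    def start(self):
        # Reset state only; threads are created lazily by push().
        self._stopping = False

    def push(self, fn):
        with self._cv:
            self._tasks.append(fn)
            if not self._workers:
                worker = threading.Thread(target=self._thread_target, daemon=True)
                worker.start()
                self._workers.append(worker)
            self._cv.notify()

    def stop(self):
        with self._cv:
            self._stopping = True
            self._cv.notify_all()
        for worker in self._workers:
            worker.join()
        # Forget the joined threads so the next push() spawns a live one.
        self._workers.clear()

    def _thread_target(self):
        while True:
            with self._cv:
                while not self._tasks and not self._stopping:
                    self._cv.wait()
                if not self._tasks:
                    return  # stopping and nothing left to run
                fn = self._tasks.popleft()
            fn()  # run the task outside the lock
```

In this sketch the `clear()` call matters: without it, `push()` after a restart would see a stale, already finished thread in `_workers`, spawn nothing, and the task would never run.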
5 changes: 0 additions & 5 deletions src/operator/custom/custom.cc
@@ -34,11 +34,6 @@ namespace mxnet {
 namespace op {
 namespace custom {

-CustomOperator* CustomOperator::Get() {
-  static CustomOperator inst;
-  return &inst;
-}
-
 struct CustomParam {
   std::string op_type;
   size_t num_args, num_outs, num_auxs;
39 changes: 39 additions & 0 deletions tests/python/unittest/test_operator.py
@@ -5198,6 +5198,45 @@ def create_operator(self, ctx, shapes, dtypes):
     x = mx.nd.Custom(length=10, depth=10, op_type="no_input_op")
     assert_almost_equal(x.asnumpy(), np.ones(shape=(10, 10), dtype=np.float32))

+    # test custom operator fork
+    # see https://github.com/apache/incubator-mxnet/issues/14396
+    if not sys.platform.startswith('win'): # no fork in windows
+        class AdditionOP(mx.operator.CustomOp):
+            def __init__(self):
+                super(AdditionOP, self).__init__()
+            def forward(self, is_train, req, in_data, out_data, aux):
+                out_data[0][:] = in_data[0] + in_data[1]
+            def backward(self, req, out_grad, in_data, out_data, in_grad, aux):
+                in_grad[0][:] = out_grad[0]
+                in_grad[1][:] = out_grad[0]
+
+        @mx.operator.register("AdditionOP")
+        class AdditionOPProp(mx.operator.CustomOpProp):
+            def __init__(self):
+                super(AdditionOPProp, self).__init__()
+            def list_arguments(self):
+                return ['a', 'b']
+            def list_outputs(self):
+                return ['output']
+            def infer_shape(self, in_shape):
+                return in_shape, [in_shape[0]]
+            def create_operator(self, ctx, shapes, dtypes):
+                return AdditionOP()
+
+        def custom_add():
+            a = mx.nd.array([1, 2, 3])
+            b = mx.nd.array([4, 5, 6])
+            c = mx.nd.Custom(a, b, op_type='AdditionOP')
+            assert_almost_equal((a + b).asnumpy(), c.asnumpy())
+
+        custom_add()
+        from multiprocessing import Process
+        p = Process(target=custom_add)
+        p.daemon = True
+        p.start()
+        p.join(5)
+        assert not p.is_alive(), "deadlock may exist in custom operator"
+
 @with_seed()
 def test_psroipooling():
     for num_rois in [1, 2]:
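
The deadlock check in the new test (run the workload in a forked child, join with a timeout, then assert the child is no longer alive) can be factored into a small helper. The sketch below is illustrative only: `finishes_in_child` is a hypothetical name, it assumes a platform where the `fork` start method is available, and it additionally checks the child's exit code, which the test above does not do.

```python
import multiprocessing


def finishes_in_child(target, timeout=5.0):
    """Run target in a forked child; return True if it exits cleanly in time."""
    ctx = multiprocessing.get_context("fork")  # not available on Windows
    proc = ctx.Process(target=target)
    proc.daemon = True
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        # Still running after the timeout: treat as a possible deadlock.
        proc.terminate()
        proc.join()
        return False
    # A nonzero exit code means the child raised or was killed.
    return proc.exitcode == 0
```

A test could then assert `finishes_in_child(custom_add)`. Checking the exit code also catches a child whose own assertion failed, which a bare `is_alive()` check would let pass.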