Skip to content
This repository has been archived by the owner on Nov 17, 2023. It is now read-only.

Commit

Permalink
Make stuff work
Browse files Browse the repository at this point in the history
  • Loading branch information
junrushao committed Apr 13, 2019
1 parent 4c574ed commit acfbb5a
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 29 deletions.
2 changes: 1 addition & 1 deletion python/mxnet/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def forward(self, is_train=False, **kwargs):
check_call(_LIB.MXExecutorForward(
self.handle,
ctypes.c_int(int(is_train))))

self.outputs = self._get_outputs()
return self.outputs

def backward(self, out_grads=None, is_train=True):
Expand Down
128 changes: 119 additions & 9 deletions src/executor/graph_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "../common/utils.h"
#include "../common/exec_utils.h"
#include "../operator/subgraph/subgraph_property.h"
#include "../imperative/imperative_utils.h"

namespace mxnet {
namespace exec {
Expand All @@ -43,6 +44,7 @@ using namespace mxnet::common;
GraphExecutor::GraphExecutor() {
log_verbose_ = dmlc::GetEnv("MXNET_EXEC_VERBOSE_LOGGING", false);
need_grad_ = false;
is_dynamic_ = false;
subgraph_property_ = dmlc::GetEnv("MXNET_SUBGRAPH_BACKEND", std::string());
engine_ref_ = Engine::_GetSharedRef();
}
Expand Down Expand Up @@ -85,7 +87,15 @@ void GraphExecutor::Backward(const std::vector<NDArray>& head_grads, bool is_tra
<< "If you are attempting to minimize the output as "
<< "an objective, please modify your network and "
<< "pass it through the make_loss symbol.";
CopyFromTo(head_grads[i], &(head_grad_array_[i]));
const NDArray &from = head_grads[i];
NDArray &to = head_grad_array_[i];
if (this->is_dynamic_) {
to.WaitToRead();
if (to.shape().ndim() == 0) {
to.Init(from.shape());
}
}
CopyFromTo(from, &to);
}
}
}
Expand All @@ -107,7 +117,31 @@ void GraphExecutor::SetMonitorCallback(const MonitorCallback& callback, bool mon
monitor_all_ = monitor_all;
}

std::string Shape2Str(const mxnet::TShape &shape) {
std::ostringstream oss;
oss << '[';
bool is_first = true;
for (auto x : shape) {
if (is_first) {
is_first = false;
} else {
oss << ", ";
}
oss << x;
}
oss << "]";
return oss.str();
}

const std::vector<NDArray>& GraphExecutor::outputs() const {
if (this->is_dynamic_) {
for (const NDArray &array : output_arrays_) {
array.WaitToRead();
if (array.shape().ndim() == 0 || array.shape().Size() == 0) {
const_cast<NDArray &>(array).SetShapeFromChunk();
}
}
}
return output_arrays_;
}

Expand Down Expand Up @@ -370,8 +404,7 @@ void GraphExecutor::Init(nnvm::Symbol symbol,
arg_shapes.resize(idx.input_nodes().size(), mxnet::TShape());
g = InferShape(std::move(g), std::move(arg_shapes), "__shape__");
if (g.GetAttr<size_t>("shape_num_unknown_nodes") != 0U) {
HandleInferShapeError(num_forward_inputs_, g.indexed_graph(),
g.GetAttr<mxnet::ShapeVector>("shape"));
this->is_dynamic_ = true;
}

arg_dtypes.resize(idx.input_nodes().size(), -1);
Expand Down Expand Up @@ -810,6 +843,7 @@ Executor* GraphExecutor::Reshape(const bool partial_shaping,
}
g = InferShape(std::move(g), std::move(arg_shapes), "__shape__");
if (g.GetAttr<size_t>("shape_num_unknown_nodes") != 0U) {
this->is_dynamic_ = true;
HandleInferShapeError(num_forward_inputs_, g.indexed_graph(),
g.GetAttr<mxnet::ShapeVector>("shape"));
}
Expand Down Expand Up @@ -966,14 +1000,17 @@ void GraphExecutor::InitDataEntryMemory(std::vector<NDArray>* shared_pool) {
uint32_t oid = head_grad_map_.at(idx[nid].source);
uint32_t eid = idx.entry_id(idx.outputs()[oid]);
NDArrayStorageType stype = (NDArrayStorageType) vstorage_type[eid];
CHECK_NE(vshape[eid].ndim(), 0U);
// TODO(@junrushao1994): numpy compatibility
bool unknown_shape = (vshape[eid].ndim() == 0U);
CHECK_NE(vdtype[eid], -1);
auto data_eid = idx.entry_id(nid, 0);
// initialize based on storage_type
if (stype != kDefaultStorage) {
data_entry_[data_eid] = NDArray(stype, vshape[eid], data_context[eid], true, vdtype[eid]);
} else {
} else if (!unknown_shape) {
data_entry_[data_eid] = NDArray(vshape[eid], data_context[eid], false, vdtype[eid]);
} else {
data_entry_[data_eid] = NDArray(data_context[eid], vdtype[eid]);
}
if (log_verbose_) {
LOG(INFO) << "\tinit head_grad entry\t" << data_eid << "\tas "
Expand Down Expand Up @@ -1056,9 +1093,13 @@ void GraphExecutor::InitDataEntryMemory(std::vector<NDArray>* shared_pool) {
int storage_id = vstorage[i];
auto storage_type = (NDArrayStorageType) vstorage_type[i];
if (storage_type == kDefaultStorage) {
CHECK_GE(storage_id, 0) << "Do not support runtime shape op yet";
const NDArray& src = data_pool_.at(storage_id);
data_entry_[i] = src.AsArray(vshape[i], vdtype[i]);
if (vshape[i].ndim() == 0) {
data_entry_[i] = NDArray(data_context[i], vdtype[i]);
} else {
CHECK_GE(storage_id, 0) << "Do not support runtime shape op yet";
const NDArray& src = data_pool_.at(storage_id);
data_entry_[i] = src.AsArray(vshape[i], vdtype[i]);
}
} else {
data_entry_[i] = NDArray(storage_type, vshape[i], data_context[i],
true, vdtype[i]);
Expand Down Expand Up @@ -1198,7 +1239,10 @@ void GraphExecutor::InitOpSegs() {
const profiler::Profiler *prof = profiler::Profiler::Get();
bool prefer_bulk_exec_train = Imperative::PreferBulkExecTrain()
&& (!prof || !prof->AggregateEnabled());

if (this->is_dynamic_) {
prefer_bulk_exec_inference = false;
prefer_bulk_exec_train = false;
}
bool is_training = num_forward_nodes_ != total_num_nodes;

if (prefer_bulk_exec_train && is_training) {
Expand Down Expand Up @@ -1289,6 +1333,8 @@ void GraphExecutor::ExecuteMonOutputCallback(size_t nid) {
}

void GraphExecutor::RunOps(bool is_train, size_t topo_start, size_t topo_end) {
static auto& finfer_shape = nnvm::Op::GetAttr<mxnet::FInferShape>("FInferShape");
static auto& is_backward = Op::GetAttr<nnvm::TIsBackward>("TIsBackward");
// Update context
const auto& idx = graph_.indexed_graph();
for (size_t nid = topo_start; nid < topo_end; ++nid) {
Expand All @@ -1300,6 +1346,7 @@ void GraphExecutor::RunOps(bool is_train, size_t topo_start, size_t topo_end) {
opnode.exec->op_ctx.need_grad = need_grad_;
}

// std::cout << "is_dynamic = " << this->is_dynamic_ << std::endl;
// Push Ops
for (size_t nid = topo_start; nid < topo_end; ++nid) {
auto seg_op = cached_seg_opr_[nid];
Expand All @@ -1319,6 +1366,60 @@ void GraphExecutor::RunOps(bool is_train, size_t topo_start, size_t topo_end) {
if (monitor_callback_ && monitor_all_) {
ExecuteMonInputCallback(nid);
}
if (this->is_dynamic_) {
const auto &op = inode.source->op();
for (NDArray &array : opnode.exec->in_array) {
array.WaitToRead();
if (array.shape().ndim() == 0) {
array.SetShapeFromChunk();
}
}
if (finfer_shape.count(op)) {
mxnet::ShapeVector in_shapes;
mxnet::ShapeVector out_shapes;
for (NDArray &array : opnode.exec->in_array) {
in_shapes.push_back(array.shape());
}
for (NDArray &array : opnode.exec->out_array) {
out_shapes.push_back(array.shape());
}
auto finfer = finfer_shape[op];
try {
bool success = finfer(inode.source->attrs, &in_shapes, &out_shapes);
CHECK(success) << "InferShape failed in operator " << inode.source->attrs.name;
} catch (const std::exception& e) {
throw dmlc::Error("Error in operator " + inode.source->attrs.name + ": " + e.what());
}
int n_out = out_shapes.size();
for (int i = 0; i < n_out; ++i) {
NDArray &array = opnode.exec->out_array[i];
if (array.shape().ndim() == 0) {
array.Init(out_shapes[i]);
}
}
} else if (is_backward.get(inode.source->op(), false) && inode.control_deps.size()) {
CHECK_GE(inode.control_deps.size(), 1U) <<
"BackwardOp need to have control_deps to its forward op";
uint32_t fid = inode.control_deps[0];
const OpNode& fopnode = op_nodes_[fid];
CHECK_EQ(fopnode.exec->in_array.size(), opnode.exec->out_array.size());
int nelem = fopnode.exec->in_array.size();
std::vector<NDArray> &from = fopnode.exec->in_array;
std::vector<NDArray> &to = opnode.exec->out_array;
for (int i = 0; i < nelem; ++i) {
if (to[i].shape().ndim() == 0) {
to[i].Init(from[i].shape());
}
}
}
}
// std::cout << "nid = " << nid << std::endl;
// for (NDArray &array : opnode.exec->in_array) {
// std::cout << " " << Shape2Str(array.shape()) << std::endl;
// }
// for (NDArray &array : opnode.exec->out_array) {
// std::cout << " " << Shape2Str(array.shape()) << std::endl;
// }
opnode.exec->op_ctx.is_train = is_train;
opnode.exec->op_ctx.need_grad = need_grad_;
if (opnode.exec->exec_type() == ExecType::kCrossDeviceCopy) {
Expand All @@ -1332,6 +1433,15 @@ void GraphExecutor::RunOps(bool is_train, size_t topo_start, size_t topo_end) {
} else if (opnode.cached_opr != nullptr) {
bool profiling = profiler::Profiler::Get()->GetState() == profiler::Profiler::kRunning;
Engine::Get()->Push(opnode.cached_opr, opnode.ctx, 0, profiling);
if (this->is_dynamic_) {
for (NDArray &array : opnode.exec->out_array) {
array.WaitToRead();
// TODO(@junrushao1994): numpy compatibility
if (array.shape().ndim() == 0) {
array.SetShapeFromChunk();
}
}
}
} else {
LOG(FATAL) << "Not accessed";
}
Expand Down
2 changes: 2 additions & 0 deletions src/executor/graph_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ class GraphExecutor : public Executor {
void ExecuteMonOutputCallback(size_t nid);
// peform bulking and segmentation on the region [from_node, up_to_node) of a graph
void BulkOpSegs(size_t from_node, size_t up_to_node, size_t segment_num_nodes_max);
// When infer shape fails, fall back to ensure dynamic-shaped operators executed correctly.
bool is_dynamic_;
// indicate whether there is a backward graph for gradients.
bool need_grad_;
// internal graph
Expand Down
21 changes: 10 additions & 11 deletions src/operator/control_flow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,8 @@ static void WhileLoopComputeExCPU(const OpStatePtr& state_ptr,
}
// we create func_outputs for the current step:
// func_outputs[0: num_out_data] is a slice of outputs[][step]
if (step == 0) {
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
func_outputs[i] = NDArray(outputs[i].ctx(), outputs[i].dtype());
}
} else {
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
func_outputs[i] = outputs[i].At(step);
}
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
func_outputs[i] = NDArray(outputs[i].ctx(), outputs[i].dtype());
}
// func_outputs[num_out_data: ] are new_loop_vars, need to allocate new memory
for (size_t i = params.num_out_data; i < outputs.size(); ++i) {
Expand All @@ -612,8 +606,8 @@ static void WhileLoopComputeExCPU(const OpStatePtr& state_ptr,
if (step == 0) {
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
// TODO(@junrushao1994): support 0-d tensors
func_outputs[i].WaitToRead();
if (func_outputs[i].shape().ndim() == 0) {
func_outputs[i].WaitToRead();
func_outputs[i].SetShapeFromChunk();
}
mxnet::TShape step_shape = func_outputs[i].shape();
Expand All @@ -623,10 +617,12 @@ static void WhileLoopComputeExCPU(const OpStatePtr& state_ptr,
shape[j + 1] = step_shape[j];
}
const_cast<NDArray &>(outputs[i]).Init(shape);
NDArray first_slot = outputs[i].At(0);
mxnet::CopyFromTo(func_outputs[i], &first_slot);
}
}
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
NDArray first_slot = outputs[i].At(step);
mxnet::CopyFromTo(func_outputs[i], &first_slot);
}
// func_inputs on the next step:
// the output (new_loop_vars) will become the new inputs (loop_vars)
for (size_t i = params.num_out_data; i < outputs.size(); ++i) {
Expand All @@ -653,6 +649,9 @@ static void WhileLoopComputeExCPU(const OpStatePtr& state_ptr,
}
mxnet::CopyFromTo(func_inputs[j], &outputs[i]);
}
for (size_t i = 0; i < (size_t) params.num_out_data; ++i) {
const_cast<NDArray &>(outputs[i]).SetShapeFromChunk();
}
}

static void WhileLoopGradComputeExCPU(const OpStatePtr& state_ptr,
Expand Down
29 changes: 21 additions & 8 deletions tests/python/unittest/test_contrib_control_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -935,14 +935,27 @@ def _func(*states):
max_iterations=seq_len
)
result = mx.sym.Group(result[0] + result[1][1: ])
arg_shapes, _, _ = result.infer_shape(
data=(seq_len, batch_size, input_dim),
s_0=(batch_size, hidden_dim),
)
rnn_inputs = result.list_inputs()
args = {name: _array(arg_shapes[i]) for i, name in enumerate(rnn_inputs) if name != "i"}
args["i"] = mx.nd.zeros([1])
args_grad = {name: _array(arg_shapes[i]) for i, name in enumerate(rnn_inputs)}
args = {
"i": mx.nd.zeros([1]),
"data": _array((seq_len, batch_size, input_dim)),
"i2h_weight": _array((input_dim * hidden_dim, input_dim)),
"i2h_bias": _array((input_dim * hidden_dim, )),
"s_0": _array((batch_size, hidden_dim)),
"h2h_weight": _array((input_dim * hidden_dim, seq_len)),
"h2h_bias": _array((input_dim * hidden_dim, )),
"s_1": _array((batch_size, hidden_dim)),
}
args_grad = {
"i": _array([1]),
"data": _array((seq_len, batch_size, input_dim)),
"i2h_weight": _array((input_dim * hidden_dim, input_dim)),
"i2h_bias": _array((input_dim * hidden_dim, )),
"s_0": _array((batch_size, hidden_dim)),
"h2h_weight": _array((input_dim * hidden_dim, seq_len)),
"h2h_bias": _array((input_dim * hidden_dim, )),
"s_1": _array((batch_size, hidden_dim)),
}
e_1 = result.bind(ctx=default_context(),
args={name: array.copy() for name, array in args.items()},
args_grad={name: array.copy() for name, array in args_grad.items() if name != "i"},
Expand All @@ -961,9 +974,9 @@ def _func(*states):
args_grad={name: array.copy() for name, array in args_grad.items() if name != "i"},
)
for case_id in range(100):
out_grads = [_array(arr.shape) for arr in e_1.outputs]
args = {name: array.copy() for name, array in args.items()}
e_1.forward(is_train=True, **args)
out_grads = [_array(arr.shape) for arr in e_1.outputs]
e_1.backward(out_grads)
args = {name: array.copy() for name, array in args.items() if name != "i"}
e_2.forward(is_train=True, **args)
Expand Down

0 comments on commit acfbb5a

Please sign in to comment.