Op registration & ndarray API changes (apache#26)
* Minor naming changes

* refactor blob copy

* make sparse ndarray constructor non-blocking. add FCompute for CastStorage op

* Add test for cast storage op

* disable backward pass for cast storage

* ndarray alloc aux interface. refactor op registration code

* remove non-default FComputeEx. update none value for storage type
eric-haibin-lin committed Apr 27, 2017
1 parent e65e577 commit 961da4e
Showing 23 changed files with 342 additions and 275 deletions.
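As a rough illustration of the constructor and storage-type changes described in the commit message above, here is a minimal sketch, assuming a CPU context, made-up shapes, and compilation inside the MXNet tree at this commit; nothing below comes from the diff itself except the NDArray API.

#include <mxnet/ndarray.h>

using namespace mxnet;

void SketchSparseConstruction() {
  // Row-sparse 10x4 array. aux_types is left empty, so the constructor falls
  // back to ROW_SPARSE_IDX_TYPE (kInt32) for the index array; an unknown
  // storage type now hits LOG(FATAL) instead of the old CHECK_NE.
  NDArray rsp(kRowSparseStorage, mshadow::Shape2(10, 4), Context::CPU());
  // Values and indices are not allocated yet (delay_alloc defaults to true);
  // see the CheckAndAlloc* interface in the ndarray.h diff below.
}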
170 changes: 95 additions & 75 deletions include/mxnet/ndarray.h
@@ -29,6 +29,7 @@

namespace mxnet {
// forward declaration
class NDArray;
namespace autograd {
class AGNode;

@@ -51,10 +52,11 @@ class AGNodeEntry {
class AutogradRuntime;
} // namespace autograd

// FIXME int64_t is not available in mshadow
// enum for storage types
#define CSR_IND_PTR_TYPE mshadow::kInt32
#define CSR_IDX_DTYPE mshadow::kInt32
#define ROW_SPARSE_IDX_TYPE mshadow::kInt32
// FIXME int64_t is not available in mshadow
namespace csr {
enum CSRAuxType {kIndPtr, kIdx};
}
@@ -64,12 +66,26 @@ enum RowSparseAuxType {kIdx};
}

enum NDArrayStorageType {
kUndefinedStorage, // undefined chunk
kUndefinedStorage, // undefined storage
kDefaultStorage, // dense
kRowSparseStorage, // row sparse
kCSRStorage, // csr
};

/*!
 * \brief issue a copy operation from one NDArray to another
 * the two ndarrays can sit on different devices
* this operation will be scheduled by the engine
*
* \param from the ndarray we want to copy data from
* \param to the target ndarray
* \param priority Priority of the action.
* \param alloc_output whether to allocate memory for the output ndarray
* \note The function name explicitly marks the order of from and to
 * due to the different possible conventions carried by copy functions.
*/
void CopyFromTo(const NDArray &from, NDArray *to, int priority = 0, bool alloc_output = true);
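A rough usage sketch for the relocated declaration above; the shapes and the reading of alloc_output (false appears to mean the destination's storage is already in place) are my assumptions.

// Assuming <mxnet/ndarray.h> and using namespace mxnet, as in the earlier sketch.
void SketchCopyFromTo() {
  NDArray src(mshadow::Shape1(8), Context::CPU());
  NDArray dst(mshadow::Shape1(8), Context::CPU());
  CopyFromTo(src, &dst);            // default priority, alloc_output = true
  CopyFromTo(src, &dst, 0, false);  // skip output allocation; storage assumed present
  dst.WaitToRead();                 // the copy is scheduled asynchronously
}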

/*!
* \brief ndarray interface
*/
@@ -96,16 +112,17 @@ class NDArray {
Mkl_mem_ = std::make_shared<MKLMemHolder>();
#endif
}
/*! \brief constructor for NDArray with chunk type
/*! \brief constructor for NDArray with storage type
*/
NDArray(const NDArrayStorageType storage_type, const TShape &shape, Context ctx,
bool delay_alloc = true, int dtype = mshadow::default_type_flag,
std::vector<int> aux_types = {})
: shape_(shape), offset_(0), dtype_(dtype), entry_({nullptr, 0, 0}) {
// Assign default aux types if not given
if (aux_types.size() == 0) {
if (storage_type == kRowSparseStorage) aux_types = {ROW_SPARSE_IDX_TYPE};
if (storage_type == kCSRStorage) aux_types = {CSR_IND_PTR_TYPE, CSR_IDX_DTYPE};
CHECK_NE(storage_type, kDefaultStorage);
else if (storage_type == kCSRStorage) aux_types = {CSR_IND_PTR_TYPE, CSR_IDX_DTYPE};
else LOG(FATAL) << "Unknown storage type";
}
ptr_ = std::make_shared<Chunk>(ctx, delay_alloc, aux_types, storage_type);
#if MKL_EXPERIMENTAL == 1
@@ -119,15 +136,16 @@ class NDArray {
 * make sure the memory region is available throughout the life of NDArray
* \param data the memory content of static data
* \param dev_id the device id this tensor sits at
* \param shared_var the same var handle shared with others.
It will not be deleted during destruction.
*/
NDArray(const TBlob &data, int dev_id)
: ptr_(std::make_shared<Chunk>(data, dev_id)), shape_(data.shape_), offset_(0),
NDArray(const TBlob &data, int dev_id, Engine::VarHandle shared_var = nullptr)
: ptr_(std::make_shared<Chunk>(data, dev_id, shared_var)), shape_(data.shape_), offset_(0),
dtype_(data.type_flag_), entry_({nullptr, 0, 0}) {
#if MKL_EXPERIMENTAL == 1
Mkl_mem_ = std::make_shared<MKLMemHolder>();
#endif
}
// TODO this constructor should be removed
NDArray(NDArray data, const std::vector<NDArray> aux_data, Context ctx,
NDArrayStorageType storage_type, const TShape &shape)
: ptr_(std::make_shared<Chunk>(data, aux_data, ctx, storage_type)), shape_(shape),
@@ -146,7 +164,7 @@ class NDArray {
}
/*!
* \return the shape of underlying chunk which stores the NDArray values.
* For default storage, it is the same as shape(). For row-sparse chunks, it is the shape of
* For default storage, it is the same as shape(). For row-sparse storage, it is the shape of
* the tensor which stores the non-zero values.
*/
inline const TShape &storage_shape() const {
@@ -418,7 +436,7 @@ class NDArray {
* \return NDArray in new shape and type.
*/
inline NDArray AsArray(const TShape &shape, int dtype) const {
CHECK(storage_type() == kDefaultStorage) << "Not implemented yet";
CHECK_EQ(storage_type(), kDefaultStorage) << "Not implemented yet";
CHECK_GE(shape_.Size() * mshadow::mshadow_sizeof(dtype_),
shape.Size() * mshadow::mshadow_sizeof(dtype))
<< "NDArray.AsArray: target memory size is bigger";
Expand Down Expand Up @@ -451,17 +469,25 @@ class NDArray {
 * This is an internal function used by the system; normal users should not use it
*/
inline void CheckAndAlloc() const {
CHECK_EQ(storage_type(), kDefaultStorage);
ptr_->CheckAndAlloc();
}
/*!
* \brief Alloc number of dense rows for kRowSparseStorage
* \brief Alloc memory for non-default storage
* aux_shape is only known at run time
*/
inline void CheckAndAlloc(const std::vector<TShape> &aux_shapes) const {
// probably should round up memory reservation
CHECK_NE(storage_type(), kDefaultStorage);
ptr_->CheckAndAlloc(shape_, aux_shapes, dtype_);
}

inline void CheckAndAllocData(const TShape &storage_shape) const {
CHECK_NE(storage_type(), kDefaultStorage);
ptr_->CheckAndAllocData(storage_shape, dtype_);
}
inline void CheckAndAllocAuxData(size_t i, const TShape &aux_shape) const {
CHECK_NE(storage_type(), kDefaultStorage);
ptr_->CheckAndAllocAuxData(i, aux_shape);
}
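The CheckAndAllocData/CheckAndAllocAuxData pair above is the "ndarray alloc aux interface" from the commit message. A hedged sketch of driving it for a row-sparse array follows; the row count, shapes, and byte arithmetic are illustrative only.

// Assuming <mxnet/ndarray.h> and using namespace mxnet, as in the earlier sketches.
void SketchAllocAux() {
  // 10x4 float32 row-sparse array that will store 3 non-zero rows.
  NDArray out(kRowSparseStorage, mshadow::Shape2(10, 4), Context::CPU());
  const mshadow::index_t nnr = 3;
  // Aux data (row indices) is sized first: 3 * sizeof(int32) = 12 bytes.
  out.CheckAndAllocAuxData(rowsparse::kIdx, mshadow::Shape1(nnr));
  // Then the value array: 3 * 4 * sizeof(float) = 48 bytes.
  out.CheckAndAllocData(mshadow::Shape2(nnr, 4));
  // Alternatively, a single call that derives the data shape from the aux shape:
  // out.CheckAndAlloc({TShape(mshadow::Shape1(nnr))});
}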
/*!
 * \brief Save list of ndarrays into the Stream.
* \param fo The stream of output.
@@ -487,13 +513,12 @@ class NDArray {
// shandle is used to store the actual values in the NDArray
// aux_handles store the aux data(such as indices) if it's needed by non-default storage.
struct Chunk {
// every time a new element is added to a non default storage
/*! \brief storage handle from storage engine.
for non-default storage, shandle stores the data(value) array.
*/
Storage::Handle shandle;
/*! \brief storage handles for aux data (e.g index)
for row_sparse, aux_handles[0] = indic
for row_sparse, aux_handles[0] = indices
for csr, aux_handles[0] = indptr, aux_handles[1] = indices
*/
std::vector<Storage::Handle> aux_handles;
@@ -504,7 +529,7 @@ class NDArray {
* from Storage, and do not need to be freed
*/
bool static_data;
/*! \brief whether allocation is delayed */
/*! \brief whether allocation is delayed. */
bool delay_alloc;
/*! \brief construct from static data */
NDArrayStorageType storage_type = kDefaultStorage;
@@ -517,6 +542,8 @@ class NDArray {
TShape storage_shape;
// The shape of aux data. The default value for the shape is 0.
std::vector<TShape> aux_shapes;
// \brief skip the deletion of var handle. Usually set when shared_var is present.
bool skip_delete_var = false;

/*! \brief construct a new chunk */
Chunk(TShape shape, Context ctx_, bool delay_alloc_, int dtype)
@@ -528,33 +555,26 @@ class NDArray {
shandle.ctx = ctx_;
if (!delay_alloc_) this->CheckAndAlloc();
}
Chunk(const NDArray &nd_data, const std::vector<NDArray> &nd_aux, Context ctx_,
Chunk(const NDArray &nd, const std::vector<NDArray> &nd_aux, Context ctx_,
NDArrayStorageType storage_type_)
: static_data(false), delay_alloc(false), storage_type(storage_type_), ctx(ctx_) {
// Vars
var = Engine::Get()->NewVariable();
// Data Storage
const auto &data = nd_data.data();
const auto &data = nd.data();
storage_shape = data.shape_;
shandle.ctx = ctx;
shandle.size = data.shape_.Size() * mshadow::mshadow_sizeof(data.type_flag_);
shandle = Storage::Get()->Alloc(shandle.size, shandle.ctx);

// Copy data
// TODO(haibin) refactor. Single threaded copy is slow.
nd_data.WaitToRead();
CHECK_EQ(nd_data.storage_type(), kDefaultStorage);
CHECK_EQ(nd_data.dtype(), data.type_flag_);
CHECK_EQ(shandle.ctx.dev_mask(), cpu::kDevMask)
<< "Sparse NDArray on GPU not yet supported";
MSHADOW_TYPE_SWITCH(nd_data.dtype(), DType, {
auto copy = TBlob(static_cast<DType*>(shandle.dptr), storage_shape,
shandle.ctx.dev_mask(), data.type_flag_);
mshadow::Copy(copy.FlatTo1D<cpu, DType>(), data.FlatTo1D<cpu, DType>());
});
// Single threaded copy may not saturate memory bandwidth
CHECK_EQ(nd.storage_type(), kDefaultStorage);
auto data_blob = TBlob(shandle.dptr, storage_shape, shandle.ctx.dev_mask(), data.type_flag_);
NDArray data_wrapper(data_blob, ctx.dev_id, var);
CopyFromTo(nd, &data_wrapper, 0, false);

// Aux shapes, types and storage
storage_shape = data.shape_;
CHECK_GT(storage_shape.ndim(), 0);
for (size_t i = 0; i < nd_aux.size(); i++) {
const auto &aux_d = nd_aux[i].data();
Expand All @@ -565,26 +585,23 @@ class NDArray {
aux_handle.size = aux_shapes[i].Size() * mshadow::mshadow_sizeof(aux_types[i]);
aux_handle = Storage::Get()->Alloc(aux_handle.size, aux_handle.ctx);
aux_handles.emplace_back(aux_handle);

// Copy aux data
nd_aux[i].WaitToRead();
CHECK_EQ(nd_aux[i].storage_type(), kDefaultStorage);
CHECK_EQ(nd_aux[i].dtype(), aux_types[i]);
CHECK_EQ(aux_handle.ctx.dev_mask(), cpu::kDevMask)
<< "Sparse NDArray on GPU not yet supported";
MSHADOW_TYPE_SWITCH(nd_aux[i].dtype(), DType, {
auto copy = TBlob(static_cast<DType*>(aux_handle.dptr), aux_shapes[i],
ctx.dev_mask(), aux_types[i]);
mshadow::Copy(copy.FlatTo1D<cpu, DType>(), aux_d.FlatTo1D<cpu, DType>());
});
TBlob aux_blob(aux_handle.dptr, aux_shapes[i], ctx.dev_mask(), aux_types[i]);
NDArray aux_wrapper(aux_blob, ctx.dev_id, var);
CopyFromTo(nd_aux[i], &aux_wrapper, 0, false);
}
}

Chunk(const TBlob &data, int dev_id)
: static_data(true),
delay_alloc(false) {
Chunk(const TBlob &data, int dev_id, Engine::VarHandle shared_var)
: static_data(true), delay_alloc(false) {
CHECK(storage_type == kDefaultStorage);
var = Engine::Get()->NewVariable();
if (shared_var == nullptr) {
var = Engine::Get()->NewVariable();
} else {
skip_delete_var = true;
var = shared_var;
}
if (data.dev_mask_ == cpu::kDevMask) {
shandle.ctx = Context::CPU();
} else {
@@ -609,37 +626,53 @@
}
/*! \brief check if delay alloc is on, do alloc if not yet done */
inline void CheckAndAlloc(void) {
// Should only be used for kDefaultStorage
if (storage_type != kDefaultStorage) {
LOG(FATAL) << "CheckAndAlloc with " << storage_type;
}
if (delay_alloc) {
shandle = Storage::Get()->Alloc(shandle.size, shandle.ctx);
delay_alloc = false;
}
}
inline void CheckAndAlloc(TShape shape, const std::vector<TShape> &aux_shapes, int dtype) {
CHECK_EQ(storage_type, kRowSparseStorage) << "Not yet implemented";
inline void CheckAndAlloc(const TShape &shape, const std::vector<TShape> &aux_shapes, int dtype) {
// calculate size, perform allocation
if (delay_alloc) {
// For row sparse storage, aux_shape indicates the number of rows to allocate
CHECK_EQ(storage_type, kRowSparseStorage) << "Not yet implemented";
// For row sparse, aux_shape indicates the number of rows to allocate
auto aux_shape = aux_shapes[0];
CHECK_EQ(aux_shape.ndim(), 1);
auto num_rows = aux_shape[0];
CHECK_EQ(shape.ndim(), 2) << "High dim RowSparse not yet implemented";
auto dbytes = num_rows * shape[1] * mshadow::mshadow_sizeof(dtype);
auto aux_bytes = num_rows * mshadow::mshadow_sizeof(aux_types[0]);
shandle = Storage::Get()->Alloc(dbytes, ctx);
aux_handles.push_back(Storage::Get()->Alloc(aux_bytes, ctx));
delay_alloc = false;
// Initialize shapes
this->aux_shapes = aux_shapes;
storage_shape = shape;
storage_shape[0] = num_rows;
CheckAndAllocAuxData(rowsparse::kIdx, aux_shape);
TShape storage_shape(shape);
storage_shape[0] = aux_shape[0];
CheckAndAllocData(storage_shape, dtype);
}
}
inline void CheckAndAllocData(const TShape &shape, int dtype) {
CHECK_NE(aux_shapes.size(), 0) << "data is expected to be allocated after aux_data";
storage_shape = shape;
auto dbytes = shape.Size() * mshadow::mshadow_sizeof(dtype);
shandle = Storage::Get()->Alloc(dbytes, ctx);
// delay_alloc is only set when data storage handle is present
delay_alloc = false;
}
inline void CheckAndAllocAuxData(size_t i, const TShape &shape) {
CHECK_EQ(aux_shapes.size(), aux_handles.size());
if (aux_shapes.size() <= i) {
aux_shapes.resize(i + 1);
aux_handles.resize(i + 1);
}
// Initialize shape
aux_shapes[i] = shape;
// Init aux storage
Storage::Handle aux_handle;
if (storage_type == kRowSparseStorage) {
auto aux_bytes = shape[0] * mshadow::mshadow_sizeof(aux_types[i]);
aux_handle = Storage::Get()->Alloc(aux_bytes, ctx);
} else if (storage_type == kCSRStorage) {
LOG(FATAL) << "Not implemented";
}
aux_handles[i] = aux_handle;
}
/*! \brief destructor */
~Chunk() {
if (skip_delete_var) return;
bool skip_free = static_data || delay_alloc;
Storage::Handle h = this->shandle;
std::vector<Storage::Handle> aux_h = this->aux_handles;
@@ -669,19 +702,6 @@ class NDArray {
autograd::AGNodeEntry entry_;
};

/*!
* \brief issue an copy operation from one NDArray to another
* the two ndarray can sit on different devices
* this operation will be scheduled by the engine
*
* \param from the ndarray we want to copy data from
* \param to the target ndarray
* \param priority Priority of the action.
* \note The function name explicitly marks the order of from and to
* due to different possible convention carried by copy function.
*/
void CopyFromTo(const NDArray &from, NDArray *to, int priority = 0);


/*!
* \brief Perform elementwise sum over each data from source, store result into out.
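The refactored blob copy in Chunk above (wrap freshly allocated storage in an NDArray that shares the chunk's engine variable, then go through CopyFromTo with alloc_output = false instead of an inline mshadow::Copy) follows a pattern that can be sketched in isolation. The buffer, shapes, and variable handling below are my assumptions, not an endorsed public API.

// Assuming <mxnet/ndarray.h> and using namespace mxnet.
void SketchSharedVarCopy() {
  NDArray source(mshadow::Shape2(4, 4), Context::CPU());
  // Externally owned float32 buffer; the engine must not free it.
  static float buffer[16];
  TBlob blob(buffer, TShape(mshadow::Shape2(4, 4)), cpu::kDevMask);
  // A caller-owned variable; passing it as shared_var sets skip_delete_var,
  // so the wrapper's destructor leaves the variable (and buffer) alone.
  Engine::VarHandle var = Engine::Get()->NewVariable();
  NDArray wrapper(blob, /*dev_id=*/0, /*shared_var=*/var);
  // Storage already exists, so alloc_output = false; the copy is scheduled
  // on the engine rather than executed inline.
  CopyFromTo(source, &wrapper, 0, false);
  wrapper.WaitToRead();
}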
8 changes: 5 additions & 3 deletions include/mxnet/op_attr_types.h
@@ -7,7 +7,6 @@
#ifndef MXNET_OP_ATTR_TYPES_H_
#define MXNET_OP_ATTR_TYPES_H_


#include <mshadow/tensor.h>
#include <nnvm/op_attr_types.h>

@@ -18,6 +17,9 @@
#include "./operator.h"
#include "./ndarray.h"

#define FCOMP_EX_CPU "FComputeEx<cpu>"
#define FCOMP_EX_GPU "FComputeEx<gpu>"

namespace mxnet {

using nnvm::NodeAttrs;
@@ -64,8 +66,8 @@ using FCompute = std::function<void (const nnvm::NodeAttrs& attrs,
/*!
 * \brief Register an NDArray compute function for a simple stateless forward-only operator
*
* \note Register under "FComputeEx<cpu, `storage_type`>" and "FComputeEx<gpu, `storage_type`>"
* e.g FComputeEx<cpu, row_sparse>
* \note Register under "FComputeEx<xpu, default>" and "FComputeEx<xpu, non-default>"
* Dispatched only when operators process non-default storage inputs or outputs
*/
using FComputeEx = std::function<void (const nnvm::NodeAttrs& attrs,
const OpContext& ctx,
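A hedged sketch of registering a kernel under the new keys; the operator name, the full FComputeEx parameter list (truncated by this hunk), and the NNVM_REGISTER_OP usage are my assumptions, mirroring the FCompute convention but with NDArray arguments.

#include <mxnet/op_attr_types.h>
#include <nnvm/op.h>

namespace mxnet {
namespace op {

// Hypothetical storage-aware kernel; the signature is assumed to mirror
// FCompute with NDArray inputs/outputs instead of TBlobs.
void MyCastStorageEx(const nnvm::NodeAttrs& attrs,
                     const OpContext& ctx,
                     const std::vector<NDArray>& inputs,
                     const std::vector<OpReqType>& req,
                     const std::vector<NDArray>& outputs) {
  // inspect inputs[i].storage_type() / outputs[i].storage_type() and dispatch
}

NNVM_REGISTER_OP(my_cast_storage)
.set_attr<FComputeEx>(FCOMP_EX_CPU, MyCastStorageEx);  // key expands to "FComputeEx<cpu>"

}  // namespace op
}  // namespace mxnet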
2 changes: 1 addition & 1 deletion nnvm
