Merge pull request apache#28 from tqchen/refactor
Refactor stable ps
tqchen committed Feb 25, 2015
2 parents 1e35c4a + 9afa79e commit d6af610
Showing 5 changed files with 146 additions and 18 deletions.
47 changes: 43 additions & 4 deletions mshadow-ps/ps.h
@@ -85,7 +85,21 @@ class ISharedModel {
* this is unique per device
* \param devid the device id this tensor lies in
*/
virtual void PullWait(int key, int devid = 0) = 0;
virtual void PullWait(int key, int devid) = 0;
/*!
* \brief check whether the weight on the current device matches the server side
*
* \param data the local weight to check
* \param key the unique key to indicate the tensor
* this is unique per device
* \param devid the device id this tensor lies in
*/
template<int dim>
inline void CheckWeight(Tensor<xpu, dim, DType> data,
int key,
int devid) {
this->CheckWeight_(data.FlatTo2D(), key, devid);
}
/*!
* \brief push out a tensor to parameter server
* this call is asynchronous and returns immediately
@@ -100,7 +114,7 @@ class ISharedModel {
template<int dim>
inline void Push(Tensor<xpu, dim, DType> data,
int key,
int devid = 0,
int devid,
int priority = 0) {
this->Push_(data.FlatTo2D(), key, devid, priority);
}
@@ -122,7 +136,7 @@ class ISharedModel {
template<int dim>
inline void PullReq(Tensor<xpu, dim, DType> data,
int key,
int devid = 0,
int devid,
int priority = 0,
CallbackFunction callback = NULL,
void *callback_arg = NULL) {
@@ -155,6 +169,31 @@ class ISharedModel {
this->PullReq(data, key, devid, priority, InvokeLambda_, calbk);
}
#endif // C++11

/*!
* \brief set the weight of the corresponding key on the server
* this is a debug function that is not necessarily
* implemented by the server
* \param data the data to set the server-side weight to
* \param key the unique key to indicate the tensor
* this is unique per device
* \param devid the device id this tensor lies in
*/
virtual void SetWeight_(Tensor<xpu, 2, DType> data,
int key,
int devid) = 0;
/*!
* \brief check if the weight matches the server side
* this is a debug function that is not necessarily
* implemented by the server
* \param data the data to check against the server-side weight
* \param key the unique key to indicate the tensor
* this is unique per device
* \param devid the device id this tensor lies in
*/
virtual void CheckWeight_(Tensor<xpu, 2, DType> data,
int key,
int devid) = 0;
protected:
/*!
* \brief initialize a key with certain shape
@@ -178,7 +217,7 @@ class ISharedModel {
*/
virtual void Push_(Tensor<xpu, 2, DType> data,
int key,
int devid = 0,
int devid,
int priority = 0) = 0;
/*!
* \brief send a pull request, to pull parameter into data
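
Note on the ps.h change above (the hunk is shown truncated): the default argument devid = 0 is removed from PullWait, Push, and PullReq, so every call site must now pass the device id explicitly, and a CheckWeight / SetWeight_ pair of debug hooks is added. A minimal caller sketch, assuming the mshadow and mshadow-ps headers are on the include path; the function, tensor, and variable names below are illustrative and not part of this commit:

#include "mshadow/tensor.h"
#include "mshadow-ps/ps.h"

// sketch: one synchronization step against the shared model
template<typename xpu>
void SyncStep(mshadow::ps::ISharedModel<xpu, float> *ps,
              mshadow::Tensor<xpu, 2, float> grad,
              mshadow::Tensor<xpu, 2, float> weight,
              int key, int devid) {
  ps->Push(grad, key, devid);           // device id is now mandatory; priority still defaults to 0
  ps->PullReq(weight, key, devid);      // asynchronous pull request into weight
  ps->PullWait(key, devid);             // block until the pull for (key, devid) completes
  ps->CheckWeight(weight, key, devid);  // pair-debug check, needs test_on_server (see ps_local-inl.h below)
}
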
68 changes: 62 additions & 6 deletions mshadow-ps/ps_local-inl.h
@@ -38,6 +38,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
bigarray_bound = 1000 * 1000;
nthread_reduction = 8;
use_pin_memory = 1;
test_on_server = 0;
destroy_signal = false;
custom_server = NULL;
}
@@ -118,6 +119,9 @@ class LocalModel : public ISharedModel<xpu, DType> {
if (!strcmp(name, "update_on_server")) {
update_on_server = atoi(val);
}
if (!strcmp(name, "test_on_server")) {
test_on_server = atoi(val);
}
cfgvec.push_back(std::make_pair(std::string(name),
std::string(val)));
}
@@ -214,6 +218,49 @@ class LocalModel : public ISharedModel<xpu, DType> {
this->init_end = 1;
}

// set weight
virtual void SetWeight_(Tensor<xpu, 2, DType> data,
int key,
int devid) {
PushEntry &e = push_map.GetRef(key);
Stream<xpu> s;
push_lock.Lock();
mshadow::Copy(e.weight, data, &s);
push_lock.Unlock();
}
virtual void CheckWeight_(Tensor<xpu, 2, DType> data,
int key,
int devid) {
utils::Check(test_on_server != 0,
"must be in pair debug mode");
PushEntry &e = push_map.GetRef(key);
mshadow::TensorContainer<cpu, 2, DType> tmp(false);
tmp.Resize(data.shape_);
Stream<xpu> s;
push_lock.Lock();
// copy data
mshadow::Copy(tmp, data, &s);
index_t count = tmp.shape_.Size();
double diff = 0.0, ssum = 0.0, maxdiff = 0.0;
index_t mxidx = 0;
for (index_t i = 0; i < count; ++i) {
double d = std::abs(tmp.dptr_[i] - e.weight.dptr_[i]);
if (d > maxdiff) {
maxdiff = d; mxidx = i;
}
diff += d;
ssum += std::abs(tmp.dptr_[i]);
}
push_lock.Unlock();
// relative absolute error
double rerr = diff / ssum;
if (rerr > 1e-5 || diff != diff) {
fprintf(stderr, "PSLocal:key=%d,dev=%d: err=%f, maxd[%u]=%f, diff=%f, ssum=%f\n",
key, devid, rerr, mxidx, maxdiff, diff, ssum);
} else {
fprintf(stderr, "PSLocal:key=%d,dev=%d:check pass\n", key, devid);
}
}
protected:
/*! \brief operation performed locally in PS */
enum LocalOp {
@@ -230,7 +277,6 @@ class LocalModel : public ISharedModel<xpu, DType> {
this->InitPullMap(key);
this->InitPushMap(key, shape);
}

virtual void Push_(Tensor<xpu, 2, DType> data,
int key, int devid, int priority) {
PullEntry &e = pull_map.GetRef(key);
@@ -305,7 +351,9 @@ class LocalModel : public ISharedModel<xpu, DType> {
if (custom_server != NULL) {
// initialize server, and ready for pullback
custom_server->InitModel(key, weight.dptr_, weight.MSize());
this->PullReady(weight, key);
if (update_on_server != 0) {
this->PullReady(weight, key);
}
}
}
/*!
@@ -327,8 +375,13 @@ class LocalModel : public ISharedModel<xpu, DType> {
if (custom_server != NULL) {
this->ReduceSum(data);
custom_server->Update(key, data[0].dptr_, data[0].MSize());
PushEntry &e = push_map.GetRef(key);
this->PullReady(e.weight, key);
if (update_on_server != 0) {
PushEntry &e = push_map.GetRef(key);
this->PullReady(e.weight, key);
} else {
utils::Assert(test_on_server != 0, "test mode");
this->PullReady(data[0], key);
}
return;
}
switch (op) {
@@ -346,7 +399,7 @@ class LocalModel : public ISharedModel<xpu, DType> {
}

virtual void InitCustomerServer(void) {
if (update_on_server != 0) {
if (update_on_server != 0 || test_on_server != 0) {
custom_server = CreateModelUpdater<DType>();
for (size_t j = 0; j < cfgvec.size(); ++j) {
custom_server->SetParam(cfgvec[j].first.c_str(),
@@ -505,6 +558,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
int init_end;
// whether to perform the update on the server side
int update_on_server;
// debug option
int test_on_server;
// use pinned memory
int use_pin_memory;
// number of reduction thread
@@ -723,7 +778,8 @@ class LocalModel : public ISharedModel<xpu, DType> {
push_lock.Lock();
if (e.copied.size() == 0) {
e.Init(devices.size(), shape,
use_pin_memory != 0, update_on_server != 0);
use_pin_memory != 0,
update_on_server != 0 || test_on_server != 0);
}
this->ServerInitKey(e.weight, key);
push_lock.Unlock();
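
Note on the ps_local-inl.h change above: test_on_server complements update_on_server. When either flag is set, LocalModel creates the custom updater on the server side; in test mode the workers still apply their own updates while the server keeps a reference copy of the weight (e.weight), and CheckWeight_ reports the relative absolute error sum(|local - server|) / sum(|local|), flagging anything above 1e-5 or a NaN result. A hedged configuration sketch, with parameter names taken from this hunk and everything else illustrative:

// sketch: enable the pair-debug mode added in this commit
ps->SetParam("test_on_server", "1");   // server-side updater keeps a reference weight
// ... run the usual Push / PullReq / PullWait cycle and apply the local update ...
ps->CheckWeight(weight, key, devid);   // prints "check pass" or the per-key error statistics
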
4 changes: 2 additions & 2 deletions mshadow-ps/thread_util.h
@@ -121,15 +121,15 @@ class ThreadSafeMap {
}
inline TValue &GetRef(int key) {
TValue *ret = this->Get(key);
utils::Assert(ret != NULL, "key does not exist");
utils::Assert(ret != NULL, "key=%d does not exist", key);
return *ret;
}
inline void Init(int key) {
lock_.Lock();
if (map_.count(key) == 0) {
map_[key] = new TValue();
}
lock_.Unlock();
lock_.Unlock();
}

private:
34 changes: 33 additions & 1 deletion mshadow/tensor_container.h
@@ -53,6 +53,24 @@ class TensorContainer: public Tensor<Device, dimension, DType> {
this->AllocByShape(shape);
(*this) = initv;
}
/*!
* \brief copy constructor
* \param src source value
*/
TensorContainer
(const TensorContainer<Device, dimension, DType> &src)
: pad_(src.pad_) {
this->dptr_ = data_.dptr_ = NULL;
this->shape_[0] = 0;
this->stride_ = 0;
this->data_.stride_ = 0;
this->data_.shape_[0] = 0;
this->stream_ = src.stream_;
if (src.dptr_ != NULL) {
this->AllocByShape(src.shape_);
mshadow::Copy(*this, src, this->stream_);
}
}
~TensorContainer(void) {
this->FreeSpace();
}
@@ -109,10 +127,24 @@ class TensorContainer: public Tensor<Device, dimension, DType> {
Copy(*this, tmp, &stream);
mshadow::FreeSpace(&tmp);
}
/*!
* \brief assign operator from TensorContainer
* \param src source value
*/
inline TensorContainer &operator=
(const TensorContainer<Device, dimension, DType> &src) {
this->pad_ = src.pad_;
this->stream_ = src.stream_;
if (src.dptr_ != NULL) {
this->Resize(src.shape_);
mshadow::Copy(*this, src, this->stream_);
}
return *this;
}
/*!\brief functions to fit expression template */
inline Tensor<Device, dimension, DType> &operator=(DType s) {
return this->__assign(s);
}
/*!\brief functions to fit expression template */
template<typename E>
inline Tensor<Device, dimension, DType> &
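
Note on the tensor_container.h change above: the new copy constructor and assignment operator give TensorContainer deep-copy semantics, allocating (or resizing) a separate buffer and copying the contents instead of aliasing the source pointer. A small usage sketch, assuming the mshadow headers are available; the names are illustrative:

#include "mshadow/tensor.h"
#include "mshadow/tensor_container.h"

int main(void) {
  mshadow::TensorContainer<mshadow::cpu, 2, float> a(mshadow::Shape2(4, 8), 1.0f);
  mshadow::TensorContainer<mshadow::cpu, 2, float> b(a);  // copy constructor: fresh buffer, same contents
  mshadow::TensorContainer<mshadow::cpu, 2, float> c;
  c = a;           // assignment operator: resizes c, then copies from a
  b[0][0] = 2.0f;  // modifies b only; storage is not shared with a
  return 0;
}
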
11 changes: 6 additions & 5 deletions mshadow/tensor_cpu-inl.h
@@ -41,20 +41,20 @@ template<>
inline void *AllocHost_<gpu>(size_t size) {
void *dptr;
utils::Check(cudaMallocHost(&dptr, size,
cudaHostAllocPortable) == cudaSuccess,
cudaHostAllocPortable) == cudaSuccess,
"AllocHost");
return dptr;
}
template<>
inline void FreeHost_<gpu>(void *dptr) {
cudaFreeHost(dptr);
cudaFreeHost(dptr);
}
#endif

template<>
inline void *AllocHost_<cpu>(size_t size) {
size_t pitch;
return sse2::AlignedMallocPitch(&pitch, size, 1);
return sse2::AlignedMallocPitch(&pitch, size, 1);
}
template<>
inline void FreeHost_<cpu>(void *dptr) {
@@ -114,6 +114,7 @@ inline void Copy(Tensor<cpu, dim, DType> _dst,
memcpy(dst[y].dptr_, src[y].dptr_, sizeof(DType) * dst.size(1));
}
}

template<typename Saver, typename R, int dim,
typename DType, typename E>
inline void MapPlan(TRValue<R, cpu, dim, DType> *dst,
@@ -181,7 +182,7 @@ inline void MapReduceKeepLowest(TRValue<R, cpu, 1, DType> *dst,
::Error_TypeCheck_Not_Pass_For_Reduce_Exp();
Shape<2> eshape = expr::ShapeCheck<expr::ExpInfo<E>::kDim, E>
::Check(exp.self()).FlatTo2D();
Shape<1> dshape = expr::ShapeCheck<1, R>::Check(dst->self());
Shape<1> dshape = expr::ShapeCheck<1, R>::Check(dst->self());
utils::Check(eshape[1] == dshape[0],
"MapReduceKeepLowest::reduction dimension do not match");
utils::Check(eshape[0] != 0, "can not reduce over empty tensor");
@@ -207,7 +208,7 @@ inline void MapReduceKeepHighDim(TRValue<R, cpu, 1, DType> *dst,
typedef Shape<expr::ExpInfo<E>::kDim> EShape;
EShape eshape = expr::ShapeCheck<expr::ExpInfo<E>::kDim, E>
::Check(exp.self());
Shape<1> dshape = expr::ShapeCheck<1, R>::Check(dst->self());
Shape<1> dshape = expr::ShapeCheck<1, R>::Check(dst->self());
utils::Check(eshape[dimkeep] == dshape[0],
"MapReduceKeepHighDim::reduction dimension do not match");
// use equvalent form
