1bit: use double for scaling (#22)
* 1bit: use double

* 1bit: fix

* misc: new line eof
jasperzhong committed Jun 23, 2020
1 parent f7d9969 commit ae30478
Showing 3 changed files with 18 additions and 36 deletions.
byteps/common/compressor/strategy/onebit.cc (1 addition, 1 deletion)
@@ -49,7 +49,7 @@ size_t OnebitCompressor::PackingImpl(index_t* dst, const scalar_t* src,
 
   float scale = 1.0f;
   if (_use_scale) {
-    float sum = 0.0f;
+    double sum = 0.0f;
     for (size_t i = 0; i < len; ++i) {
       dst[i] = src[i] < 0;
       sum += abs(src[i]);
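Why this one-word change matters: PackingImpl folds the absolute value of every element into a single running sum, and with a float accumulator the rounding error grows with tensor length, which can skew the computed scale for large gradients. Below is a minimal sketch of the effect; numpy, the variable names, and the tensor size are illustrative stand-ins for the C++ loop, not from the repository:

import numpy as np

grad = np.abs(np.random.randn(1_000_000).astype(np.float32))

acc32 = np.float32(0.0)   # float accumulator, pre-fix behavior
acc64 = 0.0               # double accumulator, post-fix behavior
for v in grad:            # deliberately naive sequential sum, like the C++
    acc32 += v            # loop; numpy's own pairwise sum would hide this
    acc64 += float(v)

# Once the running sum dwarfs each summand, the float32 total drifts;
# the exact gap varies with the data, but it is visible at this length.
print(acc32, acc64)

The fix promotes only the accumulator to double while the per-element stores stay as they were, so the extra cost is negligible.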
byteps/mxnet/__init__.py (2 additions, 2 deletions)
@@ -304,11 +304,11 @@ def _allreduce_grads(self):
                 nd._internal._mul_scalar(
                     param._grad[0], 1.0 / self._scale / self._bps_size, out=param._grad[0])
                 compressed, ctx = self._intra_compressors[i].compress(
-                    param._grad[0], x=param._data[0])
+                    param._grad[0])
                 byteps_push_pull(compressed, is_average=False,
                                  name="gradient_" + str(i), priority=-i)
                 param._grad[0] = self._intra_compressors[i].decompress(
-                    compressed, ctx)
+                    compressed, ctx, x=param._data[0])
 
     def _init_params(self):
         tensors = []
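The net effect of this hunk is to move the parameter tensor (x=param._data[0]) from the compress call to the decompress call, so the compressor receives the weights after byteps_push_pull returns instead of being handed them beforehand. A self-contained sketch of the new call order, with hypothetical stand-ins (NoopCompressor, push_pull) in place of the real compressor chain and communication call:

import numpy as np

class NoopCompressor:
    """Stand-in for self._intra_compressors[i]; real ones quantize."""
    def compress(self, tensor, *args, **kwargs):
        return tensor, None                  # (compressed, ctx)

    def decompress(self, tensor, ctx, *args, **kwargs):
        return tensor                        # may now receive x=<weights>

def push_pull(tensor):
    """Stand-in for byteps_push_pull; the real call sums across workers."""
    return tensor

compressor = NoopCompressor()
grad = np.ones(4, dtype=np.float32)
weight = np.full(4, 0.5, dtype=np.float32)

compressed, ctx = compressor.compress(grad)              # no x=... anymore
compressed = push_pull(compressed)
grad = compressor.decompress(compressed, ctx, x=weight)  # x arrives here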
byteps/mxnet/compression.py (15 additions, 33 deletions)
@@ -70,51 +70,33 @@ class WeightDecayMomentum(Compressor):
 
     def __init__(self, compressor, mu, wd, *args, **kwargs):
         self.compressor = compressor
+        self.mom = None
+        self.cache = None
         self.mu = mu
         self.wd = wd
-        self.task_queue = Queue()
-        self.done_queue = Queue()
-        threading.Thread(target=self._worker, args=(
-            self.mu, self.wd, self.task_queue, self.done_queue), daemon=True).start()
-
-    def __del__(self):
-        self.task_queue.put('STOP')
-
-    @staticmethod
-    def _worker(mu, wd, input, output):
-        mom = None
-        cache = None
-        for x, _ in iter(input.get, 'STOP'):
-            if mom is None:
-                mom = nd.zeros_like(x)
-                cache = nd.zeros_like(x)
-
-            nd._internal._mul_scalar(x, wd, out=cache)
-            mom += cache
-            nd._internal._mul_scalar(mom, mu, out=mom)
-            cache += mom
-            output.put(cache)
 
     def compress(self, tensor, *args, **kwargs):
         """Returns the tensor unmodified."""
-        if "x" not in kwargs:
-            return self.compressor.compress(tensor)
-
-        x = kwargs["x"]
-        self.task_queue.put((x, None))
         return self.compressor.compress(tensor)
 
     def decompress(self, tensor, ctx, *args, **kwargs):
         """Returns the tensor added with additional momentum for wd
         m_t = \mu * m_{t-1} + wd * x_t
         x_{t+1} = x_t - \eta_t (tensor + \mu m_t + wd * x_t)
         """
-        try:
-            tensor += self.done_queue.get(timeout=0.1)
-        except Empty:
-            print("empty for wd-momentum")
-        except TimeoutError:
-            print("timeout for wd-momentum")
+        if "x" not in kwargs:
+            return self.compressor.decompress(tensor, ctx)
+
+        x = kwargs["x"]
+
+        if self.mom is None:
+            self.mom = nd.zeros_like(tensor)
+            self.cache = nd.zeros_like(tensor)
+
+        nd._internal._mul_scalar(x, self.wd, out=self.cache)
+        self.mom += self.cache
+        nd._internal._mul_scalar(self.mom, self.mu, out=self.mom)
+        tensor += self.mom + self.cache
         return self.compressor.decompress(tensor, ctx)
 
 
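The rewrite above drops the background worker thread and its queues: the old path could hit the 0.1 s timeout and silently skip the weight-decay term (and Queue.get raises Empty on timeout, so the TimeoutError branch never fired), while the new path applies the docstring's update synchronously inside decompress. A numpy sketch of that arithmetic, with plain arrays standing in for mxnet.nd and nd._internal._mul_scalar; the values are made up and only the update order comes from the diff:

import numpy as np

mu, wd = 0.9, 1e-4                               # momentum and weight decay
tensor = np.random.randn(8).astype(np.float32)   # decompressed gradient
x = np.random.randn(8).astype(np.float32)        # parameter tensor

mom = np.zeros_like(tensor)                      # self.mom, lazily allocated
cache = np.zeros_like(tensor)                    # self.cache

# Mirrors the four update lines in the new decompress():
np.multiply(x, wd, out=cache)                    # cache = wd * x_t
mom += cache                                     # mom  += wd * x_t
np.multiply(mom, mu, out=mom)                    # mom   = mu * (mom + wd * x_t)
tensor += mom + cache                            # tensor += mom + wd * x_t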
