gradient compression support #225

Merged Aug 13, 2020 (386 commits)

Changes from 250 commits

Commits
7079d5e
compression: fix server bug
jasperzhong Feb 19, 2020
6a5b6f2
compression: fix check
jasperzhong Feb 19, 2020
cc8a51a
compression: fix bug
jasperzhong Feb 19, 2020
ac86627
compression: fix a bug
jasperzhong Feb 19, 2020
55f84b4
compression: rm check
jasperzhong Feb 19, 2020
170b0fb
compression: fix bug
jasperzhong Feb 19, 2020
247b3e3
compression: fix decompress
jasperzhong Feb 19, 2020
1c4df37
compression: fix bug
jasperzhong Feb 19, 2020
4eebf26
compression: fix fatal bug
jasperzhong Feb 19, 2020
21fd9ef
compression: add log
jasperzhong Feb 19, 2020
ecfac06
compression: add log
jasperzhong Feb 20, 2020
af041b1
compression: fix fatal bug
jasperzhong Feb 20, 2020
9b2b545
compression: test
jasperzhong Feb 20, 2020
4c68f80
compression: rm logs
jasperzhong Feb 21, 2020
47b7302
compression: rename
jasperzhong Feb 21, 2020
303a0c8
compression: fix bug
jasperzhong Feb 21, 2020
3df0ec3
compression: fix typo
jasperzhong Feb 21, 2020
05eee8c
compression: refactor
jasperzhong Feb 23, 2020
8a6c947
compression: refactor
jasperzhong Feb 23, 2020
2df9a01
compression: fix typo
jasperzhong Feb 23, 2020
0025204
compression: fix bug
jasperzhong Feb 23, 2020
719915b
compression: fix bug
jasperzhong Feb 23, 2020
d24e035
compression: fix fatal bug
jasperzhong Feb 24, 2020
67bb376
compression: fix typo
jasperzhong Feb 24, 2020
3b9bf42
compression: fix test
jasperzhong Feb 24, 2020
90fe93d
compression: test add shutdown
jasperzhong Feb 24, 2020
4285ced
compression: add exit
jasperzhong Feb 24, 2020
df0abcf
compression: rm
jasperzhong Feb 24, 2020
7c4f086
compression: finish ef
jasperzhong Feb 24, 2020
fc259c7
compression: fix error
jasperzhong Feb 24, 2020
21ff784
compression: fix bug
jasperzhong Feb 24, 2020
c93bf95
compression: fix fatal bug
jasperzhong Feb 25, 2020
eca5893
compression: add args
jasperzhong Feb 25, 2020
d269496
compression: init zero
jasperzhong Feb 25, 2020
b9df722
compression: fix ef bug
jasperzhong Feb 25, 2020
f02d0a7
compression: use mean ef in server
jasperzhong Feb 25, 2020
00d462d
compression: pack should inplace
jasperzhong Feb 25, 2020
2b6c091
compression: add mean
jasperzhong Feb 26, 2020
003f02a
compression: rm ops.cc mean when enable ef
jasperzhong Feb 26, 2020
47aed64
compression: onebit add scale
jasperzhong Feb 26, 2020
06971ed
compression: fix typo
jasperzhong Feb 26, 2020
d03834d
compression: fix typo
jasperzhong Feb 26, 2020
08f6e26
compression: fix bug
jasperzhong Feb 26, 2020
4cef1ee
compression: add ef to ns
jasperzhong Feb 26, 2020
2a24707
compression: fix bug
jasperzhong Feb 26, 2020
a97fff0
compression: fix decompress with ef bug
jasperzhong Feb 26, 2020
101a0a6
compression: make scaled onebit optional
jasperzhong Feb 27, 2020
c3a6d1a
compression: add scale in script
jasperzhong Feb 27, 2020
7f0342a
compression: rm debuf
jasperzhong Feb 27, 2020
5354b63
compression: rm debuf
jasperzhong Feb 27, 2020
91eaf30
fix fatal bug
jasperzhong Mar 2, 2020
88b6027
compression: add const
jasperzhong Mar 2, 2020
a7b38c1
compression: add const
jasperzhong Mar 2, 2020
d857211
compression: support partition for worker
jasperzhong Mar 3, 2020
0a3c8d3
compression: add fp16 op support
jasperzhong Mar 3, 2020
80a77aa
compression: support for fp16
jasperzhong Mar 4, 2020
151a044
compression: fix missing
jasperzhong Mar 4, 2020
1bb51ec
compression: fix fp16 avx
jasperzhong Mar 4, 2020
23122e7
compression: fix typo
jasperzhong Mar 4, 2020
bc0c76f
compression: fix typo
jasperzhong Mar 4, 2020
414716e
compression: fix typo
jasperzhong Mar 4, 2020
c8d219c
compression: fix typo
jasperzhong Mar 4, 2020
0bbf3bc
compression: rm some warnings
jasperzhong Mar 4, 2020
1563b29
compression: fix bug
jasperzhong Mar 4, 2020
489662c
compression: rm a check
jasperzhong Mar 4, 2020
a028ad8
compression: fix align size bug
jasperzhong Mar 4, 2020
5f61e9c
compression: test fp16 fp64
jasperzhong Mar 4, 2020
f54f7fb
compression: fix test bug
jasperzhong Mar 4, 2020
dd3dd07
compression: add mxnet fp16 compression support
jasperzhong Mar 4, 2020
61bcb65
compression: add script fp args
jasperzhong Mar 4, 2020
60b24a3
compression: rename
jasperzhong Mar 4, 2020
e92854a
compression: fix fp16 type bug
jasperzhong Mar 4, 2020
9498d88
compression: fix intra-compression bug
jasperzhong Mar 4, 2020
3d43588
compression: remove wait_to_read
jasperzhong Mar 5, 2020
d2b3976
compression: fix bug
jasperzhong Mar 6, 2020
fd291e7
compression: fix compressed released bug
jasperzhong Mar 7, 2020
f4ee9f4
compression: shallow copy to prevent gc
jasperzhong Mar 7, 2020
608d609
compression: use shared_ptr
jasperzhong Mar 7, 2020
1a235fe
compression: add copy
jasperzhong Mar 7, 2020
16e4ed1
compression: fix bug
jasperzhong Mar 7, 2020
848b556
compression: fix typo
jasperzhong Mar 7, 2020
86da936
compression: fix bug
jasperzhong Mar 7, 2020
28dec70
compression: add gluon-cv imagenet script
jasperzhong Mar 9, 2020
c74770e
compression: add momentum support
jasperzhong Mar 10, 2020
b12deca
compression: update script
jasperzhong Mar 10, 2020
83fe21e
compression: disable server mom
jasperzhong Mar 10, 2020
96a8133
compression: fix bug
jasperzhong Mar 10, 2020
c7f17ad
compression: fix typo
jasperzhong Mar 10, 2020
8ef547d
compression: fix bug
jasperzhong Mar 11, 2020
d56c91d
compression: add mu
jasperzhong Mar 11, 2020
76dff7b
compression: fix fatal impl missing
jasperzhong Mar 11, 2020
22464c5
compression: update imagenet script
jasperzhong Mar 11, 2020
882778f
compression: fix c++0x compile error
jasperzhong Mar 11, 2020
b255357
compression: fix typo
jasperzhong Mar 11, 2020
b778c29
compression: fix imagenet trianing script
jasperzhong Mar 11, 2020
7147b58
compression: fix typo
jasperzhong Mar 11, 2020
9be30ce
compression: advance registration for server
jasperzhong Mar 12, 2020
3f5640e
compression: update register compressor
jasperzhong Mar 12, 2020
9f42e3f
compression: fix bug
jasperzhong Mar 12, 2020
1734e3b
compression: fix bug
jasperzhong Mar 16, 2020
2f25d03
compression: add register sync
jasperzhong Mar 16, 2020
594e580
compression: fix typo
jasperzhong Mar 16, 2020
0472fe9
compression: fix bug
jasperzhong Mar 16, 2020
cb65d21
compression: fix typo
jasperzhong Mar 16, 2020
ceff0ab
compression: fix bug
jasperzhong Mar 16, 2020
074c843
compression: add check
jasperzhong Mar 16, 2020
e9ee76c
compression: fix bug
jasperzhong Mar 16, 2020
efd2b74
compression: support async compression in server
jasperzhong Mar 17, 2020
f17bcdc
compression: fix bug
jasperzhong Mar 17, 2020
30b2273
compression: rm useless comments
jasperzhong Mar 17, 2020
613a06f
compression: disable small tensor
jasperzhong Mar 17, 2020
270b0bb
compression: update openmp
jasperzhong Mar 17, 2020
871503d
compression: fix compile bug
jasperzhong Mar 17, 2020
deda2f0
compression: fix bug
jasperzhong Mar 17, 2020
5da1d3a
compression: fix bug
jasperzhong Mar 18, 2020
f275227
compression: fix typo
jasperzhong Mar 18, 2020
8016349
compression: adjust omp thread num
jasperzhong Mar 18, 2020
440c822
compression: make min compress bound mutable
jasperzhong Mar 18, 2020
af649c3
compression: use max threads
jasperzhong Mar 18, 2020
8cffe9f
async: norm1 & rm scale
jasperzhong Mar 18, 2020
e5662c7
async: static_assert
jasperzhong Mar 18, 2020
396e3c0
async: error-feedback
jasperzhong Mar 18, 2020
5a66972
async: rm all async
jasperzhong Mar 18, 2020
236f1c2
async: enable norm1
jasperzhong Mar 18, 2020
a80818a
async: set omp threads
jasperzhong Mar 19, 2020
f29ffd0
compression: update script
jasperzhong Mar 19, 2020
2b0c8e2
async: compress in worker side
jasperzhong Mar 20, 2020
8b01a30
async: mv log
jasperzhong Mar 21, 2020
db95e53
async: fix multi-gpus bugs
jasperzhong Mar 21, 2020
0b7e136
async: fix typo
jasperzhong Mar 21, 2020
6683233
async: add two loops
jasperzhong Mar 22, 2020
5af389a
async: add func loops
jasperzhong Mar 23, 2020
490be97
speedup: rm std::async
jasperzhong Mar 23, 2020
bb41159
speedup: set omp thread_num 1
jasperzhong Mar 23, 2020
054d807
compression: update script
jasperzhong Mar 24, 2020
f31724d
compression: add thread_pool
jasperzhong Mar 24, 2020
71c699d
compression: update script
jasperzhong Mar 24, 2020
17da182
compression: set pool size = 8
jasperzhong Mar 24, 2020
cd6eb38
compression: fix typo
jasperzhong Mar 24, 2020
f14a605
compression: add cifar100 script
jasperzhong Mar 25, 2020
75b60a9
compression: update script
jasperzhong Mar 25, 2020
2fa725f
compression: update script
jasperzhong Mar 25, 2020
6bad2db
compression: update script
jasperzhong Mar 25, 2020
055dd35
compression: fix typo
jasperzhong Mar 25, 2020
51ab4ad
script: update
jasperzhong Apr 22, 2020
c239431
compression: update 1bit
jasperzhong Apr 28, 2020
c6245a1
compression: rm final
jasperzhong Apr 28, 2020
963a273
compression: fix typo
jasperzhong Apr 28, 2020
cb500cf
compression: fix typo
jasperzhong Apr 28, 2020
1338d0d
compression: update script
jasperzhong Apr 28, 2020
88fafaf
compression: update script
jasperzhong Apr 28, 2020
d0de4d6
compression: update script
jasperzhong Apr 29, 2020
1ab6b12
compression: add time and fix typo
jasperzhong Apr 29, 2020
beead76
compression: use mmap
jasperzhong Apr 29, 2020
90ab3b1
compression: update scripts
jasperzhong Apr 29, 2020
6ec283d
compression: fix script typo
jasperzhong Apr 29, 2020
688e36c
compression: fix mmap sigbus
jasperzhong Apr 29, 2020
e371b1f
comrpession: update header
jasperzhong Apr 29, 2020
9e98e84
compression: add check and errno
jasperzhong Apr 29, 2020
6d61bcd
compression: fix division zero
jasperzhong Apr 29, 2020
0202364
compression: release resourses
jasperzhong Apr 29, 2020
602e575
compression: use double lr
jasperzhong Apr 29, 2020
8940668
compression: update script (default opt is nag)
jasperzhong Apr 30, 2020
a6155d8
register: refactor
jasperzhong May 3, 2020
0a579e1
register: rm extra param
jasperzhong May 3, 2020
356cbea
register: debug
jasperzhong May 3, 2020
85edef9
register: fix typo
jasperzhong May 3, 2020
3706abe
register: update argument parser
jasperzhong May 3, 2020
776d5d7
compression: add weight decay momentum
jasperzhong Apr 30, 2020
9fdec63
1bit-wd: fix static_method bug
jasperzhong Apr 30, 2020
709e2cf
1bit-wd: fix typo
jasperzhong Apr 30, 2020
81a412a
1bit-wd: fix typo
jasperzhong Apr 30, 2020
c2b17bb
1bit-wd: fix bug
jasperzhong Apr 30, 2020
00b49e8
1bit-wd: init mom
jasperzhong Apr 30, 2020
748ce02
1bit-wd: use cache
jasperzhong Apr 30, 2020
d16ac29
debug
jasperzhong Apr 30, 2020
d58cd1e
1bit-wd: fix
jasperzhong Apr 30, 2020
110249f
1bit-wd: fix typo
jasperzhong Apr 30, 2020
065f2ca
1bit-wd: add mnist test
jasperzhong May 3, 2020
b01e52c
1bit-wd: refactor interface
jasperzhong May 3, 2020
239e8af
1bit-wd: update register
jasperzhong May 4, 2020
c84e722
1bit-wd: update script
jasperzhong May 4, 2020
38ad4a0
1bit-wd: add log
jasperzhong May 4, 2020
9ea0ba1
1bit-wd: fix fatal bug
jasperzhong May 4, 2020
4a5ea5f
1bit-wd: rm logs
jasperzhong May 4, 2020
bcff3da
1bit-wd: fix typo
jasperzhong May 4, 2020
258b476
1bit-wd: fix ref
jasperzhong May 4, 2020
23fdcdd
1bit-wd: fix copy bug
jasperzhong May 4, 2020
2fb7a80
1bit-wd: fix typo
jasperzhong May 4, 2020
1de9174
1bit-wd: fix typo in script
jasperzhong May 4, 2020
b841f7b
1bit-wd: fix list out of range bug
jasperzhong May 4, 2020
7a0bea6
compression: async wd momentum (#6)
jasperzhong May 9, 2020
b261e4b
hotfix: use default num_threads (#7)
jasperzhong May 9, 2020
f6fd608
hotfix: use concurrent.futures.Timeout (#8)
jasperzhong May 10, 2020
1e40fd9
compression: speed up wd momentum with threading.Thread (#9)
jasperzhong May 12, 2020
2379e17
compression: add topk compressor (#10)
jasperzhong May 14, 2020
720d7e9
compression: add randomk compressor (#11)
jasperzhong May 14, 2020
d771932
numa: finetune support (#12)
jasperzhong May 14, 2020
afb96c8
rename: more readable (#13)
jasperzhong May 16, 2020
3712c33
hotfix: update non-compression case register error (#14)
jasperzhong May 16, 2020
f881fb4
compression: allreduce results for training scripts (#16)
jasperzhong May 25, 2020
3844228
hotfix: file mode use append (#17)
jasperzhong May 26, 2020
c17bfb6
compression: optimize implementation of compressors (#18)
jasperzhong May 30, 2020
7f12e15
cifar: update (#19)
jasperzhong May 30, 2020
6c45c79
hotfix: fix bugs (#20)
jasperzhong Jun 1, 2020
f7d9969
hotfix: fix fatal bug of new 1bit (#21)
jasperzhong Jun 10, 2020
ae30478
1bit: use double for scaling (#22)
jasperzhong Jun 17, 2020
c507e14
1bit: update wd mom (#23)
jasperzhong Jun 20, 2020
2a98d12
refactor: format and rename (#24)
jasperzhong Jun 24, 2020
d063a38
test: add compressor test cases (#25)
jasperzhong Jun 25, 2020
14a5bc0
misc: add comments (#26)
jasperzhong Jun 26, 2020
3347570
1bit: update FastUpdateErrorImpl (#27)
jasperzhong Jun 27, 2020
e07d7ec
random: swtich to xorshift128p rng backend (#28)
jasperzhong Jun 29, 2020
bc2ab6d
misc: use macros (#29)
jasperzhong Jun 29, 2020
6c44049
1bit: more parallelism (#31)
jasperzhong Jul 1, 2020
3952f43
Dithering (#33)
jasperzhong Jul 7, 2020
79f35b1
dithering: add ef support (#34)
jasperzhong Jul 7, 2020
478c50b
update to lastest ps-lite
jasperzhong Jul 8, 2020
90150a4
add docs (#35)
jasperzhong Jul 9, 2020
d08bdc6
Update byteps/common/core_loops.cc
jasperzhong Jul 10, 2020
39391d2
Update byteps/common/core_loops.cc
jasperzhong Jul 10, 2020
4522c52
dithering: optimize (#43)
jasperzhong Jul 19, 2020
a201d52
misc: fix typos
jasperzhong Jul 19, 2020
7da0596
Update byteps/common/thread_pool.h
jasperzhong Jul 21, 2020
1f190ed
misc: remove unused code & fix some warnings & keep non-alpha sum (#44)
jasperzhong Jul 21, 2020
ac1e9a3
dithering: add max normalization support (#45)
jasperzhong Jul 22, 2020
b4ecc6a
reduce: add num threads recommend (#46)
jasperzhong Jul 22, 2020
c132186
fp16: add fp16 support (#47)
jasperzhong Jul 24, 2020
cb221df
hotfix: fix register bugs of 1bit (#48)
jasperzhong Jul 25, 2020
23e38eb
hotfix: fix server SIGSEGV when shutdown (#50)
jasperzhong Jul 28, 2020
c28f84c
topk: fix fatal bugs when k > 1 (#51)
jasperzhong Jul 29, 2020
3068e31
dithering: fix natural dithering bug (#52)
jasperzhong Jul 29, 2020
8b1218d
sparsification: support factor k (#55)
jasperzhong Jul 30, 2020
ced6f2b
hotfix: fix typo (#56)
jasperzhong Jul 30, 2020
fa7821b
hotfix: add log (#57)
jasperzhong Jul 30, 2020
378cc2f
exp: update cifar100 (#58)
jasperzhong Jul 30, 2020
7dc8d7f
misc: remove recommend omp threads (#59)
jasperzhong Jul 30, 2020
6673b7d
1bit: not need to do wd mom for uncompressed gradients (#61)
jasperzhong Jul 30, 2020
a692fea
hotfix: fix distributed initialization #285
ZiyueHuang Aug 5, 2020
ba60a76
hotfix: merge two buffer (mentioned in #285)
jasperzhong Aug 5, 2020
c6464a9
Revert "hotfix: fix distributed initialization #285"
jasperzhong Aug 6, 2020
40114f3
hotfix: remove unnecessary code
jasperzhong Aug 6, 2020
7875987
test: full test coverage (#53)
jasperzhong Aug 12, 2020
63bbb58
Merge branch 'master' into gradient_compression
jasperzhong Aug 13, 2020
e857315
misc: refactor wdmom
jasperzhong Aug 13, 2020
ec99e82
1bit: use wd_mult
jasperzhong Aug 13, 2020
dfcc670
1bit: update wd mom
jasperzhong Aug 13, 2020
774f49c
mom: nag for uncompressed gradients (#62)
jasperzhong Aug 13, 2020
f894478
hotfix: fix wd mom issue (#63)
jasperzhong Aug 13, 2020
ef6f916
hotfix: update
jasperzhong Aug 13, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -116,3 +116,4 @@ venv.bak/

# for development
scripts/
exps/
2 changes: 2 additions & 0 deletions byteps/common/common.cc
@@ -100,6 +100,7 @@ int GetCommandType(RequestType requestType, int d) {
return (((m + d) * (m + d + 1)) / 2) + d;
}

#ifndef BYTEPS_BUILDING_SERVER
ncclDataType_t getNcclDataType(DataType dtype) {
switch (dtype) {
case BYTEPS_FLOAT32:
@@ -121,6 +122,7 @@ ncclDataType_t getNcclDataType(DataType dtype) {
}
return ncclFloat32;
}
#endif

int getDataTypeLength(int dtype) {
switch (dtype) {
50 changes: 41 additions & 9 deletions byteps/common/common.h
@@ -31,16 +31,23 @@
#include <vector>

// Add for profiling communication events
#include <fstream>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <thread>

#include <chrono>
#include <fstream>
#include <iostream>
#include <queue>
#include <thread>

namespace byteps {
namespace common {
namespace compressor {
struct BPSTensor;
typedef BPSTensor tensor_t;
class Compressor;
class ErrorFeedback;
} // namespace compressor

// Device ID used for CPU.
#define CPU_DEVICE_ID (-1)
@@ -83,8 +90,10 @@ enum QueueType {
COPYD2H,
PCIE_REDUCE,
COORDINATE_PUSH,
COMPRESS,
PUSH,
PULL,
DECOMPRESS,
COPYH2D,
COORDINATE_BROADCAST,
BROADCAST,
@@ -94,10 +103,18 @@ enum QueueType {
const int QueueNum =
(int)QUEUE_NUM_AND_NOT_A_REAL_QUEUE_TYPE_AND_MUST_BE_THE_LAST;

const std::vector<std::string> LogStrings = {
"COORDINATE_REDUCE", "REDUCE", "COPYD2H", "PCIE_REDUCE",
"COORDINATE_PUSH", "PUSH", "PULL", "COPYH2D",
"COORDINATE_BROADCAST", "BROADCAST"};
const std::vector<std::string> LogStrings = {"COORDINATE_REDUCE",
"REDUCE",
"COPYD2H",
"PCIE_REDUCE",
"COORDINATE_PUSH",
"COMPRESS",
"PUSH",
"PULL",
"DECOMPRESS",
"COPYH2D",
"COORDINATE_BROADCAST",
"BROADCAST"};

class Status {
public:
@@ -173,11 +190,17 @@ typedef struct BytePSContext {
std::vector<void*> pcie_cpubuff;
size_t buff_len;
// Used for profiling communication events
std::queue<BPSCommTime *> comm_time;
std::queue<BPSCommTime*> comm_time;
bool profile_flag = false;
int step_cnt = 0;
int local_rank = 0;
std::unordered_map<uint64_t, std::unordered_map<int, std::queue<BPSCommTime *>>> part_comm_time;
std::unordered_map<uint64_t,
std::unordered_map<int, std::queue<BPSCommTime*>>>
part_comm_time;
// Compressor list
std::vector<std::shared_ptr<compressor::Compressor>> compressor_list;
// kwargs
std::unordered_map<std::string, std::string> kwargs;
} BPSContext;

class Tensor {
@@ -233,6 +256,10 @@ struct TensorTableEntry {
std::shared_ptr<std::atomic_int> counter_ptr;
// How many partitions
unsigned int total_partnum = 0;
// Compressor
std::shared_ptr<compressor::Compressor> compressor;
// Compressed
std::shared_ptr<compressor::tensor_t> compressed;
};
using TensorTable = std::unordered_map<std::string, TensorTableEntry>;

@@ -250,6 +277,11 @@ ncclDataType_t getNcclDataType(DataType dtype);

int getDataTypeLength(int dtype);

inline size_t Align(size_t size, int dtype) {
const size_t min_size =
(getDataTypeLength(dtype) * getDataTypeLength(dtype)) * 8;
return size + (min_size - size % min_size) % min_size;
}
} // namespace common
} // namespace byteps
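
To make the Align arithmetic above concrete, here is a standalone sketch (not part of the diff) that re-states the formula with a hard-coded 4-byte element, i.e. assuming getDataTypeLength(BYTEPS_FLOAT32) == 4, so min_size = 4 * 4 * 8 = 128 bytes:

#include <cstddef>
#include <cstdio>

// Same arithmetic as Align(): round size up to the next multiple of min_size.
static size_t AlignSketch(size_t size, int type_len) {
  const size_t min_size = static_cast<size_t>(type_len) * type_len * 8;
  return size + (min_size - size % min_size) % min_size;
}

int main() {
  std::printf("%zu\n", AlignSketch(1000, 4));  // 1024: rounded up to a multiple of 128
  std::printf("%zu\n", AlignSketch(1024, 4));  // 1024: already aligned
  return 0;
}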

84 changes: 84 additions & 0 deletions byteps/common/compressor/common.h
@@ -0,0 +1,84 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_COMMON_H
#define BYTEPS_COMPRESSOR_COMMON_H

#include <unordered_map>

namespace byteps {
namespace common {
namespace compressor {
typedef char byte_t;
/*!
* \brief Tensor type
*/
typedef struct BPSTensor {
byte_t* data;
size_t size;
int dtype;

BPSTensor() : data(nullptr), size(0), dtype(0) {}
BPSTensor(void* data, size_t size = 0, int dtype = 0)
: data(reinterpret_cast<byte_t*>(data)), size(size), dtype(dtype) {}
} tensor_t;

using kwargs_t = std::unordered_map<std::string, std::string>;

#define COMPRESS_IMPL_SWITCH(dtype, func, dst, src, size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<uint32_t*>(dst), \
reinterpret_cast<const float*>(src), size / sizeof(float)); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<uint64_t*>(dst), \
reinterpret_cast<const double*>(src), \
size / sizeof(double)); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

#define DECOMPRESS_IMPL_SWITCH(dtype, func, dst, src, compressed_size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<float*>(dst), \
reinterpret_cast<const uint32_t*>(src), compressed_size); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<double*>(dst), \
reinterpret_cast<const uint64_t*>(src), compressed_size); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

#define FAST_UPDATE_ERROR_IMPL_SWITCH(dtype, func, dst, src1, src2, \
compressed_size) \
switch (dtype) { \
case BYTEPS_FLOAT32: \
return func(reinterpret_cast<float*>(dst), \
reinterpret_cast<float*>(src1), \
reinterpret_cast<const uint32_t*>(src2), compressed_size); \
case BYTEPS_FLOAT64: \
return func(reinterpret_cast<double*>(dst), \
reinterpret_cast<double*>(src1), \
reinterpret_cast<const uint64_t*>(src2), compressed_size); \
default: \
BPS_CHECK(0) << "Unsupported data type:" << dtype; \
}

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_COMMON_H
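
For reference, the FP32 branch of COMPRESS_IMPL_SWITCH above expands at a call site such as OnebitCompressor::Compress (which appears later in this diff) roughly as follows. This is an illustrative sketch of the macro expansion, not code from the PR:

tensor_t OnebitCompressor::Compress(tensor_t grad) {
  switch (grad.dtype) {
    case BYTEPS_FLOAT32:
      // index_t = uint32_t, scalar_t = float; the length is in elements, not bytes
      return CompressImpl(reinterpret_cast<uint32_t*>(_buf.get()),
                          reinterpret_cast<const float*>(grad.data),
                          grad.size / sizeof(float));
    // BYTEPS_FLOAT64 is handled the same way with uint64_t / double.
    default:
      BPS_CHECK(0) << "Unsupported data type:" << grad.dtype;
  }
}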
138 changes: 138 additions & 0 deletions byteps/common/compressor/compressor.h
@@ -0,0 +1,138 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_COMPRESSOR_H
#define BYTEPS_COMPRESSOR_COMPRESSOR_H

#include <memory>

#include "../cpu_reducer.h"
#include "common.h"

namespace byteps {
namespace common {
namespace compressor {
/*!
* \brief Compressor interface
* Compressor defines two universal API - Compress & Decompress
*
* \par
* The caller does not need to allocate additional memory to store compressed
* data, because the compressor keeps an internal buffer and returns a pointer
* into it. The caller can then send the returned compressed data as usual.
*
* \par
* There are two optional features of the compressor - error-feedback &
* momentum. These features can be added to any common compressor such as 1bit
* and topk. To keep things generic, they are themselves compressors, exposing
* the same API as Compressor. More details can be found in their own files.
*
* \par
* To add a new compressor, developers need to inherit this class in the 'impl'
* directory. If a new optional feature like error-feedback is needed,
* developers should use the decorator pattern and add new files in the current
* directory. The existing implementations can be used as a reference.
*
*
* \sa ErrorFeedback, Momentum
*/
class Compressor {
public:
Compressor(size_t size, DataType dtype)
: _size(size),
_dtype(dtype),
_buf(new byte_t[size]),
_cpu_reducer(new CpuReducer(nullptr)){};
virtual ~Compressor() = default;

/*!
* \brief Compress function
*
* \note Except for error-feedback and momentum, the underlying data of the
* input should never be changed. This is because the input is still used by
* error feedback if enabled.
*
* \note Compressed data is stored in the compressor's internal buffer, so this
* is not an inplace operation.
*
* \param grad gradient tensor, passed by value.
* \return compressed tensor. It points to the compressor's buffer, which
* contains the compressed data; the returned size is the size of the
* compressed data.
*/
virtual tensor_t Compress(tensor_t grad) = 0;

/*!
* \brief Decompress function
*
* \note For servers, decompression is not an inplace operation; the
* decompressed result resides in the compressor's buffer. For workers, it is
* an inplace operation.
*
* \param compressed compressed tensor.
* \return decompressed tensor. For servers, it points to the compressor's
* buffer, which contains the decompressed data. For workers, its pointer is
* the same as the input's, while its size is the decompressed size, which is
* also the original size.
*/
virtual tensor_t Decompress(tensor_t compressed) = 0;

/*!
* \brief faster version of `UpdateError` via operation fusion
*
* \par
* This is a helper function implemented by each compressor. If defined,
* `ErrorFeedback` will use this function instead of the default `UpdateError`
* implemented in error_feedback.cc. If undefined, the default `UpdateError`
* will be used.
*
* \par
* Typically `UpdateError` needs to decompress and do a subtraction. But for
* most compressors, the decompression step can be avoided. For example, for
* the topk compressor, `UpdateError` can be simplified in this way:
* 1. e <- p (e is the error and p is the corrected gradient)
* 2. zero-fill e at the k selected indices
*
* In effect this fuses the original decompression and subtraction. It is
* optional to override.
*
* \param corrected gradient corrected with error
* \param error error
* \param compressed compressed gradient
*/
virtual void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
BPS_LOG(FATAL) << "FastUpdateError is not implemented";
};

protected:
/*! \brief original size */
size_t _size;

DataType _dtype;

/*! \brief buffer to store compressed grad */
std::unique_ptr<byte_t[]> _buf;

/*! \brief CPU reducer */
std::unique_ptr<CpuReducer> _cpu_reducer;
};

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_COMPRESSOR_H
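
The first \par above describes a buffer-ownership contract: Compress() writes into a buffer owned by the compressor and hands back a view, so callers never allocate for the compressed payload. The following standalone sketch (with invented MiniTensor / MiniCompressor names, not BytePS APIs) illustrates just that contract:

#include <cstddef>
#include <cstring>
#include <memory>

struct MiniTensor {  // plays the role of tensor_t: raw pointer + byte size
  char* data;
  size_t size;
};

class MiniCompressor {
 public:
  explicit MiniCompressor(size_t size) : _buf(new char[size]) {}

  // "Compresses" by keeping only the first half of the bytes. The result
  // lives in _buf, mirroring how Compress() returns the internal buffer.
  MiniTensor Compress(MiniTensor grad) {
    size_t out_size = grad.size / 2;
    std::memcpy(_buf.get(), grad.data, out_size);
    return {_buf.get(), out_size};
  }

 private:
  std::unique_ptr<char[]> _buf;
};

int main() {
  char raw[16] = "fifteen chars..";
  MiniCompressor c(16);
  MiniTensor compressed = c.Compress({raw, sizeof(raw)});
  return compressed.size == 8 ? 0 : 1;  // caller only sees a view into c's buffer
}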
60 changes: 60 additions & 0 deletions byteps/common/compressor/compressor_registry.cc
@@ -0,0 +1,60 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "compressor_registry.h"

namespace byteps {
namespace common {
namespace compressor {

CompressorRegistry::map_t CompressorRegistry::_ctor_map;

CompressorRegistry::Register::Register(std::string name, ctor_t ctor) {
BPS_CHECK_EQ(_ctor_map.count(name), 0)
<< "Duplicate registration of compressor under name " << name;
_ctor_map.emplace(name + "_type", std::move(ctor));
BPS_LOG(INFO) << name << " compressor is registered";
}

CompressorRegistry::ctor_t CompressorRegistry::Find(const std::string& name) {
auto it = _ctor_map.find(name);
if (it == _ctor_map.end()) {
BPS_LOG(FATAL) << "No compressor registered under name:" << name;
}
return it->second;
}

std::unique_ptr<Compressor> CompressorRegistry::Create(const kwargs_t& kwargs,
size_t size, DataType dtype) {
#ifndef BYTEPS_BUILDING_SERVER
const std::string types[] = {"momentum_type", "ef_type", "compressor_type"};
#else
// the server does not need momentum
const std::string types[] = {"ef_type", "compressor_type"};
#endif
for (auto& type : types) {
auto iter = kwargs.find(type);
if (iter != kwargs.end()) {
auto ctor = CompressorRegistry::Find(iter->second + "_" + type);
return ctor(kwargs, size, dtype);
}
}

return nullptr;
}

} // namespace compressor
} // namespace common
} // namespace byteps
54 changes: 54 additions & 0 deletions byteps/common/compressor/compressor_registry.h
@@ -0,0 +1,54 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_COMPRESSOR_REGISTRY_H
#define BYTEPS_COMPRESSOR_COMPRESSOR_REGISTRY_H

#include "compressor.h"
#include "utils.h"

namespace byteps {
namespace common {
namespace compressor {

class CompressorRegistry {
public:
// constructor of compressor
using ctor_t = std::function<std::unique_ptr<Compressor>(
const kwargs_t& kwargs, size_t size, DataType dtype)>;

using map_t = std::unordered_map<std::string, ctor_t>;

struct Register {
Register(std::string name, ctor_t ctor);
};

static ctor_t Find(const std::string& name);

static std::unique_ptr<Compressor> Create(const kwargs_t& kwargs, size_t size,
DataType dtype);

private:
static map_t _ctor_map;

CompressorRegistry() = delete;
~CompressorRegistry() = delete;
};

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_COMPRESSOR_REGISTRY_H
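
Putting the registry together with the decorator compressors registered later in this diff, a worker-side caller drives the whole chain through kwargs alone. The snippet below is illustrative only: it assumes the in-tree headers, and the "vanilla" value for ef_type assumes the vanilla error-feedback registration (not shown in this excerpt) follows the same naming pattern as nesterov_momentum.cc and onebit.cc:

#include <memory>

#include "byteps/common/compressor/compressor_registry.h"

using namespace byteps::common;
using namespace byteps::common::compressor;

std::unique_ptr<Compressor> BuildChain(size_t aligned_size) {
  kwargs_t kwargs = {
      {"compressor_type", "onebit"},
      {"compressor_onebit_scaling", "true"},
      {"ef_type", "vanilla"},        // assumed registered name, see note above
      {"momentum_type", "nesterov"},
      {"compressor_mu", "0.9"},
  };
  // On a worker, Create() matches "momentum_type" first; the nesterov ctor
  // strips that key and recursively calls Create(), which then matches
  // "ef_type", and so on. The result is NesterovMomentum(ErrorFeedback(Onebit)).
  // On the server build, the momentum layer is skipped (see the #ifdef above).
  return CompressorRegistry::Create(kwargs, aligned_size, BYTEPS_FLOAT32);
}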
47 changes: 47 additions & 0 deletions byteps/common/compressor/error_feedback.cc
@@ -0,0 +1,47 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "error_feedback.h"

namespace byteps {
namespace common {
namespace compressor {

tensor_t ErrorFeedback::Compress(tensor_t grad) {
// 1. grad <- grad + error
UpdateGradient(grad);

// 2. c <- Compress(grad)
auto compressed = _cptr->Compress(grad);

// 3. e <- grad - Decompress(c)
UpdateError(grad, compressed);

return compressed;
}

tensor_t ErrorFeedback::Decompress(tensor_t compressed) {
// directly forward to internal compressor
return _cptr->Decompress(compressed);
}

void ErrorFeedback::UpdateError(tensor_t corrected, tensor_t compressed) {
tensor_t error{_error.get(), _size, corrected.dtype};
_cptr->FastUpdateError(error, corrected, compressed);
}

} // namespace compressor
} // namespace common
} // namespace byteps
96 changes: 96 additions & 0 deletions byteps/common/compressor/error_feedback.h
@@ -0,0 +1,96 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_ERROR_FEEDBACK_H
#define BYTEPS_COMPRESSOR_ERROR_FEEDBACK_H

#include "compressor.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief Error feedback Decorator
*
* paper: 1-bit stochastic gradient descent and its application to data-parallel
* distributed training of speech dnns
* https://www.microsoft.com/en-us/research/wp-content/uploads/2016/02/IS140694.pdf
*
* 1. UpdateGradient: g <- g + e
* 2. UpdateError: e <- g - c
*
* These two functions should be implemented in child classes.
*
* \par
* The caller does not need to allocate an additional buffer to store the
* error; a buffer is already kept inside the class.
*
* \par
* Error feedback behavior is added to any compressor at run-time via the
* decorator pattern. It keeps the same interface as Compressor; Compress and
* Decompress are already implemented and cannot be overridden in child
* classes.
*
* \sa Compressor, VanillaErrorFeedbackCompressor
*/
class ErrorFeedback : public Compressor {
public:
// error buffer should be cleared to zeros at the beginning.
ErrorFeedback(size_t size, DataType dtype, std::unique_ptr<Compressor> cptr)
: Compressor(size, dtype),
_error(new byte_t[size]()),
_cptr(std::move(cptr)) {}
virtual ~ErrorFeedback() = default;

virtual tensor_t Compress(tensor_t grad) final;

virtual tensor_t Decompress(tensor_t compressed) final;

protected:
/*!
* \brief Correct gradient with error
*
* grad += error
*
* \note it is an inplace operation.
*
* \param grad input gradient to be updated inplace
* \param dtype type
*/
virtual void UpdateGradient(tensor_t grad) = 0;

/*!
* \brief Update error
*
* error = corrected_grad - decompressed
*
* \param corrected refers to gradient + error
* \param compressed compressed tensor
*/
virtual void UpdateError(tensor_t corrected, tensor_t compressed);

protected:
/*! \brief buffer of error */
std::unique_ptr<byte_t[]> _error;

private:
/*! \brief compressor pointer */
std::unique_ptr<Compressor> _cptr;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_ERROR_FEEDBACK_H
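
To see why keeping the residual matters, here is a standalone numeric sketch of the loop documented above (g <- g + e, c <- C(g), e <- g - D(c)), using scalar rounding as a stand-in compressor. It is not BytePS code, only an illustration of the accumulation effect:

#include <cmath>
#include <cstdio>

int main() {
  const double grads[] = {0.4, 0.4, 0.4, 0.4};  // true gradient at each step
  double error = 0.0;                           // persistent residual buffer
  double applied_sum = 0.0;
  for (double g : grads) {
    double corrected = g + error;         // UpdateGradient: g <- g + e
    double sent = std::round(corrected);  // Compress + Decompress round trip
    error = corrected - sent;             // UpdateError:    e <- g - c
    applied_sum += sent;
    std::printf("sent=%.0f  error=%+.1f\n", sent, error);
  }
  // Applied updates: 0 + 1 + 0 + 1 = 2, close to the true sum 1.6.
  // Plain rounding with no residual would have applied 0 at every step.
  std::printf("applied=%.1f  true=%.1f\n", applied_sum, 0.4 * 4);
  return 0;
}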
205 changes: 205 additions & 0 deletions byteps/common/compressor/impl/dithering.cc
@@ -0,0 +1,205 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include <cmath>

#include "../compressor_registry.h"
#include "dithering.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg(
"dithering_compressor",
[](const kwargs_t& kwargs, size_t size,
DataType dtype) -> std::unique_ptr<Compressor> {
std::tuple<> params;
auto k = HyperParamFinder<unsigned>(kwargs, "compressor_k");

auto seed = HyperParamFinder<unsigned>(kwargs, "seed", true, [](unsigned x){
return x != 0;
});

auto ptype_int = HyperParamFinder<int>(
kwargs, "seed", true, [](int x) { return x == 0 || x == 1; });
auto ptype = static_cast<DitheringCompressor::PartitionType>(ptype_int);

return std::unique_ptr<Compressor>(
new DitheringCompressor(size, dtype, k, seed, ptype));
});
}

template <typename index_t, typename scalar_t>
tensor_t DitheringCompressor::CompressImpl(index_t* dst, const scalar_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
// normalize
double l2 = 0.0;
#pragma omp parallel for simd num_threads(4) reduction(+ : l2)
for (size_t i = 0; i < len; ++i) {
l2 += src[i] * src[i];
}
l2 = std::sqrt(l2);

BitWriter<index_t> bit_writer(dst);
size_t last_non_zero_pos = -1;
if (_ptype == PartitionType::LINEAR) {
for (size_t i = 0; i < len; ++i) {
float abs_x = std::abs(src[i]);
float normalized = (abs_x / l2) * _s;
float floor = std::floor(normalized);
unsigned quantized = floor + _rng.Bernoulli(normalized - floor);
if (quantized) {
size_t diff = i - last_non_zero_pos;
last_non_zero_pos = i;
EliasDeltaEncode(bit_writer, diff);
bit_writer.Put(std::signbit(src[i]));
EliasDeltaEncode(bit_writer, quantized);
}
}
} else if (_ptype == PartitionType::NATURAL) {
const unsigned scale = 1 << (_s - 1);
for (size_t i = 0; i < len; ++i) {
float abs_x = std::abs(src[i]);
float normalized = (abs_x / l2) * scale;
float floor = RoundNextPow2(std::ceil(normalized)) << 1;
unsigned quantized =
floor * (1 + _rng.Bernoulli((normalized - floor) / floor));
if (quantized) {
size_t diff = i - last_non_zero_pos;
last_non_zero_pos = i;
EliasDeltaEncode(bit_writer, diff);
bit_writer.Put(std::signbit(src[i]));
EliasDeltaEncode(bit_writer, quantized);
}
}
}
bit_writer.Flush();

// bits
index_t* p_bits = reinterpret_cast<index_t*>(&dst[bit_writer.blocks()]);
*p_bits = bit_writer.bits();

// l2
float* p_scale = reinterpret_cast<float*>(&dst[bit_writer.blocks() + 1]);
*p_scale = l2;

return {dst, bit_writer.blocks() * sizeof(index_t) + sizeof(index_t) +
sizeof(float)};
}

tensor_t DitheringCompressor::Compress(tensor_t grad) {
COMPRESS_IMPL_SWITCH(grad.dtype, CompressImpl, _buf.get(), grad.data,
grad.size);
}

template <typename index_t, typename scalar_t>
tensor_t DitheringCompressor::DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");

const size_t blocks =
(compressed_size - sizeof(float) - sizeof(index_t)) / sizeof(index_t);
auto* p_bits = reinterpret_cast<const index_t*>(src + blocks);
const index_t bits = *p_bits;

auto* p_scale = reinterpret_cast<const float*>(src + blocks + 1);
const float scale = *p_scale;

auto ptr = const_cast<index_t*>(src);
if ((void*)dst == (void*)src) {
ptr = reinterpret_cast<index_t*>(_buf.get());
std::memcpy(ptr, src, compressed_size);
}
std::memset(dst, 0, _size);

unsigned int s = _s;
if (_ptype == PartitionType::NATURAL) {
s = 1 << (_s - 1);
}

BitReader<index_t> bit_reader(ptr);
size_t last_non_zero_pos = -1;
while (bit_reader.bits() < bits) {
size_t diff = EliasDeltaDecode(bit_reader);
size_t i = last_non_zero_pos + diff;
last_non_zero_pos = i;
int signbit = bit_reader.Get();
unsigned quantized = EliasDeltaDecode(bit_reader);
float num = quantized * scale / s;
dst[i] = (1 - (signbit << 1)) * num;
}

return {dst, _size};
}

tensor_t DitheringCompressor::Decompress(tensor_t compressed) {
#ifdef BYTEPS_BUILDING_SERVER
auto dst = _buf.get();
#else
auto dst = compressed.data;
#endif
DECOMPRESS_IMPL_SWITCH(_dtype, DecompressImpl, dst, compressed.data,
compressed.size);
}

template <typename index_t, typename scalar_t>
void DitheringCompressor::FastUpdateErrorImpl(scalar_t* error,
scalar_t* corrected,
const index_t* compressed,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");

const size_t blocks =
(compressed_size - sizeof(float) - sizeof(index_t)) / sizeof(index_t);
auto* p_bits = reinterpret_cast<const index_t*>(compressed + blocks);
const index_t bits = *p_bits;

auto* p_scale = reinterpret_cast<const float*>(compressed + blocks + 1);
const float scale = *p_scale;

std::memcpy(error, corrected, _size);

unsigned int s = _s;
if (_ptype == PartitionType::NATURAL) {
s = 1 << (_s - 1);
}

BitReader<index_t> bit_reader(compressed);
size_t last_non_zero_pos = -1;
while (bit_reader.bits() < bits) {
size_t diff = EliasDeltaDecode(bit_reader);
size_t i = last_non_zero_pos + diff;
last_non_zero_pos = i;
int signbit = bit_reader.Get();
unsigned quantized = EliasDeltaDecode(bit_reader);
float num = quantized * scale / s;
error[i] -= (1 - (signbit << 1)) * num;
}
}

void DitheringCompressor::FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
FAST_UPDATE_ERROR_IMPL_SWITCH(_dtype, FastUpdateErrorImpl, error.data,
corrected.data, compressed.data,
compressed.size);
}
} // namespace compressor
} // namespace common
} // namespace byteps
79 changes: 79 additions & 0 deletions byteps/common/compressor/impl/dithering.h
@@ -0,0 +1,79 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_MULTIBIT_H
#define BYTEPS_COMPRESSOR_IMPL_MULTIBIT_H

#include "../compressor.h"
#include "../utils.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief Dithering Compressor
*
* paper: Natural Compression for Distributed Deep Learning
* https://arxiv.org/pdf/1905.10988.pdf
*
* two kinds of partition:
* 1. linear: {0, 1/s, 2/s, ..., (s-1)/s, 1}
*
* 2. natural: {0, 2^{1-s}, 2^{2-s}, ..., 2^{-1}, 1}
*/
class DitheringCompressor : public Compressor {
public:
enum class PartitionType { LINEAR = 0, NATURAL = 1 };

DitheringCompressor(size_t size, DataType dtype, unsigned int s, unsigned int seed = 0,
PartitionType ptype = PartitionType::LINEAR)
: Compressor(size, dtype), _s(s), _ptype(ptype) {
if (seed) {
_rng.set_seed(seed);
}
};
virtual ~DitheringCompressor() = default;

tensor_t Compress(tensor_t grad) override;

tensor_t Decompress(tensor_t compressed) override;

void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) override;

private:
template <typename index_t, typename scalar_t>
tensor_t CompressImpl(index_t* dst, const scalar_t* src, size_t len);

template <typename index_t, typename scalar_t>
tensor_t DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size);

template <typename index_t, typename scalar_t>
void FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed, size_t compressed_size);

/*! \brief number of levels */
unsigned int _s;

PartitionType _ptype;
XorShift128PlusBitShifterRNG _rng;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_MULTIBIT_H
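
The core of the linear-partition path in CompressImpl is an unbiased stochastic rounding step: scale |x| / l2 into [0, s], take the floor, and promote with probability equal to the fractional remainder. The standalone sketch below reproduces only that step and the matching reconstruction (quantized * l2 / s with the sign restored); the Elias-delta bit packing and the stored l2 trailer from the real compressor are left out:

#include <cmath>
#include <cstdio>
#include <random>

int main() {
  std::mt19937 rng(42);
  std::uniform_real_distribution<float> uni(0.0f, 1.0f);

  const unsigned s = 4;  // number of quantization levels
  const float src[] = {0.9f, -0.3f, 0.1f, 0.0f};

  float l2 = 0.0f;
  for (float v : src) l2 += v * v;
  l2 = std::sqrt(l2);

  for (float v : src) {
    float normalized = std::abs(v) / l2 * s;
    float fl = std::floor(normalized);
    unsigned quantized = fl + (uni(rng) < normalized - fl ? 1u : 0u);  // Bernoulli(frac)
    float reconstructed = std::copysign(quantized * l2 / s, v);
    std::printf("x=% .2f  q=%u  x_hat=% .3f\n", v, quantized, reconstructed);
  }
  return 0;
}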
53 changes: 53 additions & 0 deletions byteps/common/compressor/impl/nesterov_momentum.cc
@@ -0,0 +1,53 @@
// Copyright 2020 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "nesterov_momentum.h"
#include "../compressor_registry.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg(
"nesterov_momentum",
[](const kwargs_t& kwargs, size_t size,
DataType dtype) -> std::unique_ptr<Compressor> {
// register cptr
auto kwargs_clone = kwargs;
kwargs_clone.erase("momentum_type");
auto cptr = CompressorRegistry::Create(kwargs_clone, size, dtype);
BPS_CHECK_NE(cptr, nullptr);
// find \mu
auto mu = HyperParamFinder<float>(kwargs, "compressor_mu");
return std::unique_ptr<NesterovMomentumCompressor>(
new NesterovMomentumCompressor(size, dtype, std::move(cptr), mu));
});
}

void NesterovMomentumCompressor::UpdateMom(tensor_t grad) {
// m_t = \mu * m_{t-1} + g_t
this->_cpu_reducer->sum(_mom.get(), grad.data, _mom.get(), grad.size,
static_cast<DataType>(grad.dtype), _mu);
}

void NesterovMomentumCompressor::UpdateGradient(tensor_t grad) {
// p_t = \mu m_t + g_t
this->_cpu_reducer->sum(grad.data, _mom.get(), grad.size,
static_cast<DataType>(grad.dtype), _mu);
}

} // namespace compressor
} // namespace common
} // namespace byteps
51 changes: 51 additions & 0 deletions byteps/common/compressor/impl/nesterov_momentum.h
@@ -0,0 +1,51 @@
// Copyright 2020 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_NESTEROV_MOMENTUM_H
#define BYTEPS_COMPRESSOR_IMPL_NESTEROV_MOMENTUM_H

#include "../momentum.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief Nesterov Momentum Compressor
*
* paper: A method for solving the convex programming problem with convergence
* rate $O (1/k^2)$
*
* m_t <- \mu m_{t-1} + g_t
* g_t <- \mu m_t + g_t
*
*/
class NesterovMomentumCompressor : public Momentum {
public:
NesterovMomentumCompressor(size_t size, DataType dtype,
std::unique_ptr<Compressor> cptr, float mu)
: Momentum(size, dtype, std::move(cptr), mu){};
virtual ~NesterovMomentumCompressor() = default;

protected:
void UpdateMom(tensor_t grad) override;
void UpdateGradient(tensor_t grad) override;
};

} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_NESTEROV_MOMENTUM_H
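
Element-wise, the two updates documented above amount to the small loop below. This standalone sketch is illustrative only: in the PR both steps are CpuReducer::sum calls on raw buffers, and the Momentum base class (not shown in this excerpt) decides when each step runs:

#include <cstdio>

int main() {
  const float mu = 0.9f;
  float mom[3] = {0.0f, 0.0f, 0.0f};
  float grad[3] = {0.1f, -0.2f, 0.3f};
  for (int i = 0; i < 3; ++i) {
    mom[i] = mu * mom[i] + grad[i];   // UpdateMom:      m_t = mu * m_{t-1} + g_t
    grad[i] = mu * mom[i] + grad[i];  // UpdateGradient: g_t = mu * m_t + g_t
  }
  for (float g : grad) std::printf("%.3f ", g);  // 0.190 -0.380 0.570
  std::printf("\n");
  return 0;
}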
142 changes: 142 additions & 0 deletions byteps/common/compressor/impl/onebit.cc
@@ -0,0 +1,142 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "onebit.h"
#include "../compressor_registry.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg("onebit_compressor", [](const kwargs_t& kwargs,
size_t size,
DataType dtype) {
auto scaled =
HyperParamFinder<bool>(kwargs, "compressor_onebit_scaling", true);
return std::unique_ptr<Compressor>(new OnebitCompressor(size, dtype, scaled));
});
}

template <typename index_t, typename scalar_t>
tensor_t OnebitCompressor::CompressImpl(index_t* dst, const scalar_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
constexpr size_t PACKING_SIZE = sizeof(scalar_t) * 8;
size_t padding_len = (PACKING_SIZE - (len % PACKING_SIZE)) % PACKING_SIZE;
const size_t chunk_len = (len + padding_len) / PACKING_SIZE;

float scale = 1.0f;
if (_use_scale) {
double sum = 0.0f;
#pragma omp parallel for simd num_threads(4) reduction(+ : sum)
for (size_t i = 0; i < len; ++i) {
sum += std::abs(src[i]);
}
scale = sum / len;
}

#pragma omp parallel for simd num_threads(4)
for (size_t i = 0; i < chunk_len; ++i) {
index_t x = src[i * PACKING_SIZE] < 0;
for (size_t j = 1; j < PACKING_SIZE; ++j) {
x <<= 1;
x |= src[i * PACKING_SIZE + j] < 0;
}
dst[i] = x;
}

float* p_scale = reinterpret_cast<float*>(&dst[chunk_len]);
*p_scale = scale;

return {dst, chunk_len * sizeof(index_t) + sizeof(float)};
}

tensor_t OnebitCompressor::Compress(tensor_t grad) {
COMPRESS_IMPL_SWITCH(grad.dtype, CompressImpl, _buf.get(), grad.data,
grad.size);
}

template <typename scalar_t, typename index_t>
tensor_t OnebitCompressor::DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size) {
static_assert(sizeof(scalar_t) == sizeof(index_t),
"scalar_t should be the same size as index_t");
constexpr size_t PACKING_SIZE = sizeof(index_t) * 8;
const size_t chunk_len = (compressed_size - sizeof(float)) / sizeof(index_t);

auto* pf = reinterpret_cast<const float*>(src + chunk_len);
float scale = *pf;

index_t* ptr = const_cast<index_t*>(src);
if ((void*)dst == (void*)src) {
ptr = reinterpret_cast<index_t*>(_buf.get());
std::memcpy(ptr, src, compressed_size);
}

#pragma omp parallel for simd num_threads(4)
for (int i = chunk_len - 1; i >= 0; --i) {
index_t x = ptr[i];
for (int j = PACKING_SIZE - 1; j >= 0; --j) {
int sign = 1 - ((x & 0x01) << 1);
dst[i * PACKING_SIZE + j] = sign * scale;
x >>= 1;
}
}

return {dst, _size};
}

tensor_t OnebitCompressor::Decompress(tensor_t compressed) {
#ifdef BYTEPS_BUILDING_SERVER
auto dst = _buf.get();
#else
auto dst = compressed.data;
#endif
DECOMPRESS_IMPL_SWITCH(_dtype, DecompressImpl, dst, compressed.data,
compressed.size);
}

template <typename scalar_t, typename index_t>
void OnebitCompressor::FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed,
size_t compressed_size) {
constexpr size_t PACKING_SIZE = sizeof(index_t) * 8;
const size_t chunk_len = (compressed_size - sizeof(float)) / sizeof(index_t);

auto* pf = reinterpret_cast<const float*>(compressed + chunk_len);
float scale = *pf;

#pragma omp parallel for simd num_threads(4)
for (int i = chunk_len - 1; i >= 0; --i) {
index_t x = compressed[i];
for (int j = PACKING_SIZE - 1; j >= 0; --j) {
int sign = ((x & 0x01) << 1) - 1;
error[i * PACKING_SIZE + j] =
corrected[i * PACKING_SIZE + j] + sign * scale;
x >>= 1;
}
}
}

void OnebitCompressor::FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
FAST_UPDATE_ERROR_IMPL_SWITCH(_dtype, FastUpdateErrorImpl, error.data,
corrected.data, compressed.data,
compressed.size);
}
} // namespace compressor
} // namespace common
} // namespace byteps
95 changes: 95 additions & 0 deletions byteps/common/compressor/impl/onebit.h
@@ -0,0 +1,95 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_ONEBIT_H
#define BYTEPS_COMPRESSOR_IMPL_ONEBIT_H

#include "../compressor.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief Onebit Compressor
*
* paper: SIGNSGD: Compressed Optimisation for Non-Convex Problems
* https://arxiv.org/pdf/1802.04434.pdf
*
* each worker i:
* c_i <- sign(grad)
*
* server: majority vote
* sign(\sum_i c_i)
*
* \note 0 represents positive and 1 represents negative.
*/
class OnebitCompressor : public Compressor {
public:
OnebitCompressor(size_t size, DataType dtype, bool use_scale = false)
: Compressor(size, dtype), _use_scale(use_scale) {}
virtual ~OnebitCompressor() = default;

/*!
* \brief Compress function
*
* compress and pack into byte array.
* each bit represents a sign.
*
* \param grad gradient tensor
* \return compressed tensor
*/
tensor_t Compress(tensor_t grad) override;

/*!
* \brief Decompress function
*
* unpack from byte array to FP tensor
*
* \param compressed compressed tensor
* \return decompressed tensor
*/
tensor_t Decompress(tensor_t compressed) override;

/*!
* \brief helper function for error feedback `UpdateError`
*
* \param corrected gradient corrected with error
* \param error error
* \param compressed compressed gradient
*/
void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) override;

private:
template <typename index_t, typename scalar_t>
tensor_t CompressImpl(index_t* dst, const scalar_t* src, size_t len);

template <typename scalar_t, typename index_t>
tensor_t DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size);

template <typename scalar_t, typename index_t>
void FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed, size_t compressed_size);

private:
bool _use_scale;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_ONEBIT_H
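
The packing scheme described above (one sign bit per element, 1 meaning negative) reduces a 32-element FP32 chunk to a single 32-bit word plus a shared scale. The standalone sketch below mirrors the inner loops of CompressImpl and DecompressImpl for one such chunk; padding and the scale trailer stored after the packed words are handled by the real compressor:

#include <cstdint>
#include <cstdio>

int main() {
  float src[32];
  for (int i = 0; i < 32; ++i) src[i] = (i % 3 == 0) ? -1.0f : 0.5f;

  // Compress: pack signs MSB-first, as in CompressImpl.
  uint32_t packed = src[0] < 0;
  for (int j = 1; j < 32; ++j) {
    packed <<= 1;
    packed |= src[j] < 0;
  }

  // Decompress: walk the bits back from the LSB, as in DecompressImpl.
  const float scale = 0.75f;  // stands in for the stored mean of |g|
  float dst[32];
  uint32_t x = packed;
  for (int j = 31; j >= 0; --j) {
    int sign = 1 - ((x & 0x01) << 1);  // bit 1 -> -1, bit 0 -> +1
    dst[j] = sign * scale;
    x >>= 1;
  }
  std::printf("src[0]=%.1f dst[0]=%.2f   src[1]=%.1f dst[1]=%.2f\n",
              src[0], dst[0], src[1], dst[1]);
  return 0;
}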
120 changes: 120 additions & 0 deletions byteps/common/compressor/impl/randomk.cc
@@ -0,0 +1,120 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "randomk.h"
#include "../compressor_registry.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg(
"randomk_compressor",
[](const kwargs_t& kwargs, size_t size,
DataType dtype) -> std::unique_ptr<Compressor> {
auto k = HyperParamFinder<unsigned>(kwargs, "compressor_k");

auto seed = HyperParamFinder<unsigned>(kwargs, "seed", true,
[](unsigned x) { return x != 0; });

return std::unique_ptr<Compressor>(
new RandomkCompressor(size, dtype, k, seed));
});
}

template <typename index_t, typename scalar_t>
tensor_t RandomkCompressor::CompressImpl(index_t* dst, const scalar_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
BPS_CHECK_LE(this->_k, len / 2);
using pair_t = std::pair<index_t, scalar_t>;
auto ptr = reinterpret_cast<pair_t*>(dst);

for (size_t i = 0; i < this->_k; ++i) {
auto index = _rng.Randint(0, len);
ptr[i] = std::make_pair(index, src[index]);
}

return {dst, this->_k * sizeof(pair_t)};
}

tensor_t RandomkCompressor::Compress(tensor_t grad) {
COMPRESS_IMPL_SWITCH(grad.dtype, CompressImpl, _buf.get(), grad.data,
grad.size);
}

template <typename index_t, typename scalar_t>
tensor_t RandomkCompressor::DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
using pair_t = std::pair<index_t, scalar_t>;

auto ptr = reinterpret_cast<const pair_t*>(src);
if ((void*)dst == (void*)src) {
auto buf = reinterpret_cast<pair_t*>(_buf.get());
std::memcpy(buf, ptr, compressed_size);
ptr = const_cast<const pair_t*>(buf);
}

// reset to zeros
std::memset(dst, 0, _size);
size_t len = compressed_size / sizeof(pair_t);
for (size_t i = 0; i < len; ++i) {
auto& pair = ptr[i];
dst[pair.first] = pair.second;
}

return {dst, _size};
}

tensor_t RandomkCompressor::Decompress(tensor_t compressed) {
#ifdef BYTEPS_BUILDING_SERVER
auto dst = _buf.get();
#else
auto dst = compressed.data;
#endif
DECOMPRESS_IMPL_SWITCH(_dtype, DecompressImpl, dst, compressed.data,
compressed.size);
}

template <typename index_t, typename scalar_t>
void RandomkCompressor::FastUpdateErrorImpl(scalar_t* error,
scalar_t* corrected,
const index_t* compressed,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
using pair_t = std::pair<index_t, scalar_t>;

std::memcpy(error, corrected, _size);

auto ptr = reinterpret_cast<const pair_t*>(compressed);
for (size_t i = 0; i < this->_k; ++i) {
auto& pair = ptr[i];
error[pair.first] = 0;
}
}

void RandomkCompressor::FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
FAST_UPDATE_ERROR_IMPL_SWITCH(_dtype, FastUpdateErrorImpl, error.data,
corrected.data, compressed.data,
compressed.size);
}
} // namespace compressor
} // namespace common
} // namespace byteps
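
As a rough sketch of the buffer format CompressImpl/DecompressImpl above work with — k (index, value) pairs packed back-to-back, then scattered into a zeroed tensor — here is a standalone illustration. It uses std::mt19937 instead of the PR's XorShift128PlusBitShifterRNG, and the function names are made up for the sketch.

#include <cstdint>
#include <random>
#include <utility>
#include <vector>

using pair_t = std::pair<uint32_t, float>;  // index_t and scalar_t share one width

// Randomly pick k entries; a fixed seed makes the selection deterministic.
std::vector<pair_t> RandomkCompress(const std::vector<float>& grad, size_t k,
                                    uint32_t seed) {
  std::mt19937 rng(seed);
  std::uniform_int_distribution<uint32_t> dist(0, grad.size() - 1);
  std::vector<pair_t> compressed;
  compressed.reserve(k);
  for (size_t i = 0; i < k; ++i) {
    uint32_t idx = dist(rng);
    compressed.emplace_back(idx, grad[idx]);
  }
  return compressed;  // compressed size is k * sizeof(pair_t)
}

// Scatter the pairs back into a zero-filled tensor of the original length.
std::vector<float> RandomkDecompress(const std::vector<pair_t>& compressed,
                                     size_t len) {
  std::vector<float> out(len, 0.0f);
  for (const auto& p : compressed) out[p.first] = p.second;
  return out;
}
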
104 changes: 104 additions & 0 deletions byteps/common/compressor/impl/randomk.h
@@ -0,0 +1,104 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_RANDOMK_H
#define BYTEPS_COMPRESSOR_IMPL_RANDOMK_H

#include <random>

#include "../compressor.h"
#include "../utils.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief RandomK Compressor
*
* paper: Sparsified SGD with Memory
* https://arxiv.org/pdf/1809.07599.pdf
*
* randomly sending k entries of the stochastic gradient
*
* \note it is a stochastic algorithm. If you want to have deterministic
* behavior, please set a seed in the configurations.
*/
class RandomkCompressor : public Compressor {
public:
RandomkCompressor(size_t size, DataType dtype, unsigned int k, unsigned int seed = 0)
: Compressor(size, dtype), _k(k) {
if (seed != 0) {
BPS_LOG(INFO) << "SET SEED = " << seed;
_rng.set_seed(seed);
}
};
virtual ~RandomkCompressor() = default;

/*!
* \brief Compress function
*
* randomly select k entries and corresponding indices
*
* \param grad gradient tensor
* \param compressed compressed tensor
*/
tensor_t Compress(tensor_t grad) override;

/*!
* \brief Decompress function
*
* fill a zero tensor with the randomly selected k entries and corresponding indices
*
* \param compressed compressed tensor
* \param decompressed decompressed tensor
*/
tensor_t Decompress(tensor_t compressed) override;

/*!
* \brief faster version of `UpdateError`
*
* 1. e <- p (e is the error and p is the corrected gradient)
* 2. zero-fill e with selected k indices
*
* \param corrected gradient corrected with error
* \param error error
* \param compressed compressed gradient
*/
void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) override;

private:
template <typename index_t, typename scalar_t>
tensor_t CompressImpl(index_t* dst, const scalar_t* src, size_t len);

template <typename index_t, typename scalar_t>
tensor_t DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size);

template <typename index_t, typename scalar_t>
void FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed, size_t compressed_size);

private:
unsigned int _k;
std::random_device _rd;
XorShift128PlusBitShifterRNG _rng;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_RANDOMK_H
131 changes: 131 additions & 0 deletions byteps/common/compressor/impl/topk.cc
@@ -0,0 +1,131 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include <queue>

#include "../compressor_registry.h"
#include "topk.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg(
"topk_compressor",
[](const kwargs_t& kwargs, size_t size,
DataType dtype) -> std::unique_ptr<Compressor> {
auto k = HyperParamFinder<unsigned>(kwargs, "compressor_k");
return std::unique_ptr<Compressor>(new TopkCompressor(size, dtype, k));
});
}

template <typename index_t, typename scalar_t>
tensor_t TopkCompressor::CompressImpl(index_t* dst, const scalar_t* src,
size_t len) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
BPS_CHECK_LE(this->_k, len / 2);
using pair_t = std::pair<index_t, scalar_t>;
auto comp = [](const pair_t& lhs, const pair_t& rhs) {
// compare absolute values so the min-heap keeps the k largest magnitudes
return std::abs(lhs.second) > std::abs(rhs.second);
};

auto beg = reinterpret_cast<pair_t*>(dst);
size_t size = 0;
for (index_t i = 0; i < len; ++i) {
if (i < this->_k) {
beg[size] = std::make_pair(i, src[i]);
size++;
std::push_heap(beg, beg + size, comp);
} else {
auto& top = *beg;
// note: compare absolute value
if (std::abs(src[i]) > std::abs(top.second)) {
std::pop_heap(beg, beg + size, comp);
beg[size - 1] = std::make_pair(i, src[i]);
std::push_heap(beg, beg + size, comp);
}
}
}

return {dst, this->_k * sizeof(pair_t)};
}

tensor_t TopkCompressor::Compress(tensor_t grad) {
COMPRESS_IMPL_SWITCH(grad.dtype, CompressImpl, _buf.get(), grad.data,
grad.size);
}

template <typename index_t, typename scalar_t>
tensor_t TopkCompressor::DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
using pair_t = std::pair<index_t, scalar_t>;

auto ptr = reinterpret_cast<const pair_t*>(src);
if ((void*)dst == (void*)src) {
auto buf = reinterpret_cast<pair_t*>(_buf.get());
std::memcpy(buf, ptr, compressed_size);
ptr = const_cast<const pair_t*>(buf);
}

// reset to zeros
std::memset(dst, 0, _size);
size_t len = compressed_size / sizeof(pair_t);
for (size_t i = 0; i < len; ++i) {
auto& pair = ptr[i];
dst[pair.first] = pair.second;
}

return {dst, _size};
}

tensor_t TopkCompressor::Decompress(tensor_t compressed) {
#ifdef BYTEPS_BUILDING_SERVER
auto dst = _buf.get();
#else
auto dst = compressed.data;
#endif
DECOMPRESS_IMPL_SWITCH(_dtype, DecompressImpl, dst, compressed.data,
compressed.size);
}

template <typename index_t, typename scalar_t>
void TopkCompressor::FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed,
size_t compressed_size) {
static_assert(sizeof(index_t) == sizeof(scalar_t),
"index_t should be the same size as scalar_t");
using pair_t = std::pair<index_t, scalar_t>;

std::memcpy(error, corrected, _size);

auto ptr = reinterpret_cast<const pair_t*>(compressed);
for (size_t i = 0; i < this->_k; ++i) {
auto& pair = ptr[i];
error[pair.first] = 0;
}
}

void TopkCompressor::FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) {
FAST_UPDATE_ERROR_IMPL_SWITCH(_dtype, FastUpdateErrorImpl, error.data,
corrected.data, compressed.data,
compressed.size);
}
} // namespace compressor
} // namespace common
} // namespace byteps
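
For reference, a compact sketch of the heap-based selection used by CompressImpl above: a min-heap of size k ordered by absolute value, so the smallest-magnitude entry sits on top and is evicted when a larger one arrives. Names and the std::vector container are illustrative only.

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <utility>
#include <vector>

using pair_t = std::pair<uint32_t, float>;

std::vector<pair_t> TopkByMagnitude(const std::vector<float>& grad, size_t k) {
  auto comp = [](const pair_t& lhs, const pair_t& rhs) {
    return std::abs(lhs.second) > std::abs(rhs.second);  // min-heap on |value|
  };
  std::vector<pair_t> heap;
  heap.reserve(k);
  for (uint32_t i = 0; i < static_cast<uint32_t>(grad.size()); ++i) {
    if (heap.size() < k) {
      heap.emplace_back(i, grad[i]);
      std::push_heap(heap.begin(), heap.end(), comp);
    } else if (std::abs(grad[i]) > std::abs(heap.front().second)) {
      std::pop_heap(heap.begin(), heap.end(), comp);   // move smallest magnitude to back
      heap.back() = std::make_pair(i, grad[i]);        // overwrite it
      std::push_heap(heap.begin(), heap.end(), comp);  // restore the heap
    }
  }
  return heap;  // k (index, value) pairs, in no particular order
}
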
94 changes: 94 additions & 0 deletions byteps/common/compressor/impl/topk.h
@@ -0,0 +1,94 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_TOPK_H
#define BYTEPS_COMPRESSOR_IMPL_TOPK_H

#include "../compressor.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief TopK Compressor
*
* paper: Sparsified SGD with Memory
* https://arxiv.org/pdf/1809.07599.pdf
*
* sending the k entries of the stochastic gradient with the largest absolute values
*
*/
class TopkCompressor : public Compressor {
public:
TopkCompressor(size_t size, DataType dtype, unsigned int k)
: Compressor(size, dtype), _k(k){};
virtual ~TopkCompressor() = default;

/*!
* \brief Compress function
*
* select topk entries and corresponding indices
*
* \note compare with absolute values
*
* \param grad gradient tensor
* \param compressed compressed tensor
*/
tensor_t Compress(tensor_t grad) override;

/*!
* \brief Decompress function
*
* fill a zero tensor with topk entries and corresponding indices
*
* \param compressed compressed tensor
* \param decompressed decompressed tensor
*/
tensor_t Decompress(tensor_t compressed) override;

/*!
* \brief faster version of `UpdateError`
*
* 1. e <- p (e is the error and p is the corrected gradient)
* 2. zero-fill e with selected k indices
*
* \param corrected gradient corrected with error
* \param error error
* \param compressed compressed gradient
*/
void FastUpdateError(tensor_t error, tensor_t corrected,
tensor_t compressed) override;

private:
template <typename index_t, typename scalar_t>
tensor_t CompressImpl(index_t* dst, const scalar_t* src, size_t len);

template <typename index_t, typename scalar_t>
tensor_t DecompressImpl(scalar_t* dst, const index_t* src,
size_t compressed_size);

template <typename index_t, typename scalar_t>
void FastUpdateErrorImpl(scalar_t* error, scalar_t* corrected,
const index_t* compressed, size_t compressed_size);

private:
unsigned int _k;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_TOPK_H
68 changes: 68 additions & 0 deletions byteps/common/compressor/impl/vanilla_error_feedback.cc
@@ -0,0 +1,68 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>

#include "../compressor_registry.h"
#include "vanilla_error_feedback.h"

namespace byteps {
namespace common {
namespace compressor {
namespace {
CompressorRegistry::Register reg(
"vanilla_ef",
[](const kwargs_t& kwargs, size_t size,
DataType dtype) -> std::unique_ptr<Compressor> {
// register cptr
auto kwargs_clone = kwargs;
kwargs_clone.erase("ef_type");
auto cptr = CompressorRegistry::Create(kwargs_clone, size, dtype);
BPS_CHECK_NE(cptr, nullptr);
return std::unique_ptr<VanillaErrorFeedbackCompressor>(
new VanillaErrorFeedbackCompressor(size, dtype, std::move(cptr)));
});
}

VanillaErrorFeedbackCompressor::VanillaErrorFeedbackCompressor(
size_t size, DataType dtype, std::unique_ptr<Compressor> cptr)
: ErrorFeedback(size, dtype, std::move(cptr)) {
_fd = open("lr.s", O_RDONLY);
BPS_CHECK(_fd > 0) << "open lr.s failed, errno=" << strerror(errno);
void* ptr = mmap(0, 8, PROT_READ, MAP_SHARED, _fd, 0);
BPS_CHECK_NE(ptr, MAP_FAILED) << "mmap failed, errno=" << strerror(errno);
_mm = ptr;
_pre_lr = _cur_lr = *reinterpret_cast<double*>(_mm);
}

VanillaErrorFeedbackCompressor::~VanillaErrorFeedbackCompressor() {
munmap(_mm, 8);
close(_fd);
}

void VanillaErrorFeedbackCompressor::UpdateGradient(tensor_t grad) {
_cur_lr = *reinterpret_cast<double*>(_mm);
this->_cpu_reducer->sum(grad.data, _error.get(), grad.size,
static_cast<DataType>(grad.dtype),
(_pre_lr / _cur_lr));
_pre_lr = _cur_lr;
}

} // namespace compressor
} // namespace common
} // namespace byteps
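
To make the error-feedback update concrete, a toy sketch of one step with the learning-rate rescaling that UpdateGradient reads from the shared lr.s file: p = g + (lr_prev / lr_cur) * e, c = Compress(p), e = p - c. The one-element "compressor" below is purely illustrative.

#include <cmath>
#include <cstddef>
#include <vector>

// Toy compressor: keep only the largest-magnitude entry, zero everything else.
std::vector<float> ToyCompress(const std::vector<float>& p) {
  std::vector<float> c(p.size(), 0.0f);
  size_t best = 0;
  for (size_t i = 1; i < p.size(); ++i)
    if (std::abs(p[i]) > std::abs(p[best])) best = i;
  c[best] = p[best];
  return c;
}

// One error-feedback step; `error` is carried across iterations by the caller.
std::vector<float> ErrorFeedbackStep(const std::vector<float>& grad,
                                     std::vector<float>& error, double lr_prev,
                                     double lr_cur) {
  std::vector<float> corrected(grad.size());
  for (size_t i = 0; i < grad.size(); ++i)
    corrected[i] = grad[i] + static_cast<float>(lr_prev / lr_cur) * error[i];
  std::vector<float> compressed = ToyCompress(corrected);
  for (size_t i = 0; i < grad.size(); ++i)
    error[i] = corrected[i] - compressed[i];  // residual fed back next step
  return compressed;
}
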
68 changes: 68 additions & 0 deletions byteps/common/compressor/impl/vanilla_error_feedback.h
@@ -0,0 +1,68 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_IMPL_VANILLA_ERROR_FEEDBACK_H
#define BYTEPS_COMPRESSOR_IMPL_VANILLA_ERROR_FEEDBACK_H

#include "../error_feedback.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief Vanilla Error Feedback Compressor
*
* paper: Communication-efficient distributed blockwise momentum sgd with
* error-feedback
* https://arxiv.org/pdf/1905.10936.pdf
*
* each worker i:
* p_{t,i} <- g_{t,i} + \frac{\eta_{t-1}}{\eta_t} e_{t,i}
* c_{t,i} <- Compress(p_{t,i})
* e_{t,i} <- p_{t,i} - c_{t,i}
*
* server:
* \tilde{p}_{t} <- \frac{1}{M} \sum_{i=1}^{M} c_{t,i} + \frac{\eta_{t-1}}{\eta_{t}} \tilde{e}_{t}
* \tilde{e}_{t+1} <- \tilde{p}_{t} - \tilde{c}_{t}
*
* Error-correction: error needs to be scaled with \frac{\eta_{t-1}}{\eta_t}.
*/
class VanillaErrorFeedbackCompressor : public ErrorFeedback {
public:
VanillaErrorFeedbackCompressor(size_t size, DataType dtype,
std::unique_ptr<Compressor> cptr);
virtual ~VanillaErrorFeedbackCompressor();

protected:
void UpdateGradient(tensor_t grad) override;

private:
/*!
* \brief learning rate
*
* read from file each step
*/
double _pre_lr, _cur_lr;

int _fd;
void* _mm;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_IMPL_VANILLA_ERROR_FEEDBACK_H
40 changes: 40 additions & 0 deletions byteps/common/compressor/momentum.cc
@@ -0,0 +1,40 @@
// Copyright 2020 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#include "momentum.h"

namespace byteps {
namespace common {
namespace compressor {

tensor_t Momentum::Compress(tensor_t grad) {
// 1. m_t = \mu * m_{t-1} + g_t
UpdateMom(grad);

// 2. p_t = \mu m_t + g_t
UpdateGradient(grad);

// 3. compress
return _cptr->Compress(grad);
}

tensor_t Momentum::Decompress(tensor_t compressed) {
// directly forward to internal compressor
return _cptr->Decompress(compressed);
}

} // namespace compressor
} // namespace common
} // namespace byteps
91 changes: 91 additions & 0 deletions byteps/common/compressor/momentum.h
@@ -0,0 +1,91 @@
// Copyright 2020 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_MOMENTUM_H
#define BYTEPS_COMPRESSOR_MOMENTUM_H

#include "compressor.h"

namespace byteps {
namespace common {
namespace compressor {
/*!
* \brief Momentum
*
* Stochastic gradient descent with momentum
*
* \note
* The momentum is added to the gradient before compression. It should not be
* used together with the momentum implemented in the framework (e.g. MXNet,
* TensorFlow, or PyTorch). The key difference between the two is where the
* momentum is added to the gradient: this one is applied before push_pull,
* while the framework's momentum is applied after push_pull.
*
* \note
* The framework's momentum is disabled when this momentum is used. Users do
* not need to disable it manually.
*
* \sa Compressor, NesterovMomentumCompressor
*/
class Momentum : public Compressor {
public:
// momentum should be cleared to zeros
Momentum(size_t size, DataType dtype, std::unique_ptr<Compressor> cptr,
float mu)
: Compressor(size, dtype),
_mom(new byte_t[size]()),
_mu(mu),
_cptr(std::move(cptr)){};
virtual ~Momentum() = default;

virtual tensor_t Compress(tensor_t grad) final;

virtual tensor_t Decompress(tensor_t compressed) final;

protected:
/*!
* \brief Update momentum
*
* e.g. m_t = \mu * m_{t-1} + g_t
*
* \param grad refers to gradient
*/
virtual void UpdateMom(tensor_t grad) = 0;

/*!
* \brief Update gradient with momentum
*
* e.g. g_t = \mu m_t + g_t
*
* \param grad refers to the gradient; momentum is added to it in place.
*/
virtual void UpdateGradient(tensor_t grad) = 0;

protected:
/*! \brief buffer of momentum */
std::unique_ptr<byte_t[]> _mom;

/*! \brief momentum factor */
float _mu;

private:
/*! \brief compressor pointer */
std::unique_ptr<Compressor> _cptr;
};
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_MOMENTUM_H
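
A small sketch of how the UpdateMom/UpdateGradient hooks could look for the Nesterov-style momentum referenced above (m_t = mu * m_{t-1} + g_t, then p_t = g_t + mu * m_t). This follows the comments in this header; it is an assumption about the subclass, not the NesterovMomentumCompressor source.

#include <cstddef>
#include <vector>

struct ToyNesterovMomentum {
  ToyNesterovMomentum(size_t size, float mu) : _mom(size, 0.0f), _mu(mu) {}

  // m_t = mu * m_{t-1} + g_t
  void UpdateMom(const std::vector<float>& grad) {
    for (size_t i = 0; i < grad.size(); ++i)
      _mom[i] = _mu * _mom[i] + grad[i];
  }

  // g_t = g_t + mu * m_t (in place, before handing the gradient to the compressor)
  void UpdateGradient(std::vector<float>& grad) {
    for (size_t i = 0; i < grad.size(); ++i) grad[i] += _mu * _mom[i];
  }

  std::vector<float> _mom;  // momentum buffer, zero-initialized
  float _mu;                // momentum factor
};
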
250 changes: 250 additions & 0 deletions byteps/common/compressor/utils.h
@@ -0,0 +1,250 @@
// Copyright 2019 Amazon Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// =============================================================================

#ifndef BYTEPS_COMPRESSOR_UTILS_H
#define BYTEPS_COMPRESSOR_UTILS_H

#include <cmath>
#include <limits>
#include <memory>
#include <random>
#include <sstream>
#include <string>
#include <type_traits>

#include "common.h"

namespace byteps {
namespace common {
namespace compressor {

/*!
* \brief serialize key-vals hyper-params for network transmission
*
* \param kwargs hyper-params
* \return std::string serialized data
*/
inline std::string Serialize(const kwargs_t& kwargs) {
std::ostringstream os;
os << kwargs.size();
for (auto const& kwarg : kwargs) {
os << " " << kwarg.first << " " << kwarg.second;
}
return os.str();
}

/*!
* \brief deserialize serialized data into key-vals hyper-params
*
* \param content serialized data
* \return kwargs_t hyper-params
*/
inline kwargs_t Deserialize(const std::string& content) {
kwargs_t kwargs;
std::istringstream is(content);
size_t size = 0;
is >> size;
for (size_t i = 0; i < size; ++i) {
kwargs_t::key_type key;
kwargs_t::mapped_type val;
is >> key >> val;
kwargs[key] = val;
}

return kwargs;
}

/*!
* \brief random number generator based on xorshift128plus
*
* refer to https://en.wikipedia.org/wiki/Xorshift#xorshift+
*/
class XorShift128PlusBitShifterRNG {
public:
XorShift128PlusBitShifterRNG() {
std::random_device rd;
_state = {rd(), rd()};
}

// uniform int among [low, high)
uint64_t Randint(uint64_t low, uint64_t high) {
return xorshift128p() % (high - low) + low;
};

// uniform [0, 1]
double Rand() { return double(xorshift128p()) / MAX; }

// Bernoulli distribution
bool Bernoulli(double p) { return xorshift128p() < uint64_t(p * MAX); }

void set_seed(uint64_t seed) { _state = {seed, seed}; }

private:
struct xorshift128p_state {
uint64_t a, b;
};

uint64_t xorshift128p() {
uint64_t t = _state.a;
uint64_t const s = _state.b;
_state.a = s;
t ^= t << 23; // a
t ^= t >> 17; // b
t ^= s ^ (s >> 26); // c
_state.b = t;
return t + s;
};

xorshift128p_state _state;

static constexpr uint64_t MAX = std::numeric_limits<uint64_t>::max();
};

/*!
* \brief Bit Writer
*
*/
template <typename T>
class BitWriter {
public:
explicit BitWriter(T* data)
: _dptr(data), _accum(0), _used_bits(0), _blocks(0) {}
void Put(bool x) {
_accum <<= 1;
_accum |= x;

if (++_used_bits == PACKING_SIZE) {
_dptr[_blocks++] = _accum;
_used_bits = 0;
}
}

void Flush() {
if (_used_bits > 0) {
size_t padding_size = PACKING_SIZE - _used_bits;
_accum <<= padding_size;
_dptr[_blocks] = _accum;
}
}

size_t bits() const { return _blocks * PACKING_SIZE + _used_bits; }
size_t blocks() const { return std::ceil((float)bits() / PACKING_SIZE); }

private:
static constexpr size_t PACKING_SIZE = sizeof(T) * 8;
T* _dptr; // allocated
T _accum;
size_t _used_bits;
size_t _blocks;
};

/*!
* \brief Bit Reader
*
*/
template <typename T>
class BitReader {
public:
explicit BitReader(const T* data) : _dptr(data), _used_bits(0), _blocks(0) {}
bool Get() {
if (_used_bits == 0) {
_accum = _dptr[_blocks++];
_used_bits = PACKING_SIZE;
}
return _accum & (1 << --_used_bits);
}

size_t bits() const { return _blocks * PACKING_SIZE - _used_bits; }

private:
static constexpr size_t PACKING_SIZE = sizeof(T) * 8;
const T* _dptr; // allocated
size_t _used_bits;
size_t _blocks;
T _accum;
};

inline uint32_t RoundNextPow2(uint32_t v) {
v -= 1;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v += 1;
return v;
}

template <typename T>
void EliasDeltaEncode(BitWriter<T>& bit_writer, unsigned long x) {
int len = 1 + std::floor(std::log2(x));
int length_of_len = std::floor(std::log2(len));

for (int i = length_of_len; i > 0; --i) bit_writer.Put(0);
for (int i = length_of_len; i >= 0; --i) bit_writer.Put((len >> i) & 1);
for (int i = len - 2; i >= 0; i--) bit_writer.Put((x >> i) & 1);
}

template <typename T>
unsigned long EliasDeltaDecode(BitReader<T>& bit_reader) {
unsigned long num = 1;
int len = 1;
int length_of_len = 0;
while (!bit_reader.Get()) length_of_len++;
for (int i = 0; i < length_of_len; i++) {
len <<= 1;
if (bit_reader.Get()) len |= 1;
}
for (int i = 0; i < len - 1; i++) {
num <<= 1;
if (bit_reader.Get()) num |= 1;
}
return num;
}

template <typename T, class F = std::function<bool(T)>>
T HyperParamFinder(const kwargs_t& kwargs, std::string name,
bool optional = false, F&& check = [](T) { return true; }) {
static_assert(std::is_fundamental<T>::value,
"custom types are not allowed for HyperParamFinder");
T value{T()};
auto iter = kwargs.find(name);
if (iter == kwargs.end()) {
// necessary hp
if (optional == false) {
BPS_LOG(FATAL) << "Hyper-parameter '" << name
<< "' is not found! Aborted.";
}
return value;
} else {
std::istringstream ss(iter->second);
if (std::is_same<T, bool>::value) {
ss >> std::boolalpha >> value;
} else {
ss >> value;
}
if (!check(value)) {
BPS_LOG(FATAL) << "Hyper-parameter '" << name << "' should not be "
<< value << "! Aborted.";
}
}

return value;
}
} // namespace compressor
} // namespace common
} // namespace byteps

#endif // BYTEPS_COMPRESSOR_UTILS_H
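
For reference, a round-trip usage sketch of the BitWriter/BitReader, Elias-delta, and Serialize/Deserialize helpers defined above. The include path and the assumption that kwargs_t is a string-to-string map are guesses for this sketch.

#include <cassert>
#include <cstdint>
#include <vector>

#include "byteps/common/compressor/utils.h"  // assumed include path

int main() {
  using namespace byteps::common::compressor;

  // BitWriter/BitReader + Elias-delta round trip.
  std::vector<uint32_t> buf(16, 0);  // scratch space for the packed bits
  std::vector<unsigned long> values = {1, 5, 42, 1000};

  BitWriter<uint32_t> writer(buf.data());
  for (auto v : values) EliasDeltaEncode(writer, v);  // values must be >= 1
  writer.Flush();  // pad the last partially filled word

  BitReader<uint32_t> reader(buf.data());
  for (auto v : values) assert(EliasDeltaDecode(reader) == v);

  // Hyper-parameter round trip; kwargs_t is assumed to be a string->string map.
  kwargs_t kwargs = {{"compressor_k", "8"}, {"seed", "2020"}};
  assert(Deserialize(Serialize(kwargs)) == kwargs);
  return 0;
}
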
147 changes: 121 additions & 26 deletions byteps/common/core_loops.cc
@@ -13,11 +13,14 @@
// limitations under the License.
// =============================================================================

#include "core_loops.h"
#include <cuda_runtime.h>

#include <chrono>
#include <memory>

#include "common.h"
#include "compressor/compressor.h"
#include "core_loops.h"
#include "global.h"
#include "logging.h"

@@ -63,25 +66,27 @@ void FinishOrProceed(std::shared_ptr<TensorTableEntry> task) {
}

if (task->context->profile_flag) {
BPS_CHECK(task->context->part_comm_time[task->key][this_op].back()->dur == 0)
<< " tensor: " << task->tensor_name
<< " task->key:" << task->key
<< " type:" << this_op
<< " 'dur' has already been assigned:" << task->context->part_comm_time[task->key][this_op].back()->dur;
BPS_CHECK(task->context->part_comm_time[task->key][this_op].back()->dur ==
0)
<< " tensor: " << task->tensor_name << " task->key:" << task->key
<< " type:" << this_op << " 'dur' has already been assigned:"
<< task->context->part_comm_time[task->key][this_op].back()->dur;
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto us = std::chrono::duration_cast<std::chrono::microseconds>(duration);
auto _ts = task->context->part_comm_time[task->key][this_op].back()->start_t;
BPS_CHECK(task->context->part_comm_time.find(task->key) != task->context->part_comm_time.end())
<< " tensor: " << task->tensor_name
<< " task->key:" << task->key
<< " type:" << this_op;
BPS_CHECK(task->context->part_comm_time[task->key].find(this_op) != task->context->part_comm_time[task->key].end())
<< " tensor: " << task->tensor_name
<< " task->key:" << task->key
<< " type:" << this_op;
auto _ts =
task->context->part_comm_time[task->key][this_op].back()->start_t;
BPS_CHECK(task->context->part_comm_time.find(task->key) !=
task->context->part_comm_time.end())
<< " tensor: " << task->tensor_name << " task->key:" << task->key
<< " type:" << this_op;
BPS_CHECK(task->context->part_comm_time[task->key].find(this_op) !=
task->context->part_comm_time[task->key].end())
<< " tensor: " << task->tensor_name << " task->key:" << task->key
<< " type:" << this_op;

task->context->part_comm_time[task->key][this_op].back()->dur = (long long)(us.count()) - _ts;
task->context->part_comm_time[task->key][this_op].back()->dur =
(long long)(us.count()) - _ts;
}

// finish current QueueType of this task, erase current QueueType.
@@ -97,19 +102,22 @@ void FinishOrProceed(std::shared_ptr<TensorTableEntry> task) {
BPS_CHECK(task->counter_ptr) << task->tensor_name << " counter_ptr is null";
int v = task->counter_ptr.get()->fetch_add(1);
if (v == (int)(task->total_partnum - 1)) {
// if meet this condition, that means all sub-tasks of this task have been done
// if this condition is met, all sub-tasks of this task have been done
BPS_CHECK(task->tensor_name != "");
BPS_LOG(TRACE) << "Rank=" << BytePSGlobal::GetRank()
<< " finish processing tensor: " << task->tensor_name;
task->callback(Status::OK());
//* Add for profiling communication events
if (task->context->profile_flag) {
BPS_CHECK(task->context->comm_time.back()->dur == 0)
<< " tensor: " << task->tensor_name
<< " 'dur' has already been assigned:" << task->context->comm_time.back()->dur;
<< " tensor: " << task->tensor_name
<< " 'dur' has already been assigned:"
<< task->context->comm_time.back()->dur;
auto now = std::chrono::system_clock::now();
auto duration = now.time_since_epoch();
auto us = std::chrono::duration_cast<std::chrono::microseconds>(duration);
auto us =
std::chrono::duration_cast<std::chrono::microseconds>(duration);
auto _ts = task->context->comm_time.back()->start_t;
task->context->comm_time.back()->dur = (long long)(us.count()) - _ts;
}
@@ -205,8 +213,7 @@ inline void PostNcclCalls(
nccl_root = BytePSGlobal::GetReduceRootByKey(key);
num_elem_per_gpu = 0;
left_elem = len / unit_len;
BPS_LOG(TRACE) << "Reduce key=" << key
<< " to root=" << nccl_root
BPS_LOG(TRACE) << "Reduce key=" << key << " to root=" << nccl_root
<< " rank=" << BytePSGlobal::GetLocalRank();
}

@@ -416,8 +423,7 @@ bool RunCopyDevice2HostLoopOnce() {

if (copy_len) {
CUDA_CALL(cudaMemcpyAsync(
(void *)(cpubuff + copy_offset),
(const void *)(p + copy_offset),
(void *)(cpubuff + copy_offset), (const void *)(p + copy_offset),
(size_t)copy_len, (cudaMemcpyKind)cudaMemcpyDeviceToHost,
(cudaStream_t)*copy_d2h_Stream));
CUDA_CALL(cudaStreamSynchronize(*copy_d2h_Stream));
@@ -483,6 +489,46 @@ bool RunPcieReduceLoopOnce() {
return true;
}

bool RunCompressLoopOnce() {
QueueType this_op = COMPRESS;
auto q = BytePSGlobal::GetScheduledQueue(this_op);
auto task = q->getTask();
if (task) {
BPS_CHECK(BytePSGlobal::IsRootDevice())
<< "only root device should enter COMPRESS loop";
BPS_CHECK(task->compressor != nullptr);
BPS_CHECK(task->compressed == nullptr);

// spawn
BytePSGlobal::GetThreadPool()->enqueue([task]() {
char *data = const_cast<char *>(static_cast<const char *>(task->cpubuff) +
task->offset);
int len = task->len;
int dtype = task->tensor->dtype();
compressor::tensor_t grad(data, len, dtype);
auto compressed = task->compressor->Compress(grad);
BPS_CHECK_LE(compressed.size, len)
<< "Compressor Implementation Error "
<< ", key=" << task->key << ", src_len=" << len
<< ", compressed_len=" << compressed.size;

task->compressed = std::make_shared<decltype(compressed)>(compressed);

// restore rt
auto &queue_list = task->queue_list;
BytePSGlobal::GetScheduledQueue(queue_list[1])
->reset(task->key, BytePSGlobal::GetLocalSize() - 1);

FinishOrProceed(task);
});

} else {
std::this_thread::sleep_for(std::chrono::nanoseconds(1000));
}

return true;
}

bool RunPushLoopOnce() {
QueueType this_op = PUSH;
auto q = BytePSGlobal::GetScheduledQueue(this_op);
@@ -503,6 +549,14 @@ bool RunPushLoopOnce() {
// get metadata
const int dtype = task->tensor->dtype();

// use compressed data/len
if (task->compressed) {
BPS_LOG(DEBUG) << "PUSH with gradient compression. key=" << task->key;
data = task->compressed->data;
len = task->compressed->size;
task->compressed = nullptr;
}

// false means not to delete data when SArray is deleted
ps::SArray<char> vals(data, len, false);

@@ -557,6 +611,36 @@ bool RunPullLoopOnce() {
return true;
}

bool RunDecompressLoopOnce() {
QueueType this_op = DECOMPRESS;
auto q = BytePSGlobal::GetScheduledQueue(this_op);
auto task = q->getTask();
if (task) {
BPS_CHECK(BytePSGlobal::IsRootDevice())
<< "only root device should enter DECOMPRESS loop";
BPS_CHECK(task->compressor != nullptr);

// spawn
BytePSGlobal::GetThreadPool()->enqueue([task]() {
char *data = const_cast<char *>(static_cast<const char *>(task->cpubuff) +
task->offset);
auto &pskv = BytePSGlobal::EncodeDefaultKey(task->key, 0);
auto len = pskv.lens[0];
int dtype = task->tensor->dtype();
compressor::tensor_t compressed(data, len, dtype);
auto decompressed = task->compressor->Decompress(compressed);
BPS_LOG(DEBUG) << "PULL with gradient compression. key=" << task->key;

FinishOrProceed(task);
});

} else {
std::this_thread::sleep_for(std::chrono::nanoseconds(1000));
}

return true;
}

void CopyHost2Device(std::shared_ptr<byteps::common::TensorTableEntry> task) {
auto copy_h2d_stream = BytePSGlobal::GetCopyHost2DeviceStream();
auto tensor = task->output;
@@ -594,8 +678,7 @@ void CopyHost2Device(std::shared_ptr<byteps::common::TensorTableEntry> task) {

if (copy_len) {
CUDA_CALL(cudaMemcpyAsync(
(void *)(gpu_addr + copy_offset),
(const void *)(cpubuff + copy_offset),
(void *)(gpu_addr + copy_offset), (const void *)(cpubuff + copy_offset),
(size_t)copy_len, (cudaMemcpyKind)cudaMemcpyHostToDevice,
(cudaStream_t)*copy_h2d_stream));
CUDA_CALL(cudaStreamSynchronize(*copy_h2d_stream));
@@ -719,6 +802,12 @@ void CopyDevice2HostLoop() {
BytePSGlobal::ReportThreadFinish();
}

void CompressLoop() {
while (RunCompressLoopOnce() && !BytePSGlobal::ShouldShutdown()) {
}
BytePSGlobal::ReportThreadFinish();
}

void PushLoop() {
while (RunPushLoopOnce() && !BytePSGlobal::ShouldShutdown()) {
}
@@ -731,6 +820,12 @@ void PullLoop() {
BytePSGlobal::ReportThreadFinish();
}

void DecompressLoop() {
while (RunDecompressLoopOnce() && !BytePSGlobal::ShouldShutdown()) {
}
BytePSGlobal::ReportThreadFinish();
}

void RootCopyHost2DeviceLoop() {
CUDA_CALL(cudaSetDevice(BytePSGlobal::GetLocalRank()));
while (RunRootCopyHost2DeviceLoopOnce() && !BytePSGlobal::ShouldShutdown()) {
4 changes: 4 additions & 0 deletions byteps/common/core_loops.h
@@ -35,10 +35,14 @@ void SyncNcclLoop();

void CopyDevice2HostLoop();

void CompressLoop();

void PushLoop();

void PullLoop();

void DecompressLoop();

void RootCopyHost2DeviceLoop();

void NonRootCopyListenLoop();
289 changes: 247 additions & 42 deletions byteps/common/cpu_reducer.cc

Large diffs are not rendered by default.

40 changes: 26 additions & 14 deletions byteps/common/cpu_reducer.h
@@ -21,8 +21,8 @@
#include <immintrin.h>
#endif

#include <memory>
#include <cstring>
#include <memory>
#include "common.h"
#include "logging.h"

@@ -45,19 +45,22 @@ class CpuReducer {
BPS_LOG(DEBUG) << "Clear CpuReducer";
}

int sum(void* dst, void* src, size_t len, DataType dtype);
int sum(void* dst, void* src1, void* src2, size_t len, DataType dtype);
int copy(void* dst, void* src, size_t len);
int sum(void* dst, const void* src, size_t len, DataType dtype);
int sum(void* dst, const void* src1, const void* src2, size_t len,
DataType dtype);

int sum(void* dst, const void* src, size_t len, DataType dtype, float alpha);
int sum(void* dst, const void* src1, const void* src2, size_t len,
DataType dtype, float alpha);

int copy(void* dst, const void* src, size_t len);

#ifndef BYTEPS_BUILDING_SERVER
bool isRoot();
std::shared_ptr<BytePSComm> getComm() { return _comm; }
#endif


DataType GetDataType(int dtype) {
return static_cast<DataType>(dtype);
}
DataType GetDataType(int dtype) { return static_cast<DataType>(dtype); }

private:
#if __AVX__ && __F16C__
@@ -76,7 +79,7 @@ class CpuReducer {
}
#endif

inline void HalfBits2Float(unsigned short* src, float* res) {
inline void HalfBits2Float(const unsigned short* src, float* res) {
unsigned h = *src;
int sign = ((h >> 15) & 1);
int exp = ((h >> 10) & 0x1f);
@@ -112,7 +115,7 @@ class CpuReducer {
*res = *reinterpret_cast<float const*>(&f);
}

inline void Float2HalfBits(float* src, unsigned short* dest) {
inline void Float2HalfBits(const float* src, unsigned short* dest) {
// software implementation rounds toward nearest even
unsigned const& s = *reinterpret_cast<unsigned const*>(src);
uint16_t sign = uint16_t((s >> 16) & 0x8000);
@@ -175,13 +178,22 @@ class CpuReducer {
}

template <typename T>
int _sum(T* dst, T* src, size_t len);
int _sum(T* dst, const T* src, size_t len);
template <typename T>
int _sum(T* dst, const T* src1, const T* src2, size_t len);

int _sum_float16(void* dst, const void* src, size_t len);
int _sum_float16(void* dst, const void* src1, const void* src2, size_t len);

template <typename T>
int _sum(T* dst, const T* src, size_t len, float alpha);

template <typename T>
int _sum(T* dst, T* src1, T* src2, size_t len);
int _sum(T* dst, const T* src1, const T* src2, size_t len, float alpha);

int _sum_float16(void* dst, void* src, size_t len);
int _sum_float16(void* dst, void* src1, void* src2, size_t len);
int _sum_float16(void* dst, const void* src, size_t len, float alpha);
int _sum_float16(void* dst, const void* src1, const void* src2, size_t len,
float alpha);

float _convert_half_to_full_precision(uint16_t h);
uint16_t _convert_full_to_half_precision(float f);
222 changes: 142 additions & 80 deletions byteps/common/global.cc

Large diffs are not rendered by default.

22 changes: 18 additions & 4 deletions byteps/common/global.h
@@ -16,6 +16,8 @@
#ifndef BYTEPS_GLOBAL_H
#define BYTEPS_GLOBAL_H

#include <unistd.h>

#include <map>
#include <memory>
#include <mutex>
@@ -24,7 +26,7 @@
#include <thread>
#include <unordered_map>
#include <vector>
#include <unistd.h>

#include "common.h"
#include "communicator.h"
#include "cpu_reducer.h"
@@ -34,6 +36,7 @@
#include "ready_table.h"
#include "scheduled_queue.h"
#include "shared_memory.h"
#include "thread_pool.h"

namespace byteps {
namespace common {
@@ -86,6 +89,8 @@ class BytePSGlobal {
static bool IsResuming() { return _is_resuming; }
static void SetResumingFlag(bool flag) {_is_resuming = flag; }

static void RegisterCompressor(const std::string& name,
std::unordered_map<std::string, std::string>& kwargs);
static ps::Key GetKeyFromName(const std::string& name);
static BPSContext& GetContextFromName(const std::string& name);
static uint32_t GetTensorCount();
@@ -96,6 +101,7 @@ class BytePSGlobal {
static PSKV& EncodeDefaultKey(uint64_t key, size_t len);

static uint32_t GetPartitionBound() { return _partition_bytes; }
static uint32_t GetMinCompressBound() { return _min_compress_bytes; }

static cudaStream_t* GetCopyDevice2HostStream();
static cudaStream_t* GetCopyHost2DeviceStream();
@@ -120,8 +126,9 @@ class BytePSGlobal {

static bool IsTensorSampled(uint64_t key) { return (key == _sample_key); }

static void SetProfileFlag(BPSContext *ctxt);
static void EmitTrace(std::ostream *os, const BPSCommTime *ret, BPSContext *ctxt);
static void SetProfileFlag(BPSContext* ctxt);
static void EmitTrace(std::ostream* os, const BPSCommTime* ret,
BPSContext* ctxt);
static void OutputTraces();
static bool IsAllTensorOutput(const std::string& name);
static void Who2beOutput(const std::string& name);
@@ -131,6 +138,8 @@ class BytePSGlobal {
static std::atomic_int joined_thread_cnt;
static int RoundUpToPageSize(int x) { return RoundUp(x, _pagesize); }

static std::shared_ptr<ThreadPool>& GetThreadPool() { return _thread_pool; }

private:
static std::mutex _init_mutex;
static volatile bool _initialized;
@@ -172,6 +181,7 @@ class BytePSGlobal {
static cudaStream_t* _copy_host2device_stream;

static uint32_t _partition_bytes;
static uint32_t _min_compress_bytes;

// (key, ready_signal_count) pair, only valid for root device
static ReadyTable* _reduce_table;
@@ -182,6 +192,8 @@ class BytePSGlobal {
// (key, ready_signal_count) pair, only valid for non-root device
static ReadyTable* _copy_table;

static std::shared_ptr<ThreadPool> _thread_pool;

// for reduce strategies
static bool _is_using_reduce;
static std::vector<int> _reduce_roots;
@@ -192,7 +204,9 @@ class BytePSGlobal {
// for debug sampling
static uint64_t _sample_key;

static int AlignTo(int input, int alignment) { return input / alignment * alignment; }
static int AlignTo(int input, int alignment) {
return input / alignment * alignment;
}

static int _pagesize;
static int DivUp(int x, int y) { return (x + y - 1) / y; }
80 changes: 69 additions & 11 deletions byteps/common/operations.cc
@@ -13,15 +13,20 @@
// limitations under the License.
// =============================================================================

#include "operations.h"
#include <cuda_runtime.h>
#include <unistd.h>

#include <cstring>
#include <memory>
#include <thread>
#include <unistd.h>

#include "compressor/compressor.h"
#include "compressor/compressor_registry.h"
#include "compressor/utils.h"
#include "core_loops.h"
#include "global.h"
#include "logging.h"
#include "operations.h"

namespace byteps {
namespace common {
@@ -43,6 +48,7 @@ void byteps_lazy_init() {
if (BytePSGlobal::IsDistributed()) {
if (BytePSGlobal::IsRootDevice()) {
func.push_back(PullLoop);
func.push_back(DecompressLoop);
}
}

@@ -58,6 +64,7 @@ void byteps_lazy_init() {
// PUSH can be a real push in distributed mode
// Or a dummy barrier in cross-pcie-switch mode
func.push_back(PushLoop);
func.push_back(CompressLoop);
func.push_back(RootCopyHost2DeviceLoop);
} else {
func.push_back(CoordinatePushLoop);
@@ -88,8 +95,10 @@ void byteps_shutdown() {

void byteps_resume(int num_workers, int num_servers) {
// set ps, worker numbers
BPS_LOG(DEBUG) << "Resume worker number: " << num_workers << "DMLC_NUM_WORKER: " << getenv("DMLC_NUM_WORKER");
BPS_LOG(DEBUG) << "Resume server number: " << num_workers << "DMLC_NUM_SERVER: " << getenv("DMLC_NUM_SERVER");
BPS_LOG(DEBUG) << "Resume worker number: " << num_workers
<< " DMLC_NUM_WORKER: " << getenv("DMLC_NUM_WORKER");
BPS_LOG(DEBUG) << "Resume server number: " << num_servers
<< " DMLC_NUM_SERVER: " << getenv("DMLC_NUM_SERVER");
BPS_LOG(DEBUG) << "Start resuming BytePS";

BytePSGlobal::SetResumingFlag(true);
@@ -152,6 +161,9 @@ void PartitionTensor(
e->len = ((size - accumulated) > bound) ? bound : (size - accumulated);
e->counter_ptr = entry->counter_ptr;
e->total_partnum = entry->total_partnum;
if (!entry->context->compressor_list.empty()) {
e->compressor = entry->context->compressor_list[i];
}

accumulated += e->len;
++i;
@@ -176,6 +188,14 @@ Status EnqueueTensor(BPSContext &context, std::shared_ptr<Tensor> input,
<< name << " output tensor size does not match";
}

// insert COMPRESS before PUSH and DECOMPRESS after PULL in the queue list
if (BytePSGlobal::IsRootDevice() && !context.compressor_list.empty()) {
auto it = std::find(queue_list->begin(), queue_list->end(), PUSH);
it = queue_list->insert(it, COMPRESS); // before PUSH
it = std::find(queue_list->begin(), queue_list->end(), PULL);
queue_list->insert(it + 1, DECOMPRESS); // after PULL
}

std::shared_ptr<TensorTableEntry> e(new TensorTableEntry);
e->tensor_name = name;
e->context = &context;
@@ -188,11 +208,14 @@ Status EnqueueTensor(BPSContext &context, std::shared_ptr<Tensor> input,
e->callback = callback;

if (device == CPU_DEVICE_ID) {
cudaError_t err = cudaHostRegister(const_cast<void*>(input->data()), input->size(), cudaHostRegisterMapped);
cudaError_t err = cudaHostRegister(const_cast<void *>(input->data()),
input->size(), cudaHostRegisterMapped);
if (err == cudaSuccess) {
BPS_LOG(DEBUG) << name << " cpu address has changed, so it is pinned again.";
BPS_LOG(DEBUG) << name
<< " cpu address has changed, so it is pinned again.";
}
CUDA_CALL(cudaHostGetDevicePointer(&(context.gpu_ptr), const_cast<void*>(input->data()), 0));
CUDA_CALL(cudaHostGetDevicePointer(&(context.gpu_ptr),
const_cast<void *>(input->data()), 0));
}

e->cpubuff = context.cpubuff;
@@ -302,7 +325,7 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {
BPS_LOG(DEBUG) << name << " is already on cpu, len=" << size;
cudaError_t e = cudaHostRegister(cpubuff, size, cudaHostRegisterMapped);
if (e != cudaSuccess) {
BPS_LOG(INFO) << cudaGetErrorString(e)
BPS_LOG(INFO) << cudaGetErrorString(e)
<< " (You may ignore this if your program continues)";
}
CUDA_CALL(cudaHostGetDevicePointer(&(context.gpu_ptr), cpubuff, 0));
@@ -311,19 +334,27 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {
// We always allocate our own cpu buffer
// use the first key in key_list as the index
auto shm_obj = BytePSGlobal::GetSharedMemoryObj();

size_t aligned_size = Align(size, dtype);
if (BytePSGlobal::IsCrossPcieSwitch()) {
context.pcie_cpubuff = shm_obj->openPcieSharedMemory(key_list[0], size);
context.pcie_cpubuff =
shm_obj->openPcieSharedMemory(key_list[0], aligned_size);
context.cpubuff = context.pcie_cpubuff.back();
} else {
context.cpubuff = shm_obj->openSharedMemory(std::string("BytePS_ShM_"),
key_list[0], size);
key_list[0], aligned_size);
}
BPS_LOG(TRACE) << name << ": open shared memory size " << size;
BPS_LOG(TRACE) << name << ": open shared memory size " << aligned_size;

// Init tensors with BytePS server
char *data = const_cast<char *>(static_cast<const char *>(context.cpubuff));
accumulated = 0;
size_t i = 0;
BPS_LOG(INFO) << "tensor size=" << size;
// small tensor does not need to be compressed
if (size < BytePSGlobal::GetMinCompressBound()) {
context.kwargs.clear();
}
while (accumulated < size) {
auto key = key_list[i];
int len = ((size - accumulated) > bound) ? bound : (size - accumulated);
@@ -338,6 +369,13 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {
int cmd = GetCommandType(RequestType::kDefaultPushPull, dtype);
// blocking push, also acts as a global barrier
ps->Wait(ps->ZPush(pskv.keys, vals, pskv.lens, cmd));

// register
if (!context.kwargs.empty()) {
auto compressor_ptr = compressor::CompressorRegistry::Create(
context.kwargs, Align(len, dtype), static_cast<DataType>(dtype));
context.compressor_list.push_back(std::move(compressor_ptr));
}
}

accumulated += len;
@@ -347,6 +385,21 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff) {
BPS_CHECK_EQ(accumulated, size);
BPS_CHECK_EQ(i, key_list.size());

// send to server
if (!context.kwargs.empty() && BytePSGlobal::IsDistributed() &&
BytePSGlobal::IsRootDevice()) {
auto ps = BytePSGlobal::GetOrInitPS();
auto content = compressor::Serialize(context.kwargs);
auto len = content.size();
auto data = const_cast<char *>(content.c_str());
for (auto key : key_list) {
auto &kv = BytePSGlobal::EncodeDefaultKey(key, len);
ps::SArray<char> vals(data, len, false);
int cmd = GetCommandType(RequestType::kCompressedPushPull, dtype);
ps->Wait(ps->ZPush(kv.keys, vals, kv.lens, cmd));
}
}

context.initialized = true;

BPS_LOG(TRACE) << "Finish Init " << name << ", size=" << size
@@ -361,6 +414,11 @@ bool IsTensorDeclared(const std::string &name) {
return BytePSGlobal::IsTensorDeclared(name);
}

void RegisterCompressor(const std::string &name,
std::unordered_map<std::string, std::string> &kwargs) {
return BytePSGlobal::RegisterCompressor(name, kwargs);
}

std::shared_ptr<std::vector<QueueType>> GetPushQueueList(int device) {
auto queue_list = std::make_shared<std::vector<QueueType>>();

3 changes: 3 additions & 0 deletions byteps/common/operations.h
@@ -72,6 +72,9 @@ void InitTensor(BPSContext &context, size_t size, int dtype, void *cpubuff);
// Only call these in Framework plugins for the best performance
bool IsTensorDeclared(const std::string &name);

void RegisterCompressor(const std::string &name,
std::unordered_map<std::string, std::string> &kwargs);

BPSContext &GetContextFromName(const std::string &name);

std::shared_ptr<std::vector<QueueType>> GetPushQueueList(int device);
6 changes: 6 additions & 0 deletions byteps/common/ready_table.cc
@@ -14,6 +14,7 @@
// =============================================================================

#include "ready_table.h"

#include "logging.h"

namespace byteps {
@@ -32,6 +33,11 @@ int ReadyTable::AddReadyCount(uint64_t key) {
return ++_ready_table[key];
}

int ReadyTable::SetReadyCount(uint64_t key, int cnt) {
std::lock_guard<std::mutex> lock(_table_mutex);
_ready_table[key] = cnt;
return _ready_table[key];
}

void ReadyTable::ClearReadyCount(uint64_t key) {
std::lock_guard<std::mutex> lock(_table_mutex);
_ready_table[key] = 0;
1 change: 1 addition & 0 deletions byteps/common/ready_table.h
@@ -32,6 +32,7 @@ class ReadyTable {
// methods to access or modify the _ready_table
bool IsKeyReady(uint64_t key);
int AddReadyCount(uint64_t key);
int SetReadyCount(uint64_t key, int cnt);
void ClearReadyCount(uint64_t key);

private:
20 changes: 15 additions & 5 deletions byteps/common/scheduled_queue.cc
@@ -14,7 +14,9 @@
// =============================================================================

#include "scheduled_queue.h"

#include <algorithm>

#include "global.h"
#include "logging.h"

@@ -31,15 +33,16 @@ BytePSScheduledQueue::BytePSScheduledQueue(QueueType type) {
size_t credit_in_partition = BytePSGlobal::GetNccl()->GetGroupSize() + 1;

auto byteps_scheduling_credit = getenv("BYTEPS_SCHEDULING_CREDIT");
credit_in_partition = byteps_scheduling_credit ? atoi(byteps_scheduling_credit) : 0;
if (!credit_in_partition) { // disable scheduling by default
credit_in_partition =
byteps_scheduling_credit ? atoi(byteps_scheduling_credit) : 0;
if (!credit_in_partition) { // disable scheduling by default
_is_scheduled = false;
}

_qt = type;
_credits = _is_scheduled
? BytePSGlobal::GetPartitionBound() * credit_in_partition
: 34359738368; // 32GB, basically disabling credit control
? BytePSGlobal::GetPartitionBound() * credit_in_partition
: 34359738368; // 32GB, basically disabling credit control
_rt = nullptr;

switch (_qt) {
@@ -55,6 +58,7 @@ BytePSScheduledQueue::BytePSScheduledQueue(QueueType type) {
}
}
break;
case COMPRESS:
case PUSH:
if (BytePSGlobal::IsRootDevice()) {
_rt = BytePSGlobal::GetPushTable();
@@ -158,7 +162,6 @@ std::shared_ptr<TensorTableEntry> BytePSScheduledQueue::getTask() {
return nullptr;
}


std::shared_ptr<TensorTableEntry> BytePSScheduledQueue::getTask(uint64_t key) {
BPS_CHECK(!_is_scheduled);
std::lock_guard<std::mutex> lock(_mutex);
@@ -199,5 +202,12 @@ void BytePSScheduledQueue::reportFinish(int size) {
return;
}

void BytePSScheduledQueue::reset(uint64_t key, int cnt) {
std::lock_guard<std::mutex> lock(_mutex);
if (_rt) {
_rt->SetReadyCount(key, cnt);
}
}

} // namespace common
} // namespace byteps
1 change: 1 addition & 0 deletions byteps/common/scheduled_queue.h
@@ -36,6 +36,7 @@ class BytePSScheduledQueue {
std::shared_ptr<TensorTableEntry> getTask(uint64_t key);
uint32_t pendingSize();
void reportFinish(int size);
void reset(uint64_t key, int cnt);

private:
// TODO: use priority queue or heap
77 changes: 77 additions & 0 deletions byteps/common/thread_pool.h
@@ -0,0 +1,77 @@
/*
* Copy From https://github.com/progschj/ThreadPool/blob/master/ThreadPool.h
*/
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <vector>

class ThreadPool {
public:
ThreadPool(size_t);
template <class F>
void enqueue(F&& f);
~ThreadPool();

private:
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<std::function<void()> > tasks;

// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};

// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
for (size_t i = 0; i < threads; ++i)
workers.emplace_back([this] {
for (;;) {
std::function<void()> task;

{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty()) return;
task = std::move(this->tasks.front());
this->tasks.pop();
}

task();
}
});
}

// add new work item to the pool
template <class F>
void ThreadPool::enqueue(F&& f) {
{
std::lock_guard<std::mutex> lock(queue_mutex);
if (stop) throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
// the destructor joins all threads
inline ThreadPool::~ThreadPool() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) worker.join();
}

#endif
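
A brief usage sketch of the pool above, mirroring how CompressLoop/DecompressLoop enqueue per-tensor work; the include path and worker count are assumptions.

#include <atomic>
#include <cstdio>

#include "byteps/common/thread_pool.h"  // assumed include path

int main() {
  std::atomic<int> done{0};
  {
    ThreadPool pool(4);  // four worker threads (count is arbitrary here)
    for (int i = 0; i < 8; ++i) {
      pool.enqueue([&done] { done.fetch_add(1); });  // fire-and-forget task
    }
  }  // the destructor drains the queue and joins all workers
  std::printf("completed %d tasks\n", done.load());
  return 0;
}
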
136 changes: 119 additions & 17 deletions byteps/mxnet/__init__.py
@@ -14,28 +14,32 @@
# limitations under the License.
# ==============================================================================

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import, division, print_function

import copy
import os
import struct
import warnings

import mxnet as mx
import os
import mxnet.ndarray as nd

from byteps.mxnet.ops import byteps_push_pull, byteps_declare_tensor
from byteps.mxnet.ops import init, shutdown, suspend, resume
from byteps.mxnet.ops import size, local_size, rank, local_rank
from byteps.mxnet.compression import Compression
from byteps.mxnet.ops import (byteps_declare_tensor, byteps_push_pull, init,
local_rank, local_size, rank, resume, shutdown,
size, suspend)

parameter_index = 0


class DistributedOptimizer(mx.optimizer.Optimizer):
"""This is where BytePS's DistributedOptimizer wrapper for MXNet goes"""

def __init__(self, optimizer):
self._optimizer = optimizer
self._enable_async = (int(os.getenv('BYTEPS_ENABLE_ASYNC', 0)) != 0)
if self._enable_async:
assert int(os.getenv('DMLC_NUM_WORKER'))>1, \
assert int(os.getenv('DMLC_NUM_WORKER')) > 1, \
"Async is only valid for distributed training"
print('BytePS: enable asynchronous training')

@@ -178,10 +182,17 @@ class DistributedTrainer(mx.gluon.Trainer):
Key-word arguments to be passed to optimizer constructor. For example,
`{'learning_rate': 0.1}`. All optimizers accept learning_rate, wd (weight decay),
clip_gradient, and lr_scheduler. See each optimizer's
constructor for a list of additional supported arguments.
constructor for a list of additional supported arguments
root_rank : int
rank of root
compression_params : dict
Key-word arguments to be passed to gradient compression constructor. For example,
`{'compressor': 'onebit', 'ef': 'vanilla', 'momentum': 'nesterov', 'scaling': True}`.
All compressors accept 'compressor' and 'ef'. See each compressor's constructor for a list
of additional supported arguments.
"""

def __init__(self, params, optimizer, optimizer_params=None, root_rank=0):
def __init__(self, params, optimizer, optimizer_params=None, root_rank=0, compression_params=None):
if isinstance(optimizer, DistributedOptimizer):
optimizer = optimizer._optimizer
warnings.warn("DistributedTrainer does not take DistributedOptimizer "
@@ -192,25 +203,116 @@ def __init__(self, params, optimizer, optimizer_params=None, root_rank=0):
for key in sorted(list(params.keys())):
param_list.append(params[key])

self._intra_compressor = self._register_compressor(
params, optimizer_params, compression_params)

super(DistributedTrainer, self).__init__(
param_list, optimizer, optimizer_params=optimizer_params, kvstore=None)

# _scale is used to check and set rescale_grad for optimizer in Trainer.step()
# function. Normalizing it by BytePS size, which is equivalent to performing
# average in push_pull, has better performance.
self._scale /= size()
if local_rank() == 0:
self._f = open("lr.s", "wb")
self._f.truncate(8)

self._bps_size = size()
self.root_rank = root_rank
self._intra_compressors = {}
for i, param in enumerate(self._params):
byteps_declare_tensor("parameter_" + str(i))
if param.grad_req != 'null':
byteps_declare_tensor("gradient_" + str(i))

self._intra_compressors[i] = type(self._intra_compressor)(
**self._intra_compressor.__dict__)
byteps_params = dict(
filter(lambda attr: attr[0].startswith(
"byteps_",), param.__dict__.items())
)
byteps_declare_tensor("gradient_" + str(i), **byteps_params)

def _register_compressor(self, params, optimizer_params, compression_params):
"""Register compressor for BytePS
params : mx.gluon.ParameterDict
optimizer_params : dict
compression_params : dict
"""
intra_compressor = Compression.none
if not compression_params:
return intra_compressor

if "fp16" in compression_params:
intra_compressor = Compression.fp16

if "compressor" not in compression_params:
warnings.warn("Compressor is not defined")
return intra_compressor

check_list = ["compressor", "ef", "momentum"]

for _, param in params.items():
# generic
for item in check_list:
if compression_params.get(item):
if isinstance(compression_params[item], str):
setattr(param, "byteps_%s_type" %
item, compression_params[item])
else:
raise TypeError("%s should be str" % item)

# need parameter
compressor = compression_params["compressor"]
if compressor == "onebit":
setattr(param, "byteps_compressor_onebit_scaling", str(
compression_params.get("scaling", False)))
elif compressor in ("topk", "randomk", "dithering"):
# raise KeyError if 'k' is not found
setattr(param, "byteps_compressor_k",
compression_params["k"])

if compression_params.get("momentum"):
setattr(param, "byteps_momentum_mu",
optimizer_params["momentum"])

if compression_params.get("seed", None) is not None:
setattr(param, "byteps_seed",
compression_params["seed"])

# the following code will delete some items in `optimizer_params`
# to avoid duplication
if compression_params.get("momentum"):
# the 1bit compressor uses an additional momentum for weight decay
if compressor == "onebit" and "wd" in optimizer_params:
intra_compressor = Compression.wdmom(
intra_compressor, optimizer_params["momentum"], optimizer_params["wd"])
del optimizer_params["wd"]

del optimizer_params['momentum']

return intra_compressor

def step(self, batch_size, ignore_stale_grad=False):
# grad is normalized with batch_size. setting _scale to batch_size
# prevents it from being normalized by batch_size twice.
self._scale = batch_size
super(DistributedTrainer, self).step(batch_size, ignore_stale_grad)

def _allreduce_grads(self):
# update lr
if local_rank() == 0:
self._f.seek(0)
ba = struct.pack("d", self.learning_rate)
self._f.write(ba)
self._f.flush()

for i, param in enumerate(self._params):
if param.grad_req != 'null':
byteps_push_pull(param.list_grad()[0], is_average=False,
# normalized with batch_size and num_workers
nd._internal._mul_scalar(
param._grad[0], 1.0 / self._scale / self._bps_size, out=param._grad[0])
compressed, ctx = self._intra_compressors[i].compress(
param._grad[0])
byteps_push_pull(compressed, is_average=False,
name="gradient_" + str(i), priority=-i)
param._grad[0] = self._intra_compressors[i].decompress(
compressed, ctx, x=param._data[0])

def _init_params(self):
tensors = []
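`_allreduce_grads` above only assumes that each intra-node compressor exposes a `compress(tensor) -> (tensor, ctx)` / `decompress(tensor, ctx, x=None)` pair. A minimal sketch of that interface follows, written as an FP16 cast; it is an illustration of the shape of the API, not the exact Compression.fp16 implementation in byteps/mxnet/compression.py:

import numpy as np
import mxnet.ndarray as nd

class FP16Compressor(object):
    """Cast gradients to float16 before push_pull and restore the dtype after."""

    def compress(self, tensor):
        dtype = tensor.dtype                    # remember the original dtype
        if dtype in (np.float32, np.float64):
            tensor = tensor.astype(np.float16)  # send a smaller payload
        return tensor, dtype

    def decompress(self, tensor, ctx, x=None):
        # `x` mirrors the keyword used in _allreduce_grads; an error-feedback
        # compressor would use it, a plain cast does not
        if tensor.dtype != ctx:
            tensor = tensor.astype(ctx)
        return tensor

# round trip on a dummy gradient
grad = nd.ones((4,)) * 0.5
compressed, ctx = FP16Compressor().compress(grad)
restored = FP16Compressor().decompress(compressed, ctx)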
1 change: 1 addition & 0 deletions byteps/mxnet/adapter.cc
@@ -25,6 +25,7 @@
namespace byteps {
namespace mxnet {


template <class T>
MXTensor<T>::MXTensor(T* tensor) : tensor_(tensor) {}
