Skip to content

Commit

Permalink
tests: improve benchmark (#6)
Browse files Browse the repository at this point in the history
* enable multi-to-multi benchmark

* add realtime throughput
  • Loading branch information
ymjiang authored Oct 24, 2019
1 parent ed58ff2 commit cb92990
Showing 1 changed file with 81 additions and 19 deletions.
100 changes: 81 additions & 19 deletions tests/test_kv_app_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,49 @@ void StartServer() {
RegisterExitCallback([server]() { delete server; });
}

// Bookkeeping for one benchmark key: the key array and the matching
// value-length array that RunWorker hands to KVWorker::ZPush / ZPull.
struct PSKV {
SArray<ps::Key> keys; // the n keys sent together in one push/pull request
SArray<int> lens; // lens[i] is the length of the i-th value (RunWorker stores the float count)
};
// Maps a worker-side key id to its PSKV entry. RunWorker indexes it as
// ps_kv_[key] and fills one entry per server during the initial push.
std::unordered_map<uint64_t, PSKV> ps_kv_;

void RunWorker(int argc, char *argv[]) {
if (!IsWorker()) return;
CHECK_EQ(argc, 4) << "input argument should be: [SCRIPT, LEN, REPEAT, MODE]";
CHECK_GE(argc, 3) << "input argument should be at least 3: SCRIPT, LEN, REPEAT, (OPTIONAL) MODE";
KVWorker<float> kv(0, 0);
auto krs = ps::Postoffice::Get()->GetServerKeyRanges();

const int num_servers = krs.size();
LOG(INFO) << num_servers << " servers in total";
CHECK_GT(num_servers, 0);

// init
int len = atoi(argv[1]);
int repeat = atoi(argv[2]);
MODE mode = static_cast<MODE>(atoi(argv[3]));
MODE mode = (argc > 3) ? static_cast<MODE>(atoi(argv[3])) : PUSH_PULL_MIX_ENDLESS;

std::vector<float> vec(len);
SArray<Key> keys;
keys.push_back(0);
SArray<int> lens;
lens.push_back(len);
SArray<float> vals(vec);
std::vector<SArray<float> > server_vals;
for (int server = 0; server < num_servers; server++) {
std::vector<float> vec(len);
SArray<float> vals(vec);
server_vals.push_back(vals);
}

// init push, to register memory, better not count this into time cost
kv.Wait(kv.ZPush(keys, vals, lens));
// init push, do not count this into time cost
for (int server = 0; server < num_servers; server++) {
int key = server; // could be other value
auto vals = server_vals[server];
PSKV& pskv = ps_kv_[key];
SArray<Key> keys;
ps::Key ps_key = krs[server].begin() + key;
keys.push_back(ps_key);
SArray<int> lens;
lens.push_back(len);
pskv.keys.push_back(ps_key);
pskv.lens.push_back(len);

kv.Wait(kv.ZPush(keys, vals, lens));
}

switch(mode) {
case PUSH_THEN_PULL: {
Expand All @@ -70,26 +94,42 @@ void RunWorker(int argc, char *argv[]) {
uint64_t accumulated_ms = 0;
for (int i = 0; i < repeat; ++i) {
auto start = std::chrono::high_resolution_clock::now();
kv.Wait(kv.ZPush(keys, vals, lens));
for (int server = 0; server < num_servers; server++) {
int key = server;
PSKV& pskv = ps_kv_[key];
auto keys = pskv.keys;
auto lens = pskv.lens;
auto vals = server_vals[server];

kv.Wait(kv.ZPush(keys, vals, lens));
}
auto end = std::chrono::high_resolution_clock::now();
accumulated_ms += (end - start).count(); // ns
}
LL << "push_byte=" << len * sizeof(float)
<< ", repeat=" << repeat
LL << "push " << len * sizeof(float)
<< " bytes to each server, repeat=" << repeat
<< ", total_time="
<< accumulated_ms / 1e6 << "ms";

// pull
accumulated_ms = 0;
for (int i = 0; i < repeat; ++i) {
auto start = std::chrono::high_resolution_clock::now();
kv.Wait(kv.ZPull(keys, &vals, &lens));
for (int server = 0; server < num_servers; server++) {
int key = server;
PSKV& pskv = ps_kv_[key];
auto keys = pskv.keys;
auto lens = pskv.lens;
auto vals = server_vals[server];

kv.Wait(kv.ZPull(keys, &vals, &lens));
}
auto end = std::chrono::high_resolution_clock::now();
accumulated_ms += (end - start).count(); // ns
}

LL << "pull_byte=" << len * sizeof(float)
<< ", repeat=" << repeat
LL << "pull " << len * sizeof(float)
<< " bytes to each server, repeat=" << repeat
<< ", total_time="
<< accumulated_ms / 1e6 << "ms";
}
Expand All @@ -98,14 +138,36 @@ void RunWorker(int argc, char *argv[]) {
case PUSH_PULL_MIX_ENDLESS: {
LOG(INFO) << "PUSH_PULL_MIX_ENDLESS mode, should exit by Ctrl+C";
std::vector<int> timestamp_list;
auto start = std::chrono::high_resolution_clock::now();
auto end = std::chrono::high_resolution_clock::now();
auto val = Environment::Get()->find("THRESHOLD");
unsigned int threshold = val ? atoi(val) : 10;
int cnt = 0;
while (1) {
timestamp_list.push_back(kv.ZPush(keys, vals, lens));
timestamp_list.push_back(kv.ZPull(keys, &vals, &lens));
if (timestamp_list.size()==20) { // flow control
for (int server = 0; server < num_servers; server++) {
int key = server;
PSKV& pskv = ps_kv_[key];
auto keys = pskv.keys;
auto lens = pskv.lens;
auto vals = server_vals[server];

timestamp_list.push_back(kv.ZPush(keys, vals, lens));
timestamp_list.push_back(kv.ZPull(keys, &vals, &lens));
}
if (timestamp_list.size()/2/num_servers >= threshold) { // flow control
for (auto& ts : timestamp_list) {
kv.Wait(ts);
}
timestamp_list.clear();
cnt++;
if (cnt % 100 == 0) {
end = std::chrono::high_resolution_clock::now();
LL << "Benchmark throughput: "
<< 8.0 * len * sizeof(float) * num_servers * cnt * threshold / (end - start).count()
<< " Gbps";
cnt = 0;
start = std::chrono::high_resolution_clock::now();
}
}
}
}
Expand Down

0 comments on commit cb92990

Please sign in to comment.