From cb92990c1aba8306ee474d19f373926b156b1cc2 Mon Sep 17 00:00:00 2001 From: Yimin Jiang Date: Thu, 24 Oct 2019 14:36:21 +0800 Subject: [PATCH] tests: improve benchmark (#6) * enable multi-to-multi benchmark * add realtime throughput --- tests/test_kv_app_benchmark.cc | 100 ++++++++++++++++++++++++++------- 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/tests/test_kv_app_benchmark.cc b/tests/test_kv_app_benchmark.cc index 5cdbc115d..fad3ffa1d 100644 --- a/tests/test_kv_app_benchmark.cc +++ b/tests/test_kv_app_benchmark.cc @@ -43,25 +43,49 @@ void StartServer() { RegisterExitCallback([server]() { delete server; }); } +struct PSKV { + SArray keys; // n keys + SArray lens; // the length of the i-th value +}; +std::unordered_map ps_kv_; + void RunWorker(int argc, char *argv[]) { if (!IsWorker()) return; - CHECK_EQ(argc, 4) << "input argument should be: [SCRIPT, LEN, REPEAT, MODE]"; + CHECK_GE(argc, 3) << "input argument should be at least 3: SCRIPT, LEN, REPEAT, (OPTIONAL) MODE"; KVWorker kv(0, 0); + auto krs = ps::Postoffice::Get()->GetServerKeyRanges(); + + const int num_servers = krs.size(); + LOG(INFO) << num_servers << " servers in total"; + CHECK_GT(num_servers, 0); // init int len = atoi(argv[1]); int repeat = atoi(argv[2]); - MODE mode = static_cast(atoi(argv[3])); + MODE mode = (argc > 3) ? 
static_cast(atoi(argv[3])) : PUSH_PULL_MIX_ENDLESS; - std::vector vec(len); - SArray keys; - keys.push_back(0); - SArray lens; - lens.push_back(len); - SArray vals(vec); + std::vector > server_vals; + for (int server = 0; server < num_servers; server++) { + std::vector vec(len); + SArray vals(vec); + server_vals.push_back(vals); + } - // init push, to register memory, better not count this into time cost - kv.Wait(kv.ZPush(keys, vals, lens)); + // init push, do not count this into time cost + for (int server = 0; server < num_servers; server++) { + int key = server; // could be other value + auto vals = server_vals[server]; + PSKV& pskv = ps_kv_[key]; + SArray keys; + ps::Key ps_key = krs[server].begin() + key; + keys.push_back(ps_key); + SArray lens; + lens.push_back(len); + pskv.keys.push_back(ps_key); + pskv.lens.push_back(len); + + kv.Wait(kv.ZPush(keys, vals, lens)); + } switch(mode) { case PUSH_THEN_PULL: { @@ -70,12 +94,20 @@ void RunWorker(int argc, char *argv[]) { uint64_t accumulated_ms = 0; for (int i = 0; i < repeat; ++i) { auto start = std::chrono::high_resolution_clock::now(); - kv.Wait(kv.ZPush(keys, vals, lens)); + for (int server = 0; server < num_servers; server++) { + int key = server; + PSKV& pskv = ps_kv_[key]; + auto keys = pskv.keys; + auto lens = pskv.lens; + auto vals = server_vals[server]; + + kv.Wait(kv.ZPush(keys, vals, lens)); + } auto end = std::chrono::high_resolution_clock::now(); accumulated_ms += (end - start).count(); // ns } - LL << "push_byte=" << len * sizeof(float) - << ", repeat=" << repeat + LL << "push " << len * sizeof(float) + << " bytes to each server, repeat=" << repeat << ", total_time=" << accumulated_ms / 1e6 << "ms"; @@ -83,13 +115,21 @@ void RunWorker(int argc, char *argv[]) { accumulated_ms = 0; for (int i = 0; i < repeat; ++i) { auto start = std::chrono::high_resolution_clock::now(); - kv.Wait(kv.ZPull(keys, &vals, &lens)); + for (int server = 0; server < num_servers; server++) { + int key = server; + PSKV& pskv = 
ps_kv_[key]; + auto keys = pskv.keys; + auto lens = pskv.lens; + auto vals = server_vals[server]; + + kv.Wait(kv.ZPull(keys, &vals, &lens)); + } auto end = std::chrono::high_resolution_clock::now(); accumulated_ms += (end - start).count(); // ns } - LL << "pull_byte=" << len * sizeof(float) - << ", repeat=" << repeat + LL << "pull " << len * sizeof(float) + << " bytes to each server, repeat=" << repeat << ", total_time=" << accumulated_ms / 1e6 << "ms"; } @@ -98,14 +138,36 @@ void RunWorker(int argc, char *argv[]) { case PUSH_PULL_MIX_ENDLESS: { LOG(INFO) << "PUSH_PULL_MIX_ENDLESS mode, should exit by Ctrl+C"; std::vector timestamp_list; + auto start = std::chrono::high_resolution_clock::now(); + auto end = std::chrono::high_resolution_clock::now(); + auto val = Environment::Get()->find("THRESHOLD"); + unsigned int threshold = val ? atoi(val) : 10; + int cnt = 0; while (1) { - timestamp_list.push_back(kv.ZPush(keys, vals, lens)); - timestamp_list.push_back(kv.ZPull(keys, &vals, &lens)); - if (timestamp_list.size()==20) { // flow control + for (int server = 0; server < num_servers; server++) { + int key = server; + PSKV& pskv = ps_kv_[key]; + auto keys = pskv.keys; + auto lens = pskv.lens; + auto vals = server_vals[server]; + + timestamp_list.push_back(kv.ZPush(keys, vals, lens)); + timestamp_list.push_back(kv.ZPull(keys, &vals, &lens)); + } + if (timestamp_list.size()/2/num_servers >= threshold) { // flow control for (auto& ts : timestamp_list) { kv.Wait(ts); } timestamp_list.clear(); + cnt++; + if (cnt % 100 == 0) { + end = std::chrono::high_resolution_clock::now(); + LL << "Benchmark throughput: " + << 8.0 * len * sizeof(float) * num_servers * cnt * threshold / (end - start).count() + << " Gbps"; + cnt = 0; + start = std::chrono::high_resolution_clock::now(); + } } } }