Skip to content

Commit

Permalink
add testcase for mixed mode (#20)
Browse files Browse the repository at this point in the history
* add testcase for mixed mode

* add server load

* fix log
  • Loading branch information
ymjiang authored Jan 28, 2020
1 parent 11ba01b commit dd8eb3d
Showing 1 changed file with 35 additions and 2 deletions.
37 changes: 35 additions & 2 deletions tests/test_ipc_benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,30 @@ void push_pull(KVWorker<char> &kv,
}
}

int AllocateServer(int seed, int total_key_num) {
bool mixed_mode = Environment::Get()->find("BYTEPS_ENABLE_MIXED_MODE")
? atoi(Environment::Get()->find("BYTEPS_ENABLE_MIXED_MODE"))
: false;
const int num_server_total = ps::Postoffice::Get()->GetServerKeyRanges().size();
const int num_worker_total = ps::Postoffice::Get()->num_workers();
auto num_server_noncolocate = num_server_total - num_worker_total;
auto num_server_colocate = num_worker_total;

if (!mixed_mode) {
return seed % num_server_total;
}

// below we assume the seed increases from 0 with step size 1
auto ratio = (2.0 * num_server_noncolocate * (num_worker_total - 1)) /
((num_worker_total) * (num_worker_total + num_server_noncolocate) - 2 * num_server_noncolocate);
auto threshold = ratio * total_key_num;
if (seed < threshold) {
return seed % num_server_noncolocate;
} else {
return num_server_noncolocate + (seed % num_server_colocate);
}
}

void RunWorker(int argc, char *argv[]) {
if (!IsWorker()) return;
KVWorker<char> kv(0, 0);
Expand All @@ -162,7 +186,7 @@ void RunWorker(int argc, char *argv[]) {
<< ", try reduce tensor size or increase BYTEPS_PARTITION_BYTES";

auto v = Environment::Get()->find("NUM_KEY_PER_SERVER");
const int how_many_key_per_server = v ? atoi(v) : 20;
const int how_many_key_per_server = v ? atoi(v) : 10;
const int total_key_num = num_servers * how_many_key_per_server;

std::vector<SArray<char> > server_vals;
Expand All @@ -176,9 +200,12 @@ void RunWorker(int argc, char *argv[]) {
server_vals.push_back(vals);
}

std::unordered_map<int, size_t> accumulated_key_num;
// init push, do not count this into time cost
for (int i = 0; i < total_key_num; i++) {
int server = i % num_servers;
int server = AllocateServer(i, total_key_num);
accumulated_key_num[server] += 1;

auto vals = server_vals[i];

auto key = EncodeKey(i);
Expand Down Expand Up @@ -206,6 +233,12 @@ void RunWorker(int argc, char *argv[]) {
kv.Wait(kv.ZPush(keys, vals, lens));
}

for (int i = 0; i < num_servers; ++i) {
PS_VLOG(1) << "server-" << i
<< " load is " << (100.0 * accumulated_key_num[i] / total_key_num)
<< "%";
}

push_pull(kv, server_keys, server_vals, server_lens, len, num_servers, total_key_num, how_many_key_per_server, mode);
}

Expand Down

0 comments on commit dd8eb3d

Please sign in to comment.