Skip to content

Commit

Permalink
dual bin packing scheduler now working. can specify timelimit in comm…
Browse files Browse the repository at this point in the history
…and line.
  • Loading branch information
mikeagun committed Nov 1, 2013
1 parent 6f67068 commit 5169b56
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 808 deletions.
2 changes: 1 addition & 1 deletion simulation/batchsim.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ int main (int argc, char *argv[]) {

//get total size/count of the vms
int total_vms = 0;
int total_vm_size = 0;
double total_vm_size = 0;
for(vector<vector<double> >::const_iterator machine = machinelist.begin(); machine != machinelist.end(); ++machine) {
for(vector<double>::const_iterator vm = (*machine).begin(); vm != (*machine).end(); ++vm) {
total_vms++;
Expand Down
156 changes: 125 additions & 31 deletions src/RoundScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//
//These globals define the actual parameters used in modeling
//double dirty_ratio = DEFAULT_DIRTY_RATIO; //block dirty ratio not used
double time_limit = DEFAULT_TIME_LIMIT;
//double time_limit = DEFAULT_TIME_LIMIT;
double segment_dirty_ratio = DEFAULT_SEGMENT_DIRTY_RATIO;
double fp_ratio = DEFAULT_FP_RATIO;
double dupnew_ratio = DEFAULT_DUPNEW_RATIO; //dup-new ratio, as a fration of r
Expand Down Expand Up @@ -41,6 +41,25 @@ string format_time(double seconds) {
return ss.str();
}

void print_settings(double time_limit) {
stringstream ss;
ss << "Current Settings:" << endl <<
//" dirty ratio: " << dirty_ratio << endl <<
" segment dirty ratio: " << segment_dirty_ratio << endl <<
" fp ratio: " << fp_ratio << endl <<
" dupnew ratio: " << dupnew_ratio << endl <<
" network latency: " << net_latency << " seconds" << endl <<
" disk read bandwidth: " << read_bandwidth << " B/s" << endl <<
" disk write bandwidth: " << disk_write_bandwidth << " B/s" << endl <<
//" backend write bandwidth: " << backend_write_bandwidth << " MB/s" << endl <<
" fp lookup time: " << lookup_time << " seconds" << endl <<
" network memory usage: " << network_memory << " B" << endl <<
" machine index size: " << n << " entries" << endl <<
" time limit: " << format_time(time_limit) << endl <<
" c=" << c << "; u=" << u << "; e=" << e << endl;
cerr << ss.str();
}

double model_time(double max_size, double total_size, int p, bool verbose) {
double time_cost = 0;
double b_r = read_bandwidth;
Expand All @@ -53,16 +72,15 @@ double model_time(double max_size, double total_size, int p, bool verbose) {
double r2_max = r_max * (1-fp_ratio); //number of new blocks at most heavily loaded machine; Not a real variable, but used often
//cout << "r=" << r << "; r_max=" << r_max << endl;
double t;
stringstream ss;

if (verbose) {
cout << "max size: " << max_size << endl <<
"total size to backup: " << total_size << endl <<
"size unit: " << SIZE_UNIT <<
ss << "max size: " << max_size << endl <<
"total size to backup: " << total_size << "; size unit: " << SIZE_UNIT << endl <<
"p: " << p << endl <<
"b_r: " << b_r << endl <<
"b_w" << b_w << endl <<
"b_r: " << b_r << "; b_w: " << b_w << endl <<
"r: " << r << "; r_max: " << r_max << endl <<
"c: " << c << "; r_max: " << r_max << endl <<
"c: " << c << endl <<
"net latency: " << net_latency << endl <<
"u: " << u << endl;
}
Expand All @@ -72,7 +90,7 @@ double model_time(double max_size, double total_size, int p, bool verbose) {
t += net_latency * (u * r_max / m_n); //transfer dirty data
t += u * r / b_w; //save requests to disk
if (verbose) {
cout << " Stage 1a: " << format_time(t) << endl;
ss << " Stage 1a: " << format_time(t) << endl;
}
time_cost += t;
//Stage 1b - handle dedup requests
Expand All @@ -81,31 +99,31 @@ double model_time(double max_size, double total_size, int p, bool verbose) {
t += r * lookup_time; //perform lookups
t += r * e / b_w; //write out results of each lookup to one of 3 files per partition
if (verbose) {
cout << " Stage 1b: " << format_time(t) << endl;
ss << " Stage 1b: " << format_time(t) << endl;
}
time_cost += t;
//Stage 2a - exchange new block results
t = r * (1-fp_ratio) * e / b_r; //read new results from disk
t += net_latency * (e * r2_max / m_n); //exchange new blocks
t += r2_max * e / b_w; //save new block results to disk
if (verbose) {
cout << " Stage 2a: " << format_time(t) << endl;
ss << " Stage 2a: " << format_time(t) << endl;
}
time_cost += t;
//Stage 2b - save new blocks
t = r2_max * e / b_r; //read new results from disk
t += r_max * c / b_r; //re-read dirty blocks
//t += r2_max * c / b_b; //write new blocks to the backend - also update the lookup results
if (verbose) {
cout << " Stage 2b: " << format_time(t) << endl;
ss << " Stage 2b: " << format_time(t) << endl;
}
time_cost += t;
//Stage 3a - exchange new block references
t = r2_max * e / b_r; //re-read new block results from disk (now with references)
t += net_latency * (e * r2_max / m_n); //exchange new block references back to parition holders
t += r * (1-fp_ratio) * e / b_w; //save new block references at partition holder
if (verbose) {
cout << " Stage 3a: " << format_time(t) << endl;
ss << " Stage 3a: " << format_time(t) << endl;
}
time_cost += t;
//Stage 3b - update index with new blocks and update dupnew results with references
Expand All @@ -115,18 +133,20 @@ double model_time(double max_size, double total_size, int p, bool verbose) {
t += r * dupnew_ratio * e / b_w; //add updated dup-new results to the list of dup results
t += r * (1-fp_ratio) * e / b_w; //add new blocks to local partition index
if (verbose) {
cout << " Stage 3b: " << format_time(t) << endl;
ss << " Stage 3b: " << format_time(t) << endl;
}
time_cost += t;
//Stage 4a - exchange dup (including dupnew) references
t = r * fp_ratio * e / b_r; //read dup results from disk (including dupnew)
t += net_latency * (e * r_max * fp_ratio / m_n); //exchange references to dup blocks
t += r_max * fp_ratio * e / b_w; //save dup references to disk
if (verbose) {
cout << " Stage 4a: " << format_time(t) << endl;
ss << " Stage 4a: " << format_time(t) << endl;
}
time_cost += t;
//Stage 4b - sort and save recipes back to storage service (we ignore this for now)
if (verbose)
cout << ss.str();

return time_cost;

Expand Down Expand Up @@ -456,19 +476,19 @@ vector<vector<vector<int> > >::iterator pick_min_newtime_round(vector<vector<vec
for(vector<vector<vector<int> > >::iterator round = round_schedules.begin();
round != round_schedules.end(); ++round) {
vector<map<int,double> > loads;
//cout << "Round Load:";
for(int i = 0; i < (*round).size(); i++) {
//cout << " { ";
map<int,double> machine_load;
for(int j = 0; j < (*round)[i].size(); j++) {
//stringstream ss;
//ss << " indexing with " << i << " and " << ((*round)[i][j]) << endl;
//cerr << ss.str();
//int vid = ((*round)[i][j]);
//machine_load[vid] = machines[i][vid];
map<int,double>::const_iterator it = machines[i].find((*round)[i][j]);
//cout << it->first << "->" << it->second << " ";
machine_load[(*round)[i][j]] = it->second;
}
//cout << "}";
loads.push_back(machine_load);
}
//cout << endl;

double time = model_new_time(loads, mid, vm->second);
if (best_round == round_schedules.end() || time < best_time) {
Expand Down Expand Up @@ -645,9 +665,47 @@ const char * DBPScheduler::getName() {


void print_loads(vector<map<int,double> > machines) {
stringstream ss;
for(vector<map<int,double> >::iterator i = machines.begin(); i != machines.end(); ++i) {
ss.str("");
ss.clear();
ss << "Machine " << (i - machines.begin()) << " vms:";
for (map<int, double>::iterator j = (*i).begin(); j != (*i).end(); ++j) {
cout << "load[" << (i - machines.begin()) << "][" << j->first << "] = " << j->second << " ";
ss << " " << j->first << "->" << j->second;
}
ss << endl;
}
cout << ss;
}

void print_loads(vector<vector<vector<int> > > round_schedules, vector<map<int,double> > machines) {
cout << "Loads: " << endl;
for(vector<vector<vector<int> > >::iterator round = round_schedules.begin();
round != round_schedules.end(); ++round) {
cout << "Round Load:";
for(int i = 0; i < (*round).size(); i++) {
cout << " { ";
for(int j = 0; j < (*round)[i].size(); j++) {
map<int,double>::const_iterator it = machines[i].find((*round)[i][j]);
cout << it->first << "->" << it->second << " ";
}
cout << "}";
}
cout << endl;
}
}

void print_schedules(vector<vector<vector<int> > > round_schedules) {
cout << "Schedules: " << endl;
for(vector<vector<vector<int> > >::iterator round = round_schedules.begin();
round != round_schedules.end(); ++round) {
cout << "Round Load:";
for(int i = 0; i < (*round).size(); i++) {
cout << " { ";
for(int j = 0; j < (*round)[i].size(); j++) {
cout << (*round)[i][j] << " ";
}
cout << "}";
}
cout << endl;
}
Expand Down Expand Up @@ -686,9 +744,9 @@ double DBPScheduler2::pack_vms(vector<map<int,double> > machines,int rounds) {
//ss.clear();
(*round)[mid].push_back(vm->first); //we push the index of the vm, not the size
//cerr << " pushed vm" << endl;
//cout << "count was: " << machines[mid].size();
//cout << "count was: " << machines[mid].size() << endl;
machines[mid].erase(vm);
//cout << "; picked [" << mid << "][" << vm->first << "]" << "; count is: " << machines[mid].size() << endl;
//cout << "picked [" << mid << "][" << vm->first << "->" << vm->second << "] for round " << (round - round_schedules.begin()) << endl;
//ss << " erased vm" << endl;
//cerr << ss.str();
//ss.str("");
Expand All @@ -700,15 +758,35 @@ double DBPScheduler2::pack_vms(vector<map<int,double> > machines,int rounds) {
for(vector<vector<vector<int> > >::iterator round = round_schedules.begin();
round != round_schedules.end(); ++round) {
vector<map<int, double> > loads;
//ss.str("");
//ss.clear();
//ss << "load compute" << endl;
//for(vector<map<int,double> >::const_iterator mit = machines.begin(); mit != machines.end(); ++mit) {
// for(map<int,double>::const_iterator vit = (*mit).begin(); vit != (*mit).end(); ++vit) {
// ss << " machine[" << (mit - machines.begin()) << "][" << vit->first << "] = " << vit->second << endl;
// }
//}
for(int i = 0; i < (*round).size(); i++) {
map<int, double> machine_load;
for(int j = 0; j < (*round)[i].size(); j++) {
//cout << "accessing: round[" << i << "][" << j << "]" << endl;
//cout << "accessing machines[" << i << "][" << (*round)[i][j] << "]" << endl;
machine_load[(*round)[i][j]] = machines[i][((*round)[i][j])];
//ss << " machines[" << i << "][" << (*round)[i][j] << "] = ";
map<int,double>::const_iterator lookup = this->machines[i].find((*round)[i][j]);
if (lookup == this->machines[i].end()) {
cout << " machines[" << i << "][" << (*round)[i][j] << "] = NOT FOUND";
} else {
//if (lookup == this->machines[i].end()) {
// ss << "NOT FOUND" << endl;
//} else {
// ss << lookup->second << endl;
//}
machine_load[(*round)[i][j]] = lookup->second;
}
}
loads.push_back(machine_load);
}
//cout << "modeling round " << (round - round_schedules.begin()) << endl;
//ss << "end load compute" << endl;
//cout << ss.str();
schedule_time += model_time(loads,false);
}
cout << "Packed VMS to " << rounds << " rounds in " << format_time(schedule_time) << endl;
Expand All @@ -718,7 +796,7 @@ double DBPScheduler2::pack_vms(vector<map<int,double> > machines,int rounds) {
void DBPScheduler2::schedule_vms(vector<map<int,double> > &machines) {
//cout << "About to start scheduling vms" << endl;
int total_count = 0;
int total_size = 0;
double total_size = 0;
double schedule_time;
for(vector<map<int, double> >::iterator machine = machines.begin();
machine != machines.end(); ++machine) {
Expand Down Expand Up @@ -746,23 +824,39 @@ void DBPScheduler2::schedule_vms(vector<map<int,double> > &machines) {
}
schedule_time = pack_vms(machines,UB);
stringstream ss;
ss << "final rounds: " << UB << "; time: " << schedule_time << "; time limit: " << time_limit << endl;
ss << "final rounds: " << UB << "; time: " << schedule_time << "; time limit: " << time_limit << "; count: " << total_count << "; data: " << total_size << endl;
cout << ss.str();
}

DBPScheduler2::DBPScheduler2() {
scheduled = false;
}

bool DBPScheduler2::schedule_round(std::vector<std::vector<int> > &round_schedule) {
//cout << "About to start scheduling round" << endl;
if (machines.empty() && round_schedules.empty()) {
return false;
round_schedule.clear();
if (scheduled && round_schedules.empty()) {
return false; //finished with schedule
} else if (!round_schedules.empty()) {
//have rounds to schedule
//print_schedules(round_schedules);
round_schedule.insert(round_schedule.end(),round_schedules.back().begin(), round_schedules.back().end());
round_schedules.pop_back();
return true;
} else {
} else if (machines.empty()) {
//we haven't been scheduled yet, but there are no machines to schedule
return false;
} else { //we need to schedule
schedule_vms(machines);
machines.clear();
scheduled = true;
//print_loads(round_schedules, machines);
//print_schedules(round_schedules);
//cout << format_time(model_time(machines,true));
machines.clear(); //TODO: is this a problem?
//print_settings(time_limit);
round_schedule.insert(round_schedule.end(),round_schedules.back().begin(), round_schedules.back().end());
round_schedules.pop_back();
//print_schedules(round_schedules);
return true;
}
}
Expand Down
Loading

0 comments on commit 5169b56

Please sign in to comment.