Skip to content

Commit

Permalink
initial check in
Browse files Browse the repository at this point in the history
  • Loading branch information
mraway committed Apr 22, 2013
0 parents commit 72e3586
Show file tree
Hide file tree
Showing 25 changed files with 2,099 additions and 0 deletions.
22 changes: 22 additions & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
CC = mpicc
CXX = mpicxx
LDLIBS = -lssl
CPPFLAGS = -g -Wall -DDEBUG
#CPPFLAGS = -O2 -Wall -DNDEBUG

BINS = batch_dedup
DEPS = disk_io mpi_engine snapshot_mixer batch_dedup_config snapshot_meta data_spout data_sink dedup_buffer partition_index trace_types
OBJS = $(addsuffix .o,$(BINS))
DEP_OBJS = $(addsuffix .o,$(DEPS))
DEP_HDRS = $(addsuffix .h,$(DEPS))

all: $(BINS)

$(BINS): $(OBJS) $(DEP_OBJS)

$(BINS) $(DEPS): %:%.cc

$(DEPS): %:%.h

clean:
rm -f $(BINS) *.o
109 changes: 109 additions & 0 deletions src/batch_dedup.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
#include "mpi.h"
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fstream>
#include <stdlib.h>
#include "trace_types.h"
#include "batch_dedup_config.h"
#include "mpi_engine.h"
#include "snapshot_mixer.h"

using namespace std;

void usage(char* progname)
{
cout << "Usage: " << progname << " num_partitions num_VMs num_snapshots mpi_buffer read_buffer write_buffer" << endl;
return;
}

// setup environment, create directories
void init(int argc, char** argv)
{
int num_tasks, rank;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD, &num_tasks);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
Env::SetNumTasks(num_tasks);
Env::SetRank(rank);
Env::SetNumPartitions(atoi(argv[1]));
Env::SetNumVms(atoi(argv[2]));
Env::SetNumSnapshots(atoi(argv[3]));
Env::SetMpiBufSize(atoi(argv[4]) * 1024);
Env::SetReadBufSize(atoi(argv[5]) * 1024);
Env::SetWriteBufSize(atoi(argv[6]) * 1024);
Env::SetRemotePath("/oasis/triton/scratch/wei-ucsb/");
Env::SetLocalPath("/state/partition1/batch_dedup/");
Env::SetHomePath("/home/wei-ucsb/batch_dedup/");
Env::LoadSampleTraceList("/home/wei-ucsb/batch_dedup/sample_traces");
Env::SetLogger();
}

void final()
{
system("cp /state/partition1/batch_dedup/* /oasis/triton/scratch/wei-ucsb/");
}

int main(int argc, char** argv)
{
if (argc != 7) {
usage(argv[0]);
return 0;
}

init(argc, argv);
// cannot allocate 2d array because MPI only accepts one continous buffer
char* send_buf = new char[Env::GetNumTasks() * Env::GetMpiBufSize()];
char* recv_buf = new char[Env::GetNumTasks() * Env::GetMpiBufSize()];
Env::SetSendBuf(send_buf);
Env::SetRecvBuf(recv_buf);

// step0: prepare traces
int i = 0;
while (Env::GetVmId(i) >= 0) {
int vmid = Env::GetVmId(i);
int ssid = Env::GetNumSnapshots();
string source_trace = Env::GetSampleTrace(vmid);
string mixed_trace = Env::GetVmTrace(vmid);
SnapshotMixer mixer(vmid, ssid, source_trace, mixed_trace);
mixer.Generate();
i++;
}

// step1: exchange dirty segments
MpiEngine* p_step1 = new MpiEngine();
TraceReader* p_reader = new TraceReader();
RawRecordAccumulator* p_accu = new RawRecordAccumulator();

p_step1->SetDataSpout(dynamic_cast<DataSpout*>(p_reader));
p_step1->SetDataSink(dynamic_cast<DataSink*>(p_accu));
p_step1->Start();

delete p_step1;
delete p_reader;
delete p_accu;

delete[] send_buf;
delete[] recv_buf;
MPI_Finalize();
Env::CloseLogger();
return 0;
}


















236 changes: 236 additions & 0 deletions src/batch_dedup_config.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
#include "batch_dedup_config.h"
#include "trace_types.h"
#include <fstream>
#include <sstream>

const double VM_SEG_CHANGE_RATE = 0.5;
const double VM_BLOCK_CHANGE_RATE = 0.5;
const double SS_SEG_CHANGE_RATE = 0.2;
const double SS_BLOCK_CHANGE_RATE = 0.5;
const int MAX_NUM_SNAPSHOTS = 100;
const uint16_t BLOCK_CLEAN_FLAG = 0;
const uint16_t BLOCK_DIRTY_FLAG = 1;


void Env::SetRank(int rank)
{
mRank = rank;
}

int Env::GetRank()
{
return mRank;
}

void Env::SetNumTasks(int num)
{
mNumTasks = num;
}

int Env::GetNumTasks() {
return mNumTasks;
}

void Env::SetNumPartitions(int num)
{
mNumPartitions = num;
}

int Env::GetNumPartitions() {
return mNumPartitions;
}

void Env::SetMpiBufSize(size_t size)
{
mMpiBufSize = size;
}

size_t Env::GetMpiBufSize()
{
return mMpiBufSize;
}

void Env::SetReadBufSize(size_t size)
{
mReadBufSize = size;
}

size_t Env::GetReadBufSize()
{
return mReadBufSize;
}

void Env::SetWriteBufSize(size_t size)
{
mWriteBufSize = size;
}

size_t Env::GetWriteBufSize()
{
return mWriteBufSize;
}

void Env::SetNumVms(int num)
{
mNumVms = num;
}

int Env::GetNumVms()
{
return mNumVms;
}

void Env::SetNumSnapshots(int num)
{
mNumSnapshots = num;
}

int Env::GetNumSnapshots()
{
return mNumSnapshots;
}

string Env::GetSampleTrace(int vmid)
{
size_t num_samples = mSampleTraces.size();
if (num_samples != 0) {
return mSampleTraces[vmid % num_samples];
}
return "";
}

void Env::LoadSampleTraceList(string fname)
{
ifstream is(fname.c_str(), ios::in);
if (!is.is_open()) {
mLogger << "file not exist: " << fname << endl;
return;
}

string trace_fname("");
while (is.good()) {
getline(is, trace_fname);
mSampleTraces.push_back(trace_fname);
mLogger << "add trace file: " << trace_fname << endl;
}
is.close();
}

string Env::GetVmTrace(int vmid)
{
stringstream ss;
ss << GetLocalPath() << "/" << vmid << "." << mNumSnapshots << ".trace";
return ss.str();
}

int Env::GetVmId(size_t idx)
{
if (mMyVmIds.size() == 0) {
for (int i = 0; i < mNumVms; i++) {
if (i % mNumTasks == mRank) {
mMyVmIds.push_back(i);
}
}
}

if (idx < mMyVmIds.size()) {
return mMyVmIds[idx];
}
else {
return -1;
}
}

void Env::SetLogger()
{
stringstream ss;
ss << mHomePath << mRank << ".log";
mLogger.open(ss.str().c_str(), ios::out | ios::trunc | ios:: app);
}

void Env::CloseLogger()
{
mLogger.close();
}

void Env::SetLocalPath(string const &path)
{
mLocalPath = path + "/" + Env::GetTaskName() + "/";
CreateDir(path);
CreateDir(mLocalPath, true);
}

string Env::GetLocalPath()
{
return mLocalPath;
}

void Env::SetRemotePath(string const &path)
{
mRemotePath = path;
CreateDir(path);
}

string Env::GetRemotePath()
{
return mRemotePath;
}

void Env::SetHomePath(string const &path)
{
mHomePath = path + "/" + Env::GetTaskName() + "/";
CreateDir(path);
CreateDir(mHomePath);
}

string Env::GetHomePath()
{
return mHomePath;
}

string Env::GetTaskName()
{
stringstream ss;
ss << mNumTasks
<< "_" << mNumPartitions
<< "_" << mNumVms
<< "_" << mNumSnapshots
<< "_" << mMpiBufSize
<< "_" << mReadBufSize
<< "_" << mWriteBufSize;
return ss.str();
}

bool Env::CreateDir(string const &path, bool empty)
{
if (empty) {
stringstream ss;
ss << "rm -rf " << path;
system(ss.str().c_str());
}

// TODO: check dir existance before mkdir
stringstream ss;
ss << "mkdir " << path;
system(ss.str().c_str());
}




















Loading

0 comments on commit 72e3586

Please sign in to comment.