Skip to content

Commit

Permalink
Added logging, refactored code
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeagun committed Jul 23, 2013
1 parent 7721d7f commit cf92aac
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 17 deletions.
7 changes: 4 additions & 3 deletions src/batch_dedup_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ void Env::LoadSampleTraceList(string fname)
continue;
}
mSampleTraces.push_back(trace_fname);
//LOG_INFO("Added trace file: " << trace_fname);
LOG_DEBUG("add trace file: " << trace_fname);
}
is.close();
Expand Down Expand Up @@ -337,21 +338,21 @@ string Env::GetStep2InputName(int partid)
return ss.str();
}

string Env::GetStep2Output1Name(int partid)
// Builds the path of the ".step2.out1" file for one partition:
// mLocalPath followed by "partition.<partid>.step2.out1".
// DupBlockReader opens this file as a stream of BlockMeta records.
string Env::GetStep2OutputDupBlocksName(int partid)
{
    stringstream path;
    path << mLocalPath;
    path << "partition." << partid;
    path << ".step2.out1";
    return path.str();
}

string Env::GetStep2Output2Name(int partid)
// Builds the path of the ".step2.out2" file for one partition:
// mLocalPath followed by "partition.<partid>.step2.out2".
string Env::GetStep2OutputDupWithNewName(int partid)
{
    stringstream path;
    path << mLocalPath;
    path << "partition." << partid;
    path << ".step2.out2";
    return path.str();
}

string Env::GetStep2Output3Name(int partid)
string Env::GetStep2OutputNewBlocksName(int partid)
{
stringstream ss;
ss << mLocalPath << "partition." << partid << ".step2.out3";
Expand Down
6 changes: 3 additions & 3 deletions src/batch_dedup_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,9 @@ class Env
static int GetSourceNodeId(int vmid);

static string GetStep2InputName(int partid);
static string GetStep2Output1Name(int partid);
static string GetStep2Output2Name(int partid);
static string GetStep2Output3Name(int partid);
static string GetStep2OutputDupBlocksName(int partid);
static string GetStep2OutputDupWithNewName(int partid);
static string GetStep2OutputNewBlocksName(int partid);

static string GetStep3InputName(int vmid);
static string GetStep3OutputName(int vmid);
Expand Down
27 changes: 27 additions & 0 deletions src/data_sink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,17 @@ void RawRecordAccumulator::ProcessBuffer()
int part_id = Env::GetPartitionId(pblk->mCksum);
mWriterPtrs[part_id % num_parts]->Put(*pblk);
num_records++;
mStatRecordCount++;
}
LOG_DEBUG("processed " << num_records << " records");
Reset();
}

// Reports, via LOG_INFO, the running total held in mStatRecordCount.
// ProcessBuffer() bumps that counter once for every record it passes to a
// partition writer, so the value spans all buffers processed so far.
// NOTE(review): unlike the other accumulators, no "mStatRecordCount = 0"
// for this class is visible in this view -- confirm the constructor zeroes
// it, otherwise the reported total starts from an indeterminate value.
void RawRecordAccumulator::Stat()
{
LOG_INFO("Total Records Processed: " << mStatRecordCount);
}


NewRecordAccumulator::NewRecordAccumulator()
{
Expand All @@ -88,6 +94,7 @@ NewRecordAccumulator::NewRecordAccumulator()
int vmid = Env::GetVmId(i);
mWriters[vmid] = new RecordWriter<Block>(Env::GetStep3InputName(vmid));
}
mStatRecordCount = 0;
}

NewRecordAccumulator::~NewRecordAccumulator()
Expand All @@ -112,12 +119,18 @@ void NewRecordAccumulator::ProcessBuffer()
else {
it->second->Put(*pblk);
num_records++;
mStatRecordCount++;
}
}
LOG_DEBUG("processed " << num_records << " records");
Reset();
}

// Reports, via LOG_INFO, the running total held in mStatRecordCount.
// The counter is zeroed in the constructor and bumped in ProcessBuffer()
// only for records that were actually handed to a per-VM writer.
void NewRecordAccumulator::Stat()
{
LOG_INFO("Total Records Processed: " << mStatRecordCount);
}


NewRefAccumulator::NewRefAccumulator()
{
Expand All @@ -130,6 +143,7 @@ NewRefAccumulator::NewRefAccumulator()
mWriterPtrs[partid % num_parts] =
new RecordWriter<IndexEntry>(Env::GetStep4InputName(partid));
}
mStatRecordCount = 0;
}

NewRefAccumulator::~NewRefAccumulator()
Expand All @@ -151,11 +165,17 @@ void NewRefAccumulator::ProcessBuffer()
int part_id = Env::GetPartitionId(p_record->mCksum);
mWriterPtrs[part_id % num_parts]->Put(*p_record);
num_records++;
mStatRecordCount++;
}
LOG_DEBUG("processed " << num_records << " records");
Reset();
}

// Reports, via LOG_INFO, the running total held in mStatRecordCount.
// The counter is zeroed in the constructor and bumped in ProcessBuffer()
// once for every record passed to a partition writer.
void NewRefAccumulator::Stat()
{
LOG_INFO("Total Records Processed: " << mStatRecordCount);
}


DupRecordAccumulator::DupRecordAccumulator()
{
Expand All @@ -166,6 +186,7 @@ DupRecordAccumulator::DupRecordAccumulator()
mWriters[vmid] = new RecordWriter<BlockMeta>(Env::GetStep3OutputName(vmid), true);
//mWriters[vmid] = new RecordWriter<BlockMeta>(Env::GetStep4OutputName(vmid));
}
mStatRecordCount = 0;
}

DupRecordAccumulator::~DupRecordAccumulator()
Expand All @@ -190,12 +211,18 @@ void DupRecordAccumulator::ProcessBuffer()
else {
it->second->Put(*p_record);
num_records++;
mStatRecordCount++;
}
}
LOG_DEBUG("processed " << num_records << " records");
Reset();
}

// Reports, via LOG_INFO, the running total held in mStatRecordCount.
// The counter is zeroed in the constructor and bumped in ProcessBuffer()
// only for records that were actually handed to a per-VM writer.
void DupRecordAccumulator::Stat()
{
LOG_INFO("Total Records Processed: " << mStatRecordCount);
}




Expand Down
10 changes: 10 additions & 0 deletions src/data_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class DataSink
virtual ~DataSink();

virtual void ProcessBuffer() = 0;
virtual void Stat() = 0;

public:
DataRecord* mRecord;
Expand Down Expand Up @@ -47,9 +48,12 @@ class RawRecordAccumulator : public DataSink
~RawRecordAccumulator();

void ProcessBuffer();
void Stat();


private:
RecordWriter<Block>** mWriterPtrs;
uint64_t mStatRecordCount;
};

// save new blocks by vm
Expand All @@ -60,9 +64,11 @@ class NewRecordAccumulator : public DataSink
~NewRecordAccumulator();

void ProcessBuffer();
void Stat();

private:
map<int, RecordWriter<Block>*> mWriters;
uint64_t mStatRecordCount;
};


Expand All @@ -74,9 +80,11 @@ class NewRefAccumulator : public DataSink
~NewRefAccumulator();

void ProcessBuffer();
void Stat();

private:
RecordWriter<IndexEntry>** mWriterPtrs;
uint64_t mStatRecordCount;
};

// save dup block meta by vm
Expand All @@ -87,9 +95,11 @@ class DupRecordAccumulator : public DataSink
~DupRecordAccumulator();

void ProcessBuffer();
void Stat();

private:
map<int, RecordWriter<BlockMeta>*> mWriters;
uint64_t mStatRecordCount;
};


Expand Down
36 changes: 33 additions & 3 deletions src/data_spout.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ TraceReader::TraceReader()
mReadBuf = new char[Env::GetReadBufSize()];
mInput.rdbuf()->pubsetbuf(mReadBuf, Env::GetReadBufSize());
mStatTotalSize = 0;
mStatTotalCount = 0;
mStatDirtySize = 0;
}

Expand All @@ -21,7 +22,12 @@ TraceReader::~TraceReader()
if (mInput.is_open())
mInput.close();
delete[] mReadBuf;
}

// Reports the read statistics gathered by GetRecord(), via LOG_INFO:
//  - mStatTotalSize: sum of mSize over every block read from the trace
//  - mStatDirtySize: sum of mSize over blocks that had BLOCK_DIRTY_FLAG set
//  - mStatTotalCount: number of blocks read, dirty or not
// All three counters are zeroed in the constructor.
void TraceReader::Stat()
{
LOG_INFO("total VM size: " << mStatTotalSize << ", dirty segment size: " << mStatDirtySize);
LOG_INFO("total Blocks Read: " << mStatTotalCount);
}

int TraceReader::GetRecordSize()
Expand Down Expand Up @@ -58,13 +64,13 @@ bool TraceReader::GetRecord(DataRecord*& pdata)
// find a dirty block
while (mBlk.FromStream(mInput)) {
mStatTotalSize += (uint64_t)mBlk.mSize;
mStatTotalCount++;
if (mBlk.mFlags & BLOCK_DIRTY_FLAG) {
mStatDirtySize += (uint64_t)mBlk.mSize;
pdata = static_cast<DataRecord*>(&mBlk);
return true;
}
}

// end of VM trace, close it
mInput.close();
}
Expand All @@ -75,6 +81,7 @@ NewBlockReader::NewBlockReader()
{
mPartId = Env::GetPartitionBegin();
mInputPtr = NULL;
mStatNewCount = 0;
}

NewBlockReader::~NewBlockReader()
Expand All @@ -84,6 +91,12 @@ NewBlockReader::~NewBlockReader()
}
}


// Reports, via LOG_INFO, how many blocks GetRecord() has read so far.
// mStatNewCount is zeroed in the constructor and bumped each time a
// block is successfully fetched from the current input reader.
void NewBlockReader::Stat()
{
LOG_INFO("new blocks read: " << mStatNewCount);
}

int NewBlockReader::GetRecordSize()
{
return mBlk.GetSize();
Expand All @@ -103,12 +116,13 @@ bool NewBlockReader::GetRecord(DataRecord *&pdata)
if (mPartId >= Env::GetPartitionEnd()) {
return false;
}
mInputPtr = new RecordReader<Block>(Env::GetStep2Output3Name(mPartId));
mInputPtr = new RecordReader<Block>(Env::GetStep2OutputNewBlocksName(mPartId));
mPartId ++;
}

// read one block
if (mInputPtr->Get(mBlk)) {
mStatNewCount++;
pdata = static_cast<DataRecord*>(&mBlk);
return true;
}
Expand All @@ -125,14 +139,21 @@ NewRefReader::NewRefReader()
mVmIdx = 0;
mInputPtr = NULL;
mStatNewSize = 0;
mStatNewCount = 0;
}

// Frees the reader object held in mInputPtr, if one is set.
NewRefReader::~NewRefReader()
{
    // Nothing to release when no reader is currently held.
    if (mInputPtr == NULL) {
        return;
    }
    delete mInputPtr;
}


// Reports the read statistics gathered by GetRecord(), via LOG_INFO:
//  - mStatNewSize: sum of mBlk.mSize over every BlockMeta record read
//  - mStatNewCount: number of BlockMeta records read
// Both counters are zeroed in the constructor.
void NewRefReader::Stat()
{
LOG_INFO("new block size: " << mStatNewSize);
LOG_INFO("new block count: " << mStatNewCount);
}

int NewRefReader::GetRecordSize()
Expand Down Expand Up @@ -163,6 +184,7 @@ bool NewRefReader::GetRecord(DataRecord*& pdata)
BlockMeta bm;
while (mInputPtr->Get(bm)) {
mStatNewSize += bm.mBlk.mSize;
mStatNewCount++;
mRecord.mCksum = bm.mBlk.mCksum;
mRecord.mRef = bm.mRef;
pdata = static_cast<DataRecord*>(&mRecord);
Expand All @@ -180,6 +202,7 @@ DupBlockReader::DupBlockReader()
{
mPartId = Env::GetPartitionBegin();
mInputPtr = NULL;
mStatDupCount = 0;
}

DupBlockReader::~DupBlockReader()
Expand All @@ -189,6 +212,12 @@ DupBlockReader::~DupBlockReader()
}
}


// Reports, via LOG_INFO, how many records GetRecord() has read so far.
// mStatDupCount is zeroed in the constructor and bumped each time a
// record is successfully fetched from the current input reader.
void DupBlockReader::Stat()
{
LOG_INFO("dup blocks: " << mStatDupCount);
}

int DupBlockReader::GetRecordSize()
{
return mRecord.GetSize();
Expand All @@ -208,12 +237,13 @@ bool DupBlockReader::GetRecord(DataRecord *&pdata)
if (mPartId >= Env::GetPartitionEnd()) {
return false;
}
mInputPtr = new RecordReader<BlockMeta>(Env::GetStep2Output1Name(mPartId));
mInputPtr = new RecordReader<BlockMeta>(Env::GetStep2OutputDupBlocksName(mPartId));
mPartId ++;
}

// read one block
if (mInputPtr->Get(mRecord)) {
mStatDupCount++;
pdata = static_cast<DataRecord*>(&mRecord);
return true;
}
Expand Down
Loading

0 comments on commit cf92aac

Please sign in to comment.