Skip to content

Commit

Permalink
various fixes and additional logging. Now uses middle 4 bytes instead…
Browse files Browse the repository at this point in the history
… of first 4 for partitioning
  • Loading branch information
mikeagun committed Jul 24, 2013
1 parent cf92aac commit 4ec8fbd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 8 deletions.
53 changes: 47 additions & 6 deletions src/batch_dedup.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ int main(int argc, char** argv)

p_mpi->SetDataSpout(dynamic_cast<DataSpout*>(p_reader));
p_mpi->SetDataSink(dynamic_cast<DataSink*>(p_accu));
p_mpi->SetTimerPrefix("ExchangeDirtyBlocks");
p_mpi->Start();
LOG_INFO("ExchangeDirtyBlocks Rounds of MPI Communication: " << p_mpi->GetMpiCount());
p_reader->Stat();
p_accu->Stat();

delete p_mpi;
delete p_reader;
Expand All @@ -272,6 +276,10 @@ int main(int argc, char** argv)
LOG_INFO("making dedup comparison");
uint64_t indexEntries = 0;

uint64_t s2dedup_input_count = 0;
uint64_t s2dup_output_count = 0;
uint64_t s2dupnew_output_count = 0;
uint64_t s2new_output_count = 0;
TimerPool::Start("DedupComparison");
// local-2: compare with partition index
for (int partid = Env::GetPartitionBegin(); partid < Env::GetPartitionEnd(); partid++) {
Expand All @@ -282,30 +290,46 @@ int main(int argc, char** argv)
Block blk;
BlockMeta bm;
DedupBuffer buf;
RecordWriter<BlockMeta> output1(Env::GetStep2Output1Name(partid));
RecordWriter<Block> output2(Env::GetStep2Output2Name(partid));
RecordWriter<Block> output3(Env::GetStep2Output3Name(partid));
RecordWriter<BlockMeta> output1(Env::GetStep2OutputDupBlocksName(partid));
RecordWriter<Block> output2(Env::GetStep2OutputDupWithNewName(partid));
RecordWriter<Block> output3(Env::GetStep2OutputNewBlocksName(partid));
//int partitionDups = 0;
//int partitionDupNew = 0;
//int partitionNew = 0;

while(reader.Get(blk)) {
s2dedup_input_count++;
// dup with existing blocks?
if (index.Find(blk.mCksum)) {
s2dup_output_count++;
//partitionDups++;
bm.mBlk = blk;
bm.mRef = REF_VALID;
output1.Put(bm);
continue;
}
// dup with new blocks in this run?
if (buf.Find(blk)) {
s2dupnew_output_count++;
//partitionDupNew++;
output2.Put(blk);
continue;
}
// completely new
//partitionNew++;
s2new_output_count++;
buf.Add(blk);
output3.Put(blk);
}
//LOG_INFO("Step 2 Partition " << partid << " Summary\tnew: " << partitionNew << "\tdupnew: " << partitionDupNew << "\tdup: " << partitionDups);
}

TimerPool::Stop("DedupComparison");
LOG_INFO("Step 2 Summary: blocks read: " << s2dedup_input_count);
LOG_INFO("Step 2 Summary: dup blocks written: " << s2dup_output_count);
LOG_INFO("Step 2 Summary: dup-with-new blocks written: " << s2dupnew_output_count);
LOG_INFO("Step 2 Summary: new blocks written: " << s2new_output_count);
LOG_INFO("Step 2 Summary: total blocks written: " << (s2new_output_count + s2dupnew_output_count + s2dup_output_count));
LOG_INFO("Total entries in current partitions: " << indexEntries);
Env::StatPartitionIndexSize();

Expand All @@ -319,7 +343,11 @@ int main(int argc, char** argv)

p_mpi->SetDataSpout(dynamic_cast<DataSpout*>(p_reader));
p_mpi->SetDataSink(dynamic_cast<DataSink*>(p_accu));
p_mpi->SetTimerPrefix("ExchangeNewBlocks");
p_mpi->Start();
LOG_INFO("ExchangeNewBlocks Rounds of MPI Communication: " << p_mpi->GetMpiCount());
p_reader->Stat();
p_accu->Stat();

delete p_mpi;
delete p_reader;
Expand All @@ -328,6 +356,7 @@ int main(int argc, char** argv)
TimerPool::Stop("ExchangeNewBlocks");

LOG_INFO("writing new blocks to backup storage");
uint64_t s3block_count = 0;
TimerPool::Start("WriteNewBlocks");
// local-3: write new blocks to storage
for (i = 0; Env::GetVmId(i) >= 0; i++) {
Expand All @@ -337,12 +366,14 @@ int main(int argc, char** argv)
Block blk;
BlockMeta bm;
while (input.Get(blk)) {
s3block_count++;
bm.mBlk = blk;
bm.mRef = REF_VALID;
output.Put(bm);
}
}
TimerPool::Stop("WriteNewBlocks");
LOG_INFO("Step 3 Summary: blocks read and written: " << s3block_count);

LOG_INFO("exchanging data reference of new blocks");
TimerPool::Start("ExchangeNewRefs");
Expand All @@ -354,7 +385,11 @@ int main(int argc, char** argv)

p_mpi->SetDataSpout(dynamic_cast<DataSpout*>(p_reader));
p_mpi->SetDataSink(dynamic_cast<DataSink*>(p_accu));
p_mpi->SetTimerPrefix("ExchangeNewRefs");
p_mpi->Start();
LOG_INFO("ExchangeNewRefs Rounds of MPI Communication: " << p_mpi->GetMpiCount());
p_reader->Stat();
p_accu->Stat();

delete p_mpi;
delete p_reader;
Expand All @@ -369,8 +404,8 @@ int main(int argc, char** argv)
for (int partid = Env::GetPartitionBegin(); partid < Env::GetPartitionEnd(); partid++) {
PartitionIndex index;
index.FromFile(Env::GetStep4InputName(partid));
RecordReader<Block> input(Env::GetStep2Output2Name(partid));
RecordWriter<BlockMeta> output(Env::GetStep2Output1Name(partid), true);
RecordReader<Block> input(Env::GetStep2OutputDupWithNewName(partid));
RecordWriter<BlockMeta> output(Env::GetStep2OutputDupBlocksName(partid), true);
Block blk;
BlockMeta bm;
while (input.Get(blk)) {
Expand All @@ -385,9 +420,11 @@ int main(int argc, char** argv)
}
}
index.AppendToFile(Env::GetLocalIndexName(partid));
//input.Stat();
//output.Stat();
}
TimerPool::Stop("UpdateRefAndIndex");
LOG_INFO("duplicate-with-new blocks in current partitions: " << dupNewBlocks);
LOG_INFO("Reference Update Summary: dup-with-new blocks: " << dupNewBlocks);

LOG_INFO("exchanging dup blocks");
TimerPool::Start("ExchangeDupBlocks");
Expand All @@ -399,7 +436,11 @@ int main(int argc, char** argv)

p_mpi->SetDataSpout(dynamic_cast<DataSpout*>(p_reader));
p_mpi->SetDataSink(dynamic_cast<DataSink*>(p_accu));
p_mpi->SetTimerPrefix("ExchangeDupBlocks");
p_mpi->Start();
LOG_INFO("ExchangeDupBlocks Rounds of MPI Communication: " << p_mpi->GetMpiCount());
p_reader->Stat();
p_accu->Stat();

delete p_mpi;
delete p_reader;
Expand Down
2 changes: 1 addition & 1 deletion src/batch_dedup_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ string Env::GetStep4OutputName(int vmid)

/// Maps a checksum to the id of the partition responsible for it.
///
/// The partition is chosen from Middle4Bytes() of the checksum, reduced
/// modulo the configured partition count (mNumPartitions).
///
/// @param cksum  checksum of the block being placed
/// @return       cksum.Middle4Bytes() % mNumPartitions
int Env::GetPartitionId(const Checksum& cksum)
{
    // A stale `return cksum.First4Bytes() % mNumPartitions;` used to sit in
    // front of this statement, which made the Middle4Bytes() return
    // unreachable. Only the intended return is kept.
    // NOTE(review): assumes mNumPartitions is non-zero - confirm where it is set.
    return cksum.Middle4Bytes() % mNumPartitions;
}

int Env::GetDestNodeId(int partid)
Expand Down
21 changes: 20 additions & 1 deletion src/mpi_engine.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mpi_engine.h"
#include "batch_dedup_config.h"
#include "mpi.h"
#include "timer.h"

MpiEngine::MpiEngine()
: mSpoutPtr(NULL),
Expand All @@ -13,6 +14,7 @@ MpiEngine::MpiEngine()
mDispls = new int[Env::GetNumTasks()];
mRecvCounts = new int[Env::GetNumTasks()];
mHeaders = new MsgHeader[Env::GetNumTasks()];
mpiCounter = 0;
}

MpiEngine::~MpiEngine()
Expand Down Expand Up @@ -46,9 +48,13 @@ void MpiEngine::Start()
size_t buf_size = Env::GetMpiBufSize();
int num_tasks = Env::GetNumTasks();
int rc;
string read_timer = timerPrefix + ":MpiRead";
string mpi_timer = timerPrefix + ":MpiNetwork";
string processing_timer = timerPrefix + ":MpiProcessing";

while(true) {
// fill send buffer if we have data to read
TimerPool::Start(read_timer);
if (mSpoutPtr->GetRecord(pd)) {
int dest = mSpoutPtr->GetRecordDest(pd);
pd->ToBuffer(&send_buf[(dest * buf_size) + mWritePos[dest]]);
Expand All @@ -57,6 +63,7 @@ void MpiEngine::Start()
if (mWritePos[dest] < (buf_size - pd->GetSize()))
continue; // buffer nut full, read more
}
TimerPool::Stop(read_timer);

// buffer full or cannot read more, need to send
// prepare header
Expand All @@ -72,7 +79,7 @@ void MpiEngine::Start()
mHeaders[i].mFlags = send_flag;
mHeaders[i].ToBuffer(&send_buf[i * buf_size]);
}

TimerPool::Start(mpi_timer);
// send recv size
LOG_DEBUG("sending msg size");
rc = MPI_Alltoall(mWritePos, 1, MPI_INT,
Expand All @@ -92,6 +99,8 @@ void MpiEngine::Start()
LOG_ERROR("send data fail");
break;
}
TimerPool::Stop(mpi_timer);
mpiCounter++;

// check finish condition
bool finished = true;
Expand All @@ -104,12 +113,14 @@ void MpiEngine::Start()
}

// process data
TimerPool::Start(processing_timer);
mSinkPtr->ProcessBuffer();

// reset send buffer
for (int i = 0; i < num_tasks; i++) {
mWritePos[i] = header_size;
}
TimerPool::Stop(processing_timer);

if (finished) {
LOG_DEBUG("nobody has data to send, stop");
Expand All @@ -127,3 +138,11 @@ void MpiEngine::SetDataSink(DataSink* sink)
{
mSinkPtr = sink;
}
/// Stores the label that Start() prepends to its timer names
/// (":MpiRead", ":MpiNetwork" and ":MpiProcessing").
///
/// @param prefix  text placed in front of each timer suffix
void MpiEngine::SetTimerPrefix(string prefix)
{
    timerPrefix.assign(prefix);
}
/// Reports the value of mpiCounter, which Start() increments once after each
/// completed round of MPI exchange.
///
/// The counter member is a uint64_t while this accessor returns int, so the
/// value is narrowed on the way out; the cast below only makes that explicit.
///
/// @return current round count, converted to int
int MpiEngine::GetMpiCount()
{
    const int rounds = static_cast<int>(mpiCounter);
    return rounds;
}
4 changes: 4 additions & 0 deletions src/mpi_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ class MpiEngine
void Start();
void SetDataSpout(DataSpout* spout);
void SetDataSink(DataSink* sink);
void SetTimerPrefix(string prefix);
int GetMpiCount();

private:
DataSpout* mSpoutPtr;
Expand All @@ -21,6 +23,8 @@ class MpiEngine
int* mDispls;
int* mRecvCounts;
MsgHeader* mHeaders;
string timerPrefix;
uint64_t mpiCounter;

private:
void Init();
Expand Down
7 changes: 7 additions & 0 deletions src/trace_types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ uint32_t Checksum::Last4Bytes() const
return tmp;
}

/// Reads four bytes from inside the checksum, starting 12 bytes before the
/// end of mData, and returns them as a 32-bit value.
///
/// The bytes are copied verbatim, so the numeric result follows the host's
/// byte order.
///
/// @return the four bytes at mData[CKSUM_LEN - 12 .. CKSUM_LEN - 9]
uint32_t Checksum::Middle4Bytes() const
{
    uint32_t word;
    // Byte-wise copy into the local; sizeof(word) is the same 4 bytes as before.
    memcpy(&word, &mData[CKSUM_LEN - 12], sizeof(word));
    return word;
}

int Block::GetSize()
{
return CKSUM_LEN + sizeof(Block::mFileID) + sizeof(Block::mFlags) + sizeof(Block::mSize) + sizeof(Block::mOffset);
Expand Down
2 changes: 2 additions & 0 deletions src/trace_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ struct Checksum
uint32_t First4Bytes() const;

uint32_t Last4Bytes() const;

uint32_t Middle4Bytes() const;
};

/*
Expand Down

0 comments on commit 4ec8fbd

Please sign in to comment.