Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Accuracy of Streaming Triangle Counting #242

Merged
merged 4 commits into from
Apr 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ set(HEADERS globals.h
src/partitioner/local/MetisPartitioner.h
src/partitioner/local/RDFParser.h
src/partitioner/local/RDFPartitioner.h
src/partitioner/stream/JasmineGraphIncrementalStore.h
src/partitioner/stream/Partition.h
src/partitioner/stream/Partitioner.h
src/performance/metrics/PerformanceUtil.h
Expand Down Expand Up @@ -98,7 +97,6 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/partitioner/local/MetisPartitioner.cpp
src/partitioner/local/RDFParser.cpp
src/partitioner/local/RDFPartitioner.cpp
src/partitioner/stream/JasmineGraphIncrementalStore.cpp
src/partitioner/stream/Partition.cpp
src/partitioner/stream/Partitioner.cpp
src/performance/metrics/PerformanceUtil.cpp
Expand Down
108 changes: 77 additions & 31 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
Logger frontend_logger;
std::set<ProcessInfo> processData;
std::string stream_topic_name;
bool JasmineGraphFrontEnd::strian_exit;

static std::string getPartitionCount(std::string path);
static void list_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p);
Expand All @@ -76,7 +77,8 @@
static void triangles_command(std::string masterIP, int connFd, SQLiteDBInterface *sqlite,
PerformanceSQLiteDBInterface *perfSqlite, JobScheduler *jobScheduler, bool *loop_exit_p);
static void streaming_triangles_command(std::string masterIP, int connFd, JobScheduler *jobScheduler, bool *loop_exit_p,
int numberOfPartitions);
int numberOfPartitions, bool *strian_exit);
static void stop_strian_command(int connFd, bool *strian_exit);
static void vertex_count_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void edge_count_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p);
static void merge_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p);
Expand Down Expand Up @@ -194,7 +196,10 @@
} else if (line.compare(TRIANGLES) == 0) {
triangles_command(masterIP, connFd, sqlite, perfSqlite, jobScheduler, &loop_exit);
} else if (line.compare(STREAMING_TRIANGLES) == 0) {
streaming_triangles_command(masterIP, connFd, jobScheduler, &loop_exit, numberOfPartitions);
streaming_triangles_command(masterIP, connFd, jobScheduler, &loop_exit,
numberOfPartitions, &JasmineGraphFrontEnd::strian_exit);
} else if (line.compare(STOP_STRIAN) == 0) {
stop_strian_command(connFd, &JasmineGraphFrontEnd::strian_exit);

Check warning on line 202 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L202

Added line #L202 was not covered by tests
} else if (line.compare(VCOUNT) == 0) {
vertex_count_command(connFd, sqlite, &loop_exit);
} else if (line.compare(ECOUNT) == 0) {
Expand Down Expand Up @@ -1469,9 +1474,21 @@
}
}

void JasmineGraphFrontEnd::scheduleStrianJobs(JobRequest &jobDetails, std::priority_queue<JobRequest> &jobQueue,

Check warning on line 1477 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1477

Added line #L1477 was not covered by tests
JobScheduler *jobScheduler, bool *strian_exit) {
while (!(*strian_exit)) {
auto begin = chrono::high_resolution_clock::now();

Check warning on line 1480 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1480

Added line #L1480 was not covered by tests
jobDetails.setBeginTime(begin);
int uniqueId = JasmineGraphFrontEnd::getUid();

Check warning on line 1482 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1482

Added line #L1482 was not covered by tests
jobDetails.setJobId(std::to_string(uniqueId));
jobQueue.push(jobDetails);
jobScheduler->pushJob(jobDetails);
sleep(Conts::STREAMING_STRAIN_GAP);
}
}

static void streaming_triangles_command(std::string masterIP, int connFd, JobScheduler *jobScheduler, bool *loop_exit_p,
int numberOfPartitions) {
int uniqueId = JasmineGraphFrontEnd::getUid();
int numberOfPartitions, bool *strian_exit) {
int result_wr = write(connFd, GRAPHID_SEND.c_str(), FRONTEND_COMMAND_LENGTH);
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand Down Expand Up @@ -1518,47 +1535,76 @@
mode = Utils::trim_copy(mode, " \f\n\r\t\v");
frontend_logger.info("Got mode " + mode);

std::priority_queue<JobRequest> jobQueue;

Check warning on line 1538 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1538

Added line #L1538 was not covered by tests
JobRequest jobDetails;
jobDetails.setJobId(std::to_string(uniqueId));
jobDetails.setJobType(STREAMING_TRIANGLES);

jobDetails.setMasterIP(masterIP);
jobDetails.addParameter(Conts::PARAM_KEYS::GRAPH_ID, graph_id);
jobDetails.addParameter(Conts::PARAM_KEYS::MODE, mode);
jobDetails.addParameter(Conts::PARAM_KEYS::PARTITION, std::to_string(numberOfPartitions));

jobScheduler->pushJob(jobDetails);
JobResponse jobResponse = jobScheduler->getResult(jobDetails);
std::string errorMessage = jobResponse.getParameter(Conts::PARAM_KEYS::ERROR_MESSAGE);
if (*strian_exit) {
*strian_exit = false;

Check warning on line 1548 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1548

Added line #L1548 was not covered by tests
}

if (!errorMessage.empty()) {
*loop_exit_p = true;
result_wr = write(connFd, errorMessage.c_str(), errorMessage.length());
std::thread schedulerThread(JasmineGraphFrontEnd::scheduleStrianJobs, std::ref(jobDetails), std::ref(jobQueue),

Check warning on line 1551 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1551

Added line #L1551 was not covered by tests
jobScheduler, std::ref(strian_exit));

if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
while (!(*strian_exit)) {
if (!jobQueue.empty()) {
JobRequest request = jobQueue.top();
JobResponse jobResponse = jobScheduler->getResult(request);
std::string errorMessage = jobResponse.getParameter(Conts::PARAM_KEYS::ERROR_MESSAGE);

if (!errorMessage.empty()) {
*loop_exit_p = true;

Check warning on line 1561 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1561

Added line #L1561 was not covered by tests
result_wr = write(connFd, errorMessage.c_str(), errorMessage.length());

if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
return;

Check warning on line 1566 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1566

Added line #L1566 was not covered by tests
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(),
Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
}
return;

Check warning on line 1573 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1573

Added line #L1573 was not covered by tests
}

std::string triangleCount = jobResponse.getParameter(Conts::PARAM_KEYS::STREAMING_TRIANGLE_COUNT);
std::time_t begin_time_t = std::chrono::system_clock::to_time_t(request.getBegin());
std::time_t end_time_t = std::chrono::system_clock::to_time_t(jobResponse.getEndTime());
auto dur = jobResponse.getEndTime() - request.getBegin();
auto msDuration = std::chrono::duration_cast<std::chrono::milliseconds>(dur).count();
frontend_logger.info("Streaming triangle " + request.getJobId() +
" Count : " + triangleCount + " Time Taken: " + to_string(msDuration) +
" milliseconds");
std::string out = triangleCount + " Time Taken: " + to_string(msDuration) + " ms , Begin Time: " +
std::ctime(&begin_time_t) + " End Time: " + std::ctime(&end_time_t);
result_wr = write(connFd, out.c_str(), out.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;

Check warning on line 1590 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1589-L1590

Added lines #L1589 - L1590 were not covered by tests
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;

Check warning on line 1595 in src/frontend/JasmineGraphFrontEnd.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/JasmineGraphFrontEnd.cpp#L1595

Added line #L1595 was not covered by tests
}
jobQueue.pop();
} else {
sleep(Conts::SCHEDULER_SLEEP_TIME);
}
return;
}

std::string triangleCount = jobResponse.getParameter(Conts::PARAM_KEYS::STREAMING_TRIANGLE_COUNT);
schedulerThread.join(); // Wait for the scheduler thread to finish
}

result_wr = write(connFd, triangleCount.c_str(), triangleCount.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
result_wr = write(connFd, Conts::CARRIAGE_RETURN_NEW_LINE.c_str(), Conts::CARRIAGE_RETURN_NEW_LINE.size());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
}
/**
 * Handler for the STOP_STRIAN frontend command: raises the streaming triangle exit flag.
 *
 * The loops in scheduleStrianJobs and streaming_triangles_command poll this flag and stop
 * once it is true.
 *
 * @param connFd      client connection descriptor; not used, nothing is written back here
 * @param strian_exit flag to set; must point to a valid bool
 */
static void stop_strian_command(int connFd, bool *strian_exit) {
    (void)connFd;
    bool &exitFlag = *strian_exit;
    exitFlag = true;
}

static void vertex_count_command(int connFd, SQLiteDBInterface *sqlite, bool *loop_exit_p) {
Expand Down
4 changes: 4 additions & 0 deletions src/frontend/JasmineGraphFrontEnd.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,12 @@ class JasmineGraphFrontEnd {
static long getSLAForGraphId(SQLiteDBInterface *sqlite, PerformanceSQLiteDBInterface *perfSqlite,
std::string graphId, std::string command, std::string category);

// Keeps re-submitting the streaming triangle count request (pushing a copy onto jobQueue and
// handing it to jobScheduler) until *strian_exit is true. Parameter name matches the definition.
static void scheduleStrianJobs(JobRequest &jobDetails, std::priority_queue<JobRequest> &jobQueue,
                               JobScheduler *jobScheduler, bool *strian_exit);

static int getRunningHighPriorityTaskCount();
static bool areRunningJobsForSameGraph();
static bool strian_exit;
std::map<std::string, std::atomic<bool>> *streamsState;
std::map<std::string, std::thread> streamingThreads;

Expand Down
1 change: 1 addition & 0 deletions src/frontend/JasmineGraphFrontEndProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const string ERROR = "error";
const string ADD_STREAM_KAFKA = "adstrmk";
const string ADD_STREAM_KAFKA_CSV = "adstrmkcsv";
const string STOP_STREAM_KAFKA = "stopstrm";
const string STOP_STRIAN = "stopstrian";
const string STREAM_TOPIC_NAME = "topicnm";
const string PROCESS_DATASET = "process_dataset";
const string REFORMAT = "reformat";
Expand Down
1 change: 1 addition & 0 deletions src/frontend/JasmineGraphFrontEndProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ extern const string SLA;
extern const string COMMAND;
extern const string PRIORITY;
extern const string STOP_STREAM_KAFKA;
extern const string STOP_STRIAN;

extern const string ADMDL;
extern const string MERGE;
Expand Down
10 changes: 10 additions & 0 deletions src/frontend/core/domain/JobRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limitations under the License.
*/

#include <chrono>
#include "JobRequest.h"

std::string JobRequest::getJobId() { return jobId; }
Expand All @@ -32,3 +33,12 @@
void JobRequest::setMasterIP(std::string masterip) { this->masterIP = masterip; }

std::string JobRequest::getMasterIP() { return masterIP; }

/**
 * Stores the supplied time point as this request's begin time.
 *
 * @param begin system_clock time point to record
 */
void JobRequest::setBeginTime(std::chrono::time_point<std::chrono::system_clock> begin) {
    // Qualify the member explicitly: the parameter shadows it.
    JobRequest::begin = begin;
}

std::chrono::time_point<std::chrono::system_clock> JobRequest::getBegin() {
return begin;

Check warning on line 42 in src/frontend/core/domain/JobRequest.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/domain/JobRequest.cpp#L41-L42

Added lines #L41 - L42 were not covered by tests
}

5 changes: 5 additions & 0 deletions src/frontend/core/domain/JobRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@ limitations under the License.

#include <map>
#include <string>
#include <chrono>

class JobRequest {
private:
std::string jobId;
std::string jobType;
std::string masterIP;
std::map<std::string, std::string> requestParams;
std::chrono::time_point<std::chrono::system_clock> begin;
std::chrono::time_point<std::chrono::system_clock> end;

public:
int priority;
Expand All @@ -37,6 +40,8 @@ class JobRequest {
int getPriority();
void setMasterIP(std::string masterip);
std::string getMasterIP();
std::chrono::time_point<std::chrono::system_clock> getBegin();
void setBeginTime(std::chrono::time_point<std::chrono::system_clock> begin);
};

#endif // JASMINEGRAPH_JOBREQUEST_H
9 changes: 9 additions & 0 deletions src/frontend/core/domain/JobResponse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
limitations under the License.
*/

#include <chrono>
#include "JobResponse.h"

std::string JobResponse::getJobId() { return jobId; }
Expand All @@ -20,3 +21,11 @@
void JobResponse::addParameter(std::string key, std::string value) { responseParams[key] = value; }

std::string JobResponse::getParameter(std::string key) { return responseParams[key]; }

std::chrono::time_point<std::chrono::system_clock> JobResponse::getEndTime() {
return end;

Check warning on line 26 in src/frontend/core/domain/JobResponse.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/domain/JobResponse.cpp#L25-L26

Added lines #L25 - L26 were not covered by tests
}

/**
 * Stores the supplied time point as this response's end time.
 *
 * @param end system_clock time point to record
 */
void JobResponse::setEndTime(const std::chrono::time_point<std::chrono::system_clock> end) {
    // Qualify the member explicitly: the parameter shadows it.
    JobResponse::end = end;
}
5 changes: 5 additions & 0 deletions src/frontend/core/domain/JobResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ limitations under the License.

#include <map>
#include <string>
#include <chrono>

/**
 * Result of a scheduled job: the job id, a string-to-string parameter map and timestamps.
 */
class JobResponse {
 private:
    // Identifier of the job this response belongs to.
    std::string jobId;
    // Named result values, written by addParameter() and read by getParameter().
    std::map<std::string, std::string> responseParams;
    // Begin timestamp; no accessor for it is declared in this class.
    std::chrono::system_clock::time_point begin;
    // End timestamp, accessed through setEndTime()/getEndTime().
    std::chrono::system_clock::time_point end;

 public:
    /** @return the stored job id */
    std::string getJobId();

    /** @param inputJobId job id for this response (presumably stored in jobId - definition not shown) */
    void setJobId(std::string inputJobId);

    /** Stores value under key in the parameter map, overwriting any previous value. */
    void addParameter(std::string key, std::string value);

    /** @return the value stored under key; the lookup uses operator[], so a missing key yields "" */
    std::string getParameter(std::string key);

    /** @return the end time last stored with setEndTime() */
    std::chrono::system_clock::time_point getEndTime();

    /** @param end system_clock time point to record as the end time */
    void setEndTime(std::chrono::system_clock::time_point end);
};

#endif // JASMINEGRAPH_JOBRESPONSE_H
55 changes: 37 additions & 18 deletions src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
#define DATA_BUFFER_SIZE (FRONTEND_DATA_LENGTH + 1)

Logger streaming_triangleCount_logger;
std::unordered_map<long, std::unordered_map<long, std::unordered_set<long>>>
StreamingTriangleCountExecutor::triangleTree;
long StreamingTriangleCountExecutor::triangleCount;

void saveLocalValues(StreamingSQLiteDBInterface stramingdb, std::string graphID, std::string partitionID,
NativeStoreTriangleResult result);
Expand Down Expand Up @@ -51,19 +54,6 @@
std::vector<std::future<long>> intermRes;
long result = 0;

if (partitionCount > 2) {
long aggregatedTriangleCount = StreamingTriangleCountExecutor::aggregateCentralStoreTriangles(
sqlite, streamingDB, graphId, masterIP, mode, partitionCount);
if (mode == "0") {
saveCentralValues(streamingDB, graphId, std::to_string(aggregatedTriangleCount));
result += aggregatedTriangleCount;
} else {
long old_result = stol(retrieveCentralValues(streamingDB, graphId));
saveCentralValues(streamingDB, graphId, std::to_string(aggregatedTriangleCount + old_result));
result += (aggregatedTriangleCount + old_result);
}
}

streaming_triangleCount_logger.info("###STREAMING-TRIANGLE-COUNT-EXECUTOR### Completed central store counting");

for (int i = 0; i < partitionCount; i++) {
Expand All @@ -78,6 +68,19 @@
host, workerPort, workerDataPort, i, masterIP, mode, streamingDB));
}

if (partitionCount > 2) {
long aggregatedTriangleCount = StreamingTriangleCountExecutor::aggregateCentralStoreTriangles(
sqlite, streamingDB, graphId, masterIP, mode, partitionCount);
if (mode == "0") {
saveCentralValues(streamingDB, graphId, std::to_string(aggregatedTriangleCount));
result += aggregatedTriangleCount;

Check warning on line 76 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L76

Added line #L76 was not covered by tests
} else {
long old_result = stol(retrieveCentralValues(streamingDB, graphId));
saveCentralValues(streamingDB, graphId, std::to_string(aggregatedTriangleCount + old_result));
result += (aggregatedTriangleCount + old_result);

Check warning on line 80 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L80

Added line #L80 was not covered by tests
}
}

for (auto &&futureCall : intermRes) {
result += futureCall.get();
}
Expand All @@ -91,6 +94,7 @@
JobResponse jobResponse;
jobResponse.setJobId(request.getJobId());
jobResponse.addParameter(Conts::PARAM_KEYS::STREAMING_TRIANGLE_COUNT, std::to_string(result));
jobResponse.setEndTime(chrono::high_resolution_clock::now());
responseVector.push_back(jobResponse);

responseMap[request.getJobId()] = jobResponse;
Expand Down Expand Up @@ -365,19 +369,34 @@

std::vector<std::string> triangles = Utils::split(result, ':');
std::vector<std::string>::iterator triangleIterator;
std::set<std::string> uniqueTriangleSet;
long currentSize = triangleCount;

Check warning on line 372 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L372

Added line #L372 was not covered by tests

for (triangleIterator = triangles.begin(); triangleIterator != triangles.end(); ++triangleIterator) {
std::string triangle = *triangleIterator;

if (!triangle.empty() && triangle != "NILL") {
uniqueTriangleSet.insert(triangle);
std::vector<std::string> triangleList = Utils::split(triangle, ',');
long varOne = std::stol(triangleList[0]);
long varTwo = std::stol(triangleList[1]);
long varThree = std::stol(triangleList[2]);

auto &itemRes = triangleTree[varOne];
auto itemResIterator = itemRes.find(varTwo);
if (itemResIterator != itemRes.end()) {
auto &set2 = itemRes[varTwo];
auto set2Iter = set2.find(varThree);
if (set2Iter == set2.end()) {
set2.insert(varThree);
triangleCount++;

Check warning on line 390 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L390

Added line #L390 was not covered by tests
}
} else {
triangleTree[varOne][varTwo].insert(varThree);
triangleCount++;

Check warning on line 394 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L394

Added line #L394 was not covered by tests
}
}
}

aggregatedTriangleCount = uniqueTriangleSet.size();

return aggregatedTriangleCount;
return triangleCount - currentSize;

Check warning on line 399 in src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp

View check run for this annotation

Codecov / codecov/patch

src/frontend/core/executor/impl/StreamingTriangleCountExecutor.cpp#L399

Added line #L399 was not covered by tests
}

string StreamingTriangleCountExecutor::countCentralStoreTriangles(
Expand Down
Loading
Loading