Skip to content

Commit

Permalink
Simultaneous RPCs (#112)
Browse files Browse the repository at this point in the history
Use call data address as unique identifying tag as opposed to state machine's current state.
  • Loading branch information
RobertBColton authored Mar 25, 2020
1 parent 1367eda commit 433f958
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.
64 changes: 32 additions & 32 deletions Plugins/ServerPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ inline std::chrono::system_clock::time_point future_deadline(size_t us) {
return (std::chrono::system_clock::now() + std::chrono::microseconds(us));
}

void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
int detag(void* p) { return static_cast<int>(reinterpret_cast<intptr_t>(p)); }

} // anonymous namespace

CallData::~CallData() {}
Expand All @@ -28,16 +25,18 @@ struct AsyncReadWorker : public CallData {
std::unique_ptr<ClientAsyncReader<T>> stream;

virtual ~AsyncReadWorker() override {}
void operator()(const AsyncState state, const Status& /*status*/) override {
void operator()(const Status& /*status*/) override {
switch (state) {
case AsyncState::CONNECT: {
started();
stream->Read(&element, tag(AsyncState::READ));
state = AsyncState::READ;
stream->Read(&element, this);
break;
}
case AsyncState::READ: {
process(element);
stream->Read(&element, tag(AsyncState::READ));
state = AsyncState::READ;
stream->Read(&element, this);
break;
}
case AsyncState::FINISH: {
Expand All @@ -49,8 +48,14 @@ struct AsyncReadWorker : public CallData {
break;
}
}
virtual void start() final { stream->StartCall(tag(AsyncState::CONNECT)); }
virtual void finish() final { stream->Finish(&status, tag(AsyncState::FINISH)); }
virtual void start() final {
state = AsyncState::CONNECT;
stream->StartCall(this);
}
virtual void finish() final {
state = AsyncState::FINISH;
stream->Finish(&status, this);
}

virtual void started() {}
virtual void finished() {}
Expand All @@ -63,7 +68,7 @@ struct AsyncResponseReadWorker : public CallData {
std::unique_ptr<ClientAsyncResponseReader<T>> stream;

virtual ~AsyncResponseReadWorker() override {}
void operator()(const AsyncState state, const Status& /*status*/) override {
void operator()(const Status& /*status*/) override {
switch (state) {
case AsyncState::FINISH: {
finished(element);
Expand All @@ -77,7 +82,8 @@ struct AsyncResponseReadWorker : public CallData {
virtual void start() final {
stream->StartCall();
started();
stream->Finish(&element, &status, tag(AsyncState::FINISH));
state = AsyncState::FINISH;
stream->Finish(&element, &status, this);
}
virtual void finish() final {}

Expand Down Expand Up @@ -148,6 +154,7 @@ void CompilerClient::CompileBuffer(Game* game, CompileMode mode, std::string nam

auto worker = dynamic_cast<AsyncReadWorker<CompileReply>*>(callData);
worker->stream = stub->PrepareAsyncCompileBuffer(&worker->context, request, &cq);
callData->start();
}

void CompilerClient::CompileBuffer(Game* game, CompileMode mode) {
Expand All @@ -163,6 +170,7 @@ void CompilerClient::GetResources() {

auto worker = dynamic_cast<AsyncReadWorker<Resource>*>(callData);
worker->stream = stub->PrepareAsyncGetResources(&worker->context, emptyRequest, &cq);
callData->start();
}

void CompilerClient::GetSystems() {
Expand All @@ -171,6 +179,7 @@ void CompilerClient::GetSystems() {

auto worker = dynamic_cast<AsyncReadWorker<SystemType>*>(callData);
worker->stream = stub->PrepareAsyncGetSystems(&worker->context, emptyRequest, &cq);
callData->start();
}

void CompilerClient::SetDefinitions(std::string code, std::string yaml) {
Expand All @@ -182,6 +191,7 @@ void CompilerClient::SetDefinitions(std::string code, std::string yaml) {

auto worker = dynamic_cast<AsyncResponseReadWorker<SyntaxError>*>(callData);
worker->stream = stub->PrepareAsyncSetDefinitions(&worker->context, definitionsRequest, &cq);
callData->start();
}

void CompilerClient::SetCurrentConfig(const resources::Settings& settings) {
Expand All @@ -191,6 +201,7 @@ void CompilerClient::SetCurrentConfig(const resources::Settings& settings) {

auto worker = dynamic_cast<AsyncResponseReadWorker<Empty>*>(callData);
worker->stream = stub->PrepareAsyncSetCurrentConfig(&worker->context, setConfigRequest, &cq);
callData->start();
}

void CompilerClient::SyntaxCheck() {
Expand All @@ -199,43 +210,33 @@ void CompilerClient::SyntaxCheck() {

auto worker = dynamic_cast<AsyncResponseReadWorker<SyntaxError>*>(callData);
worker->stream = stub->PrepareAsyncSyntaxCheck(&worker->context, syntaxCheckRequest, &cq);
callData->start();
}

template <typename T>
T* CompilerClient::ScheduleTask() {
std::unique_ptr<T> callData(new T());
tasks.push(std::move(callData));
return (T*)tasks.back().get();
auto callData = new T();
connect(callData, &CallData::LogOutput, this, &CompilerClient::LogOutput);
connect(callData, &CallData::CompileStatusChanged, this, &CompilerClient::CompileStatusChanged);
return callData;
}

void CompilerClient::UpdateLoop() {
static bool started = false;
void* got_tag = nullptr;
bool ok = false;

if (this->tasks.empty()) return;
auto* task = this->tasks.front().get();
if (!started) {
connect(task, &CallData::LogOutput, this, &CompilerClient::LogOutput);
connect(task, &CallData::CompileStatusChanged, this, &CompilerClient::CompileStatusChanged);
task->start();
started = true;
}
auto asyncStatus = cq.AsyncNext(&got_tag, &ok, future_deadline(0));
auto state = static_cast<AsyncState>(detag(got_tag));
if (state != AsyncState::DISCONNECTED && !ok) {
task->finish();
auto callData = static_cast<CallData*>(got_tag);
if (callData && callData->state != AsyncState::DISCONNECTED && !ok) {
callData->finish();
return;
}
// yield to application main event loop
if (asyncStatus != CompletionQueue::NextStatus::GOT_EVENT || !got_tag) return;

(*task)(state, task->status);
if (state == AsyncState::FINISH) {
// go to the next task
tasks.pop();
// next task needs to be started
started = false;
(*callData)(callData->status);
if (callData->state == AsyncState::FINISH) {
delete callData;
}
}

Expand Down Expand Up @@ -290,7 +291,6 @@ ServerPlugin::ServerPlugin(MainWindow& mainWindow) : RGMPlugin(mainWindow) {
// update initial keyword set and systems
compilerClient->GetResources();
compilerClient->GetSystems();

}

ServerPlugin::~ServerPlugin() { process->close(); }
Expand Down
4 changes: 2 additions & 2 deletions Plugins/ServerPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ class CallData : public QObject {
Q_OBJECT

public:
AsyncState state = DISCONNECTED;
Status status;
ClientContext context;
virtual ~CallData();
virtual void start() {}
virtual void operator()(const AsyncState state, const Status& status) = 0;
virtual void operator()(const Status& status) = 0;
virtual void finish() {}

signals:
Expand Down Expand Up @@ -69,7 +70,6 @@ class CompilerClient : public QObject {

private:
CompletionQueue cq;
std::queue<std::unique_ptr<CallData>> tasks;

template <typename T>
T* ScheduleTask();
Expand Down

0 comments on commit 433f958

Please sign in to comment.