Skip to content

Commit

Permalink
Account for backpressure when sending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
KamWithK committed Aug 4, 2022
1 parent a7a4b10 commit 89bcfe8
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
31 changes: 30 additions & 1 deletion TextractorSender/Server.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#include "Server.h"

const int BACKPRESSURE_LIMIT = 1024;

uWS::App* app;
uWS::WebSocket<false, true, PerSocketData>* gws = nullptr;
uWS::Loop* loop;
queue<string> send_queue;

void StartServer() {
uWS::App* app = new uWS::App();
Expand All @@ -21,6 +24,22 @@ void StartServer() {
.open = [](auto* ws) {
gws = ws;
},
.drain = [](auto* /*ws*/) {
/* Check ws->getBufferedAmount() here */
/* If it is below threshold then send the next message on the queue */
if (gws->getBufferedAmount() <= BACKPRESSURE_LIMIT) {
string data = send_queue.front();
auto result = gws->send(data, uWS::OpCode::TEXT, false);

// If failed, then put back onto queue to try again next
if (result == 2) {
send_queue.push(data);
}
else {
send_queue.pop();
}
}
},
.close = [](auto*/*ws*/, int /*code*/, std::string_view /*message*/) {
gws = nullptr;
}
Expand All @@ -43,7 +62,17 @@ void CloseServer() {
void BroadcastData(string data) {
if (gws) {
loop -> defer([data] {
gws -> send(data, uWS::OpCode::TEXT, false);
if (gws->getBufferedAmount() <= BACKPRESSURE_LIMIT) {
auto result = gws->send(data, uWS::OpCode::TEXT, false);

// If failed, then put back onto queue to try again next
if (result == 2) {
send_queue.push(data);
}
}
else {
send_queue.push(data);
}
});
}
}
1 change: 1 addition & 0 deletions TextractorSender/Server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <time.h>
#include <iostream>
#include <queue>
#include <uwebsockets/App.h>

using namespace std;
Expand Down

0 comments on commit 89bcfe8

Please sign in to comment.