Skip to content

Commit c925d63

Browse files
committed
Dont send message if disconnected
Signed-off-by: Nijat K <[email protected]>
1 parent 62b7988 commit c925d63

File tree

3 files changed

+27
-10
lines changed

3 files changed

+27
-10
lines changed

cpp/csp/adapters/websocket/WebsocketEndpointManager.cpp

+22-6
Original file line numberDiff line numberDiff line change
@@ -70,19 +70,35 @@ bool WebsocketEndpointManager::adapterPruned( size_t caller_id ){
7070
return m_inputAdapters[caller_id] == nullptr;
7171
};
7272

73+
bool WebsocketEndpointManager::publishesToEndpoint(const size_t caller_id, const std::string& endpoint_id){
74+
auto config_it = m_endpoint_configs.find(endpoint_id);
75+
if( config_it == m_endpoint_configs.end() || config_it->second.shutting_down || config_it->second.attempting_reconnect)
76+
return false;
77+
78+
return caller_id < m_endpoint_producers[endpoint_id].size() &&
79+
m_endpoint_producers[endpoint_id][caller_id];
80+
}
81+
7382
void WebsocketEndpointManager::send(const std::string& value, const size_t& caller_id) {
7483
// Safety check for caller_id
7584
// Get all endpoints this producer is connected to
7685
const auto& endpoints = m_producer_endpoints[caller_id];
77-
86+
// std::cout << " WHAT THE FUCK!!!\n";
7887
// For each endpoint this producer is connected to
7988
for (const auto& endpoint_id : endpoints) {
8089
// Double check the endpoint exists and producer is still valid
81-
if (auto it = m_endpoints.find(endpoint_id);
82-
it != m_endpoints.end() &&
83-
caller_id < m_endpoint_producers[endpoint_id].size() &&
84-
m_endpoint_producers[endpoint_id][caller_id]) {
85-
it->second.get()->send(value);
90+
// if (auto it = m_endpoints.find(endpoint_id);
91+
// it != m_endpoints.end() &&
92+
// caller_id < m_endpoint_producers[endpoint_id].size() &&
93+
// m_endpoint_producers[endpoint_id][caller_id]) {
94+
// it->second.get()->send(value);
95+
// // boost::asio::post(m_ioc, [endpoint_id, value, ep = it->second.get()]() {
96+
// // ep->send(value);
97+
// // });
98+
if(publishesToEndpoint(caller_id, endpoint_id)) {
99+
auto it = m_endpoints.find(endpoint_id);
100+
if( it != m_endpoints.end())
101+
it->second.get()->send(value);
86102
// boost::asio::post(m_ioc, [endpoint_id, value, ep = it->second.get()]() {
87103
// ep->send(value);
88104
// });

cpp/csp/adapters/websocket/WebsocketEndpointManager.h

+1
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class WebsocketEndpointManager {
9292

9393
void send(const std::string& value, const size_t& caller_id);
9494
bool adapterPruned( size_t caller_id );
95+
bool publishesToEndpoint(const size_t caller_id, const std::string& endpoint_id);
9596
void start(DateTime starttime, DateTime endtime);
9697
void stop();
9798
void handleConnectionRequest(const Dictionary & properties, size_t validated_id, bool is_subscribe);

csp/adapters/websocket.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ def __init__(
422422
headers: Optional[Dict[str, str]] = None,
423423
dynamic: bool = False,
424424
connection_request: Optional[ConnectionRequest] = None,
425-
num_threads: int = 2,
425+
num_threads: int = 1,
426426
):
427427
"""
428428
uri: str
@@ -433,9 +433,9 @@ def __init__(
433433
headers to apply to the request during the handshake
434434
dynamic: bool = False
435435
Whether we accept dynamically altering the connections via ConnectionRequest objects.
436-
num_threads: int = 2
437-
Determines number of threads to allocate for thread pool handling websocket endpoints.
438-
Defaults to 2 to avoid deadlocks and latency spikes
436+
num_threads: int = 1
437+
Determines number of threads to allocate for running the websocket endpoints.
438+
Defaults to 1 to avoid thread switching
439439
"""
440440

441441
self._properties = dict(dynamic=dynamic, num_threads=num_threads)

0 commit comments

Comments
 (0)