Skip to content

Commit dbb1bb4

Browse files
committed
Use rapidjson to parse headers, no more dict
Signed-off-by: Nijat K <[email protected]>
1 parent 639c5aa commit dbb1bb4

File tree

6 files changed

+95
-35
lines changed

6 files changed

+95
-35
lines changed

cpp/csp/adapters/websocket/ClientConnectionRequestAdapter.cpp

+24-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>
2-
#include <csp/python/Conversions.h>
3-
#include <Python.h>
42

53
namespace csp::adapters::websocket {
64

@@ -31,15 +29,35 @@ void ClientConnectionRequestAdapter::executeImpl()
3129
if (unlikely(m_isPruned))
3230
return;
3331

34-
auto raw_val = input()->lastValueTyped<PyObject*>();
35-
auto val = python::fromPython<std::vector<Dictionary>>(raw_val);
32+
std::vector<Dictionary> properties_list;
33+
for (auto& request : input()->lastValueTyped<std::vector<InternalConnectionRequest::Ptr>>()) {
34+
if (!request->allFieldsSet())
35+
CSP_THROW(TypeError, "All fields must be set in InternalConnectionRequest");
36+
37+
Dictionary dict;
38+
dict.update("host", request->host());
39+
dict.update("port", request->port());
40+
dict.update("route", request->route());
41+
dict.update("uri", request->uri());
42+
dict.update("use_ssl", request->use_ssl());
43+
dict.update("reconnect_interval", request->reconnect_interval());
44+
dict.update("persistent", request->persistent());
45+
46+
dict.update("headers", request -> headers() );
47+
dict.update("on_connect_payload", request->on_connect_payload());
48+
dict.update("action", request->action());
49+
dict.update("dynamic", request->dynamic());
50+
dict.update("binary", request->binary());
51+
52+
properties_list.push_back(std::move(dict));
53+
}
3654

3755
// We intentionally post here, we want the thread running
3856
// the strand to handle the connection request. We want to keep
3957
// all updates to internal data structures at graph run-time
4058
// to that thread.
41-
boost::asio::post(m_strand, [this, val=std::move(val)]() {
42-
for(const auto& conn_req: val) {
59+
boost::asio::post(m_strand, [this, properties_list=std::move(properties_list)]() {
60+
for(const auto& conn_req: properties_list) {
4361
m_websocketManager->handleConnectionRequest(conn_req, m_callerId, m_isSubscribe);
4462
}
4563
});

cpp/csp/adapters/websocket/WebsocketEndpoint.cpp

+29-2
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,22 @@
11
#include <csp/adapters/websocket/WebsocketEndpoint.h>
2+
#include <rapidjson/document.h>
23

34
namespace csp::adapters::websocket {
45
using namespace csp;
56

67
WebsocketEndpoint::WebsocketEndpoint(
78
net::io_context& ioc,
89
Dictionary properties
9-
) : m_properties(std::make_shared<Dictionary>(std::move(properties))),
10+
) : m_properties( std::make_shared<Dictionary>( std::move( properties ) ) ),
1011
m_ioc(ioc)
11-
{ };
12+
{
13+
std::string headerProps = m_properties->get<std::string>("headers");
14+
// Create new empty headers dictionary
15+
auto headers = std::make_shared<Dictionary>();
16+
m_properties->update("headers", headers);
17+
// Update with any existing header properties
18+
updateHeaders(headerProps);
19+
}
1220
void WebsocketEndpoint::setOnOpen(void_cb on_open)
1321
{ m_on_open = std::move(on_open); }
1422
void WebsocketEndpoint::setOnFail(string_cb on_fail)
@@ -76,6 +84,25 @@ void WebsocketEndpoint::updateHeaders(csp::Dictionary properties){
7684
}
7785
}
7886

87+
void WebsocketEndpoint::updateHeaders(const std::string& properties) {
88+
if( properties.empty() )
89+
return;
90+
DictionaryPtr headers = m_properties->get<DictionaryPtr>("headers");
91+
rapidjson::Document doc;
92+
doc.Parse(properties.c_str());
93+
if (doc.IsObject()) {
94+
// Windows builds complained when doc.GetObject was called in the for loop
95+
const auto& obj = doc.GetObject();
96+
for (const auto& member : obj) {
97+
if (member.value.IsString()) {
98+
std::string key = member.name.GetString();
99+
std::string value = member.value.GetString();
100+
headers->update(key, std::move(value));
101+
}
102+
}
103+
}
104+
}
105+
79106
std::shared_ptr<Dictionary> WebsocketEndpoint::getProperties() {
80107
return m_properties;
81108
}

cpp/csp/adapters/websocket/WebsocketEndpoint.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -379,8 +379,8 @@ class WebsocketEndpoint {
379379
void setOnClose(void_cb on_close);
380380
void setOnSendFail(string_cb on_send_fail);
381381
void updateHeaders(Dictionary properties);
382+
void updateHeaders(const std::string& properties);
382383
std::shared_ptr<Dictionary> getProperties();
383-
// Dictionary& getProperties();
384384
void run();
385385
void stop( bool stop_ioc = true);
386386
void send(const std::string& s);

cpp/csp/adapters/websocket/WebsocketEndpointManager.cpp

+1-6
Original file line numberDiff line numberDiff line change
@@ -241,12 +241,8 @@ void WebsocketEndpointManager::handleConnectionRequest(const Dictionary & proper
241241
// This should only get called from the thread running
242242
// m_ioc. This allows us to avoid locks on internal data
243243
// structures
244-
// std::cout << " HEY YA\n";
245244
auto endpoint_id = properties.get<std::string>("uri");
246-
// std::cout << endpoint_id;
247245
autogen::ActionType action = autogen::ActionType::create( properties.get<std::string>("action") );
248-
// std::cout << action.asString() << "\n";
249-
// // Change headers if needed here!
250246
switch(action.enum_value()) {
251247
case autogen::ActionType::enum_::CONNECT: {
252248
auto persistent = properties.get<bool>("persistent");
@@ -289,10 +285,9 @@ void WebsocketEndpointManager::handleConnectionRequest(const Dictionary & proper
289285
m_endpoints[endpoint_id]->send(payload);
290286
// Conscious decision to let non-persisten connection
291287
// results to update the header
292-
auto headers = *properties.get<DictionaryPtr>("headers");
288+
auto headers = properties.get<std::string>("headers");
293289
m_endpoints[endpoint_id]->updateHeaders(std::move(headers));
294290
}
295-
// }
296291
break;
297292
}
298293

csp/adapters/websocket.py

+18-20
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@
1818
RawBytesMessageMapper,
1919
RawTextMessageMapper,
2020
)
21-
from csp.adapters.websocket_types import ActionType, ConnectionRequest, WebsocketHeaderUpdate, WebsocketStatus
21+
from csp.adapters.websocket_types import (
22+
ActionType,
23+
ConnectionRequest,
24+
InternalConnectionRequest,
25+
WebsocketHeaderUpdate,
26+
WebsocketStatus,
27+
)
2228
from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def
2329
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
2430
from csp.lib import _websocketadapterimpl
@@ -396,18 +402,6 @@ def _instantiate(self):
396402
_launch_application(self._port, manager, csp.const("stub"))
397403

398404

399-
# Maybe, we can have the Adapter manager have all the connections
400-
# If we get a new connection request, we include that adapter for the
401-
# subscriptions. When we pop it, we remove it.
402-
# Then, each edge will effectively be independent.
403-
# Maybe. have each websocket push to a shared queue, then from there we
404-
# pass it along to all edges ("input adapters") that are subscribed to it
405-
406-
# Ok, maybe, let's keep it at just 1 subscribe and send call.
407-
# However, we can subscribe to the send and subscribe calls separately.
408-
# We just have to keep track of the Endpoints we have, and
409-
410-
411405
class WebsocketAdapterManager:
412406
"""
413407
Can subscribe dynamically via ts[List[ConnectionRequest]]
@@ -453,7 +447,7 @@ def __init__(
453447
connection_request = ConnectionRequest(
454448
uri=uri, reconnect_interval=reconnect_interval, headers=headers or {}
455449
)
456-
self._properties.update(self._get_properties(connection_request))
450+
self._properties.update(self._get_properties(connection_request).to_dict())
457451

458452
# This is a counter that will be used to identify every function call
459453
# We keep track of the subscribes and sends separately
@@ -467,7 +461,7 @@ def __init__(
467461
def _dynamic(self):
468462
return self._properties.get("dynamic", False)
469463

470-
def _get_properties(self, conn_request: ConnectionRequest) -> dict:
464+
def _get_properties(self, conn_request: ConnectionRequest) -> InternalConnectionRequest:
471465
uri = conn_request.uri
472466
reconnect_interval = conn_request.reconnect_interval
473467

@@ -476,14 +470,14 @@ def _get_properties(self, conn_request: ConnectionRequest) -> dict:
476470
if resp.hostname is None:
477471
raise ValueError(f"Failed to parse host from URI: {uri}")
478472

479-
res = dict(
473+
res = InternalConnectionRequest(
480474
host=resp.hostname,
481475
# if no port is explicitly present in the uri, the resp.port is None
482476
port=_sanitize_port(uri, resp.port),
483477
route=resp.path or "/", # resource shouldn't be empty string
484478
use_ssl=uri.startswith("wss"),
485479
reconnect_interval=reconnect_interval,
486-
headers=conn_request.headers,
480+
headers=rapidjson.dumps(conn_request.headers) if conn_request.headers else "",
487481
persistent=conn_request.persistent,
488482
action=conn_request.action.name,
489483
on_connect_payload=conn_request.on_connect_payload,
@@ -537,7 +531,9 @@ def subscribe(
537531
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=True).model_dump()
538532
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
539533
request_dict = csp.apply(
540-
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
534+
connection_request,
535+
lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs],
536+
List[InternalConnectionRequest],
541537
)
542538
# Output adapter to handle connection requests
543539
_websocket_connection_request_adapter_def(self, request_dict, adapter_props)
@@ -566,7 +562,9 @@ def send(self, x: ts["T"], connection_request: Optional[ts[List[ConnectionReques
566562
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=False).model_dump()
567563
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
568564
request_dict = csp.apply(
569-
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
565+
connection_request,
566+
lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs],
567+
List[InternalConnectionRequest],
570568
)
571569
_websocket_connection_request_adapter_def(self, request_dict, adapter_props)
572570
return _websocket_output_adapter_def(self, x, adapter_props)
@@ -614,6 +612,6 @@ def _create(self, engine, memo):
614612
"websocket_connection_request_adapter",
615613
_websocketadapterimpl._websocket_connection_request_adapter,
616614
WebsocketAdapterManager,
617-
input=ts[list], # needed, List[dict] didn't work on c++ level
615+
input=ts[List[InternalConnectionRequest]], # needed, List[dict] didn't work on c++ level
618616
properties=dict,
619617
)

csp/adapters/websocket_types.py

+22
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,25 @@ class ConnectionRequest(Struct):
3434
reconnect_interval: timedelta = timedelta(seconds=2)
3535
on_connect_payload: str = "" # message to send on connect
3636
headers: Dict[str, str] = {}
37+
38+
39+
# Only used internally
40+
class InternalConnectionRequest(Struct):
41+
host: str # Hostname parsed from the URI
42+
port: str # Port number for the connection (parsed and sanitized from URI)
43+
route: str # Resource path from URI, defaults to "/" if empty
44+
uri: str # Complete original URI string
45+
46+
# Connection behavior
47+
use_ssl: bool # Whether to use secure WebSocket (wss://)
48+
reconnect_interval: timedelta # Time to wait between reconnection attempts
49+
persistent: bool # Whether to maintain a persistent connection
50+
51+
# Headers and payloads
52+
headers: str # HTTP headers for the connection as json string
53+
on_connect_payload: str # Message to send when connection is established
54+
55+
# Connection metadata
56+
action: str # Connection action type (Connect, Disconnect, Ping, etc)
57+
dynamic: bool # Whether the connection is dynamic
58+
binary: bool # Whether to use binary mode for the connection

0 commit comments

Comments
 (0)