Skip to content

Commit fcea900

Browse files
committed
Add dynamic websockets support
Signed-off-by: Nijat K <[email protected]>
1 parent 25c2b67 commit fcea900

23 files changed

+1695
-306
lines changed

cpp/csp/adapters/websocket/CMakeLists.txt

+6
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )
22

33
set(WS_CLIENT_HEADER_FILES
4+
WebsocketClientTypes.h
45
ClientAdapterManager.h
56
ClientInputAdapter.h
67
ClientOutputAdapter.h
78
ClientHeaderUpdateAdapter.h
9+
ClientConnectionRequestAdapter.h
810
WebsocketEndpoint.h
11+
WebsocketEndpointManager.h
912
${WEBSOCKET_HEADER}
1013
)
1114

1215
set(WS_CLIENT_SOURCE_FILES
16+
WebsocketClientTypes.cpp
1317
ClientAdapterManager.cpp
1418
ClientInputAdapter.cpp
1519
ClientOutputAdapter.cpp
1620
ClientHeaderUpdateAdapter.cpp
21+
ClientConnectionRequestAdapter.cpp
1722
WebsocketEndpoint.cpp
23+
WebsocketEndpointManager.cpp
1824
${WS_CLIENT_HEADER_FILES}
1925
${WEBSOCKET_SOURCE}
2026
)

cpp/csp/adapters/websocket/ClientAdapterManager.cpp

+34-99
Original file line numberDiff line numberDiff line change
@@ -1,124 +1,59 @@
11
#include <csp/adapters/websocket/ClientAdapterManager.h>
22

3-
namespace csp {
4-
5-
INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
6-
"ACTIVE",
7-
"GENERIC_ERROR",
8-
"CONNECTION_FAILED",
9-
"CLOSED",
10-
"MESSAGE_SEND_FAIL",
11-
);
12-
13-
}
14-
15-
// With TLS
163
namespace csp::adapters::websocket {
174

185
ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
196
: AdapterManager( engine ),
20-
m_active( false ),
21-
m_shouldRun( false ),
22-
m_endpoint( std::make_unique<WebsocketEndpoint>( properties ) ),
23-
m_inputAdapter( nullptr ),
24-
m_outputAdapter( nullptr ),
25-
m_updateAdapter( nullptr ),
26-
m_thread( nullptr ),
27-
m_properties( properties )
28-
{ };
7+
m_properties( properties )
8+
{ }
299

3010
ClientAdapterManager::~ClientAdapterManager()
31-
{ };
11+
{ }
3212

33-
void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
34-
{
35-
AdapterManager::start( starttime, endtime );
36-
37-
m_shouldRun = true;
38-
m_endpoint -> setOnOpen(
39-
[ this ]() {
40-
m_active = true;
41-
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
42-
}
43-
);
44-
m_endpoint -> setOnFail(
45-
[ this ]( const std::string& reason ) {
46-
std::stringstream ss;
47-
ss << "Connection Failure: " << reason;
48-
m_active = false;
49-
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, ss.str() );
50-
}
51-
);
52-
if( m_inputAdapter ) {
53-
m_endpoint -> setOnMessage(
54-
[ this ]( void* c, size_t t ) {
55-
PushBatch batch( m_engine -> rootEngine() );
56-
m_inputAdapter -> processMessage( c, t, &batch );
57-
}
58-
);
59-
} else {
60-
// if a user doesn't call WebsocketAdapterManager.subscribe, no inputadapter will be created
61-
// but we still need something to avoid on_message_cb not being set in the endpoint.
62-
m_endpoint -> setOnMessage( []( void* c, size_t t ){} );
63-
}
64-
m_endpoint -> setOnClose(
65-
[ this ]() {
66-
m_active = false;
67-
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
68-
}
69-
);
70-
m_endpoint -> setOnSendFail(
71-
[ this ]( const std::string& s ) {
72-
std::stringstream ss;
73-
ss << "Failed to send: " << s;
74-
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
75-
}
76-
);
13+
WebsocketEndpointManager* ClientAdapterManager::getWebsocketManager(){
14+
if( m_endpointManager == nullptr )
15+
return nullptr;
16+
return m_endpointManager.get();
17+
}
7718

78-
m_thread = std::make_unique<std::thread>( [ this ]() {
79-
while( m_shouldRun )
80-
{
81-
m_endpoint -> run();
82-
m_active = false;
83-
if( m_shouldRun ) sleep( m_properties.get<TimeDelta>( "reconnect_interval" ) );
84-
}
85-
});
86-
};
19+
void ClientAdapterManager::start(DateTime starttime, DateTime endtime) {
20+
AdapterManager::start(starttime, endtime);
21+
if (m_endpointManager != nullptr)
22+
m_endpointManager -> start(starttime, endtime);
23+
}
8724

8825
void ClientAdapterManager::stop() {
8926
AdapterManager::stop();
90-
91-
m_shouldRun=false;
92-
if( m_active ) m_endpoint->stop();
93-
if( m_thread ) m_thread->join();
94-
};
27+
if (m_endpointManager != nullptr)
28+
m_endpointManager -> stop();
29+
}
9530

9631
PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
97-
{
98-
if (m_inputAdapter == nullptr)
99-
{
100-
m_inputAdapter = m_engine -> createOwnedObject<ClientInputAdapter>(
101-
// m_engine,
102-
type,
103-
pushMode,
104-
properties
105-
);
106-
}
107-
return m_inputAdapter;
108-
};
32+
{
33+
if (m_endpointManager == nullptr)
34+
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
35+
return m_endpointManager -> getInputAdapter( type, pushMode, properties );
36+
}
10937

110-
OutputAdapter* ClientAdapterManager::getOutputAdapter()
38+
OutputAdapter* ClientAdapterManager::getOutputAdapter( const Dictionary & properties )
11139
{
112-
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(*m_endpoint);
113-
114-
return m_outputAdapter;
40+
if (m_endpointManager == nullptr)
41+
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
42+
return m_endpointManager -> getOutputAdapter( properties );
11543
}
11644

11745
OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
11846
{
119-
if (m_updateAdapter == nullptr) m_updateAdapter = m_engine -> createOwnedObject<ClientHeaderUpdateOutputAdapter>( m_endpoint -> getProperties() );
47+
if (m_endpointManager == nullptr)
48+
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
49+
return m_endpointManager -> getHeaderUpdateAdapter();
50+
}
12051

121-
return m_updateAdapter;
52+
OutputAdapter * ClientAdapterManager::getConnectionRequestAdapter( const Dictionary & properties )
53+
{
54+
if (m_endpointManager == nullptr)
55+
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
56+
return m_endpointManager -> getConnectionRequestAdapter( properties );
12257
}
12358

12459
DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )

cpp/csp/adapters/websocket/ClientAdapterManager.h

+9-29
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
33

44
#include <csp/adapters/websocket/WebsocketEndpoint.h>
5+
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
6+
#include <csp/adapters/websocket/WebsocketClientTypes.h>
57
#include <csp/adapters/websocket/ClientInputAdapter.h>
6-
#include <csp/adapters/websocket/ClientOutputAdapter.h>
78
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
89
#include <csp/core/Enum.h>
910
#include <csp/core/Hash.h>
@@ -15,30 +16,15 @@
1516
#include <chrono>
1617
#include <iomanip>
1718
#include <iostream>
19+
#include <vector>
20+
#include <unordered_set>
1821

1922

2023
namespace csp::adapters::websocket {
2124

2225
using namespace csp;
2326

24-
struct WebsocketClientStatusTypeTraits
25-
{
26-
enum _enum : unsigned char
27-
{
28-
ACTIVE = 0,
29-
GENERIC_ERROR = 1,
30-
CONNECTION_FAILED = 2,
31-
CLOSED = 3,
32-
MESSAGE_SEND_FAIL = 4,
33-
34-
NUM_TYPES
35-
};
36-
37-
protected:
38-
_enum m_value;
39-
};
40-
41-
using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
27+
class WebsocketEndpointManager;
4228

4329
class ClientAdapterManager final : public AdapterManager
4430
{
@@ -57,23 +43,17 @@ class ClientAdapterManager final : public AdapterManager
5743

5844
void stop() override;
5945

46+
WebsocketEndpointManager* getWebsocketManager();
6047
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
61-
OutputAdapter * getOutputAdapter();
48+
OutputAdapter * getOutputAdapter( const Dictionary & properties );
6249
OutputAdapter * getHeaderUpdateAdapter();
50+
OutputAdapter * getConnectionRequestAdapter( const Dictionary & properties );
6351

6452
DateTime processNextSimTimeSlice( DateTime time ) override;
6553

6654
private:
67-
// need some client info
68-
69-
bool m_active;
70-
bool m_shouldRun;
71-
std::unique_ptr<WebsocketEndpoint> m_endpoint;
72-
ClientInputAdapter* m_inputAdapter;
73-
ClientOutputAdapter* m_outputAdapter;
74-
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
75-
std::unique_ptr<std::thread> m_thread;
7655
Dictionary m_properties;
56+
std::unique_ptr<WebsocketEndpointManager> m_endpointManager;
7757
};
7858

7959
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>
2+
#include <csp/python/Conversions.h>
3+
#include <Python.h>
4+
5+
namespace csp::adapters::websocket {
6+
7+
ClientConnectionRequestAdapter::ClientConnectionRequestAdapter(
8+
Engine * engine,
9+
WebsocketEndpointManager * websocketManager,
10+
bool is_subscribe,
11+
size_t caller_id,
12+
boost::asio::strand<boost::asio::io_context::executor_type>& strand
13+
14+
) : OutputAdapter( engine ),
15+
m_websocketManager( websocketManager ),
16+
m_strand( strand ),
17+
m_isSubscribe( is_subscribe ),
18+
m_callerId( caller_id ),
19+
m_checkPerformed( is_subscribe ? false : true ) // we only need to check for pruned input adapters
20+
{}
21+
22+
void ClientConnectionRequestAdapter::executeImpl()
23+
{
24+
// One-time check for pruned status
25+
if (unlikely(!m_checkPerformed)) {
26+
m_isPruned = m_websocketManager->adapterPruned(m_callerId);
27+
m_checkPerformed = true;
28+
}
29+
30+
// Early return if pruned
31+
if (unlikely(m_isPruned))
32+
return;
33+
34+
auto raw_val = input()->lastValueTyped<PyObject*>();
35+
auto val = python::fromPython<std::vector<Dictionary>>(raw_val);
36+
37+
// We intentionally post here, we want the thread running
38+
// the strand to handle the connection request. We want to keep
39+
// all updates to internal data structures at graph run-time
40+
// to that thread.
41+
boost::asio::post(m_strand, [this, val=std::move(val)]() {
42+
for(const auto& conn_req: val) {
43+
m_websocketManager->handleConnectionRequest(conn_req, m_callerId, m_isSubscribe);
44+
}
45+
});
46+
};
47+
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H
3+
4+
#include <csp/adapters/websocket/ClientAdapterManager.h>
5+
#include <csp/engine/Dictionary.h>
6+
#include <csp/engine/OutputAdapter.h>
7+
#include <csp/adapters/utils/MessageWriter.h>
8+
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>
9+
10+
namespace csp::adapters::websocket
11+
{
12+
using namespace csp::autogen;
13+
14+
class ClientAdapterManager;
15+
class WebsocketEndpointManager;
16+
17+
class ClientConnectionRequestAdapter final: public OutputAdapter
18+
{
19+
public:
20+
ClientConnectionRequestAdapter(
21+
Engine * engine,
22+
WebsocketEndpointManager * websocketManager,
23+
bool isSubscribe,
24+
size_t callerId,
25+
boost::asio::strand<boost::asio::io_context::executor_type>& strand
26+
);
27+
28+
void executeImpl() override;
29+
30+
const char * name() const override { return "WebsocketClientConnectionRequestAdapter"; }
31+
32+
private:
33+
WebsocketEndpointManager* m_websocketManager;
34+
boost::asio::strand<boost::asio::io_context::executor_type>& m_strand;
35+
bool m_isSubscribe;
36+
size_t m_callerId;
37+
bool m_checkPerformed;
38+
bool m_isPruned{false};
39+
40+
};
41+
42+
}
43+
44+
45+
#endif

cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.cpp

+14-6
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,27 @@
22

33
namespace csp::adapters::websocket {
44

5+
class WebsocketEndpointManager;
6+
57
ClientHeaderUpdateOutputAdapter::ClientHeaderUpdateOutputAdapter(
68
Engine * engine,
7-
Dictionary& properties
8-
) : OutputAdapter( engine ), m_properties( properties )
9+
WebsocketEndpointManager * mgr,
10+
boost::asio::strand<boost::asio::io_context::executor_type>& strand
11+
) : OutputAdapter( engine ), m_mgr( mgr ), m_strand( strand )
912
{ };
1013

1114
void ClientHeaderUpdateOutputAdapter::executeImpl()
1215
{
13-
DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
14-
for( auto& update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>() )
15-
{
16-
if( update -> key_isSet() && update -> value_isSet() ) headers->update( update->key(), update->value() );
16+
Dictionary headers;
17+
for (auto& update : input()->lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>()) {
18+
if (update->key_isSet() && update->value_isSet()) {
19+
headers.update(update->key(), update->value());
20+
}
1721
}
22+
boost::asio::post(m_strand, [this, headers=std::move(headers)]() {
23+
auto endpoint = m_mgr -> getNonDynamicEndpoint();
24+
endpoint -> updateHeaders(std::move(headers));
25+
});
1826
};
1927

2028
}

0 commit comments

Comments
 (0)