Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic endpoints to WebsocketAdapter #378

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Next Next commit
Add dynamic websockets support
Signed-off-by: Nijat K <nijat.khanbabayev@gmail.com>
NeejWeej committed Nov 25, 2024
commit fcea90036a6e2c1324ea56e92285903cb5384f7a
6 changes: 6 additions & 0 deletions cpp/csp/adapters/websocket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )

set(WS_CLIENT_HEADER_FILES
WebsocketClientTypes.h
ClientAdapterManager.h
ClientInputAdapter.h
ClientOutputAdapter.h
ClientHeaderUpdateAdapter.h
ClientConnectionRequestAdapter.h
WebsocketEndpoint.h
WebsocketEndpointManager.h
${WEBSOCKET_HEADER}
)

set(WS_CLIENT_SOURCE_FILES
WebsocketClientTypes.cpp
ClientAdapterManager.cpp
ClientInputAdapter.cpp
ClientOutputAdapter.cpp
ClientHeaderUpdateAdapter.cpp
ClientConnectionRequestAdapter.cpp
WebsocketEndpoint.cpp
WebsocketEndpointManager.cpp
${WS_CLIENT_HEADER_FILES}
${WEBSOCKET_SOURCE}
)
133 changes: 34 additions & 99 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
@@ -1,124 +1,59 @@
#include <csp/adapters/websocket/ClientAdapterManager.h>

namespace csp {

INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
"ACTIVE",
"GENERIC_ERROR",
"CONNECTION_FAILED",
"CLOSED",
"MESSAGE_SEND_FAIL",
);

}

// With TLS
namespace csp::adapters::websocket {

ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
: AdapterManager( engine ),
m_active( false ),
m_shouldRun( false ),
m_endpoint( std::make_unique<WebsocketEndpoint>( properties ) ),
m_inputAdapter( nullptr ),
m_outputAdapter( nullptr ),
m_updateAdapter( nullptr ),
m_thread( nullptr ),
m_properties( properties )
{ };
m_properties( properties )
{ }

ClientAdapterManager::~ClientAdapterManager()
{ };
{ }

void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
{
AdapterManager::start( starttime, endtime );

m_shouldRun = true;
m_endpoint -> setOnOpen(
[ this ]() {
m_active = true;
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
}
);
m_endpoint -> setOnFail(
[ this ]( const std::string& reason ) {
std::stringstream ss;
ss << "Connection Failure: " << reason;
m_active = false;
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, ss.str() );
}
);
if( m_inputAdapter ) {
m_endpoint -> setOnMessage(
[ this ]( void* c, size_t t ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
}
);
} else {
// if a user doesn't call WebsocketAdapterManager.subscribe, no inputadapter will be created
// but we still need something to avoid on_message_cb not being set in the endpoint.
m_endpoint -> setOnMessage( []( void* c, size_t t ){} );
}
m_endpoint -> setOnClose(
[ this ]() {
m_active = false;
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
}
);
m_endpoint -> setOnSendFail(
[ this ]( const std::string& s ) {
std::stringstream ss;
ss << "Failed to send: " << s;
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
}
);
WebsocketEndpointManager* ClientAdapterManager::getWebsocketManager(){
if( m_endpointManager == nullptr )
return nullptr;
return m_endpointManager.get();
}

m_thread = std::make_unique<std::thread>( [ this ]() {
while( m_shouldRun )
{
m_endpoint -> run();
m_active = false;
if( m_shouldRun ) sleep( m_properties.get<TimeDelta>( "reconnect_interval" ) );
}
});
};
void ClientAdapterManager::start(DateTime starttime, DateTime endtime) {
AdapterManager::start(starttime, endtime);
if (m_endpointManager != nullptr)
m_endpointManager -> start(starttime, endtime);
}

void ClientAdapterManager::stop() {
AdapterManager::stop();

m_shouldRun=false;
if( m_active ) m_endpoint->stop();
if( m_thread ) m_thread->join();
};
if (m_endpointManager != nullptr)
m_endpointManager -> stop();
}

PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
{
if (m_inputAdapter == nullptr)
{
m_inputAdapter = m_engine -> createOwnedObject<ClientInputAdapter>(
// m_engine,
type,
pushMode,
properties
);
}
return m_inputAdapter;
};
{
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getInputAdapter( type, pushMode, properties );
}

OutputAdapter* ClientAdapterManager::getOutputAdapter()
OutputAdapter* ClientAdapterManager::getOutputAdapter( const Dictionary & properties )
{
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(*m_endpoint);

return m_outputAdapter;
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getOutputAdapter( properties );
}

OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
{
if (m_updateAdapter == nullptr) m_updateAdapter = m_engine -> createOwnedObject<ClientHeaderUpdateOutputAdapter>( m_endpoint -> getProperties() );
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getHeaderUpdateAdapter();
}

return m_updateAdapter;
OutputAdapter * ClientAdapterManager::getConnectionRequestAdapter( const Dictionary & properties )
{
if (m_endpointManager == nullptr)
m_endpointManager = std::make_unique<WebsocketEndpointManager>(this, m_properties, m_engine);
return m_endpointManager -> getConnectionRequestAdapter( properties );
}

DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
38 changes: 9 additions & 29 deletions cpp/csp/adapters/websocket/ClientAdapterManager.h
Original file line number Diff line number Diff line change
@@ -2,8 +2,9 @@
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H

#include <csp/adapters/websocket/WebsocketEndpoint.h>
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
#include <csp/adapters/websocket/WebsocketClientTypes.h>
#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/core/Enum.h>
#include <csp/core/Hash.h>
@@ -15,30 +16,15 @@
#include <chrono>
#include <iomanip>
#include <iostream>
#include <vector>
#include <unordered_set>


namespace csp::adapters::websocket {

using namespace csp;

struct WebsocketClientStatusTypeTraits
{
enum _enum : unsigned char
{
ACTIVE = 0,
GENERIC_ERROR = 1,
CONNECTION_FAILED = 2,
CLOSED = 3,
MESSAGE_SEND_FAIL = 4,

NUM_TYPES
};

protected:
_enum m_value;
};

using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
class WebsocketEndpointManager;

class ClientAdapterManager final : public AdapterManager
{
@@ -57,23 +43,17 @@ class ClientAdapterManager final : public AdapterManager

void stop() override;

WebsocketEndpointManager* getWebsocketManager();
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
OutputAdapter * getOutputAdapter();
OutputAdapter * getOutputAdapter( const Dictionary & properties );
OutputAdapter * getHeaderUpdateAdapter();
OutputAdapter * getConnectionRequestAdapter( const Dictionary & properties );

DateTime processNextSimTimeSlice( DateTime time ) override;

private:
// need some client info

bool m_active;
bool m_shouldRun;
std::unique_ptr<WebsocketEndpoint> m_endpoint;
ClientInputAdapter* m_inputAdapter;
ClientOutputAdapter* m_outputAdapter;
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
std::unique_ptr<std::thread> m_thread;
Dictionary m_properties;
std::unique_ptr<WebsocketEndpointManager> m_endpointManager;
};

}
48 changes: 48 additions & 0 deletions cpp/csp/adapters/websocket/ClientConnectionRequestAdapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>
#include <csp/python/Conversions.h>
#include <Python.h>

namespace csp::adapters::websocket {

ClientConnectionRequestAdapter::ClientConnectionRequestAdapter(
Engine * engine,
WebsocketEndpointManager * websocketManager,
bool is_subscribe,
size_t caller_id,
boost::asio::strand<boost::asio::io_context::executor_type>& strand

) : OutputAdapter( engine ),
m_websocketManager( websocketManager ),
m_strand( strand ),
m_isSubscribe( is_subscribe ),
m_callerId( caller_id ),
m_checkPerformed( is_subscribe ? false : true ) // we only need to check for pruned input adapters
{}

void ClientConnectionRequestAdapter::executeImpl()
{
// One-time check for pruned status
if (unlikely(!m_checkPerformed)) {
m_isPruned = m_websocketManager->adapterPruned(m_callerId);
m_checkPerformed = true;
}

// Early return if pruned
if (unlikely(m_isPruned))
return;

auto raw_val = input()->lastValueTyped<PyObject*>();
auto val = python::fromPython<std::vector<Dictionary>>(raw_val);

// We intentionally post here, we want the thread running
// the strand to handle the connection request. We want to keep
// all updates to internal data structures at graph run-time
// to that thread.
boost::asio::post(m_strand, [this, val=std::move(val)]() {
for(const auto& conn_req: val) {
m_websocketManager->handleConnectionRequest(conn_req, m_callerId, m_isSubscribe);
}
});
};

}
45 changes: 45 additions & 0 deletions cpp/csp/adapters/websocket/ClientConnectionRequestAdapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_CONNECTIONREQUESTADAPTER_H

#include <csp/adapters/websocket/ClientAdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>

namespace csp::adapters::websocket
{
using namespace csp::autogen;

class ClientAdapterManager;
class WebsocketEndpointManager;

class ClientConnectionRequestAdapter final: public OutputAdapter
{
public:
ClientConnectionRequestAdapter(
Engine * engine,
WebsocketEndpointManager * websocketManager,
bool isSubscribe,
size_t callerId,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientConnectionRequestAdapter"; }

private:
WebsocketEndpointManager* m_websocketManager;
boost::asio::strand<boost::asio::io_context::executor_type>& m_strand;
bool m_isSubscribe;
size_t m_callerId;
bool m_checkPerformed;
bool m_isPruned{false};

};

}


#endif
20 changes: 14 additions & 6 deletions cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.cpp
Original file line number Diff line number Diff line change
@@ -2,19 +2,27 @@

namespace csp::adapters::websocket {

class WebsocketEndpointManager;

ClientHeaderUpdateOutputAdapter::ClientHeaderUpdateOutputAdapter(
Engine * engine,
Dictionary& properties
) : OutputAdapter( engine ), m_properties( properties )
WebsocketEndpointManager * mgr,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
) : OutputAdapter( engine ), m_mgr( mgr ), m_strand( strand )
{ };

void ClientHeaderUpdateOutputAdapter::executeImpl()
{
DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
for( auto& update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>() )
{
if( update -> key_isSet() && update -> value_isSet() ) headers->update( update->key(), update->value() );
Dictionary headers;
for (auto& update : input()->lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>()) {
if (update->key_isSet() && update->value_isSet()) {
headers.update(update->key(), update->value());
}
}
boost::asio::post(m_strand, [this, headers=std::move(headers)]() {
auto endpoint = m_mgr -> getNonDynamicEndpoint();
endpoint -> updateHeaders(std::move(headers));
});
};

}
11 changes: 9 additions & 2 deletions cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H

#include <csp/adapters/websocket/WebsocketEndpointManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
@@ -10,20 +11,26 @@ namespace csp::adapters::websocket
{
using namespace csp::autogen;

class WebsocketEndpointManager;

class ClientHeaderUpdateOutputAdapter final: public OutputAdapter
{
public:
ClientHeaderUpdateOutputAdapter(
Engine * engine,
Dictionary& properties
WebsocketEndpointManager * mgr,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientHeaderUpdateAdapter"; }

private:
Dictionary& m_properties;
WebsocketEndpointManager * m_mgr;
boost::asio::strand<boost::asio::io_context::executor_type>& m_strand;



};

54 changes: 50 additions & 4 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#include <csp/adapters/websocket/ClientInputAdapter.h>

namespace csp::adapters::websocket
{

ClientInputAdapter::ClientInputAdapter(
Engine * engine,
CspTypePtr & type,
PushMode pushMode,
const Dictionary & properties
) : PushInputAdapter(engine, type, pushMode)
const Dictionary & properties,
bool dynamic
) : PushInputAdapter(engine, type, pushMode),
m_dynamic( dynamic )
{
if( type -> type() != CspType::Type::STRUCT &&
type -> type() != CspType::Type::STRING )
@@ -21,8 +22,14 @@ ClientInputAdapter::ClientInputAdapter(
if( !metaFieldMap.empty() && type -> type() != CspType::Type::STRUCT )
CSP_THROW( ValueError, "meta_field_map is not supported on non-struct types" );
}
if ( m_dynamic ){
auto& actual_type = static_cast<const CspStructType &>( *type );
auto& nested_type = actual_type.meta()-> field( "msg" ) -> type();

m_converter = adapters::utils::MessageStructConverterCache::instance().create( type, properties );
m_converter = adapters::utils::MessageStructConverterCache::instance().create( nested_type, properties );
}
else
m_converter = adapters::utils::MessageStructConverterCache::instance().create( type, properties );
};

void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )
@@ -39,4 +46,43 @@ void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )

}

void ClientInputAdapter::processMessage( std::tuple<std::string, void*> data, size_t t, PushBatch* batch )
{
// Extract the source string and data pointer from tuple
std::string source = std::get<0>(data);
void* c = std::get<1>(data);
if ( m_dynamic ){
auto& actual_type = static_cast<const CspStructType &>( *dataType() );
auto& nested_type = actual_type.meta()-> field( "msg" ) -> type();
auto true_val = actual_type.meta() -> create();
actual_type.meta()->field("uri")->setValue( true_val.get(), source );

if( nested_type -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( c, t );
actual_type.meta()->field("msg")->setValue( true_val.get(), std::move(tick) );

pushTick( std::move(true_val), batch );
} else if ( nested_type -> type() == CspType::Type::STRING )
{
auto msg = std::string((char const*)c, t);
actual_type.meta()->field("msg")->setValue( true_val.get(), msg );

pushTick( std::move(true_val), batch );
}

}
else{
if( dataType() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( c, t );
pushTick( std::move(tick), batch );
} else if ( dataType() -> type() == CspType::Type::STRING )
{
pushTick( std::string((char const*)c, t), batch );
}
}

}

}
7 changes: 5 additions & 2 deletions cpp/csp/adapters/websocket/ClientInputAdapter.h
Original file line number Diff line number Diff line change
@@ -16,17 +16,20 @@ class ClientInputAdapter final: public PushInputAdapter {
Engine * engine,
CspTypePtr & type,
PushMode pushMode,
const Dictionary & properties
const Dictionary & properties,
bool dynamic
);

void processMessage( void* c, size_t t, PushBatch* batch );
void processMessage( std::tuple<std::string, void*> data, size_t t, PushBatch* batch );

private:
adapters::utils::MessageStructConverterPtr m_converter;
const bool m_dynamic;

};

}


#endif // _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_INPUTADAPTER_H
#endif
17 changes: 13 additions & 4 deletions cpp/csp/adapters/websocket/ClientOutputAdapter.cpp
Original file line number Diff line number Diff line change
@@ -4,14 +4,23 @@ namespace csp::adapters::websocket {

ClientOutputAdapter::ClientOutputAdapter(
Engine * engine,
WebsocketEndpoint& endpoint
) : OutputAdapter( engine ), m_endpoint( endpoint )
WebsocketEndpointManager * websocketManager,
size_t caller_id,
net::io_context& ioc,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
) : OutputAdapter( engine ),
m_websocketManager( websocketManager ),
m_callerId( caller_id ),
m_ioc( ioc ),
m_strand( strand )
{ };

void ClientOutputAdapter::executeImpl()
{
const std::string & value = input() -> lastValueTyped<std::string>();
m_endpoint.send( value );
};
boost::asio::post(m_strand, [this, value=value]() {
m_websocketManager->send(value, m_callerId);
});
}

}
15 changes: 13 additions & 2 deletions cpp/csp/adapters/websocket/ClientOutputAdapter.h
Original file line number Diff line number Diff line change
@@ -5,27 +5,38 @@
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
#include <csp/adapters/websocket/ClientAdapterManager.h>

namespace csp::adapters::websocket
{

class ClientAdapterManager;
class WebsocketEndpointManager;

class ClientOutputAdapter final: public OutputAdapter
{

public:
ClientOutputAdapter(
Engine * engine,
WebsocketEndpoint& endpoint
WebsocketEndpointManager * websocketManager,
size_t caller_id,
net::io_context& ioc,
boost::asio::strand<boost::asio::io_context::executor_type>& strand
// bool dynamic
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientOutputAdapter"; }

private:
WebsocketEndpoint& m_endpoint;
WebsocketEndpointManager* m_websocketManager;
size_t m_callerId;
[[maybe_unused]] net::io_context& m_ioc;
boost::asio::strand<boost::asio::io_context::executor_type>& m_strand;
// bool m_dynamic;
// std::unordered_map<std::string, std::vector<bool>>& m_endpoint_consumers;
};

}
13 changes: 13 additions & 0 deletions cpp/csp/adapters/websocket/WebsocketClientTypes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include "WebsocketClientTypes.h"

namespace csp {

INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
"ACTIVE",
"GENERIC_ERROR",
"CONNECTION_FAILED",
"CLOSED",
"MESSAGE_SEND_FAIL",
);

}
26 changes: 26 additions & 0 deletions cpp/csp/adapters/websocket/WebsocketClientTypes.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include "csp/core/Enum.h" // or whatever the correct path is

namespace csp::adapters::websocket {

struct WebsocketClientStatusTypeTraits
{
enum _enum : unsigned char
{
ACTIVE = 0,
GENERIC_ERROR = 1,
CONNECTION_FAILED = 2,
CLOSED = 3,
MESSAGE_SEND_FAIL = 4,

NUM_TYPES
};

protected:
_enum m_value;
};

using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;

}
49 changes: 33 additions & 16 deletions cpp/csp/adapters/websocket/WebsocketEndpoint.cpp
Original file line number Diff line number Diff line change
@@ -3,9 +3,11 @@
namespace csp::adapters::websocket {
using namespace csp;

WebsocketEndpoint::WebsocketEndpoint(
WebsocketEndpoint::WebsocketEndpoint(
net::io_context& ioc,
Dictionary properties
) : m_properties(properties)
) : m_properties(std::make_shared<Dictionary>(std::move(properties))),
m_ioc(ioc)
{ };
void WebsocketEndpoint::setOnOpen(void_cb on_open)
{ m_on_open = std::move(on_open); }
@@ -20,27 +22,26 @@ void WebsocketEndpoint::setOnSendFail(string_cb on_send_fail)

void WebsocketEndpoint::run()
{

m_ioc.reset();
if(m_properties.get<bool>("use_ssl")) {
// Owns this ioc object
if(m_properties->get<bool>("use_ssl")) {
ssl::context ctx{ssl::context::sslv23};
ctx.set_verify_mode(ssl::context::verify_peer );
ctx.set_default_verify_paths();

m_session = new WebsocketSessionTLS(
m_session = std::make_shared<WebsocketSessionTLS>(
m_ioc,
ctx,
&m_properties,
m_properties,
m_on_open,
m_on_fail,
m_on_message,
m_on_close,
m_on_send_fail
);
} else {
m_session = new WebsocketSessionNoTLS(
m_session = std::make_shared<WebsocketSessionNoTLS>(
m_ioc,
&m_properties,
m_properties,
m_on_open,
m_on_fail,
m_on_message,
@@ -49,23 +50,39 @@ void WebsocketEndpoint::run()
);
}
m_session->run();
}

m_ioc.run();
WebsocketEndpoint::~WebsocketEndpoint() {
try {
// Call stop but explicitly pass false to prevent io_context shutdown
stop(false);
} catch (...) {
// Ignore any exceptions during cleanup
}
}

void WebsocketEndpoint::stop()
{
m_ioc.stop();
if(m_session) m_session->stop();
void WebsocketEndpoint::stop( bool stop_ioc )
{
if( m_session ) m_session->stop();
if( stop_ioc ) m_ioc.stop();
}

void WebsocketEndpoint::updateHeaders(csp::Dictionary properties){
DictionaryPtr headers = m_properties->get<DictionaryPtr>("headers");
for (auto it = properties.begin(); it != properties.end(); ++it) {
std::string key = it.key();
auto value = it.value<std::string>();
headers->update(key, std::move(value));
}
}

csp::Dictionary& WebsocketEndpoint::getProperties() {
std::shared_ptr<Dictionary> WebsocketEndpoint::getProperties() {
return m_properties;
}

void WebsocketEndpoint::send(const std::string& s)
{ if(m_session) m_session->send(s); }

void WebsocketEndpoint::ping()
{ if(m_session) m_session->ping(); }

}
211 changes: 120 additions & 91 deletions cpp/csp/adapters/websocket/WebsocketEndpoint.h

Large diffs are not rendered by default.

432 changes: 432 additions & 0 deletions cpp/csp/adapters/websocket/WebsocketEndpointManager.cpp

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions cpp/csp/adapters/websocket/WebsocketEndpointManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#ifndef WEBSOCKET_ENDPOINT_MANAGER_H
#define WEBSOCKET_ENDPOINT_MANAGER_H

#include <boost/asio.hpp>
#include <csp/adapters/websocket/WebsocketClientTypes.h>
#include <csp/adapters/websocket/WebsocketEndpoint.h>
#include <csp/adapters/websocket/ClientAdapterManager.h>
#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/adapters/websocket/ClientConnectionRequestAdapter.h>
#include <csp/core/Enum.h>
#include <csp/core/Hash.h>
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <csp/core/Platform.h>
#include <thread>
#include <chrono>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <iomanip>
#include <iostream>
#include <vector>
#include <unordered_set>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <chrono>
#include <optional>
#include <functional>

namespace csp::adapters::websocket {
using namespace csp;
class WebsocketEndpoint;

class ClientAdapterManager;
class ClientOutputAdapter;
class ClientConnectionRequestAdapter;
class ClientHeaderUpdateOutputAdapter;

struct ConnectPayloads {
std::vector<std::string> consumer_payloads;
std::vector<std::string> producer_payloads;
};

struct EndpointConfig {
std::chrono::milliseconds reconnect_interval;
std::unique_ptr<boost::asio::steady_timer> reconnect_timer;
bool attempting_reconnect{false};
bool connected{false};

// Payloads for different client types
std::vector<std::string> consumer_payloads;
std::vector<std::string> producer_payloads;

explicit EndpointConfig(boost::asio::io_context& ioc)
: reconnect_timer(std::make_unique<boost::asio::steady_timer>(ioc)) {}
};

// Callbacks for endpoint events
struct EndpointCallbacks {
std::function<void(const std::string&)> onOpen;
std::function<void(const std::string&, const std::string&)> onFail;
std::function<void(const std::string&)> onClose;
std::function<void(const std::string&, const std::string&)> onSendFail;
std::function<void(const std::string&, void*, size_t)> onMessage;
};

class WebsocketEndpointManager {
public:
explicit WebsocketEndpointManager(ClientAdapterManager* mgr, const Dictionary & properties, Engine* engine);
~WebsocketEndpointManager();
void send(const std::string& value, const size_t& caller_id);
// Whether the input adapter (subscribe) given by a specific caller_id was pruned
bool adapterPruned( size_t caller_id );
// Whether the output adapater (publish) given by a specific caller_id publishes to a given endpoint

void start(DateTime starttime, DateTime endtime);
void stop();

void handleConnectionRequest(const Dictionary & properties, size_t validated_id, bool is_subscribe);
void handleEndpointFailure(const std::string& endpoint_id, const std::string& reason, ClientStatusType status_type);

void setupEndpoint(const std::string& endpoint_id, std::unique_ptr<WebsocketEndpoint> endpoint, std::string payload, bool persist, bool is_consumer, size_t validated_id);
void shutdownEndpoint(const std::string& endpoint_id);

void addConsumer(const std::string& endpoint_id, size_t caller_id);
void addProducer(const std::string& endpoint_id, size_t caller_id);
bool canRemoveEndpoint(const std::string& endpoint_id);

void removeEndpointForCallerId(const std::string& endpoint_id, bool is_consumer, size_t validated_id);
void removeConsumer(const std::string& endpoint_id, size_t caller_id);
void removeProducer(const std::string& endpoint_id, size_t caller_id);

WebsocketEndpoint * getNonDynamicEndpoint();
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
OutputAdapter * getOutputAdapter( const Dictionary & properties );
OutputAdapter * getHeaderUpdateAdapter();
OutputAdapter * getConnectionRequestAdapter( const Dictionary & properties );
private:
inline size_t validateCallerId(int64_t caller_id) const {
if (caller_id < 0) {
CSP_THROW(ValueError, "caller_id cannot be negative: " << caller_id);
}
return static_cast<size_t>(caller_id);
}
inline void ensureVectorSize(std::vector<bool>& vec, size_t caller_id) {
if (vec.size() <= caller_id) {
vec.resize(caller_id + 1, false);
}
}
// Whether the output adapater (publish) given by a specific caller_id publishes to a given endpoint
inline bool publishesToEndpoint(const size_t caller_id, const std::string& endpoint_id){
auto config_it = m_endpoint_configs.find(endpoint_id);
if( config_it == m_endpoint_configs.end() || !config_it->second.connected )
return false;

return caller_id < m_endpoint_producers[endpoint_id].size() &&
m_endpoint_producers[endpoint_id][caller_id];
}
size_t m_num_threads;
net::io_context m_ioc;
Engine* m_engine;
boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
ClientAdapterManager* m_mgr;
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
std::vector<std::unique_ptr<std::thread>> m_threads;
Dictionary m_properties;
std::vector<ClientConnectionRequestAdapter*> m_connectionRequestAdapters;

// Bidirectional mapping using vectors since caller_ids are sequential
// Maybe not efficient? Should be good for small number of edges though
std::unordered_map<std::string, std::vector<bool>> m_endpoint_consumers; // endpoint_id -> vector[caller_id] for consuemrs
std::unordered_map<std::string, std::vector<bool>> m_endpoint_producers; // endpoint_id -> vector[caller_id] for producers

// Quick lookup for caller's endpoints
std::vector< std::unordered_set<std::string> > m_consumer_endpoints; // caller_id -> set of endpoints they consume from
std::vector< std::unordered_set<std::string> > m_producer_endpoints; // caller_id -> set of endpoints they produce to
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> m_work_guard;
std::unordered_map<std::string, std::unique_ptr<WebsocketEndpoint>> m_endpoints;
std::unordered_map<std::string, EndpointConfig> m_endpoint_configs;
std::vector<ClientInputAdapter*> m_inputAdapters;
std::vector<ClientOutputAdapter*> m_outputAdapters;
bool m_dynamic;
};

}
#endif
24 changes: 24 additions & 0 deletions cpp/csp/python/Conversions.h
Original file line number Diff line number Diff line change
@@ -666,6 +666,30 @@ inline Dictionary fromPython( PyObject * o )
return out;
}

template<>
inline std::vector<Dictionary> fromPython(PyObject* o)
{
if (!PyList_Check(o))
CSP_THROW(TypeError, "List of dictionaries conversion expected type list got " << Py_TYPE(o)->tp_name);

Py_ssize_t size = PyList_GET_SIZE(o);
std::vector<Dictionary> out;
out.reserve(size);

for (Py_ssize_t i = 0; i < size; ++i)
{
PyObject* item = PyList_GET_ITEM(o, i);

// Skip None values like in Dictionary conversion
if (item == Py_None)
continue;

out.emplace_back(fromPython<Dictionary>(item));
}

return out;
}

template<>
inline std::vector<Dictionary::Data> fromPython( PyObject * o )
{
37 changes: 36 additions & 1 deletion cpp/csp/python/adapters/websocketadapterimpl.cpp
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@
#include <csp/python/PyInputAdapterWrapper.h>
#include <csp/python/PyOutputAdapterWrapper.h>

#include <iostream>
using namespace csp::adapters::websocket;

namespace csp::python
@@ -45,7 +46,11 @@ static OutputAdapter * create_websocket_output_adapter( csp::AdapterManager * ma
auto * websocketManager = dynamic_cast<ClientAdapterManager*>( manager );
if( !websocketManager )
CSP_THROW( TypeError, "Expected WebsocketClientAdapterManager" );
return websocketManager -> getOutputAdapter();
PyObject * pyProperties;
if( !PyArg_ParseTuple( args, "O!",
&PyDict_Type, &pyProperties ) )
CSP_THROW( PythonPassthrough, "" );
return websocketManager -> getOutputAdapter(fromPython<Dictionary>( pyProperties ));
}

static OutputAdapter * create_websocket_header_update_adapter( csp::AdapterManager * manager, PyEngine * pyengine, PyObject * args )
@@ -56,10 +61,40 @@ static OutputAdapter * create_websocket_header_update_adapter( csp::AdapterManag
return websocketManager -> getHeaderUpdateAdapter();
}

static OutputAdapter * create_websocket_connection_request_adapter( csp::AdapterManager * manager, PyEngine * pyengine, PyObject * args )
{
// std::cout << "hereeeee33ee" << "\n";
PyObject * pyProperties;
// PyObject * type;
auto * websocketManager = dynamic_cast<ClientAdapterManager*>( manager );
if( !websocketManager )
CSP_THROW( TypeError, "Expected WebsocketClientAdapterManager" );

if( !PyArg_ParseTuple( args, "O!",
&PyDict_Type, &pyProperties ) )
CSP_THROW( PythonPassthrough, "" );
// std::cout << "hereeeee334444ee" << "\n";
return websocketManager -> getConnectionRequestAdapter( fromPython<Dictionary>( pyProperties ) );


// TODO
// Here I think we should have a websocket connection manager
// that will handle the connections and endpoint management
// It will create the connection request output adapter
// That output adapter, when it ticks, with a list of python Dictionary
// will then use the boost beast 'post' function to schedule, on the
// io context, a callback to process that dict (on the websocket connection manager!!!) and handle the endpoint manipulation appropriately

// that websocket connection manager will run the thread with the io context
// being run. Move it away from clientAdapterManager
}

REGISTER_ADAPTER_MANAGER( _websocket_adapter_manager, create_websocket_adapter_manager );
REGISTER_INPUT_ADAPTER( _websocket_input_adapter, create_websocket_input_adapter );
REGISTER_OUTPUT_ADAPTER( _websocket_output_adapter, create_websocket_output_adapter );
REGISTER_OUTPUT_ADAPTER( _websocket_header_update_adapter, create_websocket_header_update_adapter);
REGISTER_OUTPUT_ADAPTER( _websocket_connection_request_adapter, create_websocket_connection_request_adapter);


static PyModuleDef _websocketadapterimpl_module = {
PyModuleDef_HEAD_INIT,
6 changes: 6 additions & 0 deletions csp/adapters/dynamic_adapter_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from pydantic import BaseModel, NonNegativeInt


class AdapterInfo(BaseModel):
caller_id: NonNegativeInt
is_subscribe: bool
157 changes: 144 additions & 13 deletions csp/adapters/websocket.py
Original file line number Diff line number Diff line change
@@ -21,8 +21,10 @@
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
from csp.lib import _websocketadapterimpl

from .websocket_types import WebsocketHeaderUpdate
from .dynamic_adapter_utils import AdapterInfo
from .websocket_types import ActionType, ConnectionRequest, WebsocketHeaderUpdate, WebsocketStatus # noqa

# InternalConnectionRequest,
_ = (
BytesMessageProtoMapper,
DateTimeType,
@@ -59,6 +61,12 @@ def diff_dict(old, new):
return d


def _sanitize_port(uri: str, port):
if port:
return str(port)
return "443" if uri.startswith("wss") else "80"


class TableManager:
def __init__(self, tables, delta_updates):
self._tables = tables
@@ -237,7 +245,7 @@ def on_close(self):
self._manager.unsubscribe(self)

def on_message(self, message):
logging.info("got message %r", message)
logging.warning("got message %r", message)
# TODO Ignore for now
# parsed = rapidjson.loads(message)

@@ -387,12 +395,34 @@ def _instantiate(self):
_launch_application(self._port, manager, csp.const("stub"))


# Maybe, we can have the Adapter manager have all the connections
# If we get a new connection request, we include that adapter for the
# subscriptions. When we pop it, we remove it.
# Then, each edge will effectively be independent.
# Maybe. have each websocket push to a shared queue, then from there we
# pass it along to all edges ("input adapters") that are subscribed to it

# Ok, maybe, let's keep it at just 1 subscribe and send call.
# However, we can subscribe to the send and subscribe calls separately.
# We just have to keep track of the Endpoints we have, and


class WebsocketAdapterManager:
"""
Can subscribe dynamically via ts[List[ConnectionRequest]]
We use a ts[List[ConnectionRequest]] to allow users to submit a batch of conneciton requests in
a single engine cycle.
"""

def __init__(
self,
uri: str,
uri: Optional[str] = None,
reconnect_interval: timedelta = timedelta(seconds=2),
headers: Dict[str, str] = None,
headers: Optional[Dict[str, str]] = None,
dynamic: bool = False,
connection_request: Optional[ConnectionRequest] = None,
num_threads: int = 1,
):
"""
uri: str
@@ -401,26 +431,83 @@ def __init__(
time interval to wait before trying to reconnect (must be >= 1 second)
headers: Dict[str, str] = None
headers to apply to the request during the handshake
dynamic: bool = False
Whether we accept dynamically altering the connections via ConnectionRequest objects.
num_threads: int = 1
Determines number of threads to allocate for running the websocket endpoints.
Defaults to 1 to avoid thread switching
"""

self._properties = dict(dynamic=dynamic, num_threads=num_threads)
# Enumerating for clarity
if connection_request is not None and uri is not None:
raise ValueError("'connection_request' cannot be set along with 'uri'")

# Exactly 1 of connection_request and uri is None
if connection_request is not None or uri is not None:
if connection_request is None:
connection_request = ConnectionRequest(
uri=uri, reconnect_interval=reconnect_interval, headers=headers or {}
)
self._properties.update(self._get_properties(connection_request))

# This is a counter that will be used to identify every function call
# We keep track of the subscribes and sends separately
self._subscribe_call_id = 0
self._send_call_id = 0

# This maps types to their wrapper structs
self._wrapper_struct_dict = {}

@property
def _dynamic(self):
return self._properties.get("dynamic", False)

def _get_properties(self, conn_request: ConnectionRequest) -> dict:
uri = conn_request.uri
reconnect_interval = conn_request.reconnect_interval

assert reconnect_interval >= timedelta(seconds=1)
resp = urllib.parse.urlparse(uri)
if resp.hostname is None:
raise ValueError(f"Failed to parse host from URI: {uri}")

self._properties = dict(
res = dict(
host=resp.hostname,
# if no port is explicitly present in the uri, the resp.port is None
port=self._sanitize_port(uri, resp.port),
port=_sanitize_port(uri, resp.port),
route=resp.path or "/", # resource shouldn't be empty string
use_ssl=uri.startswith("wss"),
reconnect_interval=reconnect_interval,
headers=headers if headers else {},
headers=conn_request.headers,
persistent=conn_request.persistent,
action=conn_request.action.name,
on_connect_payload=conn_request.on_connect_payload,
uri=uri,
dynamic=self._dynamic,
)
return res

def _sanitize_port(self, uri: str, port):
if port:
return str(port)
return "443" if uri.startswith("wss") else "80"
def _get_caller_id(self, is_subscribe: bool) -> int:
if is_subscribe:
caller_id = self._subscribe_call_id
self._subscribe_call_id += 1
else:
caller_id = self._send_call_id
self._send_call_id += 1
return caller_id

def get_wrapper_struct(self, ts_type: type):
if (dynamic_type := self._wrapper_struct_dict.get(ts_type)) is None:
# I want to preserve type information
# Not sure a better way to do this
class CustomWrapperStruct(csp.Struct):
msg: ts_type # noqa
uri: str

dynamic_type = CustomWrapperStruct
self._wrapper_struct_dict[ts_type] = dynamic_type
return dynamic_type

def subscribe(
self,
@@ -429,7 +516,27 @@ def subscribe(
field_map: Union[dict, str] = None,
meta_field_map: dict = None,
push_mode: csp.PushMode = csp.PushMode.NON_COLLAPSING,
connection_request: Optional[ts[List[ConnectionRequest]]] = None,
):
"""If dynamic is True, this will tick a custom WrapperStruct,
with 'msg' as the correct type of the message.
And 'uri' that specifies the 'uri' the message comes from.
Otherwise, returns just message.
ts_type should be original type!! The tuple wrapping happens
automatically
"""
caller_id = self._get_caller_id(is_subscribe=True)
# Gives validation, more to start defining a common interface
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=True).model_dump()
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
request_dict = csp.apply(
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
)
# Output adapter to handle connection requests
_websocket_connection_request_adapter_def(self, request_dict, adapter_props)

field_map = field_map or {}
meta_field_map = meta_field_map or {}
if isinstance(field_map, str):
@@ -442,12 +549,26 @@ def subscribe(
properties["field_map"] = field_map
properties["meta_field_map"] = meta_field_map

properties.update(adapter_props)
# We wrap the message in a struct to note the url it comes from
if self._dynamic:
ts_type = self.get_wrapper_struct(ts_type=ts_type)
return _websocket_input_adapter_def(self, ts_type, properties, push_mode=push_mode)

def send(self, x: ts["T"]):
return _websocket_output_adapter_def(self, x)
def send(self, x: ts["T"], connection_request: Optional[ts[List[ConnectionRequest]]] = None):
caller_id = self._get_caller_id(is_subscribe=False)
# Gives validation, more to start defining a common interface
adapter_props = AdapterInfo(caller_id=caller_id, is_subscribe=False).model_dump()
connection_request = csp.null_ts(List[ConnectionRequest]) if connection_request is None else connection_request
request_dict = csp.apply(
connection_request, lambda conn_reqs: [self._get_properties(conn_req) for conn_req in conn_reqs], list
)
_websocket_connection_request_adapter_def(self, request_dict, adapter_props)
return _websocket_output_adapter_def(self, x, adapter_props)

def update_headers(self, x: ts[List[WebsocketHeaderUpdate]]):
if self._dynamic:
raise ValueError("If dynamic, cannot call update_headers")
return _websocket_header_update_adapter_def(self, x)

def status(self, push_mode=csp.PushMode.NON_COLLAPSING):
@@ -456,6 +577,7 @@ def status(self, push_mode=csp.PushMode.NON_COLLAPSING):

def _create(self, engine, memo):
"""method needs to return the wrapped c++ adapter manager"""
self._properties.update({"subscribe_calls": self._subscribe_call_id, "send_calls": self._send_call_id})
return _websocketadapterimpl._websocket_adapter_manager(engine, self._properties)


@@ -473,6 +595,7 @@ def _create(self, engine, memo):
_websocketadapterimpl._websocket_output_adapter,
WebsocketAdapterManager,
input=ts["T"],
properties=dict,
)

_websocket_header_update_adapter_def = output_adapter_def(
@@ -481,3 +604,11 @@ def _create(self, engine, memo):
WebsocketAdapterManager,
input=ts[List[WebsocketHeaderUpdate]],
)

_websocket_connection_request_adapter_def = output_adapter_def(
"websocket_connection_request_adapter",
_websocketadapterimpl._websocket_connection_request_adapter,
WebsocketAdapterManager,
input=ts[list], # needed, List[dict] didn't work on c++ level
properties=dict,
)
19 changes: 19 additions & 0 deletions csp/adapters/websocket_types.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from datetime import timedelta
from typing import Dict

from csp.impl.enum import Enum
from csp.impl.struct import Struct

@@ -12,6 +15,22 @@ class WebsocketStatus(Enum):
MESSAGE_SEND_FAIL = 4


class ActionType(Enum):
CONNECT = 0
DISCONNECT = 1
PING = 2


class WebsocketHeaderUpdate(Struct):
key: str
value: str


class ConnectionRequest(Struct):
uri: str
action: ActionType = ActionType.CONNECT # Connect, Disconnect, Ping, etc
# Whetehr we maintain the connection
persistent: bool = True # Only relevant for Connect requests
reconnect_interval: timedelta = timedelta(seconds=2)
on_connect_payload: str = "" # message to send on connect
headers: Dict[str, str] = {}
481 changes: 444 additions & 37 deletions csp/tests/adapters/test_websocket.py

Large diffs are not rendered by default.