Skip to content

Commit c98eef3

Browse files
committed
Include websocket tests by default
Signed-off-by: Nijat K <[email protected]>
1 parent d70dce8 commit c98eef3

9 files changed

+110
-118
lines changed

cpp/csp/adapters/websocket/CMakeLists.txt

-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )
22

33
set(WS_CLIENT_HEADER_FILES
4-
WebsocketClientTypes.h
54
ClientAdapterManager.h
65
ClientInputAdapter.h
76
ClientOutputAdapter.h
@@ -13,7 +12,6 @@ set(WS_CLIENT_HEADER_FILES
1312
)
1413

1514
set(WS_CLIENT_SOURCE_FILES
16-
WebsocketClientTypes.cpp
1715
ClientAdapterManager.cpp
1816
ClientInputAdapter.cpp
1917
ClientOutputAdapter.cpp

cpp/csp/adapters/websocket/ClientAdapterManager.h

-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33

44
#include <csp/adapters/websocket/WebsocketEndpoint.h>
55
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
6-
#include <csp/adapters/websocket/WebsocketClientTypes.h>
76
#include <csp/adapters/websocket/ClientInputAdapter.h>
87
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
98
#include <csp/core/Enum.h>

cpp/csp/adapters/websocket/WebsocketClientTypes.cpp

-13
This file was deleted.

cpp/csp/adapters/websocket/WebsocketClientTypes.h

-26
This file was deleted.

cpp/csp/adapters/websocket/WebsocketEndpointManager.cpp

+23-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
#include <csp/adapters/websocket/WebsocketEndpointManager.h>
22

3+
namespace csp {
4+
5+
INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
6+
"ACTIVE",
7+
"GENERIC_ERROR",
8+
"CONNECTION_FAILED",
9+
"CLOSED",
10+
"MESSAGE_SEND_FAIL",
11+
);
12+
13+
}
314
namespace csp::adapters::websocket {
415

516
WebsocketEndpointManager::WebsocketEndpointManager( ClientAdapterManager* mgr, const Dictionary & properties, Engine* engine )
@@ -96,7 +107,8 @@ void WebsocketEndpointManager::shutdownEndpoint(const std::string& endpoint_id)
96107
m_endpoints.erase(endpoint_it);
97108
std::stringstream ss;
98109
ss << "No more connections for endpoint={" << endpoint_id << "} Shutting down...";
99-
m_mgr -> pushStatus(StatusLevel::INFO, ClientStatusType::CLOSED, ss.str());
110+
std::string msg = ss.str();
111+
m_mgr -> pushStatus(StatusLevel::INFO, ClientStatusType::CLOSED, msg);
100112
}
101113

102114
void WebsocketEndpointManager::setupEndpoint(const std::string& endpoint_id,
@@ -135,9 +147,10 @@ void WebsocketEndpointManager::setupEndpoint(const std::string& endpoint_id,
135147
// should only happen if persist is False
136148
if ( !payload.empty() )
137149
endpoint -> send(payload);
138-
139-
m_mgr -> pushStatus(StatusLevel::INFO, ClientStatusType::ACTIVE,
140-
"Connected successfully for endpoint={" + endpoint_id +"}");
150+
std::stringstream ss;
151+
ss << "Connected successfully for endpoint={" << endpoint_id << "}";
152+
std::string msg = ss.str();
153+
m_mgr -> pushStatus(StatusLevel::INFO, ClientStatusType::ACTIVE, msg);
141154
// We remove the caller id, if it was the only one, then we shut down the endpoint
142155
if( !persist )
143156
removeEndpointForCallerId(endpoint_id, is_consumer, validated_id);
@@ -170,8 +183,9 @@ void WebsocketEndpointManager::setupEndpoint(const std::string& endpoint_id,
170183
stored_endpoint -> setOnSendFail(
171184
[ this, endpoint_id ]( const std::string& s ) {
172185
std::stringstream ss;
173-
ss << "Error: " << s << " for " << endpoint_id;
174-
m_mgr -> pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
186+
ss << "Error: " << s << " for endpoint={" << endpoint_id << "}";
187+
std::string msg = ss.str();
188+
m_mgr -> pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, msg );
175189
}
176190
);
177191
stored_endpoint -> run();
@@ -214,10 +228,11 @@ void WebsocketEndpointManager::handleEndpointFailure(const std::string& endpoint
214228

215229
std::stringstream ss;
216230
ss << "Connection Failure for endpoint={" << endpoint_id << "} Due to: " << reason;
231+
std::string msg = ss.str();
217232
if ( status_type == ClientStatusType::CLOSED || status_type == ClientStatusType::ACTIVE )
218-
m_mgr -> pushStatus(StatusLevel::INFO, status_type, ss.str());
233+
m_mgr -> pushStatus(StatusLevel::INFO, status_type, msg);
219234
else{
220-
m_mgr -> pushStatus(StatusLevel::ERROR, status_type, ss.str());
235+
m_mgr -> pushStatus(StatusLevel::ERROR, status_type, msg);
221236
}
222237
};
223238

cpp/csp/adapters/websocket/WebsocketEndpointManager.h

+19-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
#define WEBSOCKET_ENDPOINT_MANAGER_H
33

44
#include <boost/asio.hpp>
5-
#include <csp/adapters/websocket/WebsocketClientTypes.h>
65
#include <csp/adapters/websocket/WebsocketEndpoint.h>
76
#include <csp/adapters/websocket/ClientAdapterManager.h>
87
#include <csp/adapters/websocket/ClientInputAdapter.h>
@@ -70,6 +69,25 @@ struct EndpointCallbacks {
7069
std::function<void(const std::string&, void*, size_t)> onMessage;
7170
};
7271

72+
struct WebsocketClientStatusTypeTraits
73+
{
74+
enum _enum : unsigned char
75+
{
76+
ACTIVE = 0,
77+
GENERIC_ERROR = 1,
78+
CONNECTION_FAILED = 2,
79+
CLOSED = 3,
80+
MESSAGE_SEND_FAIL = 4,
81+
82+
NUM_TYPES
83+
};
84+
85+
protected:
86+
_enum m_value;
87+
};
88+
89+
using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
90+
7391
class WebsocketEndpointManager {
7492
public:
7593
explicit WebsocketEndpointManager(ClientAdapterManager* mgr, const Dictionary & properties, Engine* engine);

csp/adapters/websocket.py

+5-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import csp
1010
from csp import ts
11+
from csp.adapters.dynamic_adapter_utils import AdapterInfo
1112
from csp.adapters.status import Status
1213
from csp.adapters.utils import (
1314
BytesMessageProtoMapper,
@@ -17,20 +18,20 @@
1718
RawBytesMessageMapper,
1819
RawTextMessageMapper,
1920
)
21+
from csp.adapters.websocket_types import ActionType, ConnectionRequest, WebsocketHeaderUpdate, WebsocketStatus
2022
from csp.impl.wiring import input_adapter_def, output_adapter_def, status_adapter_def
2123
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef
2224
from csp.lib import _websocketadapterimpl
2325

24-
from .dynamic_adapter_utils import AdapterInfo
25-
from .websocket_types import ActionType, ConnectionRequest, WebsocketHeaderUpdate, WebsocketStatus # noqa
26-
2726
# InternalConnectionRequest,
2827
_ = (
28+
ActionType,
2929
BytesMessageProtoMapper,
3030
DateTimeType,
3131
JSONTextMessageMapper,
3232
RawBytesMessageMapper,
3333
RawTextMessageMapper,
34+
WebsocketStatus,
3435
)
3536
T = TypeVar("T")
3637

@@ -577,7 +578,7 @@ def update_headers(self, x: ts[List[WebsocketHeaderUpdate]]):
577578

578579
def status(self, push_mode=csp.PushMode.NON_COLLAPSING):
579580
ts_type = Status
580-
return status_adapter_def(self, ts_type, push_mode=push_mode)
581+
return status_adapter_def(self, ts_type, push_mode)
581582

582583
def _create(self, engine, memo):
583584
"""method needs to return the wrapped c++ adapter manager"""

csp/tests/adapters/test_websocket.py

+62-63
Original file line numberDiff line numberDiff line change
@@ -2,77 +2,76 @@
22
import pytest
33
import pytz
44
import threading
5+
import tornado.ioloop
6+
import tornado.web
7+
import tornado.websocket
58
from contextlib import contextmanager
69
from datetime import datetime, timedelta
710
from typing import List, Optional, Type
811

912
import csp
1013
from csp import ts
11-
12-
if os.environ.get("CSP_TEST_WEBSOCKET"):
13-
import tornado.ioloop
14-
import tornado.web
15-
import tornado.websocket
16-
17-
from csp.adapters.websocket import (
18-
ActionType,
19-
ConnectionRequest,
20-
JSONTextMessageMapper,
21-
RawTextMessageMapper,
22-
Status,
23-
WebsocketAdapterManager,
24-
WebsocketHeaderUpdate,
25-
WebsocketStatus,
26-
)
27-
28-
class EchoWebsocketHandler(tornado.websocket.WebSocketHandler):
29-
def on_message(self, msg):
30-
# Carve-out to allow inspecting the headers
31-
if msg == "header1":
32-
msg = self.request.headers.get(msg, "")
33-
elif not isinstance(msg, str) and msg.decode("utf-8") == "header1":
34-
# Need this for bytes
35-
msg = self.request.headers.get("header1", "")
36-
return self.write_message(msg)
37-
38-
@contextmanager
39-
def create_tornado_server(port: int):
40-
"""Base context manager for creating a Tornado server in a thread"""
41-
ready_event = threading.Event()
42-
io_loop = None
43-
app = None
44-
io_thread = None
45-
46-
def run_io_loop():
47-
nonlocal io_loop, app
48-
io_loop = tornado.ioloop.IOLoop()
49-
io_loop.make_current()
50-
app = tornado.web.Application([(r"/", EchoWebsocketHandler)])
51-
app.listen(port)
52-
ready_event.set()
53-
io_loop.start()
54-
55-
io_thread = threading.Thread(target=run_io_loop)
56-
io_thread.start()
57-
ready_event.wait()
58-
59-
try:
60-
yield io_loop, app, io_thread
61-
finally:
62-
io_loop.add_callback(io_loop.stop)
63-
if io_thread:
64-
io_thread.join(timeout=5)
65-
if io_thread.is_alive():
66-
raise RuntimeError("IOLoop failed to stop")
67-
68-
@contextmanager
69-
def tornado_server(port: int = 8001):
70-
"""Simplified context manager that uses the base implementation"""
71-
with create_tornado_server(port) as (_io_loop, _app, _io_thread):
72-
yield
14+
from csp.adapters.websocket import (
15+
ActionType,
16+
ConnectionRequest,
17+
JSONTextMessageMapper,
18+
RawTextMessageMapper,
19+
Status,
20+
WebsocketAdapterManager,
21+
WebsocketHeaderUpdate,
22+
WebsocketStatus,
23+
)
24+
25+
26+
class EchoWebsocketHandler(tornado.websocket.WebSocketHandler):
27+
def on_message(self, msg):
28+
# Carve-out to allow inspecting the headers
29+
if msg == "header1":
30+
msg = self.request.headers.get(msg, "")
31+
elif not isinstance(msg, str) and msg.decode("utf-8") == "header1":
32+
# Need this for bytes
33+
msg = self.request.headers.get("header1", "")
34+
return self.write_message(msg)
35+
36+
37+
@contextmanager
38+
def create_tornado_server(port: int):
39+
"""Base context manager for creating a Tornado server in a thread"""
40+
ready_event = threading.Event()
41+
io_loop = None
42+
app = None
43+
io_thread = None
44+
45+
def run_io_loop():
46+
nonlocal io_loop, app
47+
io_loop = tornado.ioloop.IOLoop()
48+
io_loop.make_current()
49+
app = tornado.web.Application([(r"/", EchoWebsocketHandler)])
50+
app.listen(port)
51+
ready_event.set()
52+
io_loop.start()
53+
54+
io_thread = threading.Thread(target=run_io_loop)
55+
io_thread.start()
56+
ready_event.wait()
57+
58+
try:
59+
yield io_loop, app, io_thread
60+
finally:
61+
io_loop.add_callback(io_loop.stop)
62+
if io_thread:
63+
io_thread.join(timeout=5)
64+
if io_thread.is_alive():
65+
raise RuntimeError("IOLoop failed to stop")
66+
67+
68+
@contextmanager
69+
def tornado_server(port: int = 8001):
70+
"""Simplified context manager that uses the base implementation"""
71+
with create_tornado_server(port) as (_io_loop, _app, _io_thread):
72+
yield
7373

7474

75-
@pytest.mark.skipif(os.environ.get("CSP_TEST_WEBSOCKET") is None, reason="'CSP_TEST_WEBSOCKET' env variable is not set")
7675
class TestWebsocket:
7776
@pytest.fixture(scope="class", autouse=True)
7877
def setup_tornado(self, request):

pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ develop = [
8686
"sqlalchemy", # db
8787
"threadpoolctl", # test_random
8888
"tornado", # profiler, perspective, websocket
89+
"python-rapidjson", # websocket
8990
# type checking
9091
"pydantic>=2",
9192
]

0 commit comments

Comments
 (0)