Skip to content

Commit

Permalink
Use aiohttp's WebSocket client (Fixes miguelgrinberg/python-socketio#324
Browse files Browse the repository at this point in the history
)
  • Loading branch information
miguelgrinberg committed Nov 24, 2019
1 parent e38daad commit 280aa0f
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 118 deletions.
53 changes: 21 additions & 32 deletions engineio/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
except ImportError: # pragma: no cover
aiohttp = None
import six
try:
import websockets
except ImportError: # pragma: no cover
websockets = None

from . import client
from . import exceptions
Expand Down Expand Up @@ -227,8 +223,8 @@ async def _connect_polling(self, url, headers, engineio_path):

async def _connect_websocket(self, url, headers, engineio_path):
"""Establish or upgrade to a WebSocket connection with the server."""
if websockets is None: # pragma: no cover
self.logger.error('websockets package not installed')
if aiohttp is None: # pragma: no cover
self.logger.error('aiohttp package not installed')
return False
websocket_url = self._get_engineio_url(url, engineio_path,
'websocket')
Expand All @@ -243,30 +239,20 @@ async def _connect_websocket(self, url, headers, engineio_path):
self.logger.info(
'Attempting WebSocket connection to ' + websocket_url)

# get the cookies from the long-polling connection so that they can
# also be sent the the WebSocket route
cookies = None
if self.http:
cookies = '; '.join(["{}={}".format(cookie.key, cookie.value)
for cookie in self.http._cookie_jar])
headers = headers.copy()
headers['Cookie'] = cookies

try:
if not self.ssl_verify:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
ws = await websockets.connect(
ws = await self.http.ws_connect(
websocket_url + self._get_url_timestamp(),
extra_headers=headers, ssl=ssl_context)
headers=headers, ssl=ssl_context)
else:
ws = await websockets.connect(
ws = await self.http.ws_connect(
websocket_url + self._get_url_timestamp(),
extra_headers=headers)
except (websockets.exceptions.InvalidURI,
websockets.exceptions.InvalidHandshake,
OSError):
headers=headers)
except (aiohttp.client_exceptions.WSServerHandshakeError,
aiohttp.client_exceptions.ServerConnectionError):
if upgrade:
self.logger.warning(
'WebSocket upgrade failed: connection error')
Expand All @@ -277,14 +263,14 @@ async def _connect_websocket(self, url, headers, engineio_path):
p = packet.Packet(packet.PING, data='probe').encode(
always_bytes=False)
try:
await ws.send(p)
await ws.send_str(p)
except Exception as e: # pragma: no cover
self.logger.warning(
'WebSocket upgrade failed: unexpected send exception: %s',
str(e))
return False
try:
p = await ws.recv()
p = (await ws.receive()).data
except Exception as e: # pragma: no cover
self.logger.warning(
'WebSocket upgrade failed: unexpected recv exception: %s',
Expand All @@ -297,19 +283,17 @@ async def _connect_websocket(self, url, headers, engineio_path):
return False
p = packet.Packet(packet.UPGRADE).encode(always_bytes=False)
try:
await ws.send(p)
await ws.send_str(p)
except Exception as e: # pragma: no cover
self.logger.warning(
'WebSocket upgrade failed: unexpected send exception: %s',
str(e))
return False
self.current_transport = 'websocket'
if self.http: # pragma: no cover
await self.http.close()
self.logger.info('WebSocket upgrade was successful')
else:
try:
p = await ws.recv()
p = (await ws.receive()).data
except Exception as e: # pragma: no cover
raise exceptions.ConnectionError(
'Unexpected recv exception: ' + str(e))
Expand Down Expand Up @@ -495,8 +479,8 @@ async def _read_loop_websocket(self):
while self.state == 'connected':
p = None
try:
p = await self.ws.recv()
except websockets.exceptions.ConnectionClosed:
p = (await self.ws.receive()).data
except aiohttp.client_exceptions.ServerDisconnectedError:
self.logger.info(
'Read loop: WebSocket connection was closed, aborting')
await self.queue.put(None)
Expand Down Expand Up @@ -579,9 +563,14 @@ async def _write_loop(self):
# websocket
try:
for pkt in packets:
await self.ws.send(pkt.encode(always_bytes=False))
if pkt.binary:
await self.ws.send_bytes(pkt.encode(
always_bytes=False))
else:
await self.ws.send_str(pkt.encode(
always_bytes=False))
self.queue.task_done()
except websockets.exceptions.ConnectionClosed:
except aiohttp.client_exceptions.ServerDisconnectedError:
self.logger.info(
'Write loop: WebSocket connection was closed, '
'aborting')
Expand Down
4 changes: 2 additions & 2 deletions examples/client/asyncio/latency_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ async def on_message(data):
await send_ping()


async def start_server():
async def start_client():
await eio.connect('http://localhost:5000')
await eio.wait()


if __name__ == '__main__':
loop.run_until_complete(start_server())
loop.run_until_complete(start_client())
4 changes: 2 additions & 2 deletions examples/client/asyncio/simple_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ def signal_handler(sig, frame):
print('exiting')


async def start_server():
async def start_client():
await eio.connect('http://localhost:5000')
await eio.wait()


if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
loop.run_until_complete(start_server())
loop.run_until_complete(start_client())
1 change: 0 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
],
'asyncio_client': [
'aiohttp>=3.4',
'websockets>=7.0',
]
},
tests_require=[
Expand Down
Loading

0 comments on commit 280aa0f

Please sign in to comment.