-
Notifications
You must be signed in to change notification settings - Fork 65
/
test_asyncio.py
109 lines (82 loc) · 3.01 KB
/
test_asyncio.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python
#
# This file is part of pySerial-asyncio - Cross platform serial port support for Python
# (C) 2016 pySerial-team
#
# SPDX-License-Identifier: BSD-3-Clause
"""\
Test asyncio related functionality.
To run from the command line with a specific port with a loop-back,
device connected, use:
$ cd pyserial-asyncio
$ python -m test.test_asyncio SERIALDEVICE
"""
import os
import unittest
import asyncio
import serial_asyncio
# Address of the local TCP loop-back (echo) server used by default.
HOST = '127.0.0.1'
_PORT = 8888
# Port the tests are performed on. Defaults to a socket:// URL pointing at
# the local loop-back server; a device given on the command line replaces it.
PORT = 'socket://{}:{}'.format(HOST, _PORT)
@unittest.skipIf(os.name != 'posix', "asyncio not supported on platform")
class Test_asyncio(unittest.TestCase):
    """Test asyncio related functionality.

    Every test runs on a private event loop that is created in ``setUp``
    and closed again in ``tearDown``.
    """

    def setUp(self):
        """Create a fresh event loop and install it as the current loop."""
        # tearDown() closes the loop, so the shared loop returned by
        # asyncio.get_event_loop() must not be used here: a later test (or
        # any other asyncio user in this process) would be handed a closed
        # loop.
        self.loop = asyncio.new_event_loop()
        # Helpers created without an explicit loop look up the current one,
        # so register ours.
        asyncio.set_event_loop(self.loop)

    def tearDown(self):
        """Close the test's event loop and unregister it."""
        self.loop.close()
        asyncio.set_event_loop(None)

    def test_asyncio(self):
        """Send one line through the port and expect it to be echoed back.

        When ``PORT`` is a ``socket://`` URL, a local TCP echo server is
        started first to act as the loop-back device. For any other port the
        test relies on whatever is connected to echo the data.
        """
        TEXT = b'Hello, World!\n'
        received = []
        actions = []
        done = asyncio.Event()

        class Input(asyncio.Protocol):
            """Server side of the loop-back: echo each chunk to the sender."""

            def __init__(self):
                super().__init__()
                self._transport = None

            def connection_made(self, transport):
                self._transport = transport

            def data_received(self, data):
                self._transport.write(data)

        class Output(asyncio.Protocol):
            """Client side: send TEXT, collect the echo, record events."""

            def __init__(self):
                super().__init__()
                self._transport = None

            def connection_made(self, transport):
                self._transport = transport
                actions.append('open')
                transport.write(TEXT)

            def data_received(self, data):
                received.append(data)
                # The newline terminates TEXT, so the echo is complete.
                if b'\n' in data:
                    self._transport.close()

            def connection_lost(self, exc):
                actions.append('close')
                done.set()

            def pause_writing(self):
                actions.append('pause')
                print(self._transport.get_write_buffer_size())

            def resume_writing(self):
                actions.append('resume')
                print(self._transport.get_write_buffer_size())

        # Keep a reference to the echo server so that its listening socket
        # can be released again once the exchange is over.
        server = None
        if PORT.startswith('socket://'):
            server = self.loop.run_until_complete(
                self.loop.create_server(Input, HOST, _PORT))

        try:
            client = serial_asyncio.create_serial_connection(
                self.loop, Output, PORT)
            self.loop.run_until_complete(client)
            self.loop.run_until_complete(done.wait())
            # Let tasks that are still scheduled on the loop finish before
            # the results are checked.
            pending = asyncio.all_tasks(self.loop)
            if pending:
                self.loop.run_until_complete(asyncio.gather(*pending))
        finally:
            if server is not None:
                server.close()
                self.loop.run_until_complete(server.wait_closed())

        self.assertEqual(b''.join(received), TEXT)
        self.assertEqual(actions, ['open', 'close'])
if __name__ == '__main__':
    import sys

    # Show the usage notes from the module docstring first.
    sys.stdout.write(__doc__)
    # An optional first command-line argument selects the port under test.
    cli_args = sys.argv[1:]
    if cli_args:
        PORT = cli_args[0]
    sys.stdout.write("Testing port: %r\n" % PORT)
    # Replace the remaining arguments so unittest only sees the verbose flag.
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()