-
Notifications
You must be signed in to change notification settings - Fork 0
/
uart.py
120 lines (93 loc) · 2.76 KB
/
uart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# -*- coding:utf-8 -*-
#!/usr/bin/python
import sys
import time
import re
import thread
import serial
MODULE = '[UART]'
class Uart():
def __init__(self, log, name, recv_hook, status_change_hook):
self.log = log
self.log.write('%s init uart for %s', MODULE, name)
self.name = name
self.uart_handle = serial.Serial()
self.recv_hook = recv_hook
self.status_chagne_hook = status_change_hook
self.connected = False
# self.uart_check_thread = thread.start_new_thread(self.uart_check, (None, ))
self.uart_recv_thread = thread.start_new_thread(self.auto_recv, (None, ))
def __exit__(self):
self.uart_handle.close()
def open(self, port, baudrate, bytesize, parity, stopbits, timeout):
# pattern = re.compile(r'^COM\d')
# pattern.search(port)
if self.uart_handle.isOpen():
self.uart_handle.close()
self.log.write('%s open uart port %s: %d, %d, %s, %d, readtimeout %d sec',
MODULE, port, baudrate, bytesize, parity, stopbits, timeout)
self.uart_handle.port = port
self.uart_handle.baudrate = baudrate
self.uart_handle.bytesize = bytesize
self.uart_handle.parity = parity
self.uart_handle.stopbits = stopbits
# self.uart_handle.timeout = timeout
try:
self.uart_handle.open()
time.sleep(0.1)
except:
self.log.write('%s can not open uart port %s', MODULE, port)
self.status_chagne_hook(self.name, 'DISCONNECT')
self.connected = False
return
# self.uart_handle.flushInput()
self.uart_handle.flushOutput()
self.log.write('%s open uart port %s: OK', MODULE, port)
self.status_chagne_hook(self.name, 'CONNECT')
self.connected = True
def close(self):
if self.uart_handle.isOpen():
self.uart_handle.close()
self.log.write('%s close uart port %s: OK', MODULE, self.uart_handle.port)
self.status_chagne_hook(self.name, 'DISCONNECT')
self.connected = False
def auto_recv(self, x):
while True:
# print self.uart_handle.isOpen()
if self.uart_handle.isOpen():
try:
data = self.uart_handle.readline()
# print 'get data<' + data + '>'
# print len(data)
if data:
self.recv_hook(data)
except:
pass
def send(self, data):
if self.uart_handle.isOpen():
# self.uart_handle.writelines(data)
# print 'send data<' + data + '>'
# print len(data)
# self.uart_handle.write(data)
try:
self.uart_handle.writelines(data)
except:
pass
else:
return False
def uart_check(self, x):
while True:
# if self.connected and (not self.uart_handle.isOpen()):
# self.status_chagne_hook(self.name, 'DISCONNECT')
if self.connected:
try:
self.uart_handle.writelines('AT+ALIVE\n')
except:
self.status_chagne_hook(self.name, 'DISCONNECT')
self.connected = False
try:
time.sleep(1)
except:
pass
if __name__ == '__main__':
pass