forked from AbateLab/Pump-Control-Program
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnew_era.py
140 lines (121 loc) · 3.88 KB
/
new_era.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import serial
def find_pumps(ser, tot_range=10):
    """Probe addresses 0 .. tot_range-1 and return the ones that answer.

    For every address an ``<address>ADR`` query, terminated by a carriage
    return, is written to ``ser`` and one reply is read with
    ``ser.readline()``.  Any non-empty reply counts as "pump present".

    ser       -- serial-port-like object providing write() and readline()
    tot_range -- how many consecutive addresses to probe, starting at 0

    Returns the list of answering addresses in ascending order.  Progress
    information is echoed to stdout.
    """
    found = []
    print(found)
    for address in range(tot_range):
        ser.write('%iADR\x0D' % address)
        reply = ser.readline()
        print("writing to address: " + str(address))
        if len(reply) == 0:
            # Nothing came back for this address: no pump here.
            continue
        found.append(address)
        print(found)
    return found
def run_all(ser):
    """Write the ``*RUN`` command, terminated by a carriage return, to ``ser``.

    The reply is not read back here, so a rejected command goes unreported.
    """
    ser.write('*RUN\x0D')
def stop_all(ser):
    """Write the ``*STP`` command, terminated by a carriage return, to ``ser``.

    The reply is not read back here, so a rejected command goes unreported.
    """
    ser.write('*STP\x0D')
#custom function
def run_pump(ser, pump):
    """Write ``<pump>RUN`` plus a carriage return to ``ser``.

    ser  -- serial-port-like object providing write()
    pump -- integer address of the pump to start

    The reply is not read back here, so a rejected command goes unreported.
    """
    ser.write('%iRUN\x0D' % pump)
def stop_pump(ser, pump):
    """Write ``<pump>STP`` plus a carriage return to ``ser``.

    ser  -- serial-port-like object providing write()
    pump -- integer address of the pump to stop

    Only the stop command is sent; the pump's programmed rate is left
    untouched and the reply is not read back.
    """
    ser.write('%iSTP\x0D' % pump)
def set_rates(ser, rate):
    """Program direction and flow rate for every pump listed in ``rate``.

    ser  -- serial-port-like object providing write()
    rate -- mapping of integer pump address -> signed flow rate (anything
            float() accepts).  A negative value selects direction ``WDR``,
            otherwise ``INF``.

    For each pump a ``<pump>DIR<direction>`` command is written immediately.
    The rate commands are collected into a single ``*``-separated string
    that is written once after all pumps have been processed.  Magnitudes
    below 5000 are sent with the ``UH`` suffix; larger ones are divided by
    1000 and sent with ``MH``.  The number is cut to its first five
    characters.  Replies are not read back.
    """
    rate_parts = []
    for pump in rate:
        signed = float(rate[pump])
        heading = 'WDR' if signed < 0 else 'INF'
        ser.write('%iDIR%s\x0D' % (pump, heading))

        magnitude = abs(signed)
        if magnitude >= 5000:
            # Too large for the UH form: rescale and switch to MH.
            magnitude = magnitude / 1000.0
            suffix = 'MH*'
        else:
            suffix = 'UH*'
        rate_parts.append(str(pump) + 'RAT' + str(magnitude)[:5] + suffix)

    ser.write(''.join(rate_parts) + '\x0D')
def get_rate(ser, pump):
    """Query one pump for its direction and rate; return the signed rate.

    ser  -- serial-port-like object providing write() and readline()
    pump -- integer address of the pump to query

    Two queries are sent, ``<pump>DIR`` and ``<pump>RAT``, each terminated
    by a carriage return, and one reply is read after each.  The payload is
    taken to start at index 4 of the reply (presumably after a start byte,
    a two-digit address and a status character -- confirm against the pump
    manual) and to be followed by one trailing byte.

    Returns the rate as a string in ``UH`` units, prefixed with ``-`` when
    the direction reply reads ``WDR``.  A rate reported in ``MH`` is
    multiplied by 1000 to match.

    Raises ValueError when the rate reply does not end in ``UH`` or ``MH``
    units (for example an empty or error reply).  Previously this case
    failed with an unbound-local error on ``rate``.
    """
    # Direction: only an explicit 'WDR' makes the result negative.
    cmd = '%iDIR\x0D' % pump
    ser.write(cmd)
    output = ser.readline()
    sign = '-' if output[4:7] == 'WDR' else ''

    # Rate value and units.
    cmd = '%iRAT\x0D' % pump
    ser.write(cmd)
    output = ser.readline()
    if '?' in output:
        print(cmd.strip() + ' from get_rate not understood')

    units = output[-3:-1]
    if units == 'MH':
        rate = str(float(output[4:-3]) * 1000)
    elif units == 'UH':
        rate = output[4:-3]
    else:
        raise ValueError('unexpected reply %r to %s in get_rate'
                         % (output, cmd.strip()))
    return sign + rate
def get_rates(ser, pumps):
    """Return ``{pump: rate}`` for every address in ``pumps``.

    Each rate comes from get_rate() and is cut at the first ``.`` so only
    the text before the decimal point (including any leading ``-``) is kept.

    ser   -- serial-port-like object, passed through to get_rate()
    pumps -- iterable of pump addresses
    """
    whole_rates = {}
    for address in pumps:
        reading = get_rate(ser, address)
        whole_rates[address] = reading.split('.')[0]
    return whole_rates
def set_diameter(ser, pump, dia):
    """Send ``<pump>DIA<dia>`` plus a carriage return and check the reply.

    ser  -- serial-port-like object providing write() and readline()
    pump -- integer address of the pump
    dia  -- diameter value, inserted into the command via ``%s``

    One reply is read; if it contains ``?`` a "not understood" message is
    printed.  Nothing is returned.
    """
    request = '%iDIA%s\x0D' % (pump, dia)
    ser.write(request)
    reply = ser.readline()
    if '?' in reply:
        print(request.strip() + ' from set_diameter not understood')
def get_diameter(ser, pump):
    """Query ``<pump>DIA`` and return the payload of the reply as a string.

    ser  -- serial-port-like object providing write() and readline()
    pump -- integer address of the pump

    The returned text is the reply with its first four characters and its
    final character removed.  If the reply contains ``?`` a "not understood"
    message is printed, and the sliced reply is still returned.
    """
    request = '%iDIA\x0D' % pump
    ser.write(request)
    reply = ser.readline()
    if '?' in reply:
        print(request.strip() + ' from get_diameter not understood')
    return reply[4:-1]
def prime(ser, pump):
    """Start one pump infusing at a fixed ``10.0MH`` rate.

    ser  -- serial-port-like object providing write() and readline()
    pump -- integer address of the pump

    Three commands are sent in order: set direction to ``INF``, set the
    rate to ``10.0MH``, then ``RUN``.  One reply is read after each; a reply
    containing ``?`` prints a "not understood" message, but the remaining
    commands are still sent.
    """
    sequence = (
        '%iDIRINF\x0D',     # infuse direction
        '%iRAT10.0MH\x0D',  # priming rate
        '%iRUN\x0D',        # start the pump
    )
    for template in sequence:
        request = template % pump
        ser.write(request)
        reply = ser.readline()
        if '?' in reply:
            print(request.strip() + ' from prime not understood')
if __name__ == '__main__':
    # Stand-alone check: scan for pumps and print their current rates.
    #
    # Other device paths that have been used for the adapter:
    #   /dev/serial/by-path/pci-0000:00:14.0-usb-0:3:1.0-port0
    #   /dev/ttyUSB0
    port = '/dev/serial/by-id/usb-Prolific_Technology_Inc._USB-Serial_Controller_D-if00-port0'

    # A read timeout is required: find_pumps() detects a missing pump by
    # readline() coming back empty, which only happens when the read times
    # out.  Without it the scan blocks forever on the first unused address.
    ser = serial.Serial(port, 19200, timeout=0.1)
    try:
        pumps = find_pumps(ser)
        print("Found pumps are: ")
        print(pumps)
        rates = get_rates(ser, pumps)
        print("Found rates are: ")
        print(rates)
    finally:
        # Release the port even if a query above fails.
        ser.close()