forked from javgh/greenaddress-pos-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnfcbroadcast.py
120 lines (97 loc) · 3.32 KB
/
nfcbroadcast.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
# NFC communication is implemented in its own process, as apparently
# nfcpy and the Qt main event loop do not play well together.
# A queue is used to communicate across process boundaries and to submit
# the current Bitcoin URI that should be broadcast via NFC.
import threading
import time
from multiprocessing import Process, Queue
import nfc
import nfc.snep
class NFCBroadcast:
    """Front end used by the caller to drive the NFC worker process.

    All interaction with the worker goes through ``self.queue`` as
    ``(command, parameter)`` tuples. ``start()`` must be called before
    ``set_btc_uri()`` or ``shutdown()``, since it is what creates the queue.
    """

    def start(self):
        """Create the command queue and launch the NFC worker process."""
        self.queue = channel = Queue()
        worker = NFCProcess(channel)
        self.nfc_process = worker
        worker.start()

    def set_btc_uri(self, btc_uri):
        """Queue ``btc_uri`` as the URI the worker should broadcast."""
        message = ('uri', btc_uri)
        self.queue.put(message)

    def shutdown(self):
        """Queue a request asking the worker to leave its command loop."""
        message = ('shutdown', None)
        self.queue.put(message)
class NFCProcess(Process):
    """Child process that owns the NFC connection thread.

    Commands arrive on ``queue`` as ``(command, parameter)`` tuples:
    ``('uri', <uri>)`` forwards a new URI to the connection thread and
    ``('shutdown', None)`` ends the command loop. Any other command is
    ignored.
    """

    def __init__(self, queue):
        super(NFCProcess, self).__init__()
        self.queue = queue

    def run(self):
        """Start the NFC connection thread and dispatch queued commands."""
        connection = NFCConnection()
        connection.start()
        while True:
            command, argument = self.queue.get()
            if command == 'shutdown':
                break
            if command == 'uri':
                connection.set_btc_uri(argument)
class NFCConnection(threading.Thread):
    """Thread that offers the current Bitcoin URI to NFC peers.

    The thread waits until the first URI has been supplied through
    ``set_btc_uri()``. It then either serves LLCP connections on the
    contactless reader or, when no reader was found, just idles.
    """

    def __init__(self):
        # Initialise the Thread machinery first so that thread attributes
        # such as ``daemon`` can be assigned safely.
        super(NFCConnection, self).__init__()
        # serve_nfc() and idle_forever() both loop without an exit
        # condition. As a daemon, this thread cannot keep the hosting
        # process alive once that process has been told to shut down.
        self.daemon = True
        self.btc_uri = None
        self.restart_required = False
        self.start_signal = Queue()
        try:
            self.clf = nfc.ContactlessFrontend('usb')
        except IOError:
            self.clf = None
            print("No contactless reader found")
        else:
            print("Contactless reader ready")

    def run(self):
        """Wait for the first URI, then serve NFC or idle."""
        # Blocks until set_btc_uri() has been called for the first time.
        self.start_signal.get()
        if self.clf:
            self.serve_nfc()
        else:
            self.idle_forever()

    def serve_nfc(self):
        """Repeatedly run ``clf.connect`` so every peer gets the URI."""
        print("Contactless reader active")
        while True:
            # check_restart() is polled by connect(); when it returns True
            # the call ends and the loop starts a fresh connect().
            self.clf.connect(
                llcp={'on-connect': self.connected},
                terminate=self.check_restart
            )

    def idle_forever(self):
        """Keep the thread alive without doing anything (no reader)."""
        while True:
            time.sleep(60)

    def set_btc_uri(self, btc_uri):
        """Store ``btc_uri`` as the URI to hand out.

        The first call releases ``run()``; every later call requests a
        restart of the current ``connect()`` instead.
        """
        is_first_uri = self.btc_uri is None
        self.btc_uri = btc_uri
        if is_first_uri:
            self.start_signal.put('start')
        else:
            self.restart_required = True

    def connected(self, llc):
        """``on-connect`` callback: push the URI from a helper thread.

        Returns True.
        NOTE(review): per nfcpy's on-connect contract this should make
        connect() keep running the link itself - confirm against the
        installed nfcpy version.
        """
        # Store a reference to the thread beyond the local function
        # to prevent it from being garbage collected.
        self.helper = NFCConnectionHelper(llc, self.btc_uri)
        self.helper.start()
        return True

    def check_restart(self):
        """Return True once after a URI change, and clear the flag."""
        if self.restart_required:
            self.restart_required = False
            return True
        return False
class NFCConnectionHelper(threading.Thread):
    """Thread that sends one Bitcoin URI over an established LLCP link.

    ``llc`` is the link object passed to the ``on-connect`` callback and
    ``btc_uri`` is the URI to wrap in a smart poster record.
    """

    def __init__(self, llc, btc_uri):
        super(NFCConnectionHelper, self).__init__()
        self.llc = llc
        self.btc_uri = btc_uri

    def run(self):
        """Build the smart poster message and push it with a SNEP put."""
        poster = nfc.ndef.SmartPosterRecord(self.btc_uri)
        client = nfc.snep.SnepClient(self.llc)
        message = nfc.ndef.Message(poster)
        client.put(message)
if __name__ == '__main__':
    # Manual smoke test: broadcast one URI, replace it after 20 seconds,
    # then keep the process alive until it is interrupted.
    nfc_broadcast = NFCBroadcast()
    nfc_broadcast.start()
    try:
        nfc_broadcast.set_btc_uri('bitcoin:16mj2Veiw6rWHBDn4dN8rAAsRJog7EooYF?amount=0.0001')
        time.sleep(20)
        nfc_broadcast.set_btc_uri('bitcoin:16mj2Veiw6rWHBDn4dN8rAAsRJog7EooYF?amount=0.0002')
        while True:
            time.sleep(60)
    finally:
        # Runs on Ctrl-C or any other exit from the demo. Asking the
        # worker to stop keeps it from staying blocked on its command
        # queue after the parent is done.
        nfc_broadcast.shutdown()