forked from dabeaz/generators
-
Notifications
You must be signed in to change notification settings - Fork 0
/
thrsend.py
51 lines (39 loc) · 1.16 KB
/
thrsend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# thrsend.py
#
# Send items to consumer threads
import queue, threading
class ConsumerThread(threading.Thread):
def __init__(self, target):
threading.Thread.__init__(self)
self.setDaemon(True)
self.in_queue = queue.Queue()
self.target = target
def send(self,item):
self.in_queue.put(item)
def generate(self):
while True:
item = self.in_queue.get()
yield item
def run(self):
self.target(self.generate())
# Example Use
if __name__ == '__main__':
from follow import follow
from apachelog import apache_log
from broadcast import broadcast
def find_404(log):
r404 = (r for r in log if r['status'] == 404)
for r in r404:
print(r['status'],r['datetime'],r['request'])
def bytes_transferred(log):
total = 0
for r in log:
total += r['bytes']
print("Total bytes", total)
c1 = ConsumerThread(find_404)
c1.start()
c2 = ConsumerThread(bytes_transferred)
c2.start()
lines = follow(open("run/foo/access-log"))
log = apache_log(lines)
broadcast(log,[c1,c2])