Skip to content

Commit c9621dd

Browse files
committed
add queue demo
1 parent 8f3a9c4 commit c9621dd

File tree

1 file changed

+96
-0
lines changed

1 file changed

+96
-0
lines changed

Diff for: futures/queueDemo.py

+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
#
2+
# 在執行緒之間傳送及接收一百萬個物件
3+
#
4+
from concurrent import futures
5+
from concurrent.futures._base import (PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, wait)
6+
#from Queue import Queue
7+
import timeit
8+
import sys
9+
10+
if sys.version_info > (3, 0):
11+
# Python 3 code in this block
12+
from queue import (Queue, Empty)
13+
else:
14+
# Python 2 code in this block
15+
from Queue import (Queue, Empty)
16+
17+
18+
def create_future(state=PENDING, exception=None, result=None):
19+
future = Future()
20+
future._state = state
21+
future._exception = exception
22+
future._result = result
23+
return future
24+
25+
26+
PENDING_FUTURE = create_future(state=PENDING)
27+
RUNNING_FUTURE = create_future(state=RUNNING)
28+
CANCELLED_FUTURE = create_future(state=CANCELLED)
29+
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
30+
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
31+
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
32+
33+
q = Queue()
34+
35+
QUEUE_BREAKER = {'IsStop': False}
36+
37+
38+
def producer():
39+
print('producer enter')
40+
data = [x for x in range(1024)]
41+
index = 0
42+
timer_start = timeit.default_timer()
43+
for index in range(1000000):
44+
q.put(data)
45+
timer_end = timeit.default_timer()
46+
print('producer exit {} {} {}'.format(index + 1, len(data), (timer_end - timer_start)))
47+
return True
48+
49+
50+
def consumer(num):
51+
count = 0
52+
total = 0
53+
print('consumer {} enter'.format(num))
54+
timer_start = timeit.default_timer()
55+
while True:
56+
if QUEUE_BREAKER['IsStop'] is False:
57+
try:
58+
res = q.get(timeout=1)
59+
count += 1
60+
total += len(res)
61+
if count >= 1000000:
62+
print('break')
63+
q.task_done()
64+
break
65+
except Empty as e:
66+
print('{}'.format(repr(e)))
67+
break
68+
else:
69+
q.task_done()
70+
timer_end = timeit.default_timer()
71+
print('consumer {} exit {} {}'.format(num, count, (timer_end - timer_start)))
72+
return True
73+
74+
75+
def main():
76+
worker = 1
77+
print('qsize {}'.format(q.qsize()))
78+
with futures.ThreadPoolExecutor(max_workers=worker + 1) as executor:
79+
th_producer = executor.submit(producer)
80+
wait(fs=[SUCCESSFUL_FUTURE, th_producer])
81+
consumers = []
82+
for index in range(worker):
83+
consumers.append(executor.submit(consumer, index))
84+
wait(fs=[SUCCESSFUL_FUTURE, consumers[0]])
85+
q.join()
86+
print('Queue is Empty')
87+
QUEUE_BREAKER['IsStop'] = True
88+
executor.shutdown(True)
89+
print('shutdown')
90+
91+
print('main exit')
92+
return 0
93+
94+
95+
if __name__ == '__main__':
96+
sys.exit(main())

0 commit comments

Comments
 (0)