Skip to content

Commit fb118fb

Browse files
committed
Manageable queue.put() operation for MPConsumer processes
1 parent 9d5c93e commit fb118fb

File tree

2 files changed

+12
-4
lines changed

2 files changed

+12
-4
lines changed

Diff for: kafka/consumer/base.py

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
ITER_TIMEOUT_SECONDS = 60
2727
NO_MESSAGES_WAIT_TIME_SECONDS = 0.1
28+
FULL_QUEUE_WAIT_TIME_SECONDS = 0.1
2829

2930

3031
class Consumer(object):

Diff for: kafka/consumer/multiprocess.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
from multiprocessing import Process, Manager as MPManager
88

99
try:
10-
from Queue import Empty
10+
from Queue import Empty, Full
1111
except ImportError: # python 2
12-
from queue import Empty
12+
from queue import Empty, Full
1313

1414
from .base import (
1515
AUTO_COMMIT_MSG_COUNT, AUTO_COMMIT_INTERVAL,
16-
NO_MESSAGES_WAIT_TIME_SECONDS
16+
NO_MESSAGES_WAIT_TIME_SECONDS,
17+
FULL_QUEUE_WAIT_TIME_SECONDS
1718
)
1819
from .simple import Consumer, SimpleConsumer
1920

@@ -59,7 +60,13 @@ def _mp_consume(client, group, topic, queue, size, events, **consumer_options):
5960

6061
message = consumer.get_message()
6162
if message:
62-
queue.put(message)
63+
while True:
64+
try:
65+
queue.put(message, timeout=FULL_QUEUE_WAIT_TIME_SECONDS)
66+
break
67+
except Full:
68+
if events.exit.is_set(): break
69+
6370
count += 1
6471

6572
# We have reached the required size. The controller might have

0 commit comments

Comments
 (0)