Skip to content

Commit b62006a

Browse files
authored
Change SimpleProducer to use async_send (async is reserved in py37) (#1454)
1 parent 204388b commit b62006a

File tree

7 files changed

+42
-34
lines changed

7 files changed

+42
-34
lines changed

docs/simple.rst

+4-4
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ Asynchronous Mode
4949
5050
# To send messages asynchronously
5151
client = SimpleClient('localhost:9092')
52-
producer = SimpleProducer(client, async=True)
52+
producer = SimpleProducer(client, async_send=True)
5353
producer.send_messages('my-topic', b'async message')
5454
5555
# To send messages in batch. You can use any of the available
@@ -60,7 +60,7 @@ Asynchronous Mode
6060
# * If the producer dies before the messages are sent, there will be losses
6161
# * Call producer.stop() to send the messages and cleanup
6262
producer = SimpleProducer(client,
63-
async=True,
63+
async_send=True,
6464
batch_send_every_n=20,
6565
batch_send_every_t=60)
6666
@@ -73,7 +73,7 @@ Synchronous Mode
7373
7474
# To send messages synchronously
7575
client = SimpleClient('localhost:9092')
76-
producer = SimpleProducer(client, async=False)
76+
producer = SimpleProducer(client, async_send=False)
7777
7878
# Note that the application is responsible for encoding messages to type bytes
7979
producer.send_messages('my-topic', b'some message')
@@ -88,7 +88,7 @@ Synchronous Mode
8888
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
8989
# by all in sync replicas before sending a response
9090
producer = SimpleProducer(client,
91-
async=False,
91+
async_send=False,
9292
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
9393
ack_timeout=2000,
9494
sync_fail_on_error=False)

kafka/producer/base.py

+23-15
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ class Producer(object):
226226
227227
Arguments:
228228
client (kafka.SimpleClient): instance to use for broker
229-
communications. If async=True, the background thread will use
229+
communications. If async_send=True, the background thread will use
230230
:meth:`client.copy`, which is expected to return a thread-safe
231231
object.
232232
codec (kafka.protocol.ALL_CODECS): compression codec to use.
@@ -238,11 +238,11 @@ class Producer(object):
238238
sync_fail_on_error (bool, optional): whether sync producer should
239239
raise exceptions (True), or just return errors (False),
240240
defaults to True.
241-
async (bool, optional): send message using a background thread,
241+
async_send (bool, optional): send message using a background thread,
242242
defaults to False.
243-
batch_send_every_n (int, optional): If async is True, messages are
243+
batch_send_every_n (int, optional): If async_send is True, messages are
244244
sent in batches of this size, defaults to 20.
245-
batch_send_every_t (int or float, optional): If async is True,
245+
batch_send_every_t (int or float, optional): If async_send is True,
246246
messages are sent immediately after this timeout in seconds, even
247247
if there are fewer than batch_send_every_n, defaults to 20.
248248
async_retry_limit (int, optional): number of retries for failed messages
@@ -268,8 +268,10 @@ class Producer(object):
268268
defaults to 30.
269269
270270
Deprecated Arguments:
271+
async (bool, optional): send message using a background thread,
272+
defaults to False. Deprecated, use 'async_send'
271273
batch_send (bool, optional): If True, messages are sent by a background
272-
thread in batches, defaults to False. Deprecated, use 'async'
274+
thread in batches, defaults to False. Deprecated, use 'async_send'
273275
"""
274276
ACK_NOT_REQUIRED = 0 # No ack is required
275277
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
@@ -282,8 +284,8 @@ def __init__(self, client,
282284
codec=None,
283285
codec_compresslevel=None,
284286
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
285-
async=False,
286-
batch_send=False, # deprecated, use async
287+
async_send=False,
288+
batch_send=False, # deprecated, use async_send
287289
batch_send_every_n=BATCH_SEND_MSG_COUNT,
288290
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
289291
async_retry_limit=ASYNC_RETRY_LIMIT,
@@ -292,15 +294,21 @@ def __init__(self, client,
292294
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
293295
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
294296
async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
295-
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
297+
async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS,
298+
**kwargs):
299+
300+
# async renamed async_send for python3.7 support
301+
if 'async' in kwargs:
302+
log.warning('Deprecated async option found -- use async_send')
303+
async_send = kwargs['async']
296304

297-
if async:
305+
if async_send:
298306
assert batch_send_every_n > 0
299307
assert batch_send_every_t > 0
300308
assert async_queue_maxsize >= 0
301309

302310
self.client = client
303-
self.async = async
311+
self.async_send = async_send
304312
self.req_acks = req_acks
305313
self.ack_timeout = ack_timeout
306314
self.stopped = False
@@ -313,7 +321,7 @@ def __init__(self, client,
313321
self.codec = codec
314322
self.codec_compresslevel = codec_compresslevel
315323

316-
if self.async:
324+
if self.async_send:
317325
# Messages are sent through this queue
318326
self.queue = Queue(async_queue_maxsize)
319327
self.async_queue_put_timeout = async_queue_put_timeout
@@ -400,7 +408,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
400408
if key is not None and not isinstance(key, six.binary_type):
401409
raise TypeError("the key must be type bytes")
402410

403-
if self.async:
411+
if self.async_send:
404412
for idx, m in enumerate(msg):
405413
try:
406414
item = (TopicPartition(topic, partition), m, key)
@@ -435,15 +443,15 @@ def stop(self, timeout=None):
435443
log.warning('timeout argument to stop() is deprecated - '
436444
'it will be removed in future release')
437445

438-
if not self.async:
446+
if not self.async_send:
439447
log.warning('producer.stop() called, but producer is not async')
440448
return
441449

442450
if self.stopped:
443451
log.warning('producer.stop() called, but producer is already stopped')
444452
return
445453

446-
if self.async:
454+
if self.async_send:
447455
self.queue.put((STOP_ASYNC_PRODUCER, None, None))
448456
self.thread_stop_event.set()
449457
self.thread.join()
@@ -471,5 +479,5 @@ def stop(self, timeout=None):
471479
self.stopped = True
472480

473481
def __del__(self):
474-
if self.async and not self.stopped:
482+
if self.async_send and not self.stopped:
475483
self.stop()

kafka/producer/keyed.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,4 @@ def send(self, topic, key, msg):
4646
return self.send_messages(topic, key, msg)
4747

4848
def __repr__(self):
49-
return '<KeyedProducer batch=%s>' % self.async
49+
return '<KeyedProducer batch=%s>' % self.async_send

kafka/producer/simple.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,4 @@ def send_messages(self, topic, *msg):
5151
)
5252

5353
def __repr__(self):
54-
return '<SimpleProducer batch=%s>' % self.async
54+
return '<SimpleProducer batch=%s>' % self.async_send

test/test_failover_integration.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def test_switch_leader(self):
6060
# require that the server commit messages to all in-sync replicas
6161
# so that failover doesn't lose any messages on server-side
6262
# and we can assert that server-side message count equals client-side
63-
producer = Producer(self.client, async=False,
63+
producer = Producer(self.client, async_send=False,
6464
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
6565

6666
# Send 100 random messages to a specific partition
@@ -101,7 +101,7 @@ def test_switch_leader_async(self):
101101
partition = 0
102102

103103
# Test the base class Producer -- send_messages to a specific partition
104-
producer = Producer(self.client, async=True,
104+
producer = Producer(self.client, async_send=True,
105105
batch_send_every_n=15,
106106
batch_send_every_t=3,
107107
req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
@@ -146,7 +146,7 @@ def test_switch_leader_async(self):
146146
def test_switch_leader_keyed_producer(self):
147147
topic = self.topic
148148

149-
producer = KeyedProducer(self.client, async=False)
149+
producer = KeyedProducer(self.client, async_send=False)
150150

151151
# Send 10 random messages
152152
for _ in range(10):
@@ -182,7 +182,7 @@ def test_switch_leader_keyed_producer(self):
182182
producer.send_messages(topic, key, msg)
183183

184184
def test_switch_leader_simple_consumer(self):
185-
producer = Producer(self.client, async=False)
185+
producer = Producer(self.client, async_send=False)
186186
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
187187
self._send_random_messages(producer, self.topic, 0, 2)
188188
consumer.get_messages()

test/test_producer_integration.py

+4-4
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ def test_async_simple_producer(self):
216216
partition = self.client.get_partition_ids_for_topic(self.topic)[0]
217217
start_offset = self.current_offset(self.topic, partition)
218218

219-
producer = SimpleProducer(self.client, async=True, random_start=False)
219+
producer = SimpleProducer(self.client, async_send=True, random_start=False)
220220
resp = producer.send_messages(self.topic, self.msg("one"))
221221
self.assertEqual(len(resp), 0)
222222

@@ -235,7 +235,7 @@ def test_batched_simple_producer__triggers_by_message(self):
235235
batch_interval = 5
236236
producer = SimpleProducer(
237237
self.client,
238-
async=True,
238+
async_send=True,
239239
batch_send_every_n=batch_messages,
240240
batch_send_every_t=batch_interval,
241241
random_start=False)
@@ -300,7 +300,7 @@ def test_batched_simple_producer__triggers_by_time(self):
300300
batch_interval = 5
301301
producer = SimpleProducer(
302302
self.client,
303-
async=True,
303+
async_send=True,
304304
batch_send_every_n=100,
305305
batch_send_every_t=batch_interval,
306306
random_start=False)
@@ -432,7 +432,7 @@ def test_async_keyed_producer(self):
432432

433433
producer = KeyedProducer(self.client,
434434
partitioner=RoundRobinPartitioner,
435-
async=True,
435+
async_send=True,
436436
batch_send_every_t=1)
437437

438438
resp = producer.send_messages(self.topic, self.key("key1"), self.msg("one"))

test/test_producer_legacy.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def partitions(topic):
7373
@patch('kafka.producer.base._send_upstream')
7474
def test_producer_async_queue_overfilled(self, mock):
7575
queue_size = 2
76-
producer = Producer(MagicMock(), async=True,
76+
producer = Producer(MagicMock(), async_send=True,
7777
async_queue_maxsize=queue_size)
7878

7979
topic = b'test-topic'
@@ -95,25 +95,25 @@ def test_producer_sync_fail_on_error(self):
9595
with patch.object(SimpleClient, '_send_broker_aware_request', return_value = [error]):
9696

9797
client = SimpleClient(MagicMock())
98-
producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
98+
producer = SimpleProducer(client, async_send=False, sync_fail_on_error=False)
9999

100100
# This should not raise
101101
(response,) = producer.send_messages('foobar', b'test message')
102102
self.assertEqual(response, error)
103103

104-
producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
104+
producer = SimpleProducer(client, async_send=False, sync_fail_on_error=True)
105105
with self.assertRaises(FailedPayloadsError):
106106
producer.send_messages('foobar', b'test message')
107107

108108
def test_cleanup_is_not_called_on_stopped_producer(self):
109-
producer = Producer(MagicMock(), async=True)
109+
producer = Producer(MagicMock(), async_send=True)
110110
producer.stopped = True
111111
with patch.object(producer, 'stop') as mocked_stop:
112112
producer._cleanup_func(producer)
113113
self.assertEqual(mocked_stop.call_count, 0)
114114

115115
def test_cleanup_is_called_on_running_producer(self):
116-
producer = Producer(MagicMock(), async=True)
116+
producer = Producer(MagicMock(), async_send=True)
117117
producer.stopped = False
118118
with patch.object(producer, 'stop') as mocked_stop:
119119
producer._cleanup_func(producer)

0 commit comments

Comments
 (0)