From b82f94f5a409a237222a6486a870751fa17da254 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 25 Feb 2015 15:04:55 +0300 Subject: [PATCH 01/22] Retries for async batching --- kafka/common.py | 12 ++++++++++-- kafka/producer/base.py | 31 ++++++++++++++++++++++++------- kafka/producer/keyed.py | 11 ++++++++--- kafka/producer/simple.py | 11 ++++++++--- 4 files changed, 50 insertions(+), 15 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index 8207bece5..b3380d709 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -6,6 +6,7 @@ # Structs # ############### +<<<<<<< HEAD # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI MetadataRequest = namedtuple("MetadataRequest", ["topics"]) @@ -14,8 +15,15 @@ ["brokers", "topics"]) # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI -ProduceRequest = namedtuple("ProduceRequest", - ["topic", "partition", "messages"]) +_ProduceRequest = namedtuple("ProduceRequest", + ["topic", "partition", "messages", "retries"]) + + +class ProduceRequest(_ProduceRequest): + def __new__(cls, topic, partition, messages, retries=0): + return super(ProduceRequest, cls).__new__( + cls, topic, partition, messages, retries) + ProduceResponse = namedtuple("ProduceResponse", ["topic", "partition", "error", "offset"]) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de49a..a5af3d69f 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -24,22 +24,26 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 +BATCH_RETRY_BACKOFF_MS = 300 +BATCH_RETRIES_LIMIT = 5 STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, stop_event): + req_acks, ack_timeout, retry_backoff, retries_limit, stop_event): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one request """ stop = False + reqs = [] + client.reinit() while not stop_event.is_set(): timeout = batch_time - count = batch_size + count = batch_size - len(reqs) send_at = time.time() + timeout msgset = defaultdict(list) @@ -48,7 +52,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while count > 0 and timeout >= 0: try: topic_partition, msg, key = queue.get(timeout=timeout) - except Empty: break @@ -63,7 +66,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, msgset[topic_partition].append((msg, key)) # Send collected requests upstream - reqs = [] for topic_partition, msg in msgset.items(): messages = create_message_set(msg, codec, key) req = ProduceRequest(topic_partition.topic, @@ -75,8 +77,19 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + except FailedPayloadsError as ex: + log.exception("Failed payloads count %s" % len(ex.message)) + if retries_limit is None: + reqs = ex.message + continue + for req in ex.message: + if retries_limit and req.retries < retries_limit: + reqs.append(req._replace(retries=req.retries+1)) + except Exception as ex: + log.exception("Unable to send message: %s" % type(ex)) + + if reqs and retry_backoff: + time.sleep(float(retry_backoff) / 1000) class Producer(object): @@ -111,7 +124,9 @@ def __init__(self, client, async=False, codec=None, batch_send=False, 
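Note on the ProduceRequest change in kafka/common.py above: subclassing the generated namedtuple and overriding __new__ is the usual pre-Python-3.7 way to give one field a default while keeping old three-argument call sites working. A minimal standalone sketch of the same pattern, with hypothetical names:

    from collections import namedtuple

    _Request = namedtuple("Request", ["topic", "partition", "messages", "retries"])

    class Request(_Request):
        def __new__(cls, topic, partition, messages, retries=0):
            # existing positional construction keeps working; retries defaults to 0
            return super(Request, cls).__new__(
                cls, topic, partition, messages, retries)

    req = Request("my-topic", 0, ["m1", "m2"])
    assert req.retries == 0
    # _replace returns a new tuple with the bumped counter, which is how
    # the retry loop above increments it on each failed attempt
    assert req._replace(retries=req.retries + 1).retries == 1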
batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, + batch_retries_limit=BATCH_RETRIES_LIMIT): if batch_send: async = True @@ -148,6 +163,8 @@ def __init__(self, client, async=False, batch_send_every_n, self.req_acks, self.ack_timeout, + batch_retry_backoff_ms, + batch_retries_limit, self.thread_stop_event)) # Thread will die if main thread exits diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 333b6c0cf..aa569b3a7 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,7 +7,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, + BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT ) log = logging.getLogger("kafka") @@ -37,7 +38,9 @@ def __init__(self, client, partitioner=None, async=False, codec=None, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, - batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): + batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, + batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, + batch_retries_limit=BATCH_RETRIES_LIMIT): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner @@ -46,7 +49,9 @@ def __init__(self, client, partitioner=None, async=False, super(KeyedProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_retry_backoff_ms, + batch_retries_limit) def _next_partition(self, topic, key): if topic not in self.partitioners: diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 2699cf2b6..7391be071 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -10,7 +10,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT + BATCH_SEND_MSG_COUNT, + BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT ) log = logging.getLogger("kafka") @@ -45,13 +46,17 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - random_start=True): + random_start=True, + batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, + batch_retries_limit=BATCH_RETRIES_LIMIT): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, - batch_send_every_t) + batch_send_every_t, + batch_retry_backoff_ms, + batch_retries_limit) def _next_partition(self, topic): if topic not in self.partition_cycles: From 81d868869fa2f7ab980df5477d82654dc2598356 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 25 Feb 2015 15:18:16 +0300 Subject: [PATCH 02/22] Fixed base producer imports --- kafka/producer/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a5af3d69f..34b1d045a 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -15,7 +15,8 @@ import six from kafka.common import ( - ProduceRequest, TopicAndPartition, UnsupportedCodecError + ProduceRequest, TopicAndPartition, + UnsupportedCodecError, FailedPayloadsError ) from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring From 4b8288a578c0cee696ef9d0523f9cec32e8b1f05 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 12:40:44 +0300 Subject: [PATCH 03/22] Producer _send_upstream 
fixes, added tests for retries --- kafka/producer/base.py | 26 ++++++-- test/test_producer.py | 137 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 157 insertions(+), 6 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 34b1d045a..505e31b8d 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -44,6 +44,9 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, while not stop_event.is_set(): timeout = batch_time + + # it's a simplification: we're comparing message sets and + # messages: each set can contain [1..batch_size] messages count = batch_size - len(reqs) send_at = time.time() + timeout msgset = defaultdict(list) @@ -74,6 +77,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, messages) reqs.append(req) + if not reqs: + continue + + reqs_to_retry = [] try: client.send_produce_request(reqs, acks=req_acks, @@ -81,15 +88,22 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, except FailedPayloadsError as ex: log.exception("Failed payloads count %s" % len(ex.message)) if retries_limit is None: - reqs = ex.message - continue - for req in ex.message: - if retries_limit and req.retries < retries_limit: - reqs.append(req._replace(retries=req.retries+1)) + # retry all failed messages until success + reqs_to_retry = ex.message + elif not retries_limit < 0: + # + for req in ex.message: + if retries_limit and req.retries < retries_limit: + updated_req = req._replace(retries=req.retries+1) + reqs_to_retry.append(updated_req) except Exception as ex: log.exception("Unable to send message: %s" % type(ex)) + finally: + reqs = [] - if reqs and retry_backoff: + if reqs_to_retry and retry_backoff: + reqs = reqs_to_retry + log.warning("%s requests will be retried next call." % len(reqs)) time.sleep(float(retry_backoff) / 1000) diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a1b..eecc7a7a9 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -1,11 +1,22 @@ # -*- coding: utf-8 -*- +import time import logging from mock import MagicMock from . 
import unittest +from kafka.common import TopicAndPartition, FailedPayloadsError from kafka.producer.base import Producer +from kafka.producer.base import _send_upstream +from kafka.protocol import CODEC_NONE + +import threading +import multiprocessing as mp +try: + from queue import Empty +except ImportError: + from Queue import Empty class TestKafkaProducer(unittest.TestCase): @@ -40,3 +51,129 @@ def partitions(topic): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + + +class TestKafkaProducerSendUpstream(unittest.TestCase): + + def setUp(self): + + # create a multiprocessing Value to store call counter + # (magicmock counters don't work with other processes) + self.send_calls_count = mp.Value('i', 0) + + def send_side_effect(*args, **kwargs): + self.send_calls_count.value += 1 + + self.client = MagicMock() + self.client.send_produce_request.side_effect = send_side_effect + self.queue = mp.Queue() + + def _run_process(self, retries_limit=3, sleep_timeout=1): + # run _send_upstream process with the queue + self.process = mp.Process( + target=_send_upstream, + args=(self.queue, self.client, CODEC_NONE, + 0.3, # batch time (seconds) + 3, # batch length + Producer.ACK_AFTER_LOCAL_WRITE, + Producer.DEFAULT_ACK_TIMEOUT, + 50, # retry backoff (ms) + retries_limit)) + self.process.daemon = True + self.process.start() + time.sleep(sleep_timeout) + self.process.terminate() + + def test_wo_retries(self): + + # lets create a queue and add 10 messages for 1 partition + for i in range(10): + self.queue.put((TopicAndPartition("test", 0), "msg %i", "key %i")) + + self._run_process() + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 4 non-void calls: + # 3 batches of 3 msgs each + 1 batch of 1 message + self.assertEqual(self.send_calls_count.value, 4) + + def test_first_send_failed(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + flag = mp.Value('c', 'f') + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + if flag.value == 'f': + flag.value = 't' + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 5 non-void calls: 1st failed batch of 3 msgs + # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 + self.assertEqual(self.send_calls_count.value, 5) + + def test_with_limited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(3, 2) + + # the queue should be void at the end of the test + self.assertEqual(self.queue.empty(), True) + + # there should be 16 non-void calls: + # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + + # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed + self.assertEqual(self.send_calls_count.value, 16) + +
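Note on the harness above: the counter has to live in a multiprocessing.Value only because _send_upstream runs in a separate process here, where MagicMock's built-in call_count is not shared. Once the sender runs in a thread instead (as patch 07 in this series later switches to), a plain closure suffices. A condensed sketch of the same "fail the first call, then succeed" side effect without shared memory (hypothetical, thread-based setup assumed):

    from mock import MagicMock
    from kafka.common import FailedPayloadsError

    client = MagicMock()
    state = {'failed_once': False}

    def send_side_effect(reqs, *args, **kwargs):
        # raise on the first call only; all retried sends then succeed
        if not state['failed_once']:
            state['failed_once'] = True
            raise FailedPayloadsError(reqs)

    client.send_produce_request.side_effect = send_side_effect
    # client.send_produce_request.call_count is then usable directly in asserts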
+ def test_with_unlimited_retries(self): + + # lets create a queue and add 10 messages for 10 different partitions + # to show how retries should work ideally + for i in range(10): + self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + + def send_side_effect(reqs, *args, **kwargs): + self.send_calls_count.value += 1 + raise FailedPayloadsError(reqs) + + self.client.send_produce_request.side_effect = send_side_effect + + self._run_process(None) + + # the queue should have 7 elements + # 3 batches of 1 msg each were retried all this time + self.assertEqual(self.queue.empty(), False) + left = 0 + for i in range(10): + try: + self.queue.get(timeout=0.01) + left += 1 + except Empty: + break + self.assertEqual(left, 7) + + # 1s / 50ms of backoff = 20 times + self.assertEqual(self.send_calls_count.value, 20) From a9324f343e97ae3ceaa2acd480764818bb2b171e Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 15:10:55 +0300 Subject: [PATCH 04/22] Fixed compatibility issues with tests --- kafka/producer/base.py | 8 +++++--- test/test_producer.py | 23 +++++++++++------------ 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 505e31b8d..87d923aae 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -86,13 +86,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) except FailedPayloadsError as ex: - log.exception("Failed payloads count %s" % len(ex.message)) + failed_reqs = ex.args[0] + log.exception("Failed payloads count %s" % len(failed_reqs)) + if retries_limit is None: # retry all failed messages until success - reqs_to_retry = ex.message + reqs_to_retry = failed_reqs elif not retries_limit < 0: # - for req in ex.message: + for req in failed_reqs: if retries_limit and req.retries < retries_limit: updated_req = req._replace(retries=req.retries+1) reqs_to_retry.append(updated_req) diff --git a/test/test_producer.py b/test/test_producer.py index eecc7a7a9..51a74b582 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -106,11 +106,11 @@ def test_first_send_failed(self): for i in range(10): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) - flag = mp.Value('c', 'f') + is_first_time = mp.Value('b', True) def send_side_effect(reqs, *args, **kwargs): self.send_calls_count.value += 1 - if flag.value == 'f': - flag.value = 't' + if is_first_time.value: + is_first_time.value = False raise FailedPayloadsError(reqs) self.client.send_produce_request.side_effect = send_side_effect @@ -166,14 +166,13 @@ def send_side_effect(reqs, *args, **kwargs): # the queue should have 7 elements # 3 batches of 1 msg each were retried all this time self.assertEqual(self.queue.empty(), False) - left = 0 - for i in range(10): - try: + try: + for i in range(7): self.queue.get(timeout=0.01) - left += 1 - except Empty: - break - self.assertEqual(left, 7) + except Empty: + self.fail("Should be 7 elems in the queue") + self.assertEqual(self.queue.empty(), True) - # 1s / 50ms of backoff = 20 times - self.assertEqual(self.send_calls_count.value, 20) + # 1s / 50ms of backoff = 20 times max + self.assertTrue(self.send_calls_count.value > 10) + self.assertTrue(self.send_calls_count.value <= 20) From 566e408da0aa7fb1bb74fae96231c94cc4a12e37 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 17:24:52 +0300 Subject: [PATCH 05/22] Fix: check failed reqs to retry only for positive limit --- kafka/producer/base.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git
a/kafka/producer/base.py b/kafka/producer/base.py index 87d923aae..8544d7363 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -89,11 +89,11 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, failed_reqs = ex.args[0] log.exception("Failed payloads count %s" % len(failed_reqs)) + # if no limit, retry all failed messages until success if retries_limit is None: - # retry all failed messages until success reqs_to_retry = failed_reqs - elif not retries_limit < 0: - # + # makes sense to check failed reqs only if we have a limit > 0 + elif retries_limit > 0: for req in failed_reqs: if retries_limit and req.retries < retries_limit: updated_req = req._replace(retries=req.retries+1) From b0a04595c6aee7f6fcaa8927fcdfcd9a04a9b7d3 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Feb 2015 17:26:56 +0300 Subject: [PATCH 06/22] Returned default behaviour with no retries --- kafka/producer/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 8544d7363..44ffdf417 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -26,7 +26,7 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 BATCH_RETRY_BACKOFF_MS = 300 -BATCH_RETRIES_LIMIT = 5 +BATCH_RETRIES_LIMIT = 0 STOP_ASYNC_PRODUCER = -1 From 5e8dc6dcf55890a4e3a214a943ecc655faed3ecc Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 24 Mar 2015 22:21:13 +0300 Subject: [PATCH 07/22] Fixed tests and other issues after rebase --- kafka/common.py | 1 - kafka/producer/base.py | 2 +- test/test_producer.py | 49 ++++++++++++++++-------------------------- 3 files changed, 20 insertions(+), 32 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index b3380d709..5c2b78857 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -6,7 +6,6 @@ # Structs # ############### -<<<<<<< HEAD # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI MetadataRequest = namedtuple("MetadataRequest", ["topics"]) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 44ffdf417..9bfe98b55 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -86,7 +86,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) except FailedPayloadsError as ex: - failed_reqs = ex.args[0] + failed_reqs = ex.failed_payloads log.exception("Failed payloads count %s" % len(failed_reqs)) # if no limit, retry all failed messages until success diff --git a/test/test_producer.py b/test/test_producer.py index 51a74b582..cc58fe459 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -12,11 +12,10 @@ from kafka.protocol import CODEC_NONE import threading -import multiprocessing as mp try: - from queue import Empty + from queue import Empty, Queue except ImportError: - from Queue import Empty + from Queue import Empty, Queue class TestKafkaProducer(unittest.TestCase): @@ -56,21 +55,13 @@ def partitions(topic): class TestKafkaProducerSendUpstream(unittest.TestCase): def setUp(self): - - # create a multiprocessing Value to store call counter - # (magicmock counters don't work with other processes) - self.send_calls_count = mp.Value('i', 0) - - def send_side_effect(*args, **kwargs): - self.send_calls_count.value += 1 - self.client = MagicMock() - self.client.send_produce_request.side_effect = send_side_effect - self.queue = mp.Queue() + self.queue = Queue() def _run_process(self, retries_limit=3, sleep_timeout=1): # run 
_send_upstream process with the queue - self.process = mp.Process( + stop_event = threading.Event() + self.thread = threading.Thread( target=_send_upstream, args=(self.queue, self.client, CODEC_NONE, 0.3, # batch time (seconds) @@ -78,11 +69,12 @@ def _run_process(self, retries_limit=3, sleep_timeout=1): Producer.ACK_AFTER_LOCAL_WRITE, Producer.DEFAULT_ACK_TIMEOUT, 50, # retry backoff (ms) - retries_limit)) - self.process.daemon = True - self.process.start() + retries_limit, + stop_event)) + self.thread.daemon = True + self.thread.start() time.sleep(sleep_timeout) - self.process.terminate() + stop_event.set() def test_wo_retries(self): @@ -97,7 +89,8 @@ def test_wo_retries(self): # there should be 4 non-void calls: # 3 batches of 3 msgs each + 1 batch of 1 message - self.assertEqual(self.send_calls_count.value, 4) + self.assertEqual(self.client.send_produce_request.call_count, 4) + def test_first_send_failed(self): @@ -106,11 +99,10 @@ def test_first_send_failed(self): for i in range(10): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) - is_first_time = mp.Value('b', True) + self.client.is_first_time = True def send_side_effect(reqs, *args, **kwargs): - self.send_calls_count.value += 1 - if is_first_time.value: - is_first_time.value = False + if self.client.is_first_time: + self.client.is_first_time = False raise FailedPayloadsError(reqs) self.client.send_produce_request.side_effect = send_side_effect @@ -122,7 +114,7 @@ def send_side_effect(reqs, *args, **kwargs): # there should be 5 non-void calls: 1st failed batch of 3 msgs # + 3 batches of 3 msgs each + 1 batch of 1 msg = 1 + 3 + 1 = 5 - self.assertEqual(self.send_calls_count.value, 5) + self.assertEqual(self.client.send_produce_request.call_count, 5) def test_with_limited_retries(self): @@ -132,7 +124,6 @@ def test_with_limited_retries(self): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) def send_side_effect(reqs, *args, **kwargs): - self.send_calls_count.value += 1 raise FailedPayloadsError(reqs) self.client.send_produce_request.side_effect = send_side_effect @@ -145,8 +136,7 @@ def send_side_effect(reqs, *args, **kwargs): # there should be 16 non-void calls: # 3 initial batches of 3 msgs each + 1 initial batch of 1 msg + # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed - self.assertEqual(self.send_calls_count.value, 16) - + self.assertEqual(self.client.send_produce_request.call_count, 16) def test_with_unlimited_retries(self): @@ -156,7 +146,6 @@ def test_with_unlimited_retries(self): self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) def send_side_effect(reqs, *args, **kwargs): - self.send_calls_count.value += 1 raise FailedPayloadsError(reqs) self.client.send_produce_request.side_effect = send_side_effect @@ -174,5 +163,5 @@ def send_side_effect(reqs, *args, **kwargs): self.assertEqual(self.queue.empty(), True) # 1s / 50ms of backoff = 20 times max - self.assertTrue(self.send_calls_count.value > 10) - self.assertTrue(self.send_calls_count.value <= 20) + calls = self.client.send_produce_request.call_count + self.assertTrue(calls > 10 and calls <= 20) From 09c1c8b117a3c8217ca36c27eafb22999c20ef93 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 21 Apr 2015 13:30:46 +0300 Subject: [PATCH 08/22] Improved retry logic --- kafka/common.py | 3 ++ kafka/producer/base.py | 86 ++++++++++++++++++++++++++++++---------- kafka/producer/keyed.py | 6 +-- kafka/producer/simple.py | 9 ++--- test/test_producer.py | 6 +-- 5 files changed, 76 insertions(+), 34 deletions(-) diff
--git a/kafka/common.py b/kafka/common.py index 5c2b78857..cbb401315 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -78,6 +78,9 @@ def __new__(cls, topic, partition, messages, retries=0): KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) +RetryOptions = namedtuple("RetryOptions", + ["limit", "backoff_ms", "retry_on_timeouts"]) + ################# # Exceptions # diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 9bfe98b55..ebeb82dbd 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -16,7 +16,10 @@ from kafka.common import ( ProduceRequest, TopicAndPartition, - UnsupportedCodecError, FailedPayloadsError + UnsupportedCodecError, FailedPayloadsError, RetryOptions, + RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError, + UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError, + InvalidMessageError, MessageSizeTooLargeError ) from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring @@ -25,20 +28,19 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 -BATCH_RETRY_BACKOFF_MS = 300 -BATCH_RETRIES_LIMIT = 0 +BATCH_RETRY_OPTIONS = RetryOptions( + limit=0, backoff_ms=300, retry_on_timeouts=True) STOP_ASYNC_PRODUCER = -1 def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, retry_backoff, retries_limit, stop_event): + req_acks, ack_timeout, retry_options, stop_event): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one request """ - stop = False reqs = [] client.reinit() @@ -85,28 +87,71 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) + + except RequestTimedOutError as ex: + # should retry only if user is fine with duplicates + if retry_options.retry_on_timeouts: + reqs_to_retry = reqs + + except KafkaUnavailableError as ex: + # backoff + retry + do_backoff(retry_options) + reqs_to_retry = get_requests_for_retry(reqs, retry_options) + + except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex: + # refresh + retry + client.load_metadata_for_topics() + reqs_to_retry = get_requests_for_retry(reqs, retry_options) + + except (LeaderNotAvailableError, ConnectionError) as ex: + # backoff + refresh + retry + do_backoff(retry_options) + client.load_metadata_for_topics() + reqs_to_retry = get_requests_for_retry(reqs, retry_options) + except FailedPayloadsError as ex: + # retry only failed messages with backoff failed_reqs = ex.failed_payloads - log.exception("Failed payloads count %s" % len(failed_reqs)) - - # if no limit, retry all failed messages until success - if retries_limit is None: - reqs_to_retry = failed_reqs - # makes sense to check failed reqs only if we have a limit > 0 - elif retries_limit > 0: - for req in failed_reqs: - if retries_limit and req.retries < retries_limit: - updated_req = req._replace(retries=req.retries+1) - reqs_to_retry.append(updated_req) + do_backoff(retry_options) + reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options) + + except (InvalidMessageError, MessageSizeTooLargeError) as ex: + # "bad" messages, doesn't make sense to retry + log.exception("Message error when sending: %s" % type(ex)) + except Exception as ex: log.exception("Unable to send message: %s" % type(ex)) + finally: reqs = [] - if reqs_to_retry and retry_backoff: + if reqs_to_retry: reqs = reqs_to_retry - 
log.warning("%s requests will be retried next call." % len(reqs)) - time.sleep(float(retry_backoff) / 1000) + + +def get_requests_for_retry(requests, retry_options): + log.exception("Failed payloads count %s" % len(requests)) + + # if no limit, retry all failed messages until success + if retry_options.limit is None: + return requests + + # makes sense to check failed reqs only if we have a limit > 0 + reqs_to_retry = [] + if retry_options.limit > 0: + for req in requests: + if req.retries < retry_options.limit: + updated_req = req._replace(retries=req.retries+1) + reqs_to_retry.append(updated_req) + + return reqs_to_retry + + +def do_backoff(retry_options): + if retry_options.backoff_ms: + log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + class Producer(object): @@ -142,8 +187,7 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, - batch_retries_limit=BATCH_RETRIES_LIMIT): + batch_retry_options=BATCH_RETRY_OPTIONS): if batch_send: async = True diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index aa569b3a7..d11db529d 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,8 +7,7 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, - BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT + BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS ) log = logging.getLogger("kafka") @@ -39,8 +38,7 @@ def __init__(self, client, partitioner=None, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, - batch_retries_limit=BATCH_RETRIES_LIMIT): + batch_retry_options=BATCH_RETRY_OPTIONS): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index 7391be071..b869683e0 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -10,8 +10,7 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, - BATCH_RETRY_BACKOFF_MS, BATCH_RETRIES_LIMIT + BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS ) log = logging.getLogger("kafka") @@ -47,16 +46,14 @@ def __init__(self, client, async=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, random_start=True, - batch_retry_backoff_ms=BATCH_RETRY_BACKOFF_MS, - batch_retries_limit=BATCH_RETRIES_LIMIT): + batch_retry_options=BATCH_RETRY_OPTIONS): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, batch_send_every_t, - batch_retry_backoff_ms, - batch_retries_limit) + batch_retry_options) def _next_partition(self, topic): if topic not in self.partition_cycles: diff --git a/test/test_producer.py b/test/test_producer.py index cc58fe459..c9bdc4777 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -6,7 +6,7 @@ from mock import MagicMock from . 
import unittest -from kafka.common import TopicAndPartition, FailedPayloadsError +from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions from kafka.producer.base import Producer from kafka.producer.base import _send_upstream from kafka.protocol import CODEC_NONE @@ -68,8 +68,8 @@ def _run_process(self, retries_limit=3, sleep_timeout=1): 3, # batch length Producer.ACK_AFTER_LOCAL_WRITE, Producer.DEFAULT_ACK_TIMEOUT, - 50, # retry backoff (ms) - retries_limit, + RetryOptions(limit=retries_limit, backoff_ms=50, + retry_on_timeouts=True), stop_event)) self.thread.daemon = True self.thread.start() From 0e0f794802076db34e8e4dc597c38237e88f4b34 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 21 Apr 2015 13:37:16 +0300 Subject: [PATCH 09/22] Arg fixes for base/keyed producers --- kafka/producer/base.py | 3 +-- kafka/producer/keyed.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index ebeb82dbd..1d5e0459c 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -224,8 +224,7 @@ def __init__(self, client, async=False, batch_send_every_n, self.req_acks, self.ack_timeout, - batch_retry_backoff_ms, - batch_retries_limit, + batch_retry_options, self.thread_stop_event)) # Thread will die if main thread exits diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index d11db529d..7bcc62934 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -48,8 +48,7 @@ def __init__(self, client, partitioner=None, async=False, ack_timeout, codec, batch_send, batch_send_every_n, batch_send_every_t, - batch_retry_backoff_ms, - batch_retries_limit) + batch_retry_options) def _next_partition(self, topic, key): if topic not in self.partitioners: From b31114520a15477da7ad660765a0240b6f348944 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 21 Apr 2015 17:47:55 +0300 Subject: [PATCH 10/22] Clean and simplify retry logic --- kafka/common.py | 17 ++++++++ kafka/producer/base.py | 92 +++++++++++++++--------------------------- test/test_producer.py | 8 ++-- 3 files changed, 54 insertions(+), 63 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index cbb401315..50f8a7753 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -82,6 +82,8 @@ def __new__(cls, topic, partition, messages, retries=0): ["limit", "backoff_ms", "retry_on_timeouts"]) + + ################# # Exceptions # ################# @@ -228,3 +230,18 @@ def check_error(response): if response.error: error_class = kafka_errors.get(response.error, UnknownError) raise error_class(response) + + +RETRY_BACKOFF_ERROR_TYPES = ( + KafkaUnavailableError, LeaderNotAvailableError, + ConnectionError, FailedPayloadsError +) + + +RETRY_REFRESH_ERROR_TYPES = ( + NotLeaderForPartitionError, UnknownTopicOrPartitionError, + LeaderNotAvailableError, ConnectionError +) + + +RETRY_ERROR_TYPES = list(set(RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES)) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 1d5e0459c..a989e3f32 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -15,12 +15,12 @@ import six from kafka.common import ( - ProduceRequest, TopicAndPartition, - UnsupportedCodecError, FailedPayloadsError, RetryOptions, - RequestTimedOutError, KafkaUnavailableError, LeaderNotAvailableError, - UnknownTopicOrPartitionError, NotLeaderForPartitionError, ConnectionError, - InvalidMessageError, MessageSizeTooLargeError + ProduceRequest, TopicAndPartition, RetryOptions, + UnsupportedCodecError, 
FailedPayloadsError, RequestTimedOutError ) +from kafka.common import ( + RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) + from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set from kafka.util import kafka_bytestring @@ -88,70 +88,42 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, acks=req_acks, timeout=ack_timeout) - except RequestTimedOutError as ex: - # should retry only if user is fine with duplicates - if retry_options.retry_on_timeouts: - reqs_to_retry = reqs - - except KafkaUnavailableError as ex: - # backoff + retry - do_backoff(retry_options) - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except (NotLeaderForPartitionError, UnknownTopicOrPartitionError) as ex: - # refresh + retry - client.load_metadata_for_topics() - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except (LeaderNotAvailableError, ConnectionError) as ex: - # backoff + refresh + retry - do_backoff(retry_options) - client.load_metadata_for_topics() - reqs_to_retry = get_requests_for_retry(reqs, retry_options) - - except FailedPayloadsError as ex: - # retry only failed messages with backoff - failed_reqs = ex.failed_payloads - do_backoff(retry_options) - reqs_to_retry = get_requests_for_retry(failed_reqs, retry_options) - - except (InvalidMessageError, MessageSizeTooLargeError) as ex: - # "bad" messages, doesn't make sense to retry - log.exception("Message error when sending: %s" % type(ex)) - - except Exception as ex: - log.exception("Unable to send message: %s" % type(ex)) - - finally: - reqs = [] + except tuple(RETRY_ERROR_TYPES) as ex: - if reqs_to_retry: - reqs = reqs_to_retry + # by default, retry all sent messages + reqs_to_retry = reqs + if type(ex) == FailedPayloadsError: + reqs_to_retry = ex.failed_payloads -def get_requests_for_retry(requests, retry_options): - log.exception("Failed payloads count %s" % len(requests)) + elif (type(ex) == RequestTimedOutError and + not retry_options.retry_on_timeouts): + reqs_to_retry = [] - # if no limit, retry all failed messages until success - if retry_options.limit is None: - return requests + # filter reqs_to_retry if there's a retry limit + if retry_options.limit and retry_options.limit > 0: + reqs_to_retry = [req._replace(retries=req.retries+1) + for req in reqs_to_retry + if req.retries < retry_options.limit] - # makes sense to check failed reqs only if we have a limit > 0 - reqs_to_retry = [] - if retry_options.limit > 0: - for req in requests: - if req.retries < retry_options.limit: - updated_req = req._replace(retries=req.retries+1) - reqs_to_retry.append(updated_req) + # doing backoff before next retry + if (reqs_to_retry and type(ex) in RETRY_BACKOFF_ERROR_TYPES + and retry_options.backoff_ms): + log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) - return reqs_to_retry + # refresh topic metadata before next retry + if reqs_to_retry and type(ex) in RETRY_REFRESH_ERROR_TYPES: + client.load_metadata_for_topics() + except Exception as ex: + log.exception("Unable to send message: %s" % type(ex)) -def do_backoff(retry_options): - if retry_options.backoff_ms: - log.warning("Doing backoff for %s(ms)." 
% retry_options.backoff_ms) - time.sleep(float(retry_options.backoff_ms) / 1000) + finally: + reqs = [] + if reqs_to_retry: + reqs = reqs_to_retry class Producer(object): diff --git a/test/test_producer.py b/test/test_producer.py index c9bdc4777..c0dc873cf 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -61,6 +61,9 @@ def setUp(self): def _run_process(self, retries_limit=3, sleep_timeout=1): # run _send_upstream process with the queue stop_event = threading.Event() + retry_options = RetryOptions(limit=retries_limit, + backoff_ms=50, + retry_on_timeouts=False) self.thread = threading.Thread( target=_send_upstream, args=(self.queue, self.client, CODEC_NONE, @@ -68,8 +71,7 @@ def _run_process(self, retries_limit=3, sleep_timeout=1): 3, # batch length Producer.ACK_AFTER_LOCAL_WRITE, Producer.DEFAULT_ACK_TIMEOUT, - RetryOptions(limit=retries_limit, backoff_ms=50, - retry_on_timeouts=True), + retry_options, stop_event)) self.thread.daemon = True self.thread.start() @@ -121,7 +123,7 @@ def test_with_limited_retries(self): # lets create a queue and add 10 messages for 10 different partitions # to show how retries should work ideally for i in range(10): - self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i")) + self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i)) def send_side_effect(reqs, *args, **kwargs): raise FailedPayloadsError(reqs) From c165f17338c0a9260a91b816f73e5ce4ff7ed359 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 21 Apr 2015 17:51:39 +0300 Subject: [PATCH 11/22] Disable retry on timeouts by default (old behaviour) --- kafka/producer/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a989e3f32..331c71c85 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -29,7 +29,7 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 BATCH_RETRY_OPTIONS = RetryOptions( - limit=0, backoff_ms=300, retry_on_timeouts=True) + limit=0, backoff_ms=300, retry_on_timeouts=False) STOP_ASYNC_PRODUCER = -1 From cf363089617de2d0b18cb83eba1e61adbc5d0144 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=98=D1=81=D0=BA=D0=B0=D0=BD=D0=B4=D0=B0=D1=80=D0=BE?= =?UTF-8?q?=D0=B2=20=D0=AD=D0=B4=D1=83=D0=B0=D1=80=D0=B4?= Date: Fri, 23 Jan 2015 12:56:42 +0300 Subject: [PATCH 12/22] add producer send batch queue overfilled test --- kafka/producer/base.py | 8 +++++++- test/test_producer.py | 18 +++++++++++++++++- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 331c71c85..a0d9ac126 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -249,7 +249,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs): if self.async: for m in msg: - self.queue.put((TopicAndPartition(topic, partition), m, key)) + try: + item = (TopicAndPartition(topic, partition), m, key) + self.queue.put_nowait(item) + except Full: + raise BatchQueueOverfilledError( + 'Producer batch send queue overfilled. ' + 'Current queue size %d.' % self.queue.qsize()) resp = [] else: messages = create_message_set([(m, key) for m in msg], self.codec, key) diff --git a/test/test_producer.py b/test/test_producer.py index c0dc873cf..b57dfd8ea 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -3,10 +3,11 @@ import time import logging -from mock import MagicMock +from mock import MagicMock, patch from . 
import unittest from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions +from kafka.common import BatchQueueOverfilledError from kafka.producer.base import Producer from kafka.producer.base import _send_upstream from kafka.protocol import CODEC_NONE @@ -51,6 +52,21 @@ def partitions(topic): producer.send_messages(topic, b'hi') assert client.send_produce_request.called + @patch('kafka.producer.base.Process') + def test_producer_batch_send_queue_overfilled(self, process_mock): + queue_size = 2 + producer = Producer(MagicMock(), batch_send=True, + batch_send_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 + + message = b'test-message' + with self.assertRaises(BatchQueueOverfilledError): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + + class TestKafkaProducerSendUpstream(unittest.TestCase): From f41e5f3e4befda52a20f072f85b807d77361e64d Mon Sep 17 00:00:00 2001 From: Eduard Iskandarov Date: Sat, 24 Jan 2015 00:30:50 +0300 Subject: [PATCH 13/22] async queue: refactored code; add one more test --- kafka/producer/base.py | 9 +++++++-- test/test_producer.py | 18 ++++++++++++++++-- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index a0d9ac126..0e005c53a 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -31,6 +31,8 @@ BATCH_RETRY_OPTIONS = RetryOptions( limit=0, backoff_ms=300, retry_on_timeouts=False) +# unlimited +ASYNC_QUEUE_MAXSIZE = 0 STOP_ASYNC_PRODUCER = -1 @@ -159,12 +161,14 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - batch_retry_options=BATCH_RETRY_OPTIONS): + batch_retry_options=BATCH_RETRY_OPTIONS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE): if batch_send: async = True assert batch_send_every_n > 0 assert batch_send_every_t > 0 + assert async_queue_maxsize >= 0 else: batch_send_every_n = 1 batch_send_every_t = 3600 @@ -186,7 +190,8 @@ def __init__(self, client, async=False, log.warning("async producer does not guarantee message delivery!") log.warning("Current implementation does not retry Failed messages") log.warning("Use at your own risk! 
(or help improve with a PR!)") - self.queue = Queue() # Messages are sent through this queue + # Messages are sent through this queue + self.queue = Queue(async_queue_maxsize) self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, diff --git a/test/test_producer.py b/test/test_producer.py index b57dfd8ea..627178dad 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -53,15 +53,29 @@ def partitions(topic): assert client.send_produce_request.called @patch('kafka.producer.base.Process') - def test_producer_batch_send_queue_overfilled(self, process_mock): + def test_producer_async_queue_overfilled_batch_send(self, process_mock): queue_size = 2 producer = Producer(MagicMock(), batch_send=True, - batch_send_queue_maxsize=queue_size) + async_queue_maxsize=queue_size) topic = b'test-topic' partition = 0 + message = b'test-message' + + with self.assertRaises(BatchQueueOverfilledError): + message_list = [message] * (queue_size + 1) + producer.send_messages(topic, partition, *message_list) + @patch('kafka.producer.base.Process') + def test_producer_async_queue_overfilled(self, process_mock): + queue_size = 2 + producer = Producer(MagicMock(), async=True, + async_queue_maxsize=queue_size) + + topic = b'test-topic' + partition = 0 message = b'test-message' + with self.assertRaises(BatchQueueOverfilledError): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) From 948e046b5443e0f38f6062e13153b57d29915a68 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Tue, 21 Apr 2015 19:44:27 +0300 Subject: [PATCH 14/22] Fix small issues with names/tests --- kafka/common.py | 4 ++++ kafka/producer/base.py | 16 +++++++--------- test/test_producer.py | 12 +++++------- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index 50f8a7753..0e769e4be 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -217,6 +217,10 @@ class KafkaConfigurationError(KafkaError): pass +class AsyncProducerQueueFull(KafkaError): + pass + + def _iter_broker_errors(): for name, obj in inspect.getmembers(sys.modules[__name__]): if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError: diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0e005c53a..3f0431c72 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -5,9 +5,9 @@ import time try: - from queue import Empty, Queue + from queue import Empty, Full, Queue except ImportError: - from Queue import Empty, Queue + from Queue import Empty, Full, Queue from collections import defaultdict from threading import Thread, Event @@ -16,7 +16,8 @@ from kafka.common import ( ProduceRequest, TopicAndPartition, RetryOptions, - UnsupportedCodecError, FailedPayloadsError, RequestTimedOutError + UnsupportedCodecError, FailedPayloadsError, + RequestTimedOutError, AsyncProducerQueueFull ) from kafka.common import ( RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES) @@ -187,11 +188,8 @@ def __init__(self, client, async=False, self.codec = codec if self.async: - log.warning("async producer does not guarantee message delivery!") - log.warning("Current implementation does not retry Failed messages") - log.warning("Use at your own risk! 
(or help improve with a PR!)") # Messages are sent through this queue - self.queue = Queue(async_queue_maxsize) + self.queue = Queue(async_queue_maxsize) self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -258,8 +256,8 @@ def _send_messages(self, topic, partition, *msg, **kwargs): item = (TopicAndPartition(topic, partition), m, key) self.queue.put_nowait(item) except Full: - raise BatchQueueOverfilledError( - 'Producer batch send queue overfilled. ' + raise AsyncProducerQueueFull( + 'Producer async queue overfilled. ' 'Current queue size %d.' % self.queue.qsize()) resp = [] else: diff --git a/test/test_producer.py b/test/test_producer.py index 627178dad..de012b9ad 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -7,7 +7,7 @@ from . import unittest from kafka.common import TopicAndPartition, FailedPayloadsError, RetryOptions -from kafka.common import BatchQueueOverfilledError +from kafka.common import AsyncProducerQueueFull from kafka.producer.base import Producer from kafka.producer.base import _send_upstream from kafka.protocol import CODEC_NONE @@ -52,8 +52,7 @@ def partitions(topic): producer.send_messages(topic, b'hi') assert client.send_produce_request.called - @patch('kafka.producer.base.Process') - def test_producer_async_queue_overfilled_batch_send(self, process_mock): + def test_producer_async_queue_overfilled_batch_send(self): queue_size = 2 producer = Producer(MagicMock(), batch_send=True, async_queue_maxsize=queue_size) @@ -62,12 +61,11 @@ def test_producer_async_queue_overfilled_batch_send(self, process_mock): partition = 0 message = b'test-message' - with self.assertRaises(BatchQueueOverfilledError): + with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) - @patch('kafka.producer.base.Process') - def test_producer_async_queue_overfilled(self, process_mock): + def test_producer_async_queue_overfilled(self): queue_size = 2 producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size) @@ -76,7 +74,7 @@ def test_producer_async_queue_overfilled(self, process_mock): partition = 0 message = b'test-message' - with self.assertRaises(BatchQueueOverfilledError): + with self.assertRaises(AsyncProducerQueueFull): message_list = [message] * (queue_size + 1) producer.send_messages(topic, partition, *message_list) From 7da48f62975385e15e4115df70986688837058b8 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 22 Apr 2015 11:21:19 +0300 Subject: [PATCH 15/22] Simplification of retry logic --- kafka/common.py | 2 +- kafka/producer/base.py | 58 ++++++++++++++++++++---------------------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index 0e769e4be..e327d022e 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -248,4 +248,4 @@ def check_error(response): ) -RETRY_ERROR_TYPES = list(set(RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES)) +RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 3f0431c72..fffea9431 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -85,48 +85,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if not reqs: continue - reqs_to_retry = [] + reqs_to_retry, error_type = [], None try: client.send_produce_request(reqs, acks=req_acks, timeout=ack_timeout) - except tuple(RETRY_ERROR_TYPES) as ex: + except 
FailedPayloadsError as ex: + error_type = FailedPayloadsError + reqs_to_retry = ex.failed_payloads - # by default, retry all sent messages - reqs_to_retry = reqs - - if type(ex) == FailedPayloadsError: - reqs_to_retry = ex.failed_payloads - - elif (type(ex) == RequestTimedOutError and - not retry_options.retry_on_timeouts): - reqs_to_retry = [] - - # filter reqs_to_retry if there's a retry limit - if retry_options.limit and retry_options.limit > 0: - reqs_to_retry = [req._replace(retries=req.retries+1) - for req in reqs_to_retry - if req.retries < retry_options.limit] - - # doing backoff before next retry - if (reqs_to_retry and type(ex) in RETRY_BACKOFF_ERROR_TYPES - and retry_options.backoff_ms): - log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) - time.sleep(float(retry_options.backoff_ms) / 1000) - - # refresh topic metadata before next retry - if reqs_to_retry and type(ex) in RETRY_REFRESH_ERROR_TYPES: - client.load_metadata_for_topics() + except RequestTimedOutError: + error_type = RequestTimedOutError + if retry_options.retry_on_timeouts: + reqs_to_retry = reqs except Exception as ex: - log.exception("Unable to send message: %s" % type(ex)) + error_type = type(ex) + if type(ex) in RETRY_ERROR_TYPES: + reqs_to_retry = reqs finally: reqs = [] - if reqs_to_retry: - reqs = reqs_to_retry + if not reqs_to_retry: + continue + + # doing backoff before next retry + if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms: + log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms) + time.sleep(float(retry_options.backoff_ms) / 1000) + + # refresh topic metadata before next retry + if error_type in RETRY_REFRESH_ERROR_TYPES: + client.load_metadata_for_topics() + + reqs = reqs_to_retry + # filter reqs_to_retry if there's a retry limit + if retry_options.limit and retry_options.limit > 0: + reqs = [req._replace(retries=req.retries+1) + for req in reqs if req.retries < retry_options.limit] class Producer(object): From 5119bb605acc4b24e091778656b229a36f9cac11 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 22 Apr 2015 12:14:11 +0300 Subject: [PATCH 16/22] Fix names for async retries opts, add timeout for put --- kafka/common.py | 4 ++-- kafka/producer/base.py | 28 ++++++++++++++++------------ kafka/producer/keyed.py | 11 ++++++++--- kafka/producer/simple.py | 11 ++++++++--- 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index e327d022e..87c29f017 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -78,12 +78,12 @@ def __new__(cls, topic, partition, messages, retries=0): KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"]) +# Define retry policy for async producer +# Limit corner values: None - infinite retries, 0 - no retries RetryOptions = namedtuple("RetryOptions", ["limit", "backoff_ms", "retry_on_timeouts"]) - - ################# # Exceptions # ################# diff --git a/kafka/producer/base.py b/kafka/producer/base.py index fffea9431..0b31d1800 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -29,11 +29,13 @@ BATCH_SEND_DEFAULT_INTERVAL = 20 BATCH_SEND_MSG_COUNT = 20 -BATCH_RETRY_OPTIONS = RetryOptions( - limit=0, backoff_ms=300, retry_on_timeouts=False) # unlimited ASYNC_QUEUE_MAXSIZE = 0 +ASYNC_QUEUE_PUT_TIMEOUT = 0 +# no retries by default +ASYNC_RETRY_OPTIONS = RetryOptions( + limit=0, backoff_ms=0, retry_on_timeouts=False) STOP_ASYNC_PRODUCER = -1 @@ -108,7 +110,7 @@ def _send_upstream(queue, client, codec, batch_time, 
batch_size, finally: reqs = [] - if not reqs_to_retry: + if not reqs_to_retry or retry_options.limit == 0: continue # doing backoff before next retry @@ -120,11 +122,10 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, if error_type in RETRY_REFRESH_ERROR_TYPES: client.load_metadata_for_topics() - reqs = reqs_to_retry - # filter reqs_to_retry if there's a retry limit - if retry_options.limit and retry_options.limit > 0: - reqs = [req._replace(retries=req.retries+1) - for req in reqs if req.retries < retry_options.limit] + reqs = [req._replace(retries=req.retries+1) + for req in reqs_to_retry + if not retry_options.limit or + (retry_options.limit and req.retries < retry_options.limit)] class Producer(object): @@ -160,8 +161,9 @@ def __init__(self, client, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - batch_retry_options=BATCH_RETRY_OPTIONS, - async_queue_maxsize=ASYNC_QUEUE_MAXSIZE): + async_retry_options=ASYNC_RETRY_OPTIONS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): if batch_send: async = True @@ -188,6 +190,7 @@ def __init__(self, client, async=False, if self.async: # Messages are sent through this queue self.queue = Queue(async_queue_maxsize) + self.async_queue_put_timeout = async_queue_put_timeout self.thread_stop_event = Event() self.thread = Thread(target=_send_upstream, args=(self.queue, @@ -197,7 +200,7 @@ def __init__(self, client, async=False, batch_send_every_n, self.req_acks, self.ack_timeout, - batch_retry_options, + async_retry_options, self.thread_stop_event)) # Thread will die if main thread exits @@ -249,10 +252,11 @@ def _send_messages(self, topic, partition, *msg, **kwargs): raise TypeError("the key must be type bytes") if self.async: + put_timeout = self.async_queue_put_timeout for m in msg: try: item = (TopicAndPartition(topic, partition), m, key) - self.queue.put_nowait(item) + self.queue.put(item, bool(put_timeout), put_timeout) except Full: raise AsyncProducerQueueFull( 'Producer async queue overfilled. 
' diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py index 7bcc62934..0fdccd586 100644 --- a/kafka/producer/keyed.py +++ b/kafka/producer/keyed.py @@ -7,7 +7,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS + BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS, + ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT ) log = logging.getLogger("kafka") @@ -38,7 +39,9 @@ def __init__(self, client, partitioner=None, async=False, batch_send=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, - batch_retry_options=BATCH_RETRY_OPTIONS): + async_retry_options=ASYNC_RETRY_OPTIONS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): if not partitioner: partitioner = HashedPartitioner self.partitioner_class = partitioner @@ -48,7 +51,9 @@ def __init__(self, client, partitioner=None, async=False, ack_timeout, codec, batch_send, batch_send_every_n, batch_send_every_t, - batch_retry_options) + async_retry_options, + async_queue_maxsize, + async_queue_put_timeout) def _next_partition(self, topic, key): if topic not in self.partitioners: diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py index b869683e0..f7dfc4629 100644 --- a/kafka/producer/simple.py +++ b/kafka/producer/simple.py @@ -10,7 +10,8 @@ from .base import ( Producer, BATCH_SEND_DEFAULT_INTERVAL, - BATCH_SEND_MSG_COUNT, BATCH_RETRY_OPTIONS + BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS, + ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT ) log = logging.getLogger("kafka") @@ -46,14 +47,18 @@ def __init__(self, client, async=False, batch_send_every_n=BATCH_SEND_MSG_COUNT, batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, random_start=True, - batch_retry_options=BATCH_RETRY_OPTIONS): + async_retry_options=ASYNC_RETRY_OPTIONS, + async_queue_maxsize=ASYNC_QUEUE_MAXSIZE, + async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT): self.partition_cycles = {} self.random_start = random_start super(SimpleProducer, self).__init__(client, async, req_acks, ack_timeout, codec, batch_send, batch_send_every_n, batch_send_every_t, - batch_retry_options) + async_retry_options, + async_queue_maxsize, + async_queue_put_timeout) def _next_partition(self, topic): if topic not in self.partition_cycles: From 91af27c64488a0029e960615d0f10d62532d6616 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Wed, 22 Apr 2015 12:40:04 +0300 Subject: [PATCH 17/22] Fix async producer queue put arguments --- kafka/producer/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 0b31d1800..df391f7b5 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -252,11 +252,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs): raise TypeError("the key must be type bytes") if self.async: - put_timeout = self.async_queue_put_timeout for m in msg: try: item = (TopicAndPartition(topic, partition), m, key) - self.queue.put(item, bool(put_timeout), put_timeout) + if self.async_queue_put_timeout == 0: + self.queue.put_nowait(item) + else: + self.queue.put(item, True, self.async_queue_put_timeout) except Full: raise AsyncProducerQueueFull( 'Producer async queue overfilled. 
From 9eed1698fdcdda7cd8efe7bbee559ccf8baf9e1e Mon Sep 17 00:00:00 2001
From: Viktor Shlapakov
Date: Wed, 22 Apr 2015 19:05:18 +0300
Subject: [PATCH 18/22] Change backoff message log level

---
 kafka/producer/base.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index df391f7b5..03ef2a7b4 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -115,7 +115,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
 
         # doing backoff before next retry
         if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
-            log.warning("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+            log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
             time.sleep(float(retry_options.backoff_ms) / 1000)
 
         # refresh topic metadata before next retry

From 4c682f3d4da6c5af8bfbb00700c431a272b37dc1 Mon Sep 17 00:00:00 2001
From: Viktor Shlapakov
Date: Wed, 22 Apr 2015 19:08:13 +0300
Subject: [PATCH 19/22] Increase producer test timeout

---
 test/test_producer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test_producer.py b/test/test_producer.py
index de012b9ad..3004c2db5 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -158,7 +158,7 @@ def send_side_effect(reqs, *args, **kwargs):
 
         self.client.send_produce_request.side_effect = send_side_effect
 
-        self._run_process(3, 2)
+        self._run_process(3, 3)
 
         # the queue should be void at the end of the test
         self.assertEqual(self.queue.empty(), True)

From a3fb3225a27ba6ca1a9fdac519c1f4257754d4eb Mon Sep 17 00:00:00 2001
From: Viktor Shlapakov
Date: Fri, 15 May 2015 12:58:34 +0300
Subject: [PATCH 20/22] Improve async producer code: logic and style fixes

- send_produce_request with fail_on_error=False to retry failed reqs only
- using an internal dict with namedtuple keys for retry counters
- refresh metadata on refresh_error irrespective of retry options
- removed infinite retries (retry_options.limit=None) as an over-feature
- separate producer init args for retry options (limit, backoff, on_timeouts)
- AsyncProducerQueueFull returns a list of failed messages
- producer tests improved thanks to @rogaha and @toli

---
 kafka/common.py          | 17 +++++-------
 kafka/producer/base.py   | 57 +++++++++++++++++++++++-----------------
 kafka/producer/keyed.py  | 12 ++++++---
 kafka/producer/simple.py | 12 ++++++---
 test/test_producer.py    | 53 ++++++++++++++-----------------------
 5 files changed, 75 insertions(+), 76 deletions(-)
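Note: with AsyncProducerQueueFull now carrying the overflow, a caller can
recover the messages that never made it into the queue. A hedged sketch of
that pattern (the producer construction is elided; the diffs follow below):

    from kafka.common import AsyncProducerQueueFull

    def send_with_overflow(producer, topic, partition, messages):
        # On overflow the exception carries the tail of the message list
        # that was not enqueued; here it is simply handed back to the caller.
        try:
            producer.send_messages(topic, partition, *messages)
            return []
        except AsyncProducerQueueFull as e:
            return e.failed_msgs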
diff --git a/kafka/common.py b/kafka/common.py
index 87c29f017..8c13798e4 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -14,15 +14,8 @@
                               ["brokers", "topics"])
 
 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-_ProduceRequest = namedtuple("ProduceRequest",
-    ["topic", "partition", "messages", "retries"])
-
-
-class ProduceRequest(_ProduceRequest):
-    def __new__(cls, topic, partition, messages, retries=0):
-        return super(ProduceRequest, cls).__new__(
-            cls, topic, partition, messages, retries)
-
+ProduceRequest = namedtuple("ProduceRequest",
+                            ["topic", "partition", "messages"])
 
 ProduceResponse = namedtuple("ProduceResponse",
                              ["topic", "partition", "error", "offset"])
@@ -79,7 +72,7 @@ def __new__(cls, topic, partition, messages, retries=0):
     ["topic", "partition", "offset", "key", "value"])
 
 # Define retry policy for async producer
-# Limit corner values: None - infinite retries, 0 - no retries
+# Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])
@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):
 
 
 class AsyncProducerQueueFull(KafkaError):
-    pass
+    def __init__(self, failed_msgs, *args):
+        super(AsyncProducerQueueFull, self).__init__(*args)
+        self.failed_msgs = failed_msgs
 
 
 def _iter_broker_errors():
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 03ef2a7b4..602e2ed6d 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -34,8 +34,10 @@
 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
 # no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 
 STOP_ASYNC_PRODUCER = -1
 
@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    reqs = []
+    reqs = {}
    client.reinit()
 
     while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0
 
         if not reqs:
             continue
 
         reqs_to_retry, error_type = [], None
-
-        try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)
-        except FailedPayloadsError as ex:
-            error_type = FailedPayloadsError
-            reqs_to_retry = ex.failed_payloads
+        try:
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            reqs_to_retry = [req for broker_responses in reply
+                             for response in broker_responses
+                             for req in response.failed_payloads
+                             if isinstance(response, FailedPayloadsError)]
+            if reqs_to_retry:
+                error_type = FailedPayloadsError
 
         except RequestTimedOutError:
             error_type = RequestTimedOutError
             if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs
+                reqs_to_retry = reqs.keys()
 
         except Exception as ex:
             error_type = type(ex)
             if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs
-
-        finally:
-            reqs = []
+                reqs_to_retry = reqs.keys()
 
-        if not reqs_to_retry or retry_options.limit == 0:
+        if not reqs_to_retry:
+            reqs = {}
             continue
 
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()
 
-        reqs = [req._replace(retries=req.retries+1)
-                for req in reqs_to_retry
-                if not retry_options.limit or
-                (retry_options.limit and req.retries < retry_options.limit)]
+        reqs = {key: count + 1 for key, count in reqs.items()
+                if key in reqs_to_retry and count < retry_options.limit}
 
 
 class Producer(object):
@@ -161,7 +163,9 @@ def __init__(self, client, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
 
@@ -191,6 +195,10 @@ def __init__(self, client, async=False,
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -252,7 +260,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
                 raise TypeError("the key must be type bytes")
 
         if self.async:
-            for m in msg:
+            for idx, m in enumerate(msg):
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
                         self.queue.put(item, True, self.async_queue_put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
+                        msg[idx:],
                         'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
         resp = []
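Note: the list-to-dict switch above keeps one attempt counter per request.
Reduced to a toy example with strings standing in for ProduceRequest keys:

    limit = 3
    reqs = {"req-a": 0, "req-b": 2}      # request -> attempts so far
    reqs_to_retry = ["req-a", "req-b"]

    # keep only requests that failed and are still under the limit,
    # bumping their counters; with limit == 0 nothing survives
    reqs = dict((key, count + 1) for (key, count) in reqs.items()
                if key in reqs_to_retry and count < limit)
    print(reqs)  # {'req-a': 1, 'req-b': 3}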
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index 0fdccd586..5252976d0 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -7,8 +7,8 @@
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )
 
 log = logging.getLogger("kafka")
@@ -39,7 +39,9 @@ def __init__(self, client, partitioner=None, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         if not partitioner:
@@ -51,7 +53,9 @@ def __init__(self, client, partitioner=None, async=False,
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t,
-                                            async_retry_options,
+                                            async_retry_limit,
+                                            async_retry_backoff_ms,
+                                            async_retry_on_timeouts,
                                             async_queue_maxsize,
                                             async_queue_put_timeout)
 
     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
diff --git a/kafka/producer/simple.py b/kafka/producer/simple.py
index f7dfc4629..ded6eb6e4 100644
--- a/kafka/producer/simple.py
+++ b/kafka/producer/simple.py
@@ -10,8 +10,8 @@
 
 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )
 
 log = logging.getLogger("kafka")
@@ -47,7 +47,9 @@ def __init__(self, client, async=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=True,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
@@ -56,7 +58,9 @@ def __init__(self, client, async=False,
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
-                                             async_retry_options,
+                                             async_retry_limit,
+                                             async_retry_backoff_ms,
+                                             async_retry_on_timeouts,
                                              async_queue_maxsize,
                                              async_queue_put_timeout)
 
     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
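Note: after this patch the retry policy is configured through plain keyword
arguments instead of a RetryOptions instance. A usage sketch, assuming a
local broker and the py2-era API where async is still a legal argument name
(all values illustrative):

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient("localhost:9092")
    producer = SimpleProducer(client, async=True,
                              async_retry_limit=3,         # give up after 3 attempts
                              async_retry_backoff_ms=100,  # sleep between attempts
                              async_retry_on_timeouts=False,
                              async_queue_maxsize=1000,
                              async_queue_put_timeout=5)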
diff --git a/test/test_producer.py b/test/test_producer.py
index 3004c2db5..a2ba8773d 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -17,6 +17,10 @@
     from queue import Empty, Queue
 except ImportError:
     from Queue import Empty, Queue
+try:
+    xrange
+except NameError:
+    xrange = range
 
 
 class TestKafkaProducer(unittest.TestCase):
@@ -52,7 +56,8 @@ def partitions(topic):
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called
 
-    def test_producer_async_queue_overfilled_batch_send(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled_batch_send(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), batch_send=True,
                             async_queue_maxsize=queue_size)
@@ -64,8 +69,12 @@ def test_producer_async_queue_overfilled_batch_send(self):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
 
-    def test_producer_async_queue_overfilled(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), async=True,
                             async_queue_maxsize=queue_size)
@@ -77,7 +86,9 @@ def test_producer_async_queue_overfilled(self):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
-
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()
 
 class TestKafkaProducerSendUpstream(unittest.TestCase):
 
@@ -121,7 +132,6 @@ def test_wo_retries(self):
 
         # 3 batches of 3 msgs each + 1 batch of 1 message
         self.assertEqual(self.client.send_produce_request.call_count, 4)
-
     def test_first_send_failed(self):
 
         # lets create a queue and add 10 messages for 10 different partitions
@@ -133,7 +143,8 @@ def test_first_send_failed(self):
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
-                raise FailedPayloadsError(reqs)
+                return [[FailedPayloadsError(reqs)]]
+            return []
 
         self.client.send_produce_request.side_effect = send_side_effect
 
@@ -154,7 +165,7 @@ def test_with_limited_retries(self):
                            "msg %i" % i, "key %i" % i))
 
         def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
+            return [[FailedPayloadsError(reqs)]]
 
         self.client.send_produce_request.side_effect = send_side_effect
 
@@ -168,30 +179,6 @@ def send_side_effect(reqs, *args, **kwargs):
         # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
         self.assertEqual(self.client.send_produce_request.call_count, 16)
 
-    def test_with_unlimited_retries(self):
-
-        # lets create a queue and add 10 messages for 10 different partitions
-        # to show how retries should work ideally
-        for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
-
-        def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(None)
-
-        # the queue should have 7 elements
-        # 3 batches of 1 msg each were retried all this time
-        self.assertEqual(self.queue.empty(), False)
-        try:
-            for i in range(7):
-                self.queue.get(timeout=0.01)
-        except Empty:
-            self.fail("Should be 7 elems in the queue")
-        self.assertEqual(self.queue.empty(), True)
-
-        # 1s / 50ms of backoff = 20 times max
-        calls = self.client.send_produce_request.call_count
-        self.assertTrue(calls > 10 & calls <= 20)
+    def tearDown(self):
+        for _ in xrange(self.queue.qsize()):
+            self.queue.get()
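Note: the tests above stub the client through mock's side_effect hook.
Stripped to its core, the pattern is the following; the doubly-nested return
value matches the reply shape assumed at this point in the series, which the
final patch flattens:

    try:
        from unittest.mock import MagicMock   # Python 3
    except ImportError:
        from mock import MagicMock            # Python 2

    from kafka.common import FailedPayloadsError

    client = MagicMock()
    client.is_first_time = True

    def send_side_effect(reqs, *args, **kwargs):
        # first call reports every request as failed, later calls succeed
        if client.is_first_time:
            client.is_first_time = False
            return [[FailedPayloadsError(reqs)]]
        return []

    client.send_produce_request.side_effect = send_side_effect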
From 4474a50162f73aaabe770ee6dce9bd9b2110b2d5 Mon Sep 17 00:00:00 2001
From: Viktor Shlapakov
Date: Fri, 15 May 2015 13:52:59 +0300
Subject: [PATCH 21/22] Async producer: py2.6 backward compatibility fix

---
 kafka/producer/base.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 602e2ed6d..2e2f3c448 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -126,8 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()
 
-        reqs = {key: count + 1 for key, count in reqs.items()
-                if key in reqs_to_retry and count < retry_options.limit}
+        reqs = dict((key, count + 1) for (key, count) in reqs.items()
+                    if key in reqs_to_retry and count < retry_options.limit)
 
 
 class Producer(object):
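Note: the reason this change is needed is that dict comprehensions only
arrived in Python 2.7, while dict() over a generator expression also works
on 2.6. Both forms below build the same mapping:

    counts = {"a": 1, "b": 2}

    # Python 2.7+ / 3.x only
    bumped = {key: count + 1 for key, count in counts.items()}

    # equivalent, and also valid on Python 2.6
    bumped = dict((key, count + 1) for (key, count) in counts.items())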
From 7d6f3f541e0c380c0600eb607d927ec8f8cc966f Mon Sep 17 00:00:00 2001
From: Viktor Shlapakov
Date: Wed, 3 Jun 2015 23:09:15 +0300
Subject: [PATCH 22/22] Check response.error for async producer

---
 kafka/producer/base.py | 43 +++++++++++++++++++++++------------------
 test/test_producer.py  |  4 ++--
 tox.ini                |  1 +
 3 files changed, 27 insertions(+), 21 deletions(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2e2f3c448..2edeace9c 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -16,8 +16,8 @@
 
 from kafka.common import (
     ProduceRequest, TopicAndPartition, RetryOptions,
-    UnsupportedCodecError, FailedPayloadsError,
-    RequestTimedOutError, AsyncProducerQueueFull
+    kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+    RequestTimedOutError, AsyncProducerQueueFull, UnknownError
 )
 from kafka.common import (
     RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
@@ -89,41 +89,46 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if not reqs:
             continue
 
-        reqs_to_retry, error_type = [], None
+        reqs_to_retry, error_cls = [], None
+        do_backoff, do_refresh = False, False
+
+        def _handle_error(error_cls, reqs, all_retries):
+            if ((error_cls == RequestTimedOutError and
+                 retry_options.retry_on_timeouts) or
+                 error_cls in RETRY_ERROR_TYPES):
+                all_retries += reqs
+            if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+                do_backoff = True
+            if error_cls in RETRY_REFRESH_ERROR_TYPES:
+                do_refresh = True
 
         try:
             reply = client.send_produce_request(reqs.keys(),
                                                 acks=req_acks,
                                                 timeout=ack_timeout,
                                                 fail_on_error=False)
-            reqs_to_retry = [req for broker_responses in reply
-                             for response in broker_responses
-                             for req in response.failed_payloads
-                             if isinstance(response, FailedPayloadsError)]
-            if reqs_to_retry:
-                error_type = FailedPayloadsError
-
-        except RequestTimedOutError:
-            error_type = RequestTimedOutError
-            if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs.keys()
+            for i, response in enumerate(reply):
+                if isinstance(response, FailedPayloadsError):
+                    _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+                elif isinstance(response, ProduceResponse) and response.error:
+                    error_cls = kafka_errors.get(response.error, UnknownError)
+                    _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
 
         except Exception as ex:
-            error_type = type(ex)
-            if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs.keys()
+            error_cls = kafka_errors.get(type(ex), UnknownError)
+            _handle_error(error_cls, reqs.keys(), reqs_to_retry)
 
         if not reqs_to_retry:
             reqs = {}
             continue
 
         # doing backoff before next retry
-        if error_type in RETRY_BACKOFF_ERROR_TYPES and retry_options.backoff_ms:
+        if do_backoff and retry_options.backoff_ms:
             log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
             time.sleep(float(retry_options.backoff_ms) / 1000)
 
         # refresh topic metadata before next retry
-        if error_type in RETRY_REFRESH_ERROR_TYPES:
+        if do_refresh:
             client.load_metadata_for_topics()
 
         reqs = dict((key, count + 1) for (key, count) in reqs.items()
diff --git a/test/test_producer.py b/test/test_producer.py
index a2ba8773d..258b9c3bc 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -143,7 +143,7 @@ def test_first_send_failed(self):
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
-                return [[FailedPayloadsError(reqs)]]
+                return [FailedPayloadsError(reqs)]
             return []
 
         self.client.send_produce_request.side_effect = send_side_effect
@@ -165,7 +165,7 @@ def test_with_limited_retries(self):
                            "msg %i" % i, "key %i" % i))
 
         def send_side_effect(reqs, *args, **kwargs):
-            return [[FailedPayloadsError(reqs)]]
+            return [FailedPayloadsError(reqs)]
 
         self.client.send_produce_request.side_effect = send_side_effect
 
diff --git a/tox.ini b/tox.ini
index fba7d8e28..e3e856864 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,7 @@ commands =
     nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
 setenv =
     PROJECT_ROOT = {toxinidir}
+passenv = KAFKA_VERSION
 
 [testenv:py33]
 deps =
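Note: with fail_on_error=False a produce reply can mix successful responses,
error-coded ProduceResponses, and FailedPayloadsError instances. The
classification this last patch performs can be sketched in isolation like so
(FailedPayloadsError here is a stand-in class, not the real kafka.common one):

    class FailedPayloadsError(Exception):
        def __init__(self, failed_payloads):
            self.failed_payloads = failed_payloads

    def classify(reply, reqs):
        """Split a mixed reply into the requests worth retrying."""
        to_retry = []
        for i, response in enumerate(reply):
            if isinstance(response, FailedPayloadsError):
                # a whole broker batch failed; retry each payload in it
                to_retry.extend(response.failed_payloads)
            elif getattr(response, "error", 0):
                # per-partition error code; retry the matching request
                to_retry.append(reqs[i])
        return to_retry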