Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async producer retries for failed messages #331

Merged
merged 22 commits into from
Jun 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b82f94f
Retries for async batching
vshlapakov Feb 25, 2015
81d8688
Fixed base producer imports
vshlapakov Feb 25, 2015
4b8288a
Producer _send_upstream fixes, added tests for retries
vshlapakov Feb 26, 2015
a9324f3
Fixed compatible issues with tests
vshlapakov Feb 26, 2015
566e408
Fix: check failed reqs to retry only for positive limit
vshlapakov Feb 26, 2015
b0a0459
Returned default behaviour with no retries
vshlapakov Feb 26, 2015
5e8dc6d
Fixed tests and other issues after rebase
vshlapakov Mar 24, 2015
09c1c8b
Improved retry logic
vshlapakov Apr 21, 2015
0e0f794
Arg fixes for base/keyed producers
vshlapakov Apr 21, 2015
b311145
Clean and simplify retry logic
vshlapakov Apr 21, 2015
c165f17
Disable retry on timeouts by default (old behaviour)
vshlapakov Apr 21, 2015
cf36308
add producer send batch queue overfilled test
Jan 23, 2015
f41e5f3
async queue: refactored code; add one more test
ediskandarov Jan 23, 2015
948e046
Fix small issues with names/tests
vshlapakov Apr 21, 2015
7da48f6
Simplification of retry logic
vshlapakov Apr 22, 2015
5119bb6
Fix names for async retries opts, add timeout for put
vshlapakov Apr 22, 2015
91af27c
Fix async producer queue put arguments
vshlapakov Apr 22, 2015
9eed169
Change backoff message log level
vshlapakov Apr 22, 2015
4c682f3
Increase producer test timeout
vshlapakov Apr 22, 2015
a3fb322
Improve async producer code: logic and style fixes
vshlapakov May 15, 2015
4474a50
Async producer: py2.6 backward compatibility fix
vshlapakov May 15, 2015
7d6f3f5
Check response.error for async producer
vshlapakov Jun 3, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions kafka/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
KafkaMessage = namedtuple("KafkaMessage",
["topic", "partition", "offset", "key", "value"])

# Retry policy for the async producer:
#   limit: int >= 0 — maximum number of retries per request; 0 means no retries
#   backoff_ms: milliseconds to sleep before retrying after a backoff-class error
#   retry_on_timeouts: whether RequestTimedOutError should be retried
RetryOptions = namedtuple("RetryOptions",
    ["limit", "backoff_ms", "retry_on_timeouts"])


#################
# Exceptions #
Expand Down Expand Up @@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
pass


class AsyncProducerQueueFull(KafkaError):
    """Raised when the async producer's internal queue is full.

    failed_msgs holds the messages that could not be enqueued, so the
    caller can retry or otherwise handle exactly those messages.
    """
    def __init__(self, failed_msgs, *args):
        super(AsyncProducerQueueFull, self).__init__(*args)
        # messages that were NOT added to the queue before it filled up
        self.failed_msgs = failed_msgs


def _iter_broker_errors():
for name, obj in inspect.getmembers(sys.modules[__name__]):
if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
Expand All @@ -218,3 +229,18 @@ def check_error(response):
if response.error:
error_class = kafka_errors.get(response.error, UnknownError)
raise error_class(response)


# Errors that warrant sleeping (backoff) before the next retry attempt,
# because the broker/connection may recover on its own shortly.
RETRY_BACKOFF_ERROR_TYPES = (
    KafkaUnavailableError, LeaderNotAvailableError,
    ConnectionError, FailedPayloadsError
)


# Errors that warrant refreshing cluster metadata before retrying,
# because partition leadership may have moved to another broker.
RETRY_REFRESH_ERROR_TYPES = (
    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
    LeaderNotAvailableError, ConnectionError
)


# Union of every error type the async producer treats as retriable.
RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
120 changes: 98 additions & 22 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,23 @@
import time

try:
from queue import Empty, Queue
from queue import Empty, Full, Queue
except ImportError:
from Queue import Empty, Queue
from Queue import Empty, Full, Queue
from collections import defaultdict

from threading import Thread, Event

import six

from kafka.common import (
ProduceRequest, TopicAndPartition, UnsupportedCodecError
ProduceRequest, TopicAndPartition, RetryOptions,
kafka_errors, UnsupportedCodecError, FailedPayloadsError,
RequestTimedOutError, AsyncProducerQueueFull, UnknownError
)
from kafka.common import (
RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring

Expand All @@ -25,21 +30,33 @@
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20

# unlimited
ASYNC_QUEUE_MAXSIZE = 0
ASYNC_QUEUE_PUT_TIMEOUT = 0
# no retries by default
ASYNC_RETRY_LIMIT = 0
ASYNC_RETRY_BACKOFF_MS = 0
ASYNC_RETRY_ON_TIMEOUTS = False

STOP_ASYNC_PRODUCER = -1


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout, stop_event):
req_acks, ack_timeout, retry_options, stop_event):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
request
"""
stop = False
reqs = {}
client.reinit()

while not stop_event.is_set():
timeout = batch_time
count = batch_size

# it's a simplification: we're comparing message sets and
# messages: each set can contain [1..batch_size] messages
count = batch_size - len(reqs)
send_at = time.time() + timeout
msgset = defaultdict(list)

Expand All @@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
while count > 0 and timeout >= 0:
try:
topic_partition, msg, key = queue.get(timeout=timeout)

except Empty:
break

Expand All @@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
msgset[topic_partition].append((msg, key))

# Send collected requests upstream
reqs = []
for topic_partition, msg in msgset.items():
messages = create_message_set(msg, codec, key)
req = ProduceRequest(topic_partition.topic,
topic_partition.partition,
messages)
reqs.append(req)
tuple(messages))
reqs[req] = 0

if not reqs:
continue

reqs_to_retry, error_cls = [], None
do_backoff, do_refresh = False, False

def _handle_error(error_cls, reqs, all_retries):
if ((error_cls == RequestTimedOutError and
retry_options.retry_on_timeouts) or
error_cls in RETRY_ERROR_TYPES):
all_retries += reqs
if error_cls in RETRY_BACKOFF_ERROR_TYPES:
do_backoff = True
if error_cls in RETRY_REFRESH_ERROR_TYPES:
do_refresh = True

try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
except Exception:
log.exception("Unable to send message")
reply = client.send_produce_request(reqs.keys(),
acks=req_acks,
timeout=ack_timeout,
fail_on_error=False)
for i, response in enumerate(reply):
if isinstance(response, FailedPayloadsError):
_handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
elif isinstance(response, ProduceResponse) and response.error:
error_cls = kafka_errors.get(response.error, UnknownError)
_handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)

except Exception as ex:
error_cls = kafka_errors.get(type(ex), UnknownError)
_handle_error(error_cls, reqs.keys(), reqs_to_retry)

if not reqs_to_retry:
reqs = {}
continue

# doing backoff before next retry
if do_backoff and retry_options.backoff_ms:
log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
time.sleep(float(retry_options.backoff_ms) / 1000)

# refresh topic metadata before next retry
if do_refresh:
client.load_metadata_for_topics()

reqs = dict((key, count + 1) for (key, count) in reqs.items()
if key in reqs_to_retry and count < retry_options.limit)


class Producer(object):
Expand Down Expand Up @@ -111,12 +167,18 @@ def __init__(self, client, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):

if batch_send:
async = True
assert batch_send_every_n > 0
assert batch_send_every_t > 0
assert async_queue_maxsize >= 0
else:
batch_send_every_n = 1
batch_send_every_t = 3600
Expand All @@ -135,10 +197,13 @@ def __init__(self, client, async=False,
self.codec = codec

if self.async:
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
# Messages are sent through this queue
self.queue = Queue(async_queue_maxsize)
self.async_queue_put_timeout = async_queue_put_timeout
async_retry_options = RetryOptions(
limit=async_retry_limit,
backoff_ms=async_retry_backoff_ms,
retry_on_timeouts=async_retry_on_timeouts)
self.thread_stop_event = Event()
self.thread = Thread(target=_send_upstream,
args=(self.queue,
Expand All @@ -148,6 +213,7 @@ def __init__(self, client, async=False,
batch_send_every_n,
self.req_acks,
self.ack_timeout,
async_retry_options,
self.thread_stop_event))

# Thread will die if main thread exits
Expand Down Expand Up @@ -199,8 +265,18 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
raise TypeError("the key must be type bytes")

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m, key))
for idx, m in enumerate(msg):
try:
item = (TopicAndPartition(topic, partition), m, key)
if self.async_queue_put_timeout == 0:
self.queue.put_nowait(item)
else:
self.queue.put(item, True, self.async_queue_put_timeout)
except Full:
raise AsyncProducerQueueFull(
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll need a way to let the user know which messages were successfully added to the queue and which were not (because we loop `for m in msg`, it is possible that the first n messages are added, but message n+1 raises the exception).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! My proposal is to track the index n and return msg[n+1:] as the value of the failed_msgs property of the AsyncProducerQueueFull exception.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that would work

msg[idx:],
'Producer async queue overfilled. '
'Current queue size %d.' % self.queue.qsize())
resp = []
else:
messages = create_message_set([(m, key) for m in msg], self.codec, key)
Expand Down
17 changes: 14 additions & 3 deletions kafka/producer/keyed.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)

log = logging.getLogger("kafka")
Expand Down Expand Up @@ -37,7 +38,12 @@ def __init__(self, client, partitioner=None, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
Expand All @@ -46,7 +52,12 @@ def __init__(self, client, partitioner=None, async=False,
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)

def _next_partition(self, topic, key):
if topic not in self.partitioners:
Expand Down
17 changes: 14 additions & 3 deletions kafka/producer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
)

log = logging.getLogger("kafka")
Expand Down Expand Up @@ -45,13 +46,23 @@ def __init__(self, client, async=False,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
random_start=True):
random_start=True,
async_retry_limit=ASYNC_RETRY_LIMIT,
async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
async_retry_limit,
async_retry_backoff_ms,
async_retry_on_timeouts,
async_queue_maxsize,
async_queue_put_timeout)

def _next_partition(self, topic):
if topic not in self.partition_cycles:
Expand Down
Loading