Skip to content

Commit

Permalink
Fixes racing condition when message is sent to broker before topic lo…
Browse files Browse the repository at this point in the history
…gs are created
  • Loading branch information
asdaraujo committed Feb 12, 2018
1 parent 6605597 commit f091a9a
Showing 1 changed file with 19 additions and 3 deletions.
22 changes: 19 additions & 3 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@
from . import unittest

from kafka import SimpleClient, create_message
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
from kafka.errors import LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload, \
NotLeaderForPartitionError, UnknownTopicOrPartitionError, \
FailedPayloadsError
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order

def kafka_versions(*versions):
Expand Down Expand Up @@ -123,11 +125,25 @@ def setUp(self):
self.client.load_metadata_for_topics(self.topic, ignore_leadernotavailable=False)
if self.client.has_metadata_for_topic(topic):
break
except LeaderNotAvailableError:
except LeaderNotAvailableError, InvalidTopicError:
time.sleep(1)
else:
raise KafkaTimeoutError('Timeout loading topic metadata!')


# Ensure topic partitions have been created on all brokers to avoid UnknownPartitionErrors
# TODO: It might be a good idea to move this to self.client.ensure_topic_exists
for partition in self.client.get_partition_ids_for_topic(self.topic):
while True:
try:
req = OffsetRequestPayload(self.topic, partition, -1, 100)
self.client.send_offset_request([req])
break
except (NotLeaderForPartitionError, UnknownTopicOrPartitionError, FailedPayloadsError) as e:
if time.time() > timeout:
raise KafkaTimeoutError('Timeout loading topic metadata!')
time.sleep(.1)

self._messages = {}

def tearDown(self):
Expand Down

0 comments on commit f091a9a

Please sign in to comment.