diff --git a/kafka/client.py b/kafka/client.py index 4cd9e24f5..7675ad484 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -243,15 +243,11 @@ def reinit(self): def reset_topic_metadata(self, *topics): for topic in topics: - try: - partitions = self.topic_partitions[topic] - except KeyError: - continue - - for partition in partitions: - self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) - - del self.topic_partitions[topic] + for topic_partition in list(self.topics_to_brokers.keys()): + if topic_partition.topic == topic: + del self.topics_to_brokers[topic_partition] + if topic in self.topic_partitions: + del self.topic_partitions[topic] def reset_all_metadata(self): self.topics_to_brokers.clear() @@ -315,10 +311,17 @@ def load_metadata_for_topics(self, *topics): (a single partition w/o a leader, for example) """ topics = [kafka_bytestring(t) for t in topics] + + if topics: + for topic in topics: + self.reset_topic_metadata(topic) + else: + self.reset_all_metadata() + resp = self.send_metadata_request(topics) - log.debug("Broker metadata: %s", resp.brokers) - log.debug("Topic metadata: %s", resp.topics) + log.debug("Received new broker metadata: %s", resp.brokers) + log.debug("Received new topic metadata: %s", resp.topics) self.brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) @@ -327,8 +330,6 @@ def load_metadata_for_topics(self, *topics): topic = topic_metadata.topic partitions = topic_metadata.partitions - self.reset_topic_metadata(topic) - # Errors expected for new topics try: kafka.common.check_error(topic_metadata)