Skip to content

Commit

Permalink
incorporated dpkp's comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Space committed Apr 3, 2015
1 parent 0213d09 commit adf5671
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 26 deletions.
18 changes: 8 additions & 10 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,19 +258,17 @@ def reset_all_metadata(self):
self.topic_partitions.clear()

def has_metadata_for_topic(self, topic):
if not topic in self.topic_partitions:
bytes_topic = kafka_bytestring(topic)
if (bytes_topic is topic) or (not bytes_topic in self.topic_partitions):
return False
topic = bytes_topic
return len(self.topic_partitions[topic]) > 0
topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)

def get_partition_ids_for_topic(self, topic):
topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
bytes_topic = kafka_bytestring(topic)
if (bytes_topic is topic) or (not bytes_topic in self.topic_partitions):
return []
topic = bytes_topic
return []

return sorted(list(self.topic_partitions[topic]))

def ensure_topic_exists(self, topic, timeout = 30):
Expand Down
4 changes: 2 additions & 2 deletions kafka/consumer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
UnknownTopicOrPartitionError, check_error
)

from kafka.util import (kafka_bytestring, ReentrantTimer)
from kafka.util import kafka_bytestring, ReentrantTimer

log = logging.getLogger("kafka")

Expand Down Expand Up @@ -45,7 +45,7 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,

self.client = client
self.topic = kafka_bytestring(topic)
self.group = kafka_bytestring(group)
self.group = None if group is None else kafka_bytestring(group)
self.client.load_metadata_for_topics(topic)
self.offsets = {}

Expand Down
4 changes: 2 additions & 2 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring

log = logging.getLogger("kafka")

Expand Down Expand Up @@ -170,8 +171,7 @@ def send_messages(self, topic, partition, *msg):
All messages produced via this method will set the message 'key' to Null
"""
if not isinstance(topic, six.binary_type):
topic = topic.encode('utf-8')
topic = kafka_bytestring(topic)
return self._send_messages(topic, partition, *msg)

def _send_messages(self, topic, partition, *msg, **kwargs):
Expand Down
9 changes: 4 additions & 5 deletions kafka/producer/keyed.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from __future__ import absolute_import

import logging
import six

from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
Expand Down Expand Up @@ -58,14 +59,12 @@ def _next_partition(self, topic, key):
return partitioner.partition(key)

def send_messages(self,topic,key,*msg):
if not isinstance(topic, six.binary_type):
topic = topic.encode('utf-8')
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg,key=key)

def send(self, topic, key, msg):
if not isinstance(topic, six.binary_type):
topic = topic.encode('utf-8')
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, msg, key=key)

Expand Down
2 changes: 0 additions & 2 deletions kafka/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ def kafka_bytestring(s):
return s
if isinstance(s, six.string_types):
return s.encode('utf-8')
if s is None:
return None
raise TypeError(s)


Expand Down
10 changes: 5 additions & 5 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ def test_has_metadata_for_topic(self, protocol, conn):
]

topics = [
TopicMetadata('topic_still_creating', NO_LEADER, []),
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_noleaders', NO_ERROR, [
PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
TopicMetadata(b'topic_still_creating', NO_LEADER, []),
TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata(b'topic_noleaders', NO_ERROR, [
PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
Expand Down

0 comments on commit adf5671

Please sign in to comment.