Make external API consistently support python3 strings for topic. #361

Merged 1 commit on Apr 4, 2015
3 changes: 3 additions & 0 deletions kafka/client.py
@@ -258,12 +258,14 @@ def reset_all_metadata(self):
self.topic_partitions.clear()

def has_metadata_for_topic(self, topic):
Owner: why not simply set topic = kafka_bytestring(topic)?

Contributor Author: I did it this way as a premature optimization, because I saw that has_metadata_for_topic and get_partition_ids_for_topic were on the call path of send_messages, which is probably the most-used method and always calls these metadata methods with a byte string. But on closer inspection, send_messages should only call them once, at initialization, so it's probably clearer to use a consistent pattern.

topic = kafka_bytestring(topic)
return (
topic in self.topic_partitions
and len(self.topic_partitions[topic]) > 0
)

def get_partition_ids_for_topic(self, topic):
Owner: same - topic = kafka_bytestring(topic) seems like a simpler approach here.

topic = kafka_bytestring(topic)
if topic not in self.topic_partitions:
return []

@@ -312,6 +314,7 @@ def load_metadata_for_topics(self, *topics):
Partition-level errors will also not be raised here
(a single partition w/o a leader, for example)
"""
topics = [kafka_bytestring(t) for t in topics]
resp = self.send_metadata_request(topics)

log.debug("Broker metadata: %s", resp.brokers)
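The kafka_bytestring helper discussed in the comments above is what lets these client methods accept either text or byte strings. As an illustration only (not the library's exact kafka.util implementation), such a helper might look like this:

import six

def kafka_bytestring(s):
    """Return the bytes form of a topic or consumer-group name.

    Text strings are encoded as UTF-8; byte strings pass through unchanged.
    Anything else is rejected, mirroring the strict type checks in the producer.
    """
    if isinstance(s, six.binary_type):
        return s
    if isinstance(s, six.text_type):
        return s.encode('utf-8')
    raise TypeError('topic/group names must be str or bytes, got %r' % type(s))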
6 changes: 3 additions & 3 deletions kafka/consumer/base.py
@@ -10,7 +10,7 @@
UnknownTopicOrPartitionError, check_error
)

from kafka.util import ReentrantTimer
from kafka.util import kafka_bytestring, ReentrantTimer

log = logging.getLogger("kafka")

@@ -44,8 +44,8 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
auto_commit_every_t=AUTO_COMMIT_INTERVAL):

self.client = client
self.topic = topic
self.group = group
self.topic = kafka_bytestring(topic)
self.group = None if group is None else kafka_bytestring(group)
self.client.load_metadata_for_topics(topic)
self.offsets = {}

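With this change the consumer normalizes both names in its constructor, so everything downstream can assume bytes. A small sketch of that normalization, assuming kafka.util.kafka_bytestring UTF-8-encodes text names as the new import and the tests below imply:

from kafka.util import kafka_bytestring

def normalize(topic, group):
    # Mirrors what Consumer.__init__ now does with its topic/group arguments.
    topic = kafka_bytestring(topic)
    group = None if group is None else kafka_bytestring(group)
    return topic, group

assert normalize(u'my-topic', None) == (b'my-topic', None)
assert normalize(u'my-topic', u'my-group') == (b'my-topic', b'my-group')
assert normalize(b'my-topic', b'my-group') == (b'my-topic', b'my-group')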
2 changes: 1 addition & 1 deletion kafka/consumer/multiprocess.py
@@ -163,7 +163,7 @@ def __init__(self, client, group, topic, auto_commit=True,
simple_consumer_options.pop('partitions', None)
options.update(simple_consumer_options)

args = (client.copy(), group, topic, self.queue,
args = (client.copy(), self.group, self.topic, self.queue,
self.size, self.events)
proc = Process(target=_mp_consume, args=args, kwargs=options)
proc.daemon = True
6 changes: 6 additions & 0 deletions kafka/producer/base.py
@@ -17,6 +17,7 @@
ProduceRequest, TopicAndPartition, UnsupportedCodecError
)
from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
from kafka.util import kafka_bytestring

log = logging.getLogger("kafka")

@@ -170,6 +171,7 @@ def send_messages(self, topic, partition, *msg):

All messages produced via this method will set the message 'key' to Null
"""
topic = kafka_bytestring(topic)
return self._send_messages(topic, partition, *msg)

def _send_messages(self, topic, partition, *msg, **kwargs):
@@ -183,6 +185,10 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
if any(not isinstance(m, six.binary_type) for m in msg):
raise TypeError("all produce message payloads must be type bytes")

# Raise TypeError if topic is not encoded as bytes
if not isinstance(topic, six.binary_type):
raise TypeError("the topic must be type bytes")

# Raise TypeError if the key is not encoded as bytes
if key is not None and not isinstance(key, six.binary_type):
raise TypeError("the key must be type bytes")
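In the producer, the topic is now encoded before the existing bytes-only checks run on keys and payloads. A rough, self-contained sketch of that order of operations (illustrative only; the real checks live in Producer._send_messages):

import six
from kafka.util import kafka_bytestring

def prepare_produce_args(topic, *msg, **kwargs):
    key = kwargs.get('key')
    topic = kafka_bytestring(topic)  # str topics are encoded to bytes up front
    if any(not isinstance(m, six.binary_type) for m in msg):
        raise TypeError("all produce message payloads must be type bytes")
    if not isinstance(topic, six.binary_type):
        raise TypeError("the topic must be type bytes")
    if key is not None and not isinstance(key, six.binary_type):
        raise TypeError("the key must be type bytes")
    return topic, msg, key

# A text topic is now accepted, but text payloads still are not.
assert prepare_produce_args(u'my-topic', b'payload') == (b'my-topic', (b'payload',), None)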
4 changes: 4 additions & 0 deletions kafka/producer/keyed.py
@@ -3,6 +3,8 @@
import logging

from kafka.partitioner import HashedPartitioner
from kafka.util import kafka_bytestring

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
@@ -57,10 +59,12 @@ def _next_partition(self, topic, key):
return partitioner.partition(key)

def send_messages(self,topic,key,*msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, *msg,key=key)

def send(self, topic, key, msg):
topic = kafka_bytestring(topic)
partition = self._next_partition(topic, key)
return self._send_messages(topic, partition, msg, key=key)

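For KeyedProducer, the practical effect is that Python 3 callers can pass a text topic while keys and payloads remain bytes. A hypothetical usage sketch, assuming a broker reachable at localhost:9092 and an existing topic named 'my-topic':

from kafka import KafkaClient, KeyedProducer

client = KafkaClient('localhost:9092')
producer = KeyedProducer(client)

# The topic may be str on Python 3; key and payload must still be bytes.
producer.send_messages(u'my-topic', b'key-1', b'payload one')
producer.send(u'my-topic', b'key-1', b'payload two')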
70 changes: 36 additions & 34 deletions test/test_client.py
@@ -117,34 +117,34 @@ def test_load_metadata(self, protocol, conn):
]

topics = [
TopicMetadata('topic_1', NO_ERROR, [
PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
TopicMetadata(b'topic_1', NO_ERROR, [
PartitionMetadata(b'topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
]),
TopicMetadata('topic_noleader', NO_ERROR, [
PartitionMetadata('topic_noleader', 0, -1, [], [],
TopicMetadata(b'topic_noleader', NO_ERROR, [
PartitionMetadata(b'topic_noleader', 0, -1, [], [],
NO_LEADER),
PartitionMetadata('topic_noleader', 1, -1, [], [],
PartitionMetadata(b'topic_noleader', 1, -1, [], [],
NO_LEADER),
]),
TopicMetadata('topic_no_partitions', NO_LEADER, []),
TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_3', NO_ERROR, [
PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
TopicMetadata(b'topic_no_partitions', NO_LEADER, []),
TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata(b'topic_3', NO_ERROR, [
PartitionMetadata(b'topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
PartitionMetadata(b'topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
PartitionMetadata(b'topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

# client loads metadata at init
client = KafkaClient(hosts=['broker_1:4567'])
self.assertDictEqual({
TopicAndPartition('topic_1', 0): brokers[1],
TopicAndPartition('topic_noleader', 0): None,
TopicAndPartition('topic_noleader', 1): None,
TopicAndPartition('topic_3', 0): brokers[0],
TopicAndPartition('topic_3', 1): brokers[1],
TopicAndPartition('topic_3', 2): brokers[0]},
TopicAndPartition(b'topic_1', 0): brokers[1],
TopicAndPartition(b'topic_noleader', 0): None,
TopicAndPartition(b'topic_noleader', 1): None,
TopicAndPartition(b'topic_3', 0): brokers[0],
TopicAndPartition(b'topic_3', 1): brokers[1],
TopicAndPartition(b'topic_3', 2): brokers[0]},
client.topics_to_brokers)

# if we ask for metadata explicitly, it should raise errors
@@ -156,6 +156,7 @@

# This should not raise
client.load_metadata_for_topics('topic_no_leader')
client.load_metadata_for_topics(b'topic_no_leader')

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
@@ -169,11 +170,11 @@ def test_has_metadata_for_topic(self, protocol, conn):
]

topics = [
TopicMetadata('topic_still_creating', NO_LEADER, []),
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_noleaders', NO_ERROR, [
PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
TopicMetadata(b'topic_still_creating', NO_LEADER, []),
TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata(b'topic_noleaders', NO_ERROR, [
PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -188,8 +189,8 @@ def test_has_metadata_for_topic(self, protocol, conn):
self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_ensure_topic_exists(self, protocol, conn):
@patch('kafka.client.KafkaProtocol.decode_metadata_response')
def test_ensure_topic_exists(self, decode_metadata_response, conn):

conn.recv.return_value = 'response' # anything but None

@@ -199,14 +200,14 @@ def test_ensure_topic_exists(self, protocol, conn):
]

topics = [
TopicMetadata('topic_still_creating', NO_LEADER, []),
TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata('topic_noleaders', NO_ERROR, [
PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
TopicMetadata(b'topic_still_creating', NO_LEADER, []),
TopicMetadata(b'topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata(b'topic_noleaders', NO_ERROR, [
PartitionMetadata(b'topic_noleaders', 0, -1, [], [], NO_LEADER),
PartitionMetadata(b'topic_noleaders', 1, -1, [], [], NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
decode_metadata_response.return_value = MetadataResponse(brokers, topics)

client = KafkaClient(hosts=['broker_1:4567'])

@@ -218,6 +219,7 @@ def test_ensure_topic_exists(self, protocol, conn):

# This should not raise
client.ensure_topic_exists('topic_noleaders', timeout=1)
client.ensure_topic_exists(b'topic_noleaders', timeout=1)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
@@ -269,8 +271,8 @@ def test_get_leader_for_unassigned_partitions(self, protocol, conn):
]

topics = [
TopicMetadata('topic_no_partitions', NO_LEADER, []),
TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
TopicMetadata(b'topic_no_partitions', NO_LEADER, []),
TopicMetadata(b'topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)

@@ -279,10 +281,10 @@ def test_get_leader_for_unassigned_partitions(self, protocol, conn):
self.assertDictEqual({}, client.topics_to_brokers)

with self.assertRaises(LeaderNotAvailableError):
client._get_leader_for_partition('topic_no_partitions', 0)
client._get_leader_for_partition(b'topic_no_partitions', 0)

with self.assertRaises(UnknownTopicOrPartitionError):
client._get_leader_for_partition('topic_unknown', 0)
client._get_leader_for_partition(b'topic_unknown', 0)

@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
8 changes: 4 additions & 4 deletions test/test_client_integration.py
@@ -29,11 +29,11 @@ def tearDownClass(cls): # noqa

@kafka_versions("all")
def test_consume_none(self):
fetch = FetchRequest(self.topic, 0, 0, 1024)
fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)

fetch_resp, = self.client.send_fetch_request([fetch])
self.assertEqual(fetch_resp.error, 0)
self.assertEqual(fetch_resp.topic, self.topic)
self.assertEqual(fetch_resp.topic, self.bytes_topic)
self.assertEqual(fetch_resp.partition, 0)

messages = list(fetch_resp.messages)
@@ -56,11 +56,11 @@ def test_ensure_topic_exists(self):

@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
def test_commit_fetch_offsets(self):
req = OffsetCommitRequest(self.topic, 0, 42, b"metadata")
req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
(resp,) = self.client.send_offset_commit_request(b"group", [req])
self.assertEqual(resp.error, 0)

req = OffsetFetchRequest(self.topic, 0)
req = OffsetFetchRequest(self.bytes_topic, 0)
(resp,) = self.client.send_offset_fetch_request(b"group", [req])
self.assertEqual(resp.error, 0)
self.assertEqual(resp.offset, 42)
2 changes: 1 addition & 1 deletion test/test_consumer_integration.py
@@ -37,7 +37,7 @@ def tearDownClass(cls):

def send_messages(self, partition, messages):
messages = [ create_message(self.msg(str(msg))) for msg in messages ]
produce = ProduceRequest(self.topic, partition, messages = messages)
produce = ProduceRequest(self.bytes_topic, partition, messages = messages)
resp, = self.client.send_produce_request([produce])
self.assertEqual(resp.error, 0)

5 changes: 3 additions & 2 deletions test/test_failover_integration.py
@@ -8,6 +8,7 @@
from kafka.common import TopicAndPartition, FailedPayloadsError, ConnectionError
from kafka.producer.base import Producer
from kafka.producer import KeyedProducer
from kafka.util import kafka_bytestring

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
@@ -147,7 +148,7 @@ def test_switch_leader_keyed_producer(self):
key = random_string(3)
msg = random_string(10)
producer.send_messages(topic, key, msg)
if producer.partitioners[topic].partition(key) == 0:
if producer.partitioners[kafka_bytestring(topic)].partition(key) == 0:
recovered = True
except (FailedPayloadsError, ConnectionError):
logging.debug("caught exception sending message -- will retry")
@@ -172,7 +173,7 @@ def _send_random_messages(self, producer, topic, partition, n):
logging.debug('_send_random_message to %s:%d -- try %d success', topic, partition, j)

def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
leader = self.client.topics_to_brokers[TopicAndPartition(kafka_bytestring(topic), partition)]
broker = self.brokers[leader.nodeId]
broker.close()
return broker
4 changes: 2 additions & 2 deletions test/test_producer_integration.py
@@ -453,7 +453,7 @@ def test_acks_cluster_commit(self):

def assert_produce_request(self, messages, initial_offset, message_ct,
partition=0):
produce = ProduceRequest(self.topic, partition, messages=messages)
produce = ProduceRequest(self.bytes_topic, partition, messages=messages)

# There should only be one response message from the server.
# This will throw an exception if there's more than one.
@@ -471,7 +471,7 @@ def assert_fetch_offset(self, partition, start_offset, expected_messages):
# There should only be one response message from the server.
# This will throw an exception if there's more than one.

resp, = self.client.send_fetch_request([ FetchRequest(self.topic, partition, start_offset, 1024) ])
resp, = self.client.send_fetch_request([ FetchRequest(self.bytes_topic, partition, start_offset, 1024) ])

self.assertEqual(resp.error, 0)
self.assertEqual(resp.partition, partition)
8 changes: 6 additions & 2 deletions test/testutil.py
@@ -12,6 +12,7 @@

from kafka import KafkaClient
from kafka.common import OffsetRequest
from kafka.util import kafka_bytestring

__all__ = [
'random_string',
@@ -50,6 +51,7 @@ def get_open_port():
class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
bytes_topic = None
server = None

def setUp(self):
@@ -59,7 +61,8 @@ def setUp(self):

if not self.topic:
topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10).decode('utf-8'))
self.topic = topic.encode('utf-8')
self.topic = topic
self.bytes_topic = topic.encode('utf-8')

if self.create_client:
self.client = KafkaClient('%s:%d' % (self.server.host, self.server.port))
@@ -77,7 +80,8 @@ def tearDown(self):
self.client.close()

def current_offset(self, topic, partition):
offsets, = self.client.send_offset_request([ OffsetRequest(topic, partition, -1, 1) ])
offsets, = self.client.send_offset_request([ OffsetRequest(kafka_bytestring(topic),
partition, -1, 1) ])
return offsets.offsets[0]

def msgs(self, iterable):