Merge branch 'master' into conn_refactor
Conflicts:
	example.py
Mark Roberts committed Feb 27, 2014
2 parents 3b39d9d + ab89a44 commit 112158f
Showing 8 changed files with 155 additions and 32 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -29,7 +29,7 @@ from kafka.client import KafkaClient
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer, KeyedProducer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

# To send messages synchronously
producer = SimpleProducer(kafka)
@@ -80,7 +80,7 @@ from kafka.client import KafkaClient
from kafka.producer import KeyedProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

# HashedPartitioner is default
producer = KeyedProducer(kafka)
@@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
from kafka.client import KafkaClient
from kafka.consumer import MultiProcessConsumer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

# This will split the number of partitions among two processes
consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):

```python
from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
messages=[KafkaProtocol.encode_message("some message")])
resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
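
The constructor change above is the heart of this branch: `KafkaClient` now takes a bootstrap string instead of a separate host and port. A minimal sketch of the accepted forms, based on the new `collect_hosts` helper and unit tests in this commit (hostnames other than localhost are made up):

```python
from kafka.client import KafkaClient

# Single broker as a "host:port" string, as in the README examples above
kafka = KafkaClient("localhost:9092")

# Comma-separated bootstrap string; a missing port defaults to 9092
kafka = KafkaClient("kafka01:9092,kafka02")

# A list of "host:port" strings is also accepted
kafka = KafkaClient(["kafka01:9092", "kafka02:9092"])
```
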
4 changes: 2 additions & 2 deletions example.py
@@ -9,7 +9,7 @@ class Producer(threading.Thread):
daemon = True

def run(self):
-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")
producer = SimpleProducer(client)

while True:
@@ -23,7 +23,7 @@ class Consumer(threading.Thread):
daemon = True

def run(self):
-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")
consumer = SimpleConsumer(client, "test-group", "my-topic")

for message in consumer:
2 changes: 1 addition & 1 deletion kafka/NOTES.md
@@ -18,7 +18,7 @@ There are a few levels of abstraction:

# Possible API

-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")

producer = KafkaProducer(client, "topic")
producer.send_string("hello")
25 changes: 18 additions & 7 deletions kafka/client.py
@@ -10,7 +10,7 @@
BrokerResponseError, PartitionUnavailableError,
KafkaUnavailableError, KafkaRequestError)

-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
from kafka.protocol import KafkaProtocol

log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
# NOTE: The timeout given to the client should always be greater than the
# one passed to SimpleConsumer.get_message(), otherwise you can get a
# socket timeout.
-def __init__(self, host, port, client_id=CLIENT_ID,
+def __init__(self, hosts, client_id=CLIENT_ID,
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
# We need one connection to bootstrap
self.client_id = client_id
self.timeout = timeout
-self.conns = { # (host, port) -> KafkaConnection
-(host, port): KafkaConnection(host, port, timeout=timeout)
-}
+self.hosts = collect_hosts(hosts)
+
+# create connections only when we need them
+self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
self.topics_to_brokers = {} # topic_id -> broker_id
self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
@@ -41,6 +42,15 @@ def __init__(self, host, port, client_id=CLIENT_ID,
# Private API #
##################

+def _get_conn(self, host, port):
+"Get or create a connection to a broker using host and port"
+
+host_key = (host, port)
+if host_key not in self.conns:
+self.conns[host_key] = KafkaConnection(host, port, timeout=self.timeout)
+
+return self.conns[host_key]

def _get_conn_for_broker(self, broker):
"""
Get or create a connection to a broker
@@ -49,7 +59,7 @@ def _get_conn_for_broker(self, broker):
self.conns[(broker.host, broker.port)] = \
KafkaConnection(broker.host, broker.port, timeout=self.timeout)

-return self.conns[(broker.host, broker.port)]
+return self._get_conn(broker.host, broker.port)

def _get_leader_for_partition(self, topic, partition):
key = TopicAndPartition(topic, partition)
@@ -72,7 +82,8 @@ def _send_broker_unaware_request(self, requestId, request):
Attempt to send a broker-agnostic request to one of the available
brokers. Keep trying until you succeed.
"""
-for conn in self.conns.values():
+for (host, port) in self.hosts:
+conn = self._get_conn(host, port)
try:
conn.send(requestId, request)
response = conn.recv(requestId)
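
The hunk is truncated before the failure path, but the new unit tests below expect `_send_broker_unaware_request` to raise `KafkaUnavailableError` once every bootstrap host has failed. A self-contained sketch of that retry pattern (an illustration, not the library's exact code; `get_conn` stands in for the client's `_get_conn`):

```python
import logging

log = logging.getLogger("kafka")


class KafkaUnavailableError(Exception):
    """Stand-in for kafka.common.KafkaUnavailableError."""


def send_broker_unaware_request(hosts, request_id, request, get_conn):
    # Try each bootstrap (host, port) in turn; the first successful
    # send/recv wins, and exhausting the list is a hard failure.
    for host, port in hosts:
        conn = get_conn(host, port)
        try:
            conn.send(request_id, request)
            return conn.recv(request_id)
        except Exception:
            log.exception("Could not send request to %s:%d, trying next host",
                          host, port)
    raise KafkaUnavailableError("All hosts failed to process the request")
```
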
28 changes: 27 additions & 1 deletion kafka/conn.py
@@ -2,13 +2,39 @@
import logging
import socket
import struct
+from random import shuffle
from threading import local

from kafka.common import ConnectionError

log = logging.getLogger("kafka")

DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092


+def collect_hosts(hosts, randomize=True):
+"""
+Collect hosts given as a comma-separated string or a list of
+host[:port] entries, and optionally randomize the returned list.
+"""
+
+if isinstance(hosts, str):
+hosts = hosts.strip().split(',')
+
+result = []
+for host_port in hosts:
+
+res = host_port.split(':')
+host = res[0]
+port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+result.append((host.strip(), port))
+
+if randomize:
+shuffle(result)
+
+return result


class KafkaConnection(local):
"""
@@ -84,7 +110,7 @@ def send(self, request_id, payload):
sent = self._sock.sendall(payload)
if sent is not None:
self._raise_connection_error()
-except socket.error, e:
+except socket.error:
log.exception('Unable to send payload to Kafka')
self._raise_connection_error()

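
A few illustrative calls to the new `collect_hosts` helper (`randomize=False` keeps the order stable for the expected values in the comments; the hostnames are made up):

```python
from kafka.conn import collect_hosts

# String input: entries are stripped of whitespace, and a missing port
# falls back to DEFAULT_KAFKA_PORT (9092)
collect_hosts('kafka01:9092, kafka02', randomize=False)
# -> [('kafka01', 9092), ('kafka02', 9092)]

# A list of host:port strings is converted entry by entry
collect_hosts(['kafka01:9092', 'kafka02:9093'], randomize=False)
# -> [('kafka01', 9092), ('kafka02', 9093)]
```
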
2 changes: 1 addition & 1 deletion setup.py
@@ -1,10 +1,10 @@
import os.path
import sys

from setuptools import setup, Command


class Tox(Command):

user_options = []

def initialize_options(self):
34 changes: 19 additions & 15 deletions test/test_integration.py
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):

class KafkaTestCase(unittest.TestCase):
def setUp(self):
self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
ensure_topic_creation(self.client, self.topic)


@@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-cls.client = KafkaClient(cls.server.host, cls.server.port)
+cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

@classmethod
def tearDownClass(cls): # noqa
@@ -578,7 +578,7 @@ def setUpClass(cls):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-cls.client = KafkaClient(cls.server2.host, cls.server2.port)
+cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))

@classmethod
def tearDownClass(cls): # noqa
@@ -826,23 +826,26 @@ def test_large_messages(self):

class TestFailover(KafkaTestCase):

-def setUp(self):
+@classmethod
+def setUpClass(cls): # noqa
zk_chroot = random_string(10)
replicas = 2
partitions = 2

# mini zookeeper, 2 kafka brokers
-self.zk = ZookeeperFixture.instance()
-kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
-super(TestFailover, self).setUp()
+cls.zk = ZookeeperFixture.instance()
+kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+
+hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
+cls.client = KafkaClient(hosts)

-def tearDown(self):
-self.client.close()
-for broker in self.brokers:
+@classmethod
+def tearDownClass(cls):
+cls.client.close()
+for broker in cls.brokers:
broker.close()
-self.zk.close()
+cls.zk.close()

def test_switch_leader(self):
key, topic, partition = random_string(5), self.topic, 0
Expand Down Expand Up @@ -918,7 +921,8 @@ def _kill_leader(self, topic, partition):
return broker

def _count_messages(self, group, topic):
-client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
all_messages = []
for message in consumer:
84 changes: 83 additions & 1 deletion test/test_unit.py
@@ -3,11 +3,16 @@
import struct
import unittest

+from mock import MagicMock, patch
+
+from kafka import KafkaClient
from kafka.common import (
ProduceRequest, FetchRequest, Message, ChecksumError,
ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
OffsetAndMessage, BrokerMetadata, PartitionMetadata
)
+from kafka.common import KafkaUnavailableError
from kafka.codec import (
has_gzip, has_snappy, gzip_encode, gzip_decode,
snappy_encode, snappy_decode
@@ -405,7 +410,6 @@ def test_encode_offset_request(self):
def test_decode_offset_response(self):
pass


@unittest.skip("Not Implemented")
def test_encode_offset_commit_request(self):
pass
@@ -423,5 +427,83 @@ def test_decode_offset_fetch_response(self):
pass


+class TestKafkaClient(unittest.TestCase):
+
+def test_init_with_list(self):
+
+with patch.object(KafkaClient, 'load_metadata_for_topics'):
+client = KafkaClient(
+hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+self.assertItemsEqual(
+[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+client.hosts)
+
+def test_init_with_csv(self):
+
+with patch.object(KafkaClient, 'load_metadata_for_topics'):
+client = KafkaClient(
+hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+self.assertItemsEqual(
+[('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+client.hosts)
+
+def test_send_broker_unaware_request_fail(self):
+'Tests that the request fails when all hosts are unavailable'
+
+mocked_conns = {
+('kafka01', 9092): MagicMock(),
+('kafka02', 9092): MagicMock()
+}
+# inject KafkaConnection side effects
+mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("kafka02 went away (unittest)")
+
+def mock_get_conn(host, port):
+return mocked_conns[(host, port)]
+
+# patch to avoid making requests before we want them
+with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
+
+self.assertRaises(
+KafkaUnavailableError,
+client._send_broker_unaware_request,
+1, 'fake request')
+
+for key, conn in mocked_conns.iteritems():
+conn.send.assert_called_with(1, 'fake request')
+
+def test_send_broker_unaware_request(self):
+'Tests that the request works when at least one host is available'
+
+mocked_conns = {
+('kafka01', 9092): MagicMock(),
+('kafka02', 9092): MagicMock(),
+('kafka03', 9092): MagicMock()
+}
+# inject KafkaConnection side effects
+mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)")
+mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response'
+mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)")
+
+def mock_get_conn(host, port):
+return mocked_conns[(host, port)]
+
+# patch to avoid making requests before we want them
+with patch.object(KafkaClient, 'load_metadata_for_topics'), \
+patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
+
+client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+
+resp = client._send_broker_unaware_request(1, 'fake request')
+
+self.assertEqual('valid response', resp)
+mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)

if __name__ == '__main__':
unittest.main()
