Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support optional partitions kwarg in MultiProcessConsumer #380

Merged
merged 1 commit into from
Jun 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions kafka/consumer/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class MultiProcessConsumer(Consumer):
topic: the topic to consume

Keyword Arguments:
partitions: An optional list of partitions to consume the data from
auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
before a commit
Expand All @@ -114,16 +115,19 @@ class MultiProcessConsumer(Consumer):
commit method on this class. A manual call to commit will also reset
these triggers
"""
def __init__(self, client, group, topic, auto_commit=True,
def __init__(self, client, group, topic,
partitions=None,
auto_commit=True,
auto_commit_every_n=AUTO_COMMIT_MSG_COUNT,
auto_commit_every_t=AUTO_COMMIT_INTERVAL,
num_procs=1, partitions_per_proc=0,
num_procs=1,
partitions_per_proc=0,
**simple_consumer_options):

# Initiate the base consumer class
super(MultiProcessConsumer, self).__init__(
client, group, topic,
partitions=None,
partitions=partitions,
auto_commit=auto_commit,
auto_commit_every_n=auto_commit_every_n,
auto_commit_every_t=auto_commit_every_t)
Expand Down
13 changes: 11 additions & 2 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

from mock import MagicMock
from mock import MagicMock, patch
from . import unittest

from kafka import SimpleConsumer, KafkaConsumer
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import KafkaConfigurationError

class TestKafkaConsumer(unittest.TestCase):
Expand All @@ -13,3 +13,12 @@ def test_non_integer_partitions(self):
def test_broker_list_required(self):
with self.assertRaises(KafkaConfigurationError):
KafkaConsumer()

class TestMultiProcessConsumer(unittest.TestCase):
def test_partition_list(self):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member