From 97f0df4c6af935adf98d034dbb1a6a7008889e47 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 6 Apr 2021 14:38:56 +0200 Subject: [PATCH 1/3] feat(MPM-329): add subscription param --- src/Conf/KafkaConfiguration.php | 2 +- src/Consumer/KafkaConsumerBuilder.php | 4 ---- src/Consumer/KafkaHighLevelConsumer.php | 26 ++++++++++++++++++------- src/Consumer/KafkaLowLevelConsumer.php | 8 ++++++-- 4 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/Conf/KafkaConfiguration.php b/src/Conf/KafkaConfiguration.php index 6f817af..5d74d92 100644 --- a/src/Conf/KafkaConfiguration.php +++ b/src/Conf/KafkaConfiguration.php @@ -41,7 +41,7 @@ class KafkaConfiguration extends RdKafkaConf * @param mixed[] $config * @param string $type */ - public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '') + public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '') { parent::__construct(); diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 96561ff..4363205 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -292,10 +292,6 @@ public function build(): KafkaConsumerInterface throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_BROKER_EXCEPTION_MESSAGE); } - if ([] === $this->topics) { - throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE); - } - //set additional config $this->config['group.id'] = $this->consumerGroup; diff --git a/src/Consumer/KafkaHighLevelConsumer.php b/src/Consumer/KafkaHighLevelConsumer.php index acb048c..838a3fb 100644 --- a/src/Consumer/KafkaHighLevelConsumer.php +++ b/src/Consumer/KafkaHighLevelConsumer.php @@ -13,6 +13,7 @@ use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; use RdKafka\Exception as RdKafkaException; use RdKafka\Message as RdKafkaMessage; +use RdKafka\TopicPartition; use RdKafka\TopicPartition as RdKafkaTopicPartition; use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; @@ -41,13 +42,14 @@ public function __construct( * Subscribes to all defined topics, if no partitions were set, subscribes to all partitions. * If partition(s) (and optionally offset(s)) were set, subscribes accordingly * + * @param array $topicSubscriptions * @throws KafkaConsumerSubscriptionException * @return void */ - public function subscribe(): void + public function subscribe(array $topicSubscriptions = []): void { - $subscriptions = $this->getTopicSubscriptions(); - $assignments = $this->getTopicAssignments(); + $subscriptions = $this->getTopicSubscriptions($topicSubscriptions); + $assignments = $this->getTopicAssignments($topicSubscriptions); if ([] !== $subscriptions && [] !== $assignments) { throw new KafkaConsumerSubscriptionException( @@ -239,13 +241,18 @@ private function getOffsetsToCommitForMessages(array $messages): array } /** + * @param array $topicSubscriptions * @return array|string[] */ - private function getTopicSubscriptions(): array + private function getTopicSubscriptions(array $topicSubscriptions = []): array { $subscriptions = []; - foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { + if ([] === $topicSubscriptions) { + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();; + } + + foreach ($topicSubscriptions as $topicSubscription) { if ( [] !== $topicSubscription->getPartitions() || KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset() @@ -259,13 +266,18 @@ private function getTopicSubscriptions(): array } /** + * @param array $topicSubscriptions * @return array|RdKafkaTopicPartition[] */ - private function getTopicAssignments(): array + private function getTopicAssignments(array $topicSubscriptions = []): array { $assignments = []; - foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { + if ([] === $topicSubscriptions) { + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();; + } + + foreach ($topicSubscriptions as $topicSubscription) { if ( [] === $topicSubscription->getPartitions() && KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset() diff --git a/src/Consumer/KafkaLowLevelConsumer.php b/src/Consumer/KafkaLowLevelConsumer.php index a4b00a7..0be6e2b 100644 --- a/src/Consumer/KafkaLowLevelConsumer.php +++ b/src/Consumer/KafkaLowLevelConsumer.php @@ -53,17 +53,21 @@ public function __construct( * Subcribes to all defined topics, if no partitions were set, subscribes to all partitions. * If partition(s) (and optionally offset(s)) were set, subscribes accordingly * + * @param array $topicSubscriptions * @return void * @throws KafkaConsumerSubscriptionException */ - public function subscribe(): void + public function subscribe(array $topicSubscriptions = []): void { if (true === $this->isSubscribed()) { return; } try { - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); + if ([] === $topicSubscriptions) { + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); + } + foreach ($topicSubscriptions as $topicSubscription) { $topicName = $topicSubscription->getTopicName(); $offset = $topicSubscription->getOffset(); From c26edebb4c525fcff59c8bb63e7f14d64293f580 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 6 Apr 2021 14:52:58 +0200 Subject: [PATCH 2/3] add test, remove from low level (legacy) --- src/Consumer/KafkaLowLevelConsumer.php | 8 ++------ tests/Unit/Consumer/KafkaConsumerBuilderTest.php | 12 ------------ .../Unit/Consumer/KafkaHighLevelConsumerTest.php | 16 ++++++++++++++++ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/Consumer/KafkaLowLevelConsumer.php b/src/Consumer/KafkaLowLevelConsumer.php index 0be6e2b..a4b00a7 100644 --- a/src/Consumer/KafkaLowLevelConsumer.php +++ b/src/Consumer/KafkaLowLevelConsumer.php @@ -53,21 +53,17 @@ public function __construct( * Subcribes to all defined topics, if no partitions were set, subscribes to all partitions. * If partition(s) (and optionally offset(s)) were set, subscribes accordingly * - * @param array $topicSubscriptions * @return void * @throws KafkaConsumerSubscriptionException */ - public function subscribe(array $topicSubscriptions = []): void + public function subscribe(): void { if (true === $this->isSubscribed()) { return; } try { - if ([] === $topicSubscriptions) { - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); - } - + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); foreach ($topicSubscriptions as $topicSubscription) { $topicName = $topicSubscription->getTopicName(); $offset = $topicSubscription->getOffset(); diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 927e260..0207545 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -288,18 +288,6 @@ public function testBuildFailMissingBrokers(): void $this->kafkaConsumerBuilder->build(); } - /** - * @return void - * @throws KafkaConsumerBuilderException - */ - public function testBuildFailMissingTopics(): void - { - self::expectException(KafkaConsumerBuilderException::class); - self::expectExceptionMessage(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE); - - $this->kafkaConsumerBuilder->withAdditionalBroker('localhost')->build(); - } - /** * @return void */ diff --git a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php index 8385439..685a422 100644 --- a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php @@ -47,6 +47,22 @@ public function testSubscribeSuccess(): void $kafkaConsumer->subscribe(); } + /** + * @throws KafkaConsumerSubscriptionException + */ + public function testSubscribeSuccessWithParam(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::never())->method('getTopicSubscriptions'); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $rdKafkaConsumerMock->expects(self::once())->method('subscribe')->with(['testTopic3']); + + $kafkaConsumer->subscribe([new TopicSubscription('testTopic3')]); + } + /** * @throws KafkaConsumerSubscriptionException */ From 7cd983e356c74a480faef05309810ba27a1dc148 Mon Sep 17 00:00:00 2001 From: nick Date: Tue, 6 Apr 2021 14:54:26 +0200 Subject: [PATCH 3/3] fix cs --- src/Consumer/KafkaHighLevelConsumer.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Consumer/KafkaHighLevelConsumer.php b/src/Consumer/KafkaHighLevelConsumer.php index 838a3fb..2ec2de0 100644 --- a/src/Consumer/KafkaHighLevelConsumer.php +++ b/src/Consumer/KafkaHighLevelConsumer.php @@ -249,7 +249,7 @@ private function getTopicSubscriptions(array $topicSubscriptions = []): array $subscriptions = []; if ([] === $topicSubscriptions) { - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();; + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); } foreach ($topicSubscriptions as $topicSubscription) { @@ -274,7 +274,7 @@ private function getTopicAssignments(array $topicSubscriptions = []): array $assignments = []; if ([] === $topicSubscriptions) { - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();; + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); } foreach ($topicSubscriptions as $topicSubscription) {