Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Conf/KafkaConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class KafkaConfiguration extends RdKafkaConf
* @param mixed[] $config
* @param string $type
*/
public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '')
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '')
{
parent::__construct();

Expand Down
4 changes: 0 additions & 4 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ public function build(): KafkaConsumerInterface
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_BROKER_EXCEPTION_MESSAGE);
}

if ([] === $this->topics) {
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);
}

//set additional config
$this->config['group.id'] = $this->consumerGroup;

Expand Down
26 changes: 19 additions & 7 deletions src/Consumer/KafkaHighLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface;
use RdKafka\Exception as RdKafkaException;
use RdKafka\Message as RdKafkaMessage;
use RdKafka\TopicPartition;
use RdKafka\TopicPartition as RdKafkaTopicPartition;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;

Expand Down Expand Up @@ -41,13 +42,14 @@ public function __construct(
* Subscribes to all defined topics, if no partitions were set, subscribes to all partitions.
* If partition(s) (and optionally offset(s)) were set, subscribes accordingly
*
* @param array<TopicSubscription> $topicSubscriptions
* @throws KafkaConsumerSubscriptionException
* @return void
*/
public function subscribe(): void
public function subscribe(array $topicSubscriptions = []): void
{
$subscriptions = $this->getTopicSubscriptions();
$assignments = $this->getTopicAssignments();
$subscriptions = $this->getTopicSubscriptions($topicSubscriptions);
$assignments = $this->getTopicAssignments($topicSubscriptions);

if ([] !== $subscriptions && [] !== $assignments) {
throw new KafkaConsumerSubscriptionException(
Expand Down Expand Up @@ -239,13 +241,18 @@ private function getOffsetsToCommitForMessages(array $messages): array
}

/**
* @param array<TopicSubscription> $topicSubscriptions
* @return array|string[]
*/
private function getTopicSubscriptions(): array
private function getTopicSubscriptions(array $topicSubscriptions = []): array
{
$subscriptions = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] === $topicSubscriptions) {
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
}

foreach ($topicSubscriptions as $topicSubscription) {
if (
[] !== $topicSubscription->getPartitions()
|| KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset()
Expand All @@ -259,13 +266,18 @@ private function getTopicSubscriptions(): array
}

/**
* @param array<TopicSubscription> $topicSubscriptions
* @return array|RdKafkaTopicPartition[]
*/
private function getTopicAssignments(): array
private function getTopicAssignments(array $topicSubscriptions = []): array
{
$assignments = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] === $topicSubscriptions) {
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
}

foreach ($topicSubscriptions as $topicSubscription) {
if (
[] === $topicSubscription->getPartitions()
&& KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset()
Expand Down
12 changes: 0 additions & 12 deletions tests/Unit/Consumer/KafkaConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,6 @@ public function testBuildFailMissingBrokers(): void
$this->kafkaConsumerBuilder->build();
}

/**
* @return void
* @throws KafkaConsumerBuilderException
*/
public function testBuildFailMissingTopics(): void
{
self::expectException(KafkaConsumerBuilderException::class);
self::expectExceptionMessage(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);

$this->kafkaConsumerBuilder->withAdditionalBroker('localhost')->build();
}

/**
* @return void
*/
Expand Down
16 changes: 16 additions & 0 deletions tests/Unit/Consumer/KafkaHighLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ public function testSubscribeSuccess(): void
$kafkaConsumer->subscribe();
}

/**
* @throws KafkaConsumerSubscriptionException
*/
public function testSubscribeSuccessWithParam(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class);
$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::never())->method('getTopicSubscriptions');
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);
$kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

$rdKafkaConsumerMock->expects(self::once())->method('subscribe')->with(['testTopic3']);

$kafkaConsumer->subscribe([new TopicSubscription('testTopic3')]);
}

/**
* @throws KafkaConsumerSubscriptionException
*/
Expand Down