Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/Consumer/AbstractKafkaConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,14 @@ protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMes
);
}

/**
* @return array<int, TopicSubscription>
*/
public function getTopicSubscriptions(): array
{
return $this->kafkaConfiguration->getTopicSubscriptions();
}

/**
* @param integer $timeoutMs
* @return null|RdKafkaMessage
Expand Down
14 changes: 12 additions & 2 deletions src/Consumer/KafkaConsumerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

namespace Jobcloud\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\ConsumerInterface;
use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\ConsumerTopic as RdKafkaConsumerTopic;
use RdKafka\TopicPartition as RdKafkaTopicPartition;

/**
* @todo v2: subscribe(array $topicSubscriptions = [])
* @method array getTopicSubscriptions()
*/

interface KafkaConsumerInterface
{
/**
Expand Down Expand Up @@ -105,4 +108,11 @@ public function getFirstOffsetForTopicPartition(string $topic, int $partition, i
* @return integer
*/
public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int;

/**
* @todo v2
*
* @return array<int, TopicSubscription>
*/
//public function getTopicSubscriptions(): array;
}
4 changes: 2 additions & 2 deletions src/Consumer/KafkaHighLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function __construct(
*/
public function subscribe(array $topicSubscriptions = []): void
{
$subscriptions = $this->getTopicSubscriptions($topicSubscriptions);
$subscriptions = $this->getTopicSubscriptionNames($topicSubscriptions);
$assignments = $this->getTopicAssignments($topicSubscriptions);

if ([] !== $subscriptions && [] !== $assignments) {
Expand Down Expand Up @@ -244,7 +244,7 @@ private function getOffsetsToCommitForMessages(array $messages): array
* @param array<TopicSubscription> $topicSubscriptions
* @return array|string[]
*/
private function getTopicSubscriptions(array $topicSubscriptions = []): array
private function getTopicSubscriptionNames(array $topicSubscriptions = []): array
{
$subscriptions = [];

Expand Down
24 changes: 24 additions & 0 deletions tests/Unit/Consumer/KafkaHighLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumer;
use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
use Jobcloud\Kafka\Consumer\TopicSubscription;
Expand Down Expand Up @@ -639,6 +640,29 @@ public function testClose(): void
$kafkaConsumer->close();
}

/**
* @return void
*/
public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class);
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);

$topicSubscriptionsMock = [
$this->createMock(TopicSubscriptionInterface::class),
$this->createMock(TopicSubscriptionInterface::class)
];

$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::once())
->method('getTopicSubscriptions')
->willReturn($topicSubscriptionsMock);

$kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions());
}

/**
* @param int $partitionId
* @return RdKafkaMetadataPartition|MockObject
Expand Down
24 changes: 24 additions & 0 deletions tests/Unit/Consumer/KafkaLowLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer;

use Jobcloud\Kafka\Consumer\KafkaLowLevelConsumer;
use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
Expand Down Expand Up @@ -555,6 +556,29 @@ function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int
$this->assertEquals(5, $lowOffset);
}

/**
* @return void
*/
public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaLowLevelConsumer::class);
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);

$topicSubscriptionsMock = [
$this->createMock(TopicSubscriptionInterface::class),
$this->createMock(TopicSubscriptionInterface::class)
];

$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::once())
->method('getTopicSubscriptions')
->willReturn($topicSubscriptionsMock);

$kafkaConsumer = new KafkaLowLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions());
}

/**
* @param int $partitionId
* @return RdKafkaMetadataPartition|MockObject
Expand Down