Skip to content

Commit

Permalink
Rename consumer builder
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Feb 14, 2024
1 parent f2905e8 commit 19a7e30
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
use Junges\Kafka\Support\Timer;
use RdKafka\TopicPartition;

class ConsumerBuilder implements ConsumerBuilderContract
class Builder implements ConsumerBuilderContract
{
use InteractsWithConfigCallbacks;
use Conditionable;
Expand Down Expand Up @@ -83,7 +83,7 @@ protected function __construct(protected string $brokers, array $topics = [], pr
/** @inheritDoc */
public static function create(string $brokers, array $topics = [], string $groupId = null): self
{
return new ConsumerBuilder(
return new Builder(
brokers: $brokers,
topics: $topics,
groupId: $groupId
Expand Down
6 changes: 3 additions & 3 deletions src/Contracts/ConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ public function withMaxMessages(int $maxMessages): self;
* Define the max number seconds that a consumer should run
*
* @param int $maxTime
* @return \Junges\Kafka\Consumers\ConsumerBuilder
* @return \Junges\Kafka\Consumers\Builder
*/
public function withMaxTime(int $maxTime): self;

/**
* Specify the max retries attempts.
*
* @param int $maxCommitRetries
* @return \Junges\Kafka\Consumers\ConsumerBuilder
* @return \Junges\Kafka\Consumers\Builder
*/
public function withMaxCommitRetries(int $maxCommitRetries): self;

Expand All @@ -71,7 +71,7 @@ public function withSasl(string $username, string $password, string $mechanisms,
* the message itself and the second is the next handler
*
* @param callable(mixed, callable): void $middleware
* @return \Junges\Kafka\Consumers\ConsumerBuilder
* @return \Junges\Kafka\Consumers\Builder
*/
public function withMiddleware(callable $middleware): self;

Expand Down
2 changes: 1 addition & 1 deletion src/Facades/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
* @method static \Junges\Kafka\Consumers\ConsumerBuilder createConsumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static \Junges\Kafka\Consumers\Builder createConsumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedOn(string $topic, ProducerMessage $expectedMessage = null, callable $callback = null)
Expand Down
6 changes: 3 additions & 3 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
namespace Junges\Kafka;

use Illuminate\Support\Traits\Macroable;
use Junges\Kafka\Consumers\ConsumerBuilder;
use Junges\Kafka\Consumers\Builder as ConsumerBuilder;
use Junges\Kafka\Contracts\KafkaManager;
use Junges\Kafka\Contracts\MessageProducer;
use Junges\Kafka\Producers\Builder;
use Junges\Kafka\Producers\Builder as ProducerBuilder;

class Factory implements KafkaManager
{
Expand All @@ -15,7 +15,7 @@ class Factory implements KafkaManager
/** Creates a new ProducerBuilder instance, setting brokers and topic. */
public function publish(string $broker = null): MessageProducer
{
return new Builder(
return new ProducerBuilder(
broker: $broker ?? config('kafka.brokers')
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@
use Junges\Kafka\Config\NullBatchConfig;
use Junges\Kafka\Consumers\CallableBatchConsumer;
use Junges\Kafka\Consumers\CallableConsumer;
use Junges\Kafka\Consumers\ConsumerBuilder;
use Junges\Kafka\Consumers\Builder;
use Junges\Kafka\Contracts\ConsumerBuilder as ConsumerBuilderContract;
use Junges\Kafka\Contracts\HandlesBatchConfiguration;
use Junges\Kafka\Contracts\MessageConsumer;
use Junges\Kafka\Support\Timer;

class ConsumerBuilderFake extends ConsumerBuilder implements ConsumerBuilderContract
class BuilderFake extends Builder implements ConsumerBuilderContract
{
/** @var \Junges\Kafka\Contracts\ConsumerMessage[] */
private array $messages = [];

/** @inheritDoc */
public static function create(string $brokers, array $topics = [], string $groupId = null): self
{
return new ConsumerBuilderFake(
return new BuilderFake(
brokers: $brokers,
topics: $topics,
groupId: $groupId
Expand Down
4 changes: 2 additions & 2 deletions src/Support/Testing/Fakes/KafkaFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public function publish(?string $broker = null): ProducerBuilderFake
}

/** Return a ConsumerBuilder instance. */
public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilderFake
public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): BuilderFake
{
return ConsumerBuilderFake::create(
return BuilderFake::create(
brokers: $brokers ?? config('kafka.brokers'),
topics: $topics,
groupId: $groupId ?? config('kafka.consumer_group_id')
Expand Down
56 changes: 28 additions & 28 deletions tests/Consumers/ConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
use Junges\Kafka\Config\Config;
use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Consumers\Consumer;
use Junges\Kafka\Consumers\ConsumerBuilder;
use Junges\Kafka\Consumers\Builder;
use Junges\Kafka\Contracts\Committer;
use Junges\Kafka\Contracts\CommitterFactory;
use Junges\Kafka\Exceptions\ConsumerException;
Expand All @@ -22,14 +22,14 @@ final class ConsumerBuilderTest extends LaravelKafkaTestCase
{
public function testItReturnsAConsumerInstance(): void
{
$consumer = ConsumerBuilder::create('broker')->build();
$consumer = Builder::create('broker')->build();

$this->assertInstanceOf(Consumer::class, $consumer);
}

public function testItCanSubscribeToATopic(): void
{
$consumer = ConsumerBuilder::create('broker');
$consumer = Builder::create('broker');

$consumer->subscribe('foo');

Expand All @@ -40,7 +40,7 @@ public function testItCanSubscribeToATopic(): void

public function testItDoesNotSubscribeToATopicTwice(): void
{
$consumer = ConsumerBuilder::create('broker');
$consumer = Builder::create('broker');

$consumer->subscribe('foo', 'foo');

Expand All @@ -51,7 +51,7 @@ public function testItDoesNotSubscribeToATopicTwice(): void

public function testICanChangeDeserializersOnTheFly(): void
{
$consumer = ConsumerBuilder::create('broker');
$consumer = Builder::create('broker');

$consumer->usingDeserializer(new JsonDeserializer());

Expand All @@ -62,15 +62,15 @@ public function testICanChangeDeserializersOnTheFly(): void

public function testItCanSubscribeToMoreThanOneTopicsAtOnce(): void
{
$consumer = ConsumerBuilder::create('broker');
$consumer = Builder::create('broker');

$consumer->subscribe('foo', 'bar');

$topics = $this->getPropertyWithReflection('topics', $consumer);

$this->assertEquals(['foo', 'bar'], $topics);

$consumer = ConsumerBuilder::create('broker');
$consumer = Builder::create('broker');

$consumer->subscribe(['foo', 'bar']);

Expand All @@ -81,7 +81,7 @@ public function testItCanSubscribeToMoreThanOneTopicsAtOnce(): void

public function testItCanSetConsumerGroupId(): void
{
$consumer = ConsumerBuilder::create('broker')->withConsumerGroupId('foo');
$consumer = Builder::create('broker')->withConsumerGroupId('foo');

$groupId = $this->getPropertyWithReflection('groupId', $consumer);

Expand All @@ -92,12 +92,12 @@ public function testItThrowsInvalidArgumentExceptionIfCreatingWithInvalidTopic()
{
$this->expectException(InvalidArgumentException::class);

ConsumerBuilder::create('broker', [1234], 'group');
Builder::create('broker', [1234], 'group');
}

public function testItCanSaveTheCommitBatchSize(): void
{
$consumer = ConsumerBuilder::create('broker')
$consumer = Builder::create('broker')
->withCommitBatchSize(1);

$commitValue = $this->getPropertyWithReflection('commit', $consumer);
Expand All @@ -107,7 +107,7 @@ public function testItCanSaveTheCommitBatchSize(): void

public function testItUsesTheCorrectHandler(): void
{
$consumer = ConsumerBuilder::create('broker')->withHandler(new FakeConsumer());
$consumer = Builder::create('broker')->withHandler(new FakeConsumer());

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -118,7 +118,7 @@ public function testItUsesTheCorrectHandler(): void

public function testItCanSetMaxMessages(): void
{
$consumer = ConsumerBuilder::create('broker')->withMaxMessages(2);
$consumer = Builder::create('broker')->withMaxMessages(2);

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -129,7 +129,7 @@ public function testItCanSetMaxMessages(): void

public function testItCanSetMaxCommitRetries(): void
{
$consumer = ConsumerBuilder::create('broker')->withMaxCommitRetries(2);
$consumer = Builder::create('broker')->withMaxCommitRetries(2);

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -140,7 +140,7 @@ public function testItCanSetMaxCommitRetries(): void

public function testItCanSetTheDeadLetterQueue(): void
{
$consumer = ConsumerBuilder::create('broker')->subscribe('test')->withDlq('test-topic-dlq');
$consumer = Builder::create('broker')->subscribe('test')->withDlq('test-topic-dlq');

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -151,7 +151,7 @@ public function testItCanSetTheDeadLetterQueue(): void

public function testItUsesDlqSuffixIfDlqIsNull(): void
{
$consumer = ConsumerBuilder::create('broker', ['foo'])->withDlq();
$consumer = Builder::create('broker', ['foo'])->withDlq();

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -162,7 +162,7 @@ public function testItUsesDlqSuffixIfDlqIsNull(): void

public function testItCanSetSasl(): void
{
$consumer = ConsumerBuilder::create('broker')
$consumer = Builder::create('broker')
->withSasl('username', 'password', 'mechanisms');

$expectedSaslConfig = new Sasl('username', 'password', 'mechanisms');
Expand All @@ -176,7 +176,7 @@ public function testItCanSetSasl(): void

public function testItCanAddMiddlewaresToTheHandler(): void
{
$consumer = ConsumerBuilder::create('broker', ['foo'], 'group')
$consumer = Builder::create('broker', ['foo'], 'group')
->withMiddleware(function ($message, callable $next) {
$next($message);
});
Expand All @@ -192,7 +192,7 @@ public function testItCanAddMiddlewaresToTheHandler(): void

public function testItCanAddInvokableClassesAsMiddleware(): void
{
$consumer = ConsumerBuilder::create('broker', ['foo'], 'group')
$consumer = Builder::create('broker', ['foo'], 'group')
->withMiddleware(new TestMiddleware());

$this->assertInstanceOf(Consumer::class, $consumer->build());
Expand All @@ -206,7 +206,7 @@ public function testItCanAddInvokableClassesAsMiddleware(): void

public function testItCanSetSecurityProtocol(): void
{
$consumer = ConsumerBuilder::create('broker', ['foo'], 'group')
$consumer = Builder::create('broker', ['foo'], 'group')
->withSecurityProtocol('security');

$this->assertInstanceOf(Consumer::class, $consumer->build());
Expand All @@ -218,7 +218,7 @@ public function testItCanSetSecurityProtocol(): void

public function testItCanSetSecurityProtocolViaSaslConfig(): void
{
$consumer = ConsumerBuilder::create('broker', ['foo'], 'group')
$consumer = Builder::create('broker', ['foo'], 'group')
->withSasl(
'username',
'password',
Expand All @@ -237,15 +237,15 @@ public function testItCanSetSecurityProtocolViaSaslConfig(): void

public function testItCanSetAutoCommit(): void
{
$consumer = ConsumerBuilder::create('broker')->withAutoCommit();
$consumer = Builder::create('broker')->withAutoCommit();

$this->assertInstanceOf(Consumer::class, $consumer->build());

$autoCommit = $this->getPropertyWithReflection('autoCommit', $consumer);

$this->assertTrue($autoCommit);

$consumer = ConsumerBuilder::create('broker')->withAutoCommit(false);
$consumer = Builder::create('broker')->withAutoCommit(false);

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -256,15 +256,15 @@ public function testItCanSetAutoCommit(): void

public function testItCanSetStopAfterLastMessage(): void
{
$consumer = ConsumerBuilder::create('broker')->stopAfterLastMessage();
$consumer = Builder::create('broker')->stopAfterLastMessage();

$this->assertInstanceOf(Consumer::class, $consumer->build());

$autoCommit = $this->getPropertyWithReflection('stopAfterLastMessage', $consumer);

$this->assertTrue($autoCommit);

$consumer = ConsumerBuilder::create('broker')->stopAfterLastMessage(false);
$consumer = Builder::create('broker')->stopAfterLastMessage(false);

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -275,7 +275,7 @@ public function testItCanSetStopAfterLastMessage(): void

public function testItCanSetConsumerOptions(): void
{
$consumer = ConsumerBuilder::create('broker')
$consumer = Builder::create('broker')
->withOptions([
'auto.offset.reset' => 'latest',
'enable.auto.commit' => 'false',
Expand All @@ -294,7 +294,7 @@ public function testItCanSetConsumerOptions(): void

public function testItCanSpecifyBrokersUsingWithBrokers(): void
{
$consumer = ConsumerBuilder::create('broker')->withBrokers('my-test-broker');
$consumer = Builder::create('broker')->withBrokers('my-test-broker');

$this->assertInstanceOf(Consumer::class, $consumer->build());

Expand All @@ -311,7 +311,7 @@ public function make(KafkaConsumer $kafkaConsumer, Config $config): Committer
return new VoidCommitter();
}
};
$consumer = ConsumerBuilder::create('broker')
$consumer = Builder::create('broker')
->usingCommitterFactory($adhocCommitterFactory)
->build();

Expand All @@ -323,7 +323,7 @@ public function testItCantCreateAConsumerWithDlqWithoutSubscribingToAnyTopics():
{
$this->expectException(ConsumerException::class);

ConsumerBuilder::create('broker')->withDlq();
Builder::create('broker')->withDlq();
}
}

Expand Down
8 changes: 4 additions & 4 deletions tests/KafkaTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Str;
use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Consumers\ConsumerBuilder;
use Junges\Kafka\Consumers\Builder as ConsumerBuilder;
use Junges\Kafka\Contracts\ProducerMessage;
use Junges\Kafka\Events\BatchMessagePublished;
use Junges\Kafka\Events\CouldNotPublishMessage as CouldNotPublishMessageEvent;
Expand All @@ -17,7 +17,7 @@
use Junges\Kafka\Message\Message;
use Junges\Kafka\Message\Serializers\JsonSerializer;
use Junges\Kafka\Producers\MessageBatch;
use Junges\Kafka\Producers\Builder;
use Junges\Kafka\Producers\Builder as ProducerBuilder;
use Mockery as m;
use RdKafka\Producer;
use RdKafka\ProducerTopic;
Expand Down Expand Up @@ -193,7 +193,7 @@ public function testICanDisableDebugUsingWithDebugDisabledMethod(): void
return $mockedProducer;
});

/** @var Builder $producer */
/** @var ProducerBuilder $producer */
$producer = Kafka::publish()
->withConfigOptions([
'metadata.broker.list' => 'broker',
Expand Down Expand Up @@ -343,7 +343,7 @@ public function testMacro(): void

$producer = Kafka::defaultProducer();

$this->assertInstanceOf(Builder::class, $producer);
$this->assertInstanceOf(ProducerBuilder::class, $producer);
$this->assertEquals($sasl, $this->getPropertyWithReflection('saslConfig', $producer));
}
}

0 comments on commit 19a7e30

Please sign in to comment.