Skip to content

Commit

Permalink
Rename createConsumer to consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Feb 15, 2024
1 parent a12517e commit 8dce52a
Show file tree
Hide file tree
Showing 24 changed files with 57 additions and 54 deletions.
2 changes: 1 addition & 1 deletion docs/advanced-usage/2-graceful-shutdown.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ If your app requires that you run sum sort of processing when the consumers stop
```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer(['topic'])
$consumer = Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->withHandler(new Handler)
->build()
Expand Down
4 changes: 2 additions & 2 deletions docs/advanced-usage/3-sasl-authentication.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ It's also a secure way to enable your clients to endorse an identity. To provide
passing a `Junges\Kafka\Config\Sasl` instance as the argument:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withSasl(
password: 'password',
username: 'username',
Expand All @@ -19,7 +19,7 @@ $consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
You can also set the security protocol used with sasl. It's optional and by default `SASL_PLAINTEXT` is used, but you can set it to `SASL_SSL`:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withSasl(
password: 'password',
username: 'username',
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced-usage/4-custom-committers.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class MyCommitterFactory implements CommitterFactory
}
}

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->usingCommitterFactory(new MyCommitterFactory())
->build();
```
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced-usage/5-middlewares.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ weight: 5
Middlewares provides a convenient way to filter and inspecting your Kafka messages. To write a middleware in this package, you can use the `withMiddleware` method. The middleware is a callable in which the first argument is the message itself and the second one is the next handler. The middlewares get executed in the order they are defined:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withMiddleware(function($message, callable $next) {
// Perform some work here
return $next($message);
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced-usage/6-stop-consumer-after-last-message.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ You can do it by adding a call to `stopAfterLastMessage` method when creating yo
This is particularly useful when using signal handlers.

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer(['topic'])
$consumer = \Junges\Kafka\Facades\Kafka::consumer(['topic'])
->withConsumerGroupId('group')
->stopAfterLastMessage()
->withHandler(new Handler)
Expand Down
2 changes: 1 addition & 1 deletion docs/advanced-usage/8-before-callbacks.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ You can call pre-defined callbacks **Before** and **After** consuming messages.
The callbacks get executed in the order they are defined, and they receive a `\Junges\Kafka\Contracts\MessageConsumer` as argument:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->beforeConsuming(function(\Junges\Kafka\Contracts\MessageConsumer $consumer) {
while (app()->isDownForMaintenance()) {
sleep(1);
Expand Down
4 changes: 2 additions & 2 deletions docs/consuming-messages/1-creating-consumer.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ To create a consumer using this package you can use the `createConsumer` method,
```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer();
$consumer = Kafka::consumer();
```

This method also allows you to specify the `topics` it should consume, the `broker` and the consumer `group id`:

```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer(['topic-1', 'topic-2'], 'group-id', 'broker');
$consumer = Kafka::consumer(['topic-1', 'topic-2'], 'group-id', 'broker');
```

This method returns a `Junges\Kafka\Consumers\ConsumerBuilder::class` instance, and you can use it to configure your consumer.
2 changes: 1 addition & 1 deletion docs/consuming-messages/10-class-structure.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class MyTopicConsumer extends Command

public function handle()
{
$consumer = Kafka::createConsumer(['my-topic'])
$consumer = Kafka::consumer(['my-topic'])
->withBrokers('localhost:8092')
->withAutoCommit()
->withHandler(function(ConsumerMessage $message) {
Expand Down
6 changes: 3 additions & 3 deletions docs/consuming-messages/2-subscribing-to-kafka-topics.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ With a consumer created, you can subscribe to a kafka topic using the `subscribe
```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer()->subscribe('topic');
$consumer = Kafka::consumer()->subscribe('topic');
```

Of course, you can subscribe to more than one topic at once, either using an array of topics or specifying one by one:

```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer()->subscribe('topic-1', 'topic-2', 'topic-n');
$consumer = Kafka::consumer()->subscribe('topic-1', 'topic-2', 'topic-n');

// Or, using array:
$consumer = Kafka::createConsumer()->subscribe([
$consumer = Kafka::consumer()->subscribe([
'topic-1',
'topic-2',
'topic-n'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ The consumer will see the new topics on its next periodic metadata refresh which
To subscribe to topics using regex, you can simply pass the regex you want to use to the `subscribe` method:

```php
\Junges\Kafka\Facades\Kafka::createConsumer()
\Junges\Kafka\Facades\Kafka::consumer()
->subscribe('^myPfx_.*')
->withHandler(...)
```
Expand Down
5 changes: 2 additions & 3 deletions docs/consuming-messages/3-assigning-partitions.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ Kafka clients allows you to implement your own partition assignment strategies f
If you have a topic with multiple consumers and want to assign a consumer to a specific partition topic, you can
use the `assignPartitions` method, available on the `ConsumerBuilder` instance:


```php
$partition = 1; // The partition number you want to assign

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->assignPartitions([
new \RdKafka\TopicPartition('your-topic-name', $partition)
]);
Expand All @@ -22,7 +21,7 @@ The `assignPartitions` method accepts an array of `\RdKafka\TopicPartition` obje
by adding more entries to the `assignPartitions` parameter:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->assignPartitions([
new \RdKafka\TopicPartition('your-topic-name', 1)
new \RdKafka\TopicPartition('your-topic-name', 2)
Expand Down
2 changes: 1 addition & 1 deletion docs/consuming-messages/4-consumer-groups.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ To attach your consumer to a consumer group, you can use the method `withConsume
```php
use Junges\Kafka\Facades\Kafka;

$consumer = Kafka::createConsumer()->withConsumerGroupId('foo');
$consumer = Kafka::consumer()->withConsumerGroupId('foo');
```
4 changes: 2 additions & 2 deletions docs/consuming-messages/5-message-handlers.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Now that you have created your kafka consumer, you must create a handler for the
You can use an invokable class or a simple callback. Use the `withHandler` method to specify your handler:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer();
$consumer = \Junges\Kafka\Facades\Kafka::consumer();

// Using callback:
$consumer->withHandler(function(\Junges\Kafka\Contracts\ConsumerMessage $message) {
Expand All @@ -25,7 +25,7 @@ class Handler
}
}

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withHandler(new Handler)
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withHandler(new Handler)
```

The `KafkaConsumerMessage` contract gives you some handy methods to get the message properties:
Expand Down
14 changes: 7 additions & 7 deletions docs/consuming-messages/6-configuring-consumer-options.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ To create a `dlq` in this package, you can use the `withDlq` method. If you don'
adding the `-dlq` suffix to the topic name.

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->subscribe('topic')->withDlq();
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq();

//Or, specifying the dlq topic name:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->subscribe('topic')->withDlq('your-dlq-topic-name')
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->subscribe('topic')->withDlq('your-dlq-topic-name')
```

When your message is sent to the dead letter queue, we will add three header keys to containing information about what happened to that message:
Expand All @@ -30,35 +30,35 @@ The auto-commit check is called in every poll, and it checks that the time elaps
use the `withAutoCommit` method:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withAutoCommit();
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withAutoCommit();
```

### Configuring max messages to be consumed
If you want to consume a limited amount of messages, you can use the `withMaxMessages` method to set the max number of messages to be consumed by a
kafka consumer:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxMessages(2);
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxMessages(2);
```

### Configuring the max time when a consumer can process messages
If you want to consume a limited amount of time, you can use the `withMaxTime` method to set the max number of seconds for
kafka consumer to process messages:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->withMaxTime(3600);
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->withMaxTime(3600);
```

### Setting Kafka configuration options
To set configuration options, you can use two methods: `withOptions`, passing an array of option and option value or, using the `withOption method and
passing two arguments, the option name and the option value.

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withOptions([
'option-name' => 'option-value'
]);
// Or:
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->withOption('option-name', 'option-value');
```
5 changes: 3 additions & 2 deletions docs/consuming-messages/7-custom-deserializers.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ This interface force you to declare the `deserialize` method.
To set the deserializer you want to use, use the `usingDeserializer` method:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->usingDeserializer(new MyCustomDeserializer());
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer(new MyCustomDeserializer());
```

> NOTE: The deserializer class must use the same algorithm as the serializer used to produce this message.

### Using AVRO deserializer
To use the AVRO deserializer on your consumer, add the Avro deserializer:

```php
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
Expand Down Expand Up @@ -53,5 +54,5 @@ $registry->addKeySchemaMappingForTopic(
// per default both key and body will get decoded
$deserializer = new \Junges\Kafka\Message\Deserializers\AvroDeserializer($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */);

$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->usingDeserializer($deserializer);
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->usingDeserializer($deserializer);
```
2 changes: 1 addition & 1 deletion docs/consuming-messages/8-consuming-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ weight: 8
After building the consumer, you must call the `consume` method to consume the messages:

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()->build();
$consumer = \Junges\Kafka\Facades\Kafka::consumer()->build();
```

### Consuming the kafka messages
Expand Down
2 changes: 1 addition & 1 deletion docs/consuming-messages/9-handling-message-batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The example below shows that batch is going to be handled if batch size is great
Batching feature could be helpful when you work with databases like ClickHouse, where you insert data in large batches.

```php
$consumer = \Junges\Kafka\Facades\Kafka::createConsumer()
$consumer = \Junges\Kafka\Facades\Kafka::consumer()
->enableBatching()
->withBatchSizeLimit(1000)
->withBatchReleaseInterval(1500)
Expand Down
4 changes: 3 additions & 1 deletion src/Contracts/KafkaManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,7 @@ interface KafkaManager
public function publish(?string $broker = null): MessageProducer;

/** Return a ConsumerBuilder instance. */
public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder;
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder;

public function shouldFake(): self;
}
2 changes: 1 addition & 1 deletion src/Facades/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
* @method static \Junges\Kafka\Consumers\Builder createConsumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedOn(string $topic, ProducerMessage $expectedMessage = null, callable $callback = null)
Expand Down
4 changes: 2 additions & 2 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public function publish(string $broker = null): MessageProducer
}

/** Return a ConsumerBuilder instance. */
public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder
{
if ($this->shouldFake) {
return Kafka::fake()->createConsumer(
return Kafka::fake()->consumer(
$topics, $groupId, $brokers
);
}
Expand Down
15 changes: 8 additions & 7 deletions src/Support/Testing/Fakes/KafkaFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class KafkaFake
private KafkaManager $kafkaManager;

private array $publishedMessages = [];

/** @var \Junges\Kafka\Contracts\ConsumerMessage[] */
private array $messagesToConsume = [];

public function __construct(KafkaManager $kafka)
public function __construct(?KafkaManager $manager)
{
$this->kafkaManager = $kafka;
$this->kafkaManager = $manager?->shouldFake();
$this->makeProducerBuilderFake();
}

Expand All @@ -34,7 +35,7 @@ public function publish(?string $broker = null): ProducerBuilderFake
}

/** Return a ConsumerBuilder instance. */
public function createConsumer(array $topics = [], string $groupId = null, string $brokers = null): BuilderFake
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): BuilderFake
{
return BuilderFake::create(
brokers: $brokers ?? config('kafka.brokers'),
Expand Down Expand Up @@ -64,7 +65,7 @@ private function addConsumerMessage(ConsumerMessage $message): void
}

/** Assert if a messages was published based on a truth-test callback. */
public function assertPublished(?ProducerMessage $expectedMessage = null, ?callable $callback = null)
public function assertPublished(?ProducerMessage $expectedMessage = null, ?callable $callback = null): void
{
PHPUnit::assertTrue(
condition: $this->published($callback, $expectedMessage)->count() > 0,
Expand All @@ -73,7 +74,7 @@ public function assertPublished(?ProducerMessage $expectedMessage = null, ?calla
}

/** Assert if a messages was published based on a truth-test callback. */
public function assertPublishedTimes(int $times = 1, ?ProducerMessage $expectedMessage = null, ?callable $callback = null)
public function assertPublishedTimes(int $times = 1, ?ProducerMessage $expectedMessage = null, ?callable $callback = null): void
{
$count = $this->published($callback, $expectedMessage)->count();

Expand All @@ -84,7 +85,7 @@ public function assertPublishedTimes(int $times = 1, ?ProducerMessage $expectedM
}

/** Assert that a message was published on a specific topic. */
public function assertPublishedOn(string $topic, ?ProducerMessage $expectedMessage = null, ?callable $callback = null)
public function assertPublishedOn(string $topic, ?ProducerMessage $expectedMessage = null, ?callable $callback = null): void
{
PHPUnit::assertTrue(
condition: $this->published($callback, $expectedMessage, $topic)->count() > 0,
Expand All @@ -93,7 +94,7 @@ public function assertPublishedOn(string $topic, ?ProducerMessage $expectedMessa
}

/** Assert that a message was published on a specific topic. */
public function assertPublishedOnTimes(string $topic, int $times = 1, ?ProducerMessage $expectedMessage = null, ?callable $callback = null)
public function assertPublishedOnTimes(string $topic, int $times = 1, ?ProducerMessage $expectedMessage = null, ?callable $callback = null): void
{
$count = $this->published($callback, $expectedMessage, $topic)->count();

Expand Down
6 changes: 3 additions & 3 deletions tests/Consumers/ConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public function testItCanConsumeMessages(): void

$this->mockProducer();

$consumer = Kafka::createConsumer(['test'])
$consumer = Kafka::consumer(['test'])
->withHandler($fakeConsumer = new FakeConsumer())
->withAutoCommit()
->withMaxMessages(1)
Expand All @@ -110,7 +110,7 @@ public function testItCanConsumeMessagesWithQueueableHandlers(): void

$this->mockProducer();

$consumer = Kafka::createConsumer(['test'])
$consumer = Kafka::consumer(['test'])
->withHandler($fakeConsumer = new SimpleQueueableHandler())
->withAutoCommit()
->withMaxMessages(1)
Expand Down Expand Up @@ -176,7 +176,7 @@ public function testCanStopConsume(): void

$this->mockProducer();

$this->stoppableConsumer = Kafka::createConsumer(['test'])
$this->stoppableConsumer = Kafka::consumer(['test'])
->withHandler(function (ConsumerMessage $message) {
if ($message->getKey() === 'key2' && $this->stoppableConsumer) {
$this->stoppableConsumer->onStopConsuming(function () {
Expand Down
Loading

0 comments on commit 8dce52a

Please sign in to comment.