Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed May 14, 2024
1 parent fac949d commit 06ba052
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
title: Sending multiple messages with the same producer
weight: 9
---

Sometimes you may want to send multiple messages without having to create the consumer


```php
// In a service provider:

\Junges\Kafka\Facades\Kafka::macro('myProducer', function () {
return $this->publish('broker')
->onTopic('my-awesome-topic')
->withConfigOption('key', 'value');
});
```

Now, you can call `\Junges\Kafka\Facades\Kafka::myProducer()`, which will always apply the configs you defined in your service provider.
5 changes: 4 additions & 1 deletion docs/producing-messages/5-publishing-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ $producer = Kafka::publish('broker')
->withHeaders(['header-key' => 'header-value']);

$producer->send();
```
```

If you want to send multiple messages, consider using the batch producer. The default `send` method is recommended for low-throughput systems only, as it
flushes the producer after every message that is sent.
3 changes: 3 additions & 0 deletions docs/producing-messages/6-producing-message-batch-to-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ Then create as many messages as you want and push them to the `MesageBatch` inst
Finally, create your producer and call the `sendBatch`, passing the `MessageBatch` instance as a parameter.
This is helpful when you persist messages in storage before publishing (e.g. TransactionalOutbox Pattern).

By using message batch, you can send multiple messages using the same producer instance, which is way faster than the default `send` method, which flushes the producer after each produced message.
Messages are queued for asynchronous sending, and there is no guarantee that it will be sent immediately. The `sendBatch` is recommended for a system with high throughput.

```php
use Junges\Kafka\Facades\Kafka;
use Junges\Kafka\Producers\MessageBatch;
Expand Down
2 changes: 0 additions & 2 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,6 @@ public function getProducerOptions(): array
'compression.codec' => config('kafka.compression', 'snappy'),
'bootstrap.servers' => $this->broker,
'metadata.broker.list' => $this->broker,
'linger.ms' => 0,
'queue.buffering.max.ms' => 0
];

return collect(array_merge($config, $this->customOptions, $this->getSaslOptions()))
Expand Down
2 changes: 0 additions & 2 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

namespace Junges\Kafka\Contracts;

use Junges\Kafka\Config\Sasl;
use Junges\Kafka\Message\Message;
use Junges\Kafka\Producers\MessageBatch;
use Junges\Kafka\Producers\Producer;
use Junges\Kafka\Support\Testing\Fakes\ProducerFake;
Expand Down
5 changes: 4 additions & 1 deletion src/Contracts/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public function produce(ProducerMessage $message, bool $shouldFlush = false): bo
* @throws CouldNotPublishMessage
* @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch
*/
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int;
public function produceBatch(MessageBatch $messageBatch): int;

/**
* @throws \Junges\Kafka\Exceptions\Transactions\TransactionShouldBeRetriedException
Expand All @@ -41,4 +41,7 @@ public function abortTransaction(int $timeoutInMilliseconds = 1000): void;
* @throws \Junges\Kafka\Exceptions\Transactions\TransactionShouldBeAbortedException
*/
public function commitTransaction(int $timeoutInMilliseconds = 1000): void;

/** Used to properly shut down the producer. */
public function flush(bool $shouldFlush = true): mixed;
}
2 changes: 1 addition & 1 deletion src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public function withDebugDisabled(): self
*
* @throws \Exception
*/
public function send(bool $shouldFlush = false): bool
public function send(bool $shouldFlush = true): bool
{
$producer = $this->build();

Expand Down
17 changes: 13 additions & 4 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class Producer implements ProducerContract
private readonly KafkaProducer $producer;
private readonly Dispatcher $dispatcher;

private string $topic;

public bool $transactionInitialized = false;

public function __construct(
Expand All @@ -42,6 +44,13 @@ public function __construct(
$this->dispatcher = App::make(Dispatcher::class);
}

public function setTopic(string $topic): self
{
$this->topic = $topic;

return $this;
}

/** Set the Kafka Configuration. */
private function setConf(array $options): Conf
{
Expand All @@ -63,7 +72,7 @@ public function produce(ProducerMessage $message, bool $shouldFlush = false): bo
{
$this->dispatcher->dispatch(new PublishingMessage($message));

$topic = $this->producer->newTopic($message->getTopicName());
$topic = $this->producer->newTopic($message->getTopicName() ?? $this->topic);

$message = clone $message;

Expand All @@ -77,7 +86,7 @@ public function produce(ProducerMessage $message, bool $shouldFlush = false): bo
}

/** @inheritDoc */
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
public function produceBatch(MessageBatch $messageBatch): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand Down Expand Up @@ -105,7 +114,7 @@ public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = fal
$produced++;
}

$this->flush($shouldFlush);
$this->flush();

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down Expand Up @@ -142,7 +151,7 @@ private function produceMessageBatch(ProducerTopic $topic, ProducerMessage $mess
* @throws CouldNotPublishMessage
* @throws \Exception
*/
private function flush(bool $shouldFlush = true): mixed
public function flush(bool $shouldFlush = true): mixed
{
// Here we define the flush callback that is called when flush is requested by
// the developer or when the lottery wins. Flush is not needed in after all
Expand Down
5 changes: 5 additions & 0 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,9 @@ public function build(): ProducerFake

return $this->makeProducer($conf);
}

public function getProducer(): ProducerFake
{
return $this->build();
}
}
2 changes: 1 addition & 1 deletion src/Support/Testing/Fakes/ProducerFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function produce(ProducerMessage $message, bool $shouldFlush = false): bo
}

/** @throws \Junges\Kafka\Exceptions\CouldNotPublishMessageBatch */
public function produceBatch(MessageBatch $messageBatch, bool $shouldFlush = false): int
public function produceBatch(MessageBatch $messageBatch): int
{
if ($messageBatch->getTopicName() === '') {
throw CouldNotPublishMessageBatch::invalidTopicName($messageBatch->getTopicName());
Expand Down

0 comments on commit 06ba052

Please sign in to comment.