Skip to content

Commit

Permalink
Introduce Kafka::asyncPublish() that will not flush on each send/batc…
Browse files Browse the repository at this point in the history
…hSend but only once when the application is terminating (#310)

* upd: async production (that has only one flush at the end of the application) [see discussion #309]

* fix: added broker parameter to the facade helper [see discussion #309]

* ref: simplify Builder construct extension [see discussion #309]

---------

Co-authored-by: Alexander (SASh) Alexiev <[email protected]>
  • Loading branch information
sash and Alexander (SASh) Alexiev authored Aug 9, 2024
1 parent 051cdca commit 3031da7
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 4 deletions.
11 changes: 10 additions & 1 deletion docs/producing-messages/1-producing-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ Kafka::publish('broker')->onTopic('topic-name')
```

This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer.
The following lines describes these methods.
The following lines describes these methods.

If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class
```php
use Junges\Kafka\Facades\Kafka;

Kafka::asyncPublish('broker')->onTopic('topic-name')
```

The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send. That reduces the overhead when you want to send a lot of messages in your request handlers.
1 change: 1 addition & 0 deletions src/Facades/Kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

/**
* @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null)
* @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null)
* @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null)
* @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null)
* @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null)
Expand Down
13 changes: 13 additions & 0 deletions src/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ public function publish(string $broker = null): MessageProducer
);
}

/** Creates a new ProducerBuilder instance, setting brokers and topic. The producer will be flushed only when the application terminates, and doing SEND does not mean that the message was flushed! */
public function asyncPublish(string $broker = null): MessageProducer
{
if ($this->shouldFake) {
return Kafka::fake()->publish();
}

return new ProducerBuilder(
broker: $broker ?? config('kafka.brokers'),
asyncProducer: true
);
}

/** Return a ConsumerBuilder instance. */
public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder
{
Expand Down
13 changes: 11 additions & 2 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Builder implements MessageProducer
private array $options = [];
private ProducerMessage $message;
private MessageSerializer $serializer;
private Producer $producer;
private ?Producer $producer = null;
private string $topic = '';
private ?Sasl $saslConfig = null;
private readonly string $broker;
Expand All @@ -27,6 +27,7 @@ class Builder implements MessageProducer

public function __construct(
?string $broker = null,
private readonly bool $asyncProducer = false,
) {
/** @var ProducerMessage $message */
$message = app(ProducerMessage::class);
Expand Down Expand Up @@ -191,6 +192,9 @@ public function sendBatch(MessageBatch $messageBatch): int

public function build(): Producer
{
if ($this->asyncProducer && $this->producer){
return $this->producer;
}
$conf = new Config(
broker: $this->broker,
topics: [],
Expand All @@ -200,9 +204,14 @@ public function build(): Producer
callbacks: $this->callbacks,
);

return app(Producer::class, [
$res = app(Producer::class, [
'config' => $conf,
'serializer' => $this->serializer,
'async' => $this->asyncProducer,
]);
if ($this->asyncProducer) {
$this->producer = $res;
}
return $res;
}
}
14 changes: 13 additions & 1 deletion src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,17 @@ class Producer implements ProducerContract
public function __construct(
private readonly Config $config,
private readonly MessageSerializer $serializer,
private readonly bool $async = false,
) {
$this->producer = app(KafkaProducer::class, [
'conf' => $this->setConf($this->config->getProducerOptions()),
]);
$this->dispatcher = App::make(Dispatcher::class);
if ($this->async) {
app()->terminating(function () {
$this->flush();
});
}
}

/** Set the Kafka Configuration. */
Expand Down Expand Up @@ -72,6 +78,10 @@ public function produce(ProducerMessage $message): bool

$this->producer->poll(0);

if ($this->async) {
return true;
}

return $this->flush();
}

Expand Down Expand Up @@ -104,7 +114,9 @@ public function produceBatch(MessageBatch $messageBatch): int
$produced++;
}

$this->flush();
if (!$this->async) {
$this->flush();
}

$this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));

Expand Down

0 comments on commit 3031da7

Please sign in to comment.