From 3031da77b50054c90b7abd72dc8e19e05947d4e9 Mon Sep 17 00:00:00 2001 From: SASh Date: Sat, 10 Aug 2024 01:23:47 +0300 Subject: [PATCH] Introduce Kafka::asyncPublish() that will not flush on each send/batchSend but only once when the application is terminating (#310) * upd: async production (that has only one flush at the end of the application) [see discussion #309] * fix: added broker parameter to the facade helper [see discussion #309] * ref: simplify Builder construct extension [see discussion #309] --------- Co-authored-by: Alexander (SASh) Alexiev --- docs/producing-messages/1-producing-messages.md | 11 ++++++++++- src/Facades/Kafka.php | 1 + src/Factory.php | 13 +++++++++++++ src/Producers/Builder.php | 13 +++++++++++-- src/Producers/Producer.php | 14 +++++++++++++- 5 files changed, 48 insertions(+), 4 deletions(-) diff --git a/docs/producing-messages/1-producing-messages.md b/docs/producing-messages/1-producing-messages.md index 252efa66..b11b9c08 100644 --- a/docs/producing-messages/1-producing-messages.md +++ b/docs/producing-messages/1-producing-messages.md @@ -12,4 +12,13 @@ Kafka::publish('broker')->onTopic('topic-name') ``` This method returns a `ProducerBuilder` instance, which contains a few methods to configure your kafka producer. -The following lines describes these methods. \ No newline at end of file +The following lines describes these methods. + +If you are going to produce a lot of messages to different topics, please use the `asyncPublish` method on the `Junges\Kafka\Facades\Kafka` class +```php +use Junges\Kafka\Facades\Kafka; + +Kafka::asyncPublish('broker')->onTopic('topic-name') +``` + +The main difference is that the Async Producer is a singleton and will only flush the producer when the application is shutting down, instead of after each send or batch send. That reduces the overhead when you want to send a lot of messages in your request handlers. \ No newline at end of file diff --git a/src/Facades/Kafka.php b/src/Facades/Kafka.php index c7aba6fa..95aa9fc2 100644 --- a/src/Facades/Kafka.php +++ b/src/Facades/Kafka.php @@ -8,6 +8,7 @@ /** * @method static \Junges\Kafka\Contracts\MessageProducer publish(string $broker = null) + * @method static \Junges\Kafka\Contracts\MessageProducer asyncPublish(string $broker = null) * @method static \Junges\Kafka\Consumers\Builder consumer(array $topics = [], string $groupId = null, string $brokers = null) * @method static void assertPublished(ProducerMessage $expectedMessage = null, callable $callback = null) * @method static void assertPublishedTimes(int $times = 1, ProducerMessage $expectedMessage = null, callable $callback = null) diff --git a/src/Factory.php b/src/Factory.php index 612952dd..aeb081a6 100644 --- a/src/Factory.php +++ b/src/Factory.php @@ -31,6 +31,19 @@ public function publish(string $broker = null): MessageProducer ); } + /** Creates a new ProducerBuilder instance, setting brokers and topic. The producer will be flushed only when the application terminates, and doing SEND does not mean that the message was flushed! */ + public function asyncPublish(string $broker = null): MessageProducer + { + if ($this->shouldFake) { + return Kafka::fake()->publish(); + } + + return new ProducerBuilder( + broker: $broker ?? config('kafka.brokers'), + asyncProducer: true + ); + } + /** Return a ConsumerBuilder instance. */ public function consumer(array $topics = [], string $groupId = null, string $brokers = null): ConsumerBuilder { diff --git a/src/Producers/Builder.php b/src/Producers/Builder.php index 119d055d..6f463545 100644 --- a/src/Producers/Builder.php +++ b/src/Producers/Builder.php @@ -18,7 +18,7 @@ class Builder implements MessageProducer private array $options = []; private ProducerMessage $message; private MessageSerializer $serializer; - private Producer $producer; + private ?Producer $producer = null; private string $topic = ''; private ?Sasl $saslConfig = null; private readonly string $broker; @@ -27,6 +27,7 @@ class Builder implements MessageProducer public function __construct( ?string $broker = null, + private readonly bool $asyncProducer = false, ) { /** @var ProducerMessage $message */ $message = app(ProducerMessage::class); @@ -191,6 +192,9 @@ public function sendBatch(MessageBatch $messageBatch): int public function build(): Producer { + if ($this->asyncProducer && $this->producer){ + return $this->producer; + } $conf = new Config( broker: $this->broker, topics: [], @@ -200,9 +204,14 @@ public function build(): Producer callbacks: $this->callbacks, ); - return app(Producer::class, [ + $res = app(Producer::class, [ 'config' => $conf, 'serializer' => $this->serializer, + 'async' => $this->asyncProducer, ]); + if ($this->asyncProducer) { + $this->producer = $res; + } + return $res; } } diff --git a/src/Producers/Producer.php b/src/Producers/Producer.php index 2d1b0061..08434f25 100644 --- a/src/Producers/Producer.php +++ b/src/Producers/Producer.php @@ -34,11 +34,17 @@ class Producer implements ProducerContract public function __construct( private readonly Config $config, private readonly MessageSerializer $serializer, + private readonly bool $async = false, ) { $this->producer = app(KafkaProducer::class, [ 'conf' => $this->setConf($this->config->getProducerOptions()), ]); $this->dispatcher = App::make(Dispatcher::class); + if ($this->async) { + app()->terminating(function () { + $this->flush(); + }); + } } /** Set the Kafka Configuration. */ @@ -72,6 +78,10 @@ public function produce(ProducerMessage $message): bool $this->producer->poll(0); + if ($this->async) { + return true; + } + return $this->flush(); } @@ -104,7 +114,9 @@ public function produceBatch(MessageBatch $messageBatch): int $produced++; } - $this->flush(); + if (!$this->async) { + $this->flush(); + } $this->dispatcher->dispatch(new MessageBatchPublished($messageBatch, $produced));