Reactive Streams adapter for Apache Pulsar Java Client. This uses Project Reactor as the Reactive Streams implementation.
The API is evolving and the documentation and examples might not match the released version available in Maven central. Please keep this in mind when using the library and the applying the examples.
This library will be presented at SpringOne 2021 in the talk Reactive Applications with Apache Pulsar and Spring Boot. The SpringOne 2021 virtual conference is free. Please join the talk online!
This library requires Java 8 or + to run.
With Gradle:
repositories {
mavenCentral()
}
dependencies {
implementation "com.github.lhotari:reactive-pulsar-adapter:0.0.6"
}
With Maven:
<dependencies>
<dependency>
<groupId>com.github.lhotari</groupId>
<artifactId>reactive-pulsar-adapter</artifactId>
<version>0.0.6</version>
</dependency>
</dependencies>
There's a Spring Boot example at https://github.com/lhotari/reactive-pulsar-showcase .
Getting it with Gradle:
repositories {
mavenCentral()
}
dependencies {
implementation "com.github.lhotari:reactive-pulsar-spring-boot-starter:0.0.6"
testImplementation "com.github.lhotari:reactive-pulsar-spring-test-support:0.0.6"
}
Getting it with Maven:
<dependencies>
<dependency>
<groupId>com.github.lhotari</groupId>
<artifactId>reactive-pulsar-spring-boot-starter</artifactId>
<version>0.0.6</version>
</dependency>
<dependency>
<groupId>com.github.lhotari</groupId>
<artifactId>reactive-pulsar-spring-test-support</artifactId>
<version>0.0.6</version>
<scope>test</scope>
</dependency>
</dependencies>
Using an existing PulsarClient instance:
ReactivePulsarClient reactivePulsarClient = ReactivePulsarClient.create(pulsarClient);
Configure pulsar.client.serviceUrl
property in application properties. Any additional properties under pulsar.client.
prefix will be used to configure the Pulsar Client.
The Spring Boot starter will configure a ReactivePulsarClient bean which will be available for autowiring.
ReactiveMessageSender<String> messageSender = reactivePulsarClient
.messageSender(Schema.STRING)
.topic(topicName)
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
.sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);
Add require dependency for cache implementation. This step isn't required when using reactive-pulsar-spring-boot-starter. A ReactiveProducerCache
instance will be made available as a Spring bean in that case. However, it
is necessary to set the cache on the ReactiveMessageSenderFactory.
With Gradle:
dependencies {
implementation "com.github.lhotari:reactive-pulsar-adapter:0.0.6"
implementation "com.github.lhotari:reactive-pulsar-caffeine-producer-cache:0.0.6"
}
With Maven:
<dependencies>
<dependency>
<groupId>com.github.lhotari</groupId>
<artifactId>reactive-pulsar-adapter</artifactId>
<version>0.0.6</version>
</dependency>
<dependency>
<groupId>com.github.lhotari</groupId>
<artifactId>reactive-pulsar-caffeine-producer-cache</artifactId>
<version>0.0.6</version>
</dependency>
</dependencies>
CaffeineReactiveProducerCache producerCache = new CaffeineReactiveProducerCache();
ReactiveMessageSender<String> messageSender = reactivePulsarClient
.messageSender(Schema.STRING)
.cache(producerCache)
.topic(topicName)
.maxInflight(100)
.build();
Mono<MessageId> messageId = messageSender
.sendMessage(Mono.just(MessageSpec.of("Hello world!")));
// for demonstration
messageId.subscribe(System.out::println);
It is recommended to use a cached producer in most cases. The cache enables reusing the Pulsar Producer instance and related resources across multiple message sending calls. This improves performance since a producer won't have to be created and closed before and after sending a message.
The adapter library implementation together with the cache implementation will also enable reactive backpressure for sending messages. The maxInflight
setting will limit the number of messages that are pending from the client to the broker. The solution will limit reactive streams subscription requests to keep the number of pending messages under the defined limit. This limit is per-topic and impacts the local JVM only.
Reading all messages for a topic:
ReactiveMessageReader<String> messageReader =
reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName)
.build();
messageReader.readMessages()
.map(Message::getValue)
// for demonstration
.subscribe(System.out::println);
By default, the stream will complete when end of the topic is reached. The end of the topic is detected with Pulsar Reader's hasMessageAvailableAsync
method.
The ReactiveMessageReader doesn't support partitioned topics. It's possible to read the content of indidual partitions. Topic names for individual partitions can be discovered using the PulsarClient's getPartitionsForTopic
method. The adapter library doesn't currently wrap that method.
With .endOfStreamAction(EndOfStreamAction.POLL)
the Reader will poll for new messages when the reader reaches the end of the topic.
ReactiveMessageReader<String> messageReader =
reactivePulsarClient.messageReader(Schema.STRING)
.topic(topicName)
.startAtSpec(StartAtSpec.LATEST)
.endOfStreamAction(EndOfStreamAction.POLL)
.build();
messageReader.readMessages()
.take(Duration.ofSeconds(5))
.take(5)
// for demonstration
.subscribe(System.out::println);
ReactiveMessageConsumer<String> messageConsumer=
reactivePulsarClient.messageConsumer(Schema.STRING)
.topic(topicName)
.consumerConfigurer(consumerBuilder->consumerBuilder.subscriptionName("sub"))
.build();
messageConsumer.consumeMessages(messageFlux ->
messageFlux.map(message ->
MessageResult.acknowledge(message.getMessageId(), message.getValue())))
.take(Duration.ofSeconds(2))
// for demonstration
.subscribe(System.out::println);
ReactiveMessageHandler reactiveMessageHandler=
ReactiveMessageHandlerBuilder
.builder(reactivePulsarClient
.messageConsumer(Schema.STRING)
.consumerConfigurer(consumerBuilder->
consumerBuilder.subscriptionName("sub")
.topic(topicName))
.build())
.messageHandler(message -> Mono.fromRunnable(()->{
System.out.println(message.getValue());
}))
.build()
.start();
// for demonstration
// the reactive message handler is running in the background, delay for 10 seconds
Thread.sleep(10000L);
// now stop the message handler component
reactiveMessageHandler.stop();
Reactive Pulsar adapter library is Open Source Software released under the Apache Software License 2.0.
The library is Apache 2.0 licensed.
There's currently a discussion about contributing the library to Apache Pulsar project on the mailing list. It is preferred to postpone contributions until there is a resolution to whether the project becomes a part of the Apache Pulsar project. Please check the Apache Pulsar dev mailing list for updates on the topic.
If you detect a bug or have a feature request or a good idea for Reactive Pulsar adapter, please open a GitHub issue or ping one of the contributors on Twitter or on Pulsar Slack.
Please use [reactive-pulsar] tag on Stackoverflow. Ask a question now.