Skip to content
Merged
38 changes: 38 additions & 0 deletions src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,44 @@ A reactive outbound channel adapter implementation is about initiation (or conti
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top.
A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.

Example for a reactive inbound channel adapter:
```java
public class ReactiveSourceMessageProducer extends MessageProducerSupport {

private final ReactiveSource myReactiveSource;

public ReactiveSourceMessageProducer(ReactiveSource myReactiveSource) {
Assert.notNull(myReactiveSource, "'myReactiveSource' must not be null");
this.myReactiveSource = myReactiveSource;
}

@Override
protected void doStart() {
Flux < Message << ? >> messageFlux =
this.myReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());

subscribeToPublisher(messageFlux);
}

}
```
And now we will be able to use it:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flow sample belongs to the producer section above.
The CustomReactiveMessageHandler deserves its own independent usage sample.
Otherwise it is confusing a little bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be resolved

```java
public class MainFlow {

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(new ReactiveSourceMessageProducer(new ReactiveSource()))
.channel(outputChannel)
.get();
}
}
```
Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>> and <<./r2dbc.adoc#r2dbc,R2DBC>>.
The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.
Also an https://github.com/spring-projects/spring-integration-extensions/tree/main/spring-integration-cassandra[Apache Cassandra Extension] provides a `MessageHandler` implementation for the Cassandra reactive driver.
Expand Down