Skip to content
Merged
97 changes: 93 additions & 4 deletions src/reference/asciidoc/reactive-streams.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,101 @@ When the target protocol for integration provides a Reactive Streams solution, i
An inbound, event-driven channel adapter implementation is about wrapping a request (if necessary) into a deferred `Mono` or `Flux` and perform a send (and produce reply, if any) only when a protocol component initiates a subscription into a `Mono` returned from the listener method.
This way we have a reactive stream solution encapsulated exactly in this component.
Of course, downstream integration flow subscribed on the output channel should honor Reactive Streams specification and be performed in the on demand, back-pressure ready manner.
This is not always available by the nature (or the current implementation) of `MessageHandler` processor used in the integration flow.

This is not always available by the nature (or with the current implementation) of `MessageHandler` processor used in the integration flow.
This limitation can be handled using thread pools and queues or `FluxMessageChannel` (see above) before and after integration endpoints when there is no reactive implementation.

A reactive outbound channel adapter implementation is about initiation (or continuation) of a reactive stream to interaction with an external system according provided reactive API for the target protocol.
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of reactive stream on top.
A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
An example for a reactive inbound channel adapter:
```java
public class CustomReactiveMessageSource extends MessageProducerSupport {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not correct to call it MessageSource. Just because it does not implement a MessageSource. And definitely a reactive publisher could not be wrapped to the MessageSource anyway.
Better to call it CustomReactiveMessageProducer since you really do it correctly extending MessageProducerSupport and using its subscribeToPublisher() from the doStart() impl.


private final CustomReactiveSource customReactiveSource;

public CustomReactiveSourceMessageProducer(CustomReactiveSource customReactiveSource) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that constructor for the class you called properly. So, please, revise the name of the class 😄

Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}

@Override
protected void doStart() {
Flux < Message << ? >> messageFlux =
this.myReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());

subscribeToPublisher(messageFlux);
}

}
```

A reactive outbound channel adapter implementation is about the initiation (or continuation) of a reactive stream to interaction with an external system according to the provided reactive API for the target protocol.
An inbound payload could be a reactive type per se or as an event of the whole integration flow which is a part of the reactive stream on top.
A returned reactive type can be subscribed immediately if we are in a one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.

An example for an outbound channel adapter:
```java
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

private final CustomEntityOperations customEntityOperations;

private Expression queryTypeExpression = new ValueExpression<>(Type.INSERT);

public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations, Expression queryTypeExpression) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
if (queryTypeExpression != null) {
this.queryTypeExpression = queryTypeExpression;
}
}

@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> this.queryTypeExpression.getValue(this.evaluationContext, message, Type.class))
Copy link
Contributor Author

@migroskub migroskub Feb 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan I'm struggling with the CustomReactiveMessageHandler implementation - can you simplify this to a more basic supplier usage that will be practical and won't require evaluationContext?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just do -> message.getHeaders().get("queryType", Type.class).
And no queryTypeExpression in the code at all!

.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}

private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}

private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}

public enum Type {
INSERT,
UPDATE,
}
}
```

And now we will be able to use it:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flow sample belongs to the producer section above.
The CustomReactiveMessageHandler deserves its own independent usage sample.
Otherwise it is confusing a little bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be resolved

```java
public class MainFlow {

@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlows.from(new ReactiveSourceMessageProducer(new ReactiveSource()))
.channel(outputChannel)
.get();
}
}
```

Currently Spring Integration provides channel adapter (or gateway) implementations for <<./webflux.adoc#webflux,WebFlux>>, <<./rsocket.adoc#rsocket,RSocket>>, <<./mongodb.adoc#mongodb,MongoDb>> and <<./r2dbc.adoc#r2dbc,R2DBC>>.
The <<./redis.adoc#redis-stream-outbound,Redis Stream Channel Adapters>> are also reactive and uses `ReactiveStreamOperations` from Spring Data.
Expand Down