Skip to content

Conversation

@migroskub
Copy link
Contributor

I've seen that there is no clear example of how to quickly get started with Spring Integration and Reactive Streams. This PR tries to help with this.

I'm trying to help mainly for questions like this.

I've seen that there is no clear example of how to quickly get started with Spring Integration and Reactive Streams. This PR tries to help with this.
@pivotal-cla
Copy link

@migroskub Please sign the Contributor License Agreement!

Click here to manually synchronize the status of this Pull Request.

See the FAQ for frequently asked questions.

@pivotal-cla
Copy link

@migroskub Thank you for signing the Contributor License Agreement!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for a sample!

Please, consider to move it before A reactive outbound channel adapter paragraph.

Any samples for the outbound part?

The R2dbcMessageHandler should be a good one took into.

@artembilan artembilan added this to the 6.0 M2 milestone Feb 8, 2022
@artembilan
Copy link
Member

@migroskub ,

Gentle ping.
Otherwise we will take it from here ourselves and will merge it as we think it should be.

Thank you!

@migroskub
Copy link
Contributor Author

migroskub commented Feb 11, 2022

I've committed the changes as you've suggested. I have 3 notes after reading some pages from the documentation.

  1. There are "hidden" pages. Pages that are accessible only from links, but not from the main page. For example, take a look at the functions support page, which can't be accessed from the main page. I've seen more like this.
  2. There are lots of grammar mistakes. It makes it very hard to read. I suggest starting documenting with Grammarly as a helper tool, and I think it would be a pretty good idea to let this tool walk through the existing pages and suggest fixes.
  3. The most important - It's pretty hard to understand what are the correct interfaces that we should use in each case of writing channel adapters with the Java DSL. I think there should be a better guide for writing custom channel adapters (since there are not so many channel adapters, writing custom ones is a pretty common task, if not the most common task when using Spring Integration). I'm not talking here only about Reactive Channel Adapters, but also of synchronous ones. The reference doesn't mention any of the JavaDSL channel adapter interfaces, so that's super hard to understand the purpose of each interface and what fits my needs. The only solution what diving into the source code, which has inconsistency between channel adapters (some channel adapters name is XInboundChannelAdapter, and some channel adapters name is XMessageProducer. I know that there may be some reasons and conventions to it, but this whole part is just not documented). I'm saying this because I think it will be hard for newbies to contribute channel adapters this way. I've PR'd because I'm trying to help a little with this to the next developers who'll come by this challenge.

@artembilan
Copy link
Member

Thank you for feedback, @migroskub !

  1. The Function Support is part of Messaging Endpoints chapter. So, it is indeed cannot be visible from the main page. You need to go to this one to see it in the content on the left: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#messaging-endpoints-chapter. Same applies for may others paragraphs which are part for bigger chapters. Perhaps if you go to a single page, it would become clearer for you: https://docs.spring.io/spring-integration/docs/current/reference/html/index-single.html
  2. I will take a look into that tool.
  3. Please, raise a separate GH issue for that channel adapters development concern.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

}
```

And now we will be able to use it:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this flow sample belongs to the producer section above.
The CustomReactiveMessageHandler deserves its own independent usage sample.
Otherwise it is confusing a little bit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can be resolved


@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> this.queryTypeExpression.getValue(this.evaluationContext, message, Type.class))
Copy link
Contributor Author

@migroskub migroskub Feb 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@artembilan I'm struggling with the CustomReactiveMessageHandler implementation - can you simplify this to a more basic supplier usage that will be practical and won't require evaluationContext?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just do -> message.getHeaders().get("queryType", Type.class).
And no queryTypeExpression in the code at all!

A returned reactive type can be subscribed immediately if we are in one-way, fire-and-forget scenario, or it is propagated downstream (request-reply scenarios) for further integration flow or an explicit subscription in the target business logic, but still downstream preserving reactive streams semantics.
An example for a reactive inbound channel adapter:
```java
public class CustomReactiveMessageSource extends MessageProducerSupport {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not correct to call it MessageSource. Just because it does not implement a MessageSource. And definitely a reactive publisher could not be wrapped to the MessageSource anyway.
Better to call it CustomReactiveMessageProducer since you really do it correctly extending MessageProducerSupport and using its subscribeToPublisher() from the doStart() impl.


private final CustomReactiveSource customReactiveSource;

public CustomReactiveSourceMessageProducer(CustomReactiveSource customReactiveSource) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that constructor for the class you called properly. So, please, revise the name of the class 😄


@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> this.queryTypeExpression.getValue(this.evaluationContext, message, Type.class))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just do -> message.getHeaders().get("queryType", Type.class).
And no queryTypeExpression in the code at all!

MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ; is wrong here.
I would prefer to have this Flux extracted as a variable. It's OK to have it in this bean method definition.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean like this?

public class MainFlow {  
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlows.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep! This is good. It does give us a clue what is that customReactiveSource about and which from() factory is going to be for building an IntegrationFlow.

@migroskub
Copy link
Contributor Author

I think the PR is ready

@migroskub migroskub requested a review from artembilan February 11, 2022 21:23

@Override
protected void doStart() {
Flux<Message<<?>> messageFlux =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo for diamond definition. Has lead to this error:

 Execution failed for task ':checkAsciidocLinks'.
> [*** Anchor reference errors found:, 
  Anchor '?' not found in 'reactive-streams.adoc', if in another file, it needs to be qualified with './fileName.adoc#']

But originally I definitely have missed it 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't exactly understood this. What should be fixed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is wrong: Flux<Message<<?>>.
Must be Flux<Message<?>>. You have to have exact number of closing bracket as many as you have opening.
The error is misleading, but it really helped me to spot the typo in your code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed


@Override
protected void doStart() {
Flux<Message<<?>> messageFlux =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is wrong: Flux<Message<<?>>.
Must be Flux<Message<?>>. You have to have exact number of closing bracket as many as you have opening.
The error is misleading, but it really helped me to spot the typo in your code.

@artembilan artembilan merged commit 285c380 into spring-projects:main Feb 14, 2022
@artembilan
Copy link
Member

Thank you, @migroskub; looking forward for more!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants