Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions spring-boot-project/spring-boot-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,11 @@
<artifactId>spring-integration-jmx</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-rsocket</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@
import javax.management.MBeanServer;
import javax.sql.DataSource;

import io.rsocket.RSocketFactory;
import io.rsocket.transport.netty.server.TcpServerTransport;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
Expand All @@ -29,8 +34,10 @@
import org.springframework.boot.autoconfigure.condition.SearchStrategy;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.env.Environment;
Expand All @@ -42,6 +49,14 @@
import org.springframework.integration.jdbc.store.JdbcMessageStore;
import org.springframework.integration.jmx.config.EnableIntegrationMBeanExport;
import org.springframework.integration.monitor.IntegrationMBeanExporter;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.rsocket.outbound.RSocketOutboundGateway;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import org.springframework.util.StringUtils;

/**
Expand Down Expand Up @@ -138,4 +153,91 @@ public IntegrationDataSourceInitializer integrationDataSourceInitializer(DataSou

}

/**
* Integration RSocket configuration.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ IntegrationRSocketEndpoint.class, RSocketRequester.class, RSocketFactory.class })
@Conditional(IntegrationRSocketConfiguration.AnyRSocketChannelAdapterAvailable.class)
protected static class IntegrationRSocketConfiguration {

/**
* Check if either a {@link IntegrationRSocketEndpoint} or
* {@link RSocketOutboundGateway} bean is available.
*/
static class AnyRSocketChannelAdapterAvailable extends AnyNestedCondition {

AnyRSocketChannelAdapterAvailable() {
super(ConfigurationPhase.REGISTER_BEAN);
}

@ConditionalOnBean(IntegrationRSocketEndpoint.class)
static class IntegrationRSocketEndpointAvailable {

}

@ConditionalOnBean(RSocketOutboundGateway.class)
static class RSocketOutboundGatewayAvailable {

}

}

@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(TcpServerTransport.class)
@AutoConfigureBefore(RSocketMessagingAutoConfiguration.class)
protected static class IntegrationRSocketServerConfiguration {

@Bean
@ConditionalOnMissingBean(ServerRSocketMessageHandler.class)
public RSocketMessageHandler serverRSocketMessageHandler(RSocketStrategies rSocketStrategies,
IntegrationProperties integrationProperties) {

RSocketMessageHandler messageHandler =
new ServerRSocketMessageHandler(
integrationProperties.getRSocket().getServer().isMessageMappingEnabled());
messageHandler.setRSocketStrategies(rSocketStrategies);
return messageHandler;
}

@Bean
@ConditionalOnMissingBean
public ServerRSocketConnector serverRSocketConnector(ServerRSocketMessageHandler messageHandler) {
return new ServerRSocketConnector(messageHandler);
}

}

@Configuration(proxyBeanMethods = false)
protected static class IntegrationRSocketClientConfiguration {

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = "port")
public ClientRSocketConnector clientTcpRSocketConnector(IntegrationProperties integrationProperties,
RSocketStrategies rSocketStrategies) {

IntegrationProperties.RSocket.Client client = integrationProperties.getRSocket().getClient();
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector(client.getHost(), client.getPort());
clientRSocketConnector.setRSocketStrategies(rSocketStrategies);
return clientRSocketConnector;
}

@Bean
@ConditionalOnMissingBean
@ConditionalOnProperty(prefix = "spring.integration.rsocket.client", name = "uri")
public ClientRSocketConnector clientWebSocketRSocketConnector(IntegrationProperties integrationProperties,
RSocketStrategies rSocketStrategies) {

IntegrationProperties.RSocket.Client client = integrationProperties.getRSocket().getClient();
ClientRSocketConnector clientRSocketConnector = new ClientRSocketConnector(client.getUri());
clientRSocketConnector.setRSocketStrategies(rSocketStrategies);
return clientRSocketConnector;
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package org.springframework.boot.autoconfigure.integration;

import java.net.URI;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceInitializationMode;

Expand All @@ -24,17 +26,24 @@
*
* @author Vedran Pavic
* @author Stephane Nicoll
* @author Artem Bilan
* @since 2.0.0
*/
@ConfigurationProperties(prefix = "spring.integration")
public class IntegrationProperties {

private final Jdbc jdbc = new Jdbc();

private final RSocket rsocket = new RSocket();

public Jdbc getJdbc() {
return this.jdbc;
}

public RSocket getRSocket() {
return this.rsocket;
}

public static class Jdbc {

private static final String DEFAULT_SCHEMA_LOCATION = "classpath:org/springframework/"
Expand Down Expand Up @@ -68,4 +77,80 @@ public void setInitializeSchema(DataSourceInitializationMode initializeSchema) {

}

public static class RSocket {

private final Client client = new Client();

private final Server server = new Server();

public Client getClient() {
return this.client;
}

public Server getServer() {
return this.server;
}

public static class Client {

/**
* TCP RSocket server host to connect to.
*/
private String host = "localhost";

/**
* TCP RSocket server port to connect to.
*/
private Integer port;

/**
* WebSocket RSocket server uri to connect to.
*/
private URI uri;

public void setHost(String host) {
this.host = host;
}

public String getHost() {
return this.host;
}

public void setPort(Integer port) {
this.port = port;
}

public Integer getPort() {
return this.port;
}

public void setUri(URI uri) {
this.uri = uri;
}

public URI getUri() {
return this.uri;
}

}

public static class Server {

/**
* Whether to handle message mapping for RSocket via Spring Integration.
*/
boolean messageMappingEnabled;

public boolean isMessageMappingEnabled() {
return this.messageMappingEnabled;
}

public void setMessageMappingEnabled(boolean messageMappingEnabled) {
this.messageMappingEnabled = messageMappingEnabled;
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,26 @@

package org.springframework.boot.autoconfigure.integration;

import java.util.List;

import javax.management.MBeanServer;

import io.rsocket.transport.ClientTransport;
import io.rsocket.transport.netty.client.TcpClientTransport;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.autoconfigure.AutoConfigurations;
import org.springframework.boot.autoconfigure.integration.IntegrationAutoConfiguration.IntegrationComponentScanConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.EmbeddedDataSourceConfiguration;
import org.springframework.boot.autoconfigure.jdbc.JdbcTemplateAutoConfiguration;
import org.springframework.boot.autoconfigure.jmx.JmxAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketMessagingAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketRequesterAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketServerAutoConfiguration;
import org.springframework.boot.autoconfigure.rsocket.RSocketStrategiesAutoConfiguration;
import org.springframework.boot.jdbc.DataSourceInitializationMode;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
Expand All @@ -38,10 +48,17 @@
import org.springframework.integration.endpoint.MessageProcessorMessageSource;
import org.springframework.integration.gateway.RequestReplyExchanger;
import org.springframework.integration.handler.MessageProcessor;
import org.springframework.integration.rsocket.ClientRSocketConnector;
import org.springframework.integration.rsocket.IntegrationRSocketEndpoint;
import org.springframework.integration.rsocket.ServerRSocketConnector;
import org.springframework.integration.rsocket.ServerRSocketMessageHandler;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jmx.export.MBeanExporter;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.invocation.reactive.HandlerMethodArgumentResolver;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -188,6 +205,47 @@ void integrationEnablesDefaultCounts() {
});
}

@Test
void rsocketSupportEnabled() {
this.contextRunner.withUserConfiguration(RSocketServerConfiguration.class)
.withConfiguration(AutoConfigurations.of(RSocketServerAutoConfiguration.class,
RSocketStrategiesAutoConfiguration.class, RSocketMessagingAutoConfiguration.class,
RSocketRequesterAutoConfiguration.class, IntegrationAutoConfiguration.class))
.withPropertyValues("spring.rsocket.server.port=0",
"spring.integration.rsocket.client.port=0",
"spring.integration.rsocket.server.message-mapping-enabled=true")
.run((context) -> {
assertThat(context)
.hasSingleBean(ClientRSocketConnector.class)
.hasBean("clientTcpRSocketConnector")
.hasSingleBean(ServerRSocketConnector.class)
.hasSingleBean(ServerRSocketMessageHandler.class)
.hasSingleBean(RSocketMessageHandler.class);

ServerRSocketMessageHandler serverRSocketMessageHandler =
context.getBean(ServerRSocketMessageHandler.class);
assertThat(context).getBean(RSocketMessageHandler.class).isSameAs(serverRSocketMessageHandler);

@SuppressWarnings("unchecked")
List<HandlerMethodArgumentResolver> handlerMethodArgumentResolvers =
(List<HandlerMethodArgumentResolver>) new DirectFieldAccessor(serverRSocketMessageHandler)
.getPropertyValue("invocableHelper.argumentResolvers.argumentResolvers");

assertThat(handlerMethodArgumentResolvers)
.extracting((resolver) -> resolver.getClass().getSimpleName())
.contains("HeaderMethodArgumentResolver", "DestinationVariableMethodArgumentResolver",
"RSocketRequesterMethodArgumentResolver", "PayloadMethodArgumentResolver",
"MessageHandlerMethodArgumentResolver");

ClientRSocketConnector clientRSocketConnector = context.getBean(ClientRSocketConnector.class);
ClientTransport clientTransport =
(ClientTransport) new DirectFieldAccessor(clientRSocketConnector)
.getPropertyValue("clientTransport");

assertThat(clientTransport).isInstanceOf(TcpClientTransport.class);
});
}

@Configuration(proxyBeanMethods = false)
static class CustomMBeanExporter {

Expand Down Expand Up @@ -220,4 +278,26 @@ MessageSource<?> myMessageSource() {

}

@Configuration(proxyBeanMethods = false)
static class RSocketServerConfiguration {

@Bean
public IntegrationRSocketEndpoint mockIntegrationRSocketEndpoint() {
return new IntegrationRSocketEndpoint() {

@Override
public Mono<Void> handleMessage(Message<?> message) {
return null;
}

@Override
public String[] getPath() {
return new String[] {"/rsocketTestPath"};
}

};
}

}

}