Skip to content

Memory leak related to TCP connection management with use of STOMP broker relay [SPR-11531] #16156

@spring-projects-issues

Description

@spring-projects-issues

Kevin Jordan opened SPR-11531 and commented

This may be more of a bug with reactor which Spring websockets uses, but it seems it builds up a lot of connections. I get almost 800MB in just a few short days. I don't have 65k connections active at any given time so this shouldn't be this way. In my configuration I'm using stomp with rabbitmq.

Configuration:

@Configuration
@EnableWebSocketMessageBroker
@EnableScheduling
@ComponentScan(basePackages = "com.practiware.desktop.server")
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, SchedulingConfigurer {

	Logger logger = LoggerFactory.getLogger(this.getClass());

	private ThreadPoolTaskScheduler scheduler;

	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
		registration.setInterceptors(new ChannelInterceptor() {

			@Override
			public Message<?> postReceive(Message<?> message,
										  MessageChannel channel) {
				WebSocketConfig.this.logger.info("Post-receive on channel of type: " + channel.getClass().getName());
				WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
				WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
				return message;
			}

			@Override
			public void postSend(Message<?> message,
								 MessageChannel channel,
								 boolean sent) {
				WebSocketConfig.this.logger.info("Post-send on channel of type: " + channel.getClass().getName());
				WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
				WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
			}

			@Override
			public boolean preReceive(MessageChannel channel) {
				WebSocketConfig.this.logger.info("Pre-receive on channel of type: " + channel.getClass().getName());
				return true;
			}

			@SuppressWarnings("unchecked")
			@Override
			public Message<?> preSend(Message<?> message,
									  MessageChannel channel) {
				try {
					WebSocketConfig.this.logger.info("Pre-send on channel of type: " + channel.getClass().getName());
					WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
					WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
					/*Security checks for current user remove*/
				} catch (Exception e) {
					WebSocketConfig.this.logger.error("Error checking permissions",
													  e);
				}
				return message;
			}
		});
	}

	@Override
	public void configureClientOutboundChannel(ChannelRegistration registration) {
		registration.setInterceptors(new ChannelInterceptor() {

			@Override
			public Message<?> postReceive(Message<?> message,
										  MessageChannel channel) {
				WebSocketConfig.this.logger.info("Post-receive on channel of type: " + channel.getClass().getName());
				WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
				WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
				return message;
			}

			@Override
			public void postSend(Message<?> message,
								 MessageChannel channel,
								 boolean sent) {
				WebSocketConfig.this.logger.info("Post-send on channel of type: " + channel.getClass().getName());
				WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
				WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
			}

			@Override
			public boolean preReceive(MessageChannel channel) {
				WebSocketConfig.this.logger.info("Pre-receive on channel of type: " + channel.getClass().getName());
				return true;
			}

			@Override
			public Message<?> preSend(Message<?> message,
									  MessageChannel channel) {
				WebSocketConfig.this.logger.info("Pre-send on channel of type: " + channel.getClass().getName());
				WebSocketConfig.this.logger.info("Headers: " + message.getHeaders().toString());
				WebSocketConfig.this.logger.info("Payload: " + String.valueOf(message.getPayload()));
				return message;
			}
		});
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
		registry.enableStompBrokerRelay("/topic/",
										"/queue/");
		registry.setApplicationDestinationPrefixes("/app");
	}

	@Override
	public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
		taskRegistrar.setScheduler(this.scheduler);
	}

	@Override
	public void registerStompEndpoints(StompEndpointRegistry registry) {
		registry.addEndpoint("/subscriptions").withSockJS();
	}

	@PersistenceContext
	public void setEntityManager(EntityManager em) {
		this.em = em;
	}

	@Autowired
	@Qualifier("messageBrokerSockJsTaskScheduler")
	public void setScheduler(ThreadPoolTaskScheduler scheduler) {
		this.scheduler = scheduler;
	}
}
@Configuration
@EnableWebMvc
public class WebConfig extends WebMvcConfigurerAdapter {

	@Override
	public void configureDefaultServletHandling(DefaultServletHandlerConfigurer configurer) {
		configurer.enable();
	}
}

Could it somehow be opening connections for non-websocket connections? Do I have a mis-configuration? It works fine except for the memory leak.

I've attached a screenshot from MAT from a memory dump of my webapp.


Affects: 4.0 GA, 4.0.2

Attachments:

Metadata

Metadata

Assignees

Labels

in: messagingIssues in messaging modules (jms, messaging)in: webIssues in web modules (web, webmvc, webflux, websocket)type: bugA general bug

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions