Skip to content

Avoid re-creating RSocketRequester instance per subscriber #25330

@OlegDokuka

Description

@OlegDokuka

Right now RSocketRequester.Builder uses the connect method as a terminal operation which builds a Mono<RSocketRequester as of now this method wraps doConnect with extra Mono.defer which seems to be a legacy solution to defer of some heavy internals allocation.

As of now, this method seems to be redundant and somewhat stops to use one of the key features of vanilla RSocket like reconnect which allows the usage of the same Mono<RSocket> instance in order to access the same cached connection.

Unfortunately, because of the Mono.defer(() -> doCancel() the underlying mono is not directly propagated to the user, hence all subsequent subscriptions create new connections instead of the usage of the cached (assumed the reconnect feature is enabled)

Expected

rsocketRequesterBuilder
				.rsocketConnector(rsocketConnector ->
					rsocketConnector
							.lease(() -> Leases.create().receiver(leaseReceiver))
							.reconnect(
								Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
								     .maxBackoff(Duration.ofSeconds(5))
							)
				)
				.connectWebSocket(URI.create(adjustmentProperties.getBaseUrl()));

to behave approximately identical to

RSocketConnector
				.create()
				.lease(() -> Leases.create().receiver(leaseReceiver))
				.dataMimeType(dataMimeType.toString())
				.metadataMimeType(metadataMimeType.toString())
				.reconnect(
						Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
						     .maxBackoff(Duration.ofSeconds(5))
				)
				.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))
				.map(rsocket -> RSocketRequester.wrap(
					rsocket,
					dataMimeType,
					metadataMimeType,
					rSocketStrategies
				));

Actual

connect method wraps

RSocketConnector
				.create()
				.lease(() -> Leases.create().receiver(leaseReceiver))
				.dataMimeType(dataMimeType.toString())
				.metadataMimeType(metadataMimeType.toString())
				.reconnect(
						Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
						     .maxBackoff(Duration.ofSeconds(5))
				)
				.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket")))

into

Mono.defer(() -> RSocketConnector
				.create()
				.lease(() -> Leases.create().receiver(leaseReceiver))
				.dataMimeType(dataMimeType.toString())
				.metadataMimeType(metadataMimeType.toString())
				.reconnect(
						Retry.backoff(Long.MAX_VALUE, Duration.ofMillis(100))
						     .maxBackoff(Duration.ofSeconds(5))
				)
				.connect(WebsocketClientTransport.create(URI.create(adjustmentProperties.getBaseUrl() + "rsocket"))))

so reconnect does not work as expected

Metadata

Metadata

Assignees

Labels

in: messagingIssues in messaging modules (jms, messaging)type: enhancementA general enhancement

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions