Skip to content

Commit 1dc1283

Browse files
hantsyrstoyanchev
authored andcommitted
Add SmallRye Mutiny adapters
Closes gh-26222
1 parent 8f33450 commit 1dc1283

File tree

4 files changed

+77
-0
lines changed

4 files changed

+77
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ configure(allprojects) { project ->
6868
dependency "io.reactivex:rxjava-reactive-streams:1.2.1"
6969
dependency "io.reactivex.rxjava2:rxjava:2.2.21"
7070
dependency "io.reactivex.rxjava3:rxjava:3.1.1"
71+
dependency "io.smallrye.reactive:mutiny:1.0.0"
7172
dependency "io.projectreactor.tools:blockhound:1.0.4.RELEASE"
7273

7374
dependency "com.caucho:hessian:4.0.63"

spring-core/spring-core.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ dependencies {
4949
optional("io.reactivex:rxjava-reactive-streams")
5050
optional("io.reactivex.rxjava2:rxjava")
5151
optional("io.reactivex.rxjava3:rxjava")
52+
optional("io.smallrye.reactive:mutiny")
5253
optional("io.netty:netty-buffer")
5354
testCompile("io.projectreactor:reactor-test")
5455
testCompile("com.google.code.findbugs:jsr305")

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.CompletionStage;
2525
import java.util.function.Function;
2626

27+
import io.smallrye.mutiny.Multi;
28+
import io.smallrye.mutiny.Uni;
2729
import kotlinx.coroutines.CompletableDeferredKt;
2830
import kotlinx.coroutines.Deferred;
2931
import org.reactivestreams.Publisher;
@@ -72,6 +74,8 @@ public class ReactiveAdapterRegistry {
7274

7375
private static final boolean kotlinCoroutinesPresent;
7476

77+
private static final boolean mutinyPresent;
78+
7579
static {
7680
ClassLoader classLoader = ReactiveAdapterRegistry.class.getClassLoader();
7781
reactorPresent = ClassUtils.isPresent("reactor.core.publisher.Flux", classLoader);
@@ -81,6 +85,7 @@ public class ReactiveAdapterRegistry {
8185
rxjava3Present = ClassUtils.isPresent("io.reactivex.rxjava3.core.Flowable", classLoader);
8286
flowPublisherPresent = ClassUtils.isPresent("java.util.concurrent.Flow.Publisher", classLoader);
8387
kotlinCoroutinesPresent = ClassUtils.isPresent("kotlinx.coroutines.reactor.MonoKt", classLoader);
88+
mutinyPresent = ClassUtils.isPresent("io.smallrye.mutiny.Uni", classLoader);
8489
}
8590

8691
private final List<ReactiveAdapter> adapters = new ArrayList<>();
@@ -121,6 +126,11 @@ public ReactiveAdapterRegistry() {
121126
if (reactorPresent && kotlinCoroutinesPresent) {
122127
new CoroutinesRegistrar().registerAdapters(this);
123128
}
129+
130+
// SmallRye Mutiny
131+
if (mutinyPresent) {
132+
new MutinyRegistrar().registerAdapters(this);
133+
}
124134
}
125135

126136

@@ -417,6 +427,23 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
417427
}
418428
}
419429

430+
private static class MutinyRegistrar {
431+
432+
void registerAdapters(ReactiveAdapterRegistry registry) {
433+
registry.registerReactiveType(
434+
ReactiveTypeDescriptor.singleOptionalValue(Uni.class, ()-> Uni.createFrom().nothing()),
435+
uni ->((Uni<?>)uni).convert().toPublisher(),
436+
publisher -> Uni.createFrom().publisher(publisher)
437+
);
438+
439+
registry.registerReactiveType(
440+
ReactiveTypeDescriptor.multiValue(Multi.class, ()-> Multi.createFrom().empty()),
441+
multi -> (Multi<?>) multi,
442+
publisher-> Multi.createFrom().publisher(publisher)
443+
);
444+
}
445+
}
446+
420447

421448
/**
422449
* {@code BlockHoundIntegration} for spring-core classes.

spring-core/src/test/java/org/springframework/core/ReactiveAdapterRegistryTests.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.List;
2222
import java.util.concurrent.CompletableFuture;
2323

24+
import io.smallrye.mutiny.Multi;
25+
import io.smallrye.mutiny.Uni;
2426
import kotlinx.coroutines.Deferred;
2527
import org.junit.jupiter.api.Nested;
2628
import org.junit.jupiter.api.Test;
@@ -357,6 +359,52 @@ void deferred() {
357359
}
358360
}
359361

362+
// SmallRye Mutiny
363+
@Nested
364+
class Mutiny {
365+
366+
@Test
367+
void defaultAdapterRegistrations() {
368+
assertThat(getAdapter(io.smallrye.mutiny.Uni.class)).isNotNull();
369+
assertThat(getAdapter(io.smallrye.mutiny.Multi.class)).isNotNull();
370+
}
371+
372+
@Test
373+
void toUni() {
374+
Publisher<Integer> source = Mono.just(1);
375+
Object target = getAdapter(Uni.class).fromPublisher(source);
376+
assertThat(target).isInstanceOf(Uni.class);
377+
assertThat(((Uni<Integer>) target).await().atMost(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
378+
}
379+
380+
@Test
381+
void fromUni() {
382+
Uni<Integer> source = Uni.createFrom().item(1);
383+
Object target = getAdapter(Uni.class).toPublisher(source);
384+
assertThat(target).isInstanceOf(Mono.class);
385+
assertThat(((Mono<Integer>) target).block(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(1));
386+
}
387+
388+
@Test
389+
void toMulti() {
390+
List<Integer> sequence = Arrays.asList(1, 2, 3);
391+
Publisher<Integer> source = Flux.fromIterable(sequence);
392+
Object target = getAdapter(Multi.class).fromPublisher(source);
393+
assertThat(target).isInstanceOf(Multi.class);
394+
assertThat(((Multi<Integer>) target).collect().asList().await().atMost(Duration.ofMillis(1000))).isEqualTo(sequence);
395+
}
396+
397+
@Test
398+
void fromMulti() {
399+
List<Integer> sequence = Arrays.asList(1, 2, 3);
400+
Multi<Integer> source = Multi.createFrom().iterable(sequence);
401+
Object target = getAdapter(Multi.class).toPublisher(source);
402+
assertThat(target).isInstanceOf(Flux.class);
403+
assertThat(((Flux<Integer>) target).blockLast(Duration.ofMillis(1000))).isEqualTo(Integer.valueOf(3));
404+
}
405+
406+
}
407+
360408
private ReactiveAdapter getAdapter(Class<?> reactiveType) {
361409
ReactiveAdapter adapter = this.registry.getAdapter(reactiveType);
362410
assertThat(adapter).isNotNull();

0 commit comments

Comments
 (0)