From c0fa8845ed683fb53f6830c9b19a07c34ffe7ed8 Mon Sep 17 00:00:00 2001 From: Dmitriy Tverdiakov Date: Tue, 16 Nov 2021 14:20:19 +0000 Subject: [PATCH] Add support for multiple tx result streams in reactive backend Prior to this update only a single result stream could exist at a time because publishing thread (event loop) used to be blocked for iterative consumption and, hence, could not be used for additional result streams. Now, the publishing thread will not be blocked and will be available for other result streams too. Skip reasons have been clarified for tests that required investigation. Testkit configs have been updated. --- .../testkit/backend/RxBlockingSubscriber.java | 87 ------- .../testkit/backend/RxBufferedSubscriber.java | 241 ++++++++++++++++++ .../backend/holder/RxResultHolder.java | 8 +- .../backend/holder/RxSessionHolder.java | 12 - .../backend/messages/requests/ResultNext.java | 77 ++---- .../messages/requests/SessionClose.java | 119 +-------- .../backend/messages/requests/StartTest.java | 8 +- testkit-tests/pom.xml | 5 +- 8 files changed, 268 insertions(+), 289 deletions(-) delete mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java create mode 100644 testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java deleted file mode 100644 index f8ca02c18d..0000000000 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBlockingSubscriber.java +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright (c) "Neo4j" - * Neo4j Sweden AB [http://neo4j.com] - * - * This file is part of Neo4j. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package neo4j.org.testkit.backend; - -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; - -public class RxBlockingSubscriber implements Subscriber -{ - private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); - private final CompletableFuture completionFuture = new CompletableFuture<>(); - private CompletableFuture> nextSignalConsumerFuture; - - public void setNextSignalConsumer( CompletableFuture nextSignalConsumer ) - { - nextSignalConsumerFuture.complete( nextSignalConsumer ); - } - - public CompletionStage getSubscriptionStage() - { - return subscriptionFuture; - } - - public CompletionStage getCompletionStage() - { - return completionFuture; - } - - @Override - public void onSubscribe( Subscription s ) - { - nextSignalConsumerFuture = new CompletableFuture<>(); - subscriptionFuture.complete( s ); - } - - @Override - public void onNext( T t ) - { - blockUntilNextSignalConsumer().complete( t ); - } - - @Override - public void onError( Throwable t ) - { - completionFuture.completeExceptionally( t ); - } - - @Override - public void onComplete() - { - completionFuture.complete( null ); - } - - private CompletableFuture blockUntilNextSignalConsumer() - { - CompletableFuture nextSignalConsumer; - try - { - nextSignalConsumer = nextSignalConsumerFuture.get(); - } - catch ( Throwable throwable ) - { - throw new RuntimeException( "Failed waiting for next signal consumer", throwable ); - } - nextSignalConsumerFuture = new CompletableFuture<>(); - return nextSignalConsumer; - } -} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java new file mode 100644 index 0000000000..4137b464a2 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/RxBufferedSubscriber.java @@ -0,0 +1,241 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import org.reactivestreams.Subscription; +import reactor.core.publisher.BaseSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Supplier; + +/** + * Buffered subscriber for testing purposes. + *

+ * It consumes incoming signals as soon as they arrive and prevents publishing thread from getting blocked. + *

+ * The consumed signals can be retrieved one-by-one using {@link #next()}. It calls upstream {@link org.reactivestreams.Subscription#request(long)} with + * configured fetch size only when next signal is requested and no signals are expected to be emitted either because they have not been requested yet or the + * previous demand has been satisfied. + * + * @param + */ +public class RxBufferedSubscriber extends BaseSubscriber +{ + private final Lock lock = new ReentrantLock(); + private final long fetchSize; + private final CompletableFuture subscriptionFuture = new CompletableFuture<>(); + private final FluxSink itemsSink; + private final OneSignalSubscriber itemsSubscriber; + private long pendingItems; + private boolean nextInProgress; + + public RxBufferedSubscriber( long fetchSize ) + { + this.fetchSize = fetchSize; + AtomicReference> sinkRef = new AtomicReference<>(); + itemsSubscriber = new OneSignalSubscriber<>(); + Flux.create( fluxSink -> + { + sinkRef.set( fluxSink ); + fluxSink.onRequest( ignored -> requestFromUpstream() ); + } ).subscribe( itemsSubscriber ); + itemsSink = sinkRef.get(); + } + + /** + * Returns a {@link Mono} of next signal from this subscription. + *

+ * If necessary, a request with configured fetch size is made for more signals to be published. + *

+ * Only a single in progress request is supported at a time. The returned {@link Mono} must succeed or error before next call is permitted. + *

+ * Both empty successful completion and error completion indicate the completion of the subscribed publisher. This method must not be called after this. + * + * @return the {@link Mono} of next signal. + */ + public Mono next() + { + executeWithLock( lock, () -> + { + if ( nextInProgress ) + { + throw new IllegalStateException( "Only one in progress next is allowed at a time" ); + } + return nextInProgress = true; + } ); + return Mono.fromCompletionStage( subscriptionFuture ) + .then( Mono.create( itemsSubscriber::requestNext ) ) + .doOnSuccess( ignored -> executeWithLock( lock, () -> nextInProgress = false ) ) + .doOnError( ignored -> executeWithLock( lock, () -> nextInProgress = false ) ); + } + + @Override + protected void hookOnSubscribe( Subscription subscription ) + { + subscriptionFuture.complete( subscription ); + } + + @Override + protected void hookOnNext( T value ) + { + executeWithLock( lock, () -> pendingItems-- ); + itemsSink.next( value ); + } + + @Override + protected void hookOnComplete() + { + itemsSink.complete(); + } + + @Override + protected void hookOnError( Throwable throwable ) + { + itemsSink.error( throwable ); + } + + private void requestFromUpstream() + { + boolean moreItemsPending = executeWithLock( lock, () -> + { + boolean morePending; + if ( pendingItems > 0 ) + { + morePending = true; + } + else + { + pendingItems = fetchSize; + morePending = false; + } + return morePending; + } ); + if ( moreItemsPending ) + { + return; + } + Subscription subscription = subscriptionFuture.getNow( null ); + if ( subscription == null ) + { + throw new IllegalStateException( "Upstream subscription must not be null at this stage" ); + } + subscription.request( fetchSize ); + } + + public static T executeWithLock( Lock lock, Supplier supplier ) + { + lock.lock(); + try + { + return supplier.get(); + } + finally + { + lock.unlock(); + } + } + + private static class OneSignalSubscriber extends BaseSubscriber + { + private final Lock lock = new ReentrantLock(); + private MonoSink sink; + private boolean emitted; + private boolean done; + private Throwable throwable; + + public void requestNext( MonoSink sink ) + { + boolean done = executeWithLock( lock, () -> + { + this.sink = sink; + emitted = false; + return this.done; + } ); + + if ( done ) + { + if ( throwable != null ) + { + this.sink.error( throwable ); + } + else + { + this.sink.success(); + } + } + else + { + upstream().request( 1 ); + } + } + + @Override + protected void hookOnSubscribe( Subscription subscription ) + { + // left empty to prevent requesting signals immediately + } + + @Override + protected void hookOnNext( T value ) + { + MonoSink sink = executeWithLock( lock, () -> + { + emitted = true; + return this.sink; + } ); + sink.success( value ); + } + + @Override + protected void hookOnComplete() + { + MonoSink sink = executeWithLock( lock, () -> + { + done = true; + return !emitted ? this.sink : null; + } ); + if ( sink != null ) + { + sink.success(); + } + } + + @Override + protected void hookOnError( Throwable throwable ) + { + MonoSink sink = executeWithLock( lock, () -> + { + done = true; + this.throwable = throwable; + return !emitted ? this.sink : null; + } ); + if ( sink != null ) + { + sink.error( throwable ); + } + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java index 56631f33eb..fd1c290c4b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxResultHolder.java @@ -20,7 +20,7 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; +import neo4j.org.testkit.backend.RxBufferedSubscriber; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -31,23 +31,21 @@ public class RxResultHolder extends AbstractResultHolder { @Setter - private RxBlockingSubscriber subscriber; + private RxBufferedSubscriber subscriber; @Getter private final AtomicLong requestedRecordsCounter = new AtomicLong(); public RxResultHolder( RxSessionHolder sessionHolder, RxResult result ) { super( sessionHolder, result ); - sessionHolder.setResultHolder( this ); } public RxResultHolder( RxTransactionHolder transactionHolder, RxResult result ) { super( transactionHolder, result ); - transactionHolder.getSessionHolder().setResultHolder( this ); } - public Optional> getSubscriber() + public Optional> getSubscriber() { return Optional.ofNullable( subscriber ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java index 607ce07229..73e66b2920 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/holder/RxSessionHolder.java @@ -18,25 +18,13 @@ */ package neo4j.org.testkit.backend.holder; -import lombok.Setter; - -import java.util.Optional; - import org.neo4j.driver.SessionConfig; import org.neo4j.driver.reactive.RxSession; public class RxSessionHolder extends AbstractSessionHolder { - @Setter - private RxResultHolder resultHolder; - public RxSessionHolder( DriverHolder driverHolder, RxSession session, SessionConfig config ) { super( driverHolder, session, config ); } - - public Optional getResultHolder() - { - return Optional.ofNullable( resultHolder ); - } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index 7f83c5a665..6a434ca3d5 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -20,14 +20,13 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; +import neo4j.org.testkit.backend.RxBufferedSubscriber; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.NullRecord; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import org.neo4j.driver.Record; @@ -68,66 +67,22 @@ public Mono processRx( TestkitState testkitState ) return testkitState.getRxResultHolder( data.getResultId() ) .flatMap( resultHolder -> { - CompletionStage responseStage = getSubscriberStage( resultHolder ) - .thenCompose( subscriber -> requestRecordsWhenPreviousAreConsumed( subscriber, resultHolder ) ) - .thenCompose( subscriber -> consumeNextRecordOrCompletionSignal( subscriber, resultHolder ) ) - .thenApply( this::createResponseNullSafe ); - return Mono.fromCompletionStage( responseStage ); - } ); - } - - private CompletionStage> getSubscriberStage( RxResultHolder resultHolder ) - { - return resultHolder.getSubscriber() - .>>map( CompletableFuture::completedFuture ) - .orElseGet( () -> - { - RxBlockingSubscriber subscriber = new RxBlockingSubscriber<>(); - CompletionStage> subscriberStage = - subscriber.getSubscriptionStage() - .thenApply( subscription -> + RxBufferedSubscriber subscriber = + resultHolder.getSubscriber() + .orElseGet( () -> { - resultHolder.setSubscriber( subscriber ); - return subscriber; + RxBufferedSubscriber subscriberInstance = + new RxBufferedSubscriber<>( + getFetchSize( resultHolder ) ); + resultHolder.setSubscriber( subscriberInstance ); + resultHolder.getResult().records() + .subscribe( subscriberInstance ); + return subscriberInstance; } ); - resultHolder.getResult().records().subscribe( subscriber ); - return subscriberStage; - } ); - } - - private CompletionStage> requestRecordsWhenPreviousAreConsumed( - RxBlockingSubscriber subscriber, RxResultHolder resultHolder - ) - { - return resultHolder.getRequestedRecordsCounter().get() > 0 - ? CompletableFuture.completedFuture( subscriber ) - : subscriber.getSubscriptionStage() - .thenApply( subscription -> - { - long fetchSize = getFetchSize( resultHolder ); - subscription.request( fetchSize ); - resultHolder.getRequestedRecordsCounter().addAndGet( fetchSize ); - return subscriber; - } ); - } - - private CompletionStage consumeNextRecord( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) - { - CompletableFuture recordConsumer = new CompletableFuture<>(); - subscriber.setNextSignalConsumer( recordConsumer ); - return recordConsumer.thenApply( record -> - { - resultHolder.getRequestedRecordsCounter().decrementAndGet(); - return record; - } ); - } - - private CompletionStage consumeNextRecordOrCompletionSignal( RxBlockingSubscriber subscriber, RxResultHolder resultHolder ) - { - return CompletableFuture.anyOf( - consumeNextRecord( subscriber, resultHolder ).toCompletableFuture(), - subscriber.getCompletionStage().toCompletableFuture() - ).thenApply( Record.class::cast ); + return subscriber.next() + .map( this::createResponse ) + .defaultIfEmpty( NullRecord.builder().build() ); + } ); } private long getFetchSize( RxResultHolder resultHolder ) @@ -138,7 +93,7 @@ private long getFetchSize( RxResultHolder resultHolder ) return fetchSize == -1 ? Long.MAX_VALUE : fetchSize; } - private neo4j.org.testkit.backend.messages.responses.Record createResponse( Record record ) + private neo4j.org.testkit.backend.messages.responses.TestkitResponse createResponse( Record record ) { return neo4j.org.testkit.backend.messages.responses.Record.builder() .data( neo4j.org.testkit.backend.messages.responses.Record.RecordBody.builder() diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index e0c0015ba0..d3680a3f49 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -20,18 +20,12 @@ import lombok.Getter; import lombok.Setter; -import neo4j.org.testkit.backend.RxBlockingSubscriber; import neo4j.org.testkit.backend.TestkitState; -import neo4j.org.testkit.backend.holder.RxResultHolder; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import reactor.core.publisher.Mono; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.atomic.AtomicLong; - -import org.neo4j.driver.Record; @Setter @Getter @@ -58,126 +52,15 @@ public CompletionStage processAsync( TestkitState testkitState public Mono processRx( TestkitState testkitState ) { return testkitState.getRxSessionHolder( data.getSessionId() ) - .flatMap( sessionHolder -> sessionHolder.getResultHolder() - .map( this::consumeRequestedDemandAndCancelIfSubscribed ) - .orElse( Mono.empty() ) - .then( Mono.fromDirect( sessionHolder.getSession().close() ) ) ) + .flatMap( sessionHolder -> Mono.fromDirect( sessionHolder.getSession().close() ) ) .then( Mono.just( createResponse() ) ); } - private Mono consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder ) - { - return resultHolder.getSubscriber() - .map( subscriber -> Mono.fromCompletionStage( consumeRequestedDemandAndCancelIfSubscribed( resultHolder, subscriber ) ) ) - .orElse( Mono.empty() ); - } - - private CompletionStage consumeRequestedDemandAndCancelIfSubscribed( RxResultHolder resultHolder, RxBlockingSubscriber subscriber ) - { - if ( subscriber.getCompletionStage().toCompletableFuture().isDone() ) - { - return CompletableFuture.completedFuture( null ); - } - - return new DemandConsumer<>( subscriber, resultHolder.getRequestedRecordsCounter() ) - .getCompletedStage() - .thenCompose( completionReason -> - { - CompletionStage result; - switch ( completionReason ) - { - case REQUESTED_DEMAND_CONSUMED: - result = subscriber.getSubscriptionStage().thenApply( subscription -> - { - subscription.cancel(); - return null; - } ); - break; - case RECORD_STREAM_EXHAUSTED: - result = CompletableFuture.completedFuture( null ); - break; - default: - result = new CompletableFuture<>(); - result.toCompletableFuture() - .completeExceptionally( new RuntimeException( "Unexpected completion reason: " + completionReason ) ); - } - return result; - } ); - } - private Session createResponse() { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); } - private static class DemandConsumer - { - private final RxBlockingSubscriber subscriber; - private final AtomicLong unfulfilledDemandCounter; - @Getter - private final CompletableFuture completedStage = new CompletableFuture<>(); - - private enum CompletionReason - { - REQUESTED_DEMAND_CONSUMED, - RECORD_STREAM_EXHAUSTED - } - - private DemandConsumer( RxBlockingSubscriber subscriber, AtomicLong unfulfilledDemandCounter ) - { - this.subscriber = subscriber; - this.unfulfilledDemandCounter = unfulfilledDemandCounter; - - subscriber.getCompletionStage().whenComplete( this::onComplete ); - long unfulfilledDemand = this.unfulfilledDemandCounter.get(); - if ( unfulfilledDemand == 0 ) - { - completedStage.complete( CompletionReason.REQUESTED_DEMAND_CONSUMED ); - } - else if ( unfulfilledDemand > 0 ) - { - setupNextSignalConsumer(); - } - } - - private void setupNextSignalConsumer() - { - CompletableFuture consumer = new CompletableFuture<>(); - subscriber.setNextSignalConsumer( consumer ); - consumer.whenComplete( this::onNext ); - } - - private void onNext( T ignored, Throwable throwable ) - { - if ( throwable != null ) - { - completedStage.completeExceptionally( throwable ); - return; - } - - if ( unfulfilledDemandCounter.decrementAndGet() > 0 ) - { - setupNextSignalConsumer(); - } - else - { - completedStage.complete( CompletionReason.REQUESTED_DEMAND_CONSUMED ); - } - } - - private void onComplete( Void ignored, Throwable throwable ) - { - if ( throwable != null ) - { - completedStage.completeExceptionally( throwable ); - } - else - { - completedStage.complete( CompletionReason.RECORD_STREAM_EXHAUSTED ); - } - } - } - @Setter @Getter private static class SessionCloseBody diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index e81fc42ce2..71f39e72ac 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -63,13 +63,11 @@ public class StartTest implements TestkitRequest REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_unmanaged_tx_should_fail_subsequent_usage_after_timeout$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestRoutingConnectionRecvTimeout\\.test_timeout_managed_tx_retry$", skipMessage ); - skipMessage = "Requires investigation"; - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxFuncRun\\.test_iteration_nested$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_broken_transaction_should_not_break_session$", skipMessage ); REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_does_not_update_last_bookmark_on_failure$", skipMessage ); - REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestTxRun\\.test_interwoven_queries$", skipMessage ); + skipMessage = "Does not support multiple concurrent result streams on session level"; + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_iteration_nested$", skipMessage ); + REACTIVE_SKIP_PATTERN_TO_REASON.put( "^.*\\.TestSessionRun\\.test_partial_iteration$", skipMessage ); } private StartTestBody data; diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index 3e421ea3dd..15b06fd824 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -159,7 +159,10 @@ ${project.build.directory}/testkit-rx reactive - --configs 4.2-cluster,4.0-community,4.1-enterprise ${testkit.args} + --configs 4.0-enterprise-neo4j 4.1-enterprise-neo4j 4.2-community-bolt 4.2-community-neo4j + 4.2-enterprise-bolt 4.2-enterprise-neo4j 4.2-enterprise-cluster-neo4j 4.3-community-bolt 4.3-community-neo4j + 4.3-enterprise-bolt 4.3-enterprise-neo4j 4.3-enterprise-cluster-neo4j ${testkit.args} + ${testkit.rx.name.pattern}>