2020
2121import lombok .Getter ;
2222import lombok .Setter ;
23+ import neo4j .org .testkit .backend .RxBlockingSubscriber ;
2324import neo4j .org .testkit .backend .TestkitState ;
25+ import neo4j .org .testkit .backend .holder .RxResultHolder ;
2426import neo4j .org .testkit .backend .messages .responses .Session ;
2527import neo4j .org .testkit .backend .messages .responses .TestkitResponse ;
2628import reactor .core .publisher .Mono ;
2729
30+ import java .util .concurrent .CompletableFuture ;
2831import java .util .concurrent .CompletionStage ;
32+ import java .util .concurrent .atomic .AtomicLong ;
33+
34+ import org .neo4j .driver .Record ;
2935
3036@ Setter
3137@ Getter
@@ -52,15 +58,121 @@ public CompletionStage<TestkitResponse> processAsync( TestkitState testkitState
5258 public Mono <TestkitResponse > processRx ( TestkitState testkitState )
5359 {
5460 return testkitState .getRxSessionHolder ( data .getSessionId () )
55- .flatMap ( sessionHolder -> Mono .fromDirect ( sessionHolder .getSession ().close () ) )
61+ .flatMap ( sessionHolder -> sessionHolder .getResultHolder ()
62+ .map ( this ::consumeRequestedDemandAndCancelIfSubscribed )
63+ .orElse ( Mono .empty () )
64+ .then ( Mono .fromDirect ( sessionHolder .getSession ().close () ) ) )
5665 .then ( Mono .just ( createResponse () ) );
5766 }
5867
68+ private Mono <Void > consumeRequestedDemandAndCancelIfSubscribed ( RxResultHolder resultHolder )
69+ {
70+ return resultHolder .getSubscriber ()
71+ .map ( subscriber -> Mono .fromCompletionStage ( consumeRequestedDemandAndCancelIfSubscribed ( resultHolder , subscriber ) ) )
72+ .orElse ( Mono .empty () );
73+ }
74+
75+ private CompletionStage <Void > consumeRequestedDemandAndCancelIfSubscribed ( RxResultHolder resultHolder , RxBlockingSubscriber <Record > subscriber )
76+ {
77+ if ( subscriber .getCompletionStage ().toCompletableFuture ().isDone () )
78+ {
79+ return CompletableFuture .completedFuture ( null );
80+ }
81+
82+ return new DemandConsumer <>( subscriber , resultHolder .getRequestedRecordsCounter () )
83+ .getCompletedStage ()
84+ .thenCompose ( completionReason ->
85+ {
86+ CompletionStage <Void > result ;
87+ switch ( completionReason )
88+ {
89+ case REQUESTED_DEMAND_CONSUMED :
90+ result = subscriber .getSubscriptionStage ().thenApply ( subscription ->
91+ {
92+ subscription .cancel ();
93+ return null ;
94+ } );
95+ break ;
96+ case RECORD_STREAM_EXHAUSTED :
97+ result = CompletableFuture .completedFuture ( null );
98+ break ;
99+ default :
100+ result = new CompletableFuture <>();
101+ result .toCompletableFuture ()
102+ .completeExceptionally ( new RuntimeException ( "Unexpected completion reason: " + completionReason ) );
103+ }
104+ return result ;
105+ } );
106+ }
107+
59108 private Session createResponse ()
60109 {
61110 return Session .builder ().data ( Session .SessionBody .builder ().id ( data .getSessionId () ).build () ).build ();
62111 }
63112
113+ private static class DemandConsumer <T >
114+ {
115+ private final RxBlockingSubscriber <T > subscriber ;
116+ private final AtomicLong unfulfilledDemandCounter ;
117+ @ Getter
118+ private final CompletableFuture <CompletionReason > completedStage = new CompletableFuture <>();
119+
120+ private enum CompletionReason
121+ {
122+ REQUESTED_DEMAND_CONSUMED ,
123+ RECORD_STREAM_EXHAUSTED
124+ }
125+
126+ private DemandConsumer ( RxBlockingSubscriber <T > subscriber , AtomicLong unfulfilledDemandCounter )
127+ {
128+ this .subscriber = subscriber ;
129+ this .unfulfilledDemandCounter = unfulfilledDemandCounter ;
130+
131+ subscriber .getCompletionStage ().whenComplete ( this ::onComplete );
132+ if ( this .unfulfilledDemandCounter .get () > 0 )
133+ {
134+ setupNextSignalConsumer ();
135+ }
136+ }
137+
138+ private void setupNextSignalConsumer ()
139+ {
140+ CompletableFuture <T > consumer = new CompletableFuture <>();
141+ subscriber .setNextSignalConsumer ( consumer );
142+ consumer .whenComplete ( this ::onNext );
143+ }
144+
145+ private void onNext ( T ignored , Throwable throwable )
146+ {
147+ if ( throwable != null )
148+ {
149+ completedStage .completeExceptionally ( throwable );
150+ return ;
151+ }
152+
153+ if ( unfulfilledDemandCounter .decrementAndGet () > 0 )
154+ {
155+ setupNextSignalConsumer ();
156+ }
157+ else
158+ {
159+ completedStage .complete ( CompletionReason .REQUESTED_DEMAND_CONSUMED );
160+ }
161+ }
162+
163+ private void onComplete ( Void ignored , Throwable throwable )
164+ {
165+ if ( throwable != null )
166+ {
167+ completedStage .completeExceptionally ( throwable );
168+ }
169+ else
170+ {
171+ completedStage .complete ( CompletionReason .RECORD_STREAM_EXHAUSTED );
172+ }
173+ }
174+ }
175+
64176 @ Setter
65177 @ Getter
66178 private static class SessionCloseBody
0 commit comments