37
37
38
38
import static com .mongodb .assertions .Assertions .notNull ;
39
39
import static com .mongodb .internal .async .ErrorHandlingResultCallback .errorHandlingCallback ;
40
- import static com .mongodb .internal .operation .CommandOperationHelper .createReadCommandAndExecuteAsync ;
40
+ import static com .mongodb .internal .operation .AsyncOperationHelper .createReadCommandAndExecuteAsync ;
41
+ import static com .mongodb .internal .operation .AsyncOperationHelper .decorateReadWithRetriesAsync ;
42
+ import static com .mongodb .internal .operation .AsyncOperationHelper .withAsyncSourceAndConnection ;
41
43
import static com .mongodb .internal .operation .CommandOperationHelper .initialRetryState ;
42
- import static com .mongodb .internal .operation .CommandOperationHelper .logRetryExecute ;
43
44
import static com .mongodb .internal .operation .ExplainHelper .asExplainCommand ;
44
45
import static com .mongodb .internal .operation .OperationHelper .LOGGER ;
45
46
import static com .mongodb .internal .operation .OperationHelper .canRetryRead ;
46
47
import static com .mongodb .internal .operation .OperationHelper .cursorDocumentToQueryResult ;
47
- import static com .mongodb .internal .operation .OperationHelper .withAsyncSourceAndConnection ;
48
48
import static com .mongodb .internal .operation .OperationReadConcernHelper .appendReadConcernToCommand ;
49
49
import static com .mongodb .internal .operation .ServerVersionHelper .MIN_WIRE_VERSION ;
50
50
@@ -90,33 +90,33 @@ public void executeAsync(
90
90
final SingleResultCallback <AsyncBatchCursor <T >> callback ) {
91
91
RetryState retryState = initialRetryState (retryReads );
92
92
binding .retain ();
93
- AsyncCallbackSupplier <AsyncBatchCursor <T >> asyncRead = CommandOperationHelper
94
- .< AsyncBatchCursor < T >> decorateReadWithRetries ( retryState , funcCallback -> {
95
- logRetryExecute ( retryState );
96
- withAsyncSourceAndConnection (binding :: getReadConnectionSource ,
97
- false ,
98
- funcCallback ,
99
- ( source , connection , releasingCallback ) -> {
100
- if ( retryState
101
- . breakAndCompleteIfRetryAnd (
102
- () -> ! canRetryRead ( source . getServerDescription (),
103
- binding . getSessionContext () ),
104
- releasingCallback )) {
105
- return ;
106
- }
107
- final SingleResultCallback < AsyncBatchCursor < T >> wrappedCallback =
108
- exceptionTransformingCallback ( releasingCallback );
109
- createReadCommandAndExecuteAsync ( retryState ,
110
- binding ,
111
- source ,
112
- namespace . getDatabaseName () ,
113
- getCommandCreator ( binding . getSessionContext () ),
114
- CommandResultDocumentCodec . create ( decoder , FIRST_BATCH ),
115
- asyncTransformer ( ),
116
- connection ,
117
- wrappedCallback );
118
- } );
119
- } )
93
+ AsyncCallbackSupplier <AsyncBatchCursor <T >> asyncRead = decorateReadWithRetriesAsync (
94
+ retryState ,
95
+ binding . getOperationContext (),
96
+ ( AsyncCallbackSupplier < AsyncBatchCursor < T >>) funcCallback -> withAsyncSourceAndConnection (
97
+ binding :: getReadConnectionSource ,
98
+ false ,
99
+ funcCallback ,
100
+ ( source , connection , releasingCallback ) -> {
101
+ if ( retryState
102
+ . breakAndCompleteIfRetryAnd (
103
+ () -> ! canRetryRead ( source . getServerDescription ( ),
104
+ binding . getSessionContext ()),
105
+ releasingCallback )) {
106
+ return ;
107
+ }
108
+ SingleResultCallback < AsyncBatchCursor < T >> wrappedCallback =
109
+ exceptionTransformingCallback ( releasingCallback );
110
+ createReadCommandAndExecuteAsync ( retryState ,
111
+ binding ,
112
+ source ,
113
+ namespace . getDatabaseName ( ),
114
+ getCommandCreator ( binding . getSessionContext () ),
115
+ CommandResultDocumentCodec . create ( decoder , FIRST_BATCH ),
116
+ asyncTransformer () ,
117
+ connection ,
118
+ wrappedCallback );
119
+ }) )
120
120
.whenComplete (binding ::release );
121
121
asyncRead .get (errorHandlingCallback (callback , LOGGER ));
122
122
}
@@ -125,10 +125,11 @@ private static <T> SingleResultCallback<T> exceptionTransformingCallback(
125
125
final SingleResultCallback <T > callback ) {
126
126
return (result , t ) -> {
127
127
if (t != null ) {
128
- if (t instanceof MongoCommandException e ) {
129
- MongoQueryException exception =
130
- new MongoQueryException (e .getResponse (), e .getServerAddress ());
131
- callback .onResult (result , exception );
128
+ if (t instanceof MongoCommandException commandException ) {
129
+ callback .onResult (result ,
130
+ new MongoQueryException (
131
+ commandException .getResponse (),
132
+ commandException .getServerAddress ()));
132
133
} else {
133
134
callback .onResult (result , t );
134
135
}
@@ -170,7 +171,7 @@ private CommandOperationHelper.CommandCreator getCommandCreator(
170
171
};
171
172
}
172
173
173
- private CommandOperationHelper .CommandReadTransformerAsync <BsonDocument , AsyncBatchCursor <T >> asyncTransformer () {
174
+ private AsyncOperationHelper .CommandReadTransformerAsync <BsonDocument , AsyncBatchCursor <T >> asyncTransformer () {
174
175
return (result , source , connection ) -> {
175
176
QueryResult <T > queryResult = cursorDocumentToQueryResult (result .getDocument ("cursor" ),
176
177
connection .getDescription ()
0 commit comments