@@ -9,11 +9,10 @@ use crate::query_result::{CassNode, CassResult};
99use crate :: types:: * ;
1010use crate :: uuid:: CassUuid ;
1111use futures:: future;
12- use scylla:: response:: Coordinator ;
1312use std:: future:: Future ;
1413use std:: mem;
1514use std:: os:: raw:: c_void;
16- use std:: sync:: { Arc , Condvar , Mutex } ;
15+ use std:: sync:: { Arc , Condvar , Mutex , OnceLock } ;
1716use tokio:: task:: JoinHandle ;
1817use tokio:: time:: Duration ;
1918
@@ -52,14 +51,14 @@ impl BoundCallback {
5251
5352#[ derive( Default ) ]
5453struct CassFutureState {
55- value : Option < CassFutureResult > ,
5654 err_string : Option < String > ,
5755 callback : Option < BoundCallback > ,
5856 join_handle : Option < JoinHandle < ( ) > > ,
5957}
6058
6159pub struct CassFuture {
6260 state : Mutex < CassFutureState > ,
61+ result : OnceLock < CassFutureResult > ,
6362 wait_for_value : Condvar ,
6463}
6564
@@ -89,14 +88,18 @@ impl CassFuture {
8988 ) -> Arc < CassFuture > {
9089 let cass_fut = Arc :: new ( CassFuture {
9190 state : Mutex :: new ( Default :: default ( ) ) ,
91+ result : OnceLock :: new ( ) ,
9292 wait_for_value : Condvar :: new ( ) ,
9393 } ) ;
9494 let cass_fut_clone = Arc :: clone ( & cass_fut) ;
9595 let join_handle = RUNTIME . spawn ( async move {
9696 let r = fut. await ;
9797 let maybe_cb = {
9898 let mut guard = cass_fut_clone. state . lock ( ) . unwrap ( ) ;
99- guard. value = Some ( r) ;
99+ cass_fut_clone
100+ . result
101+ . set ( r)
102+ . expect ( "Tried to resolve future result twice!" ) ;
100103 // Take the callback and call it after releasing the lock
101104 guard. callback . take ( )
102105 } ;
@@ -117,16 +120,17 @@ impl CassFuture {
117120
118121 pub fn new_ready ( r : CassFutureResult ) -> Arc < Self > {
119122 Arc :: new ( CassFuture {
120- state : Mutex :: new ( CassFutureState {
121- value : Some ( r) ,
122- ..Default :: default ( )
123- } ) ,
123+ state : Mutex :: new ( CassFutureState :: default ( ) ) ,
124+ result : OnceLock :: from ( r) ,
124125 wait_for_value : Condvar :: new ( ) ,
125126 } )
126127 }
127128
128- pub fn with_waited_result < T > ( & self , f : impl FnOnce ( & mut CassFutureResult ) -> T ) -> T {
129- self . with_waited_state ( |s| f ( s. value . as_mut ( ) . unwrap ( ) ) )
129+ pub fn with_waited_result < ' s , T > ( & ' s self , f : impl FnOnce ( & ' s CassFutureResult ) -> T ) -> T
130+ where
131+ T : ' s ,
132+ {
133+ self . with_waited_state ( |_| f ( self . result . get ( ) . unwrap ( ) ) )
130134 }
131135
132136 /// Awaits the future until completion.
@@ -155,7 +159,7 @@ impl CassFuture {
155159 guard = self
156160 . wait_for_value
157161 . wait_while ( guard, |state| {
158- state . value . is_none ( ) && state. join_handle . is_none ( )
162+ self . result . get ( ) . is_none ( ) && state. join_handle . is_none ( )
159163 } )
160164 // unwrap: Error appears only when mutex is poisoned.
161165 . unwrap ( ) ;
@@ -173,10 +177,10 @@ impl CassFuture {
173177
174178 fn with_waited_result_timed < T > (
175179 & self ,
176- f : impl FnOnce ( & mut CassFutureResult ) -> T ,
180+ f : impl FnOnce ( & CassFutureResult ) -> T ,
177181 timeout_duration : Duration ,
178182 ) -> Result < T , FutureError > {
179- self . with_waited_state_timed ( |s | f ( s . value . as_mut ( ) . unwrap ( ) ) , timeout_duration)
183+ self . with_waited_state_timed ( |_ | f ( self . result . get ( ) . unwrap ( ) ) , timeout_duration)
180184 }
181185
182186 /// Tries to await the future with a given timeout.
@@ -244,7 +248,7 @@ impl CassFuture {
244248 let ( guard_result, timeout_result) = self
245249 . wait_for_value
246250 . wait_timeout_while ( guard, remaining_timeout, |state| {
247- state . value . is_none ( ) && state. join_handle . is_none ( )
251+ self . result . get ( ) . is_none ( ) && state. join_handle . is_none ( )
248252 } )
249253 // unwrap: Error appears only when mutex is poisoned.
250254 . unwrap ( ) ;
@@ -277,7 +281,7 @@ impl CassFuture {
277281 return CassError :: CASS_ERROR_LIB_CALLBACK_ALREADY_SET ;
278282 }
279283 let bound_cb = BoundCallback { cb, data } ;
280- if lock . value . is_some ( ) {
284+ if self . result . get ( ) . is_some ( ) {
281285 // The value is already available, we need to call the callback ourselves
282286 mem:: drop ( lock) ;
283287 bound_cb. invoke ( self_ptr) ;
@@ -347,8 +351,7 @@ pub unsafe extern "C" fn cass_future_ready(
347351 return cass_false;
348352 } ;
349353
350- let state_guard = future. state . lock ( ) . unwrap ( ) ;
351- match state_guard. value {
354+ match future. result . get ( ) {
352355 None => cass_false,
353356 Some ( _) => cass_true,
354357 }
@@ -363,7 +366,7 @@ pub unsafe extern "C" fn cass_future_error_code(
363366 return CassError :: CASS_ERROR_LIB_BAD_PARAMS ;
364367 } ;
365368
366- future. with_waited_result ( |r : & mut CassFutureResult | match r {
369+ future. with_waited_result ( |r : & CassFutureResult | match r {
367370 Ok ( CassResultValue :: QueryError ( err) ) => err. to_cass_error ( ) ,
368371 Err ( ( err, _) ) => * err,
369372 _ => CassError :: CASS_OK ,
@@ -382,7 +385,7 @@ pub unsafe extern "C" fn cass_future_error_message(
382385 } ;
383386
384387 future. with_waited_state ( |state : & mut CassFutureState | {
385- let value = & state . value ;
388+ let value = future . result . get ( ) ;
386389 let msg = state
387390 . err_string
388391 . get_or_insert_with ( || match value. as_ref ( ) . unwrap ( ) {
@@ -409,7 +412,7 @@ pub unsafe extern "C" fn cass_future_get_result(
409412 } ;
410413
411414 future
412- . with_waited_result ( |r : & mut CassFutureResult | -> Option < Arc < CassResult > > {
415+ . with_waited_result ( |r : & CassFutureResult | -> Option < Arc < CassResult > > {
413416 match r. as_ref ( ) . ok ( ) ? {
414417 CassResultValue :: QueryResult ( qr) => Some ( Arc :: clone ( qr) ) ,
415418 _ => None ,
@@ -428,7 +431,7 @@ pub unsafe extern "C" fn cass_future_get_error_result(
428431 } ;
429432
430433 future
431- . with_waited_result ( |r : & mut CassFutureResult | -> Option < Arc < CassErrorResult > > {
434+ . with_waited_result ( |r : & CassFutureResult | -> Option < Arc < CassErrorResult > > {
432435 match r. as_ref ( ) . ok ( ) ? {
433436 CassResultValue :: QueryError ( qr) => Some ( Arc :: clone ( qr) ) ,
434437 _ => None ,
@@ -447,7 +450,7 @@ pub unsafe extern "C" fn cass_future_get_prepared(
447450 } ;
448451
449452 future
450- . with_waited_result ( |r : & mut CassFutureResult | -> Option < Arc < CassPrepared > > {
453+ . with_waited_result ( |r : & CassFutureResult | -> Option < Arc < CassPrepared > > {
451454 match r. as_ref ( ) . ok ( ) ? {
452455 CassResultValue :: Prepared ( p) => Some ( Arc :: clone ( p) ) ,
453456 _ => None ,
@@ -466,7 +469,7 @@ pub unsafe extern "C" fn cass_future_tracing_id(
466469 return CassError :: CASS_ERROR_LIB_BAD_PARAMS ;
467470 } ;
468471
469- future. with_waited_result ( |r : & mut CassFutureResult | match r {
472+ future. with_waited_result ( |r : & CassFutureResult | match r {
470473 Ok ( CassResultValue :: QueryResult ( result) ) => match result. tracing_id {
471474 Some ( id) => {
472475 unsafe { * tracing_id = CassUuid :: from ( id) } ;
@@ -490,21 +493,7 @@ pub unsafe extern "C" fn cass_future_coordinator(
490493 future. with_waited_result ( |r| match r {
491494 Ok ( CassResultValue :: QueryResult ( result) ) => {
492495 // unwrap: Coordinator is `None` only for tests.
493- let coordinator_ptr = result. coordinator . as_ref ( ) . unwrap ( ) as * const Coordinator ;
494-
495- // We need to 'extend' the lifetime of returned Coordinator so safe FFI api does not complain.
496- // The lifetime of "result" reference provided to this closure is the lifetime of a mutex guard.
497- // We are guaranteed, that once the future is resolved (i.e. this closure is called), the result will not
498- // be modified in any way. Thus, we can guarantee that returned coordinator lives as long as underlying
499- // CassResult lives (i.e. longer than the lifetime of acquired mutex guard).
500- //
501- // SAFETY: Coordinator's lifetime is tied to the lifetime of underlying CassResult, thus:
502- // 1. Coordinator lives as long as the underlying CassResult lives
503- // 2. Coordinator will not be moved as long as underlying CassResult is not freed
504- // 3. Coordinator is immutable once future is resolved (because CassResult is set once)
505- let coordinator_ref = unsafe { & * coordinator_ptr } ;
506-
507- RefFFI :: as_ptr ( coordinator_ref)
496+ RefFFI :: as_ptr ( result. coordinator . as_ref ( ) . unwrap ( ) )
508497 }
509498 _ => RefFFI :: null ( ) ,
510499 } )
0 commit comments