@@ -43,17 +43,23 @@ impl ResolveLogger {
43
43
}
44
44
}
45
45
46
- fn with_state < F : FnOnce ( & ResolveInfoState ) > ( & self , f : F ) -> Fallible < ( ) > {
46
+ fn with_state < F : FnOnce ( & ResolveInfoState ) > ( & self , f : F ) {
47
47
loop {
48
48
let lock = self . state . load_full ( ) ;
49
49
let Ok ( rg) = lock. try_read ( ) else {
50
50
// this is lock free. If we didn't get the read lock it means checkpoint has
51
51
// swapped and acquired the write lock so we can just retry and get the next state
52
52
continue ;
53
53
} ;
54
- let state = rg. as_ref ( ) . or_fail ( ) ?;
55
- f ( state) ;
56
- break Ok ( ( ) ) ;
54
+ // In an earlier version we failed on this Option being None, leading to flakey tests.
55
+ // The Option can be none if thread T1 has a reference to the lock, but parks before try_lock.
56
+ // In the meantime a checkpoint thread T2, swaps out the lock, takes a write lock, takes the option
57
+ // (replacing it with None) and releases the lock. Now T1 wakes up and tries and succeeds the read
58
+ // lock. This scenario is rare and as above it's sound to retry,
59
+ if let Some ( state) = rg. as_ref ( ) {
60
+ f ( state) ;
61
+ break ;
62
+ } ;
57
63
}
58
64
}
59
65
@@ -63,7 +69,7 @@ impl ResolveLogger {
63
69
resolve_context : & pb:: Struct ,
64
70
client_credential : & str ,
65
71
values : & [ crate :: ResolvedValue < ' _ > ] ,
66
- ) -> Fallible < ( ) > {
72
+ ) {
67
73
self . with_state ( |state : & ResolveInfoState | {
68
74
state
69
75
. client_resolve_info
@@ -121,7 +127,7 @@ impl ResolveLogger {
121
127
assigned_flags : & [ crate :: FlagToApply ] ,
122
128
client : & crate :: Client ,
123
129
sdk : & Option < crate :: flags_resolver:: Sdk > ,
124
- ) -> Fallible < ( ) > {
130
+ ) {
125
131
self . with_state ( |state : & ResolveInfoState | {
126
132
let client_info = Some ( pb:: ClientInfo {
127
133
client : client. client_name . to_string ( ) ,
@@ -181,22 +187,29 @@ impl ResolveLogger {
181
187
} )
182
188
}
183
189
184
- pub fn checkpoint ( & self ) -> Fallible < pb:: WriteFlagLogsRequest > {
185
- let state = {
186
- let lock = self
187
- . state
188
- . swap ( Arc :: new ( RwLock :: new ( Some ( ResolveInfoState :: default ( ) ) ) ) ) ;
189
- let mut wg = lock. write ( ) . or_fail ( ) ?;
190
- wg. take ( ) . or_fail ( ) ?
191
- } ;
192
- let client_resolve_info = build_client_resolve_info ( & state) ;
193
- let flag_resolve_info = build_flag_resolve_info ( & state) ;
194
- Ok ( pb:: WriteFlagLogsRequest {
195
- flag_resolve_info,
196
- client_resolve_info,
197
- flag_assigned : state. flag_assigned . into_iter ( ) . collect ( ) ,
198
- telemetry_data : None ,
199
- } )
190
+ pub fn checkpoint ( & self ) -> pb:: WriteFlagLogsRequest {
191
+ let lock = self
192
+ . state
193
+ . swap ( Arc :: new ( RwLock :: new ( Some ( ResolveInfoState :: default ( ) ) ) ) ) ;
194
+ // the only operation we do under write-lock is take the option, and that can't panic, so lock shouldn't be poisoned,
195
+ // even so, if it some how was it's safe to still use the value.
196
+ let mut wg = lock
197
+ . write ( )
198
+ . unwrap_or_else ( |poisoned| poisoned. into_inner ( ) ) ;
199
+ // also shouldn't be possible for this Option to be None as we never insert None and only one thread can swap the value out
200
+ // if this assertion somehow is faulty, returning an empty WriteFlagLogsRequest is sound.
201
+ wg. take ( )
202
+ . map ( |state| {
203
+ let client_resolve_info = build_client_resolve_info ( & state) ;
204
+ let flag_resolve_info = build_flag_resolve_info ( & state) ;
205
+ pb:: WriteFlagLogsRequest {
206
+ flag_resolve_info,
207
+ client_resolve_info,
208
+ flag_assigned : state. flag_assigned . into_iter ( ) . collect ( ) ,
209
+ telemetry_data : None ,
210
+ }
211
+ } )
212
+ . unwrap_or_default ( )
200
213
}
201
214
}
202
215
@@ -373,7 +386,7 @@ mod tests {
373
386
let cred = "clients/test/clientCredentials/test" ;
374
387
let rv = [ ] ;
375
388
logger. log_resolve ( "id" , & ctx, cred, & rv) ;
376
- let req = logger. checkpoint ( ) . unwrap ( ) ;
389
+ let req = logger. checkpoint ( ) ;
377
390
// find the client entry in the built request
378
391
let crec = req
379
392
. client_resolve_info
@@ -463,7 +476,7 @@ mod tests {
463
476
let cred = "clients/test/clientCredentials/test" ;
464
477
let rv = [ ] ;
465
478
logger. log_resolve ( "id" , & ctx, cred, & rv) ;
466
- let req = logger. checkpoint ( ) . unwrap ( ) ;
479
+ let req = logger. checkpoint ( ) ;
467
480
let crec = req
468
481
. client_resolve_info
469
482
. iter ( )
@@ -522,7 +535,7 @@ mod tests {
522
535
523
536
let cred = "clients/test/clientCredentials/test" ;
524
537
logger. log_resolve ( "id" , & Struct :: default ( ) , cred, & rv) ;
525
- let req = logger. checkpoint ( ) . unwrap ( ) ;
538
+ let req = logger. checkpoint ( ) ;
526
539
527
540
let flag_info = req
528
541
. flag_resolve_info
@@ -593,7 +606,7 @@ mod tests {
593
606
594
607
let cred = "clients/test/clientCredentials/test" ;
595
608
logger. log_resolve ( "id" , & Struct :: default ( ) , cred, & rv) ;
596
- let req = logger. checkpoint ( ) . unwrap ( ) ;
609
+ let req = logger. checkpoint ( ) ;
597
610
598
611
let flag_info = req
599
612
. flag_resolve_info
@@ -706,17 +719,17 @@ mod tests {
706
719
let lg = logger. clone ( ) ;
707
720
let tx_thread = tx. clone ( ) ;
708
721
let chk_handle = thread:: spawn ( move || {
709
- for _ in 0 ..3 {
722
+ for _ in 0 ..10 {
710
723
thread:: sleep ( Duration :: from_millis ( 10 ) ) ;
711
- let req = lg. checkpoint ( ) . unwrap ( ) ;
712
- let _ = tx_thread. send ( req) ;
724
+ tx_thread. send ( lg. checkpoint ( ) ) . unwrap ( ) ;
713
725
}
714
726
} ) ;
715
727
716
728
chk_handle. join ( ) . unwrap ( ) ;
717
729
done. store ( true , Ordering :: Relaxed ) ;
718
730
let total_expected = handles. into_iter ( ) . map ( |h| h. join ( ) . unwrap ( ) ) . sum :: < i64 > ( ) ;
719
- let _ = tx. send ( logger. checkpoint ( ) . unwrap ( ) ) ;
731
+ // logger.checkpoint().iter().
732
+ tx. send ( logger. checkpoint ( ) ) . unwrap ( ) ;
720
733
721
734
// Aggregate all checkpoint outputs
722
735
let mut sum_variants: i64 = 0 ;
0 commit comments