@@ -51,7 +51,7 @@ use crate::Msg;
51
51
use crossbeam_channel:: { bounded, SendTimeoutError , Sender } ;
52
52
use std:: io;
53
53
use std:: io:: Write ;
54
- use std:: sync:: atomic:: AtomicU64 ;
54
+ use std:: sync:: atomic:: AtomicUsize ;
55
55
use std:: sync:: atomic:: Ordering ;
56
56
use std:: sync:: Arc ;
57
57
use std:: thread:: JoinHandle ;
@@ -125,11 +125,20 @@ pub struct WorkerGuard {
125
125
/// [fmt]: https://docs.rs/tracing-subscriber/latest/tracing_subscriber/fmt/index.html
126
126
#[ derive( Clone , Debug ) ]
127
127
pub struct NonBlocking {
128
- error_counter : Arc < AtomicU64 > ,
128
+ error_counter : ErrorCounter ,
129
129
channel : Sender < Msg > ,
130
130
is_lossy : bool ,
131
131
}
132
132
133
+ /// Tracks the number of times a log line was dropped by the background thread.
134
+ ///
135
+ /// If the non-blocking writer is not configured in [lossy mode], the error
136
+ /// count should always be 0.
137
+ ///
138
+ /// [lossy mode]: NonBlockingBuilder::lossy
139
+ #[ derive( Clone , Debug ) ]
140
+ pub struct ErrorCounter ( Arc < AtomicUsize > ) ;
141
+
133
142
impl NonBlocking {
134
143
/// Returns a new `NonBlocking` writer wrapping the provided `writer`.
135
144
///
@@ -158,7 +167,7 @@ impl NonBlocking {
158
167
(
159
168
Self {
160
169
channel : sender,
161
- error_counter : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
170
+ error_counter : ErrorCounter ( Arc :: new ( AtomicUsize :: new ( 0 ) ) ) ,
162
171
is_lossy,
163
172
} ,
164
173
worker_guard,
@@ -167,7 +176,7 @@ impl NonBlocking {
167
176
168
177
/// Returns a counter for the number of times logs where dropped. This will always return zero if
169
178
/// `NonBlocking` is not lossy.
170
- pub fn error_counter ( & self ) -> Arc < AtomicU64 > {
179
+ pub fn error_counter ( & self ) -> ErrorCounter {
171
180
self . error_counter . clone ( )
172
181
}
173
182
}
@@ -219,7 +228,7 @@ impl std::io::Write for NonBlocking {
219
228
let buf_size = buf. len ( ) ;
220
229
if self . is_lossy {
221
230
if self . channel . try_send ( Msg :: Line ( buf. to_vec ( ) ) ) . is_err ( ) {
222
- self . error_counter . fetch_add ( 1 , Ordering :: Release ) ;
231
+ self . error_counter . incr_saturating ( ) ;
223
232
}
224
233
} else {
225
234
return match self . channel . send ( Msg :: Line ( buf. to_vec ( ) ) ) {
@@ -280,6 +289,43 @@ impl Drop for WorkerGuard {
280
289
}
281
290
}
282
291
292
+ // === impl ErrorCounter ===
293
+
294
+ impl ErrorCounter {
295
+ /// Returns the number of log lines that have been dropped.
296
+ ///
297
+ /// If the non-blocking writer is not configured in [lossy mode], the error
298
+ /// count should always be 0.
299
+ ///
300
+ /// [lossy mode]: NonBlockingBuilder::lossy
301
+ pub fn dropped_lines ( & self ) -> usize {
302
+ self . 0 . load ( Ordering :: Acquire )
303
+ }
304
+
305
+ fn incr_saturating ( & self ) {
306
+ let mut curr = self . 0 . load ( Ordering :: Acquire ) ;
307
+ // We don't need to enter the CAS loop if the current value is already
308
+ // `usize::MAX`.
309
+ if curr == usize:: MAX {
310
+ return ;
311
+ }
312
+
313
+ // This is implemented as a CAS loop rather than as a simple
314
+ // `fetch_add`, because we don't want to wrap on overflow. Instead, we
315
+ // need to ensure that saturating addition is performed.
316
+ loop {
317
+ let val = curr. saturating_add ( 1 ) ;
318
+ match self
319
+ . 0
320
+ . compare_exchange ( curr, val, Ordering :: AcqRel , Ordering :: Acquire )
321
+ {
322
+ Ok ( _) => return ,
323
+ Err ( actual) => curr = actual,
324
+ }
325
+ }
326
+ }
327
+ }
328
+
283
329
#[ cfg( test) ]
284
330
mod test {
285
331
use super :: * ;
@@ -322,7 +368,7 @@ mod test {
322
368
let error_count = non_blocking. error_counter ( ) ;
323
369
324
370
non_blocking. write_all ( b"Hello" ) . expect ( "Failed to write" ) ;
325
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
371
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
326
372
327
373
let handle = thread:: spawn ( move || {
328
374
non_blocking. write_all ( b", World" ) . expect ( "Failed to write" ) ;
@@ -331,7 +377,7 @@ mod test {
331
377
// Sleep a little to ensure previously spawned thread gets blocked on write.
332
378
thread:: sleep ( Duration :: from_millis ( 100 ) ) ;
333
379
// We should not drop logs when blocked.
334
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
380
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
335
381
336
382
// Read the first message to unblock sender.
337
383
let mut line = rx. recv ( ) . unwrap ( ) ;
@@ -366,17 +412,17 @@ mod test {
366
412
367
413
// First write will not block
368
414
write_non_blocking ( & mut non_blocking, b"Hello" ) ;
369
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
415
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
370
416
371
417
// Second write will not block as Worker will have called `recv` on channel.
372
418
// "Hello" is not yet consumed. MockWriter call to write_all will block until
373
419
// "Hello" is consumed.
374
420
write_non_blocking ( & mut non_blocking, b", World" ) ;
375
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
421
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
376
422
377
423
// Will sit in NonBlocking channel's buffer.
378
424
write_non_blocking ( & mut non_blocking, b"Test" ) ;
379
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
425
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
380
426
381
427
// Allow a line to be written. "Hello" message will be consumed.
382
428
// ", World" will be able to write to MockWriter.
@@ -386,12 +432,12 @@ mod test {
386
432
387
433
// This will block as NonBlocking channel is full.
388
434
write_non_blocking ( & mut non_blocking, b"Universe" ) ;
389
- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
435
+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
390
436
391
437
// Finally the second message sent will be consumed.
392
438
let line = rx. recv ( ) . unwrap ( ) ;
393
439
assert_eq ! ( line, ", World" ) ;
394
- assert_eq ! ( 1 , error_count. load ( Ordering :: Acquire ) ) ;
440
+ assert_eq ! ( 1 , error_count. dropped_lines ( ) ) ;
395
441
}
396
442
397
443
#[ test]
@@ -427,6 +473,6 @@ mod test {
427
473
}
428
474
429
475
assert_eq ! ( 10 , hello_count) ;
430
- assert_eq ! ( 0 , error_count. load ( Ordering :: Acquire ) ) ;
476
+ assert_eq ! ( 0 , error_count. dropped_lines ( ) ) ;
431
477
}
432
478
}
0 commit comments