@@ -13,8 +13,8 @@ use std::sync::atomic::Ordering;
13
13
/// read to/from the queue. This is useful in case we want to know whether or
14
14
/// not a particular value written to the queue has been read.
15
15
pub struct PositionMonitor < T : ' static > {
16
- read_epoch : Arc < AtomicU64 > ,
17
- fifo : Fifo < T > ,
16
+ pub ( crate ) read_epoch : Arc < AtomicU64 > ,
17
+ pub ( crate ) fifo : Fifo < T > ,
18
18
}
19
19
20
20
/// A read position in a queue.
@@ -27,7 +27,10 @@ impl<T> PositionMonitor<T> {
27
27
pub fn read_position ( & self ) -> ReadPosition {
28
28
let current = self . fifo . current_offsets ( Ordering :: Relaxed ) ;
29
29
let read_epoch = self . read_epoch . load ( Ordering :: Relaxed ) ;
30
- ReadPosition ( ( ( read_epoch as u64 ) << 32 ) | ( current. read_offset ( ) as u64 ) )
30
+ let read_epoch_shifted = read_epoch
31
+ . checked_shl ( 32 )
32
+ . expect ( "Reading from position of over 2^32 (2 to the power of 32). This is unsupported." ) ;
33
+ ReadPosition ( read_epoch_shifted | ( current. read_offset ( ) as u64 ) )
31
34
}
32
35
33
36
pub fn write_position ( & self ) -> WritePosition {
@@ -36,7 +39,10 @@ impl<T> PositionMonitor<T> {
36
39
if current. read_high_bit ( ) != current. write_high_bit ( ) {
37
40
write_epoch += 1 ;
38
41
}
39
- WritePosition ( ( ( write_epoch as u64 ) << 32 ) | ( current. write_offset ( ) as u64 ) )
42
+ let write_epoch_shifted = write_epoch
43
+ . checked_shl ( 32 )
44
+ . expect ( "Writing to position of over 2^32 (2 to the power of 32). This is unsupported." ) ;
45
+ WritePosition ( write_epoch_shifted | ( current. write_offset ( ) as u64 ) )
40
46
}
41
47
}
42
48
@@ -62,4 +68,4 @@ impl ReadPosition {
62
68
}
63
69
true
64
70
}
65
- }
71
+ }
0 commit comments