@@ -17,7 +17,7 @@ use crate::ptr;
1717use crate :: thread;
1818use crate :: time:: Instant ;
1919
20- use crate :: sync:: atomic:: { AtomicBool , AtomicIsize , AtomicUsize , Ordering } ;
20+ use crate :: sync:: atomic:: { AtomicBool , AtomicIsize , AtomicPtr , Ordering } ;
2121use crate :: sync:: mpsc:: blocking:: { self , SignalToken } ;
2222use crate :: sync:: mpsc:: spsc_queue as spsc;
2323use crate :: sync:: mpsc:: Receiver ;
@@ -27,15 +27,16 @@ const DISCONNECTED: isize = isize::MIN;
2727const MAX_STEALS : isize = 5 ;
2828#[ cfg( not( test) ) ]
2929const MAX_STEALS : isize = 1 << 20 ;
30+ const EMPTY : * mut u8 = ptr:: null_mut ( ) ; // initial state: no data, no blocked receiver
3031
3132pub struct Packet < T > {
3233 // internal queue for all messages
3334 queue : spsc:: Queue < Message < T > , ProducerAddition , ConsumerAddition > ,
3435}
3536
3637struct ProducerAddition {
37- cnt : AtomicIsize , // How many items are on this channel
38- to_wake : AtomicUsize , // SignalToken for the blocked thread to wake up
38+ cnt : AtomicIsize , // How many items are on this channel
39+ to_wake : AtomicPtr < u8 > , // SignalToken for the blocked thread to wake up
3940
4041 port_dropped : AtomicBool , // flag if the channel has been destroyed.
4142}
@@ -71,7 +72,7 @@ impl<T> Packet<T> {
7172 128 ,
7273 ProducerAddition {
7374 cnt : AtomicIsize :: new ( 0 ) ,
74- to_wake : AtomicUsize :: new ( 0 ) ,
75+ to_wake : AtomicPtr :: new ( EMPTY ) ,
7576
7677 port_dropped : AtomicBool :: new ( false ) ,
7778 } ,
@@ -147,17 +148,17 @@ impl<T> Packet<T> {
147148 // Consumes ownership of the 'to_wake' field.
148149 fn take_to_wake ( & self ) -> SignalToken {
149150 let ptr = self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) ;
150- self . queue . producer_addition ( ) . to_wake . store ( 0 , Ordering :: SeqCst ) ;
151- assert ! ( ptr != 0 ) ;
152- unsafe { SignalToken :: cast_from_usize ( ptr) }
151+ self . queue . producer_addition ( ) . to_wake . store ( EMPTY , Ordering :: SeqCst ) ;
152+ assert ! ( ptr != EMPTY ) ;
153+ unsafe { SignalToken :: from_raw ( ptr) }
153154 }
154155
155156 // Decrements the count on the channel for a sleeper, returning the sleeper
156157 // back if it shouldn't sleep. Note that this is the location where we take
157158 // steals into account.
158159 fn decrement ( & self , token : SignalToken ) -> Result < ( ) , SignalToken > {
159- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
160- let ptr = unsafe { token. cast_to_usize ( ) } ;
160+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
161+ let ptr = unsafe { token. to_raw ( ) } ;
161162 self . queue . producer_addition ( ) . to_wake . store ( ptr, Ordering :: SeqCst ) ;
162163
163164 let steals = unsafe { ptr:: replace ( self . queue . consumer_addition ( ) . steals . get ( ) , 0 ) } ;
@@ -176,8 +177,8 @@ impl<T> Packet<T> {
176177 }
177178 }
178179
179- self . queue . producer_addition ( ) . to_wake . store ( 0 , Ordering :: SeqCst ) ;
180- Err ( unsafe { SignalToken :: cast_from_usize ( ptr) } )
180+ self . queue . producer_addition ( ) . to_wake . store ( EMPTY , Ordering :: SeqCst ) ;
181+ Err ( unsafe { SignalToken :: from_raw ( ptr) } )
181182 }
182183
183184 pub fn recv ( & self , deadline : Option < Instant > ) -> Result < T , Failure < T > > {
@@ -376,7 +377,7 @@ impl<T> Packet<T> {
376377 // of time until the data is actually sent.
377378 if was_upgrade {
378379 assert_eq ! ( unsafe { * self . queue. consumer_addition( ) . steals. get( ) } , 0 ) ;
379- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
380+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
380381 return Ok ( true ) ;
381382 }
382383
@@ -389,7 +390,7 @@ impl<T> Packet<T> {
389390 // If we were previously disconnected, then we know for sure that there
390391 // is no thread in to_wake, so just keep going
391392 let has_data = if prev == DISCONNECTED {
392- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
393+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
393394 true // there is data, that data is that we're disconnected
394395 } else {
395396 let cur = prev + steals + 1 ;
@@ -412,7 +413,7 @@ impl<T> Packet<T> {
412413 if prev < 0 {
413414 drop ( self . take_to_wake ( ) ) ;
414415 } else {
415- while self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) != 0 {
416+ while self . queue . producer_addition ( ) . to_wake . load ( Ordering :: SeqCst ) != EMPTY {
416417 thread:: yield_now ( ) ;
417418 }
418419 }
@@ -451,6 +452,6 @@ impl<T> Drop for Packet<T> {
451452 // `to_wake`, so this assert cannot be removed with also removing
452453 // the `to_wake` assert.
453454 assert_eq ! ( self . queue. producer_addition( ) . cnt. load( Ordering :: SeqCst ) , DISCONNECTED ) ;
454- assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , 0 ) ;
455+ assert_eq ! ( self . queue. producer_addition( ) . to_wake. load( Ordering :: SeqCst ) , EMPTY ) ;
455456 }
456457}
0 commit comments