1
+ #![ cfg( feature = "bilock" ) ]
2
+
3
+ use futures:: executor:: block_on;
1
4
use futures:: future;
2
5
use futures:: stream;
3
- use futures:: task;
6
+ use futures:: task:: { Context , Poll } ;
7
+ use futures:: Future ;
8
+ use futures:: StreamExt ;
9
+ use futures_test:: task:: noop_context;
4
10
use futures_util:: lock:: BiLock ;
11
+ use std:: pin:: Pin ;
5
12
use std:: thread;
6
13
7
- // mod support;
8
- // use support::*;
9
-
10
14
#[ test]
11
15
fn smoke ( ) {
12
- let future = future:: lazy ( |_ | {
16
+ let future = future:: lazy ( |cx | {
13
17
let ( a, b) = BiLock :: new ( 1 ) ;
14
18
15
19
{
16
- let mut lock = match a. poll_lock ( ) {
20
+ let mut lock = match a. poll_lock ( cx ) {
17
21
Poll :: Ready ( l) => l,
18
22
Poll :: Pending => panic ! ( "poll not ready" ) ,
19
23
} ;
20
24
assert_eq ! ( * lock, 1 ) ;
21
25
* lock = 2 ;
22
26
23
- assert ! ( b. poll_lock( ) . is_pending( ) ) ;
24
- assert ! ( a. poll_lock( ) . is_pending( ) ) ;
27
+ assert ! ( b. poll_lock( cx ) . is_pending( ) ) ;
28
+ assert ! ( a. poll_lock( cx ) . is_pending( ) ) ;
25
29
}
26
30
27
- assert ! ( b. poll_lock( ) . is_ready( ) ) ;
28
- assert ! ( a. poll_lock( ) . is_ready( ) ) ;
31
+ assert ! ( b. poll_lock( cx ) . is_ready( ) ) ;
32
+ assert ! ( a. poll_lock( cx ) . is_ready( ) ) ;
29
33
30
34
{
31
- let lock = match b. poll_lock ( ) {
35
+ let lock = match b. poll_lock ( cx ) {
32
36
Poll :: Ready ( l) => l,
33
37
Poll :: Pending => panic ! ( "poll not ready" ) ,
34
38
} ;
@@ -40,34 +44,32 @@ fn smoke() {
40
44
Ok :: < ( ) , ( ) > ( ( ) )
41
45
} ) ;
42
46
43
- assert ! ( task:: spawn( future)
44
- . poll_future_notify( & notify_noop( ) , 0 )
45
- . expect( "failure in poll" )
46
- . is_ready( ) ) ;
47
+ assert_eq ! ( block_on( future) , Ok ( ( ) ) ) ;
47
48
}
48
49
49
50
#[ test]
50
51
fn concurrent ( ) {
51
52
const N : usize = 10000 ;
53
+ let mut cx = noop_context ( ) ;
52
54
let ( a, b) = BiLock :: new ( 0 ) ;
53
55
54
56
let a = Increment { a : Some ( a) , remaining : N } ;
55
- let b = stream:: iter_ok ( 0 ..N ) . fold ( b, |b, _n| {
56
- b. lock ( ) . map ( | mut b| {
57
- * b += 1 ;
58
- b . unlock ( )
59
- } )
57
+ let b = stream:: iter ( 0 ..N ) . fold ( b, |b, _n| async {
58
+ let mut g = b. lock ( ) . await ;
59
+ * g += 1 ;
60
+ drop ( g ) ;
61
+ b
60
62
} ) ;
61
63
62
- let t1 = thread:: spawn ( move || a . wait ( ) ) ;
63
- let b = b . wait ( ) . expect ( "b error" ) ;
64
- let a = t1. join ( ) . unwrap ( ) . expect ( "a error" ) ;
64
+ let t1 = thread:: spawn ( move || block_on ( a ) ) ;
65
+ let b = block_on ( b ) ;
66
+ let a = t1. join ( ) . unwrap ( ) ;
65
67
66
- match a. poll_lock ( ) {
68
+ match a. poll_lock ( & mut cx ) {
67
69
Poll :: Ready ( l) => assert_eq ! ( * l, 2 * N ) ,
68
70
Poll :: Pending => panic ! ( "poll not ready" ) ,
69
71
}
70
- match b. poll_lock ( ) {
72
+ match b. poll_lock ( & mut cx ) {
71
73
Poll :: Ready ( l) => assert_eq ! ( * l, 2 * N ) ,
72
74
Poll :: Pending => panic ! ( "poll not ready" ) ,
73
75
}
@@ -80,22 +82,22 @@ fn concurrent() {
80
82
}
81
83
82
84
impl Future for Increment {
83
- type Item = BiLock < usize > ;
84
- type Error = ( ) ;
85
+ type Output = BiLock < usize > ;
85
86
86
- fn poll ( & mut self ) -> Poll < BiLock < usize > , ( ) > {
87
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < BiLock < usize > > {
87
88
loop {
88
89
if self . remaining == 0 {
89
- return Ok ( self . a . take ( ) . unwrap ( ) . into ( ) ) ;
90
+ return self . a . take ( ) . unwrap ( ) . into ( ) ;
90
91
}
91
92
92
- let a = self . a . as_ref ( ) . unwrap ( ) ;
93
- let mut a = match a. poll_lock ( ) {
93
+ let a = self . a . as_mut ( ) . unwrap ( ) ;
94
+ let mut a = match a. poll_lock ( cx ) {
94
95
Poll :: Ready ( l) => l,
95
- Poll :: Pending => return Ok ( Poll :: Pending ) ,
96
+ Poll :: Pending => return Poll :: Pending ,
96
97
} ;
97
- self . remaining -= 1 ;
98
98
* a += 1 ;
99
+ drop ( a) ;
100
+ self . remaining -= 1 ;
99
101
}
100
102
}
101
103
}
0 commit comments