@@ -17,9 +17,9 @@ use std::sync::Arc;
17
17
use foyer_common:: queue:: AsyncQueue ;
18
18
use foyer_intrusive:: { core:: adapter:: Link , eviction:: EvictionPolicy } ;
19
19
use itertools:: Itertools ;
20
- use tokio:: sync :: {
21
- mpsc :: { unbounded_channel , UnboundedReceiver , UnboundedSender } ,
22
- Mutex ,
20
+ use tokio:: {
21
+ sync :: { broadcast , mpsc , Mutex } ,
22
+ task :: JoinHandle ,
23
23
} ;
24
24
25
25
use crate :: {
@@ -38,7 +38,7 @@ pub struct FlushTask {
38
38
struct FlusherInner {
39
39
sequence : usize ,
40
40
41
- task_txs : Vec < UnboundedSender < FlushTask > > ,
41
+ task_txs : Vec < mpsc :: UnboundedSender < FlushTask > > ,
42
42
}
43
43
44
44
pub struct Flusher {
@@ -63,7 +63,9 @@ impl Flusher {
63
63
& self ,
64
64
buffers : Arc < AsyncQueue < Vec < u8 , A > > > ,
65
65
region_manager : Arc < RegionManager < A , D , E , EL > > ,
66
- ) where
66
+ stop_rxs : Vec < broadcast:: Receiver < ( ) > > ,
67
+ ) -> Vec < JoinHandle < ( ) > >
68
+ where
67
69
A : BufferAllocator ,
68
70
D : Device < IoBufferAllocator = A > ,
69
71
E : EvictionPolicy < RegionEpItemAdapter < EL > , Link = EL > ,
@@ -73,25 +75,30 @@ impl Flusher {
73
75
74
76
#[ allow( clippy:: type_complexity) ]
75
77
let ( mut txs, rxs) : (
76
- Vec < UnboundedSender < FlushTask > > ,
77
- Vec < UnboundedReceiver < FlushTask > > ,
78
- ) = ( 0 ..self . runners ) . map ( |_| unbounded_channel ( ) ) . unzip ( ) ;
78
+ Vec < mpsc :: UnboundedSender < FlushTask > > ,
79
+ Vec < mpsc :: UnboundedReceiver < FlushTask > > ,
80
+ ) = ( 0 ..self . runners ) . map ( |_| mpsc :: unbounded_channel ( ) ) . unzip ( ) ;
79
81
inner. task_txs . append ( & mut txs) ;
80
82
81
83
let runners = rxs
82
84
. into_iter ( )
83
- . map ( |rx| Runner {
84
- task_rx : rx,
85
+ . zip_eq ( stop_rxs. into_iter ( ) )
86
+ . map ( |( task_rx, stop_rx) | Runner {
87
+ task_rx,
85
88
buffers : buffers. clone ( ) ,
86
89
region_manager : region_manager. clone ( ) ,
90
+ stop_rx,
87
91
} )
88
92
. collect_vec ( ) ;
89
93
94
+ let mut handles = vec ! [ ] ;
90
95
for runner in runners {
91
- tokio:: spawn ( async move {
96
+ let handle = tokio:: spawn ( async move {
92
97
runner. run ( ) . await . unwrap ( ) ;
93
98
} ) ;
99
+ handles. push ( handle) ;
94
100
}
101
+ handles
95
102
}
96
103
97
104
pub fn runners ( & self ) -> usize {
@@ -113,10 +120,12 @@ where
113
120
E : EvictionPolicy < RegionEpItemAdapter < EL > , Link = EL > ,
114
121
EL : Link ,
115
122
{
116
- task_rx : UnboundedReceiver < FlushTask > ,
123
+ task_rx : mpsc :: UnboundedReceiver < FlushTask > ,
117
124
buffers : Arc < AsyncQueue < Vec < u8 , A > > > ,
118
125
119
126
region_manager : Arc < RegionManager < A , D , E , EL > > ,
127
+
128
+ stop_rx : broadcast:: Receiver < ( ) > ,
120
129
}
121
130
122
131
impl < A , D , E , EL > Runner < A , D , E , EL >
@@ -128,71 +137,79 @@ where
128
137
{
129
138
async fn run ( mut self ) -> Result < ( ) > {
130
139
loop {
131
- if let Some ( task) = self . task_rx . recv ( ) . await {
132
- // TODO(MrCroxx): seal buffer
140
+ tokio:: select! {
141
+ Some ( task) = self . task_rx. recv( ) => {
142
+ self . handle( task) . await ?;
143
+ }
144
+ _ = self . stop_rx. recv( ) => {
145
+ tracing:: info!( "[flusher] exit" ) ;
146
+ return Ok ( ( ) )
147
+ }
148
+ }
149
+ }
150
+ }
133
151
134
- tracing:: info!( "[flusher] receive flush task, region: {}" , task. region_id) ;
152
+ async fn handle ( & self , task : FlushTask ) -> Result < ( ) > {
153
+ tracing:: info!( "[flusher] receive flush task, region: {}" , task. region_id) ;
135
154
136
- let region = self . region_manager . region ( & task. region_id ) ;
155
+ let region = self . region_manager . region ( & task. region_id ) ;
137
156
138
- tracing:: trace!( "[flusher] step 1" ) ;
157
+ tracing:: trace!( "[flusher] step 1" ) ;
139
158
140
- {
141
- // step 1: write buffer back to device
142
- let slice = region. load ( .., 0 ) . await ?. unwrap ( ) ;
159
+ {
160
+ // step 1: write buffer back to device
161
+ let slice = region. load ( .., 0 ) . await ?. unwrap ( ) ;
143
162
144
- // wait all physical readers (from previous version) and writers done
145
- let guard = region. exclusive ( false , true , false ) . await ;
163
+ // wait all physical readers (from previous version) and writers done
164
+ let guard = region. exclusive ( false , true , false ) . await ;
146
165
147
- tracing:: trace!( "[flusher] write region {} back to device" , task. region_id) ;
166
+ tracing:: trace!( "[flusher] write region {} back to device" , task. region_id) ;
148
167
149
- let mut offset = 0 ;
150
- let len = region. device ( ) . io_size ( ) ;
151
- while offset < region. device ( ) . region_size ( ) {
152
- let start = offset;
153
- let end = std:: cmp:: min ( offset + len, region. device ( ) . region_size ( ) ) ;
168
+ let mut offset = 0 ;
169
+ let len = region. device ( ) . io_size ( ) ;
170
+ while offset < region. device ( ) . region_size ( ) {
171
+ let start = offset;
172
+ let end = std:: cmp:: min ( offset + len, region. device ( ) . region_size ( ) ) ;
154
173
155
- let s = unsafe { Slice :: new ( & slice. as_ref ( ) [ start..end] ) } ;
156
- region
157
- . device ( )
158
- . write ( s, region. id ( ) , offset as u64 , len)
159
- . await ?;
160
- offset += len;
161
- }
162
- drop ( guard) ;
163
- slice. destroy ( ) . await ;
164
- }
174
+ let s = unsafe { Slice :: new ( & slice. as_ref ( ) [ start..end] ) } ;
175
+ region
176
+ . device ( )
177
+ . write ( s, region. id ( ) , offset as u64 , len)
178
+ . await ?;
179
+ offset += len;
180
+ }
181
+ drop ( guard) ;
182
+ slice. destroy ( ) . await ;
183
+ }
184
+
185
+ tracing:: trace!( "[flusher] step 2" ) ;
165
186
166
- tracing:: trace!( "[flusher] step 2" ) ;
187
+ let buffer = {
188
+ // step 2: detach buffer
189
+ let mut guard = region. exclusive ( false , false , true ) . await ;
167
190
168
- let buffer = {
169
- // step 2: detach buffer
170
- let mut guard = region. exclusive ( false , false , true ) . await ;
191
+ let buffer = guard. detach_buffer ( ) ;
171
192
172
- let buffer = guard. detach_buffer ( ) ;
193
+ tracing:: trace!(
194
+ "[flusher] region {}, writers: {}, buffered readers: {}, physical readers: {}" ,
195
+ region. id( ) ,
196
+ guard. writers( ) ,
197
+ guard. buffered_readers( ) ,
198
+ guard. physical_readers( )
199
+ ) ;
173
200
174
- tracing:: trace!(
175
- "[flusher] region {}, writers: {}, buffered readers: {}, physical readers: {}" ,
176
- region. id( ) ,
177
- guard. writers( ) ,
178
- guard. buffered_readers( ) ,
179
- guard. physical_readers( )
180
- ) ;
201
+ drop ( guard) ;
202
+ buffer
203
+ } ;
181
204
182
- drop ( guard) ;
183
- buffer
184
- } ;
205
+ tracing:: trace!( "[flusher] step 3" ) ;
185
206
186
- tracing:: trace!( "[flusher] step 3" ) ;
207
+ // step 3: release buffer
208
+ self . buffers . release ( buffer) ;
209
+ self . region_manager . set_region_evictable ( & region. id ( ) ) . await ;
187
210
188
- // step 3: release buffer
189
- self . buffers . release ( buffer) ;
190
- self . region_manager . set_region_evictable ( & region. id ( ) ) . await ;
211
+ tracing:: info!( "[flusher] finish flush task, region: {}" , task. region_id) ;
191
212
192
- tracing:: info!( "[flusher] finish flush task, region: {}" , task. region_id) ;
193
- } else {
194
- return Ok ( ( ) ) ;
195
- }
196
- }
213
+ Ok ( ( ) )
197
214
}
198
215
}
0 commit comments