File tree 1 file changed +10
-4
lines changed
src/query/service/src/api/rpc/exchange
1 file changed +10
-4
lines changed Original file line number Diff line number Diff line change @@ -233,12 +233,18 @@ impl Processor for ExchangeShuffleTransform {
233
233
return Ok ( Event :: Finished ) ;
234
234
}
235
235
236
- if self . finished_inputs == self . inputs . len ( ) && self . buffer . is_all_empty ( ) {
237
- for output in & self . outputs {
238
- output. port . finish ( ) ;
236
+ if self . finished_inputs == self . inputs . len ( ) {
237
+ for ( index, output) in self . outputs . iter_mut ( ) . enumerate ( ) {
238
+ if self . buffer . is_empty ( index) && output. status != PortStatus :: Finished {
239
+ self . finished_outputs += 1 ;
240
+ output. status = PortStatus :: Finished ;
241
+ output. port . finish ( ) ;
242
+ }
239
243
}
240
244
241
- return Ok ( Event :: Finished ) ;
245
+ if self . buffer . is_all_empty ( ) {
246
+ return Ok ( Event :: Finished ) ;
247
+ }
242
248
}
243
249
244
250
match self . waiting_outputs . is_empty ( ) {
You can’t perform that action at this time.
0 commit comments