@@ -27,7 +27,7 @@ use async_trait::async_trait;
2727use bytes:: Bytes ;
2828use datafusion:: { datasource:: listing:: ListingTableUrl , execution:: runtime_env:: RuntimeEnvBuilder } ;
2929use fs_extra:: file:: CopyOptions ;
30- use futures:: { stream:: FuturesUnordered , TryStreamExt } ;
30+ use futures:: { stream:: FuturesUnordered , StreamExt , TryStreamExt } ;
3131use relative_path:: { RelativePath , RelativePathBuf } ;
3232use tokio:: fs:: { self , DirEntry } ;
3333use tokio_stream:: wrappers:: ReadDirStream ;
@@ -430,17 +430,16 @@ impl ObjectStorage for LocalFS {
430430 ) -> Result < HashMap < RelativePathBuf , Vec < Bytes > > , ObjectStorageError > {
431431 let mut correlations: HashMap < RelativePathBuf , Vec < Bytes > > = HashMap :: new ( ) ;
432432 let users_root_path = self . root . join ( USERS_ROOT_DIR ) ;
433- let directories = ReadDirStream :: new ( fs:: read_dir ( & users_root_path) . await ?) ;
434- let users : Vec < DirEntry > = directories. try_collect ( ) . await ? ;
435- for user in users {
433+ let mut directories = ReadDirStream :: new ( fs:: read_dir ( & users_root_path) . await ?) ;
434+ while let Some ( user ) = directories. next ( ) . await {
435+ let user = user? ;
436436 if !user. path ( ) . is_dir ( ) {
437437 continue ;
438438 }
439439 let correlations_path = users_root_path. join ( user. path ( ) ) . join ( "correlations" ) ;
440- let directories = ReadDirStream :: new ( fs:: read_dir ( & correlations_path) . await ?) ;
441- let correlations_files: Vec < DirEntry > = directories. try_collect ( ) . await ?;
442- for correlation in correlations_files {
443- let correlation_absolute_path = correlation. path ( ) ;
440+ let mut files = ReadDirStream :: new ( fs:: read_dir ( & correlations_path) . await ?) ;
441+ while let Some ( correlation) = files. next ( ) . await {
442+ let correlation_absolute_path = correlation?. path ( ) ;
444443 let file = fs:: read ( correlation_absolute_path. clone ( ) ) . await ?;
445444 let correlation_relative_path = correlation_absolute_path
446445 . strip_prefix ( self . root . as_path ( ) )
0 commit comments