Skip to content

Commit

Permalink
[PERF] Parallel glob (#1897)
Browse files Browse the repository at this point in the history
  • Loading branch information
samster25 authored Feb 20, 2024
1 parent 9674214 commit 1a0e740
Showing 1 changed file with 45 additions and 19 deletions.
64 changes: 45 additions & 19 deletions src/daft-scan/src/glob.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::sync::Arc;
use std::{sync::Arc, vec};

use common_error::{DaftError, DaftResult};
use daft_core::schema::SchemaRef;
use daft_csv::CsvParseOptions;
use daft_io::{parse_url, FileMetadata, IOClient, IOStatsContext, IOStatsRef};
use daft_parquet::read::ParquetSchemaInferenceOptions;
use futures::{stream::BoxStream, StreamExt};
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
use snafu::Snafu;

use crate::{
Expand Down Expand Up @@ -81,6 +81,43 @@ fn run_glob(
Ok(Box::new(iterator))
}

fn run_glob_parallel(
glob_paths: Vec<String>,
io_client: Arc<IOClient>,
runtime: Arc<tokio::runtime::Runtime>,
io_stats: Option<IOStatsRef>,
) -> DaftResult<impl Iterator<Item = DaftResult<FileMetadata>>> {
let num_parallel_tasks = 64;

let owned_runtime = runtime.clone();
let boxstream = futures::stream::iter(glob_paths.into_iter().map(move |path| {
let (_, parsed_glob_path) = parse_url(&path).unwrap();
let glob_input = parsed_glob_path.as_ref().to_string();
let io_client = io_client.clone();
let io_stats = io_stats.clone();

runtime.spawn(async move {
let stream = io_client
.glob(glob_input, None, None, None, io_stats)
.await?;
let results = stream.collect::<Vec<_>>().await;
Result::<_, daft_io::Error>::Ok(futures::stream::iter(results))
})
}))
.buffered(num_parallel_tasks)
.map(|v| v.map_err(|e| daft_io::Error::JoinError { source: e })?)
.try_flatten()
.map(|v| Ok(v?))
.boxed();

// Construct a static-lifetime BoxStreamIterator
let iterator = BoxStreamIterator {
boxstream,
runtime_handle: owned_runtime.handle().clone(),
};
Ok(iterator)
}

impl GlobScanOperator {
pub fn try_new(
glob_paths: &[&str],
Expand Down Expand Up @@ -220,23 +257,12 @@ impl ScanOperator for GlobScanOperator {
self.glob_paths
));

// Run [`run_glob`] on each path and mux them into the same iterator
let files = self
.glob_paths
.clone()
.into_iter()
.flat_map(move |glob_path| {
match run_glob(
glob_path.as_str(),
None,
io_client.clone(),
io_runtime.clone(),
Some(io_stats.clone()),
) {
Ok(paths) => paths,
Err(err) => Box::new(vec![Err(err)].into_iter()),
}
});
let files = run_glob_parallel(
self.glob_paths.clone(),
io_client.clone(),
io_runtime.clone(),
Some(io_stats.clone()),
)?;

let file_format_config = self.file_format_config.clone();
let schema = self.schema.clone();
Expand Down

0 comments on commit 1a0e740

Please sign in to comment.