Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 76 additions & 55 deletions src/commands/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use crate::{
ColumnDictionaryConfig, ColumnEncodingConfig, DEFAULT_BLOOM_FILTER_FPP, DataFormat,
DictionaryMode, ListOutputsFormat, ParquetCompression, ParquetEncoding, ParquetStatistics,
ParquetWriterVersion, PartitionStrategy, SortSpec, TransformCommand, default_thread_budget,
io_strategies::{OutputFileInfo, output_strategy::SinkFactory, path_template::PathTemplate},
io_strategies::{
OutputFileInfo, input_strategy::InputStrategy, output_strategy::SinkFactory,
path_template::PathTemplate,
},
operations::{query::QueryOperation, sort::SortOperation},
pipeline::Pipeline,
sinks::{
Expand All @@ -18,6 +21,7 @@ use crate::{
arrow::ArrowDataSource, data_source::DataSource, parquet::ParquetDataSource,
vortex::VortexDataSource,
},
utils::memory::{estimate_sort_spill_reservation, sample_avg_row_bytes},
};
use anyhow::{Result, anyhow};
use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -178,70 +182,78 @@ pub async fn run(args: TransformCommand) -> Result<()> {
(from_many, true)
};

let setup_result: Result<()> = {
if !should_glob && input_paths.len() == 1 {
let input_path = &input_paths[0];
let detected_input_format = detect_format(input_path, input_format)?;
// resolve input paths (glob-expand if needed), build sources, and create InputStrategy
let input_strategy = if !should_glob && input_paths.len() == 1 {
let input_path = &input_paths[0];
let source = make_source(input_path, input_format)?;
InputStrategy::Single(source)
} else {
let mut expanded_paths = Vec::new();

for pattern in &input_paths {
for entry in glob(pattern)
.map_err(|e| anyhow!("Error expanding glob pattern {}: {}", pattern, e))?
{
expanded_paths.push(
entry
.map_err(|e| anyhow!("Error decoding file path: {}", e))?
.to_string_lossy()
.to_string(),
);
}
}

let source: Box<dyn DataSource> = match detected_input_format {
DataFormat::Arrow => Box::new(ArrowDataSource::new(input_path.clone())),
DataFormat::Parquet => Box::new(ParquetDataSource::new(input_path.clone())),
DataFormat::Vortex => Box::new(VortexDataSource::new(input_path.clone())),
};
expanded_paths.sort();
expanded_paths.dedup();

pipeline = pipeline.with_input_strategy_with_single_source(source);
Ok(())
} else {
let mut expanded_paths = Vec::new();

for pattern in &input_paths {
for entry in glob(pattern)
.map_err(|e| anyhow!("Error expanding glob pattern {}: {}", pattern, e))?
{
expanded_paths.push(
entry
.map_err(|e| anyhow!("Error decoding file path: {}", e))?
.to_string_lossy()
.to_string(),
if expanded_paths.is_empty() {
anyhow::bail!("No input files found matching patterns: {:?}", input_paths);
}

let mut sources: Vec<Box<dyn DataSource>> = Vec::new();
let mut schema: Option<SchemaRef> = None;
for input_path in &expanded_paths {
let source = make_source(input_path, input_format)?;
if let Some(ref schema) = schema {
let source_schema = source.schema()?;
if *schema != source_schema {
anyhow::bail!(
"Schema mismatch for input file {} (does not match other file(s))",
input_path
);
}
} else {
schema = Some(source.schema()?);
}
sources.push(source);
}
InputStrategy::Multiple(sources)
};

expanded_paths.sort();
expanded_paths.dedup();
// sample rows to estimate sort spill reservation before handing strategy to pipeline
if has_sort {
let avg_row_bytes = sample_avg_row_bytes(&input_strategy, 100_000).await?;

if expanded_paths.is_empty() {
anyhow::bail!("No input files found matching patterns: {:?}", input_paths);
}
if avg_row_bytes > 0 {
let total_rows = input_strategy.row_count().unwrap_or(0);
let total_in_memory_bytes = total_rows.saturating_mul(avg_row_bytes);

let mut sources: Vec<Box<dyn DataSource>> = Vec::new();
let mut schema: Option<SchemaRef> = None;
for input_path in expanded_paths {
let detected_input_format = detect_format(&input_path, input_format)?;
let source: Box<dyn DataSource> = match detected_input_format {
DataFormat::Arrow => Box::new(ArrowDataSource::new(input_path.clone())),
DataFormat::Parquet => Box::new(ParquetDataSource::new(input_path.clone())),
DataFormat::Vortex => Box::new(VortexDataSource::new(input_path.clone())),
};
if let Some(ref schema) = schema {
let source_schema = source.schema()?;
if *schema != source_schema {
anyhow::bail!(
"Schema mismatch for input file {} (does not match other file(s))",
&input_path
);
}
} else {
schema = Some(source.schema()?);
}
sources.push(source);
}
pipeline = pipeline.with_input_strategy_with_multiple_sources(sources);
Ok(())
let memory_limit = effective_memory_limit.unwrap_or(total_budget * 60 / 100);
let partitions = effective_target_partitions.unwrap_or(three_quarter_cpus);
let memory_per_partition = memory_limit / partitions.max(1);

let reservation = estimate_sort_spill_reservation(
avg_row_bytes,
total_in_memory_bytes,
memory_per_partition,
8192, // DataFusion default batch size
);

pipeline = pipeline.with_sort_spill_reservation_bytes(reservation);
}
};
}

setup_result?;
pipeline = pipeline.with_input_strategy(input_strategy);

let list_outputs_format = list_outputs;

Expand Down Expand Up @@ -483,6 +495,15 @@ fn to_title_case(s: &str) -> String {
.join(" ")
}

/// Builds the boxed [`DataSource`] that reads the file at `path`.
///
/// The concrete source type is picked from the format returned by
/// `detect_format`, which returns an explicitly supplied `input_format`
/// as-is when one is given.
///
/// # Errors
///
/// Propagates any error returned by `detect_format`.
fn make_source(path: &str, input_format: Option<DataFormat>) -> Result<Box<dyn DataSource>> {
    let resolved_format = detect_format(path, input_format)?;
    // Each source constructor takes an owned path; build it once up front.
    let owned_path = path.to_string();
    let source: Box<dyn DataSource> = match resolved_format {
        DataFormat::Arrow => Box::new(ArrowDataSource::new(owned_path)),
        DataFormat::Parquet => Box::new(ParquetDataSource::new(owned_path)),
        DataFormat::Vortex => Box::new(VortexDataSource::new(owned_path)),
    };
    Ok(source)
}

fn detect_format(path: &str, explicit_format: Option<DataFormat>) -> Result<DataFormat> {
if let Some(format) = explicit_format {
return Ok(format);
Expand Down
13 changes: 13 additions & 0 deletions src/io_strategies/input_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,19 @@ impl InputStrategy {
}
}

/// Returns the total number of rows across every source in this strategy.
///
/// For a single source this is that source's own row count; for multiple
/// sources the per-source counts are added together.
///
/// # Errors
///
/// Returns the first error produced by any source's `row_count`.
pub fn row_count(&self) -> Result<usize> {
    match self {
        InputStrategy::Single(only_source) => only_source.row_count(),
        // Summing an iterator of `Result`s stops at the first `Err`,
        // matching an explicit loop that uses `?` on each element.
        InputStrategy::Multiple(all_sources) => all_sources
            .iter()
            .map(|each_source| each_source.row_count())
            .sum(),
    }
}

pub async fn as_stream(&self, ctx: &mut SessionContext) -> Result<SendableRecordBatchStream> {
match self {
InputStrategy::Single(source) => source.as_stream_with_session_context(ctx).await,
Expand Down
28 changes: 15 additions & 13 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use crate::{
path_template::PathTemplate,
},
operations::data_operation::DataOperation,
sources::data_source::DataSource,
};

#[derive(Default)]
Expand All @@ -28,6 +27,7 @@ pub struct PipelineConfig {
pub target_partitions: Option<usize>,
pub spill_path: Option<Utf8PathBuf>,
pub spill_compression: SpillCompression,
pub sort_spill_reservation_bytes: Option<usize>,
}

#[derive(Default)]
Expand All @@ -45,18 +45,8 @@ impl Pipeline {
Self::default()
}

pub fn with_input_strategy_with_single_source(mut self, source: Box<dyn DataSource>) -> Self {
self.input_strategy = Some(InputStrategy::Single(source));

self
}

pub fn with_input_strategy_with_multiple_sources(
mut self,
sources: Vec<Box<dyn DataSource>>,
) -> Self {
self.input_strategy = Some(InputStrategy::Multiple(sources));

/// Sets the input strategy the pipeline will read from, replacing any
/// strategy that was set previously, and returns the updated pipeline.
pub fn with_input_strategy(self, input_strategy: InputStrategy) -> Self {
    let mut pipeline = self;
    pipeline.input_strategy = Some(input_strategy);
    pipeline
}

Comment thread
coracuity marked this conversation as resolved.
Expand Down Expand Up @@ -163,6 +153,14 @@ impl Pipeline {
self
}

/// Records the sort spill reservation (in bytes) in the pipeline config
/// and returns the updated pipeline.
///
/// The stored value overwrites any earlier setting; passing `None`
/// clears it.
pub fn with_sort_spill_reservation_bytes(
    self,
    sort_spill_reservation_bytes: Option<usize>,
) -> Self {
    let mut pipeline = self;
    pipeline.config.sort_spill_reservation_bytes = sort_spill_reservation_bytes;
    pipeline
}

pub async fn execute(&mut self) -> Result<Vec<OutputFileInfo>> {
let mut ctx = self.build_session_context()?;
self.execute_with_session_context(&mut ctx).await
Expand Down Expand Up @@ -214,6 +212,10 @@ impl Pipeline {
cfg.options_mut().sql_parser.dialect = self.config.query_dialect.into();
cfg.options_mut().execution.spill_compression = self.config.spill_compression.into();

if let Some(reservation) = self.config.sort_spill_reservation_bytes {
cfg.options_mut().execution.sort_spill_reservation_bytes = reservation;
}

if let Some(target_partitions) = self.config.target_partitions {
cfg = cfg.with_target_partitions(target_partitions);
}
Expand Down
Loading
Loading