[PERF] Make merging of ScanTasks be more conservative when provided with a LIMIT (#2758)

Co-authored-by: Jay Chia <[email protected]@users.noreply.github.com>
jaychia and Jay Chia authored Aug 31, 2024
1 parent 749554c commit 3a7a0b4
Showing 3 changed files with 85 additions and 10 deletions.
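
Summary of the change: when a LIMIT pushdown is present, `merge_by_sizes` now derives a per-query byte budget from the LIMIT and merges ScanTasks toward that budget instead of the global `scan_tasks_min_size_bytes`/`scan_tasks_max_size_bytes` settings. A minimal standalone sketch of the budget arithmetic follows; `limit_merge_bounds` is a hypothetical helper for illustration, not part of Daft's API.

```rust
/// Hypothetical helper (not Daft's API) mirroring the budget arithmetic in
/// this commit: derive merge bounds from one ScanTask's size/row estimates
/// and a LIMIT pushdown.
fn limit_merge_bounds(
    est_materialized_bytes: usize, // estimated in-memory size of the first ScanTask
    approx_num_rows: f64,          // approximate row count of that ScanTask
    limit: usize,                  // LIMIT pushed down from the query
) -> (usize, usize) {
    // Estimated bytes needed to materialize `limit` rows.
    let limit_bytes = (est_materialized_bytes as f64) / approx_num_rows * (limit as f64);
    // Same factors as the commit: emit once the accumulator reaches half the
    // budget, and never merge past 1.5x the budget.
    ((limit_bytes / 2.0) as usize, (limit_bytes * 1.5) as usize)
}

fn main() {
    // A ~100 MiB ScanTask with ~1M rows and LIMIT 1000 gives a ~105 KB budget,
    // so merging targets roughly [52 KB, 157 KB] instead of the global
    // scan_tasks_min/max_size_bytes defaults.
    let (lower, upper) = limit_merge_bounds(100 * 1024 * 1024, 1_000_000.0, 1000);
    println!("lower = {lower} bytes, upper = {upper} bytes");
}
```
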
3 changes: 2 additions & 1 deletion src/daft-plan/src/physical_planner/translate.rs
@@ -58,7 +58,8 @@ pub(super) fn translate_single_logical_node(
);

// Apply transformations on the ScanTasks to optimize
-        let scan_tasks = daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, cfg);
+        let scan_tasks =
+            daft_scan::scan_task_iters::merge_by_sizes(scan_tasks, pushdowns, cfg);
let scan_tasks = scan_tasks.collect::<DaftResult<Vec<_>>>()?;
if scan_tasks.is_empty() {
let clustering_spec =
68 changes: 59 additions & 9 deletions src/daft-scan/src/scan_task_iters.rs
@@ -9,12 +9,12 @@ use parquet2::metadata::RowGroupList;
use crate::{
file_format::{FileFormatConfig, ParquetSourceConfig},
storage_config::StorageConfig,
-    ChunkSpec, DataSource, ScanTask, ScanTaskRef,
+    ChunkSpec, DataSource, Pushdowns, ScanTask, ScanTaskRef,
};

type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a>;

-/// Coalesces ScanTasks by their [`ScanTask::size_bytes()`]
+/// Coalesces ScanTasks by their [`ScanTask::estimate_in_memory_size_bytes()`]
///
/// NOTE: `min_size_bytes` and `max_size_bytes` are only parameters for the algorithm used for merging ScanTasks,
/// and do not provide any guarantees about the sizes of ScanTasks yielded by the resultant iterator.
@@ -28,35 +28,80 @@ type BoxScanTaskIter<'a> = Box<dyn Iterator<Item = DaftResult<ScanTaskRef>> + 'a
/// * `max_size_bytes`: Maximum size in bytes of a ScanTask, capping the maximum size of a merged ScanTask
pub fn merge_by_sizes<'a>(
scan_tasks: BoxScanTaskIter<'a>,
+    pushdowns: &Pushdowns,
cfg: &'a DaftExecutionConfig,
) -> BoxScanTaskIter<'a> {
-    Box::new(MergeByFileSize {
-        iter: scan_tasks,
-        cfg,
-        accumulator: None,
-    })
+    if let Some(limit) = pushdowns.limit {
+        // If a LIMIT pushdown is present, perform a more conservative merge using the estimated size of the LIMIT
+        let mut scan_tasks = scan_tasks.peekable();
+        let first_scantask = scan_tasks
+            .peek()
+            .and_then(|x| x.as_ref().map(|x| x.clone()).ok());
+        if let Some(first_scantask) = first_scantask {
+            let estimated_bytes_for_reading_limit_rows = first_scantask
+                .as_ref()
+                .estimate_in_memory_size_bytes(Some(cfg))
+                .and_then(|est_materialized_bytes| {
+                    first_scantask
+                        .as_ref()
+                        .approx_num_rows(Some(cfg))
+                        .map(|approx_num_rows| {
+                            (est_materialized_bytes as f64) / approx_num_rows * (limit as f64)
+                        })
+                });
+            if let Some(limit_bytes) = estimated_bytes_for_reading_limit_rows {
+                return Box::new(MergeByFileSize {
+                    iter: Box::new(scan_tasks),
+                    cfg,
+                    target_upper_bound_size_bytes: (limit_bytes * 1.5) as usize,
+                    target_lower_bound_size_bytes: (limit_bytes / 2.) as usize,
+                    accumulator: None,
+                }) as BoxScanTaskIter;
+            }
+        }
+        // If we are unable to estimate the size of the LIMIT, don't perform a merge
+        Box::new(scan_tasks)
+    } else {
+        Box::new(MergeByFileSize {
+            iter: scan_tasks,
+            cfg,
+            target_upper_bound_size_bytes: cfg.scan_tasks_max_size_bytes,
+            target_lower_bound_size_bytes: cfg.scan_tasks_min_size_bytes,
+            accumulator: None,
+        }) as BoxScanTaskIter
+    }
}

struct MergeByFileSize<'a> {
iter: BoxScanTaskIter<'a>,
cfg: &'a DaftExecutionConfig,

+    // The target upper/lower bound for in-memory size_bytes of a merged ScanTask
+    target_upper_bound_size_bytes: usize,
+    target_lower_bound_size_bytes: usize,

// Current element being accumulated on
accumulator: Option<ScanTaskRef>,
}

impl<'a> MergeByFileSize<'a> {
+    /// Returns whether or not the current accumulator is "ready" to be emitted as a finalized merged ScanTask
+    ///
+    /// "Readiness" is determined by a combination of factors based on how large the accumulated ScanTask is
+    /// in estimated bytes, as well as other factors including any limit pushdowns.
fn accumulator_ready(&self) -> bool {
+        // Emit the accumulator as soon as it is bigger than the specified `target_lower_bound_size_bytes`
        if let Some(acc) = &self.accumulator
            && let Some(acc_bytes) = acc.estimate_in_memory_size_bytes(Some(self.cfg))
-            && acc_bytes >= self.cfg.scan_tasks_min_size_bytes
+            && acc_bytes >= self.target_lower_bound_size_bytes
{
true
} else {
false
}
}

/// Checks if the current accumulator can be merged with the provided ScanTask
fn can_merge(&self, other: &ScanTask) -> bool {
let accumulator = self
.accumulator
@@ -68,12 +113,13 @@ impl<'a> MergeByFileSize<'a> {
&& other.storage_config == accumulator.storage_config
&& other.pushdowns == accumulator.pushdowns;

+        // Merge only if the resultant accumulator is smaller than the targeted upper bound
let sum_smaller_than_max_size_bytes = if let Some(child_bytes) =
other.estimate_in_memory_size_bytes(Some(self.cfg))
&& let Some(accumulator_bytes) =
accumulator.estimate_in_memory_size_bytes(Some(self.cfg))
{
-            child_bytes + accumulator_bytes <= self.cfg.scan_tasks_max_size_bytes
+            child_bytes + accumulator_bytes <= self.target_upper_bound_size_bytes
} else {
false
};
@@ -87,6 +133,7 @@ impl<'a> Iterator for MergeByFileSize<'a> {

fn next(&mut self) -> Option<Self::Item> {
loop {
+            // Create accumulator if not already present
if self.accumulator.is_none() {
self.accumulator = match self.iter.next() {
Some(Ok(item)) => Some(item),
@@ -95,6 +142,7 @@
};
}

+            // Emit accumulator if ready
if self.accumulator_ready() {
return self.accumulator.take().map(Ok);
}
@@ -105,6 +153,7 @@
None => return self.accumulator.take().map(Ok),
};

+            // Emit accumulator if `next_item` cannot be merged
if next_item
.estimate_in_memory_size_bytes(Some(self.cfg))
.is_none()
@@ -113,6 +162,7 @@
return self.accumulator.replace(next_item).map(Ok);
}

+            // Merge into a new accumulator
self.accumulator = Some(Arc::new(
ScanTask::merge(
self.accumulator
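
For readers skimming the diff, here is a self-contained sketch of the accumulate/emit strategy that `MergeByFileSize` implements, simplified to plain `(name, size)` pairs. This is an illustration under assumed simplifications, not Daft code: it emits the accumulator once it clears the lower bound, and merges a neighbor only if the combined size stays under the upper bound.

```rust
/// Illustrative sketch only (plain tuples instead of ScanTasks): the same
/// accumulate/emit strategy as MergeByFileSize.
fn merge_by_size(
    items: impl IntoIterator<Item = (String, usize)>,
    lower: usize,
    upper: usize,
) -> Vec<(String, usize)> {
    let mut out = Vec::new();
    let mut acc: Option<(String, usize)> = None;
    for (name, size) in items {
        acc = match acc.take() {
            // No accumulator yet: start one.
            None => Some((name, size)),
            // Accumulator is ready: emit it and start fresh on this item.
            Some(prev) if prev.1 >= lower => {
                out.push(prev);
                Some((name, size))
            }
            // This item fits under the upper bound: merge it in.
            Some((acc_name, acc_size)) if acc_size + size <= upper => {
                Some((format!("{acc_name}+{name}"), acc_size + size))
            }
            // It doesn't fit: emit the accumulator and start fresh.
            Some(prev) => {
                out.push(prev);
                Some((name, size))
            }
        };
    }
    out.extend(acc); // Flush the final partial accumulator.
    out
}

fn main() {
    let files: Vec<(String, usize)> =
        vec![("a".into(), 10), ("b".into(), 10), ("c".into(), 10)];
    // With the same 17/20 bounds the tests below configure: "a" and "b" merge
    // to size 20 and are emitted; "c" is flushed on its own at the end.
    println!("{:?}", merge_by_size(files, 17, 20));
}
```
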
24 changes: 24 additions & 0 deletions tests/io/test_merge_scan_tasks.py
@@ -58,3 +58,27 @@ def test_merge_scan_task_below_min(csv_files):
assert (
df.num_partitions() == 1
), "Should have 1 partition [(CSV1, CSV2, CSV3)] since both merges are below the minimum and maximum"


+def test_merge_scan_task_limit_override(csv_files):
+    # A LIMIT operation should override merging of scan tasks, making it merge only up to the estimated size of the limit
+    #
+    # With a very small CSV inflation factor, the merger will think that these CSVs provide very few rows of data and will be more aggressive
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=17,
+        scan_tasks_max_size_bytes=20,
+        csv_inflation_factor=0.1,
+    ):
+        df = daft.read_csv(str(csv_files)).limit(1)
+        assert (
+            df.num_partitions() == 1
+        ), "Should have 1 partition [(CSV1, CSV2, CSV3)] since we have a limit 1 but are underestimating the size of the data in the CSVs"
+
+    # With a very large CSV inflation factor, the merger will think that these CSVs provide more rows of data and will be more conservative
+    with daft.execution_config_ctx(
+        scan_tasks_min_size_bytes=17,
+        scan_tasks_max_size_bytes=20,
+        csv_inflation_factor=2.0,
+    ):
+        df = daft.read_csv(str(csv_files)).limit(1)
+        assert df.num_partitions() == 3, "Should have 3 partitions [(CSV1, CSV2, CSV3)] since we have a limit 1"
