Skip to content

Commit e8c0226

Browse files
authored
switch to using scan_data version of scan for execute() as well (#265)
Previously we had two independent code paths for interacting with tables via a scan, `execute()` or `scan_data()`. `scan_data` is what most engines will use, and is a bit more complex in that it evaluates expressions over the returned add files. This meant that bugs like #261 could happen because our tests used `execute` which didn't catch the issue. This PR makes `execute` use `scan_data` under the hood. It's a bit more complex, but now we won't need to maintain two code paths. Until #271 merges, this PR will fail tests, because nullable columns are not filled in as expected.
1 parent 334af14 commit e8c0226

File tree

5 files changed

+135
-130
lines changed

5 files changed

+135
-130
lines changed

kernel/examples/read-table-multi-threaded/src/main.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ fn try_main() -> DeltaResult<()> {
172172
let scan_data = scan.scan_data(engine.as_ref())?;
173173

174174
// get any global state associated with this scan
175-
let global_state = scan.global_scan_state();
175+
let global_state = Arc::new(scan.global_scan_state());
176176

177177
// create the channels we'll use. record_batch_[t/r]x are used for the threads to send back the
178178
// processed RecordBatches to the main thread
@@ -221,7 +221,7 @@ fn try_main() -> DeltaResult<()> {
221221
// this is the work each thread does
222222
fn do_work(
223223
engine: Arc<dyn Engine>,
224-
scan_state: GlobalScanState,
224+
scan_state: Arc<GlobalScanState>,
225225
record_batch_tx: Sender<RecordBatch>,
226226
scan_file_rx: spmc::Receiver<ScanFile>,
227227
) {

kernel/src/scan/data_skipping.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55
use tracing::debug;
66

77
use crate::actions::visitors::SelectionVectorVisitor;
8+
use crate::actions::{get_log_schema, ADD_NAME};
89
use crate::error::DeltaResult;
910
use crate::expressions::{BinaryOperator, Expression as Expr, UnaryOperator, VariadicOperator};
1011
use crate::schema::{DataType, PrimitiveType, SchemaRef, StructField, StructType};
@@ -242,7 +243,8 @@ impl DataSkippingFilter {
242243
// 3. The selection evaluator does DISTINCT(col(predicate), 'false') to produce true (= keep) when
243244
// the predicate is true/null and false (= skip) when the predicate is false.
244245
let select_stats_evaluator = engine.get_expression_handler().get_evaluator(
245-
stats_schema.clone(),
246+
// safety: kernel is very broken if we don't have the schema for Add actions
247+
get_log_schema().project(&[ADD_NAME]).unwrap(),
246248
STATS_EXPR.clone(),
247249
DataType::STRING,
248250
);

kernel/src/scan/log_replay.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ lazy_static! {
8282
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
8383
// indexes will be off
8484
pub(crate) static ref SCAN_ROW_SCHEMA: Arc<StructType> = Arc::new(StructType::new(vec!(
85-
StructField::new("path", DataType::STRING, true),
85+
StructField::new("path", DataType::STRING, false),
8686
StructField::new("size", DataType::LONG, true),
8787
StructField::new("modificationTime", DataType::LONG, true),
8888
StructField::new("stats", DataType::STRING, true),

kernel/src/scan/mod.rs

+128-125
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
//! Functionality to create and execute scans (reads) over data stored in a delta table
22
3+
use std::collections::HashMap;
34
use std::sync::Arc;
45

56
use itertools::Itertools;
67
use tracing::debug;
78
use url::Url;
89

9-
use self::log_replay::{log_replay_iter, scan_action_iter};
10+
use self::log_replay::scan_action_iter;
1011
use self::state::GlobalScanState;
1112
use crate::actions::deletion_vector::{split_vector, treemap_to_bools, DeletionVectorDescriptor};
12-
use crate::actions::{get_log_schema, Add, ADD_NAME, REMOVE_NAME};
13+
use crate::actions::{get_log_schema, ADD_NAME, REMOVE_NAME};
1314
use crate::column_mapping::ColumnMappingMode;
1415
use crate::expressions::{Expression, Scalar};
16+
use crate::scan::state::{DvInfo, Stats};
1517
use crate::schema::{DataType, Schema, SchemaRef, StructField, StructType};
1618
use crate::snapshot::Snapshot;
1719
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta};
@@ -177,30 +179,6 @@ impl Scan {
177179
&self.predicate
178180
}
179181

180-
/// Get an iterator of Add actions that should be included in scan for a query. This handles
181-
/// log-replay, reconciling Add and Remove actions, and applying data skipping (if possible)
182-
pub(crate) fn files(
183-
&self,
184-
engine: &dyn Engine,
185-
) -> DeltaResult<impl Iterator<Item = DeltaResult<Add>> + Send> {
186-
let commit_read_schema = get_log_schema().project(&[ADD_NAME, REMOVE_NAME])?;
187-
let checkpoint_read_schema = get_log_schema().project(&[ADD_NAME])?;
188-
189-
let log_iter = self.snapshot.log_segment.replay(
190-
engine,
191-
commit_read_schema,
192-
checkpoint_read_schema,
193-
self.predicate.clone(),
194-
)?;
195-
196-
Ok(log_replay_iter(
197-
engine,
198-
log_iter,
199-
&self.logical_schema,
200-
&self.predicate,
201-
))
202-
}
203-
204182
/// Get an iterator of [`EngineData`]s that should be included in scan for a query. This handles
205183
/// log-replay, reconciling Add and Remove actions, and applying data skipping (if
206184
/// possible). Each item in the returned iterator is a tuple of:
@@ -256,99 +234,87 @@ impl Scan {
256234
// This calls [`Scan::files`] to get a set of `Add` actions for the scan, and then uses the
257235
// `engine`'s [`crate::ParquetHandler`] to read the actual table data.
258236
pub fn execute(&self, engine: &dyn Engine) -> DeltaResult<Vec<ScanResult>> {
237+
struct ScanFile {
238+
path: String,
239+
size: i64,
240+
dv_info: DvInfo,
241+
partition_values: HashMap<String, String>,
242+
}
243+
fn scan_data_callback(
244+
batches: &mut Vec<ScanFile>,
245+
path: &str,
246+
size: i64,
247+
_: Option<Stats>,
248+
dv_info: DvInfo,
249+
partition_values: HashMap<String, String>,
250+
) {
251+
batches.push(ScanFile {
252+
path: path.to_string(),
253+
size,
254+
dv_info,
255+
partition_values,
256+
});
257+
}
258+
259259
debug!(
260260
"Executing scan with logical schema {:#?} and physical schema {:#?}",
261261
self.logical_schema, self.physical_schema
262262
);
263-
let output_schema = DataType::from(self.schema().clone());
264-
let parquet_handler = engine.get_parquet_handler();
265-
266-
let mut results: Vec<ScanResult> = vec![];
267-
let files = self.files(engine)?;
268-
for add_result in files {
269-
let add = add_result?;
270-
let meta = FileMeta {
271-
last_modified: add.modification_time,
272-
size: add.size as usize,
273-
location: self.snapshot.table_root.join(&add.path)?,
274-
};
275-
276-
let read_results =
277-
parquet_handler.read_parquet_files(&[meta], self.physical_schema.clone(), None)?;
278-
279-
let read_expression = if self.have_partition_cols
280-
|| self.snapshot.column_mapping_mode != ColumnMappingMode::None
281-
{
282-
// Loop over all fields and create the correct expressions for them
283-
let all_fields = self
284-
.all_fields
285-
.iter()
286-
.map(|field| match field {
287-
ColumnType::Partition(field_idx) => {
288-
let field = self.logical_schema.fields.get_index(*field_idx).ok_or_else(|| {
289-
Error::generic("logical schema did not contain expected field, can't execute scan")
290-
})?.1;
291-
let name = field.physical_name(self.snapshot.column_mapping_mode)?;
292-
let value_expression = parse_partition_value(
293-
add.partition_values.get(name),
294-
field.data_type(),
295-
)?;
296-
Ok::<Expression, Error>(Expression::Literal(value_expression))
297-
}
298-
ColumnType::Selected(field_name) => Ok(Expression::column(field_name)),
299-
})
300-
.try_collect()?;
301-
Some(Expression::Struct(all_fields))
302-
} else {
303-
None
304-
};
305-
debug!("Final expression for read: {read_expression:?}");
306-
307-
let dv_treemap = add
308-
.deletion_vector
309-
.as_ref()
310-
.map(|dv_descriptor| {
311-
let fs_client = engine.get_file_system_client();
312-
dv_descriptor.read(fs_client, &self.snapshot.table_root)
313-
})
314-
.transpose()?;
315-
316-
let mut dv_mask = dv_treemap.map(treemap_to_bools);
317-
318-
for read_result in read_results {
319-
let len = if let Ok(ref res) = read_result {
320-
res.length()
321-
} else {
322-
0
323-
};
324-
325-
let read_result = match read_expression {
326-
Some(ref read_expression) => engine
327-
.get_expression_handler()
328-
.get_evaluator(
329-
self.physical_schema.clone(),
330-
read_expression.clone(),
331-
output_schema.clone(),
332-
)
333-
.evaluate(read_result?.as_ref()),
334-
None => {
335-
// if we don't have partition columns, the result is just what we read
336-
read_result
337-
}
338-
};
339263

340-
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
341-
// will cover the following results
342-
let rest = split_vector(dv_mask.as_mut(), len, None);
343-
let scan_result = ScanResult {
344-
raw_data: read_result,
345-
mask: dv_mask,
346-
};
347-
dv_mask = rest;
348-
results.push(scan_result);
349-
}
264+
let global_state = Arc::new(self.global_scan_state());
265+
let scan_data = self.scan_data(engine)?;
266+
let mut scan_files = vec![];
267+
for data in scan_data {
268+
let (data, vec) = data?;
269+
scan_files =
270+
state::visit_scan_files(data.as_ref(), &vec, scan_files, scan_data_callback)?;
350271
}
351-
Ok(results)
272+
scan_files
273+
.into_iter()
274+
.map(|scan_file| -> DeltaResult<_> {
275+
let file_path = self.snapshot.table_root.join(&scan_file.path)?;
276+
let mut selection_vector = scan_file
277+
.dv_info
278+
.get_selection_vector(engine, &self.snapshot.table_root)?;
279+
let meta = FileMeta {
280+
last_modified: 0,
281+
size: scan_file.size as usize,
282+
location: file_path,
283+
};
284+
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
285+
&[meta],
286+
global_state.read_schema.clone(),
287+
None,
288+
)?;
289+
let gs = global_state.clone(); // Arc clone
290+
Ok(read_result_iter.into_iter().map(move |read_result| {
291+
let read_result = read_result?;
292+
// to transform the physical data into the correct logical form
293+
let logical = transform_to_logical_internal(
294+
engine,
295+
read_result,
296+
&gs,
297+
&scan_file.partition_values,
298+
&self.all_fields,
299+
self.have_partition_cols,
300+
);
301+
let len = logical.as_ref().map_or(0, |res| res.length());
302+
// need to split the dv_mask. what's left in dv_mask covers this result, and rest
303+
// will cover the following results. we `take()` out of `selection_vector` to avoid
304+
// trying to return a captured variable. We're going to reassign `selection_vector`
305+
// to `rest` in a moment anyway
306+
let mut sv = selection_vector.take();
307+
let rest = split_vector(sv.as_mut(), len, None);
308+
let result = ScanResult {
309+
raw_data: logical,
310+
mask: sv,
311+
};
312+
selection_vector = rest;
313+
Ok(result)
314+
}))
315+
})
316+
.flatten_ok()
317+
.try_collect()?
352318
}
353319
}
354320

@@ -438,17 +404,39 @@ pub fn selection_vector(
438404
Ok(treemap_to_bools(dv_treemap))
439405
}
440406

407+
/// Transform the raw data read from parquet into the correct logical form, based on the provided
408+
/// global scan state and partition values
441409
pub fn transform_to_logical(
442410
engine: &dyn Engine,
443411
data: Box<dyn EngineData>,
444412
global_state: &GlobalScanState,
445-
partition_values: &std::collections::HashMap<String, String>,
413+
partition_values: &HashMap<String, String>,
446414
) -> DeltaResult<Box<dyn EngineData>> {
447415
let (all_fields, _read_fields, have_partition_cols) = get_state_info(
448416
&global_state.logical_schema,
449417
&global_state.partition_columns,
450418
global_state.column_mapping_mode,
451419
)?;
420+
transform_to_logical_internal(
421+
engine,
422+
data,
423+
global_state,
424+
partition_values,
425+
&all_fields,
426+
have_partition_cols,
427+
)
428+
}
429+
430+
// We have this function because `execute` can save `all_fields` and `have_partition_cols` in the
431+
// scan, and then reuse them for each batch transform
432+
fn transform_to_logical_internal(
433+
engine: &dyn Engine,
434+
data: Box<dyn EngineData>,
435+
global_state: &GlobalScanState,
436+
partition_values: &std::collections::HashMap<String, String>,
437+
all_fields: &[ColumnType],
438+
have_partition_cols: bool,
439+
) -> DeltaResult<Box<dyn EngineData>> {
452440
let read_schema = global_state.read_schema.clone();
453441
if have_partition_cols || global_state.column_mapping_mode != ColumnMappingMode::None {
454442
// need to add back partition cols and/or fix-up mapped columns
@@ -596,8 +584,29 @@ mod tests {
596584
use crate::schema::PrimitiveType;
597585
use crate::Table;
598586

587+
fn get_files_for_scan(scan: Scan, engine: &dyn Engine) -> DeltaResult<Vec<String>> {
588+
let scan_data = scan.scan_data(engine)?;
589+
fn scan_data_callback(
590+
paths: &mut Vec<String>,
591+
path: &str,
592+
_size: i64,
593+
_: Option<Stats>,
594+
dv_info: DvInfo,
595+
_partition_values: HashMap<String, String>,
596+
) {
597+
paths.push(path.to_string());
598+
assert!(dv_info.deletion_vector.is_none());
599+
}
600+
let mut files = vec![];
601+
for data in scan_data {
602+
let (data, vec) = data?;
603+
files = state::visit_scan_files(data.as_ref(), &vec, files, scan_data_callback)?;
604+
}
605+
Ok(files)
606+
}
607+
599608
#[test]
600-
fn test_scan_files() {
609+
fn test_scan_data_paths() {
601610
let path =
602611
std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
603612
let url = url::Url::from_directory_path(path).unwrap();
@@ -606,14 +615,12 @@ mod tests {
606615
let table = Table::new(url);
607616
let snapshot = table.snapshot(&engine, None).unwrap();
608617
let scan = snapshot.into_scan_builder().build().unwrap();
609-
let files: Vec<Add> = scan.files(&engine).unwrap().try_collect().unwrap();
610-
618+
let files = get_files_for_scan(scan, &engine).unwrap();
611619
assert_eq!(files.len(), 1);
612620
assert_eq!(
613-
&files[0].path,
621+
files[0],
614622
"part-00000-517f5d32-9c95-48e8-82b4-0229cc194867-c000.snappy.parquet"
615623
);
616-
assert!(&files[0].deletion_vector.is_none());
617624
}
618625

619626
#[test]
@@ -689,8 +696,7 @@ mod tests {
689696
let table = Table::new(url);
690697
let snapshot = table.snapshot(&engine, None)?;
691698
let scan = snapshot.into_scan_builder().build()?;
692-
let files: Vec<DeltaResult<Add>> = scan.files(&engine)?.collect();
693-
699+
let files = get_files_for_scan(scan, &engine)?;
694700
// test case:
695701
//
696702
// commit0: P and M, no add/remove
@@ -701,10 +707,7 @@ mod tests {
701707
//
702708
// thus replay should produce only file-70b
703709
assert_eq!(
704-
files
705-
.into_iter()
706-
.map(|file| file.unwrap().path)
707-
.collect::<Vec<_>>(),
710+
files,
708711
vec!["part-00000-70b1dcdf-0236-4f63-a072-124cdbafd8a0-c000.snappy.parquet"]
709712
);
710713
Ok(())

kernel/src/scan/state.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ pub struct GlobalScanState {
3030
/// this struct can be used by an engine to materialize a selection vector
3131
#[derive(Debug)]
3232
pub struct DvInfo {
33-
deletion_vector: Option<DeletionVectorDescriptor>,
33+
pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
3434
}
3535

3636
/// Give engines an easy way to consume stats

0 commit comments

Comments
 (0)