Skip to content

Commit 334af14

Browse files
nicklanscovich
and authored
Push parquet select to leaves, add correct reordering (#271)
This PR makes us finally have correct semantics when we are asked to read a schema that's not the same as the schema in the parquet file. In particular this adds: - Correct sorting of columns to match the specified schema (previous code was actually not correctly doing this) - Identification of leaf fields that are being selected, so we only read exactly what's asked for - Sorting a struct inside a list. - Detection of requested columns that don't exist in the parquet file. - If nullable, note and then fill them in with a column of null - Otherwise, error since we're requesting a missing column that can't be null - Detection of timestamp columns that need to be cast to match the delta specification, and the code to do the actual casting This turns out to be _way harder_ than anticipated, so its a lot of code. Currently does not support reordering things inside `map`s. This is a complex PR as it is, and we don't need support for that just yet. Map content will just be passed through as read from the file. --------- Co-authored-by: Ryan Johnson <[email protected]>
1 parent faa553d commit 334af14

File tree

35 files changed

+1489
-188
lines changed

35 files changed

+1489
-188
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ version = "0.1.1"
2424
arrow = { version = "^52.0" }
2525
arrow-arith = { version = "^52.0" }
2626
arrow-array = { version = "^52.0" }
27+
arrow-cast = { version = "^52.0" }
2728
arrow-data = { version = "^52.0" }
2829
arrow-ord = { version = "^52.0" }
2930
arrow-json = { version = "^52.0" }

acceptance/src/data.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ pub fn sort_record_batch(batch: RecordBatch) -> DeltaResult<RecordBatch> {
6060
Ok(RecordBatch::try_new(batch.schema(), columns)?)
6161
}
6262

63-
static SKIPPED_TESTS: &[&str; 1] = &[
64-
// For multi_partitioned_2: The golden table stores the timestamp as an INT96 (which is
65-
// nanosecond precision), while the spec says we should read partition columns as
66-
// microseconds. This means the read and golden data don't line up. When this is released in
67-
// `dat` upstream, we can stop skipping this test
63+
static SKIPPED_TESTS: &[&str; 2] = &[
64+
// For all_primitive_types and multi_partitioned_2: The golden table stores the timestamp as an
65+
// INT96 (which is nanosecond precision), while the spec says we should read partition columns
66+
// as microseconds. This means the read and golden data don't line up. When this is released in
67+
// `dat` upstream, we can stop skipping these tests
68+
"all_primitive_types",
6869
"multi_partitioned_2",
6970
];
7071

ffi/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ pub enum KernelError {
263263
InvalidTableLocationError,
264264
InvalidDecimalError,
265265
InvalidStructDataError,
266+
InternalError,
266267
}
267268

268269
impl From<Error> for KernelError {
@@ -303,6 +304,7 @@ impl From<Error> for KernelError {
303304
Error::InvalidTableLocation(_) => KernelError::InvalidTableLocationError,
304305
Error::InvalidDecimal(_) => KernelError::InvalidDecimalError,
305306
Error::InvalidStructData(_) => KernelError::InvalidStructDataError,
307+
Error::InternalError(_) => KernelError::InternalError,
306308
Error::Backtraced {
307309
source,
308310
backtrace: _,

ffi/src/scan.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::ffi::c_void;
55
use std::sync::{Arc, Mutex};
66

77
use delta_kernel::scan::state::{visit_scan_files, DvInfo, GlobalScanState};
8-
use delta_kernel::scan::{Scan, ScanBuilder, ScanData};
8+
use delta_kernel::scan::{Scan, ScanData};
99
use delta_kernel::schema::Schema;
1010
use delta_kernel::snapshot::Snapshot;
1111
use delta_kernel::{DeltaResult, EngineData, Error};

kernel/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.1.1" }
3737
visibility = "0.1.0"
3838

3939
# Used in default engine
40-
arrow-array = { workspace = true, optional = true }
40+
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
4141
arrow-select = { workspace = true, optional = true }
4242
arrow-arith = { workspace = true, optional = true }
43+
arrow-cast = { workspace = true, optional = true }
4344
arrow-json = { workspace = true, optional = true }
4445
arrow-ord = { workspace = true, optional = true }
4546
arrow-schema = { workspace = true, optional = true }
@@ -67,6 +68,7 @@ default-engine = [
6768
"arrow-conversion",
6869
"arrow-expression",
6970
"arrow-array",
71+
"arrow-cast",
7072
"arrow-json",
7173
"arrow-schema",
7274
"arrow-select",
@@ -80,6 +82,7 @@ default-engine = [
8082

8183
developer-visibility = []
8284
sync-engine = [
85+
"arrow-cast",
8386
"arrow-conversion",
8487
"arrow-expression",
8588
"arrow-array",

kernel/src/engine/arrow_expression.rs

+1-88
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use itertools::Itertools;
1717

1818
use super::arrow_conversion::LIST_ARRAY_ROOT;
1919
use crate::engine::arrow_data::ArrowEngineData;
20+
use crate::engine::arrow_utils::ensure_data_types;
2021
use crate::error::{DeltaResult, Error};
2122
use crate::expressions::{BinaryOperator, Expression, Scalar, UnaryOperator, VariadicOperator};
2223
use crate::schema::{DataType, PrimitiveType, SchemaRef};
23-
use crate::utils::require;
2424
use crate::{EngineData, ExpressionEvaluator, ExpressionHandler};
2525

2626
// TODO leverage scalars / Datum
@@ -161,93 +161,6 @@ fn column_as_struct<'a>(
161161
.ok_or(ArrowError::SchemaError(format!("{} is not a struct", name)))
162162
}
163163

164-
fn make_arrow_error(s: String) -> Error {
165-
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
166-
}
167-
168-
/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
169-
/// is the same, but does so recursively into structs, and ensures lists and maps have the correct
170-
/// associated types as well. This returns an `Ok(())` if the types are compatible, or an error if
171-
/// the types do not match. If there is a `struct` type included, we only ensure that the named
172-
/// fields that the kernel is asking for exist, and that for those fields the types
173-
/// match. Un-selected fields are ignored.
174-
fn ensure_data_types(kernel_type: &DataType, arrow_type: &ArrowDataType) -> DeltaResult<()> {
175-
match (kernel_type, arrow_type) {
176-
(DataType::Primitive(_), _) if arrow_type.is_primitive() => Ok(()),
177-
(DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean)
178-
| (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8)
179-
| (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => {
180-
// strings, bools, and binary aren't primitive in arrow
181-
Ok(())
182-
}
183-
(
184-
DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)),
185-
ArrowDataType::Decimal128(arrow_prec, arrow_scale),
186-
) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => {
187-
// decimal isn't primitive in arrow. cast above is okay as we limit range
188-
Ok(())
189-
}
190-
(DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => {
191-
let kernel_array_type = &inner_type.element_type;
192-
let arrow_list_type = arrow_list_type.data_type();
193-
ensure_data_types(kernel_array_type, arrow_list_type)
194-
}
195-
(DataType::Map(kernel_map_type), ArrowDataType::Map(arrow_map_type, _)) => {
196-
if let ArrowDataType::Struct(fields) = arrow_map_type.data_type() {
197-
let mut fiter = fields.iter();
198-
if let Some(key_type) = fiter.next() {
199-
ensure_data_types(&kernel_map_type.key_type, key_type.data_type())?;
200-
} else {
201-
return Err(make_arrow_error(
202-
"Arrow map struct didn't have a key type".to_string(),
203-
));
204-
}
205-
if let Some(value_type) = fiter.next() {
206-
ensure_data_types(&kernel_map_type.value_type, value_type.data_type())?;
207-
} else {
208-
return Err(make_arrow_error(
209-
"Arrow map struct didn't have a value type".to_string(),
210-
));
211-
}
212-
Ok(())
213-
} else {
214-
Err(make_arrow_error(
215-
"Arrow map type wasn't a struct.".to_string(),
216-
))
217-
}
218-
}
219-
(DataType::Struct(kernel_fields), ArrowDataType::Struct(arrow_fields)) => {
220-
// build a list of kernel fields that matches the order of the arrow fields
221-
let mapped_fields = arrow_fields
222-
.iter()
223-
.flat_map(|f| kernel_fields.fields.get(f.name()));
224-
225-
// keep track of how many fields we matched up
226-
let mut found_fields = 0;
227-
// ensure that for the fields that we found, the types match
228-
for (kernel_field, arrow_field) in mapped_fields.zip(arrow_fields) {
229-
ensure_data_types(&kernel_field.data_type, arrow_field.data_type())?;
230-
found_fields += 1;
231-
}
232-
233-
// require that we found the number of fields that we requested.
234-
require!(kernel_fields.fields.len() == found_fields, {
235-
let kernel_field_names = kernel_fields.fields.keys().join(", ");
236-
let arrow_field_names = arrow_fields.iter().map(|f| f.name()).join(", ");
237-
make_arrow_error(format!(
238-
"Missing Struct fields. Requested: {}, found: {}",
239-
kernel_field_names, arrow_field_names,
240-
))
241-
});
242-
Ok(())
243-
}
244-
_ => Err(make_arrow_error(format!(
245-
"Incorrect datatype. Expected {}, got {}",
246-
kernel_type, arrow_type
247-
))),
248-
}
249-
}
250-
251164
fn evaluate_expression(
252165
expression: &Expression,
253166
batch: &RecordBatch,

0 commit comments

Comments (0)