Skip to content

Commit 765b65d

Browse files
committed
handle timestamp conversions
1 parent 6ff1ade commit 765b65d

File tree

4 files changed

+107
-46
lines changed

4 files changed

+107
-46
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ version = "0.1.1"
2424
arrow = { version = "^52.0" }
2525
arrow-arith = { version = "^52.0" }
2626
arrow-array = { version = "^52.0" }
27+
arrow-cast = { version = "^52.0" }
2728
arrow-data = { version = "^52.0" }
2829
arrow-ord = { version = "^52.0" }
2930
arrow-json = { version = "^52.0" }

kernel/Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,10 @@ delta_kernel_derive = { path = "../derive-macros", version = "0.1.1" }
3737
visibility = "0.1.0"
3838

3939
# Used in default engine
40-
arrow-array = { workspace = true, optional = true }
40+
arrow-array = { workspace = true, optional = true, features = ["chrono-tz"] }
4141
arrow-select = { workspace = true, optional = true }
4242
arrow-arith = { workspace = true, optional = true }
43+
arrow-cast = { workspace = true, optional = true }
4344
arrow-json = { workspace = true, optional = true }
4445
arrow-ord = { workspace = true, optional = true }
4546
arrow-schema = { workspace = true, optional = true }
@@ -67,6 +68,7 @@ default-engine = [
6768
"arrow-conversion",
6869
"arrow-expression",
6970
"arrow-array",
71+
"arrow-cast",
7072
"arrow-json",
7173
"arrow-schema",
7274
"arrow-select",
@@ -80,6 +82,7 @@ default-engine = [
8082

8183
developer-visibility = []
8284
sync-engine = [
85+
"arrow-cast",
8386
"arrow-conversion",
8487
"arrow-expression",
8588
"arrow-array",

kernel/src/engine/arrow_utils.rs

+91-34
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,34 @@ fn make_arrow_error(s: String) -> Error {
2424
Error::Arrow(arrow_schema::ArrowError::InvalidArgumentError(s))
2525
}
2626

27+
/// Capture the compatibility between two data-types, as passed to [`ensure_data_types`]
28+
pub(crate) enum DataTypeCompat {
29+
/// The two types are the same
30+
Identical,
31+
/// What is read from parquet needs to be cast to the associated type
32+
NeedsCast(ArrowDataType),
33+
/// Types are compatible, but are nested types. This is used when comparing types where casting
34+
/// is not desired (i.e. in the expression evaluator)
35+
Nested,
36+
}
37+
38+
// Check if two types can be cast
39+
fn check_cast_compat(
40+
source_type: &ArrowDataType,
41+
target_type: ArrowDataType,
42+
) -> DeltaResult<DataTypeCompat> {
43+
match (source_type, &target_type) {
44+
(&ArrowDataType::Timestamp(_, _), &ArrowDataType::Timestamp(_, _)) => {
45+
// timestamps are able to be cast between each other
46+
Ok(DataTypeCompat::NeedsCast(target_type))
47+
}
48+
_ => Err(make_arrow_error(format!(
49+
"Incorrect datatype. Expected {}, got {}",
50+
target_type, source_type
51+
))), //| (DataType::Primitive(PrimitiveType::TimestampNtz), ArrowDataType::Timestamp(_, _)) => {
52+
}
53+
}
54+
2755
/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
2856
/// is the same, but does so recursively into structs, and ensures lists and maps have the correct
2957
/// associated types as well. This returns an `Ok(())` if the types are compatible, or an error if
@@ -33,31 +61,28 @@ fn make_arrow_error(s: String) -> Error {
3361
pub(crate) fn ensure_data_types(
3462
kernel_type: &DataType,
3563
arrow_type: &ArrowDataType,
36-
) -> DeltaResult<()> {
64+
) -> DeltaResult<DataTypeCompat> {
3765
match (kernel_type, arrow_type) {
3866
(DataType::Primitive(_), _) if arrow_type.is_primitive() => {
3967
let converted_type: ArrowDataType = kernel_type.try_into()?;
4068
if &converted_type == arrow_type {
41-
Ok(())
69+
Ok(DataTypeCompat::Identical)
4270
} else {
43-
Err(make_arrow_error(format!(
44-
"Incorrect datatype. Expected {}, got {}",
45-
converted_type, arrow_type
46-
)))
71+
check_cast_compat(arrow_type, converted_type)
4772
}
4873
}
4974
(DataType::Primitive(PrimitiveType::Boolean), ArrowDataType::Boolean)
5075
| (DataType::Primitive(PrimitiveType::String), ArrowDataType::Utf8)
5176
| (DataType::Primitive(PrimitiveType::Binary), ArrowDataType::Binary) => {
5277
// strings, bools, and binary aren't primitive in arrow
53-
Ok(())
78+
Ok(DataTypeCompat::Identical)
5479
}
5580
(
5681
DataType::Primitive(PrimitiveType::Decimal(kernel_prec, kernel_scale)),
5782
ArrowDataType::Decimal128(arrow_prec, arrow_scale),
5883
) if arrow_prec == kernel_prec && *arrow_scale == *kernel_scale as i8 => {
5984
// decimal isn't primitive in arrow. cast above is okay as we limit range
60-
Ok(())
85+
Ok(DataTypeCompat::Identical)
6186
}
6287
(DataType::Array(inner_type), ArrowDataType::List(arrow_list_type)) => {
6388
let kernel_array_type = &inner_type.element_type;
@@ -81,7 +106,7 @@ pub(crate) fn ensure_data_types(
81106
"Arrow map struct didn't have a value type".to_string(),
82107
));
83108
}
84-
Ok(())
109+
Ok(DataTypeCompat::Nested)
85110
} else {
86111
Err(make_arrow_error(
87112
"Arrow map type wasn't a struct.".to_string(),
@@ -111,7 +136,7 @@ pub(crate) fn ensure_data_types(
111136
kernel_field_names, arrow_field_names,
112137
))
113138
});
114-
Ok(())
139+
Ok(DataTypeCompat::Nested)
115140
}
116141
_ => Err(make_arrow_error(format!(
117142
"Incorrect datatype. Expected {}, got {}",
@@ -211,52 +236,65 @@ pub(crate) fn ensure_data_types(
211236
*/
212237

213238
/// Reordering is specified as a tree. Each level is a vec of `ReorderIndex`s. Each element's index
214-
/// represents a column that will be in the read parquet data at that level and index. The `index()`
215-
/// of the element is the position that the column should appear in the final output. If it is a
216-
/// `Child` variant, then at that index there is a `Struct` whose ordering is specified by the
217-
/// values in the associated `Vec` according to these same rules.
239+
/// represents a column that will be in the read parquet data at that level and index. The `index`
240+
/// of the element is the position that the column should appear in the final output. The `transform`
241+
/// indicates what, if any, transforms are needed. See the docs for [`ReorderIndexTransform`] for the
242+
/// meaning.
218243
#[derive(Debug, PartialEq)]
219244
pub(crate) struct ReorderIndex {
220245
pub(crate) index: usize,
221-
kind: ReorderIndexKind,
246+
transform: ReorderIndexTransform,
222247
}
223248

224249
#[derive(Debug, PartialEq)]
225-
pub(crate) enum ReorderIndexKind {
250+
pub(crate) enum ReorderIndexTransform {
251+
/// For a non-nested type, indicates that we need to cast to the contained type
252+
Cast(ArrowDataType),
253+
/// Used for struct/list/map. Potentially transform child fields using contained reordering
226254
Child(Vec<ReorderIndex>),
227-
Index,
255+
/// No work needed to transform this data
256+
None,
257+
/// Data is missing, fill in with a null column
228258
Missing(ArrowFieldRef),
229259
}
230260

231261
impl ReorderIndex {
262+
fn new_cast(index: usize, target: ArrowDataType) -> Self {
263+
ReorderIndex {
264+
index,
265+
transform: ReorderIndexTransform::Cast(target),
266+
}
267+
}
268+
232269
fn new_child(index: usize, children: Vec<ReorderIndex>) -> Self {
233270
ReorderIndex {
234271
index,
235-
kind: ReorderIndexKind::Child(children),
272+
transform: ReorderIndexTransform::Child(children),
236273
}
237274
}
238275

239276
fn new_index(index: usize) -> Self {
240277
ReorderIndex {
241278
index,
242-
kind: ReorderIndexKind::Index,
279+
transform: ReorderIndexTransform::None,
243280
}
244281
}
245282

246283
fn new_missing(index: usize, field: ArrowFieldRef) -> Self {
247284
ReorderIndex {
248285
index,
249-
kind: ReorderIndexKind::Missing(field),
286+
transform: ReorderIndexTransform::Missing(field),
250287
}
251288
}
252289

253-
/// Check if this reordering contains a `Missing` variant anywhere. See comment below on
290+
/// Check if this reordering requires a transformation anywhere. See comment below on
254291
/// [`is_ordered`] to understand why this is needed.
255-
fn contains_missing(&self) -> bool {
256-
match self.kind {
257-
ReorderIndexKind::Child(ref children) => is_ordered(children),
258-
ReorderIndexKind::Index => true,
259-
ReorderIndexKind::Missing(_) => false,
292+
fn needs_transform(&self) -> bool {
293+
match self.transform {
294+
ReorderIndexTransform::Cast(_) => true,
295+
ReorderIndexTransform::Child(ref children) => is_ordered(children),
296+
ReorderIndexTransform::None => true,
297+
ReorderIndexTransform::Missing(_) => false,
260298
}
261299
}
262300
}
@@ -410,10 +448,18 @@ fn get_indices(
410448
if let Some((index, _, requested_field)) =
411449
requested_schema.fields.get_full(field.name())
412450
{
413-
ensure_data_types(&requested_field.data_type, field.data_type())?;
451+
match ensure_data_types(&requested_field.data_type, field.data_type())? {
452+
DataTypeCompat::Identical =>
453+
reorder_indices.push(ReorderIndex::new_index(index)),
454+
DataTypeCompat::NeedsCast(target) =>
455+
reorder_indices.push(ReorderIndex::new_cast(index, target)),
456+
DataTypeCompat::Nested => return
457+
Err(Error::generic(
458+
"Comparing nested types in get_indices. This is a kernel bug, please report"
459+
))
460+
}
414461
found_fields.insert(requested_field.name());
415462
mask_indices.push(parquet_offset + parquet_index);
416-
reorder_indices.push(ReorderIndex::new_index(index));
417463
}
418464
}
419465
}
@@ -491,13 +537,13 @@ fn is_ordered(requested_ordering: &[ReorderIndex]) -> bool {
491537
return true;
492538
}
493539
// we have >=1 element. check that the first element is ordered
494-
if !requested_ordering[0].contains_missing() {
540+
if !requested_ordering[0].needs_transform() {
495541
return false;
496542
}
497543
// now check that all elements are ordered wrt. each other, and are internally ordered
498544
requested_ordering
499545
.windows(2)
500-
.all(|ri| (ri[0].index < ri[1].index) && ri[1].contains_missing())
546+
.all(|ri| (ri[0].index < ri[1].index) && !ri[1].needs_transform())
501547
}
502548

503549
// we use this as a placeholder for an array and its associated field. We can fill in a Vec of None
@@ -524,8 +570,19 @@ pub(crate) fn reorder_struct_array(
524570
for (parquet_position, reorder_index) in requested_ordering.iter().enumerate() {
525571
// for each item, reorder_index.index() tells us where to put it, and its position in
526572
// requested_ordering tells us where it is in the parquet data
527-
match &reorder_index.kind {
528-
ReorderIndexKind::Child(children) => {
573+
match &reorder_index.transform {
574+
ReorderIndexTransform::Cast(target) => {
575+
let source_col = input_cols[parquet_position].as_ref();
576+
let new_col = Arc::new(arrow_cast::cast::cast(source_col, target)?);
577+
let new_field = Arc::new(
578+
input_fields[parquet_position]
579+
.as_ref()
580+
.clone()
581+
.with_data_type(new_col.data_type().clone()),
582+
);
583+
final_fields_cols[reorder_index.index] = Some((new_field, new_col));
584+
}
585+
ReorderIndexTransform::Child(children) => {
529586
match input_cols[parquet_position].data_type() {
530587
ArrowDataType::Struct(_) => {
531588
let struct_array = input_cols[parquet_position].as_struct().clone();
@@ -563,13 +620,13 @@ pub(crate) fn reorder_struct_array(
563620
}
564621
}
565622
}
566-
ReorderIndexKind::Index => {
623+
ReorderIndexTransform::None => {
567624
final_fields_cols[reorder_index.index] = Some((
568625
input_fields[parquet_position].clone(), // cheap Arc clone
569626
input_cols[parquet_position].clone(), // cheap Arc clone
570627
));
571628
}
572-
ReorderIndexKind::Missing(field) => {
629+
ReorderIndexTransform::Missing(field) => {
573630
let null_array = Arc::new(new_null_array(field.data_type(), num_rows));
574631
let field = field.clone(); // cheap Arc clone
575632
final_fields_cols[reorder_index.index] = Some((field, null_array));

kernel/tests/read.rs

+11-11
Original file line numberDiff line numberDiff line change
@@ -987,17 +987,17 @@ fn with_predicate_and_removes() -> Result<(), Box<dyn std::error::Error>> {
987987
#[test]
988988
fn short_dv() -> Result<(), Box<dyn std::error::Error>> {
989989
let expected = vec![
990-
"+----+-------+-------------------------+---------------------+",
991-
"| id | value | timestamp | rand |",
992-
"+----+-------+-------------------------+---------------------+",
993-
"| 3 | 3 | 2023-05-31T18:58:33.633 | 0.7918174793484931 |",
994-
"| 4 | 4 | 2023-05-31T18:58:33.633 | 0.9281049271981882 |",
995-
"| 5 | 5 | 2023-05-31T18:58:33.633 | 0.27796520310701633 |",
996-
"| 6 | 6 | 2023-05-31T18:58:33.633 | 0.15263801464228832 |",
997-
"| 7 | 7 | 2023-05-31T18:58:33.633 | 0.1981143710215575 |",
998-
"| 8 | 8 | 2023-05-31T18:58:33.633 | 0.3069439236599195 |",
999-
"| 9 | 9 | 2023-05-31T18:58:33.633 | 0.5175919190815845 |",
1000-
"+----+-------+-------------------------+---------------------+",
990+
"+----+-------+--------------------------+---------------------+",
991+
"| id | value | timestamp | rand |",
992+
"+----+-------+--------------------------+---------------------+",
993+
"| 3 | 3 | 2023-05-31T18:58:33.633Z | 0.7918174793484931 |",
994+
"| 4 | 4 | 2023-05-31T18:58:33.633Z | 0.9281049271981882 |",
995+
"| 5 | 5 | 2023-05-31T18:58:33.633Z | 0.27796520310701633 |",
996+
"| 6 | 6 | 2023-05-31T18:58:33.633Z | 0.15263801464228832 |",
997+
"| 7 | 7 | 2023-05-31T18:58:33.633Z | 0.1981143710215575 |",
998+
"| 8 | 8 | 2023-05-31T18:58:33.633Z | 0.3069439236599195 |",
999+
"| 9 | 9 | 2023-05-31T18:58:33.633Z | 0.5175919190815845 |",
1000+
"+----+-------+--------------------------+---------------------+",
10011001
];
10021002
read_table_data_str("./tests/data/with-short-dv/", None, None, expected)?;
10031003
Ok(())

0 commit comments

Comments
 (0)