Skip to content

Commit 64bcfa3

Browse files
committed
add constructors for timestamps with timezones
1 parent 6746f96 commit 64bcfa3

File tree

6 files changed

+225
-12
lines changed

6 files changed

+225
-12
lines changed

rust/arrow/src/array/array.rs

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,51 @@ def_numeric_from_vec!(
621621
DataType::Time64(TimeUnit::Nanosecond)
622622
);
623623

624+
impl<T: ArrowTimestampType> PrimitiveArray<T> {
625+
pub fn from_vec(data: Vec<i64>, timezone: Option<Arc<String>>) -> Self {
626+
let array_data =
627+
ArrayData::builder(DataType::Timestamp(T::get_time_unit(), timezone))
628+
.len(data.len())
629+
.add_buffer(Buffer::from(data.to_byte_slice()))
630+
.build();
631+
PrimitiveArray::from(array_data)
632+
}
633+
}
634+
635+
impl<T: ArrowTimestampType> PrimitiveArray<T> {
636+
/// Construct a timestamp array from a vec of Option<i64> values and an optional timezone
637+
pub fn from_opt_vec(data: Vec<Option<i64>>, timezone: Option<Arc<String>>) -> Self {
638+
// TODO: duplicated from def_numeric_from_vec! macro, it looks possible to convert to generic
639+
let data_len = data.len();
640+
let num_bytes = bit_util::ceil(data_len, 8);
641+
let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
642+
let mut val_buf = MutableBuffer::new(data_len * mem::size_of::<i64>());
643+
644+
{
645+
let null = vec![0; mem::size_of::<i64>()];
646+
let null_slice = null_buf.data_mut();
647+
for (i, v) in data.iter().enumerate() {
648+
if let Some(n) = v {
649+
bit_util::set_bit(null_slice, i);
650+
// unwrap() in the following should be safe here since we've
651+
// made sure enough space is allocated for the values.
652+
val_buf.write(&n.to_byte_slice()).unwrap();
653+
} else {
654+
val_buf.write(&null).unwrap();
655+
}
656+
}
657+
}
658+
659+
let array_data =
660+
ArrayData::builder(DataType::Timestamp(T::get_time_unit(), timezone))
661+
.len(data_len)
662+
.add_buffer(val_buf.freeze())
663+
.null_bit_buffer(null_buf.freeze())
664+
.build();
665+
PrimitiveArray::from(array_data)
666+
}
667+
}
668+
624669
/// Constructs a boolean array from a vector. Should only be used for testing.
625670
impl From<Vec<bool>> for BooleanArray {
626671
fn from(data: Vec<bool>) -> Self {

rust/arrow/src/compute/kernels/cast.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,11 +1062,10 @@ mod tests {
10621062

10631063
#[test]
10641064
fn test_cast_timestamp_to_date32() {
1065-
let a = TimestampMillisecondArray::from(vec![
1066-
Some(864000000005),
1067-
Some(1545696000001),
1068-
None,
1069-
]);
1065+
let a = TimestampMillisecondArray::from_opt_vec(
1066+
vec![Some(864000000005), Some(1545696000001), None],
1067+
Some(Arc::new(String::from("UTC"))),
1068+
);
10701069
let array = Arc::new(a) as ArrayRef;
10711070
let b = cast(&array, &DataType::Date32(DateUnit::Day)).unwrap();
10721071
let c = b.as_any().downcast_ref::<Date32Array>().unwrap();

rust/arrow/src/datatypes.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use std::mem::size_of;
2828
use std::ops::{Add, Div, Mul, Sub};
2929
use std::slice::from_raw_parts;
3030
use std::str::FromStr;
31+
use std::sync::Arc;
3132

3233
#[cfg(feature = "simd")]
3334
use packed_simd::*;
@@ -37,7 +38,6 @@ use serde_json::{
3738
};
3839

3940
use crate::error::{ArrowError, Result};
40-
use std::sync::Arc;
4141

4242
/// The possible relative types that are supported.
4343
///
@@ -65,7 +65,7 @@ pub enum DataType {
6565
Float32,
6666
Float64,
6767
/// A timestamp with an optional timezone
68-
Timestamp(TimeUnit, Option<String>),
68+
Timestamp(TimeUnit, Option<Arc<String>>),
6969
Date32(DateUnit),
7070
Date64(DateUnit),
7171
Time32(TimeUnit),
@@ -516,6 +516,32 @@ impl ArrowTemporalType for Time64NanosecondType {}
516516
impl ArrowTemporalType for IntervalYearMonthType {}
517517
impl ArrowTemporalType for IntervalDayTimeType {}
518518

519+
/// A timestamp type allows us to create array builders that take a timestamp
520+
pub trait ArrowTimestampType: ArrowTemporalType {
521+
fn get_time_unit() -> TimeUnit;
522+
}
523+
524+
impl ArrowTimestampType for TimestampSecondType {
525+
fn get_time_unit() -> TimeUnit {
526+
TimeUnit::Second
527+
}
528+
}
529+
impl ArrowTimestampType for TimestampMillisecondType {
530+
fn get_time_unit() -> TimeUnit {
531+
TimeUnit::Millisecond
532+
}
533+
}
534+
impl ArrowTimestampType for TimestampMicrosecondType {
535+
fn get_time_unit() -> TimeUnit {
536+
TimeUnit::Microsecond
537+
}
538+
}
539+
impl ArrowTimestampType for TimestampNanosecondType {
540+
fn get_time_unit() -> TimeUnit {
541+
TimeUnit::Nanosecond
542+
}
543+
}
544+
519545
/// Allows conversion from supported Arrow types to a byte slice.
520546
pub trait ToByteSlice {
521547
/// Converts this instance into a byte slice
@@ -574,7 +600,7 @@ impl DataType {
574600
};
575601
let tz = match map.get("timezone") {
576602
None => Ok(None),
577-
Some(VString(tz)) => Ok(Some(tz.to_string())),
603+
Some(VString(tz)) => Ok(Some(Arc::new(tz.to_string()))),
578604
_ => Err(ArrowError::ParseError(
579605
"timezone must be a string".to_string(),
580606
)),
@@ -1280,14 +1306,17 @@ mod tests {
12801306
Field::new("c15", DataType::Timestamp(TimeUnit::Second, None), false),
12811307
Field::new(
12821308
"c16",
1283-
DataType::Timestamp(TimeUnit::Millisecond, Some("UTC".to_string())),
1309+
DataType::Timestamp(
1310+
TimeUnit::Millisecond,
1311+
Some(Arc::new("UTC".to_string())),
1312+
),
12841313
false,
12851314
),
12861315
Field::new(
12871316
"c17",
12881317
DataType::Timestamp(
12891318
TimeUnit::Microsecond,
1290-
Some("Africa/Johannesburg".to_string()),
1319+
Some(Arc::new("Africa/Johannesburg".to_string())),
12911320
),
12921321
false,
12931322
),

rust/arrow/src/ipc/convert.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use flatbuffers::{
2424
FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
2525
};
2626
use std::collections::HashMap;
27+
use std::sync::Arc;
2728

2829
/// Serialize a schema in IPC format
2930
fn schema_to_fb(schema: &Schema) -> FlatBufferBuilder {
@@ -166,7 +167,8 @@ fn get_data_type(field: ipc::Field) -> DataType {
166167
}
167168
ipc::Type::Timestamp => {
168169
let timestamp = field.type_as_timestamp().unwrap();
169-
let timezone: Option<String> = timestamp.timezone().map(|tz| tz.to_string());
170+
let timezone: Option<Arc<String>> =
171+
timestamp.timezone().map(|tz| Arc::new(tz.to_string()));
170172
match timestamp.unit() {
171173
ipc::TimeUnit::SECOND => DataType::Timestamp(TimeUnit::Second, timezone),
172174
ipc::TimeUnit::MILLISECOND => {
@@ -308,7 +310,7 @@ fn get_fb_field_type<'a: 'b, 'b>(
308310
(ipc::Type::Time, builder.finish().as_union_value(), None)
309311
}
310312
Timestamp(unit, tz) => {
311-
let tz = tz.clone().unwrap_or(String::new());
313+
let tz = tz.clone().unwrap_or(Arc::new(String::new()));
312314
let tz_str = fbb.create_string(tz.as_str());
313315
let mut builder = ipc::TimestampBuilder::new(&mut fbb);
314316
let time_unit = match unit {

rust/arrow/src/util/integration_util.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,10 @@ mod tests {
378378

379379
#[test]
380380
fn test_arrow_data_equality() {
381+
let secs_tz = Some(Arc::new("Europe/Budapest".to_string()));
382+
let millis_tz = Some(Arc::new("America/New_York".to_string()));
383+
let micros_tz = Some(Arc::new("UTC".to_string()));
384+
let nanos_tz = Some(Arc::new("Africa/Johannesburg".to_string()));
381385
let schema = Schema::new(vec![
382386
Field::new("bools", DataType::Boolean, true),
383387
Field::new("int8s", DataType::Int8, true),
@@ -412,6 +416,26 @@ mod tests {
412416
DataType::Timestamp(TimeUnit::Nanosecond, None),
413417
true,
414418
),
419+
Field::new(
420+
"ts_secs_tz",
421+
DataType::Timestamp(TimeUnit::Second, secs_tz.clone()),
422+
true,
423+
),
424+
Field::new(
425+
"ts_millis_tz",
426+
DataType::Timestamp(TimeUnit::Millisecond, millis_tz.clone()),
427+
true,
428+
),
429+
Field::new(
430+
"ts_micros_tz",
431+
DataType::Timestamp(TimeUnit::Microsecond, micros_tz.clone()),
432+
true,
433+
),
434+
Field::new(
435+
"ts_nanos_tz",
436+
DataType::Timestamp(TimeUnit::Nanosecond, nanos_tz.clone()),
437+
true,
438+
),
415439
Field::new("utf8s", DataType::Utf8, true),
416440
Field::new("lists", DataType::List(Box::new(DataType::Int32)), true),
417441
Field::new(
@@ -464,6 +488,20 @@ mod tests {
464488
let ts_micros = TimestampMicrosecondArray::from(vec![None, None, None]);
465489
let ts_nanos =
466490
TimestampNanosecondArray::from(vec![None, None, Some(-6473623571954960143)]);
491+
let ts_secs_tz = TimestampSecondArray::from_opt_vec(
492+
vec![None, Some(193438817552), None],
493+
secs_tz,
494+
);
495+
let ts_millis_tz = TimestampMillisecondArray::from_opt_vec(
496+
vec![None, Some(38606916383008), Some(58113709376587)],
497+
millis_tz,
498+
);
499+
let ts_micros_tz =
500+
TimestampMicrosecondArray::from_opt_vec(vec![None, None, None], micros_tz);
501+
let ts_nanos_tz = TimestampNanosecondArray::from_opt_vec(
502+
vec![None, None, Some(-6473623571954960143)],
503+
nanos_tz,
504+
);
467505
let utf8s = StringArray::try_from(vec![Some("aa"), None, Some("bbb")]).unwrap();
468506

469507
let value_data = Int32Array::from(vec![None, Some(2), None, None]);
@@ -514,6 +552,10 @@ mod tests {
514552
Arc::new(ts_millis),
515553
Arc::new(ts_micros),
516554
Arc::new(ts_nanos),
555+
Arc::new(ts_secs_tz),
556+
Arc::new(ts_millis_tz),
557+
Arc::new(ts_micros_tz),
558+
Arc::new(ts_nanos_tz),
517559
Arc::new(utf8s),
518560
Arc::new(lists),
519561
Arc::new(structs),

rust/arrow/test/data/integration.json

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,46 @@
201201
"nullable": true,
202202
"children": []
203203
},
204+
{
205+
"name": "ts_secs_tz",
206+
"type": {
207+
"name": "timestamp",
208+
"unit": "SECOND",
209+
"timezone": "Europe/Budapest"
210+
},
211+
"nullable": true,
212+
"children": []
213+
},
214+
{
215+
"name": "ts_millis_tz",
216+
"type": {
217+
"name": "timestamp",
218+
"unit": "MILLISECOND",
219+
"timezone": "America/New_York"
220+
},
221+
"nullable": true,
222+
"children": []
223+
},
224+
{
225+
"name": "ts_micros_tz",
226+
"type": {
227+
"name": "timestamp",
228+
"unit": "MICROSECOND",
229+
"timezone": "UTC"
230+
},
231+
"nullable": true,
232+
"children": []
233+
},
234+
{
235+
"name": "ts_nanos_tz",
236+
"type": {
237+
"name": "timestamp",
238+
"unit": "NANOSECOND",
239+
"timezone": "Africa/Johannesburg"
240+
},
241+
"nullable": true,
242+
"children": []
243+
},
204244
{
205245
"name": "utf8s",
206246
"type": {
@@ -555,6 +595,62 @@
555595
-6473623571954960143
556596
]
557597
},
598+
{
599+
"name": "ts_secs_tz",
600+
"count": 3,
601+
"VALIDITY": [
602+
0,
603+
1,
604+
0
605+
],
606+
"DATA": [
607+
209869064422,
608+
193438817552,
609+
51757838205
610+
]
611+
},
612+
{
613+
"name": "ts_millis_tz",
614+
"count": 3,
615+
"VALIDITY": [
616+
0,
617+
1,
618+
1
619+
],
620+
"DATA": [
621+
228315043570185,
622+
38606916383008,
623+
58113709376587
624+
]
625+
},
626+
{
627+
"name": "ts_micros_tz",
628+
"count": 3,
629+
"VALIDITY": [
630+
0,
631+
0,
632+
0
633+
],
634+
"DATA": [
635+
133457416537791415,
636+
129522736067409280,
637+
177110451066832967
638+
]
639+
},
640+
{
641+
"name": "ts_nanos_tz",
642+
"count": 3,
643+
"VALIDITY": [
644+
0,
645+
0,
646+
1
647+
],
648+
"DATA": [
649+
-804525722984600007,
650+
8166038652634779458,
651+
-6473623571954960143
652+
]
653+
},
558654
{
559655
"name": "utf8s",
560656
"count": 3,

0 commit comments

Comments
 (0)