Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

feat: support temporal partition function #161

Merged
merged 3 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions icelake/src/types/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,11 @@ impl Transform {
fn result_type(&self, input_type: &Any) -> Result<Any> {
match self {
Transform::Identity => Ok(input_type.clone()),
Transform::Void => Ok(Primitive::Int.into()),
Transform::Year => Ok(Primitive::Int.into()),
Transform::Month => Ok(Primitive::Int.into()),
Transform::Day => Ok(Primitive::Int.into()),
Transform::Hour => Ok(Primitive::Int.into()),
_ => Err(Error::new(
ErrorKind::IcebergFeatureUnsupported,
format!("Transform {:?} not supported yet!", &self),
Expand Down
8 changes: 8 additions & 0 deletions icelake/src/types/transform/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use super::Transform;
use crate::Result;
use arrow::array::ArrayRef;

mod identity;
mod temporal;
mod void;

/// TransformFunction is a trait that defines the interface of a transform function.
pub trait TransformFunction: Send {
Expand All @@ -18,6 +21,11 @@ pub type BoxedTransformFunction = Box<dyn TransformFunction>;
pub fn create_transform_function(transform: &Transform) -> Result<BoxedTransformFunction> {
match transform {
Transform::Identity => Ok(Box::new(identity::Identity {})),
Transform::Void => Ok(Box::new(void::Void {})),
Transform::Year => Ok(Box::new(temporal::Year {})),
Transform::Month => Ok(Box::new(temporal::Month {})),
Transform::Day => Ok(Box::new(temporal::Day {})),
Transform::Hour => Ok(Box::new(temporal::Hour {})),
_ => Err(crate::error::Error::new(
crate::ErrorKind::IcebergFeatureUnsupported,
format!("Transform {:?} is not implemented", transform),
Expand Down
170 changes: 170 additions & 0 deletions icelake/src/types/transform/temporal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
use super::TransformFunction;
use crate::{Error, Result};
use arrow::array::{
Array, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::compute::binary;
use arrow::datatypes;
use arrow::datatypes::DataType;
use arrow::{
array::{ArrayRef, Date32Array, Int32Array},
compute::{month_dyn, year_dyn},
};
use chrono::Datelike;
use std::sync::Arc;

/// 1970-01-01 is the base date in iceberg.
/// 719163 is the day number chrono's `num_days_from_ce` reports for 1970-01-01
/// (proleptic Gregorian calendar, with 0001-01-01 counted as day 1).
const EPOCH_DAY_FROM_CE: i32 = 719163;
/// Number of days in one second. Written as an exact reciprocal rather than a
/// rounded decimal literal: a rounded-up literal overshoots, so timestamps close
/// to the end of a day were attributed to the following day.
const DAY_PER_SECOND: f64 = 1_f64 / 86_400.0;
/// Number of hours in one second.
const HOUR_PER_SECOND: f64 = 1_f64 / 3600.0;

/// Iceberg `year` partition transform: the calendar year of each value,
/// expressed as an offset from 1970.
pub struct Year;

impl TransformFunction for Year {
    /// Extracts the year of every element of `input` and subtracts 1970.
    ///
    /// # Errors
    /// Returns an `ArrowError`-kind error when arrow cannot extract a year from
    /// `input`, or when the extracted years are not an `Int32` array
    /// (previously this second case panicked).
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
        let year_array = year_dyn(&input).map_err(|err| {
            Error::new(
                crate::ErrorKind::ArrowError,
                format!("error in transformfunction: {}", err),
            )
        })?;
        // Surface an unexpected result type as an error instead of panicking.
        let years = year_array
            .as_any()
            .downcast_ref::<Int32Array>()
            .ok_or_else(|| {
                Error::new(
                    crate::ErrorKind::ArrowError,
                    format!(
                        "year transform expected an Int32 year array but got {:?}",
                        year_array.data_type()
                    ),
                )
            })?;
        let years_since_epoch: Int32Array = years.unary(|v| v - 1970);
        Ok(Arc::new(years_since_epoch))
    }
}

pub struct Month;

impl TransformFunction for Month {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let year_array = year_dyn(&input).map_err(|err| {
Error::new(
crate::ErrorKind::ArrowError,
format!("error in transformfunction: {}", err),
)
})?;
let year_array: Int32Array = year_array
.as_any()
.downcast_ref::<Int32Array>()
.unwrap()
.unary(|v| 12 * (v - 1970));
let month_array = month_dyn(&input).map_err(|err| {
Error::new(
crate::ErrorKind::ArrowError,
format!("error in transformfunction: {}", err),
)
})?;
Ok(Arc::<Int32Array>::new(
binary(
month_array.as_any().downcast_ref::<Int32Array>().unwrap(),
year_array.as_any().downcast_ref::<Int32Array>().unwrap(),
// Compute month from 1970-01-01, so minus 1 here.
|a, b| a + b - 1,
)
.unwrap(),
))
}
}

pub struct Day {}

impl TransformFunction for Day {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let res: Int32Array = match input.data_type() {
DataType::Timestamp(unit, _) => match unit {
datatypes::TimeUnit::Second => input
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.unary(|v| -> i32 { (v as f64 * DAY_PER_SECOND) as i32 }),
datatypes::TimeUnit::Millisecond => input
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.unary(|v| -> i32 { (v as f64 / 1000.0 * DAY_PER_SECOND) as i32 }),
datatypes::TimeUnit::Microsecond => input
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.unary(|v| -> i32 { (v as f64 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32 }),
datatypes::TimeUnit::Nanosecond => input
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.unary(|v| -> i32 {
(v as f64 / 1000.0 / 1000.0 / 1000.0 * DAY_PER_SECOND) as i32
}),
},
DataType::Date32 => {
input
.as_any()
.downcast_ref::<Date32Array>()
.unwrap()
.unary(|v| -> i32 {
datatypes::Date32Type::to_naive_date(v).num_days_from_ce()
- EPOCH_DAY_FROM_CE
})
}
_ => unreachable!(
"Should not call transform in Day with type {:?}",
input.data_type()
),
};
Ok(Arc::new(res))
}
}

pub struct Hour;

impl TransformFunction for Hour {
fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
let res: Int32Array = match input.data_type() {
DataType::Timestamp(unit, _) => match unit {
datatypes::TimeUnit::Second => input
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap()
.unary(|v| -> i32 {
println!("second: {}", v);
(v as f64 * HOUR_PER_SECOND) as i32
}),
datatypes::TimeUnit::Millisecond => input
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap()
.unary(|v| -> i32 {
println!("mill: {}", v);
(v as f64 * HOUR_PER_SECOND / 1000.0) as i32
}),
datatypes::TimeUnit::Microsecond => input
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.unwrap()
.unary(|v| -> i32 {
println!("micro: {}", v);
(v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0) as i32
}),
datatypes::TimeUnit::Nanosecond => input
.as_any()
.downcast_ref::<TimestampNanosecondArray>()
.unwrap()
.unary(|v| -> i32 {
println!("nano: {}", v);
(v as f64 * HOUR_PER_SECOND / 1000.0 / 1000.0 / 1000.0) as i32
}),
},
_ => unreachable!(
"Should not call transform in Day with type {:?}",
input.data_type()
),
};
Ok(Arc::new(res))
}
}
14 changes: 14 additions & 0 deletions icelake/src/types/transform/void.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::types::TransformFunction;
use crate::Result;
use arrow::{
array::{new_null_array, ArrayRef},
datatypes::DataType,
};

/// The `void` partition transform: every input value maps to null.
pub struct Void {}

impl TransformFunction for Void {
    /// Produces an all-null `Int32` array with one slot per element of `input`.
    /// The contents of `input` are never read; only its length matters.
    fn transform(&self, input: ArrayRef) -> Result<ArrayRef> {
        let row_count = input.len();
        let all_null = new_null_array(&DataType::Int32, row_count);
        Ok(all_null)
    }
}
Loading