Skip to content

Commit

Permalink
[FEAT]: sql read_deltalake function (#2974)
Browse files Browse the repository at this point in the history
depends on #2954

---------

Co-authored-by: Kev Wang <[email protected]>
  • Loading branch information
universalmind303 and kevinzwang authored Oct 1, 2024
1 parent f4d1da2 commit fe4553f
Show file tree
Hide file tree
Showing 19 changed files with 933 additions and 32 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions src/common/file-formats/src/file_format_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,17 @@ impl ParquetSourceConfig {
}
}

impl Default for ParquetSourceConfig {
fn default() -> Self {
Self {
coerce_int96_timestamp_unit: TimeUnit::Nanoseconds,
field_id_mapping: None,
row_groups: None,
chunk_size: None,
}
}
}

#[cfg(feature = "python")]
#[pymethods]
impl ParquetSourceConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/daft-dsl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub use expr::{
binary_op, col, has_agg, has_stateful_udf, is_partition_compatible, AggExpr,
ApproxPercentileParams, Expr, ExprRef, Operator, SketchType,
};
pub use lit::{lit, literals_to_series, null_lit, Literal, LiteralValue};
pub use lit::{lit, literal_value, literals_to_series, null_lit, Literal, LiteralValue};
#[cfg(feature = "python")]
use pyo3::prelude::*;
pub use resolve_expr::{
Expand Down
75 changes: 61 additions & 14 deletions src/daft-dsl/src/lit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use daft_core::{
display_timestamp,
},
};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};

#[cfg(feature = "python")]
Expand Down Expand Up @@ -68,6 +69,8 @@ pub enum LiteralValue {
/// Python object.
#[cfg(feature = "python")]
Python(PyObjectWrapper),

Struct(IndexMap<Field, LiteralValue>),
}

impl Eq for LiteralValue {}
Expand Down Expand Up @@ -112,6 +115,12 @@ impl Hash for LiteralValue {
}
#[cfg(feature = "python")]
Python(py_obj) => py_obj.hash(state),
Struct(entries) => {
entries.iter().for_each(|(v, f)| {
v.hash(state);
f.hash(state);
});
}
}
}
}
Expand Down Expand Up @@ -143,6 +152,16 @@ impl Display for LiteralValue {
Python::with_gil(|py| pyobj.0.call_method0(py, pyo3::intern!(py, "__str__")))
.unwrap()
}),
Struct(entries) => {
write!(f, "Struct(")?;
for (i, (field, v)) in entries.iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}
write!(f, "{}: {}", field.name, v)?;
}
write!(f, ")")
}
}
}
}
Expand All @@ -169,6 +188,7 @@ impl LiteralValue {
Series(series) => series.data_type().clone(),
#[cfg(feature = "python")]
Python(_) => DataType::Python,
Struct(entries) => DataType::Struct(entries.keys().cloned().collect()),
}
}

Expand Down Expand Up @@ -203,6 +223,13 @@ impl LiteralValue {
Series(series) => series.clone().rename("literal"),
#[cfg(feature = "python")]
Python(val) => PythonArray::from(("literal", vec![val.0.clone()])).into_series(),
Struct(entries) => {
let struct_dtype = DataType::Struct(entries.keys().cloned().collect());
let struct_field = Field::new("literal", struct_dtype);

let values = entries.values().map(|v| v.to_series()).collect();
StructArray::new(struct_field, values, None).into_series()
}
};
result
}
Expand Down Expand Up @@ -235,6 +262,7 @@ impl LiteralValue {
Decimal(..) | Series(..) | Time(..) | Binary(..) => display_sql_err,
#[cfg(feature = "python")]
Python(..) => display_sql_err,
Struct(..) => display_sql_err,
}
}

Expand Down Expand Up @@ -304,49 +332,64 @@ impl LiteralValue {
}
}

pub trait Literal {
pub trait Literal: Sized {
/// [Literal](Expr::Literal) expression.
fn lit(self) -> ExprRef;
fn lit(self) -> ExprRef {
Expr::Literal(self.literal_value()).into()
}
fn literal_value(self) -> LiteralValue;
}

impl Literal for String {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::Utf8(self)).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::Utf8(self)
}
}

impl<'a> Literal for &'a str {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::Utf8(self.to_owned())).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::Utf8(self.to_owned())
}
}

macro_rules! make_literal {
($TYPE:ty, $SCALAR:ident) => {
impl Literal for $TYPE {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::$SCALAR(self)).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::$SCALAR(self)
}
}
};
}

impl<'a> Literal for &'a [u8] {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::Binary(self.to_vec())).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::Binary(self.to_vec())
}
}

impl Literal for Series {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::Series(self)).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::Series(self)
}
}

#[cfg(feature = "python")]
impl Literal for pyo3::PyObject {
fn lit(self) -> ExprRef {
Expr::Literal(LiteralValue::Python(PyObjectWrapper(self))).into()
fn literal_value(self) -> LiteralValue {
LiteralValue::Python(PyObjectWrapper(self))
}
}

impl<T> Literal for Option<T>
where
T: Literal,
{
fn literal_value(self) -> LiteralValue {
match self {
Some(val) => val.literal_value(),
None => LiteralValue::Null,
}
}
}

Expand All @@ -361,6 +404,10 @@ pub fn lit<L: Literal>(t: L) -> ExprRef {
t.lit()
}

pub fn literal_value<L: Literal>(t: L) -> LiteralValue {
t.literal_value()
}

pub fn null_lit() -> ExprRef {
Arc::new(Expr::Literal(LiteralValue::Null))
}
Expand Down
Loading

0 comments on commit fe4553f

Please sign in to comment.