2 changes: 2 additions & 0 deletions native/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions native/Cargo.toml
@@ -51,6 +51,7 @@ datafusion-comet-spark-expr = { path = "spark-expr", version = "0.5.0" }
 datafusion-comet-proto = { path = "proto", version = "0.5.0" }
 chrono = { version = "0.4", default-features = false, features = ["clock"] }
 chrono-tz = { version = "0.8" }
+futures = "0.3.28"
 num = "0.4"
 rand = "0.8"
 regex = "1.9.6"
3 changes: 1 addition & 2 deletions native/core/Cargo.toml
@@ -42,7 +42,7 @@ arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 parquet = { workspace = true, default-features = false, features = ["experimental"] }
 half = { version = "2.4.1", default-features = false }
-futures = "0.3.28"
+futures = { workspace = true }
 mimalloc = { version = "*", default-features = false, optional = true }
 tokio = { version = "1", features = ["rt-multi-thread"] }
 async-trait = "0.1"
@@ -88,7 +88,6 @@ hex = "0.4.3"
 
 [features]
 default = []
-nightly = []
 
 [lib]
 name = "comet"
6 changes: 2 additions & 4 deletions native/core/src/common/bit.rs
@@ -17,14 +17,12 @@
 
 use std::{cmp::min, mem::size_of};
 
-use arrow::buffer::Buffer;
-
 use crate::{
     errors::CometResult as Result,
-    likely,
     parquet::{data_type::AsBytes, util::bit_packing::unpack32},
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::{likely, unlikely};
 
 #[inline]
 pub fn from_ne_slice<T: FromBytes>(bs: &[u8]) -> T {
native/core/src/execution/datafusion/expressions/checkoverflow.rs
@@ -27,8 +27,7 @@ use arrow::{
     datatypes::{Decimal128Type, DecimalType},
     record_batch::RecordBatch,
 };
-use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
-use arrow_schema::{DataType, Schema, DECIMAL128_MAX_PRECISION};
+use arrow_schema::{DataType, Schema};
 use datafusion::logical_expr::ColumnarValue;
 use datafusion::physical_expr_common::physical_expr::down_cast_any_ref;
 use datafusion_common::{DataFusionError, ScalarValue};
@@ -172,15 +171,3 @@ impl PhysicalExpr for CheckOverflow {
         self.hash(&mut s);
     }
 }
-
-/// Adapted from arrow-rs `validate_decimal_precision`, but returns bool
-/// instead of Err to avoid the cost of formatting the error strings, and is
-/// optimized to remove a memcpy that exists in the original function.
-/// We can remove this code once we upgrade to a version of arrow-rs that
-/// includes https://github.com/apache/arrow-rs/pull/6419
-#[inline]
-pub fn is_valid_decimal_precision(value: i128, precision: u8) -> bool {
-    precision <= DECIMAL128_MAX_PRECISION
-        && value >= MIN_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-        && value <= MAX_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
-}
7 changes: 0 additions & 7 deletions native/core/src/execution/datafusion/expressions/mod.rs
@@ -23,20 +23,13 @@ mod normalize_nan;
 pub use normalize_nan::NormalizeNaNAndZero;
 
 use crate::errors::CometError;
-pub mod avg;
-pub mod avg_decimal;
 pub mod bloom_filter_agg;
 pub mod bloom_filter_might_contain;
 pub mod comet_scalar_funcs;
-pub mod correlation;
-pub mod covariance;
 pub mod negative;
-pub mod stddev;
 pub mod strings;
 pub mod subquery;
-pub mod sum_decimal;
 pub mod unbound;
-pub mod variance;
 
 pub use datafusion_comet_spark_expr::{EvalMode, SparkError};
 
14 changes: 7 additions & 7 deletions native/core/src/execution/datafusion/planner.rs
@@ -25,21 +25,14 @@ use crate::{
     execution::{
         datafusion::{
            expressions::{
-                avg::Avg,
-                avg_decimal::AvgDecimal,
                 bitwise_not::BitwiseNotExpr,
                 bloom_filter_agg::BloomFilterAgg,
                 bloom_filter_might_contain::BloomFilterMightContain,
                 checkoverflow::CheckOverflow,
-                correlation::Correlation,
-                covariance::Covariance,
                 negative,
-                stddev::Stddev,
                 strings::{Contains, EndsWith, Like, StartsWith, StringSpaceExpr, SubstringExpr},
                 subquery::Subquery,
-                sum_decimal::SumDecimal,
                 unbound::UnboundColumn,
-                variance::Variance,
                 NormalizeNaNAndZero,
             },
             operators::expand::CometExpandExec,
@@ -98,6 +91,13 @@ use datafusion_comet_proto::{
     },
     spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning},
 };
+use datafusion_comet_spark_expr::avg::Avg;
+use datafusion_comet_spark_expr::avg_decimal::AvgDecimal;
+use datafusion_comet_spark_expr::correlation::Correlation;
+use datafusion_comet_spark_expr::covariance::Covariance;
+use datafusion_comet_spark_expr::stddev::Stddev;
+use datafusion_comet_spark_expr::sum_decimal::SumDecimal;
+use datafusion_comet_spark_expr::variance::Variance;
 use datafusion_comet_spark_expr::{
     ArrayInsert, Cast, CreateNamedStruct, DateTruncExpr, GetArrayStructFields, GetStructField,
     HourExpr, IfExpr, ListExtract, MinuteExpr, RLike, SecondExpr, SparkCastOptions,
27 changes: 0 additions & 27 deletions native/core/src/lib.rs
@@ -104,30 +104,3 @@ fn default_logger_config() -> CometResult<Config> {
         .build(root)
         .map_err(|err| CometError::Config(err.to_string()))
 }
-
-// These are borrowed from the hashbrown crate:
-// https://github.com/rust-lang/hashbrown/blob/master/src/raw/mod.rs
-
-// On stable we can use #[cold] to get an equivalent effect: this attribute
-// suggests that the function is unlikely to be called
-#[cfg(not(feature = "nightly"))]
-#[inline]
-#[cold]
-fn cold() {}
-
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn likely(b: bool) -> bool {
-    if !b {
-        cold();
-    }
-    b
-}
-#[cfg(not(feature = "nightly"))]
-#[inline]
-fn unlikely(b: bool) -> bool {
-    if b {
-        cold();
-    }
-    b
-}
7 changes: 3 additions & 4 deletions native/core/src/parquet/read/levels.rs
@@ -17,15 +17,14 @@
 
 use std::mem;
 
-use arrow::buffer::Buffer;
-use parquet::schema::types::ColumnDescPtr;
-
 use super::values::Decoder;
 use crate::{
     common::bit::{self, read_u32, BitReader},
     parquet::ParquetMutableVector,
-    unlikely,
 };
+use arrow::buffer::Buffer;
+use datafusion_comet_spark_expr::utils::unlikely;
+use parquet::schema::types::ColumnDescPtr;
 
 const INITIAL_BUF_LEN: usize = 16;
 
2 changes: 1 addition & 1 deletion native/core/src/parquet/read/values.rs
@@ -28,9 +28,9 @@ use crate::write_val_or_null;
 use crate::{
     common::bit::{self, BitReader},
     parquet::{data_type::*, ParquetMutableVector},
-    unlikely,
 };
 use arrow::datatypes::DataType as ArrowDataType;
+use datafusion_comet_spark_expr::utils::unlikely;
 
 pub fn get_decoder<T: DataType>(
     value_data: Buffer,
3 changes: 3 additions & 0 deletions native/spark-expr/Cargo.toml
@@ -29,6 +29,7 @@ edition = { workspace = true }
 [dependencies]
 arrow = { workspace = true }
 arrow-array = { workspace = true }
+arrow-data = { workspace = true }
 arrow-schema = { workspace = true }
 chrono = { workspace = true }
 datafusion = { workspace = true }
@@ -39,12 +40,14 @@ chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }
 thiserror = { workspace = true }
+futures = { workspace = true }
 twox-hash = "2.0.0"
 
 [dev-dependencies]
 arrow-data = {workspace = true}
 criterion = "0.5.1"
 rand = { workspace = true}
+tokio = { version = "1", features = ["rt-multi-thread"] }
 
 
 [lib]
native/core/src/execution/datafusion/expressions/avg_decimal.rs → native/spark-expr/src/avg_decimal.rs
@@ -28,7 +28,7 @@ use datafusion_common::{not_impl_err, Result, ScalarValue};
 use datafusion_physical_expr::{expressions::format_state_name, PhysicalExpr};
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::expressions::checkoverflow::is_valid_decimal_precision;
+use crate::utils::is_valid_decimal_precision;
 use arrow_array::ArrowNativeTypeOp;
 use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
 use datafusion::logical_expr::Volatility::Immutable;
native/core/src/execution/datafusion/expressions/correlation.rs → native/spark-expr/src/correlation.rs
@@ -19,9 +19,8 @@ use arrow::compute::{and, filter, is_not_null};
 
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::expressions::{
-    covariance::CovarianceAccumulator, stddev::StddevAccumulator,
-};
+use crate::covariance::CovarianceAccumulator;
+use crate::stddev::StddevAccumulator;
 use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Field},
7 changes: 7 additions & 0 deletions native/spark-expr/src/lib.rs
@@ -23,16 +23,23 @@ mod cast;
 mod error;
 mod if_expr;
 
+pub mod avg;
+pub mod avg_decimal;
+pub mod correlation;
+pub mod covariance;
 mod kernels;
 mod list;
 mod regexp;
 pub mod scalar_funcs;
 pub mod spark_hash;
+pub mod stddev;
 mod structs;
+pub mod sum_decimal;
 mod temporal;
 pub mod timezone;
 mod to_json;
 pub mod utils;
+pub mod variance;
 
 pub use cast::{spark_cast, Cast, SparkCastOptions};
 pub use error::{SparkError, SparkResult};
native/core/src/execution/datafusion/expressions/stddev.rs → native/spark-expr/src/stddev.rs
@@ -17,7 +17,7 @@
 
 use std::{any::Any, sync::Arc};
 
-use crate::execution::datafusion::expressions::variance::VarianceAccumulator;
+use crate::variance::VarianceAccumulator;
 use arrow::{
     array::ArrayRef,
     datatypes::{DataType, Field},
native/core/src/execution/datafusion/expressions/sum_decimal.rs → native/spark-expr/src/sum_decimal.rs
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::execution::datafusion::expressions::checkoverflow::is_valid_decimal_precision;
-use crate::unlikely;
+use crate::utils::{is_valid_decimal_precision, unlikely};
 use arrow::{
     array::BooleanBufferBuilder,
     buffer::{BooleanBuffer, NullBuffer},
@@ -113,7 +112,6 @@ impl AggregateUDFImpl for SumDecimal {
         Ok(Box::new(SumDecimalGroupsAccumulator::new(
             self.result_type.clone(),
             self.precision,
-            self.scale,
         )))
     }
 
@@ -286,18 +284,16 @@ struct SumDecimalGroupsAccumulator {
     sum: Vec<i128>,
     result_type: DataType,
     precision: u8,
-    scale: i8,
 }
 
 impl SumDecimalGroupsAccumulator {
-    fn new(result_type: DataType, precision: u8, scale: i8) -> Self {
+    fn new(result_type: DataType, precision: u8) -> Self {
         Self {
             is_not_null: BooleanBufferBuilder::new(0),
            is_empty: BooleanBufferBuilder::new(0),
             sum: Vec::new(),
             result_type,
             precision,
-            scale,
         }
     }

Member Author commented on `SumDecimalGroupsAccumulator::new`: `scale` was unused, but the core crate suppresses dead-code warnings, so we did not know until this was moved.

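A minimal sketch of the dead-code point in that comment, assuming the suppression in the core crate is a crate-level `#![allow(dead_code)]` (the mechanism is not shown in this diff, and the struct below is hypothetical, not the real accumulator): with such an attribute at the crate root, rustc stays silent about a field that is written but never read, which is how the unused `scale` went unnoticed until the file moved to a crate without the allow.

```rust
// Hypothetical crate root (e.g. lib.rs / main.rs). With this attribute,
// the dead_code lint is suppressed for the entire crate.
#![allow(dead_code)]

struct DecimalAccumulator {
    precision: u8,
    scale: i8, // written at construction, never read afterwards
}

fn main() {
    let acc = DecimalAccumulator { precision: 38, scale: 10 };
    // Only `precision` is ever read; without the crate-level allow,
    // rustc would warn: "field `scale` is never read".
    println!("precision = {}", acc.precision);
}
```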
@@ -488,11 +484,11 @@ mod tests {
     use arrow::datatypes::*;
     use arrow_array::builder::{Decimal128Builder, StringBuilder};
     use arrow_array::RecordBatch;
+    use datafusion::execution::TaskContext;
     use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::physical_plan::ExecutionPlan;
     use datafusion_common::Result;
-    use datafusion_execution::TaskContext;
     use datafusion_expr::AggregateUDF;
     use datafusion_physical_expr::aggregate::AggregateExprBuilder;
     use datafusion_physical_expr::expressions::{Column, Literal};
39 changes: 38 additions & 1 deletion native/spark-expr/src/utils.rs
@@ -19,14 +19,15 @@ use arrow_array::{
     cast::as_primitive_array,
     types::{Int32Type, TimestampMicrosecondType},
 };
-use arrow_schema::{ArrowError, DataType};
+use arrow_schema::{ArrowError, DataType, DECIMAL128_MAX_PRECISION};
 use std::sync::Arc;
 
 use crate::timezone::Tz;
 use arrow::{
     array::{as_dictionary_array, Array, ArrayRef, PrimitiveArray},
     temporal_conversions::as_datetime,
 };
+use arrow_data::decimal::{MAX_DECIMAL_FOR_EACH_PRECISION, MIN_DECIMAL_FOR_EACH_PRECISION};
 use chrono::{DateTime, Offset, TimeZone};
 
 /// Preprocesses input arrays to add timezone information from Spark to Arrow array datatype or
@@ -176,3 +177,39 @@ fn pre_timestamp_cast(array: ArrayRef, timezone: String) -> Result<ArrayRef, ArrowError>
         _ => Ok(array),
     }
 }
+
+/// Adapted from arrow-rs `validate_decimal_precision`, but returns bool
+/// instead of Err to avoid the cost of formatting the error strings, and is
+/// optimized to remove a memcpy that exists in the original function.
+/// We can remove this code once we upgrade to a version of arrow-rs that
+/// includes https://github.com/apache/arrow-rs/pull/6419
+#[inline]
+pub fn is_valid_decimal_precision(value: i128, precision: u8) -> bool {
+    precision <= DECIMAL128_MAX_PRECISION
+        && value >= MIN_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
+        && value <= MAX_DECIMAL_FOR_EACH_PRECISION[precision as usize - 1]
+}
+
+// These are borrowed from the hashbrown crate:
+// https://github.com/rust-lang/hashbrown/blob/master/src/raw/mod.rs
+
+// On stable we can use #[cold] to get an equivalent effect: this attribute
+// suggests that the function is unlikely to be called
+#[inline]
+#[cold]
+pub fn cold() {}
+
+#[inline]
+pub fn likely(b: bool) -> bool {
+    if !b {
+        cold();
+    }
+    b
+}
+#[inline]
+pub fn unlikely(b: bool) -> bool {
+    if b {
+        cold();
+    }
+    b
+}