From 3eb55e9a0510d872f6f7765b1a5f17db46486e45 Mon Sep 17 00:00:00 2001 From: Brent Gardner Date: Tue, 9 Aug 2022 09:45:22 -0700 Subject: [PATCH] Upgrade to arrow 20.0.0 (but no change to object_store), including `prost`, and `tonic` (#3083) * Upgrade arrow fix decimal (#4) Fix human error Patch crates io to fix build (#5) * fix decimal * patch crate versions Patch objectstore Test in CI Undo override? Fix more errors Fix last error? Formatting Clippy Fixes Fix refs Able to get session context, but JDBC driver hung Upgrade to arrow 20 Upgrade to RC2 Formatting Fix some imports Install protoc Try platform agnostic path Debug in CI :( Debug in CI :( Debug in CI :( Not worth it, just separate builds Variables Fixes Fix windows? Fix windows? Hackily fix windows Down to 1 failure Fix protoc All? tests pass Formatting * Fix remaining tests * Clippy * Update docs for Windows * Try with old objectstore * Revert path "fixes" that broke windows * Update to arrow 20 --- .github/workflows/rust.yml | 76 +++++++- CONTRIBUTING.md | 9 + datafusion-cli/Cargo.toml | 2 +- datafusion-examples/Cargo.toml | 6 +- datafusion/common/Cargo.toml | 5 +- datafusion/common/src/from_slice.rs | 2 +- datafusion/common/src/scalar.rs | 22 +-- datafusion/core/Cargo.toml | 4 +- datafusion/core/fuzz-utils/Cargo.toml | 2 +- .../src/avro_to_arrow/arrow_array_reader.rs | 10 +- datafusion/core/src/avro_to_arrow/schema.rs | 4 +- .../core/src/catalog/information_schema.rs | 2 +- .../src/datasource/file_format/parquet.rs | 8 +- .../core/src/physical_optimizer/pruning.rs | 8 +- .../src/physical_plan/file_format/parquet.rs | 22 ++- .../core/src/physical_plan/hash_join.rs | 4 +- .../core/src/physical_plan/hash_utils.rs | 2 +- .../core/src/physical_plan/repartition.rs | 2 +- .../core/src/physical_plan/sort_merge_join.rs | 4 +- datafusion/core/src/scheduler/plan.rs | 2 +- datafusion/core/src/scheduler/task.rs | 2 +- datafusion/core/tests/parquet_pruning.rs | 4 +- datafusion/core/tests/sql/aggregates.rs | 8 +- datafusion/core/tests/sql/decimal.rs | 168 +++++++++--------- datafusion/core/tests/sql/errors.rs | 5 +- datafusion/core/tests/sql/joins.rs | 32 ++-- datafusion/core/tests/sql/mod.rs | 4 +- datafusion/expr/Cargo.toml | 2 +- datafusion/expr/src/aggregate_function.rs | 48 ++--- datafusion/expr/src/binary_rule.rs | 112 ++++++------ datafusion/expr/src/type_coercion.rs | 2 +- datafusion/expr/src/utils.rs | 2 +- datafusion/jit/Cargo.toml | 2 +- datafusion/optimizer/Cargo.toml | 2 +- .../src/decorrelate_scalar_subquery.rs | 6 +- .../optimizer/src/decorrelate_where_exists.rs | 3 +- .../optimizer/src/decorrelate_where_in.rs | 5 +- .../optimizer/src/simplify_expressions.rs | 15 +- datafusion/physical-expr/Cargo.toml | 2 +- .../physical-expr/src/aggregate/average.rs | 14 +- .../physical-expr/src/aggregate/build_in.rs | 37 ++-- .../physical-expr/src/aggregate/min_max.rs | 26 +-- datafusion/physical-expr/src/aggregate/sum.rs | 20 +-- .../src/aggregate/sum_distinct.rs | 4 +- .../physical-expr/src/expressions/binary.rs | 18 +- .../src/expressions/binary/adapter.rs | 2 +- .../src/expressions/binary/kernels_arrow.rs | 8 +- .../physical-expr/src/expressions/cast.rs | 40 ++--- .../physical-expr/src/expressions/in_list.rs | 56 +++++- .../physical-expr/src/expressions/try_cast.rs | 34 ++-- datafusion/physical-expr/src/type_coercion.rs | 2 +- datafusion/proto/Cargo.toml | 6 +- datafusion/proto/src/from_proto.rs | 4 +- datafusion/proto/src/lib.rs | 10 +- datafusion/proto/src/to_proto.rs | 4 +- datafusion/row/Cargo.toml | 2 +- datafusion/row/src/layout.rs | 2 +- datafusion/row/src/lib.rs | 2 +- datafusion/sql/Cargo.toml | 2 +- datafusion/sql/examples/sql.rs | 4 +- datafusion/sql/src/planner.rs | 18 +- datafusion/sql/src/utils.rs | 8 +- 62 files changed, 527 insertions(+), 416 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 3a15759e1eaa..6d08b98a5545 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -82,6 +82,16 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-linux-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version - name: Cache Cargo uses: actions/cache@v3 with: @@ -94,6 +104,7 @@ jobs: rust-version: ${{ matrix.rust }} - name: Run tests run: | + export PATH=$PATH:$HOME/d/protoc/bin cargo test --features avro,jit,scheduler,json # test datafusion-sql examples cargo run --example sql @@ -159,17 +170,65 @@ jobs: POSTGRES_USER: postgres POSTGRES_PASSWORD: postgres - windows-and-macos: - name: Test on ${{ matrix.os }} Rust ${{ matrix.rust }} + windows: + name: Test on Windows Rust ${{ matrix.rust }} runs-on: ${{ matrix.os }} strategy: matrix: - os: [windows-latest, macos-latest] + os: [windows-latest] rust: [stable] steps: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-win64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc.exe --version + # TODO: this won't cache anything, which is expensive. Setup this action + # with a OS-dependent path. + - name: Setup Rust toolchain + run: | + rustup toolchain install ${{ matrix.rust }} + rustup default ${{ matrix.rust }} + rustup component add rustfmt + - name: Run tests + shell: bash + run: | + export PATH=$PATH:$HOME/d/protoc/bin + cargo test + env: + # do not produce debug symbols to keep memory usage down + RUSTFLAGS: "-C debuginfo=0" + + macos: + name: Test on MacOS Rust ${{ matrix.rust }} + runs-on: ${{ matrix.os }} + strategy: + matrix: + os: [macos-latest] + rust: [stable] + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-osx-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + echo "$HOME/d/protoc/bin" >> $GITHUB_PATH + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version # TODO: this won't cache anything, which is expensive. Setup this action # with a OS-dependent path. - name: Setup Rust toolchain @@ -250,6 +309,16 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install protobuf compiler + shell: bash + run: | + mkdir -p $HOME/d/protoc + cd $HOME/d/protoc + export PROTO_ZIP="protoc-21.4-linux-x86_64.zip" + curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP + unzip $PROTO_ZIP + export PATH=$PATH:$HOME/d/protoc/bin + protoc --version - name: Setup Rust toolchain run: | rustup toolchain install ${{ matrix.rust }} @@ -263,6 +332,7 @@ jobs: key: cargo-coverage-cache3- - name: Run coverage run: | + export PATH=$PATH:$HOME/d/protoc/bin rustup toolchain install stable rustup default stable cargo install --version 0.20.1 cargo-tarpaulin diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 467fc47f765d..0a8a34fe87ca 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -35,6 +35,15 @@ list to help you get started. This section describes how you can get started at developing DataFusion. +### Windows setup + +```shell +wget https://az792536.vo.msecnd.net/vms/VMBuild_20190311/VirtualBox/MSEdge/MSEdge.Win10.VirtualBox.zip +choco install -y git rustup.install visualcpp-build-tools +git-bash.exe +cargo build +``` + ### Bootstrap environment DataFusion is written in Rust and it uses a standard rust toolkit: diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 6ac3a30c8249..adb9c7dbddc6 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -29,7 +29,7 @@ rust-version = "1.59" readme = "README.md" [dependencies] -arrow = { version = "19.0.0" } +arrow = { version = "20.0.0" } clap = { version = "3", features = ["derive", "cargo"] } datafusion = { path = "../datafusion/core", version = "10.0.0" } dirs = "4.0.0" diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index ed23512f3e2d..fde19e7db68a 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -34,13 +34,13 @@ path = "examples/avro_sql.rs" required-features = ["datafusion/avro"] [dev-dependencies] -arrow-flight = { version = "19.0.0" } +arrow-flight = { version = "20.0.0" } async-trait = "0.1.41" datafusion = { path = "../datafusion/core" } futures = "0.3" num_cpus = "1.13.0" -prost = "0.10" +prost = "0.11.0" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.82" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } -tonic = "0.7" +tonic = "0.8" diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index 9bab66adab4e..35ed2bb2ac3f 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -39,11 +39,12 @@ pyarrow = ["pyo3"] [dependencies] apache-avro = { version = "0.14", features = ["snappy"], optional = true } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } +avro-rs = { version = "0.13", features = ["snappy"], optional = true } cranelift-module = { version = "0.86.1", optional = true } object_store = { version = "0.3", optional = true } ordered-float = "3.0" -parquet = { version = "19.0.0", features = ["arrow"], optional = true } +parquet = { version = "20.0.0", features = ["arrow"], optional = true } pyo3 = { version = "0.16", optional = true } serde_json = "1.0" sqlparser = "0.20" diff --git a/datafusion/common/src/from_slice.rs b/datafusion/common/src/from_slice.rs index 2fedc668ae4e..385848f934e9 100644 --- a/datafusion/common/src/from_slice.rs +++ b/datafusion/common/src/from_slice.rs @@ -69,7 +69,7 @@ where offsets.push(length_so_far); values.extend_from_slice(s); } - let array_data = ArrayData::builder(Self::get_data_type()) + let array_data = ArrayData::builder(Self::DATA_TYPE) .len(slice.len()) .add_buffer(Buffer::from_slice_ref(&offsets)) .add_buffer(Buffer::from_slice_ref(&values)); diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 5b45b4b06967..3069a54f491f 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -27,7 +27,7 @@ use arrow::{ IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type, - DECIMAL_MAX_PRECISION, + DECIMAL128_MAX_PRECISION, }, util::decimal::{BasicDecimal, Decimal128}, }; @@ -611,7 +611,7 @@ impl ScalarValue { scale: usize, ) -> Result { // make sure the precision and scale is valid - if precision <= DECIMAL_MAX_PRECISION && scale <= precision { + if precision <= DECIMAL128_MAX_PRECISION && scale <= precision { return Ok(ScalarValue::Decimal128(Some(value), precision, scale)); } Err(DataFusionError::Internal(format!( @@ -654,7 +654,7 @@ impl ScalarValue { ScalarValue::Int32(_) => DataType::Int32, ScalarValue::Int64(_) => DataType::Int64, ScalarValue::Decimal128(_, precision, scale) => { - DataType::Decimal(*precision, *scale) + DataType::Decimal128(*precision, *scale) } ScalarValue::TimestampSecond(_, tz_opt) => { DataType::Timestamp(TimeUnit::Second, tz_opt.clone()) @@ -935,7 +935,7 @@ impl ScalarValue { } let array: ArrayRef = match &data_type { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { let decimal_array = ScalarValue::iter_to_decimal_array(scalars, precision, scale)?; Arc::new(decimal_array) @@ -1448,7 +1448,7 @@ impl ScalarValue { Ok(match array.data_type() { DataType::Null => ScalarValue::Null, - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { ScalarValue::get_decimal_value_from_array(array, index, precision, scale) } DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean), @@ -1899,7 +1899,7 @@ impl TryFrom<&DataType> for ScalarValue { DataType::UInt16 => ScalarValue::UInt16(None), DataType::UInt32 => ScalarValue::UInt32(None), DataType::UInt64 => ScalarValue::UInt64(None), - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { ScalarValue::Decimal128(None, *precision, *scale) } DataType::Utf8 => ScalarValue::Utf8(None), @@ -2145,7 +2145,7 @@ mod tests { #[test] fn scalar_decimal_test() { let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1); - assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype()); + assert_eq!(DataType::Decimal128(10, 1), decimal_value.get_datatype()); let try_into_value: i128 = decimal_value.clone().try_into().unwrap(); assert_eq!(123_i128, try_into_value); assert!(!decimal_value.is_null()); @@ -2163,14 +2163,14 @@ mod tests { let array = decimal_value.to_array(); let array = array.as_any().downcast_ref::().unwrap(); assert_eq!(1, array.len()); - assert_eq!(DataType::Decimal(10, 1), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array.value(0).as_i128()); // decimal scalar to array with size let array = decimal_value.to_array_of_size(10); let array_decimal = array.as_any().downcast_ref::().unwrap(); assert_eq!(10, array.len()); - assert_eq!(DataType::Decimal(10, 1), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone()); assert_eq!(123i128, array_decimal.value(0).as_i128()); assert_eq!(123i128, array_decimal.value(9).as_i128()); // test eq array @@ -2208,7 +2208,7 @@ mod tests { // convert the vec to decimal array and check the result let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap(); assert_eq!(3, array.len()); - assert_eq!(DataType::Decimal(10, 2), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone()); let decimal_vec = vec![ ScalarValue::Decimal128(Some(1), 10, 2), @@ -2218,7 +2218,7 @@ mod tests { ]; let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap(); assert_eq!(4, array.len()); - assert_eq!(DataType::Decimal(10, 2), array.data_type().clone()); + assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone()); assert!(ScalarValue::try_new_decimal128(1, 10, 2) .unwrap() diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml index a76b2fc49b93..4ef1af708504 100644 --- a/datafusion/core/Cargo.toml +++ b/datafusion/core/Cargo.toml @@ -56,7 +56,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion [dependencies] ahash = { version = "0.7", default-features = false } apache-avro = { version = "0.14", optional = true } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } async-trait = "0.1.41" bytes = "1.1" chrono = { version = "0.4", default-features = false } @@ -78,7 +78,7 @@ num_cpus = "1.13.0" object_store = "0.3.0" ordered-float = "3.0" parking_lot = "0.12" -parquet = { version = "19.0.0", features = ["arrow", "async"] } +parquet = { version = "20.0.0", features = ["arrow", "async"] } paste = "^1.0" pin-project-lite = "^0.2.7" pyo3 = { version = "0.16", optional = true } diff --git a/datafusion/core/fuzz-utils/Cargo.toml b/datafusion/core/fuzz-utils/Cargo.toml index 0d66a6999999..46c386f24bb6 100644 --- a/datafusion/core/fuzz-utils/Cargo.toml +++ b/datafusion/core/fuzz-utils/Cargo.toml @@ -23,6 +23,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } env_logger = "0.9.0" rand = "0.8" diff --git a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs index 864a675ed40f..2da8066b1c87 100644 --- a/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs +++ b/datafusion/core/src/avro_to_arrow/arrow_array_reader.rs @@ -101,12 +101,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> { "Failed to parse avro value: {:?}", e ))), - other => { - return Err(ArrowError::ParseError(format!( - "Row needs to be of type object, got: {:?}", - other - ))) - } + other => Err(ArrowError::ParseError(format!( + "Row needs to be of type object, got: {:?}", + other + ))), }) .collect::>>>()?; if rows.is_empty() { diff --git a/datafusion/core/src/avro_to_arrow/schema.rs b/datafusion/core/src/avro_to_arrow/schema.rs index 5e601504d1f6..d1964189610e 100644 --- a/datafusion/core/src/avro_to_arrow/schema.rs +++ b/datafusion/core/src/avro_to_arrow/schema.rs @@ -141,7 +141,7 @@ fn schema_to_field_with_props( AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32), AvroSchema::Decimal { precision, scale, .. - } => DataType::Decimal(*precision, *scale), + } => DataType::Decimal128(*precision, *scale), AvroSchema::Uuid => DataType::FixedSizeBinary(16), AvroSchema::Date => DataType::Date32, AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond), @@ -217,7 +217,7 @@ fn default_field_name(dt: &DataType) -> &str { DataType::Union(_, _, _) => "union", DataType::Dictionary(_, _) => "map", DataType::Map(_, _) => unimplemented!("Map support not implemented"), - DataType::Decimal(_, _) => "decimal", + DataType::Decimal128(_, _) => "decimal", DataType::Decimal256(_, _) => "decimal", } } diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs index d4944c2d5444..49b1f9dc7952 100644 --- a/datafusion/core/src/catalog/information_schema.rs +++ b/datafusion/core/src/catalog/information_schema.rs @@ -508,7 +508,7 @@ impl InformationSchemaColumnsBuilder { Float32 => (Some(24), Some(2), None), // Numbers from postgres `double` type Float64 => (Some(24), Some(2), None), - Decimal(precision, scale) => { + Decimal128(precision, scale) => { (Some(*precision as u64), Some(10), Some(*scale as u64)) } _ => (None, None, None), diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index dfd08352dffb..04b0b9d523df 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -1073,7 +1073,7 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(4, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(4, 2), column.data_type()); // parquet use the int64 as the physical type to store decimal let exec = get_exec("int64_decimal.parquet", None, None).await?; @@ -1081,7 +1081,7 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(10, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(10, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal let exec = get_exec("fixed_length_decimal.parquet", None, None).await?; @@ -1089,14 +1089,14 @@ mod tests { assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(25, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(25, 2), column.data_type()); let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); - assert_eq!(&DataType::Decimal(13, 2), column.data_type()); + assert_eq!(&DataType::Decimal128(13, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal // TODO: arrow-rs don't support convert the physical type of binary to decimal diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs index 859ef79fc1d2..b68d1d13a8cb 100644 --- a/datafusion/core/src/physical_optimizer/pruning.rs +++ b/datafusion/core/src/physical_optimizer/pruning.rs @@ -800,7 +800,7 @@ mod tests { use crate::from_slice::FromSlice; use crate::logical_plan::{col, lit}; use crate::{assert_batches_eq, physical_optimizer::pruning::StatisticsType}; - use arrow::array::Decimal128Array; + use arrow::array::{BasicDecimalArray, Decimal128Array}; use arrow::{ array::{BinaryArray, Int32Array, Int64Array, StringArray}, datatypes::{DataType, TimeUnit}, @@ -1515,7 +1515,7 @@ mod tests { // decimal(9,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(9, 2), + DataType::Decimal128(9, 2), true, )])); // s1 > 5 @@ -1537,7 +1537,7 @@ mod tests { // decimal(18,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(18, 2), + DataType::Decimal128(18, 2), true, )])); // s1 > 5 @@ -1559,7 +1559,7 @@ mod tests { // decimal(23,2) let schema = Arc::new(Schema::new(vec![Field::new( "s1", - DataType::Decimal(23, 2), + DataType::Decimal128(23, 2), true, )])); // s1 > 5 diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index f404e4de8578..bd5528c09d04 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -424,7 +424,7 @@ macro_rules! get_statistic { ParquetStatistics::Int32(s) => { match $target_arrow_type { // int32 to decimal with the precision and scale - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), precision, @@ -437,7 +437,7 @@ macro_rules! get_statistic { ParquetStatistics::Int64(s) => { match $target_arrow_type { // int64 to decimal with the precision and scale - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(*s.$func() as i128), precision, @@ -462,7 +462,7 @@ macro_rules! get_statistic { ParquetStatistics::FixedLenByteArray(s) => { match $target_arrow_type { // just support the decimal data type - Some(DataType::Decimal(precision, scale)) => { + Some(DataType::Decimal128(precision, scale)) => { Some(ScalarValue::Decimal128( Some(from_bytes_to_i128(s.$bytes_func())), precision, @@ -534,10 +534,10 @@ fn parquet_to_arrow_decimal_type(parquet_column: &ColumnDescriptor) -> Option { - Some(DataType::Decimal(precision as usize, scale as usize)) + Some(DataType::Decimal128(precision as usize, scale as usize)) } _ => match type_ptr.get_basic_info().converted_type() { - ConvertedType::DECIMAL => Some(DataType::Decimal( + ConvertedType::DECIMAL => Some(DataType::Decimal128( type_ptr.get_precision() as usize, type_ptr.get_scale() as usize, )), @@ -1474,7 +1474,8 @@ mod tests { // INT32: c1 > 5, the c1 is decimal(9,2) let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(9, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::INT32, @@ -1515,7 +1516,8 @@ mod tests { // INT32: c1 > 5, but parquet decimal type has different precision or scale to arrow decimal // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0) let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 5, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(5, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(5, 2), false)]); // The decimal of parquet is decimal(9,0) let schema_descr = get_test_schema_descr(vec![( "c1", @@ -1567,7 +1569,8 @@ mod tests { // INT64: c1 < 5, the c1 is decimal(18,2) let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 2), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::INT64, @@ -1606,7 +1609,8 @@ mod tests { // FIXED_LENGTH_BYTE_ARRAY: c1 = 100, the c1 is decimal(28,2) // the type of parquet is decimal(18,2) let expr = col("c1").eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3))); - let schema = Schema::new(vec![Field::new("c1", DataType::Decimal(18, 3), false)]); + let schema = + Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 3), false)]); let schema_descr = get_test_schema_descr(vec![( "c1", PhysicalType::FIXED_LEN_BYTE_ARRAY, diff --git a/datafusion/core/src/physical_plan/hash_join.rs b/datafusion/core/src/physical_plan/hash_join.rs index 6540cc1301cf..6111e3ae1dd6 100644 --- a/datafusion/core/src/physical_plan/hash_join.rs +++ b/datafusion/core/src/physical_plan/hash_join.rs @@ -1098,8 +1098,8 @@ fn equal_rows( DataType::LargeUtf8 => { equal_rows_elem!(LargeStringArray, l, r, left, right, null_equals_null) } - DataType::Decimal(_, lscale) => match r.data_type() { - DataType::Decimal(_, rscale) => { + DataType::Decimal128(_, lscale) => match r.data_type() { + DataType::Decimal128(_, rscale) => { if lscale == rscale { equal_rows_elem!( Decimal128Array, diff --git a/datafusion/core/src/physical_plan/hash_utils.rs b/datafusion/core/src/physical_plan/hash_utils.rs index b9c34ec9b401..a89247d7e200 100644 --- a/datafusion/core/src/physical_plan/hash_utils.rs +++ b/datafusion/core/src/physical_plan/hash_utils.rs @@ -336,7 +336,7 @@ pub fn create_hashes<'a>( DataType::Null => { hash_null(random_state, hashes_buffer, multi_col); } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { hash_decimal128(col, random_state, hashes_buffer, multi_col); } DataType::UInt8 => { diff --git a/datafusion/core/src/physical_plan/repartition.rs b/datafusion/core/src/physical_plan/repartition.rs index 552e1820a739..f9024797b388 100644 --- a/datafusion/core/src/physical_plan/repartition.rs +++ b/datafusion/core/src/physical_plan/repartition.rs @@ -933,7 +933,7 @@ mod tests { let items_set: HashSet<&str> = items_vec.iter().copied().collect(); assert_eq!(items_vec.len(), items_set.len()); let source_str_set: HashSet<&str> = - (&["foo", "bar", "frob", "baz", "goo", "gar", "grob", "gaz"]) + ["foo", "bar", "frob", "baz", "goo", "gar", "grob", "gaz"] .iter() .copied() .collect(); diff --git a/datafusion/core/src/physical_plan/sort_merge_join.rs b/datafusion/core/src/physical_plan/sort_merge_join.rs index 4eb1616c9a6d..da5f9c649f0b 100644 --- a/datafusion/core/src/physical_plan/sort_merge_join.rs +++ b/datafusion/core/src/physical_plan/sort_merge_join.rs @@ -1098,7 +1098,7 @@ fn compare_join_arrays( DataType::Float64 => compare_value!(Float64Array), DataType::Utf8 => compare_value!(StringArray), DataType::LargeUtf8 => compare_value!(LargeStringArray), - DataType::Decimal(..) => compare_value!(Decimal128Array), + DataType::Decimal128(..) => compare_value!(Decimal128Array), DataType::Timestamp(time_unit, None) => match time_unit { TimeUnit::Second => compare_value!(TimestampSecondArray), TimeUnit::Millisecond => compare_value!(TimestampMillisecondArray), @@ -1164,7 +1164,7 @@ fn is_join_arrays_equal( DataType::Float64 => compare_value!(Float64Array), DataType::Utf8 => compare_value!(StringArray), DataType::LargeUtf8 => compare_value!(LargeStringArray), - DataType::Decimal(..) => compare_value!(Decimal128Array), + DataType::Decimal128(..) => compare_value!(Decimal128Array), DataType::Timestamp(time_unit, None) => match time_unit { TimeUnit::Second => compare_value!(TimestampSecondArray), TimeUnit::Millisecond => compare_value!(TimestampMillisecondArray), diff --git a/datafusion/core/src/scheduler/plan.rs b/datafusion/core/src/scheduler/plan.rs index e7d5e1d33176..b5a786a32256 100644 --- a/datafusion/core/src/scheduler/plan.rs +++ b/datafusion/core/src/scheduler/plan.rs @@ -29,7 +29,7 @@ use crate::scheduler::pipeline::{ }; /// Identifies the [`Pipeline`] within the [`PipelinePlan`] to route output to -#[derive(Debug, Clone, Copy, PartialEq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct OutputLink { /// The index of the [`Pipeline`] in [`PipelinePlan`] to route output to pub pipeline: usize, diff --git a/datafusion/core/src/scheduler/task.rs b/datafusion/core/src/scheduler/task.rs index e90b2f07a012..b723a37ce7e8 100644 --- a/datafusion/core/src/scheduler/task.rs +++ b/datafusion/core/src/scheduler/task.rs @@ -137,7 +137,7 @@ impl Task { let partition = self.waker.partition; let waker = futures::task::waker_ref(&self.waker); - let mut cx = Context::from_waker(&*waker); + let mut cx = Context::from_waker(&waker); let pipelines = &self.context.pipelines; let routable = &pipelines[node]; diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs index f91d6e170ddd..bda3b1f8b66c 100644 --- a/datafusion/core/tests/parquet_pruning.rs +++ b/datafusion/core/tests/parquet_pruning.rs @@ -20,7 +20,7 @@ //! expected. use std::sync::Arc; -use arrow::array::Decimal128Array; +use arrow::array::{BasicDecimalArray, Decimal128Array}; use arrow::{ array::{ Array, ArrayRef, Date32Array, Date64Array, Float64Array, Int32Array, StringArray, @@ -881,7 +881,7 @@ fn make_f64_batch(v: Vec) -> RecordBatch { fn make_decimal_batch(v: Vec, precision: usize, scale: usize) -> RecordBatch { let schema = Arc::new(Schema::new(vec![Field::new( "decimal_col", - DataType::Decimal(precision, scale), + DataType::Decimal128(precision, scale), true, )])); let array = Arc::new( diff --git a/datafusion/core/tests/sql/aggregates.rs b/datafusion/core/tests/sql/aggregates.rs index eb0e07f84291..76918bcb0f30 100644 --- a/datafusion/core/tests/sql/aggregates.rs +++ b/datafusion/core/tests/sql/aggregates.rs @@ -1472,7 +1472,7 @@ async fn aggregate_decimal_min() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(10, 3), + &DataType::Decimal128(10, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1496,7 +1496,7 @@ async fn aggregate_decimal_max() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(10, 3), + &DataType::Decimal128(10, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1519,7 +1519,7 @@ async fn aggregate_decimal_sum() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(20, 3), + &DataType::Decimal128(20, 3), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); @@ -1542,7 +1542,7 @@ async fn aggregate_decimal_avg() -> Result<()> { "+-----------------+", ]; assert_eq!( - &DataType::Decimal(14, 7), + &DataType::Decimal128(14, 7), result[0].schema().field(0).data_type() ); assert_batches_sorted_eq!(expected, &result); diff --git a/datafusion/core/tests/sql/decimal.rs b/datafusion/core/tests/sql/decimal.rs index c8c242155257..9b16ca53462b 100644 --- a/datafusion/core/tests/sql/decimal.rs +++ b/datafusion/core/tests/sql/decimal.rs @@ -23,45 +23,45 @@ async fn decimal_cast() -> Result<()> { let sql = "select cast(1.23 as decimal(10,4))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 4), + &DataType::Decimal128(10, 4), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+---------------------------------------+", - "| CAST(Float64(1.23) AS Decimal(10, 4)) |", - "+---------------------------------------+", - "| 1.2300 |", - "+---------------------------------------+", + "+------------------------------------------+", + "| CAST(Float64(1.23) AS Decimal128(10, 4)) |", + "+------------------------------------------+", + "| 1.2300 |", + "+------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select cast(cast(1.23 as decimal(10,3)) as decimal(10,4))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 4), + &DataType::Decimal128(10, 4), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+---------------------------------------------------------------+", - "| CAST(CAST(Float64(1.23) AS Decimal(10, 3)) AS Decimal(10, 4)) |", - "+---------------------------------------------------------------+", - "| 1.2300 |", - "+---------------------------------------------------------------+", + "+---------------------------------------------------------------------+", + "| CAST(CAST(Float64(1.23) AS Decimal128(10, 3)) AS Decimal128(10, 4)) |", + "+---------------------------------------------------------------------+", + "| 1.2300 |", + "+---------------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select cast(1.2345 as decimal(24,2))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(24, 2), + &DataType::Decimal128(24, 2), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-----------------------------------------+", - "| CAST(Float64(1.2345) AS Decimal(24, 2)) |", - "+-----------------------------------------+", - "| 1.23 |", - "+-----------------------------------------+", + "+--------------------------------------------+", + "| CAST(Float64(1.2345) AS Decimal128(24, 2)) |", + "+--------------------------------------------+", + "| 1.23 |", + "+--------------------------------------------+", ]; assert_batches_eq!(expected, &actual); @@ -75,7 +75,7 @@ async fn decimal_by_sql() -> Result<()> { let sql = "SELECT c1 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -110,7 +110,7 @@ async fn decimal_by_filter() -> Result<()> { let sql = "select c1 from decimal_simple where c1 > 0.000030"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -133,11 +133,11 @@ async fn decimal_by_filter() -> Result<()> { let sql = "select * from decimal_simple where c1 > c5"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); assert_eq!( - &DataType::Decimal(12, 7), + &DataType::Decimal128(12, 7), actual[0].schema().field(4).data_type() ); let expected = vec![ @@ -161,7 +161,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select min(c1) from decimal_simple where c4=false"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -176,7 +176,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select max(c1) from decimal_simple where c4=false"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -193,7 +193,7 @@ async fn decimal_agg_function() -> Result<()> { let actual = execute_to_batches(&ctx, sql).await; // inferred precision is 10+10 assert_eq!( - &DataType::Decimal(20, 6), + &DataType::Decimal128(20, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -211,7 +211,7 @@ async fn decimal_agg_function() -> Result<()> { let sql = "select avg(c1) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(14, 10), + &DataType::Decimal128(14, 10), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -234,7 +234,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1=CAST(0.00002 as Decimal(10,8))"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -274,7 +274,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where 0.00002 > c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -290,7 +290,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 <= 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -308,7 +308,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 > 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -335,7 +335,7 @@ async fn decimal_logic_op() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00002"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -372,7 +372,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let actual = execute_to_batches(&ctx, sql).await; // array decimal(10,6) + scalar decimal(20,0) => decimal(21,6) assert_eq!( - &DataType::Decimal(27, 6), + &DataType::Decimal128(27, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -401,7 +401,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1+c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(13, 7), + &DataType::Decimal128(13, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -430,7 +430,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1-1 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(27, 6), + &DataType::Decimal128(27, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -459,7 +459,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1-c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(13, 7), + &DataType::Decimal128(13, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -488,7 +488,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1*20 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(31, 6), + &DataType::Decimal128(31, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -517,7 +517,7 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1*c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(23, 13), + &DataType::Decimal128(23, 13), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -546,36 +546,36 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c1/cast(0.00001 as decimal(5,5)) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(21, 12), + &DataType::Decimal128(21, 12), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-------------------------------------------------------------+", - "| decimal_simple.c1 / CAST(Float64(0.00001) AS Decimal(5, 5)) |", - "+-------------------------------------------------------------+", - "| 1.000000000000 |", - "| 2.000000000000 |", - "| 2.000000000000 |", - "| 3.000000000000 |", - "| 3.000000000000 |", - "| 3.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 4.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "| 5.000000000000 |", - "+-------------------------------------------------------------+", + "+----------------------------------------------------------------+", + "| decimal_simple.c1 / CAST(Float64(0.00001) AS Decimal128(5, 5)) |", + "+----------------------------------------------------------------+", + "| 1.000000000000 |", + "| 2.000000000000 |", + "| 2.000000000000 |", + "| 3.000000000000 |", + "| 3.000000000000 |", + "| 3.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 4.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "| 5.000000000000 |", + "+----------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select c1/c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(30, 19), + &DataType::Decimal128(30, 19), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -605,36 +605,36 @@ async fn decimal_arithmetic_op() -> Result<()> { let sql = "select c5%cast(0.00001 as decimal(5,5)) from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(7, 7), + &DataType::Decimal128(7, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ - "+-------------------------------------------------------------+", - "| decimal_simple.c5 % CAST(Float64(0.00001) AS Decimal(5, 5)) |", - "+-------------------------------------------------------------+", - "| 0.0000040 |", - "| 0.0000050 |", - "| 0.0000090 |", - "| 0.0000020 |", - "| 0.0000050 |", - "| 0.0000010 |", - "| 0.0000040 |", - "| 0.0000000 |", - "| 0.0000000 |", - "| 0.0000040 |", - "| 0.0000020 |", - "| 0.0000080 |", - "| 0.0000030 |", - "| 0.0000080 |", - "| 0.0000000 |", - "+-------------------------------------------------------------+", + "+----------------------------------------------------------------+", + "| decimal_simple.c5 % CAST(Float64(0.00001) AS Decimal128(5, 5)) |", + "+----------------------------------------------------------------+", + "| 0.0000040 |", + "| 0.0000050 |", + "| 0.0000090 |", + "| 0.0000020 |", + "| 0.0000050 |", + "| 0.0000010 |", + "| 0.0000040 |", + "| 0.0000000 |", + "| 0.0000000 |", + "| 0.0000040 |", + "| 0.0000020 |", + "| 0.0000080 |", + "| 0.0000030 |", + "| 0.0000080 |", + "| 0.0000000 |", + "+----------------------------------------------------------------+", ]; assert_batches_eq!(expected, &actual); let sql = "select c1%c5 from decimal_simple"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(11, 7), + &DataType::Decimal128(11, 7), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -670,7 +670,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00004 order by c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -693,7 +693,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 >= 0.00004 order by c1 desc"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -716,7 +716,7 @@ async fn decimal_sort() -> Result<()> { let sql = "select * from decimal_simple where c1 < 0.00003 order by c1 desc,c4"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(0).data_type() ); let expected = vec![ @@ -740,7 +740,7 @@ async fn decimal_group_function() -> Result<()> { let sql = "select count(*),c1 from decimal_simple group by c1 order by c1"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(1).data_type() ); let expected = vec![ @@ -759,7 +759,7 @@ async fn decimal_group_function() -> Result<()> { let sql = "select count(*),c1,c4 from decimal_simple group by c1,c4 order by c1,c4"; let actual = execute_to_batches(&ctx, sql).await; assert_eq!( - &DataType::Decimal(10, 6), + &DataType::Decimal128(10, 6), actual[0].schema().field(1).data_type() ); let expected = vec![ diff --git a/datafusion/core/tests/sql/errors.rs b/datafusion/core/tests/sql/errors.rs index 6a95912137c3..92cc88e28745 100644 --- a/datafusion/core/tests/sql/errors.rs +++ b/datafusion/core/tests/sql/errors.rs @@ -43,8 +43,9 @@ async fn test_cast_expressions_error() -> Result<()> { match result { Ok(_) => panic!("expected error"), Err(e) => { - assert_contains!(e.to_string(), - "Cast error: Cannot cast string 'c' to value of arrow::datatypes::types::Int32Type type" + assert_contains!( + e.to_string(), + "Cannot cast string 'c' to value of Int32 type" ); } } diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs index 19de20d6136a..b899ac220737 100644 --- a/datafusion/core/tests/sql/joins.rs +++ b/datafusion/core/tests/sql/joins.rs @@ -1265,10 +1265,10 @@ async fn hash_join_with_date32() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Inner Join: #t1.c1 = #t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Inner Join: #t1.c1 = #t2.c1 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1307,10 +1307,10 @@ async fn hash_join_with_date64() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Left Join: #t1.c2 = #t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Left Join: #t1.c2 = #t2.c2 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1351,10 +1351,10 @@ async fn hash_join_with_decimal() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Right Join: #t1.c3 = #t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Right Join: #t1.c3 = #t2.c3 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); @@ -1395,10 +1395,10 @@ async fn hash_join_with_dictionary() -> Result<()> { let plan = state.optimize(&plan)?; let expected = vec![ "Explain [plan_type:Utf8, plan:Utf8]", - " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " Inner Join: #t1.c4 = #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(5, 2);N, c4:Dictionary(Int32, Utf8);N]", - " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Projection: #t1.c1, #t1.c2, #t1.c3, #t1.c4, #t2.c1, #t2.c2, #t2.c3, #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " Inner Join: #t1.c4 = #t2.c4 [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N, c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t1 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(5, 2);N, c4:Dictionary(Int32, Utf8);N]", + " TableScan: t2 projection=[c1, c2, c3, c4] [c1:Date32;N, c2:Date64;N, c3:Decimal128(10, 2);N, c4:Dictionary(Int32, Utf8);N]", ]; let formatted = plan.display_indent_schema().to_string(); let actual: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 07bd2f9d39cd..6f1ae52aee83 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -279,7 +279,7 @@ fn create_hashjoin_datatype_context() -> Result { let t1_schema = Schema::new(vec![ Field::new("c1", DataType::Date32, true), Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal(5, 2), true), + Field::new("c3", DataType::Decimal128(5, 2), true), Field::new( "c4", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), @@ -312,7 +312,7 @@ fn create_hashjoin_datatype_context() -> Result { let t2_schema = Schema::new(vec![ Field::new("c1", DataType::Date32, true), Field::new("c2", DataType::Date64, true), - Field::new("c3", DataType::Decimal(10, 2), true), + Field::new("c3", DataType::Decimal128(10, 2), true), Field::new( "c4", DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)), diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 923015ae57cc..a3c4dd4acb0f 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,6 +36,6 @@ path = "src/lib.rs" [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } datafusion-common = { path = "../common", version = "10.0.0" } sqlparser = "0.20" diff --git a/datafusion/expr/src/aggregate_function.rs b/datafusion/expr/src/aggregate_function.rs index 09d759e56466..a4281aa0aa2a 100644 --- a/datafusion/expr/src/aggregate_function.rs +++ b/datafusion/expr/src/aggregate_function.rs @@ -19,7 +19,7 @@ use crate::{Signature, TypeSignature, Volatility}; use arrow::datatypes::{ - DataType, Field, TimeUnit, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE, + DataType, Field, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, }; use datafusion_common::{DataFusionError, Result}; use std::ops::Deref; @@ -407,11 +407,11 @@ pub fn sum_return_type(arg_type: &DataType) -> Result { // In the https://www.postgresql.org/docs/current/functions-aggregate.html doc, // the result type of floating-point is FLOAT64 with the double precision. DataType::Float64 | DataType::Float32 => Ok(DataType::Float64), - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { // in the spark, the result type is DECIMAL(min(38,precision+10), s) // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala#L66 - let new_precision = DECIMAL_MAX_PRECISION.min(*precision + 10); - Ok(DataType::Decimal(new_precision, *scale)) + let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 10); + Ok(DataType::Decimal128(new_precision, *scale)) } other => Err(DataFusionError::Plan(format!( "SUM does not support type \"{:?}\"", @@ -503,12 +503,12 @@ pub fn stddev_return_type(arg_type: &DataType) -> Result { /// function return type of an average pub fn avg_return_type(arg_type: &DataType) -> Result { match arg_type { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { // in the spark, the result type is DECIMAL(min(38,precision+4), min(38,scale+4)). // ref: https://github.com/apache/spark/blob/fcf636d9eb8d645c24be3db2d599aba2d7e2955a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala#L66 - let new_precision = DECIMAL_MAX_PRECISION.min(*precision + 4); - let new_scale = DECIMAL_MAX_SCALE.min(*scale + 4); - Ok(DataType::Decimal(new_precision, new_scale)) + let new_precision = DECIMAL128_MAX_PRECISION.min(*precision + 4); + let new_scale = DECIMAL128_MAX_SCALE.min(*scale + 4); + Ok(DataType::Decimal128(new_precision, new_scale)) } DataType::Int8 | DataType::Int16 @@ -609,7 +609,7 @@ pub fn is_sum_support_arg_type(arg_type: &DataType) -> bool { | DataType::Int64 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } @@ -626,7 +626,7 @@ pub fn is_avg_support_arg_type(arg_type: &DataType) -> bool { | DataType::Int64 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } @@ -755,7 +755,7 @@ mod tests { ]; let input_types = vec![ vec![DataType::Int32], - vec![DataType::Decimal(10, 2)], + vec![DataType::Decimal128(10, 2)], vec![DataType::Utf8], ]; for fun in funs { @@ -770,7 +770,7 @@ mod tests { let input_types = vec![ vec![DataType::Int32], vec![DataType::Float32], - vec![DataType::Decimal(20, 3)], + vec![DataType::Decimal128(20, 3)], ]; for fun in funs { for input_type in &input_types { @@ -807,13 +807,13 @@ mod tests { #[test] fn test_avg_return_data_type() -> Result<()> { - let data_type = DataType::Decimal(10, 5); + let data_type = DataType::Decimal128(10, 5); let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal(14, 9), result_type); + assert_eq!(DataType::Decimal128(14, 9), result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); let result_type = avg_return_type(&data_type)?; - assert_eq!(DataType::Decimal(38, 14), result_type); + assert_eq!(DataType::Decimal128(38, 14), result_type); Ok(()) } @@ -823,20 +823,20 @@ mod tests { let result_type = variance_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(variance_return_type(&data_type).is_err()); Ok(()) } #[test] fn test_sum_return_data_type() -> Result<()> { - let data_type = DataType::Decimal(10, 5); + let data_type = DataType::Decimal128(10, 5); let result_type = sum_return_type(&data_type)?; - assert_eq!(DataType::Decimal(20, 5), result_type); + assert_eq!(DataType::Decimal128(20, 5), result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); let result_type = sum_return_type(&data_type)?; - assert_eq!(DataType::Decimal(38, 10), result_type); + assert_eq!(DataType::Decimal128(38, 10), result_type); Ok(()) } @@ -846,7 +846,7 @@ mod tests { let result_type = stddev_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(stddev_return_type(&data_type).is_err()); Ok(()) } @@ -857,7 +857,7 @@ mod tests { let result_type = covariance_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(covariance_return_type(&data_type).is_err()); Ok(()) } @@ -868,7 +868,7 @@ mod tests { let result_type = correlation_return_type(&data_type)?; assert_eq!(DataType::Float64, result_type); - let data_type = DataType::Decimal(36, 10); + let data_type = DataType::Decimal128(36, 10); assert!(correlation_return_type(&data_type).is_err()); Ok(()) } diff --git a/datafusion/expr/src/binary_rule.rs b/datafusion/expr/src/binary_rule.rs index 21e62344cc34..d6994d68847a 100644 --- a/datafusion/expr/src/binary_rule.rs +++ b/datafusion/expr/src/binary_rule.rs @@ -19,7 +19,7 @@ use crate::Operator; use arrow::compute::can_cast_types; -use arrow::datatypes::{DataType, DECIMAL_MAX_PRECISION, DECIMAL_MAX_SCALE}; +use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE}; use datafusion_common::DataFusionError; use datafusion_common::Result; @@ -194,9 +194,9 @@ fn comparison_binary_numeric_coercion( // that the coercion removes the least amount of information match (lhs_type, rhs_type) { // support decimal data type for comparison operation - (d1 @ Decimal(_, _), d2 @ Decimal(_, _)) => get_wider_decimal_type(d1, d2), - (Decimal(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type), - (_, Decimal(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type), + (d1 @ Decimal128(_, _), d2 @ Decimal128(_, _)) => get_wider_decimal_type(d1, d2), + (Decimal128(_, _), _) => get_comparison_common_decimal_type(lhs_type, rhs_type), + (_, Decimal128(_, _)) => get_comparison_common_decimal_type(rhs_type, lhs_type), (Float64, _) | (_, Float64) => Some(Float64), (_, Float32) | (Float32, _) => Some(Float32), (Int64, _) | (_, Int64) => Some(Int64), @@ -218,25 +218,25 @@ fn get_comparison_common_decimal_type( let other_decimal_type = &match other_type { // This conversion rule is from spark // https://github.com/apache/spark/blob/1c81ad20296d34f137238dadd67cc6ae405944eb/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala#L127 - DataType::Int8 => DataType::Decimal(3, 0), - DataType::Int16 => DataType::Decimal(5, 0), - DataType::Int32 => DataType::Decimal(10, 0), - DataType::Int64 => DataType::Decimal(20, 0), - DataType::Float32 => DataType::Decimal(14, 7), - DataType::Float64 => DataType::Decimal(30, 15), + DataType::Int8 => DataType::Decimal128(3, 0), + DataType::Int16 => DataType::Decimal128(5, 0), + DataType::Int32 => DataType::Decimal128(10, 0), + DataType::Int64 => DataType::Decimal128(20, 0), + DataType::Float32 => DataType::Decimal128(14, 7), + DataType::Float64 => DataType::Decimal128(30, 15), _ => { return None; } }; match (decimal_type, &other_decimal_type) { - (d1 @ DataType::Decimal(_, _), d2 @ DataType::Decimal(_, _)) => { + (d1 @ DataType::Decimal128(_, _), d2 @ DataType::Decimal128(_, _)) => { get_wider_decimal_type(d1, d2) } _ => None, } } -// Returns a `DataType::Decimal` that can store any value from either +// Returns a `DataType::Decimal128` that can store any value from either // `lhs_decimal_type` and `rhs_decimal_type` // The result decimal type is (max(s1, s2) + max(p1-s1, p2-s2), max(s1, s2)). fn get_wider_decimal_type( @@ -244,7 +244,7 @@ fn get_wider_decimal_type( rhs_type: &DataType, ) -> Option { match (lhs_decimal_type, rhs_type) { - (DataType::Decimal(p1, s1), DataType::Decimal(p2, s2)) => { + (DataType::Decimal128(p1, s1), DataType::Decimal128(p2, s2)) => { // max(s1, s2) + max(p1-s1, p2-s2), max(s1, s2) let s = *s1.max(s2); let range = (p1 - s1).max(p2 - s2); @@ -258,13 +258,13 @@ fn get_wider_decimal_type( // Now, we just support the signed integer type and floating-point type. fn coerce_numeric_type_to_decimal(numeric_type: &DataType) -> Option { match numeric_type { - DataType::Int8 => Some(DataType::Decimal(3, 0)), - DataType::Int16 => Some(DataType::Decimal(5, 0)), - DataType::Int32 => Some(DataType::Decimal(10, 0)), - DataType::Int64 => Some(DataType::Decimal(20, 0)), + DataType::Int8 => Some(DataType::Decimal128(3, 0)), + DataType::Int16 => Some(DataType::Decimal128(5, 0)), + DataType::Int32 => Some(DataType::Decimal128(10, 0)), + DataType::Int64 => Some(DataType::Decimal128(20, 0)), // TODO if we convert the floating-point data to the decimal type, it maybe overflow. - DataType::Float32 => Some(DataType::Decimal(14, 7)), - DataType::Float64 => Some(DataType::Decimal(30, 15)), + DataType::Float32 => Some(DataType::Decimal128(14, 7)), + DataType::Float64 => Some(DataType::Decimal128(30, 15)), _ => None, } } @@ -289,10 +289,10 @@ fn mathematics_numerical_coercion( // these are ordered from most informative to least informative so // that the coercion removes the least amount of information match (lhs_type, rhs_type) { - (Decimal(_, _), Decimal(_, _)) => { + (Decimal128(_, _), Decimal128(_, _)) => { coercion_decimal_mathematics_type(mathematics_op, lhs_type, rhs_type) } - (Decimal(_, _), _) => { + (Decimal128(_, _), _) => { let converted_decimal_type = coerce_numeric_type_to_decimal(rhs_type); match converted_decimal_type { None => None, @@ -303,7 +303,7 @@ fn mathematics_numerical_coercion( ), } } - (_, Decimal(_, _)) => { + (_, Decimal128(_, _)) => { let converted_decimal_type = coerce_numeric_type_to_decimal(lhs_type); match converted_decimal_type { None => None, @@ -329,9 +329,9 @@ fn mathematics_numerical_coercion( } fn create_decimal_type(precision: usize, scale: usize) -> DataType { - DataType::Decimal( - DECIMAL_MAX_PRECISION.min(precision), - DECIMAL_MAX_SCALE.min(scale), + DataType::Decimal128( + DECIMAL128_MAX_PRECISION.min(precision), + DECIMAL128_MAX_SCALE.min(scale), ) } @@ -344,7 +344,7 @@ fn coercion_decimal_mathematics_type( match (left_decimal_type, right_decimal_type) { // The coercion rule from spark // https://github.com/apache/spark/blob/c20af535803a7250fef047c2bf0fe30be242369d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecision.scala#L35 - (Decimal(p1, s1), Decimal(p2, s2)) => { + (Decimal128(p1, s1), Decimal128(p2, s2)) => { match mathematics_op { Operator::Plus | Operator::Minus => { // max(s1, s2) @@ -392,19 +392,17 @@ pub fn is_signed_numeric(dt: &DataType) -> bool { | DataType::Float16 | DataType::Float32 | DataType::Float64 - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) ) } /// Determine if a DataType is numeric or not pub fn is_numeric(dt: &DataType) -> bool { is_signed_numeric(dt) - || match dt { - DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 => { - true - } - _ => false, - } + || matches!( + dt, + DataType::UInt8 | DataType::UInt16 | DataType::UInt32 | DataType::UInt64 + ) } /// Determine if at least of one of lhs and rhs is numeric, and the other must be NULL or numeric @@ -653,7 +651,7 @@ mod tests { #[test] fn test_decimal_binary_comparison_coercion() -> Result<()> { - let input_decimal = DataType::Decimal(20, 3); + let input_decimal = DataType::Decimal128(20, 3); let input_types = [ DataType::Int8, DataType::Int16, @@ -661,18 +659,18 @@ mod tests { DataType::Int64, DataType::Float32, DataType::Float64, - DataType::Decimal(38, 10), - DataType::Decimal(20, 8), + DataType::Decimal128(38, 10), + DataType::Decimal128(20, 8), ]; let result_types = [ - DataType::Decimal(20, 3), - DataType::Decimal(20, 3), - DataType::Decimal(20, 3), - DataType::Decimal(23, 3), - DataType::Decimal(24, 7), - DataType::Decimal(32, 15), - DataType::Decimal(38, 10), - DataType::Decimal(25, 8), + DataType::Decimal128(20, 3), + DataType::Decimal128(20, 3), + DataType::Decimal128(20, 3), + DataType::Decimal128(23, 3), + DataType::Decimal128(24, 7), + DataType::Decimal128(32, 15), + DataType::Decimal128(38, 10), + DataType::Decimal128(25, 8), ]; let comparison_op_types = [ Operator::NotEq, @@ -699,66 +697,66 @@ mod tests { fn test_decimal_mathematics_op_type() { assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int8).unwrap(), - DataType::Decimal(3, 0) + DataType::Decimal128(3, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int16).unwrap(), - DataType::Decimal(5, 0) + DataType::Decimal128(5, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int32).unwrap(), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Int64).unwrap(), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Float32).unwrap(), - DataType::Decimal(14, 7) + DataType::Decimal128(14, 7) ); assert_eq!( coerce_numeric_type_to_decimal(&DataType::Float64).unwrap(), - DataType::Decimal(30, 15) + DataType::Decimal128(30, 15) ); let op = Operator::Plus; - let left_decimal_type = DataType::Decimal(10, 3); - let right_decimal_type = DataType::Decimal(20, 4); + let left_decimal_type = DataType::Decimal128(10, 3); + let right_decimal_type = DataType::Decimal128(20, 4); let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(21, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(21, 4), result.unwrap()); let op = Operator::Minus; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(21, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(21, 4), result.unwrap()); let op = Operator::Multiply; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(31, 7), result.unwrap()); + assert_eq!(DataType::Decimal128(31, 7), result.unwrap()); let op = Operator::Divide; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(35, 24), result.unwrap()); + assert_eq!(DataType::Decimal128(35, 24), result.unwrap()); let op = Operator::Modulo; let result = coercion_decimal_mathematics_type( &op, &left_decimal_type, &right_decimal_type, ); - assert_eq!(DataType::Decimal(11, 4), result.unwrap()); + assert_eq!(DataType::Decimal128(11, 4), result.unwrap()); } #[test] diff --git a/datafusion/expr/src/type_coercion.rs b/datafusion/expr/src/type_coercion.rs index 33a540d6f1ef..27eee3d30066 100644 --- a/datafusion/expr/src/type_coercion.rs +++ b/datafusion/expr/src/type_coercion.rs @@ -182,7 +182,7 @@ pub fn can_coerce_from(type_into: &DataType, type_from: &DataType) -> bool { | UInt64 | Float32 | Float64 - | Decimal(_, _) + | Decimal128(_, _) ), Timestamp(TimeUnit::Nanosecond, None) => { matches!(type_from, Null | Timestamp(_, None)) diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index f2f5d6002cc8..25ca8d6e36d6 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -689,7 +689,7 @@ pub fn can_hash(data_type: &DataType) -> bool { }, DataType::Utf8 => true, DataType::LargeUtf8 => true, - DataType::Decimal(_, _) => true, + DataType::Decimal128(_, _) => true, DataType::Date32 => true, DataType::Date64 => true, DataType::Dictionary(key_type, value_type) diff --git a/datafusion/jit/Cargo.toml b/datafusion/jit/Cargo.toml index e5b9e9297c09..8da87bbd7607 100644 --- a/datafusion/jit/Cargo.toml +++ b/datafusion/jit/Cargo.toml @@ -36,7 +36,7 @@ path = "src/lib.rs" jit = [] [dependencies] -arrow = { version = "19.0.0" } +arrow = { version = "20.0.0" } cranelift = "0.86.1" cranelift-jit = "0.86.1" cranelift-module = "0.86.1" diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 30d943c867ba..695f5be9b185 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -37,7 +37,7 @@ default = ["unicode_expressions"] unicode_expressions = [] [dependencies] -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } async-trait = "0.1.41" chrono = { version = "0.4", default-features = false } datafusion-common = { path = "../common", version = "10.0.0" } diff --git a/datafusion/optimizer/src/decorrelate_scalar_subquery.rs b/datafusion/optimizer/src/decorrelate_scalar_subquery.rs index d4f8372bd326..561757dc8745 100644 --- a/datafusion/optimizer/src/decorrelate_scalar_subquery.rs +++ b/datafusion/optimizer/src/decorrelate_scalar_subquery.rs @@ -69,7 +69,7 @@ impl DecorrelateScalarSubquery { _ => return Ok(()), }; let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let res = SubqueryInfo::new(subquery, expr, *op, lhs); @@ -163,7 +163,7 @@ fn optimize_scalar( "optimizing:\n{}", query_info.query.subquery.display_indent() ); - let proj = Projection::try_from_plan(&*query_info.query.subquery) + let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("scalar subqueries must have a projection", e))?; let proj = only_or_err(proj.expr.as_slice()) .map_err(|e| context!("exactly one expression should be projected", e))?; @@ -173,7 +173,7 @@ fn optimize_scalar( .map_err(|e| context!("Exactly one input is expected. Is this a join?", e))?; let aggr = Aggregate::try_from_plan(sub_input) .map_err(|e| context!("scalar subqueries must aggregate a value", e))?; - let filter = Filter::try_from_plan(&*aggr.input).map_err(|e| { + let filter = Filter::try_from_plan(&aggr.input).map_err(|e| { context!("scalar subqueries must have a filter to be correlated", e) })?; diff --git a/datafusion/optimizer/src/decorrelate_where_exists.rs b/datafusion/optimizer/src/decorrelate_where_exists.rs index 2c25bcbb28e7..90fff3f80054 100644 --- a/datafusion/optimizer/src/decorrelate_where_exists.rs +++ b/datafusion/optimizer/src/decorrelate_where_exists.rs @@ -56,8 +56,7 @@ impl DecorrelateWhereExists { for it in filters.iter() { match it { Expr::Exists { subquery, negated } => { - let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + let subquery = self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let subquery = SubqueryInfo::new(subquery.clone(), *negated); diff --git a/datafusion/optimizer/src/decorrelate_where_in.rs b/datafusion/optimizer/src/decorrelate_where_in.rs index f90d94d8c16f..5da7d80c1691 100644 --- a/datafusion/optimizer/src/decorrelate_where_in.rs +++ b/datafusion/optimizer/src/decorrelate_where_in.rs @@ -60,8 +60,7 @@ impl DecorrelateWhereIn { subquery, negated, } => { - let subquery = - self.optimize(&*subquery.subquery, optimizer_config)?; + let subquery = self.optimize(&subquery.subquery, optimizer_config)?; let subquery = Arc::new(subquery); let subquery = Subquery { subquery }; let subquery = @@ -132,7 +131,7 @@ fn optimize_where_in( outer_other_exprs: &[Expr], optimizer_config: &mut OptimizerConfig, ) -> datafusion_common::Result { - let proj = Projection::try_from_plan(&*query_info.query.subquery) + let proj = Projection::try_from_plan(&query_info.query.subquery) .map_err(|e| context!("a projection is required", e))?; let mut subqry_input = proj.input.clone(); let proj = only_or_err(proj.expr.as_slice()) diff --git a/datafusion/optimizer/src/simplify_expressions.rs b/datafusion/optimizer/src/simplify_expressions.rs index 0b865238f42c..8bb829024f39 100644 --- a/datafusion/optimizer/src/simplify_expressions.rs +++ b/datafusion/optimizer/src/simplify_expressions.rs @@ -159,15 +159,7 @@ fn is_false(expr: &Expr) -> bool { /// returns true if `haystack` looks like (needle OP X) or (X OP needle) fn is_op_with(target_op: Operator, haystack: &Expr, needle: &Expr) -> bool { - match haystack { - Expr::BinaryExpr { left, op, right } - if op == &target_op - && (needle == left.as_ref() || needle == right.as_ref()) => - { - true - } - _ => false, - } + matches!(haystack, Expr::BinaryExpr { left, op, right } if op == &target_op && (needle == left.as_ref() || needle == right.as_ref())) } /// returns the contained boolean value in `expr` as @@ -1903,7 +1895,7 @@ mod tests { let optimized_plan = rule .optimize(plan, &mut config) .expect("failed to optimize plan"); - return format!("{:?}", optimized_plan); + format!("{:?}", optimized_plan) } #[test] @@ -1971,8 +1963,7 @@ mod tests { .build() .unwrap(); - let expected = - "Cannot cast string '' to value of arrow::datatypes::types::Int32Type type"; + let expected = "Cannot cast string '' to value of Int32 type"; let actual = get_optimized_plan_err(&plan, &Utc::now()); assert_contains!(actual, expected); } diff --git a/datafusion/physical-expr/Cargo.toml b/datafusion/physical-expr/Cargo.toml index 6199949f0256..5f25b2e4f188 100644 --- a/datafusion/physical-expr/Cargo.toml +++ b/datafusion/physical-expr/Cargo.toml @@ -40,7 +40,7 @@ unicode_expressions = ["unicode-segmentation"] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } blake2 = { version = "^0.10.2", optional = true } blake3 = { version = "1.0", optional = true } chrono = { version = "0.4", default-features = false } diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs index a55e0e35278f..9248a5e6bb3f 100644 --- a/datafusion/physical-expr/src/aggregate/average.rs +++ b/datafusion/physical-expr/src/aggregate/average.rs @@ -54,7 +54,7 @@ impl Avg { // the result of avg just support FLOAT64 and Decimal data type. assert!(matches!( data_type, - DataType::Float64 | DataType::Decimal(_, _) + DataType::Float64 | DataType::Decimal128(_, _) )); Self { name: name.into(), @@ -301,10 +301,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(Some(35000), 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } @@ -318,10 +318,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(Some(32500), 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } @@ -336,10 +336,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Avg, ScalarValue::Decimal128(None, 14, 4), - DataType::Decimal(14, 4) + DataType::Decimal128(14, 4) ) } diff --git a/datafusion/physical-expr/src/aggregate/build_in.rs b/datafusion/physical-expr/src/aggregate/build_in.rs index 7bfa9e0a1787..f47982becdba 100644 --- a/datafusion/physical-expr/src/aggregate/build_in.rs +++ b/datafusion/physical-expr/src/aggregate/build_in.rs @@ -292,7 +292,7 @@ mod tests { DataType::Int32, DataType::Float32, DataType::Float64, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), DataType::Utf8, ]; for fun in funcs { @@ -453,7 +453,7 @@ mod tests { DataType::Int32, DataType::Float32, DataType::Float64, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), DataType::Utf8, ]; for fun in funcs { @@ -898,7 +898,7 @@ mod tests { let observed = return_type( &AggregateFunction::ApproxMedian, - &[DataType::Decimal(10, 6)], + &[DataType::Decimal128(10, 6)], ); assert!(observed.is_err()); @@ -914,13 +914,14 @@ mod tests { assert_eq!(DataType::Int32, observed); // test decimal for min - let observed = return_type(&AggregateFunction::Min, &[DataType::Decimal(10, 6)])?; - assert_eq!(DataType::Decimal(10, 6), observed); + let observed = + return_type(&AggregateFunction::Min, &[DataType::Decimal128(10, 6)])?; + assert_eq!(DataType::Decimal128(10, 6), observed); // test decimal for max let observed = - return_type(&AggregateFunction::Max, &[DataType::Decimal(28, 13)])?; - assert_eq!(DataType::Decimal(28, 13), observed); + return_type(&AggregateFunction::Max, &[DataType::Decimal128(28, 13)])?; + assert_eq!(DataType::Decimal128(28, 13), observed); Ok(()) } @@ -939,11 +940,13 @@ mod tests { let observed = return_type(&AggregateFunction::Sum, &[DataType::Float64])?; assert_eq!(DataType::Float64, observed); - let observed = return_type(&AggregateFunction::Sum, &[DataType::Decimal(10, 5)])?; - assert_eq!(DataType::Decimal(20, 5), observed); + let observed = + return_type(&AggregateFunction::Sum, &[DataType::Decimal128(10, 5)])?; + assert_eq!(DataType::Decimal128(20, 5), observed); - let observed = return_type(&AggregateFunction::Sum, &[DataType::Decimal(35, 5)])?; - assert_eq!(DataType::Decimal(38, 5), observed); + let observed = + return_type(&AggregateFunction::Sum, &[DataType::Decimal128(35, 5)])?; + assert_eq!(DataType::Decimal128(38, 5), observed); Ok(()) } @@ -970,7 +973,7 @@ mod tests { assert_eq!(DataType::Int64, observed); let observed = - return_type(&AggregateFunction::Count, &[DataType::Decimal(28, 13)])?; + return_type(&AggregateFunction::Count, &[DataType::Decimal128(28, 13)])?; assert_eq!(DataType::Int64, observed); Ok(()) } @@ -986,11 +989,13 @@ mod tests { let observed = return_type(&AggregateFunction::Avg, &[DataType::Int32])?; assert_eq!(DataType::Float64, observed); - let observed = return_type(&AggregateFunction::Avg, &[DataType::Decimal(10, 6)])?; - assert_eq!(DataType::Decimal(14, 10), observed); + let observed = + return_type(&AggregateFunction::Avg, &[DataType::Decimal128(10, 6)])?; + assert_eq!(DataType::Decimal128(14, 10), observed); - let observed = return_type(&AggregateFunction::Avg, &[DataType::Decimal(36, 6)])?; - assert_eq!(DataType::Decimal(38, 10), observed); + let observed = + return_type(&AggregateFunction::Avg, &[DataType::Decimal128(36, 6)])?; + assert_eq!(DataType::Decimal128(38, 10), observed); Ok(()) } diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs index 077f4d725de3..0391382f91c4 100644 --- a/datafusion/physical-expr/src/aggregate/min_max.rs +++ b/datafusion/physical-expr/src/aggregate/min_max.rs @@ -207,7 +207,7 @@ macro_rules! typed_min_max_batch_decimal128 { macro_rules! min_max_batch { ($VALUES:expr, $OP:ident) => {{ match $VALUES.data_type() { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { typed_min_max_batch_decimal128!($VALUES, precision, scale, $OP) } // all types that have a natural order @@ -803,10 +803,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(Some(1), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -821,10 +821,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(None, 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -840,10 +840,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(Some(1), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -892,10 +892,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Max, ScalarValue::Decimal128(Some(5), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -909,10 +909,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Max, ScalarValue::Decimal128(Some(5), 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } @@ -926,10 +926,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Min, ScalarValue::Decimal128(None, 10, 0), - DataType::Decimal(10, 0) + DataType::Decimal128(10, 0) ) } diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs index b0a7de6c633c..634b21c61a33 100644 --- a/datafusion/physical-expr/src/aggregate/sum.rs +++ b/datafusion/physical-expr/src/aggregate/sum.rs @@ -176,7 +176,7 @@ fn sum_decimal_batch( pub(crate) fn sum_batch(values: &ArrayRef, sum_type: &DataType) -> Result { let values = &cast(values, sum_type)?; Ok(match values.data_type() { - DataType::Decimal(precision, scale) => { + DataType::Decimal128(precision, scale) => { sum_decimal_batch(values, precision, scale)? } DataType::Float64 => typed_sum_delta_batch!(values, Float64Array, Float64), @@ -544,7 +544,7 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(Some(15), 10, 0), result); // test agg @@ -557,10 +557,10 @@ mod tests { generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Sum, ScalarValue::Decimal128(Some(15), 20, 0), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ) } @@ -579,7 +579,7 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(Some(13), 10, 0), result); // test agg @@ -591,10 +591,10 @@ mod tests { ); generic_test_op!( array, - DataType::Decimal(35, 0), + DataType::Decimal128(35, 0), Sum, ScalarValue::Decimal128(Some(13), 38, 0), - DataType::Decimal(38, 0) + DataType::Decimal128(38, 0) ) } @@ -613,16 +613,16 @@ mod tests { .collect::() .with_precision_and_scale(10, 0)?, ); - let result = sum_batch(&array, &DataType::Decimal(10, 0))?; + let result = sum_batch(&array, &DataType::Decimal128(10, 0))?; assert_eq!(ScalarValue::Decimal128(None, 10, 0), result); // test agg generic_test_op!( array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Sum, ScalarValue::Decimal128(None, 20, 0), - DataType::Decimal(20, 0) + DataType::Decimal128(20, 0) ) } diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs index d939a033e368..96ba81834959 100644 --- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs +++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs @@ -289,9 +289,9 @@ mod tests { ); generic_test_sum_distinct!( array, - DataType::Decimal(35, 0), + DataType::Decimal128(35, 0), ScalarValue::Decimal128(Some(1), 38, 0), - DataType::Decimal(38, 0) + DataType::Decimal128(38, 0) ) } } diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f199466e9ac9..64e35311625a 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -334,7 +334,7 @@ macro_rules! binary_primitive_array_op { match $LEFT.data_type() { // TODO support decimal type // which is not the primitive type - DataType::Decimal(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array), @@ -359,7 +359,7 @@ macro_rules! binary_primitive_array_op { macro_rules! binary_primitive_array_op_scalar { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ let result: Result> = match $LEFT.data_type() { - DataType::Decimal(_,_) => compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op_scalar!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op_scalar!($LEFT, $RIGHT, $OP, Int32Array), @@ -386,7 +386,7 @@ macro_rules! binary_array_op { ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{ match $LEFT.data_type() { DataType::Null => compute_null_op!($LEFT, $RIGHT, $OP, NullArray), - DataType::Decimal(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), + DataType::Decimal128(_,_) => compute_decimal_op!($LEFT, $RIGHT, $OP, Decimal128Array), DataType::Int8 => compute_op!($LEFT, $RIGHT, $OP, Int8Array), DataType::Int16 => compute_op!($LEFT, $RIGHT, $OP, Int16Array), DataType::Int32 => compute_op!($LEFT, $RIGHT, $OP, Int32Array), @@ -2208,7 +2208,7 @@ mod tests { // compare decimal array with other array type let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int64, true), - Field::new("b", DataType::Decimal(10, 0), true), + Field::new("b", DataType::Decimal128(10, 0), true), ])); let value: i64 = 123; @@ -2252,7 +2252,7 @@ mod tests { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Float64, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let value: i128 = 123; @@ -2353,7 +2353,7 @@ mod tests { fn arithmetic_decimal_expr_test() -> Result<()> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let value: i128 = 123; let decimal_array = Arc::new(create_decimal_array( @@ -2391,7 +2391,7 @@ mod tests { // subtract: decimal array subtract int32 array let schema = Arc::new(Schema::new(vec![ Field::new("b", DataType::Int32, true), - Field::new("a", DataType::Decimal(10, 2), true), + Field::new("a", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[Some(-12177), None, Some(-12178), Some(-12276)], @@ -2424,7 +2424,7 @@ mod tests { // divide: int32 array divide decimal array let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[ @@ -2447,7 +2447,7 @@ mod tests { // modulus: int32 array modulus decimal array let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, true), - Field::new("b", DataType::Decimal(10, 2), true), + Field::new("b", DataType::Decimal128(10, 2), true), ])); let expect = Arc::new(create_decimal_array( &[Some(000), None, Some(100), Some(000)], diff --git a/datafusion/physical-expr/src/expressions/binary/adapter.rs b/datafusion/physical-expr/src/expressions/binary/adapter.rs index b0293cdf0be6..12b8fab89d76 100644 --- a/datafusion/physical-expr/src/expressions/binary/adapter.rs +++ b/datafusion/physical-expr/src/expressions/binary/adapter.rs @@ -38,7 +38,7 @@ macro_rules! make_dyn_comp_op { // Call `op_decimal` (e.g. `eq_decimal) until // arrow has native support // https://github.com/apache/arrow-rs/issues/1200 - (DataType::Decimal(_, _), DataType::Decimal(_, _)) => { + (DataType::Decimal128(_, _), DataType::Decimal128(_, _)) => { [<$OP _decimal>](as_decimal_array(left), as_decimal_array(right)) }, // By default call the arrow kernel diff --git a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs index ba8fff716e08..69b47944d779 100644 --- a/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs +++ b/datafusion/physical-expr/src/expressions/binary/kernels_arrow.rs @@ -133,7 +133,7 @@ where { Ok(left .iter() - .map(|left| left.map(|left| op(left, right))) + .map(|left| left.map(|left| op(left.as_i128(), right))) .collect()) } @@ -152,7 +152,7 @@ where .zip(right.iter()) .map(|(left, right)| { if let (Some(left), Some(right)) = (left, right) { - Some(op(left, right)) + Some(op(left.as_i128(), right.as_i128())) } else { None } @@ -288,7 +288,7 @@ where .zip(right.iter()) .map(|(left, right)| { if let (Some(left), Some(right)) = (left, right) { - Some(op(left, right)).transpose() + Some(op(left.as_i128(), right.as_i128())).transpose() } else { Ok(None) } @@ -307,7 +307,7 @@ where left.iter() .map(|left| { if let Some(left) = left { - Some(op(left, right)).transpose() + Some(op(left.as_i128(), right)).transpose() } else { Ok(None) } diff --git a/datafusion/physical-expr/src/expressions/cast.rs b/datafusion/physical-expr/src/expressions/cast.rs index 72503bbdb442..b575ca31104e 100644 --- a/datafusion/physical-expr/src/expressions/cast.rs +++ b/datafusion/physical-expr/src/expressions/cast.rs @@ -290,9 +290,9 @@ mod tests { generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), vec![ Some(convert(1_234_000)), Some(convert(2_222_000)), @@ -312,9 +312,9 @@ mod tests { let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(123)), Some(convert(222)), @@ -339,7 +339,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int8Array, DataType::Int8, vec![ @@ -360,7 +360,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int16Array, DataType::Int16, vec![ @@ -381,7 +381,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int32Array, DataType::Int32, vec![ @@ -402,7 +402,7 @@ mod tests { .with_precision_and_scale(10, 0)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int64Array, DataType::Int64, vec![ @@ -431,7 +431,7 @@ mod tests { .with_precision_and_scale(10, 3)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Float32Array, DataType::Float32, vec![ @@ -452,7 +452,7 @@ mod tests { .with_precision_and_scale(20, 6)?; generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), Float64Array, DataType::Float64, vec![ @@ -477,7 +477,7 @@ mod tests { DataType::Int8, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(3, 0), + DataType::Decimal128(3, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -495,7 +495,7 @@ mod tests { DataType::Int16, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(5, 0), + DataType::Decimal128(5, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -513,7 +513,7 @@ mod tests { DataType::Int32, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -531,7 +531,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 0), + DataType::Decimal128(20, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -549,7 +549,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 2), + DataType::Decimal128(20, 2), vec![ Some(convert(100)), Some(convert(200)), @@ -567,7 +567,7 @@ mod tests { DataType::Float32, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(150)), Some(convert(250)), @@ -585,7 +585,7 @@ mod tests { DataType::Float64, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(20, 4), + DataType::Decimal128(20, 4), vec![ Some(convert(15000)), Some(convert(25000)), @@ -678,9 +678,9 @@ mod tests { match result { Ok(_) => panic!("expected error"), Err(e) => { - assert!(e.to_string().contains( - "Cast error: Cannot cast string '9.1' to value of arrow::datatypes::types::Int32Type type" - )) + assert!(e + .to_string() + .contains("Cannot cast string '9.1' to value of Int32 type")) } } Ok(()) diff --git a/datafusion/physical-expr/src/expressions/in_list.rs b/datafusion/physical-expr/src/expressions/in_list.rs index 899a20835ba7..a391bf51dda1 100644 --- a/datafusion/physical-expr/src/expressions/in_list.rs +++ b/datafusion/physical-expr/src/expressions/in_list.rs @@ -254,6 +254,45 @@ macro_rules! collection_contains_check { }}; } +macro_rules! collection_contains_check_decimal { + ($ARRAY:expr, $VALUES:expr, $NEGATED:expr, $CONTAINS_NULL:expr) => {{ + let bool_array = if $NEGATED { + // Not in + if $CONTAINS_NULL { + $ARRAY + .iter() + .map(|vop| match vop.map(|v| !$VALUES.contains(&v.as_i128())) { + Some(true) => None, + x => x, + }) + .collect::() + } else { + $ARRAY + .iter() + .map(|vop| vop.map(|v| !$VALUES.contains(&v.as_i128()))) + .collect::() + } + } else { + // In + if $CONTAINS_NULL { + $ARRAY + .iter() + .map(|vop| match vop.map(|v| $VALUES.contains(&v.as_i128())) { + Some(false) => None, + x => x, + }) + .collect::() + } else { + $ARRAY + .iter() + .map(|vop| vop.map(|v| $VALUES.contains(&v.as_i128()))) + .collect::() + } + }; + ColumnarValue::Array(Arc::new(bool_array)) + }}; +} + // whether each value on the left (can be null) is contained in the non-null list fn in_list_utf8( array: &GenericStringArray, @@ -315,7 +354,7 @@ fn make_list_contains_decimal( }) .collect::>(); - collection_contains_check!(array, values, negated, contains_null) + collection_contains_check_decimal!(array, values, negated, contains_null) } fn make_set_contains_decimal( @@ -335,7 +374,7 @@ fn make_set_contains_decimal( .collect::>(); let native_set: HashSet = HashSet::from_iter(native_array); - collection_contains_check!(array, native_set, negated, contains_null) + collection_contains_check_decimal!(array, native_set, negated, contains_null) } fn set_contains_utf8( @@ -631,7 +670,7 @@ impl PhysicalExpr for InListExpr { .unwrap(); Ok(set_contains_utf8(array, set, self.negated)) } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { let array = array.as_any().downcast_ref::().unwrap(); Ok(make_set_contains_decimal(array, set, self.negated)) } @@ -760,7 +799,7 @@ impl PhysicalExpr for InListExpr { let null_array = new_null_array(&DataType::Boolean, array.len()); Ok(ColumnarValue::Array(Arc::new(null_array))) } - DataType::Decimal(_, _) => { + DataType::Decimal128(_, _) => { let decimal_array = array.as_any().downcast_ref::().unwrap(); Ok(make_list_contains_decimal( @@ -1032,7 +1071,8 @@ mod tests { #[test] fn in_list_decimal() -> Result<()> { // Now, we can check the NULL type - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); let array = vec![Some(100_0000_i128), None, Some(200_5000_i128)] .into_iter() .collect::(); @@ -1278,7 +1318,8 @@ mod tests { #[test] fn in_list_set_decimal() -> Result<()> { - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); let array = vec![Some(100_0000_i128), Some(200_5000_i128), None] .into_iter() .collect::(); @@ -1320,7 +1361,8 @@ mod tests { #[test] fn test_cast_static_filter_to_set() -> Result<()> { // random schema - let schema = Schema::new(vec![Field::new("a", DataType::Decimal(13, 4), true)]); + let schema = + Schema::new(vec![Field::new("a", DataType::Decimal128(13, 4), true)]); // list of phy expr let mut phy_exprs = vec![ lit(1i64), diff --git a/datafusion/physical-expr/src/expressions/try_cast.rs b/datafusion/physical-expr/src/expressions/try_cast.rs index 5e8cc30feed0..0333cb30098a 100644 --- a/datafusion/physical-expr/src/expressions/try_cast.rs +++ b/datafusion/physical-expr/src/expressions/try_cast.rs @@ -237,9 +237,9 @@ mod tests { let convert = |v: i128| Decimal128::new(20, 6, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), vec![ Some(convert(1_234_000)), Some(convert(2_222_000)), @@ -254,9 +254,9 @@ mod tests { let convert = |v: i128| Decimal128::new(10, 2, &v.to_le_bytes()); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(123)), Some(convert(222)), @@ -279,7 +279,7 @@ mod tests { // decimal to i8 generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int8Array, DataType::Int8, vec![ @@ -296,7 +296,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int16Array, DataType::Int16, vec![ @@ -313,7 +313,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int32Array, DataType::Int32, vec![ @@ -330,7 +330,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 0); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), Int64Array, DataType::Int64, vec![ @@ -348,7 +348,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 10, 3); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(10, 3), + DataType::Decimal128(10, 3), Float32Array, DataType::Float32, vec![ @@ -364,7 +364,7 @@ mod tests { let decimal_array = create_decimal_array(&array, 20, 6); generic_decimal_to_other_test_cast!( decimal_array, - DataType::Decimal(20, 6), + DataType::Decimal128(20, 6), Float64Array, DataType::Float64, vec![ @@ -389,7 +389,7 @@ mod tests { DataType::Int8, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(3, 0), + DataType::Decimal128(3, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -406,7 +406,7 @@ mod tests { DataType::Int16, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(5, 0), + DataType::Decimal128(5, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -423,7 +423,7 @@ mod tests { DataType::Int32, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(10, 0), + DataType::Decimal128(10, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -440,7 +440,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 0), + DataType::Decimal128(20, 0), vec![ Some(convert(1)), Some(convert(2)), @@ -457,7 +457,7 @@ mod tests { DataType::Int64, vec![1, 2, 3, 4, 5], Decimal128Array, - DataType::Decimal(20, 2), + DataType::Decimal128(20, 2), vec![ Some(convert(100)), Some(convert(200)), @@ -474,7 +474,7 @@ mod tests { DataType::Float32, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(10, 2), + DataType::Decimal128(10, 2), vec![ Some(convert(150)), Some(convert(250)), @@ -491,7 +491,7 @@ mod tests { DataType::Float64, vec![1.5, 2.5, 3.0, 1.123_456_8, 5.50], Decimal128Array, - DataType::Decimal(20, 4), + DataType::Decimal128(20, 4), vec![ Some(convert(15000)), Some(convert(25000)), diff --git a/datafusion/physical-expr/src/type_coercion.rs b/datafusion/physical-expr/src/type_coercion.rs index fb5f59ef376d..c7648cc264d9 100644 --- a/datafusion/physical-expr/src/type_coercion.rs +++ b/datafusion/physical-expr/src/type_coercion.rs @@ -78,7 +78,7 @@ mod tests { Schema::new( t.iter() .enumerate() - .map(|(i, t)| Field::new(&*format!("c{}", i), t.clone(), true)) + .map(|(i, t)| Field::new(&format!("c{}", i), t.clone(), true)) .collect(), ) }; diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml index c2966dcdee86..dc8991975bd5 100644 --- a/datafusion/proto/Cargo.toml +++ b/datafusion/proto/Cargo.toml @@ -37,13 +37,13 @@ default = [] json = ["pbjson", "pbjson-build", "serde", "serde_json"] [dependencies] -arrow = { version = "19.0.0" } +arrow = { version = "20.0.0" } datafusion = { path = "../core", version = "10.0.0" } datafusion-common = { path = "../common", version = "10.0.0" } datafusion-expr = { path = "../expr", version = "10.0.0" } pbjson = { version = "0.3", optional = true } pbjson-types = { version = "0.3", optional = true } -prost = "0.10" +prost = "0.11.0" serde = { version = "1.0", optional = true } serde_json = { version = "1.0", optional = true } @@ -53,4 +53,4 @@ tokio = "1.18" [build-dependencies] pbjson-build = { version = "0.3", optional = true } -prost-build = { version = "0.10" } +prost-build = { version = "0.11.1" } diff --git a/datafusion/proto/src/from_proto.rs b/datafusion/proto/src/from_proto.rs index efe174082047..c1c88e70d156 100644 --- a/datafusion/proto/src/from_proto.rs +++ b/datafusion/proto/src/from_proto.rs @@ -226,7 +226,7 @@ impl From for DataType { DataType::Time64(TimeUnit::Nanosecond) } protobuf::PrimitiveScalarType::Null => DataType::Null, - protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal(0, 0), + protobuf::PrimitiveScalarType::Decimal128 => DataType::Decimal128(0, 0), protobuf::PrimitiveScalarType::Date64 => DataType::Date64, protobuf::PrimitiveScalarType::TimeSecond => { DataType::Timestamp(TimeUnit::Second, None) @@ -309,7 +309,7 @@ impl TryFrom<&protobuf::arrow_type::ArrowTypeEnum> for DataType { arrow_type::ArrowTypeEnum::Decimal(protobuf::Decimal { whole, fractional, - }) => DataType::Decimal(*whole as usize, *fractional as usize), + }) => DataType::Decimal128(*whole as usize, *fractional as usize), arrow_type::ArrowTypeEnum::List(list) => { let list_type = list.as_ref().field_type.as_deref().required("field_type")?; diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs index 88230766d907..c69723442790 100644 --- a/datafusion/proto/src/lib.rs +++ b/datafusion/proto/src/lib.rs @@ -153,7 +153,7 @@ mod roundtrip_tests { pub expr: ::core::option::Option, } - #[derive(Clone, PartialEq, ::prost::Message)] + #[derive(Clone, PartialEq, Eq, ::prost::Message)] pub struct TopKExecProto { #[prost(uint64, tag = "1")] pub k: u64, @@ -569,7 +569,7 @@ mod roundtrip_tests { DataType::FixedSizeBinary(1234), DataType::FixedSizeBinary(-432), DataType::LargeBinary, - DataType::Decimal(1345, 5431), + DataType::Decimal128(1345, 5431), // Recursive list tests DataType::List(new_box_field("Level1", DataType::Binary, true)), DataType::List(new_box_field( @@ -651,7 +651,7 @@ mod roundtrip_tests { ])), ), DataType::Dictionary( - Box::new(DataType::Decimal(10, 50)), + Box::new(DataType::Decimal128(10, 50)), Box::new(DataType::FixedSizeList( new_box_field("Level1", DataType::Binary, true), 4, @@ -724,7 +724,7 @@ mod roundtrip_tests { DataType::LargeBinary, DataType::Utf8, DataType::LargeUtf8, - DataType::Decimal(1345, 5431), + DataType::Decimal128(1345, 5431), // Recursive list tests DataType::List(new_box_field("Level1", DataType::Binary, true)), DataType::List(new_box_field( @@ -806,7 +806,7 @@ mod roundtrip_tests { ])), ), DataType::Dictionary( - Box::new(DataType::Decimal(10, 50)), + Box::new(DataType::Decimal128(10, 50)), Box::new(DataType::FixedSizeList( new_box_field("Level1", DataType::Binary, true), 4, diff --git a/datafusion/proto/src/to_proto.rs b/datafusion/proto/src/to_proto.rs index bae41bc2759a..e78706e527da 100644 --- a/datafusion/proto/src/to_proto.rs +++ b/datafusion/proto/src/to_proto.rs @@ -219,7 +219,7 @@ impl From<&DataType> for protobuf::arrow_type::ArrowTypeEnum { value: Some(Box::new(value_type.as_ref().into())), })) } - DataType::Decimal(whole, fractional) => Self::Decimal(protobuf::Decimal { + DataType::Decimal128(whole, fractional) => Self::Decimal(protobuf::Decimal { whole: *whole as u64, fractional: *fractional as u64, }), @@ -1244,7 +1244,7 @@ impl TryFrom<&DataType> for protobuf::scalar_type::Datatype { | DataType::Union(_, _, _) | DataType::Dictionary(_, _) | DataType::Map(_, _) - | DataType::Decimal(_, _) + | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => { return Err(Error::invalid_scalar_type(val)); } diff --git a/datafusion/row/Cargo.toml b/datafusion/row/Cargo.toml index 2227103f4b48..1621a216e1a9 100644 --- a/datafusion/row/Cargo.toml +++ b/datafusion/row/Cargo.toml @@ -37,7 +37,7 @@ path = "src/lib.rs" jit = ["datafusion-jit"] [dependencies] -arrow = { version = "19.0.0" } +arrow = { version = "20.0.0" } datafusion-common = { path = "../common", version = "10.0.0" } datafusion-jit = { path = "../jit", version = "10.0.0", optional = true } paste = "^1.0" diff --git a/datafusion/row/src/layout.rs b/datafusion/row/src/layout.rs index e5214f7c307e..1518df9bf55a 100644 --- a/datafusion/row/src/layout.rs +++ b/datafusion/row/src/layout.rs @@ -166,7 +166,7 @@ fn word_aligned_offsets(null_width: usize, schema: &Schema) -> (Vec, usiz let mut offset = null_width; for f in schema.fields() { offsets.push(offset); - assert!(!matches!(f.data_type(), DataType::Decimal(_, _))); + assert!(!matches!(f.data_type(), DataType::Decimal128(_, _))); // All of the current support types can fit into one single 8-bytes word. // When we decide to support Decimal type in the future, its width would be // of two 8-bytes words and should adapt the width calculation below. diff --git a/datafusion/row/src/lib.rs b/datafusion/row/src/lib.rs index 5a76693564ab..7d715f9da4ea 100644 --- a/datafusion/row/src/lib.rs +++ b/datafusion/row/src/lib.rs @@ -388,7 +388,7 @@ mod tests { fn test_unsupported_type_read() { let schema = Arc::new(Schema::new(vec![Field::new( "a", - DataType::Decimal(5, 2), + DataType::Decimal128(5, 2), false, )])); let vector = vec![0; 1024]; diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index 17e658399547..a6499a7f998d 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -38,7 +38,7 @@ unicode_expressions = [] [dependencies] ahash = { version = "0.7", default-features = false } -arrow = { version = "19.0.0", features = ["prettyprint"] } +arrow = { version = "20.0.0", features = ["prettyprint"] } datafusion-common = { path = "../common", version = "10.0.0" } datafusion-expr = { path = "../expr", version = "10.0.0" } hashbrown = "0.12" diff --git a/datafusion/sql/examples/sql.rs b/datafusion/sql/examples/sql.rs index d5af9d5ed287..f03cad0b6fe3 100644 --- a/datafusion/sql/examples/sql.rs +++ b/datafusion/sql/examples/sql.rs @@ -75,7 +75,7 @@ impl MySchemaProvider { "state".to_string(), create_table_source(vec![ Field::new("id", DataType::Int32, false), - Field::new("sales_tax", DataType::Decimal(10, 2), false), + Field::new("sales_tax", DataType::Decimal128(10, 2), false), ]), ); tables.insert( @@ -85,7 +85,7 @@ impl MySchemaProvider { Field::new("customer_id", DataType::Int32, false), Field::new("item_id", DataType::Int32, false), Field::new("quantity", DataType::Int32, false), - Field::new("price", DataType::Decimal(10, 2), false), + Field::new("price", DataType::Decimal128(10, 2), false), ]), ); Self { tables } diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index dc71dc4b2f1c..22a9b1f0e494 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -369,9 +369,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_ref: TableReference = table_name.as_str().into(); // check if table_name exists - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; if self.has_table("information_schema", "tables") { let sql = format!("SELECT column_name, data_type, is_nullable \ @@ -2287,9 +2285,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_name = normalize_sql_object_name(sql_table_name); let table_ref: TableReference = table_name.as_str().into(); - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; // Figure out the where clause let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter(); @@ -2334,9 +2330,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { let table_name = normalize_sql_object_name(sql_table_name); let table_ref: TableReference = table_name.as_str().into(); - if let Err(e) = self.schema_provider.get_table_provider(table_ref) { - return Err(e); - } + let _ = self.schema_provider.get_table_provider(table_ref)?; // Figure out the where clause let columns = vec!["table_name", "table_schema", "table_catalog"].into_iter(); @@ -2626,7 +2620,7 @@ mod tests { fn test_int_decimal_default() { quick_test( "SELECT CAST(10 AS DECIMAL)", - "Projection: CAST(Int64(10) AS Decimal(38, 10))\ + "Projection: CAST(Int64(10) AS Decimal128(38, 10))\ \n EmptyRelation", ); } @@ -2635,7 +2629,7 @@ mod tests { fn test_int_decimal_no_scale() { quick_test( "SELECT CAST(10 AS DECIMAL(5))", - "Projection: CAST(Int64(10) AS Decimal(5, 0))\ + "Projection: CAST(Int64(10) AS Decimal128(5, 0))\ \n EmptyRelation", ); } @@ -4424,7 +4418,7 @@ mod tests { ])), "test_decimal" => Ok(Schema::new(vec![ Field::new("id", DataType::Int32, false), - Field::new("price", DataType::Decimal(10, 2), false), + Field::new("price", DataType::Decimal128(10, 2), false), ])), "person" => Ok(Schema::new(vec![ Field::new("id", DataType::UInt32, false), diff --git a/datafusion/sql/src/utils.rs b/datafusion/sql/src/utils.rs index 00c28f823417..81ea34de187b 100644 --- a/datafusion/sql/src/utils.rs +++ b/datafusion/sql/src/utils.rs @@ -17,7 +17,7 @@ //! SQL Utility Functions -use arrow::datatypes::{DataType, DECIMAL_DEFAULT_SCALE, DECIMAL_MAX_PRECISION}; +use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE}; use sqlparser::ast::Ident; use datafusion_common::{DataFusionError, Result, ScalarValue}; @@ -454,17 +454,17 @@ pub(crate) fn make_decimal_type( "Cannot specify only scale for decimal data type".to_string(), )) } - (None, None) => (DECIMAL_MAX_PRECISION, DECIMAL_DEFAULT_SCALE), + (None, None) => (DECIMAL128_MAX_PRECISION, DECIMAL_DEFAULT_SCALE), }; // Arrow decimal is i128 meaning 38 maximum decimal digits - if precision > DECIMAL_MAX_PRECISION || scale > precision { + if precision > DECIMAL128_MAX_PRECISION || scale > precision { Err(DataFusionError::Internal(format!( "For decimal(precision, scale) precision must be less than or equal to 38 and scale can't be greater than precision. Got ({}, {})", precision, scale ))) } else { - Ok(DataType::Decimal(precision, scale)) + Ok(DataType::Decimal128(precision, scale)) } }