Upgrade to arrow 20.0.0 (but no change to object_store), including `prost` and `tonic` (#3083)

* Upgrade arrow

fix decimal (#4)

Fix human error

Patch crates io to fix build (#5)

* fix decimal

* patch crate versions

Patch objectstore

Test in CI

Undo override?

Fix more errors

Fix last error?

Formatting

Clippy

Fixes

Fix refs

Able to get session context, but JDBC driver hung

Upgrade to arrow 20

Upgrade to RC2

Formatting

Fix some imports

Install protoc

Try platform agnostic path

Debug in CI :(

Debug in CI :(

Debug in CI :(

Not worth it, just separate builds

Variables

Fixes

Fix windows?

Fix windows?

Hackily fix windows

Down to 1 failure

Fix protoc

All? tests pass

Formatting

* Fix remaining tests

* Clippy

* Update docs for Windows

* Try with old objectstore

* Revert path "fixes" that broke windows

* Update to arrow 20
Brent Gardner committed Aug 9, 2022
1 parent 1e44417 commit 3eb55e9
Showing 62 changed files with 527 additions and 416 deletions.
76 changes: 73 additions & 3 deletions .github/workflows/rust.yml
@@ -82,6 +82,16 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
export PATH=$PATH:$HOME/d/protoc/bin
protoc --version
- name: Cache Cargo
uses: actions/cache@v3
with:
@@ -94,6 +104,7 @@
rust-version: ${{ matrix.rust }}
- name: Run tests
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test --features avro,jit,scheduler,json
# test datafusion-sql examples
cargo run --example sql
@@ -159,17 +170,65 @@ jobs:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres

windows-and-macos:
name: Test on ${{ matrix.os }} Rust ${{ matrix.rust }}
windows:
name: Test on Windows Rust ${{ matrix.rust }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [windows-latest, macos-latest]
os: [windows-latest]
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-win64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
export PATH=$PATH:$HOME/d/protoc/bin
protoc.exe --version
# TODO: this won't cache anything, which is expensive. Setup this action
# with a OS-dependent path.
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
rustup default ${{ matrix.rust }}
rustup component add rustfmt
- name: Run tests
shell: bash
run: |
export PATH=$PATH:$HOME/d/protoc/bin
cargo test
env:
# do not produce debug symbols to keep memory usage down
RUSTFLAGS: "-C debuginfo=0"

macos:
name: Test on MacOS Rust ${{ matrix.rust }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [macos-latest]
rust: [stable]
steps:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-osx-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
echo "$HOME/d/protoc/bin" >> $GITHUB_PATH
export PATH=$PATH:$HOME/d/protoc/bin
protoc --version
# TODO: this won't cache anything, which is expensive. Setup this action
# with a OS-dependent path.
- name: Setup Rust toolchain
@@ -250,6 +309,16 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install protobuf compiler
shell: bash
run: |
mkdir -p $HOME/d/protoc
cd $HOME/d/protoc
export PROTO_ZIP="protoc-21.4-linux-x86_64.zip"
curl -LO https://github.com/protocolbuffers/protobuf/releases/download/v21.4/$PROTO_ZIP
unzip $PROTO_ZIP
export PATH=$PATH:$HOME/d/protoc/bin
protoc --version
- name: Setup Rust toolchain
run: |
rustup toolchain install ${{ matrix.rust }}
@@ -263,6 +332,7 @@
key: cargo-coverage-cache3-
- name: Run coverage
run: |
export PATH=$PATH:$HOME/d/protoc/bin
rustup toolchain install stable
rustup default stable
cargo install --version 0.20.1 cargo-tarpaulin
9 changes: 9 additions & 0 deletions CONTRIBUTING.md
@@ -35,6 +35,15 @@ list to help you get started.

This section describes how you can get started at developing DataFusion.

### Windows setup

```shell
wget https://az792536.vo.msecnd.net/vms/VMBuild_20190311/VirtualBox/MSEdge/MSEdge.Win10.VirtualBox.zip
choco install -y git rustup.install visualcpp-build-tools
git-bash.exe
cargo build
```

### Bootstrap environment

DataFusion is written in Rust and it uses a standard rust toolkit:
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
@@ -29,7 +29,7 @@ rust-version = "1.59"
readme = "README.md"

[dependencies]
arrow = { version = "19.0.0" }
arrow = { version = "20.0.0" }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "10.0.0" }
dirs = "4.0.0"
6 changes: 3 additions & 3 deletions datafusion-examples/Cargo.toml
@@ -34,13 +34,13 @@ path = "examples/avro_sql.rs"
required-features = ["datafusion/avro"]

[dev-dependencies]
arrow-flight = { version = "19.0.0" }
arrow-flight = { version = "20.0.0" }
async-trait = "0.1.41"
datafusion = { path = "../datafusion/core" }
futures = "0.3"
num_cpus = "1.13.0"
prost = "0.10"
prost = "0.11.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.82"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
tonic = "0.7"
tonic = "0.8"
5 changes: 3 additions & 2 deletions datafusion/common/Cargo.toml
@@ -39,11 +39,12 @@ pyarrow = ["pyo3"]

[dependencies]
apache-avro = { version = "0.14", features = ["snappy"], optional = true }
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { version = "20.0.0", features = ["prettyprint"] }
avro-rs = { version = "0.13", features = ["snappy"], optional = true }
cranelift-module = { version = "0.86.1", optional = true }
object_store = { version = "0.3", optional = true }
ordered-float = "3.0"
parquet = { version = "19.0.0", features = ["arrow"], optional = true }
parquet = { version = "20.0.0", features = ["arrow"], optional = true }
pyo3 = { version = "0.16", optional = true }
serde_json = "1.0"
sqlparser = "0.20"
2 changes: 1 addition & 1 deletion datafusion/common/src/from_slice.rs
@@ -69,7 +69,7 @@ where
offsets.push(length_so_far);
values.extend_from_slice(s);
}
let array_data = ArrayData::builder(Self::get_data_type())
let array_data = ArrayData::builder(Self::DATA_TYPE)
.len(slice.len())
.add_buffer(Buffer::from_slice_ref(&offsets))
.add_buffer(Buffer::from_slice_ref(&values));
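Context for the `Self::get_data_type()` → `Self::DATA_TYPE` change above: arrow 20 exposes an array type's `DataType` as an associated constant rather than a method. A rough standalone sketch of the same buffer-building pattern, spelled with an explicit `DataType::Binary` (the helper and its name are illustrative, not part of the patch):

```rust
use arrow::array::{ArrayData, BinaryArray};
use arrow::buffer::Buffer;
use arrow::datatypes::DataType;

// Illustrative helper: assemble the offsets and values buffers by hand and
// build a BinaryArray from them, as the patched from_slice.rs does.
fn binary_array_from_slices(slices: &[&[u8]]) -> BinaryArray {
    let mut offsets: Vec<i32> = vec![0];
    let mut values: Vec<u8> = Vec::new();
    for s in slices {
        values.extend_from_slice(s);
        offsets.push(values.len() as i32);
    }
    let data = ArrayData::builder(DataType::Binary) // the patch uses Self::DATA_TYPE here
        .len(slices.len())
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_buffer(Buffer::from_slice_ref(&values))
        .build()
        .unwrap();
    BinaryArray::from(data)
}
```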
22 changes: 11 additions & 11 deletions datafusion/common/src/scalar.rs
@@ -27,7 +27,7 @@ use arrow::{
IntervalMonthDayNanoType, IntervalUnit, IntervalYearMonthType, TimeUnit,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DECIMAL_MAX_PRECISION,
DECIMAL128_MAX_PRECISION,
},
util::decimal::{BasicDecimal, Decimal128},
};
@@ -611,7 +611,7 @@ impl ScalarValue {
scale: usize,
) -> Result<Self> {
// make sure the precision and scale is valid
if precision <= DECIMAL_MAX_PRECISION && scale <= precision {
if precision <= DECIMAL128_MAX_PRECISION && scale <= precision {
return Ok(ScalarValue::Decimal128(Some(value), precision, scale));
}
Err(DataFusionError::Internal(format!(
@@ -654,7 +654,7 @@ impl ScalarValue {
ScalarValue::Int32(_) => DataType::Int32,
ScalarValue::Int64(_) => DataType::Int64,
ScalarValue::Decimal128(_, precision, scale) => {
DataType::Decimal(*precision, *scale)
DataType::Decimal128(*precision, *scale)
}
ScalarValue::TimestampSecond(_, tz_opt) => {
DataType::Timestamp(TimeUnit::Second, tz_opt.clone())
@@ -935,7 +935,7 @@ impl ScalarValue {
}

let array: ArrayRef = match &data_type {
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
let decimal_array =
ScalarValue::iter_to_decimal_array(scalars, precision, scale)?;
Arc::new(decimal_array)
@@ -1448,7 +1448,7 @@ impl ScalarValue {

Ok(match array.data_type() {
DataType::Null => ScalarValue::Null,
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
ScalarValue::get_decimal_value_from_array(array, index, precision, scale)
}
DataType::Boolean => typed_cast!(array, index, BooleanArray, Boolean),
@@ -1899,7 +1899,7 @@ impl TryFrom<&DataType> for ScalarValue {
DataType::UInt16 => ScalarValue::UInt16(None),
DataType::UInt32 => ScalarValue::UInt32(None),
DataType::UInt64 => ScalarValue::UInt64(None),
DataType::Decimal(precision, scale) => {
DataType::Decimal128(precision, scale) => {
ScalarValue::Decimal128(None, *precision, *scale)
}
DataType::Utf8 => ScalarValue::Utf8(None),
@@ -2145,7 +2145,7 @@ mod tests {
#[test]
fn scalar_decimal_test() {
let decimal_value = ScalarValue::Decimal128(Some(123), 10, 1);
assert_eq!(DataType::Decimal(10, 1), decimal_value.get_datatype());
assert_eq!(DataType::Decimal128(10, 1), decimal_value.get_datatype());
let try_into_value: i128 = decimal_value.clone().try_into().unwrap();
assert_eq!(123_i128, try_into_value);
assert!(!decimal_value.is_null());
@@ -2163,14 +2163,14 @@ mod tests {
let array = decimal_value.to_array();
let array = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
assert_eq!(1, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array.value(0).as_i128());

// decimal scalar to array with size
let array = decimal_value.to_array_of_size(10);
let array_decimal = array.as_any().downcast_ref::<Decimal128Array>().unwrap();
assert_eq!(10, array.len());
assert_eq!(DataType::Decimal(10, 1), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 1), array.data_type().clone());
assert_eq!(123i128, array_decimal.value(0).as_i128());
assert_eq!(123i128, array_decimal.value(9).as_i128());
// test eq array
@@ -2208,7 +2208,7 @@ mod tests {
// convert the vec to decimal array and check the result
let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
assert_eq!(3, array.len());
assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone());

let decimal_vec = vec![
ScalarValue::Decimal128(Some(1), 10, 2),
@@ -2218,7 +2218,7 @@
];
let array = ScalarValue::iter_to_array(decimal_vec.into_iter()).unwrap();
assert_eq!(4, array.len());
assert_eq!(DataType::Decimal(10, 2), array.data_type().clone());
assert_eq!(DataType::Decimal128(10, 2), array.data_type().clone());

assert!(ScalarValue::try_new_decimal128(1, 10, 2)
.unwrap()
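Most of the scalar.rs changes are mechanical renames from arrow 20: `DataType::Decimal` became `DataType::Decimal128` (a separate `Decimal256` variant now exists) and `DECIMAL_MAX_PRECISION` became `DECIMAL128_MAX_PRECISION`. A small sketch of the updated bounds check, mirroring `try_new_decimal128` above (the helper name is illustrative):

```rust
use arrow::datatypes::{DataType, DECIMAL128_MAX_PRECISION};

// Illustrative helper mirroring the precision/scale validation above,
// after the arrow 20 renames.
fn validated_decimal_type(precision: usize, scale: usize) -> Result<DataType, String> {
    if precision <= DECIMAL128_MAX_PRECISION && scale <= precision {
        // arrow <= 19 spelled this DataType::Decimal(precision, scale)
        Ok(DataType::Decimal128(precision, scale))
    } else {
        Err(format!(
            "invalid decimal128 precision/scale: ({}, {})",
            precision, scale
        ))
    }
}
```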
4 changes: 2 additions & 2 deletions datafusion/core/Cargo.toml
@@ -56,7 +56,7 @@ unicode_expressions = ["datafusion-physical-expr/regex_expressions", "datafusion
[dependencies]
ahash = { version = "0.7", default-features = false }
apache-avro = { version = "0.14", optional = true }
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { version = "20.0.0", features = ["prettyprint"] }
async-trait = "0.1.41"
bytes = "1.1"
chrono = { version = "0.4", default-features = false }
@@ -78,7 +78,7 @@ num_cpus = "1.13.0"
object_store = "0.3.0"
ordered-float = "3.0"
parking_lot = "0.12"
parquet = { version = "19.0.0", features = ["arrow", "async"] }
parquet = { version = "20.0.0", features = ["arrow", "async"] }
paste = "^1.0"
pin-project-lite = "^0.2.7"
pyo3 = { version = "0.16", optional = true }
2 changes: 1 addition & 1 deletion datafusion/core/fuzz-utils/Cargo.toml
@@ -23,6 +23,6 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "19.0.0", features = ["prettyprint"] }
arrow = { version = "20.0.0", features = ["prettyprint"] }
env_logger = "0.9.0"
rand = "0.8"
10 changes: 4 additions & 6 deletions datafusion/core/src/avro_to_arrow/arrow_array_reader.rs
@@ -101,12 +101,10 @@ impl<'a, R: Read> AvroArrowArrayReader<'a, R> {
"Failed to parse avro value: {:?}",
e
))),
other => {
return Err(ArrowError::ParseError(format!(
"Row needs to be of type object, got: {:?}",
other
)))
}
other => Err(ArrowError::ParseError(format!(
"Row needs to be of type object, got: {:?}",
other
))),
})
.collect::<ArrowResult<Vec<Vec<(String, Value)>>>>()?;
if rows.is_empty() {
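The arrow_array_reader.rs hunk above looks like one of the clippy cleanups listed in the commit message rather than an API change: the final match arm yields its `Err` as a plain expression instead of a block with an explicit `return`. A toy example of the same before/after shape (names are illustrative):

```rust
// Illustrative only: the last arm is an expression; no braces or `return` needed.
fn expect_positive(n: i64) -> Result<i64, String> {
    match n {
        v if v > 0 => Ok(v),
        // before: other => { return Err(format!("expected a positive value, got: {:?}", other)) }
        other => Err(format!("expected a positive value, got: {:?}", other)),
    }
}
```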
4 changes: 2 additions & 2 deletions datafusion/core/src/avro_to_arrow/schema.rs
@@ -141,7 +141,7 @@ fn schema_to_field_with_props(
AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size as i32),
AvroSchema::Decimal {
precision, scale, ..
} => DataType::Decimal(*precision, *scale),
} => DataType::Decimal128(*precision, *scale),
AvroSchema::Uuid => DataType::FixedSizeBinary(16),
AvroSchema::Date => DataType::Date32,
AvroSchema::TimeMillis => DataType::Time32(TimeUnit::Millisecond),
@@ -217,7 +217,7 @@ fn default_field_name(dt: &DataType) -> &str {
DataType::Union(_, _, _) => "union",
DataType::Dictionary(_, _) => "map",
DataType::Map(_, _) => unimplemented!("Map support not implemented"),
DataType::Decimal(_, _) => "decimal",
DataType::Decimal128(_, _) => "decimal",
DataType::Decimal256(_, _) => "decimal",
}
}
2 changes: 1 addition & 1 deletion datafusion/core/src/catalog/information_schema.rs
@@ -508,7 +508,7 @@ impl InformationSchemaColumnsBuilder {
Float32 => (Some(24), Some(2), None),
// Numbers from postgres `double` type
Float64 => (Some(24), Some(2), None),
Decimal(precision, scale) => {
Decimal128(precision, scale) => {
(Some(*precision as u64), Some(10), Some(*scale as u64))
}
_ => (None, None, None),
8 changes: 4 additions & 4 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1073,30 +1073,30 @@ mod tests {
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(4, 2), column.data_type());
assert_eq!(&DataType::Decimal128(4, 2), column.data_type());

// parquet use the int64 as the physical type to store decimal
let exec = get_exec("int64_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(10, 2), column.data_type());
assert_eq!(&DataType::Decimal128(10, 2), column.data_type());

// parquet use the fixed length binary as the physical type to store decimal
let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(25, 2), column.data_type());
assert_eq!(&DataType::Decimal128(25, 2), column.data_type());

let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
let batches = collect(exec, task_ctx.clone()).await?;
assert_eq!(1, batches.len());
assert_eq!(1, batches[0].num_columns());
let column = batches[0].column(0);
assert_eq!(&DataType::Decimal(13, 2), column.data_type());
assert_eq!(&DataType::Decimal128(13, 2), column.data_type());

// parquet use the fixed length binary as the physical type to store decimal
// TODO: arrow-rs don't support convert the physical type of binary to decimal