diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index ab19b6de527ec..60a8dd4007865 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -18,19 +18,15 @@ //! Parquet integration tests use crate::parquet::utils::MetricsFinder; use arrow::array::Decimal128Array; -use arrow::datatypes::i256; use arrow::{ array::{ - make_array, Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, - Decimal256Array, DictionaryArray, FixedSizeBinaryArray, Float16Array, - Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, - LargeBinaryArray, LargeStringArray, StringArray, StructArray, - Time32MillisecondArray, Time32SecondArray, Time64MicrosecondArray, - Time64NanosecondArray, TimestampMicrosecondArray, TimestampMillisecondArray, - TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, - UInt64Array, UInt8Array, + make_array, Array, ArrayRef, BinaryArray, Date32Array, Date64Array, + FixedSizeBinaryArray, Float64Array, Int16Array, Int32Array, Int64Array, + Int8Array, LargeBinaryArray, LargeStringArray, StringArray, + TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, + TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array, }, - datatypes::{DataType, Field, Int32Type, Int8Type, Schema, TimeUnit}, + datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, util::pretty::pretty_format_batches, }; @@ -41,7 +37,6 @@ use datafusion::{ prelude::{ParquetReadOptions, SessionConfig, SessionContext}, }; use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; -use half::f16; use parquet::arrow::ArrowWriter; use parquet::file::properties::{EnabledStatistics, WriterProperties}; use std::sync::Arc; @@ -74,37 +69,23 @@ fn init() { /// What data to use #[derive(Debug, Clone, Copy)] enum Scenario { - Boolean, Timestamps, Dates, Int, Int32Range, UInt, UInt32Range, - Time32Second, - Time32Millisecond, - Time64Nanosecond, - Time64Microsecond, - /// 7 Rows, for each i8, i16, i32, i64, u8, u16, u32, u64, f32, f64 - /// -MIN, -100, -1, 0, 1, 100, MAX - NumericLimits, - Float16, - Float32, Float64, Decimal, - Decimal256, DecimalBloomFilterInt32, DecimalBloomFilterInt64, DecimalLargePrecision, DecimalLargePrecisionBloomFilter, /// StringArray, BinaryArray, FixedSizeBinaryArray ByteArray, - /// DictionaryArray - Dictionary, PeriodsInColumnNames, WithNullValues, WithNullValuesPageLevel, - StructArray, UTF8, } @@ -320,16 +301,6 @@ impl ContextWithParquet { } } -fn make_boolean_batch(v: Vec>) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new( - "bool", - DataType::Boolean, - true, - )])); - let array = Arc::new(BooleanArray::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - /// Return record batch with a few rows of data for all of the supported timestamp types /// values with the specified offset /// @@ -483,55 +454,6 @@ fn make_int_batches(start: i8, end: i8) -> RecordBatch { .unwrap() } -/// Return record batch with Time32Second, Time32Millisecond sequences -fn make_time32_batches(scenario: Scenario, v: Vec) -> RecordBatch { - match scenario { - Scenario::Time32Second => { - let schema = Arc::new(Schema::new(vec![Field::new( - "second", - DataType::Time32(TimeUnit::Second), - true, - )])); - let array = Arc::new(Time32SecondArray::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array]).unwrap() - } - Scenario::Time32Millisecond => { - let schema = Arc::new(Schema::new(vec![Field::new( - "millisecond", - DataType::Time32(TimeUnit::Millisecond), - true, - )])); - let array = Arc::new(Time32MillisecondArray::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array]).unwrap() - } - _ => panic!("Unsupported scenario for Time32"), - } -} - -/// Return record batch with Time64Microsecond, Time64Nanosecond sequences -fn make_time64_batches(scenario: Scenario, v: Vec) -> RecordBatch { - match scenario { - Scenario::Time64Microsecond => { - let schema = Arc::new(Schema::new(vec![Field::new( - "microsecond", - DataType::Time64(TimeUnit::Microsecond), - true, - )])); - let array = Arc::new(Time64MicrosecondArray::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array]).unwrap() - } - Scenario::Time64Nanosecond => { - let schema = Arc::new(Schema::new(vec![Field::new( - "nanosecond", - DataType::Time64(TimeUnit::Nanosecond), - true, - )])); - let array = Arc::new(Time64NanosecondArray::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array]).unwrap() - } - _ => panic!("Unsupported scenario for Time64"), - } -} /// Return record batch with u8, u16, u32, and u64 sequences /// /// Columns are named @@ -586,18 +508,6 @@ fn make_f64_batch(v: Vec) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } -fn make_f32_batch(v: Vec) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float32, true)])); - let array = Arc::new(Float32Array::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - -fn make_f16_batch(v: Vec) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new("f", DataType::Float16, true)])); - let array = Arc::new(Float16Array::from(v)) as ArrayRef; - RecordBatch::try_new(schema, vec![array.clone()]).unwrap() -} - /// Return record batch with decimal vector /// /// Columns are named @@ -616,24 +526,6 @@ fn make_decimal_batch(v: Vec, precision: u8, scale: i8) -> RecordBatch { RecordBatch::try_new(schema, vec![array.clone()]).unwrap() } -/// Return record batch with decimal256 vector -/// -/// Columns are named -/// "decimal256_col" -> Decimal256Array -fn make_decimal256_batch(v: Vec, precision: u8, scale: i8) -> RecordBatch { - let schema = Arc::new(Schema::new(vec![Field::new( - "decimal256_col", - DataType::Decimal256(precision, scale), - true, - )])); - let array = Arc::new( - Decimal256Array::from(v) - .with_precision_and_scale(precision, scale) - .unwrap(), - ) as ArrayRef; - RecordBatch::try_new(schema, vec![array]).unwrap() -} - /// Return record batch with a few rows of data for all of the supported date /// types with the specified offset (in days) /// @@ -842,39 +734,6 @@ fn make_int_batches_with_null( .unwrap() } -fn make_numeric_limit_batch() -> RecordBatch { - let i8 = Int8Array::from(vec![i8::MIN, 100, -1, 0, 1, -100, i8::MAX]); - let i16 = Int16Array::from(vec![i16::MIN, 100, -1, 0, 1, -100, i16::MAX]); - let i32 = Int32Array::from(vec![i32::MIN, 100, -1, 0, 1, -100, i32::MAX]); - let i64 = Int64Array::from(vec![i64::MIN, 100, -1, 0, 1, -100, i64::MAX]); - let u8 = UInt8Array::from(vec![u8::MIN, 100, 1, 0, 1, 100, u8::MAX]); - let u16 = UInt16Array::from(vec![u16::MIN, 100, 1, 0, 1, 100, u16::MAX]); - let u32 = UInt32Array::from(vec![u32::MIN, 100, 1, 0, 1, 100, u32::MAX]); - let u64 = UInt64Array::from(vec![u64::MIN, 100, 1, 0, 1, 100, u64::MAX]); - let f32 = Float32Array::from(vec![f32::MIN, 100.0, -1.0, 0.0, 1.0, -100.0, f32::MAX]); - let f64 = Float64Array::from(vec![f64::MIN, 100.0, -1.0, 0.0, 1.0, -100.0, f64::MAX]); - let f32_nan = - Float32Array::from(vec![f32::NAN, 100.0, -1.0, 0.0, 1.0, -100.0, f32::NAN]); - let f64_nan = - Float64Array::from(vec![f64::NAN, 100.0, -1.0, 0.0, 1.0, -100.0, f64::NAN]); - - RecordBatch::try_from_iter(vec![ - ("i8", Arc::new(i8) as _), - ("i16", Arc::new(i16) as _), - ("i32", Arc::new(i32) as _), - ("i64", Arc::new(i64) as _), - ("u8", Arc::new(u8) as _), - ("u16", Arc::new(u16) as _), - ("u32", Arc::new(u32) as _), - ("u64", Arc::new(u64) as _), - ("f32", Arc::new(f32) as _), - ("f64", Arc::new(f64) as _), - ("f32_nan", Arc::new(f32_nan) as _), - ("f64_nan", Arc::new(f64_nan) as _), - ]) - .unwrap() -} - fn make_utf8_batch(value: Vec>) -> RecordBatch { let utf8 = StringArray::from(value.clone()); let large_utf8 = LargeStringArray::from(value); @@ -885,61 +744,8 @@ fn make_utf8_batch(value: Vec>) -> RecordBatch { .unwrap() } -fn make_dict_batch() -> RecordBatch { - let values = [ - Some("abc"), - Some("def"), - None, - Some("def"), - Some("abc"), - Some("fffff"), - Some("aaa"), - ]; - let dict_i8_array = DictionaryArray::::from_iter(values.iter().cloned()); - let dict_i32_array = DictionaryArray::::from_iter(values.iter().cloned()); - - // Dictionary array of integers - let int64_values = Int64Array::from(vec![0, -100, 100]); - let keys = Int8Array::from_iter([ - Some(0), - Some(1), - None, - Some(0), - Some(0), - Some(2), - Some(0), - ]); - let dict_i8_int_array = - DictionaryArray::::try_new(keys, Arc::new(int64_values)).unwrap(); - - RecordBatch::try_from_iter(vec![ - ("string_dict_i8", Arc::new(dict_i8_array) as _), - ("string_dict_i32", Arc::new(dict_i32_array) as _), - ("int_dict_i8", Arc::new(dict_i8_int_array) as _), - ]) - .unwrap() -} - fn create_data_batch(scenario: Scenario) -> Vec { match scenario { - Scenario::Boolean => { - vec![ - make_boolean_batch(vec![ - Some(true), - Some(false), - Some(true), - Some(false), - None, - ]), - make_boolean_batch(vec![ - Some(false), - Some(false), - Some(false), - Some(false), - Some(false), - ]), - ] - } Scenario::Timestamps => { vec![ make_timestamp_batch(TimeDelta::try_seconds(0).unwrap()), @@ -978,45 +784,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { Scenario::UInt32Range => { vec![make_uint32_range(0, 10), make_uint32_range(200000, 300000)] } - Scenario::NumericLimits => { - vec![make_numeric_limit_batch()] - } - Scenario::Float16 => { - vec![ - make_f16_batch( - vec![-5.0, -4.0, -3.0, -2.0, -1.0] - .into_iter() - .map(f16::from_f32) - .collect(), - ), - make_f16_batch( - vec![-4.0, -3.0, -2.0, -1.0, 0.0] - .into_iter() - .map(f16::from_f32) - .collect(), - ), - make_f16_batch( - vec![0.0, 1.0, 2.0, 3.0, 4.0] - .into_iter() - .map(f16::from_f32) - .collect(), - ), - make_f16_batch( - vec![5.0, 6.0, 7.0, 8.0, 9.0] - .into_iter() - .map(f16::from_f32) - .collect(), - ), - ] - } - Scenario::Float32 => { - vec![ - make_f32_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), - make_f32_batch(vec![-4.0, -3.0, -2.0, -1.0, 0.0]), - make_f32_batch(vec![0.0, 1.0, 2.0, 3.0, 4.0]), - make_f32_batch(vec![5.0, 6.0, 7.0, 8.0, 9.0]), - ] - } + Scenario::Float64 => { vec![ make_f64_batch(vec![-5.0, -4.0, -3.0, -2.0, -1.0]), @@ -1033,44 +801,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_decimal_batch(vec![2000, 3000, 3000, 4000, 6000], 9, 2), ] } - Scenario::Decimal256 => { - // decimal256 record batch - vec![ - make_decimal256_batch( - vec![ - i256::from(100), - i256::from(200), - i256::from(300), - i256::from(400), - i256::from(600), - ], - 9, - 2, - ), - make_decimal256_batch( - vec![ - i256::from(-500), - i256::from(100), - i256::from(300), - i256::from(400), - i256::from(600), - ], - 9, - 2, - ), - make_decimal256_batch( - vec![ - i256::from(2000), - i256::from(3000), - i256::from(3000), - i256::from(4000), - i256::from(6000), - ], - 9, - 2, - ), - ] - } + Scenario::DecimalBloomFilterInt32 => { // decimal record batch vec![ @@ -1186,9 +917,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { ), ] } - Scenario::Dictionary => { - vec![make_dict_batch()] - } + Scenario::PeriodsInColumnNames => { vec![ // all frontend @@ -1223,120 +952,7 @@ fn create_data_batch(scenario: Scenario) -> Vec { make_int_batches_with_null(5, 1, 6), ] } - Scenario::StructArray => { - let struct_array_data = struct_array(vec![ - (Some(1), Some(6.0), Some(12.0)), - (Some(2), Some(8.5), None), - (None, Some(8.5), Some(14.0)), - ]); - - let schema = Arc::new(Schema::new(vec![Field::new( - "struct", - struct_array_data.data_type().clone(), - true, - )])); - vec![RecordBatch::try_new(schema, vec![struct_array_data]).unwrap()] - } - Scenario::Time32Second => { - vec![ - make_time32_batches( - Scenario::Time32Second, - vec![18506, 18507, 18508, 18509], - ), - make_time32_batches( - Scenario::Time32Second, - vec![18510, 18511, 18512, 18513], - ), - make_time32_batches( - Scenario::Time32Second, - vec![18514, 18515, 18516, 18517], - ), - make_time32_batches( - Scenario::Time32Second, - vec![18518, 18519, 18520, 18521], - ), - ] - } - Scenario::Time32Millisecond => { - vec![ - make_time32_batches( - Scenario::Time32Millisecond, - vec![3600000, 3600001, 3600002, 3600003], - ), - make_time32_batches( - Scenario::Time32Millisecond, - vec![3600004, 3600005, 3600006, 3600007], - ), - make_time32_batches( - Scenario::Time32Millisecond, - vec![3600008, 3600009, 3600010, 3600011], - ), - make_time32_batches( - Scenario::Time32Millisecond, - vec![3600012, 3600013, 3600014, 3600015], - ), - ] - } - Scenario::Time64Microsecond => { - vec![ - make_time64_batches( - Scenario::Time64Microsecond, - vec![1234567890123, 1234567890124, 1234567890125, 1234567890126], - ), - make_time64_batches( - Scenario::Time64Microsecond, - vec![1234567890127, 1234567890128, 1234567890129, 1234567890130], - ), - make_time64_batches( - Scenario::Time64Microsecond, - vec![1234567890131, 1234567890132, 1234567890133, 1234567890134], - ), - make_time64_batches( - Scenario::Time64Microsecond, - vec![1234567890135, 1234567890136, 1234567890137, 1234567890138], - ), - ] - } - Scenario::Time64Nanosecond => { - vec![ - make_time64_batches( - Scenario::Time64Nanosecond, - vec![ - 987654321012345, - 987654321012346, - 987654321012347, - 987654321012348, - ], - ), - make_time64_batches( - Scenario::Time64Nanosecond, - vec![ - 987654321012349, - 987654321012350, - 987654321012351, - 987654321012352, - ], - ), - make_time64_batches( - Scenario::Time64Nanosecond, - vec![ - 987654321012353, - 987654321012354, - 987654321012355, - 987654321012356, - ], - ), - make_time64_batches( - Scenario::Time64Nanosecond, - vec![ - 987654321012357, - 987654321012358, - 987654321012359, - 987654321012360, - ], - ), - ] - } + Scenario::UTF8 => { vec![ make_utf8_batch(vec![Some("a"), Some("b"), Some("c"), Some("d"), None]), @@ -1404,27 +1020,3 @@ async fn make_test_file_page(scenario: Scenario, row_per_page: usize) -> NamedTe writer.close().unwrap(); output_file } - -// returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values -fn struct_array(input: Vec<(Option, Option, Option)>) -> ArrayRef { - let int_32: Int32Array = input.iter().map(|(i, _, _)| i).collect(); - let float_32: Float32Array = input.iter().map(|(_, f, _)| f).collect(); - let float_64: Float64Array = input.iter().map(|(_, _, f)| f).collect(); - - let nullable = true; - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new("int32_col", DataType::Int32, nullable)), - Arc::new(int_32) as ArrayRef, - ), - ( - Arc::new(Field::new("float32_col", DataType::Float32, nullable)), - Arc::new(float_32) as ArrayRef, - ), - ( - Arc::new(Field::new("float64_col", DataType::Float64, nullable)), - Arc::new(float_64) as ArrayRef, - ), - ]); - Arc::new(struct_array) -}