diff --git a/rust/arrow/src/array/array_struct.rs b/rust/arrow/src/array/array_struct.rs index 50e7eea3db6..55f5f775e5b 100644 --- a/rust/arrow/src/array/array_struct.rs +++ b/rust/arrow/src/array/array_struct.rs @@ -69,6 +69,10 @@ impl StructArray { } /// Return child array whose field name equals to column_name + /// + /// Note: A schema can currently have duplicate field names, in which case + /// the first field will always be selected. + /// This issue will be addressed in [ARROW-11178](https://issues.apache.org/jira/browse/ARROW-11178) pub fn column_by_name(&self, column_name: &str) -> Option<&ArrayRef> { self.column_names() .iter() diff --git a/rust/arrow/src/ipc/reader.rs b/rust/arrow/src/ipc/reader.rs index 65dcfa615c8..3dfc9413f48 100644 --- a/rust/arrow/src/ipc/reader.rs +++ b/rust/arrow/src/ipc/reader.rs @@ -919,14 +919,16 @@ impl RecordBatchReader for StreamReader { mod tests { use super::*; + use std::fs::File; + use flate2::read::GzDecoder; use crate::util::integration_util::*; - use std::fs::File; #[test] - fn read_generated_files() { + fn read_generated_files_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -940,15 +942,15 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); let mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } @@ -974,6 +976,8 @@ mod tests { "generated_datetime", "generated_dictionary", "generated_nested", + "generated_null_trivial", + "generated_null", "generated_primitive_no_batches", 
"generated_primitive_zerolength", "generated_primitive", @@ -990,8 +994,9 @@ mod tests { } #[test] - fn read_generated_streams() { + fn read_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -1005,15 +1010,81 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + // the next batch must be empty + assert!(reader.next().is_none()); + // the stream must indicate that it's finished + assert!(reader.is_finished()); + }); + } + + #[test] + fn read_generated_files_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_generated_streams_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is 
repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_interval", + "generated_datetime", + "generated_dictionary", + "generated_nested", + "generated_null_trivial", + "generated_null", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); // the next batch must be empty assert!(reader.next().is_none()); @@ -1072,11 +1143,11 @@ mod tests { } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); diff --git a/rust/arrow/src/ipc/writer.rs b/rust/arrow/src/ipc/writer.rs index 688829acc93..a8bc914a565 100644 --- a/rust/arrow/src/ipc/writer.rs +++ b/rust/arrow/src/ipc/writer.rs @@ -44,6 +44,13 @@ pub struct IpcWriteOptions { /// The legacy format is for releases before 0.15.0, and uses metadata V4 write_legacy_ipc_format: bool, /// The metadata version to write. 
The Rust IPC writer supports V4+ + /// + /// *Default versions per crate* + /// + /// When creating the default IpcWriteOptions, the following metadata versions are used: + /// + /// version 2.0.0: V4, with legacy format enabled + /// version 4.0.0: V5 metadata_version: ipc::MetadataVersion, } @@ -93,8 +100,8 @@ impl Default for IpcWriteOptions { fn default() -> Self { Self { alignment: 8, - write_legacy_ipc_format: true, - metadata_version: ipc::MetadataVersion::V4, + write_legacy_ipc_format: false, + metadata_version: ipc::MetadataVersion::V5, } } } @@ -732,15 +739,17 @@ fn pad_to_8(len: u32) -> usize { mod tests { use super::*; + use std::fs::File; + use std::io::Read; + use std::sync::Arc; + use flate2::read::GzDecoder; + use ipc::MetadataVersion; use crate::array::*; use crate::datatypes::Field; use crate::ipc::reader::*; use crate::util::integration_util::*; - use std::fs::File; - use std::io::Read; - use std::sync::Arc; #[test] fn test_write_file() { @@ -789,8 +798,7 @@ mod tests { } } - #[test] - fn test_write_null_file() { + fn write_null_file(options: IpcWriteOptions, suffix: &str) { let schema = Schema::new(vec![ Field::new("nulls", DataType::Null, true), Field::new("int32s", DataType::Int32, false), @@ -811,16 +819,18 @@ mod tests { ], ) .unwrap(); + let file_name = format!("target/debug/testdata/nulls_{}.arrow_file", suffix); { - let file = File::create("target/debug/testdata/nulls.arrow_file").unwrap(); - let mut writer = FileWriter::try_new(file, &schema).unwrap(); + let file = File::create(&file_name).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &schema, options).unwrap(); writer.write(&batch).unwrap(); // this is inside a block to test the implicit finishing of the file on `Drop` } { - let file = File::open("target/debug/testdata/nulls.arrow_file").unwrap(); + let file = File::open(&file_name).unwrap(); let reader = FileReader::try_new(file).unwrap(); reader.for_each(|maybe_batch| { maybe_batch @@ -836,10 +846,42 @@ mod 
tests { }); } } + #[test] + fn test_write_null_file_v4() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V4).unwrap(), + "v4_a8", + ); + write_null_file( + IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(), + "v4_a8l", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V4).unwrap(), + "v4_a64", + ); + write_null_file( + IpcWriteOptions::try_new(64, true, MetadataVersion::V4).unwrap(), + "v4_a64l", + ); + } #[test] - fn read_and_rewrite_generated_files() { + fn test_write_null_file_v5() { + write_null_file( + IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap(), + "v5_a8", + ); + write_null_file( + IpcWriteOptions::try_new(64, false, MetadataVersion::V5).unwrap(), + "v5_a64", + ); + } + + #[test] + fn read_and_rewrite_generated_files_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -853,8 +895,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path )) .unwrap(); @@ -862,9 +904,11 @@ mod tests { // read and rewrite the file to a temp location { - let file = - File::create(format!("target/debug/testdata/{}.arrow_file", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let mut writer = FileWriter::try_new(file, &reader.schema()).unwrap(); while let Some(Ok(batch)) = reader.next() { writer.write(&batch).unwrap(); @@ -872,19 +916,23 @@ mod tests { writer.finish().unwrap(); } - let file = - File::open(format!("target/debug/testdata/{}.arrow_file", path)).unwrap(); + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); let 
mut reader = FileReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } #[test] - fn read_and_rewrite_generated_streams() { + fn read_and_rewrite_generated_streams_014() { let testdata = crate::util::test_util::arrow_test_data(); + let version = "0.14.1"; // the test is repetitive, thus we can read all supported files at once let paths = vec![ "generated_interval", @@ -898,8 +946,8 @@ mod tests { ]; paths.iter().for_each(|path| { let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.stream", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path )) .unwrap(); @@ -907,8 +955,11 @@ mod tests { // read and rewrite the stream to a temp location { - let file = File::create(format!("target/debug/testdata/{}.stream", path)) - .unwrap(); + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); let mut writer = StreamWriter::try_new(file, &reader.schema()).unwrap(); reader.for_each(|batch| { writer.write(&batch.unwrap()).unwrap(); @@ -917,21 +968,147 @@ mod tests { } let file = - File::open(format!("target/debug/testdata/{}.stream", path)).unwrap(); + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); + let mut reader = StreamReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_files_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // 
"generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + "generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", + testdata, version, path + )) + .unwrap(); + + let mut reader = FileReader::try_new(file).unwrap(); + + // read and rewrite the file to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + // write IPC version 5 + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + FileWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + while let Some(Ok(batch)) = reader.next() { + writer.write(&batch).unwrap(); + } + writer.finish().unwrap(); + } + + let file = File::open(format!( + "target/debug/testdata/{}-{}.arrow_file", + version, path + )) + .unwrap(); + let mut reader = FileReader::try_new(file).unwrap(); + + // read expected JSON output + let arrow_json = read_gzip_json(version, path); + assert!(arrow_json.equals_reader(&mut reader)); + }); + } + + #[test] + fn read_and_rewrite_generated_streams_100() { + let testdata = crate::util::test_util::arrow_test_data(); + let version = "1.0.0-littleendian"; + // the test is repetitive, thus we can read all supported files at once + let paths = vec![ + "generated_custom_metadata", + "generated_datetime", + "generated_dictionary_unsigned", + "generated_dictionary", + // "generated_duplicate_fieldnames", + "generated_interval", + "generated_large_batch", + "generated_nested", + // "generated_nested_large_offsets", + "generated_null_trivial", + "generated_null", + 
"generated_primitive_large_offsets", + "generated_primitive_no_batches", + "generated_primitive_zerolength", + "generated_primitive", + // "generated_recursive_nested", + ]; + paths.iter().for_each(|path| { + let file = File::open(format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + )) + .unwrap(); + + let reader = StreamReader::try_new(file).unwrap(); + + // read and rewrite the stream to a temp location + { + let file = File::create(format!( + "target/debug/testdata/{}-{}.stream", + version, path + )) + .unwrap(); + let options = + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); + let mut writer = + StreamWriter::try_new_with_options(file, &reader.schema(), options) + .unwrap(); + reader.for_each(|batch| { + writer.write(&batch.unwrap()).unwrap(); + }); + writer.finish().unwrap(); + } + + let file = + File::open(format!("target/debug/testdata/{}-{}.stream", version, path)) + .unwrap(); let mut reader = StreamReader::try_new(file).unwrap(); // read expected JSON output - let arrow_json = read_gzip_json(path); + let arrow_json = read_gzip_json(version, path); assert!(arrow_json.equals_reader(&mut reader)); }); } /// Read gzipped JSON file - fn read_gzip_json(path: &str) -> ArrowJson { + fn read_gzip_json(version: &str, path: &str) -> ArrowJson { let testdata = crate::util::test_util::arrow_test_data(); let file = File::open(format!( - "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz", - testdata, path + "{}/arrow-ipc-stream/integration/{}/{}.json.gz", + testdata, version, path )) .unwrap(); let mut gz = GzDecoder::new(&file); diff --git a/rust/arrow/src/util/integration_util.rs b/rust/arrow/src/util/integration_util.rs index c80cfd81d42..88d6a88983d 100644 --- a/rust/arrow/src/util/integration_util.rs +++ b/rust/arrow/src/util/integration_util.rs @@ -220,8 +220,10 @@ impl ArrowJsonBatch { let json_array: Vec = json_from_col(&col, field.data_type()); match field.data_type() { DataType::Null => { - let arr = 
arr.as_any().downcast_ref::<NullArray>().unwrap(); - arr.equals_json(&json_array.iter().collect::<Vec<&Value>>()[..]) + let arr: &NullArray = + arr.as_any().downcast_ref::<NullArray>().unwrap(); + // NullArrays should have the same length, json_array is empty + arr.len() == col.count } DataType::Boolean => { let arr = arr.as_any().downcast_ref::<BooleanArray>().unwrap(); @@ -519,6 +521,7 @@ fn json_from_col(col: &ArrowJsonColumn, data_type: &DataType) -> Vec<Value> { converted_col.as_slice(), ) } + DataType::Null => vec![], _ => merge_json_array( col.validity.as_ref().unwrap().as_slice(), &col.data.clone().unwrap(),