diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index 74c3dd3fb72f..85d66a9cf706 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -37,6 +37,7 @@ parquet-variant = { workspace = true } parquet-variant-json = { workspace = true } chrono = { workspace = true } uuid = { version = "1.18.0", features = ["v4"]} +serde_json = "1.0" [lib] name = "parquet_variant_compute" diff --git a/parquet-variant-compute/benches/variant_kernels.rs b/parquet-variant-compute/benches/variant_kernels.rs index 13ff77d9fb18..383697ab8cc6 100644 --- a/parquet-variant-compute/benches/variant_kernels.rs +++ b/parquet-variant-compute/benches/variant_kernels.rs @@ -23,12 +23,15 @@ use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, Variant, VariantBuilder}; use parquet_variant_compute::{ GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get, }; +use parquet_variant_json::append_json; use rand::Rng; use rand::SeedableRng; use rand::distr::Alphanumeric; use rand::rngs::StdRng; +use serde_json::Value; use std::fmt::Write; use std::sync::Arc; + fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { let input_array = StringArray::from_iter_values(json_repeated_struct(8000)); let array_ref: ArrayRef = Arc::new(input_array); @@ -66,6 +69,58 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion) { }); }); + let input_array = StringArray::from_iter_values(random_structure(8000, 200)); + let total_input_bytes = input_array + .iter() + .flatten() // filter None + .map(|v| v.len()) + .sum::<usize>(); + let id = format!( + "batch_json_string_to_variant object - 1 depth(200 fields) random_json({} bytes per document)", + total_input_bytes / input_array.len() + ); + let array_ref: ArrayRef = Arc::new(input_array); + let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap(); + let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len()); + for i in 0..string_array.len() { + 
json_array.push(serde_json::from_str(string_array.value(i)).unwrap()); + } + c.bench_function(&id, |b| { + b.iter(|| { + let mut variant_array_builder = VariantArrayBuilder::new(string_array.len()); + for json in &json_array { + append_json(json, &mut variant_array_builder).unwrap(); + } + let _ = variant_array_builder.build(); + }); + }); + + let input_array = StringArray::from_iter_values(random_structure(8000, 100)); + let total_input_bytes = input_array + .iter() + .flatten() // filter None + .map(|v| v.len()) + .sum::<usize>(); + let id = format!( + "batch_json_string_to_variant object - 1 depth(100 fields) random_json({} bytes per document)", + total_input_bytes / input_array.len() + ); + let array_ref: ArrayRef = Arc::new(input_array); + let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap(); + let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len()); + for i in 0..string_array.len() { + json_array.push(serde_json::from_str(string_array.value(i)).unwrap()); + } + c.bench_function(&id, |b| { + b.iter(|| { + let mut variant_array_builder = VariantArrayBuilder::new(string_array.len()); + for json in &json_array { + append_json(json, &mut variant_array_builder).unwrap(); + } + let _ = variant_array_builder.build(); + }); + }); + let input_array = StringArray::from_iter_values(random_json_structure(8000)); let total_input_bytes = input_array .iter() .flatten() // filter None .map(|v| v.len()) .sum::<usize>(); @@ -240,6 +295,22 @@ fn random_json_structure(count: usize) -> impl Iterator<Item = String> { (0..count).map(move |_| generator.next().to_string()) } +fn random_structure(count: usize, max_fields: usize) -> impl Iterator<Item = String> { + let mut generator = RandomJsonGenerator { + null_weight: 5, + string_weight: 25, + number_weight: 25, + boolean_weight: 10, + object_weight: 25, + array_weight: 0, + max_fields, + max_array_length: 0, + max_depth: 1, + ..Default::default() + }; + (0..count).map(move |_| generator.next_object().to_string()) +} + /// Creates JSON with random structure and fields. 
/// /// Each type is created in proportion controlled by the @@ -299,6 +370,82 @@ impl RandomJsonGenerator { &self.output_buffer } + fn next_object(&mut self) -> &str { + self.output_buffer.clear(); + self.append_random_json_for_object(); + &self.output_buffer + } + + fn append_random_json_for_object(&mut self) { + // use destructuring to ensure each field is used + let Self { + rng, + null_weight, + string_weight, + number_weight, + boolean_weight, + max_fields, + output_buffer, + .. + } = self; + + write!(output_buffer, "{{").unwrap(); + for i in 0..*max_fields { + let key_length = rng.random_range(1..=20); + let key: String = (0..key_length) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + write!(output_buffer, "\"{key}\":").unwrap(); + + let total_weight = *null_weight + *string_weight + *number_weight + *boolean_weight; + + // Generate a random number to determine the type + let mut random_value: usize = rng.random_range(0..total_weight); + + if random_value <= *null_weight { + write!(output_buffer, "null").unwrap(); + } else { + random_value -= *null_weight; + + if random_value <= *string_weight { + // Generate a random string between 1 and 20 characters + let length = rng.random_range(1..=20); + let random_string: String = (0..length) + .map(|_| rng.sample(Alphanumeric) as char) + .collect(); + write!(output_buffer, "\"{random_string}\"",).unwrap(); + } else { + random_value -= *string_weight; + + if random_value <= *number_weight { + // 50% chance of generating an integer or a float + if rng.random_bool(0.5) { + // Generate a random integer + let random_integer: i64 = rng.random_range(-1000..1000); + write!(output_buffer, "{random_integer}",).unwrap(); + } else { + // Generate a random float + let random_float: f64 = rng.random_range(-1000.0..1000.0); + write!(output_buffer, "{random_float}",).unwrap(); + } + } else { + random_value -= *number_weight; + + if random_value <= *boolean_weight { + // Generate a random boolean + let random_boolean: 
bool = rng.random(); + write!(output_buffer, "{random_boolean}",).unwrap(); + } + } + } + if i < *max_fields - 1 { + write!(output_buffer, ",").unwrap(); + } + } + write!(&mut self.output_buffer, "}}").unwrap(); + } + /// Appends a random JSON value to the output buffer. fn append_random_json(&mut self, current_depth: usize) { // use destructuring to ensure each field is used diff --git a/parquet-variant-json/src/from_json.rs b/parquet-variant-json/src/from_json.rs index 33e1b2e6db9b..4c22785ef106 100644 --- a/parquet-variant-json/src/from_json.rs +++ b/parquet-variant-json/src/from_json.rs @@ -102,7 +102,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> Result<Variant<'m, 'v>, ArrowError } } -fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { +pub fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> { match json { Value::Null => builder.append_value(Variant::Null), Value::Bool(b) => builder.append_value(*b), diff --git a/parquet-variant-json/src/lib.rs b/parquet-variant-json/src/lib.rs index f24c740818be..6b42b15bd480 100644 --- a/parquet-variant-json/src/lib.rs +++ b/parquet-variant-json/src/lib.rs @@ -34,5 +34,5 @@ mod from_json; mod to_json; -pub use from_json::JsonToVariant; +pub use from_json::{JsonToVariant, append_json}; pub use to_json::VariantToJson. diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 7094d935a5eb..e6122f062c38 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -14,7 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. 
-use crate::decoder::{VariantBasicType, VariantPrimitiveType}; +use crate::decoder::{OffsetSizeBytes, VariantBasicType, VariantPrimitiveType}; use crate::{ ShortString, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16, VariantList, VariantMetadata, VariantObject, @@ -43,21 +43,15 @@ fn short_string_header(len: usize) -> u8 { (len as u8) << 2 | VariantBasicType::ShortString as u8 } -pub(crate) fn int_size(v: usize) -> u8 { +pub(crate) fn int_size(v: usize) -> OffsetSizeBytes { match v { - 0..=0xFF => 1, - 0x100..=0xFFFF => 2, - 0x10000..=0xFFFFFF => 3, - _ => 4, + 0..=0xFF => OffsetSizeBytes::One, + 0x100..=0xFFFF => OffsetSizeBytes::Two, + 0x10000..=0xFFFFFF => OffsetSizeBytes::Three, + _ => OffsetSizeBytes::Four, } } -/// Write little-endian integer to buffer at a specific position -fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) { - let bytes = value.to_le_bytes(); - buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]); -} - /// Wrapper around a `Vec<u8>` that provides methods for appending /// primitive values, variant types, and metadata. 
/// @@ -358,63 +352,6 @@ impl ValueBuilder { ); state.finish(); } - - /// Writes out the header byte for a variant object or list, from the starting position - /// of the builder, will return the position after this write - pub(crate) fn append_header_start_from_buf_pos( - &mut self, - start_pos: usize, // the start position where the header will be inserted - header_byte: u8, - is_large: bool, - num_fields: usize, - ) -> usize { - let buffer = self.inner_mut(); - - // Write header at the original start position - let mut header_pos = start_pos; - - // Write header byte - buffer[header_pos] = header_byte; - header_pos += 1; - - // Write number of fields - if is_large { - buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes()); - header_pos += 4; - } else { - buffer[header_pos] = num_fields as u8; - header_pos += 1; - } - - header_pos - } - - /// Writes out the offsets for an array of offsets, including the final offset (data size). - /// from the starting position of the buffer, will return the position after this write - pub(crate) fn append_offset_array_start_from_buf_pos( - &mut self, - start_pos: usize, - offsets: impl IntoIterator<Item = usize>, - data_size: Option<usize>, - nbytes: u8, - ) -> usize { - let buf = self.inner_mut(); - - let mut current_pos = start_pos; - for relative_offset in offsets { - write_offset_at_pos(buf, current_pos, relative_offset, nbytes); - current_pos += nbytes as usize; - } - - // Write data_size - if let Some(data_size) = data_size { - // Write data_size at the end of the offsets - write_offset_at_pos(buf, current_pos, data_size, nbytes); - current_pos += nbytes as usize; - } - - current_pos - } } /// A trait for managing state specific to different builder types. 
diff --git a/parquet-variant/src/builder/list.rs b/parquet-variant/src/builder/list.rs index 4c2682c50ac4..5064904ca7de 100644 --- a/parquet-variant/src/builder/list.rs +++ b/parquet-variant/src/builder/list.rs @@ -174,7 +174,7 @@ impl<'a, S: BuilderSpecificState> ListBuilder<'a, S> { // Make sure to reserve enough capacity to handle the extra bytes we'll truncate. let mut bytes_to_splice = Vec::with_capacity(header_size + 3); // Write header - let header = array_header(is_large, offset_size); + let header = array_header(is_large, offset_size as _); bytes_to_splice.push(header); append_packed_u32(&mut bytes_to_splice, num_elements as u32, num_elements_size); diff --git a/parquet-variant/src/builder/metadata.rs b/parquet-variant/src/builder/metadata.rs index 10163ba3e0cf..efccc2e4c63e 100644 --- a/parquet-variant/src/builder/metadata.rs +++ b/parquet-variant/src/builder/metadata.rs @@ -206,7 +206,7 @@ impl WritableMetadataBuilder { // Determine appropriate offset size based on the larger of dict size or total string size let max_offset = std::cmp::max(total_dict_size, nkeys); - let offset_size = int_size(max_offset); + let offset_size = int_size(max_offset) as u8; let offset_start = 1 + offset_size as usize; let string_start = offset_start + (nkeys + 1) * offset_size as usize; diff --git a/parquet-variant/src/builder/object.rs b/parquet-variant/src/builder/object.rs index ab04360c16a7..876c2e2d4c7c 100644 --- a/parquet-variant/src/builder/object.rs +++ b/parquet-variant/src/builder/object.rs @@ -24,14 +24,50 @@ use crate::{ use arrow_schema::ArrowError; use indexmap::IndexMap; -fn object_header(large: bool, id_size: u8, offset_size: u8) -> u8 { - let large_bit = if large { 1 } else { 0 }; - (large_bit << (BASIC_TYPE_BITS + 4)) - | ((id_size - 1) << (BASIC_TYPE_BITS + 2)) - | ((offset_size - 1) << BASIC_TYPE_BITS) +fn object_header<const LARGE_BIT: u8, const ID_SIZE: u8, const OFFSET_SIZE: u8>() -> u8 { + (LARGE_BIT << (BASIC_TYPE_BITS + 4)) + | ((ID_SIZE - 1) << (BASIC_TYPE_BITS + 2)) + | ((OFFSET_SIZE - 1) << BASIC_TYPE_BITS) 
| VariantBasicType::Object as u8 } +struct ObjectHeaderWriter<const OFFSET_SIZE: u8, const ID_SIZE: u8>(); + +impl<const OFFSET_SIZE: u8, const ID_SIZE: u8> ObjectHeaderWriter<OFFSET_SIZE, ID_SIZE> { + fn write( + dst: &mut Vec<u8>, + num_fields: usize, + field_ids: impl Iterator<Item = u32>, + offsets: impl Iterator<Item = usize>, + data_size: usize, + ) { + let is_large = num_fields > u8::MAX as usize; + // num_fields will consume 4 bytes when it is larger than u8::MAX + if is_large { + dst.push(object_header::<1, { ID_SIZE }, { OFFSET_SIZE }>()); + append_packed_u32::<4>(dst, num_fields); + } else { + dst.push(object_header::<0, { ID_SIZE }, { OFFSET_SIZE }>()); + append_packed_u32::<1>(dst, num_fields); + } + + for id in field_ids { + append_packed_u32::<ID_SIZE>(dst, id as usize); + } + + for off in offsets { + append_packed_u32::<OFFSET_SIZE>(dst, off); + } + + append_packed_u32::<OFFSET_SIZE>(dst, data_size); + } +} + +#[inline(always)] +fn append_packed_u32<const SIZE: u8>(dest: &mut Vec<u8>, value: usize) { + dest.extend_from_slice(&value.to_le_bytes()[..SIZE as usize]); +} + /// A builder for creating [`Variant::Object`] values. /// /// See the examples on [`VariantBuilder`] for usage. @@ -245,41 +281,45 @@ impl<'a, S: BuilderSpecificState> ObjectBuilder<'a, S> { (num_fields * id_size as usize) + // field IDs ((num_fields + 1) * offset_size as usize); // field offsets + data_size + let mut bytes_to_splice = Vec::with_capacity(header_size); + + macro_rules! 
write_header { + ($offset_size:expr, $id_size:expr) => { + ObjectHeaderWriter::<{ $offset_size as u8 }, { $id_size as u8 }>::write( + &mut bytes_to_splice, + num_fields, + self.fields.keys().copied(), + self.fields.values().copied(), + data_size, + ) + }; + } + + use crate::decoder::OffsetSizeBytes::*; + match (offset_size, id_size) { + (One, One) => write_header!(One, One), + (One, Two) => write_header!(One, Two), + (One, Three) => write_header!(One, Three), + (One, Four) => write_header!(One, Four), + (Two, One) => write_header!(Two, One), + (Two, Two) => write_header!(Two, Two), + (Two, Three) => write_header!(Two, Three), + (Two, Four) => write_header!(Two, Four), + (Three, One) => write_header!(Three, One), + (Three, Two) => write_header!(Three, Two), + (Three, Three) => write_header!(Three, Three), + (Three, Four) => write_header!(Three, Four), + (Four, One) => write_header!(Four, One), + (Four, Two) => write_header!(Four, Two), + (Four, Three) => write_header!(Four, Three), + (Four, Four) => write_header!(Four, Four), + } + // Shift existing data to make room for the header - value_builder.inner_mut().splice( - starting_offset..starting_offset, - std::iter::repeat_n(0u8, header_size), - ); + value_builder + .inner_mut() + .splice(starting_offset..starting_offset, bytes_to_splice); - // Write header at the original start position - let mut header_pos = starting_offset; - - // Write header byte - let header = object_header(is_large, id_size, offset_size); - - header_pos = self - .parent_state - .value_builder() - .append_header_start_from_buf_pos(header_pos, header, is_large, num_fields); - - header_pos = self - .parent_state - .value_builder() - .append_offset_array_start_from_buf_pos( - header_pos, - self.fields.keys().copied().map(|id| id as usize), - None, - id_size, - ); - - self.parent_state - .value_builder() - .append_offset_array_start_from_buf_pos( - header_pos, - self.fields.values().copied(), - Some(data_size), - offset_size, - ); 
self.parent_state.finish(); } }