From d347467a4774d582a3d1f46f8b255e47ba700f8d Mon Sep 17 00:00:00 2001 From: Samyak S Sarnayak Date: Sat, 12 Jul 2025 22:15:58 +0530 Subject: [PATCH 1/4] Add `variant_get` compute kernel In parquet-variant: - Add a new function `Variant::get_path`: this traverses the path to create a new Variant (does not cast any of it). - Add a new module `parquet_variant::path`: adds structs/enums to define a path to access a variant value deeply. In parquet-variant-compute: - Add a new compute kernel `variant_get`: does the path traversal over a `VariantArray`. In the future, this would also cast the values to a specified type. - Includes some basic unit tests. - Includes a simple micro-benchmark for reference. Current limitations: - It can only return another VariantArray. Casts are not implemented yet. - Only top-level object/list access is supported. It panics on finding a nested object/list. Needs https://github.com/apache/arrow-rs/pull/7914 to fix this. - Perf is a TODO. --- parquet-variant-compute/Cargo.toml | 6 + .../benches/variant_get.rs | 59 +++++ parquet-variant-compute/src/lib.rs | 1 + parquet-variant-compute/src/variant_get.rs | 202 ++++++++++++++++++ parquet-variant/src/lib.rs | 1 + parquet-variant/src/path.rs | 64 ++++++ parquet-variant/src/variant.rs | 15 ++ 7 files changed, 348 insertions(+) create mode 100644 parquet-variant-compute/benches/variant_get.rs create mode 100644 parquet-variant-compute/src/variant_get.rs create mode 100644 parquet-variant/src/path.rs diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index a053803c5551..feb7379e8f94 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -42,3 +42,9 @@ name = "parquet_variant_compute" bench = false [dev-dependencies] +criterion = { version = "0.6", default-features = false } +rand = { version = "0.9.1" } + +[[bench]] +name = "variant_get" +harness = false diff --git a/parquet-variant-compute/benches/variant_get.rs b/parquet-variant-compute/benches/variant_get.rs new file mode 100644 index 000000000000..4452e879b7d8 --- /dev/null +++ b/parquet-variant-compute/benches/variant_get.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use arrow::array::ArrayRef; +use criterion::{criterion_group, criterion_main, Criterion}; +use parquet_variant::{Variant, VariantBuilder}; +use parquet_variant_compute::{ + variant_get::{variant_get, GetOptions}, + VariantArray, VariantArrayBuilder, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +fn create_primitive_variant(size: usize) -> VariantArray { + let mut rng = StdRng::seed_from_u64(42); + + let mut variant_builder = VariantArrayBuilder::new(1); + + for _ in 0..size { + let mut builder = VariantBuilder::new(); + builder.append_value(rng.random::()); + let (metadata, value) = builder.finish(); + variant_builder.append_variant(Variant::try_new(&metadata, &value).unwrap()); + } + + variant_builder.build() +} + +pub fn variant_get_bench(c: &mut Criterion) { + let variant_array = create_primitive_variant(8192); + let input: ArrayRef = Arc::new(variant_array); + + let options = GetOptions { + path: vec![].into(), + as_type: None, + cast_options: Default::default(), + }; + + c.bench_function("variant_get_primitive", |b| { + b.iter(|| variant_get(&input.clone(), options.clone())) + }); +} + +criterion_group!(benches, variant_get_bench); +criterion_main!(benches); diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index c593cf405171..e6d004102e05 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -19,6 +19,7 @@ mod from_json; mod to_json; mod variant_array; mod variant_array_builder; +pub mod variant_get; pub use variant_array::VariantArray; pub use variant_array_builder::VariantArrayBuilder; diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs new file mode 100644 index 000000000000..5106d07500ba --- /dev/null +++ b/parquet-variant-compute/src/variant_get.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use arrow::{ + array::{Array, ArrayRef}, + compute::CastOptions, + error::Result, +}; +use arrow_schema::{ArrowError, Field}; +use parquet_variant::path::VariantPath; + +use crate::{VariantArray, VariantArrayBuilder}; + +/// Returns an array with the specified path extracted from the variant values. +/// +/// The return array type depends on the `as_type` field of the options parameter +/// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point +/// to the specified path. +/// 2. `as_type: Some()`: an array of the specified type is returned. +pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { + let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { + ArrowError::InvalidArgumentError( + "expected a VariantArray as the input for variant_get".to_owned(), + ) + })?; + + if let Some(as_type) = options.as_type { + return Err(ArrowError::NotYetImplemented(format!( + "getting a {} from a VariantArray is not implemented yet", + as_type + ))); + } + + let mut builder = VariantArrayBuilder::new(variant_array.len()); + for i in 0..variant_array.len() { + let new_variant = variant_array.value(i); + // TODO: perf? + let new_variant = new_variant.get_path(&options.path); + if let Some(new_variant) = new_variant { + // TODO: we're decoding the value and doing a copy into a variant value again. This + // copy can be much smarter. + builder.append_variant(new_variant); + } else { + builder.append_null(); + } + } + + Ok(Arc::new(builder.build())) +} + +/// Controls the action of the variant_get kernel. +#[derive(Debug, Clone)] +pub struct GetOptions<'a> { + /// What path to extract + pub path: VariantPath<'a>, + /// if `as_type` is None, the returned array will itself be a VariantArray. + /// + /// if `as_type` is `Some(type)` the field is returned as the specified type if possible. To specify returning + /// a Variant, pass a Field with variant type in the metadata. + pub as_type: Option, + /// Controls the casting behavior (e.g. error vs substituting null on cast error). + pub cast_options: CastOptions<'a>, +} + +impl<'a> GetOptions<'a> { + /// Construct options to get the specified path as a variant. + pub fn new_with_path(path: VariantPath<'a>) -> Self { + Self { + path, + as_type: None, + cast_options: Default::default(), + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::array::{Array, ArrayRef, StringArray}; + use parquet_variant::path::{VariantPath, VariantPathElement}; + + use crate::batch_json_string_to_variant; + use crate::VariantArray; + + use super::{variant_get, GetOptions}; + + fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) { + // Create input array from JSON string + let input_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(input_json)])); + let input_variant_array_ref: ArrayRef = + Arc::new(batch_json_string_to_variant(&input_array_ref).unwrap()); + + let result = + variant_get(&input_variant_array_ref, GetOptions::new_with_path(path)).unwrap(); + + // Create expected array from JSON string + let expected_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(expected_json)])); + let expected_variant_array = batch_json_string_to_variant(&expected_array_ref).unwrap(); + + let result_array: &VariantArray = result.as_any().downcast_ref().unwrap(); + assert_eq!( + result_array.len(), + 1, + "Expected result array to have length 1" + ); + assert!( + result_array.nulls().is_none(), + "Expected no nulls in result array" + ); + let result_variant = result_array.value(0); + let expected_variant = expected_variant_array.value(0); + assert_eq!( + result_variant, expected_variant, + "Result variant does not match expected variant" + ); + } + + #[test] + fn get_primitive_variant_field() { + single_variant_get_test( + r#"{"some_field": 1234}"#, + vec![VariantPathElement::field("some_field".into())].into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_list_index() { + single_variant_get_test( + "[1234, 5678]", + vec![VariantPathElement::index(0)].into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_object_of_object() { + single_variant_get_test( + r#"{"top_level_field": {"inner_field": 1234}}"#, + vec![ + VariantPathElement::field("top_level_field".into()), + VariantPathElement::field("inner_field".into()), + ] + .into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_list_of_object() { + single_variant_get_test( + r#"[{"some_field": 1234}]"#, + vec![ + VariantPathElement::index(0), + VariantPathElement::field("some_field".into()), + ] + .into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_object_of_list() { + single_variant_get_test( + r#"{"some_field": [1234]}"#, + vec![ + VariantPathElement::field("some_field".into()), + VariantPathElement::index(0), + ] + .into(), + "1234", + ); + } + + #[test] + #[should_panic( + expected = "Nested values are handled specially by ObjectBuilder and ListBuilder" + )] + fn get_complex_variant() { + single_variant_get_test( + r#"{"top_level_field": {"inner_field": 1234}}"#, + vec![VariantPathElement::field("top_level_field".into())].into(), + r#"{"inner_field": 1234}"#, + ); + } +} diff --git a/parquet-variant/src/lib.rs b/parquet-variant/src/lib.rs index 221c4e427ff3..d04c59605fc4 100644 --- a/parquet-variant/src/lib.rs +++ b/parquet-variant/src/lib.rs @@ -29,6 +29,7 @@ mod builder; mod decoder; +pub mod path; mod utils; mod variant; diff --git a/parquet-variant/src/path.rs b/parquet-variant/src/path.rs new file mode 100644 index 000000000000..1643d9c87c5f --- /dev/null +++ b/parquet-variant/src/path.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::{borrow::Cow, ops::Deref}; + +/// Represents a qualified path to a potential subfield or index of a variant value. +#[derive(Debug, Clone)] +pub struct VariantPath<'a>(Vec>); + +impl<'a> VariantPath<'a> { + pub fn new(path: Vec>) -> Self { + Self(path) + } + + pub fn path(&self) -> &Vec { + &self.0 + } +} + +impl<'a> From>> for VariantPath<'a> { + fn from(value: Vec>) -> Self { + Self::new(value) + } +} + +impl<'a> Deref for VariantPath<'a> { + type Target = [VariantPathElement<'a>]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Element of a path +#[derive(Debug, Clone)] +pub enum VariantPathElement<'a> { + /// Access field with name `name` + Field { name: Cow<'a, str> }, + /// Access the list element at `index` + Index { index: usize }, +} + +impl<'a> VariantPathElement<'a> { + pub fn field(name: Cow<'a, str>) -> VariantPathElement<'a> { + VariantPathElement::Field { name } + } + + pub fn index(index: usize) -> VariantPathElement<'a> { + VariantPathElement::Index { index } + } +} diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index ce593cd2b04d..6920beb3f8aa 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -22,6 +22,7 @@ pub use self::object::VariantObject; use crate::decoder::{ self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType, }; +use crate::path::{VariantPath, VariantPathElement}; use crate::utils::{first_byte_from_slice, slice_from_slice}; use std::ops::Deref; @@ -1063,6 +1064,20 @@ impl<'m, 'v> Variant<'m, 'v> { _ => None, } } + + /// Return a new Variant with the path followed. + /// + /// If the path is not found, `None` is returned. + pub fn get_path(&self, path: &VariantPath) -> Option { + let mut output = self.clone(); + for element in path.iter() { + output = match element { + VariantPathElement::Field { name } => output.get_object_field(name)?, + VariantPathElement::Index { index } => output.get_list_element(*index)?, + }; + } + Some(output) + } } impl From<()> for Variant<'_, '_> { From bb54116f8d65709351328bf1e73741b387a2519f Mon Sep 17 00:00:00 2001 From: Samyak S Sarnayak Date: Tue, 15 Jul 2025 22:37:43 +0530 Subject: [PATCH 2/4] fixup! Add `variant_get` compute kernel --- parquet-variant-compute/src/variant_get.rs | 10 ++++------ parquet-variant/src/variant.rs | 13 +++++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index 5106d07500ba..aae000fb4a17 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -51,12 +51,11 @@ pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { let new_variant = variant_array.value(i); // TODO: perf? let new_variant = new_variant.get_path(&options.path); - if let Some(new_variant) = new_variant { + match new_variant { // TODO: we're decoding the value and doing a copy into a variant value again. This // copy can be much smarter. - builder.append_variant(new_variant); - } else { - builder.append_null(); + Some(new_variant) => builder.append_variant(new_variant), + None => builder.append_null(), } } @@ -70,8 +69,7 @@ pub struct GetOptions<'a> { pub path: VariantPath<'a>, /// if `as_type` is None, the returned array will itself be a VariantArray. /// - /// if `as_type` is `Some(type)` the field is returned as the specified type if possible. To specify returning - /// a Variant, pass a Field with variant type in the metadata. + /// if `as_type` is `Some(type)` the field is returned as the specified type. pub as_type: Option, /// Controls the casting behavior (e.g. error vs substituting null on cast error). pub cast_options: CastOptions<'a>, diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index 6920beb3f8aa..29b191970837 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -1069,14 +1069,11 @@ impl<'m, 'v> Variant<'m, 'v> { /// /// If the path is not found, `None` is returned. pub fn get_path(&self, path: &VariantPath) -> Option { - let mut output = self.clone(); - for element in path.iter() { - output = match element { - VariantPathElement::Field { name } => output.get_object_field(name)?, - VariantPathElement::Index { index } => output.get_list_element(*index)?, - }; - } - Some(output) + path.iter() + .try_fold(self.clone(), |output, element| match element { + VariantPathElement::Field { name } => output.get_object_field(name), + VariantPathElement::Index { index } => output.get_list_element(*index), + }) } } From e41c28188c5a2784e33ccccc313710d0d492d86c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 16 Jul 2025 15:05:12 -0400 Subject: [PATCH 3/4] Remove dbg --- parquet-variant/src/builder.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 714267e39b25..ae82cfec9d3a 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -376,8 +376,6 @@ impl MetadataBuilder { fn upsert_field_name(&mut self, field_name: &str) -> u32 { let (id, new_entry) = self.field_names.insert_full(field_name.to_string()); - dbg!(new_entry); - if new_entry { let n = self.num_field_names(); @@ -1070,7 +1068,6 @@ impl<'a> ObjectBuilder<'a> { let metadata_builder = self.parent_state.metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - dbg!(field_id); let field_start = self.buffer.offset(); if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { @@ -2487,8 +2484,6 @@ mod tests { let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1)); - dbg!("building"); - builder.append_value(variant.clone()); let (metadata, value) = builder.finish(); From 8f162af29687ca67f59e01e7886f60132a5b8243 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 16 Jul 2025 15:05:40 -0400 Subject: [PATCH 4/4] Test now passes --- parquet-variant-compute/src/variant_get.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs index aae000fb4a17..7d37a8b64511 100644 --- a/parquet-variant-compute/src/variant_get.rs +++ b/parquet-variant-compute/src/variant_get.rs @@ -187,9 +187,6 @@ mod test { } #[test] - #[should_panic( - expected = "Nested values are handled specially by ObjectBuilder and ListBuilder" - )] fn get_complex_variant() { single_variant_get_test( r#"{"top_level_field": {"inner_field": 1234}}"#,