diff --git a/parquet-variant-compute/Cargo.toml b/parquet-variant-compute/Cargo.toml index c596a3904512..832cd4688483 100644 --- a/parquet-variant-compute/Cargo.toml +++ b/parquet-variant-compute/Cargo.toml @@ -41,3 +41,9 @@ name = "parquet_variant_compute" bench = false [dev-dependencies] +criterion = { version = "0.6", default-features = false } +rand = { version = "0.9.1" } + +[[bench]] +name = "variant_get" +harness = false diff --git a/parquet-variant-compute/benches/variant_get.rs b/parquet-variant-compute/benches/variant_get.rs new file mode 100644 index 000000000000..4452e879b7d8 --- /dev/null +++ b/parquet-variant-compute/benches/variant_get.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use arrow::array::ArrayRef; +use criterion::{criterion_group, criterion_main, Criterion}; +use parquet_variant::{Variant, VariantBuilder}; +use parquet_variant_compute::{ + variant_get::{variant_get, GetOptions}, + VariantArray, VariantArrayBuilder, +}; +use rand::{rngs::StdRng, Rng, SeedableRng}; + +fn create_primitive_variant(size: usize) -> VariantArray { + let mut rng = StdRng::seed_from_u64(42); + + let mut variant_builder = VariantArrayBuilder::new(1); + + for _ in 0..size { + let mut builder = VariantBuilder::new(); + builder.append_value(rng.random::()); + let (metadata, value) = builder.finish(); + variant_builder.append_variant(Variant::try_new(&metadata, &value).unwrap()); + } + + variant_builder.build() +} + +pub fn variant_get_bench(c: &mut Criterion) { + let variant_array = create_primitive_variant(8192); + let input: ArrayRef = Arc::new(variant_array); + + let options = GetOptions { + path: vec![].into(), + as_type: None, + cast_options: Default::default(), + }; + + c.bench_function("variant_get_primitive", |b| { + b.iter(|| variant_get(&input.clone(), options.clone())) + }); +} + +criterion_group!(benches, variant_get_bench); +criterion_main!(benches); diff --git a/parquet-variant-compute/src/lib.rs b/parquet-variant-compute/src/lib.rs index c593cf405171..e6d004102e05 100644 --- a/parquet-variant-compute/src/lib.rs +++ b/parquet-variant-compute/src/lib.rs @@ -19,6 +19,7 @@ mod from_json; mod to_json; mod variant_array; mod variant_array_builder; +pub mod variant_get; pub use variant_array::VariantArray; pub use variant_array_builder::VariantArrayBuilder; diff --git a/parquet-variant-compute/src/variant_get.rs b/parquet-variant-compute/src/variant_get.rs new file mode 100644 index 000000000000..7d37a8b64511 --- /dev/null +++ b/parquet-variant-compute/src/variant_get.rs @@ -0,0 +1,197 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use arrow::{ + array::{Array, ArrayRef}, + compute::CastOptions, + error::Result, +}; +use arrow_schema::{ArrowError, Field}; +use parquet_variant::path::VariantPath; + +use crate::{VariantArray, VariantArrayBuilder}; + +/// Returns an array with the specified path extracted from the variant values. +/// +/// The return array type depends on the `as_type` field of the options parameter +/// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point +/// to the specified path. +/// 2. `as_type: Some()`: an array of the specified type is returned. +pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result { + let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| { + ArrowError::InvalidArgumentError( + "expected a VariantArray as the input for variant_get".to_owned(), + ) + })?; + + if let Some(as_type) = options.as_type { + return Err(ArrowError::NotYetImplemented(format!( + "getting a {} from a VariantArray is not implemented yet", + as_type + ))); + } + + let mut builder = VariantArrayBuilder::new(variant_array.len()); + for i in 0..variant_array.len() { + let new_variant = variant_array.value(i); + // TODO: perf? + let new_variant = new_variant.get_path(&options.path); + match new_variant { + // TODO: we're decoding the value and doing a copy into a variant value again. This + // copy can be much smarter. + Some(new_variant) => builder.append_variant(new_variant), + None => builder.append_null(), + } + } + + Ok(Arc::new(builder.build())) +} + +/// Controls the action of the variant_get kernel. +#[derive(Debug, Clone)] +pub struct GetOptions<'a> { + /// What path to extract + pub path: VariantPath<'a>, + /// if `as_type` is None, the returned array will itself be a VariantArray. + /// + /// if `as_type` is `Some(type)` the field is returned as the specified type. + pub as_type: Option, + /// Controls the casting behavior (e.g. error vs substituting null on cast error). + pub cast_options: CastOptions<'a>, +} + +impl<'a> GetOptions<'a> { + /// Construct options to get the specified path as a variant. + pub fn new_with_path(path: VariantPath<'a>) -> Self { + Self { + path, + as_type: None, + cast_options: Default::default(), + } + } +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow::array::{Array, ArrayRef, StringArray}; + use parquet_variant::path::{VariantPath, VariantPathElement}; + + use crate::batch_json_string_to_variant; + use crate::VariantArray; + + use super::{variant_get, GetOptions}; + + fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) { + // Create input array from JSON string + let input_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(input_json)])); + let input_variant_array_ref: ArrayRef = + Arc::new(batch_json_string_to_variant(&input_array_ref).unwrap()); + + let result = + variant_get(&input_variant_array_ref, GetOptions::new_with_path(path)).unwrap(); + + // Create expected array from JSON string + let expected_array_ref: ArrayRef = Arc::new(StringArray::from(vec![Some(expected_json)])); + let expected_variant_array = batch_json_string_to_variant(&expected_array_ref).unwrap(); + + let result_array: &VariantArray = result.as_any().downcast_ref().unwrap(); + assert_eq!( + result_array.len(), + 1, + "Expected result array to have length 1" + ); + assert!( + result_array.nulls().is_none(), + "Expected no nulls in result array" + ); + let result_variant = result_array.value(0); + let expected_variant = expected_variant_array.value(0); + assert_eq!( + result_variant, expected_variant, + "Result variant does not match expected variant" + ); + } + + #[test] + fn get_primitive_variant_field() { + single_variant_get_test( + r#"{"some_field": 1234}"#, + vec![VariantPathElement::field("some_field".into())].into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_list_index() { + single_variant_get_test( + "[1234, 5678]", + vec![VariantPathElement::index(0)].into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_object_of_object() { + single_variant_get_test( + r#"{"top_level_field": {"inner_field": 1234}}"#, + vec![ + VariantPathElement::field("top_level_field".into()), + VariantPathElement::field("inner_field".into()), + ] + .into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_list_of_object() { + single_variant_get_test( + r#"[{"some_field": 1234}]"#, + vec![ + VariantPathElement::index(0), + VariantPathElement::field("some_field".into()), + ] + .into(), + "1234", + ); + } + + #[test] + fn get_primitive_variant_inside_object_of_list() { + single_variant_get_test( + r#"{"some_field": [1234]}"#, + vec![ + VariantPathElement::field("some_field".into()), + VariantPathElement::index(0), + ] + .into(), + "1234", + ); + } + + #[test] + fn get_complex_variant() { + single_variant_get_test( + r#"{"top_level_field": {"inner_field": 1234}}"#, + vec![VariantPathElement::field("top_level_field".into())].into(), + r#"{"inner_field": 1234}"#, + ); + } +} diff --git a/parquet-variant/src/builder.rs b/parquet-variant/src/builder.rs index 714267e39b25..ae82cfec9d3a 100644 --- a/parquet-variant/src/builder.rs +++ b/parquet-variant/src/builder.rs @@ -376,8 +376,6 @@ impl MetadataBuilder { fn upsert_field_name(&mut self, field_name: &str) -> u32 { let (id, new_entry) = self.field_names.insert_full(field_name.to_string()); - dbg!(new_entry); - if new_entry { let n = self.num_field_names(); @@ -1070,7 +1068,6 @@ impl<'a> ObjectBuilder<'a> { let metadata_builder = self.parent_state.metadata_builder(); let field_id = metadata_builder.upsert_field_name(key); - dbg!(field_id); let field_start = self.buffer.offset(); if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields { @@ -2487,8 +2484,6 @@ mod tests { let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1)); - dbg!("building"); - builder.append_value(variant.clone()); let (metadata, value) = builder.finish(); diff --git a/parquet-variant/src/lib.rs b/parquet-variant/src/lib.rs index 221c4e427ff3..d04c59605fc4 100644 --- a/parquet-variant/src/lib.rs +++ b/parquet-variant/src/lib.rs @@ -29,6 +29,7 @@ mod builder; mod decoder; +pub mod path; mod utils; mod variant; diff --git a/parquet-variant/src/path.rs b/parquet-variant/src/path.rs new file mode 100644 index 000000000000..1643d9c87c5f --- /dev/null +++ b/parquet-variant/src/path.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::{borrow::Cow, ops::Deref}; + +/// Represents a qualified path to a potential subfield or index of a variant value. +#[derive(Debug, Clone)] +pub struct VariantPath<'a>(Vec>); + +impl<'a> VariantPath<'a> { + pub fn new(path: Vec>) -> Self { + Self(path) + } + + pub fn path(&self) -> &Vec { + &self.0 + } +} + +impl<'a> From>> for VariantPath<'a> { + fn from(value: Vec>) -> Self { + Self::new(value) + } +} + +impl<'a> Deref for VariantPath<'a> { + type Target = [VariantPathElement<'a>]; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Element of a path +#[derive(Debug, Clone)] +pub enum VariantPathElement<'a> { + /// Access field with name `name` + Field { name: Cow<'a, str> }, + /// Access the list element at `index` + Index { index: usize }, +} + +impl<'a> VariantPathElement<'a> { + pub fn field(name: Cow<'a, str>) -> VariantPathElement<'a> { + VariantPathElement::Field { name } + } + + pub fn index(index: usize) -> VariantPathElement<'a> { + VariantPathElement::Index { index } + } +} diff --git a/parquet-variant/src/variant.rs b/parquet-variant/src/variant.rs index ce593cd2b04d..29b191970837 100644 --- a/parquet-variant/src/variant.rs +++ b/parquet-variant/src/variant.rs @@ -22,6 +22,7 @@ pub use self::object::VariantObject; use crate::decoder::{ self, get_basic_type, get_primitive_type, VariantBasicType, VariantPrimitiveType, }; +use crate::path::{VariantPath, VariantPathElement}; use crate::utils::{first_byte_from_slice, slice_from_slice}; use std::ops::Deref; @@ -1063,6 +1064,17 @@ impl<'m, 'v> Variant<'m, 'v> { _ => None, } } + + /// Return a new Variant with the path followed. + /// + /// If the path is not found, `None` is returned. + pub fn get_path(&self, path: &VariantPath) -> Option { + path.iter() + .try_fold(self.clone(), |output, element| match element { + VariantPathElement::Field { name } => output.get_object_field(name), + VariantPathElement::Index { index } => output.get_list_element(*index), + }) + } } impl From<()> for Variant<'_, '_> {