Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,9 @@ name = "parquet_variant_compute"
bench = false

[dev-dependencies]
criterion = { version = "0.6", default-features = false }
rand = { version = "0.9.1" }

[[bench]]
name = "variant_get"
harness = false
59 changes: 59 additions & 0 deletions parquet-variant-compute/benches/variant_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;

use arrow::array::ArrayRef;
use criterion::{criterion_group, criterion_main, Criterion};
use parquet_variant::{Variant, VariantBuilder};
use parquet_variant_compute::{
variant_get::{variant_get, GetOptions},
VariantArray, VariantArrayBuilder,
};
use rand::{rngs::StdRng, Rng, SeedableRng};

/// Builds a [`VariantArray`] with `size` rows, each holding a single pseudo-random `i64`.
///
/// The random generator is seeded with a constant so that every benchmark run
/// operates on the same input values.
fn create_primitive_variant(size: usize) -> VariantArray {
    let mut rng = StdRng::seed_from_u64(42);

    // The number of rows is known up front, so size the builder for all of them
    // (the `variant_get` kernel sizes its builder from the row count in the same way).
    let mut variant_builder = VariantArrayBuilder::new(size);

    for _ in 0..size {
        // Encode one primitive value into its own (metadata, value) buffer pair.
        let mut builder = VariantBuilder::new();
        builder.append_value(rng.random::<i64>());
        let (metadata, value) = builder.finish();

        // The buffers were produced by `VariantBuilder` just above, so a decode
        // failure here would be a bug rather than a recoverable condition.
        variant_builder.append_variant(Variant::try_new(&metadata, &value).unwrap());
    }

    variant_builder.build()
}

/// Benchmarks `variant_get` over 8192 primitive variants using an empty path and
/// no requested output type (`as_type: None`).
pub fn variant_get_bench(c: &mut Criterion) {
    let variant_array = create_primitive_variant(8192);
    let input: ArrayRef = Arc::new(variant_array);

    let options = GetOptions {
        path: vec![].into(),
        as_type: None,
        cast_options: Default::default(),
    };

    c.bench_function("variant_get_primitive", |b| {
        // `variant_get` only borrows the input, so pass `&input` directly instead of
        // cloning (and immediately dropping) the `Arc` inside the measured closure.
        // The options are taken by value and therefore still need a clone per call.
        b.iter(|| variant_get(&input, options.clone()))
    });
}

criterion_group!(benches, variant_get_bench);
criterion_main!(benches);
1 change: 1 addition & 0 deletions parquet-variant-compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod from_json;
mod to_json;
mod variant_array;
mod variant_array_builder;
pub mod variant_get;

pub use variant_array::VariantArray;
pub use variant_array_builder::VariantArrayBuilder;
Expand Down
197 changes: 197 additions & 0 deletions parquet-variant-compute/src/variant_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;

use arrow::{
array::{Array, ArrayRef},
compute::CastOptions,
error::Result,
};
use arrow_schema::{ArrowError, Field};
use parquet_variant::path::VariantPath;

use crate::{VariantArray, VariantArrayBuilder};

/// Returns an array with the specified path extracted from the variant values.
///
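/// The type of the returned array depends on the `as_type` field of `options`:
/// 1. `as_type: None`: a VariantArray is returned. The values in this new VariantArray will point
/// to the specified path. Rows where the path does not resolve to a value are null.
/// 2. `as_type: Some(<specific field>)`: an array of the specified type is returned. This is not
/// implemented yet; the call currently fails with `ArrowError::NotYetImplemented`.
///
/// Returns `ArrowError::InvalidArgumentError` if `input` is not a VariantArray.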
pub fn variant_get(input: &ArrayRef, options: GetOptions) -> Result<ArrayRef> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While trying to review this PR, one thing that I thought might help could be to separate out the Variant traversal and the construction of the output

For example, perhaps you could implement a function to find the appropriate sub Variant like

impl Variant {
  fn get_path(&self, path: &VariantPath) -> Option<Variant> {
    ...
  }
}

With that building block, you could implement the basic "extract a variant" kernel to a new VariantArray using the newly introduced VariantArrayBuilder

let mut output_builder = VariantArrayBuilder::new();
// loop over each input row
for input_variant in input_variant_array.iter() {
  // copy the input variant to the output builder
  let mut vb = VariantBuilder::new()
  vb.append(input_variant)
  let (metadata, value) = vb.build();
  output_builder.append_buffers()
}

I think once we get #7914 from @friendlymatthew in that should work

One downside of this approach is that it will copy the variant over and if the source and destinations are both BinaryViewArray we can probably be much more clever

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like the "row-based" approach I mentioned in the description. I did it this way because it looks more like a "columnar" approach, and I assumed the performance would be better. Perhaps we should benchmark both approaches? I'll see what I can do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree the row based approach will likely be slower for non-shredded variants, but it will always potentially be needed in some cases (for example when the source arrays are not BinaryView)

If we have the get_path method, I think we can potentially implement fast copies of variants by playing games with pointers -- basically by checking if the return variant has a pointer into the same buffer of the BinaryViewArray we can make a view that points there.

However, I think that will be somewhat tricky and require some unsafe, so I suggest we get the first version in place that does the copy, and once we have it working (and tests written, etc.) then I think we'll be in a better position to optimize.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. That makes sense. I'll clean up the PR.

but it will always potentially be needed in some cases (for example when the source arrays are not BinaryView)

Not sure I follow. Perhaps worth a separate discussion in a follow-up issue.

let variant_array: &VariantArray = input.as_any().downcast_ref().ok_or_else(|| {
ArrowError::InvalidArgumentError(
"expected a VariantArray as the input for variant_get".to_owned(),
)
})?;

if let Some(as_type) = options.as_type {
return Err(ArrowError::NotYetImplemented(format!(
"getting a {} from a VariantArray is not implemented yet",
as_type
)));
}

let mut builder = VariantArrayBuilder::new(variant_array.len());
for i in 0..variant_array.len() {
let new_variant = variant_array.value(i);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized something -- when dealing with shredded variants, this value method will do a lot of work to unshred and encode the whole thing (see e.g. #7915 (comment)). And that work is not memoized anywhere unless the caller is able to do so. For efficiency reasons we should strongly consider some kind of direct pathing support in VariantArray itself. Otherwise, it would be far too easy for a caller to accidentally do quadratic work when repeatedly calling value+get_path pairs for different paths.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to understand this -- is this what you're suggesting?

  • We add a new get_path method in VariantArray that does this:
    • For each row, look up the type of the variant and perform the pathing without decoding. So, for example, if it's a VariantObject and there's a VariantPathElement::Field path, it would get the offset for the given field (not sure how yet) and advance the value slice by that much.
    • We would then create a new VariantArray with the metadata directly copied over and the value copied starting from the advanced slice.
    • For shredded variants, if the path ends up on a shredded value, what would be the expected behavior? I'm guessing that the shredded fields will be represented as an Array of the concrete type (an Int32Array for example) and not a VariantArray. Will we wrap these in a VariantArray and send it back? This is one case where having the path + cast in the same operation would help.
  • This variant_get would then simply re-use VariantArray::get_path and perform the appropriate cast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For efficiency reasons we should strongly consider some kind of direct pathing support in VariantArray itself. Otherwise, it would be far too easy for a caller to accidentally do quadratic work when repeatedly calling value+get_path pairs for different paths.

I think @scovich is saying that the variant_get kernel (on VariantArray) should have a special case that knows how to look for a shredded sub field -- and if, for example, it is asking for `a` and the `typed_value.a` column exists, variant_get could simply return that `a` column (already as an arrow array, no actual Variant manipulation required).

// TODO: perf?
let new_variant = new_variant.get_path(&options.path);
match new_variant {
// TODO: we're decoding the value and doing a copy into a variant value again. This
// copy can be much smarter.
Some(new_variant) => builder.append_variant(new_variant),
None => builder.append_null(),
}
}

Ok(Arc::new(builder.build()))
}

/// Controls the action of the variant_get kernel.
///
/// `path` selects which value is extracted from each row, `as_type` selects the
/// type of the returned array, and `cast_options` carries the casting behavior.
#[derive(Debug, Clone)]
pub struct GetOptions<'a> {
/// What path to extract
pub path: VariantPath<'a>,
/// if `as_type` is None, the returned array will itself be a VariantArray.
///
/// if `as_type` is `Some(type)` the field is returned as the specified type.
/// NOTE(review): `variant_get` in this file currently rejects `Some(..)` with
/// `ArrowError::NotYetImplemented`.
pub as_type: Option<Field>,
/// Controls the casting behavior (e.g. error vs substituting null on cast error).
/// NOTE(review): not read by `variant_get` in this file yet, since typed output
/// is not implemented.
pub cast_options: CastOptions<'a>,
}

impl<'a> GetOptions<'a> {
/// Construct options to get the specified path as a variant.
pub fn new_with_path(path: VariantPath<'a>) -> Self {
Self {
path,
as_type: None,
cast_options: Default::default(),
}
}
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::{Array, ArrayRef, StringArray};
    use parquet_variant::path::{VariantPath, VariantPathElement};

    use super::{variant_get, GetOptions};
    use crate::{batch_json_string_to_variant, VariantArray};

    /// Shorthand for a path element selecting the object field called `name`.
    fn field_elem(name: &str) -> VariantPathElement<'_> {
        VariantPathElement::field(name.into())
    }

    /// Shorthand for a path element selecting the list entry at position `index`.
    fn index_elem(index: usize) -> VariantPathElement<'static> {
        VariantPathElement::index(index)
    }

    /// Runs `variant_get` with `path` over a one-row variant array parsed from
    /// `input_json` and checks that the single, non-null result equals the variant
    /// parsed from `expected_json`.
    fn single_variant_get_test(input_json: &str, path: VariantPath, expected_json: &str) {
        // Parses one JSON document into a one-row variant array.
        let parse_one = |json: &str| {
            let strings: ArrayRef = Arc::new(StringArray::from(vec![Some(json)]));
            batch_json_string_to_variant(&strings).unwrap()
        };

        let input: ArrayRef = Arc::new(parse_one(input_json));
        let output = variant_get(&input, GetOptions::new_with_path(path)).unwrap();

        let expected = parse_one(expected_json);

        let actual: &VariantArray = output.as_any().downcast_ref().unwrap();
        assert_eq!(actual.len(), 1, "Expected result array to have length 1");
        assert!(
            actual.nulls().is_none(),
            "Expected no nulls in result array"
        );
        assert_eq!(
            actual.value(0),
            expected.value(0),
            "Result variant does not match expected variant"
        );
    }

    #[test]
    fn get_primitive_variant_field() {
        let path = VariantPath::from(vec![field_elem("some_field")]);
        single_variant_get_test(r#"{"some_field": 1234}"#, path, "1234");
    }

    #[test]
    fn get_primitive_variant_list_index() {
        let path = VariantPath::from(vec![index_elem(0)]);
        single_variant_get_test("[1234, 5678]", path, "1234");
    }

    #[test]
    fn get_primitive_variant_inside_object_of_object() {
        let path = VariantPath::from(vec![
            field_elem("top_level_field"),
            field_elem("inner_field"),
        ]);
        single_variant_get_test(
            r#"{"top_level_field": {"inner_field": 1234}}"#,
            path,
            "1234",
        );
    }

    #[test]
    fn get_primitive_variant_inside_list_of_object() {
        let path = VariantPath::from(vec![index_elem(0), field_elem("some_field")]);
        single_variant_get_test(r#"[{"some_field": 1234}]"#, path, "1234");
    }

    #[test]
    fn get_primitive_variant_inside_object_of_list() {
        let path = VariantPath::from(vec![field_elem("some_field"), index_elem(0)]);
        single_variant_get_test(r#"{"some_field": [1234]}"#, path, "1234");
    }

    #[test]
    fn get_complex_variant() {
        let path = VariantPath::from(vec![field_elem("top_level_field")]);
        single_variant_get_test(
            r#"{"top_level_field": {"inner_field": 1234}}"#,
            path,
            r#"{"inner_field": 1234}"#,
        );
    }
}
5 changes: 0 additions & 5 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,6 @@ impl MetadataBuilder {
fn upsert_field_name(&mut self, field_name: &str) -> u32 {
let (id, new_entry) = self.field_names.insert_full(field_name.to_string());

dbg!(new_entry);

if new_entry {
let n = self.num_field_names();

Expand Down Expand Up @@ -1070,7 +1068,6 @@ impl<'a> ObjectBuilder<'a> {
let metadata_builder = self.parent_state.metadata_builder();

let field_id = metadata_builder.upsert_field_name(key);
dbg!(field_id);
let field_start = self.buffer.offset();

if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields {
Expand Down Expand Up @@ -2487,8 +2484,6 @@ mod tests {

let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1));

dbg!("building");

builder.append_value(variant.clone());

let (metadata, value) = builder.finish();
Expand Down
1 change: 1 addition & 0 deletions parquet-variant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

mod builder;
mod decoder;
pub mod path;
mod utils;
mod variant;

Expand Down
64 changes: 64 additions & 0 deletions parquet-variant/src/path.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::{borrow::Cow, ops::Deref};

/// Represents a qualified path to a potential subfield or index of a variant value.
///
/// A path is an ordered sequence of [`VariantPathElement`]s, each of which names
/// either an object field or a list index. The lifetime `'a` is the lifetime of
/// any field names the elements borrow.
#[derive(Debug, Clone)]
pub struct VariantPath<'a>(Vec<VariantPathElement<'a>>);

impl<'a> VariantPath<'a> {
    /// Creates a path from `path`, keeping the elements in the order given.
    pub fn new(path: Vec<VariantPathElement<'a>>) -> Self {
        Self(path)
    }

    /// Returns the elements of this path, in order.
    ///
    /// The element lifetime is written out as `'a` rather than left hidden: with the
    /// lifetime elided, the elements were typed as borrowing for only as long as
    /// `&self`, which is needlessly short and trips the compiler's lint about hidden
    /// lifetime parameters. Because the element type is covariant in its lifetime,
    /// callers that relied on the shorter lifetime keep compiling.
    pub fn path(&self) -> &Vec<VariantPathElement<'a>> {
        &self.0
    }
}

impl<'a> From<Vec<VariantPathElement<'a>>> for VariantPath<'a> {
fn from(value: Vec<VariantPathElement<'a>>) -> Self {
Self::new(value)
}
}

/// Exposes the path as a slice of its elements, so slice methods such as
/// iteration and `len` can be called on a [`VariantPath`] directly.
impl<'a> Deref for VariantPath<'a> {
    type Target = [VariantPathElement<'a>];

    fn deref(&self) -> &[VariantPathElement<'a>] {
        self.0.as_slice()
    }
}

/// Element of a path
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is lovely

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I... don't remember writing this lol
I'll fix this

// `PartialEq`/`Eq` are derived so that path elements can be compared and used in
// assertions; both payload types (`Cow<str>` and `usize`) already support equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VariantPathElement<'a> {
    /// Access field with name `name`
    Field { name: Cow<'a, str> },
    /// Access the list element at `index`
    Index { index: usize },
}

impl<'a> VariantPathElement<'a> {
pub fn field(name: Cow<'a, str>) -> VariantPathElement<'a> {
VariantPathElement::Field { name }
}

pub fn index(index: usize) -> VariantPathElement<'a> {
VariantPathElement::Index { index }
}
}
Loading
Loading