Skip to content
263 changes: 249 additions & 14 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
// specific language governing permissions and limitations
// under the License.
use crate::decoder::{VariantBasicType, VariantPrimitiveType};
use crate::{ShortString, Variant, VariantDecimal16, VariantDecimal4, VariantDecimal8};
use crate::{
ShortString, Variant, VariantDecimal16, VariantDecimal4, VariantDecimal8, VariantMetadata,
};
use arrow_schema::ArrowError;
use indexmap::{IndexMap, IndexSet};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};

const BASIC_TYPE_BITS: u8 = 2;
const UNIX_EPOCH_DATE: chrono::NaiveDate = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
Expand Down Expand Up @@ -218,8 +220,46 @@ impl ValueBuffer {
self.0.len()
}

fn append_non_nested_value<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, value: T) {
let variant = value.into();
fn new_object<'a>(
&'a mut self,
metadata_builder: &'a mut MetadataBuilder,
) -> ObjectBuilder<'a> {
let parent_state = ParentState::Variant {
buffer: self,
metadata_builder,
};
let validate_unique_fields = false;
ObjectBuilder::new(parent_state, validate_unique_fields)
}

fn new_list<'a>(&'a mut self, metadata_builder: &'a mut MetadataBuilder) -> ListBuilder<'a> {
let parent_state = ParentState::Variant {
buffer: self,
metadata_builder,
};
let validate_unique_fields = false;
ListBuilder::new(parent_state, validate_unique_fields)
}

/// Appends a variant to the buffer.
///
/// # Panics
///
/// This method will panic if the variant contains duplicate field names in objects
/// when validation is enabled. For a fallible version, use [`ValueBuffer::try_append_variant`]
fn append_variant<'m, 'd>(
&mut self,
variant: Variant<'m, 'd>,
metadata_builder: &mut MetadataBuilder,
) {
self.try_append_variant(variant, metadata_builder).unwrap();
}

fn try_append_variant<'m, 'd>(
&mut self,
variant: Variant<'m, 'd>,
metadata_builder: &mut MetadataBuilder,
) -> Result<(), ArrowError> {
match variant {
Variant::Null => self.append_null(),
Variant::BooleanTrue => self.append_bool(true),
Expand All @@ -239,12 +279,38 @@ impl ValueBuffer {
Variant::Binary(v) => self.append_binary(v),
Variant::String(s) => self.append_string(s),
Variant::ShortString(s) => self.append_short_string(s),
Variant::Object(_) | Variant::List(_) => {
unreachable!(
"Nested values are handled specially by ObjectBuilder and ListBuilder"
);
Variant::Object(obj) => {
let metadata_field_names = metadata_builder
.field_names
.iter()
.enumerate()
.map(|(i, f)| (f.clone(), i))
.collect::<HashMap<_, _>>();

let mut object_builder = self.new_object(metadata_builder);

// first add all object fields that exist in metadata builder
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the code here is basically trying to preserve the order so that the variants compare deeply equal

Rather than trying to preserve the byte-for-byte equality here, I think a better idea could be to implement a Variant::eq_value type method that compares the Variants for logical equality, rather than byte-for-byte equality.

I am thinking we can merge this PR and then I will make a follow on PR for that idea

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the logical equality function on variants. This will be needed in tests where the result and expected variants come from different sources (and thus might have a mismatch in metadata or ordering).

let mut object_fields = obj.iter().collect::<Vec<_>>();

object_fields
.sort_by_key(|(field_name, _)| metadata_field_names.get(field_name as &str));

for (field_name, value) in object_fields {
object_builder.insert(field_name, value);
}

object_builder.finish()?;
}
Variant::List(list) => {
let mut list_builder = self.new_list(metadata_builder);
for value in list.iter() {
list_builder.append_value(value);
}
list_builder.finish();
}
}

Ok(())
}

/// Writes out the header byte for a variant object or list
Expand Down Expand Up @@ -310,6 +376,8 @@ impl MetadataBuilder {
fn upsert_field_name(&mut self, field_name: &str) -> u32 {
let (id, new_entry) = self.field_names.insert_full(field_name.to_string());

dbg!(new_entry);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is left over


if new_entry {
let n = self.num_field_names();

Expand Down Expand Up @@ -728,6 +796,12 @@ impl VariantBuilder {
}
}

pub fn with_metadata(mut self, metadata: VariantMetadata) -> Self {
self.metadata_builder.extend(metadata.iter());

self
}

/// Create a new VariantBuilder that will write the metadata and values to
/// the specified buffers.
pub fn new_with_buffers(metadata_buffer: Vec<u8>, value_buffer: Vec<u8>) -> Self {
Expand Down Expand Up @@ -792,7 +866,12 @@ impl VariantBuilder {
ObjectBuilder::new(parent_state, validate_unique_fields)
}

/// Append a non-nested value to the builder.
/// Append a value to the builder.
///
/// # Panics
///
/// This method will panic if the variant contains duplicate field names in objects
/// when validation is enabled. For a fallible version, use [`VariantBuilder::try_append_value`]
///
/// # Example
/// ```
Expand All @@ -802,7 +881,21 @@ impl VariantBuilder {
/// builder.append_value(42i8);
/// ```
pub fn append_value<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, value: T) {
self.buffer.append_non_nested_value(value);
let variant = value.into();
self.buffer
.append_variant(variant, &mut self.metadata_builder);
}

/// Append a value to the builder.
pub fn try_append_value<'m, 'd, T: Into<Variant<'m, 'd>>>(
&mut self,
value: T,
) -> Result<(), ArrowError> {
let variant = value.into();
self.buffer
.try_append_variant(variant, &mut self.metadata_builder)?;

Ok(())
}

/// Finish the builder and return the metadata and value buffers.
Expand Down Expand Up @@ -866,10 +959,26 @@ impl<'a> ListBuilder<'a> {
ListBuilder::new(parent_state, validate_unique_fields)
}

/// Appends a new primitive value to this list
/// Appends a variant to the list.
///
/// # Panics
///
/// This method will panic if the variant contains duplicate field names in objects
/// when validation is enabled. For a fallible version, use [`ListBuilder::try_append_value`].
pub fn append_value<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, value: T) {
self.try_append_value(value).unwrap();
}

/// Appends a new primitive value to this list
pub fn try_append_value<'m, 'd, T: Into<Variant<'m, 'd>>>(
&mut self,
value: T,
) -> Result<(), ArrowError> {
self.offsets.push(self.buffer.offset());
self.buffer.append_non_nested_value(value);
self.buffer
.try_append_variant(value.into(), self.parent_state.metadata_builder())?;

Ok(())
}

/// Finalizes this list and appends it to its parent, which otherwise remains unmodified.
Expand Down Expand Up @@ -926,22 +1035,40 @@ impl<'a> ObjectBuilder<'a> {
}
}

/// Add a field with key and value to the object
///
/// # Panics
///
/// This method will panic if the variant contains duplicate field names in objects
/// when validation is enabled. For a fallible version, use [`ObjectBuilder::try_insert`]
pub fn insert<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, key: &str, value: T) {
self.try_insert(key, value).unwrap();
}

/// Add a field with key and value to the object
///
/// Note: when inserting duplicate keys, the new value overwrites the previous mapping,
/// but the old value remains in the buffer, resulting in a larger variant
pub fn insert<'m, 'd, T: Into<Variant<'m, 'd>>>(&mut self, key: &str, value: T) {
pub fn try_insert<'m, 'd, T: Into<Variant<'m, 'd>>>(
&mut self,
key: &str,
value: T,
) -> Result<(), ArrowError> {
// Get metadata_builder from parent state
let metadata_builder = self.parent_state.metadata_builder();

let field_id = metadata_builder.upsert_field_name(key);
dbg!(field_id);
let field_start = self.buffer.offset();

if self.fields.insert(field_id, field_start).is_some() && self.validate_unique_fields {
self.duplicate_fields.insert(field_id);
}

self.buffer.append_non_nested_value(value);
self.buffer
.try_append_variant(value.into(), metadata_builder)?;

Ok(())
}

/// Enables validation for unique field keys when inserting into this object.
Expand Down Expand Up @@ -2339,4 +2466,112 @@ mod tests {
let variant = Variant::try_new_with_metadata(metadata, &value).unwrap();
assert_eq!(variant, Variant::Int8(2));
}

// matthew
#[test]
fn test_append_object() {
let (m1, v1) = make_object();
let variant = Variant::new(&m1, &v1);

let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1));

dbg!("building");

builder.append_value(variant.clone());

let (metadata, value) = builder.finish();
assert_eq!(variant, Variant::new(&metadata, &value));
}

/// make an object variant with field names in reverse lexicographical order
fn make_object() -> (Vec<u8>, Vec<u8>) {
let mut builder = VariantBuilder::new();

let mut obj = builder.new_object();

obj.insert("b", true);
obj.insert("a", false);
obj.finish().unwrap();
builder.finish()
}

#[test]
fn test_append_nested_object() {
let (m1, v1) = make_nested_object();
let variant = Variant::new(&m1, &v1);

// because we can guarantee metadata is validated through the builder
let mut builder = VariantBuilder::new().with_metadata(VariantMetadata::new(&m1));
builder.append_value(variant.clone());

let (metadata, value) = builder.finish();
let result_variant = Variant::new(&metadata, &value);

assert_eq!(variant, result_variant);
}

/// make a nested object variant
fn make_nested_object() -> (Vec<u8>, Vec<u8>) {
let mut builder = VariantBuilder::new();

{
let mut outer_obj = builder.new_object();

{
let mut inner_obj = outer_obj.new_object("b");
inner_obj.insert("a", "inner_value");
inner_obj.finish().unwrap();
}

outer_obj.finish().unwrap();
}

builder.finish()
}

#[test]
fn test_append_list() {
let (m1, v1) = make_list();
let variant = Variant::new(&m1, &v1);
let mut builder = VariantBuilder::new();
builder.append_value(variant.clone());
let (metadata, value) = builder.finish();
assert_eq!(variant, Variant::new(&metadata, &value));
}

/// make a simple List variant
fn make_list() -> (Vec<u8>, Vec<u8>) {
let mut builder = VariantBuilder::new();
let mut list = builder.new_list();
list.append_value(1234);
list.append_value("a string value");
list.finish();
builder.finish()
}

#[test]
fn test_append_nested_list() {
let (m1, v1) = make_nested_list();
let variant = Variant::new(&m1, &v1);
let mut builder = VariantBuilder::new();
builder.append_value(variant.clone());
let (metadata, value) = builder.finish();
assert_eq!(variant, Variant::new(&metadata, &value));
}

fn make_nested_list() -> (Vec<u8>, Vec<u8>) {
let mut builder = VariantBuilder::new();
let mut list = builder.new_list();

let mut inner_list = list.new_list();

inner_list.append_value("the dog licked the oil");
inner_list.append_value(4.3);

inner_list.finish();

list.finish();

builder.finish()
}
}
2 changes: 1 addition & 1 deletion parquet-variant/src/variant/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl VariantMetadataHeader {
/// [Variant Spec]: https://github.com/apache/parquet-format/blob/master/VariantEncoding.md#metadata-encoding
#[derive(Debug, Clone, PartialEq)]
pub struct VariantMetadata<'m> {
bytes: &'m [u8],
pub(crate) bytes: &'m [u8],
header: VariantMetadataHeader,
dictionary_size: u32,
first_value_byte: u32,
Expand Down
Loading