Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ parquet-variant = { workspace = true }
parquet-variant-json = { workspace = true }
chrono = { workspace = true }
uuid = { version = "1.18.0", features = ["v4"]}
serde_json = "1.0"

[lib]
name = "parquet_variant_compute"
Expand Down
147 changes: 147 additions & 0 deletions parquet-variant-compute/benches/variant_kernels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,15 @@ use parquet_variant::{EMPTY_VARIANT_METADATA_BYTES, Variant, VariantBuilder};
use parquet_variant_compute::{
GetOptions, VariantArray, VariantArrayBuilder, json_to_variant, variant_get,
};
use parquet_variant_json::append_json;
use rand::Rng;
use rand::SeedableRng;
use rand::distr::Alphanumeric;
use rand::rngs::StdRng;
use serde_json::Value;
use std::fmt::Write;
use std::sync::Arc;

fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
let input_array = StringArray::from_iter_values(json_repeated_struct(8000));
let array_ref: ArrayRef = Arc::new(input_array);
Expand Down Expand Up @@ -66,6 +69,58 @@ fn benchmark_batch_json_string_to_variant(c: &mut Criterion) {
});
});

let input_array = StringArray::from_iter_values(random_structure(8000, 200));
let total_input_bytes = input_array
.iter()
.flatten() // filter None
.map(|v| v.len())
.sum::<usize>();
let id = format!(
"batch_json_string_to_variant object - 1 depth(200 fields) random_json({} bytes per document)",
total_input_bytes / input_array.len()
);
let array_ref: ArrayRef = Arc::new(input_array);
let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap();
let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
for i in 0..string_array.len() {
json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
}
c.bench_function(&id, |b| {
b.iter(|| {
let mut variant_array_builder = VariantArrayBuilder::new(string_array.len());
for json in &json_array {
append_json(json, &mut variant_array_builder).unwrap();
}
let _ = variant_array_builder.build();
});
});

let input_array = StringArray::from_iter_values(random_structure(8000, 100));
let total_input_bytes = input_array
.iter()
.flatten() // filter None
.map(|v| v.len())
.sum::<usize>();
let id = format!(
"batch_json_string_to_variant object - 1 depth(100 fields) random_json({} bytes per document)",
total_input_bytes / input_array.len()
);
let array_ref: ArrayRef = Arc::new(input_array);
let string_array = array_ref.as_any().downcast_ref::<StringArray>().unwrap();
let mut json_array: Vec<Value> = Vec::with_capacity(string_array.len());
for i in 0..string_array.len() {
json_array.push(serde_json::from_str(string_array.value(i)).unwrap());
}
c.bench_function(&id, |b| {
b.iter(|| {
let mut variant_array_builder = VariantArrayBuilder::new(string_array.len());
for json in &json_array {
append_json(json, &mut variant_array_builder).unwrap();
}
let _ = variant_array_builder.build();
});
});

let input_array = StringArray::from_iter_values(random_json_structure(8000));
let total_input_bytes = input_array
.iter()
Expand Down Expand Up @@ -240,6 +295,22 @@ fn random_json_structure(count: usize) -> impl Iterator<Item = String> {
(0..count).map(move |_| generator.next().to_string())
}

/// Returns an iterator producing `count` random, flat (depth-1) JSON objects,
/// each with up to `max_fields` scalar-valued fields, rendered as `String`s.
fn random_structure(count: usize, max_fields: usize) -> impl Iterator<Item = String> {
    // Arrays and nesting are disabled (array_weight / max_array_length = 0,
    // max_depth = 1) so every generated document is one flat object of
    // scalar values.
    let mut generator = RandomJsonGenerator {
        null_weight: 5,
        string_weight: 25,
        number_weight: 25,
        boolean_weight: 10,
        object_weight: 25,
        array_weight: 0,
        max_fields,
        max_array_length: 0,
        max_depth: 1,
        ..Default::default()
    };
    std::iter::repeat_with(move || generator.next_object().to_string()).take(count)
}

/// Creates JSON with random structure and fields.
///
/// Each type is created in proportion controlled by the
Expand Down Expand Up @@ -299,6 +370,82 @@ impl RandomJsonGenerator {
&self.output_buffer
}

/// Serializes a fresh random flat JSON object into the reusable internal
/// buffer and returns it as a string slice (valid until the next call).
fn next_object(&mut self) -> &str {
    // Reset the shared buffer, regenerate, then hand back a borrowed view.
    self.output_buffer.clear();
    self.append_random_json_for_object();
    self.output_buffer.as_str()
}

/// Appends a random, flat (depth-1) JSON object to the output buffer.
///
/// Exactly `max_fields` fields are written; each value is null, string,
/// number, or boolean, chosen in proportion to the corresponding weights.
/// `object_weight` / `array_weight` are intentionally ignored here so the
/// result stays flat.
fn append_random_json_for_object(&mut self) {
    // use destructuring to ensure each field is used
    let Self {
        rng,
        null_weight,
        string_weight,
        number_weight,
        boolean_weight,
        max_fields,
        output_buffer,
        ..
    } = self;

    write!(output_buffer, "{{").unwrap();
    for i in 0..*max_fields {
        // Random alphanumeric key, 1..=20 chars. Keys are not deduplicated;
        // a collision at this length is vanishingly unlikely.
        let key_length = rng.random_range(1..=20);
        let key: String = (0..key_length)
            .map(|_| rng.sample(Alphanumeric) as char)
            .collect();
        write!(output_buffer, "\"{key}\":").unwrap();

        let total_weight = *null_weight + *string_weight + *number_weight + *boolean_weight;

        // Pick the value type. `random_range(0..total_weight)` excludes the
        // upper bound, so each bucket test must be a strict `<`; using `<=`
        // would skew the distribution (one extra unit for null, one fewer
        // for boolean).
        let mut random_value: usize = rng.random_range(0..total_weight);

        if random_value < *null_weight {
            write!(output_buffer, "null").unwrap();
        } else {
            random_value -= *null_weight;

            if random_value < *string_weight {
                // Generate a random string between 1 and 20 characters
                let length = rng.random_range(1..=20);
                let random_string: String = (0..length)
                    .map(|_| rng.sample(Alphanumeric) as char)
                    .collect();
                write!(output_buffer, "\"{random_string}\"").unwrap();
            } else {
                random_value -= *string_weight;

                if random_value < *number_weight {
                    // 50% chance of generating an integer or a float
                    if rng.random_bool(0.5) {
                        // Generate a random integer
                        let random_integer: i64 = rng.random_range(-1000..1000);
                        write!(output_buffer, "{random_integer}").unwrap();
                    } else {
                        // Generate a random float (finite, so its Display
                        // form is always valid JSON)
                        let random_float: f64 = rng.random_range(-1000.0..1000.0);
                        write!(output_buffer, "{random_float}").unwrap();
                    }
                } else {
                    // All remaining probability mass belongs to the boolean
                    // bucket; writing it unconditionally guarantees every
                    // key gets a value (the old guarded form could in theory
                    // emit `"key":` with nothing after it).
                    let random_boolean: bool = rng.random();
                    write!(output_buffer, "{random_boolean}").unwrap();
                }
            }
        }
        // Comma separator between fields, omitted after the last one.
        if i < *max_fields - 1 {
            write!(output_buffer, ",").unwrap();
        }
    }
    write!(output_buffer, "}}").unwrap();
}

/// Appends a random JSON value to the output buffer.
fn append_random_json(&mut self, current_depth: usize) {
// use destructuring to ensure each field is used
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant-json/src/from_json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ fn variant_from_number<'m, 'v>(n: &Number) -> Result<Variant<'m, 'v>, ArrowError
}
}

fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> {
pub fn append_json(json: &Value, builder: &mut impl VariantBuilderExt) -> Result<(), ArrowError> {
match json {
Value::Null => builder.append_value(Variant::Null),
Value::Bool(b) => builder.append_value(*b),
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant-json/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@
mod from_json;
mod to_json;

pub use from_json::JsonToVariant;
pub use from_json::{JsonToVariant, append_json};
pub use to_json::VariantToJson;
75 changes: 6 additions & 69 deletions parquet-variant/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::decoder::{VariantBasicType, VariantPrimitiveType};
use crate::decoder::{OffsetSizeBytes, VariantBasicType, VariantPrimitiveType};
use crate::{
ShortString, Variant, VariantDecimal4, VariantDecimal8, VariantDecimal16, VariantList,
VariantMetadata, VariantObject,
Expand Down Expand Up @@ -43,21 +43,15 @@ fn short_string_header(len: usize) -> u8 {
(len as u8) << 2 | VariantBasicType::ShortString as u8
}

pub(crate) fn int_size(v: usize) -> u8 {
pub(crate) fn int_size(v: usize) -> OffsetSizeBytes {
match v {
0..=0xFF => 1,
0x100..=0xFFFF => 2,
0x10000..=0xFFFFFF => 3,
_ => 4,
0..=0xFF => OffsetSizeBytes::One,
0x100..=0xFFFF => OffsetSizeBytes::Two,
0x10000..=0xFFFFFF => OffsetSizeBytes::Three,
_ => OffsetSizeBytes::Four,
}
}

/// Write little-endian integer to buffer at a specific position
fn write_offset_at_pos(buf: &mut [u8], start_pos: usize, value: usize, nbytes: u8) {
let bytes = value.to_le_bytes();
buf[start_pos..start_pos + nbytes as usize].copy_from_slice(&bytes[..nbytes as usize]);
}

/// Wrapper around a `Vec<u8>` that provides methods for appending
/// primitive values, variant types, and metadata.
///
Expand Down Expand Up @@ -358,63 +352,6 @@ impl ValueBuilder {
);
state.finish();
}

/// Writes out the header byte for a variant object or list, from the starting position
/// of the builder, will return the position after this write
pub(crate) fn append_header_start_from_buf_pos(
&mut self,
start_pos: usize, // the start position where the header will be inserted
header_byte: u8,
is_large: bool,
num_fields: usize,
) -> usize {
let buffer = self.inner_mut();

// Write header at the original start position
let mut header_pos = start_pos;

// Write header byte
buffer[header_pos] = header_byte;
header_pos += 1;

// Write number of fields
if is_large {
buffer[header_pos..header_pos + 4].copy_from_slice(&(num_fields as u32).to_le_bytes());
header_pos += 4;
} else {
buffer[header_pos] = num_fields as u8;
header_pos += 1;
}

header_pos
}

/// Writes out the offsets for an array of offsets, including the final offset (data size).
/// from the starting position of the buffer, will return the position after this write
pub(crate) fn append_offset_array_start_from_buf_pos(
&mut self,
start_pos: usize,
offsets: impl IntoIterator<Item = usize>,
data_size: Option<usize>,
nbytes: u8,
) -> usize {
let buf = self.inner_mut();

let mut current_pos = start_pos;
for relative_offset in offsets {
write_offset_at_pos(buf, current_pos, relative_offset, nbytes);
current_pos += nbytes as usize;
}

// Write data_size
if let Some(data_size) = data_size {
// Write data_size at the end of the offsets
write_offset_at_pos(buf, current_pos, data_size, nbytes);
current_pos += nbytes as usize;
}

current_pos
}
}

/// A trait for managing state specific to different builder types.
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant/src/builder/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<'a, S: BuilderSpecificState> ListBuilder<'a, S> {
// Make sure to reserve enough capacity to handle the extra bytes we'll truncate.
let mut bytes_to_splice = Vec::with_capacity(header_size + 3);
// Write header
let header = array_header(is_large, offset_size);
let header = array_header(is_large, offset_size as _);
bytes_to_splice.push(header);

append_packed_u32(&mut bytes_to_splice, num_elements as u32, num_elements_size);
Expand Down
2 changes: 1 addition & 1 deletion parquet-variant/src/builder/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl WritableMetadataBuilder {

// Determine appropriate offset size based on the larger of dict size or total string size
let max_offset = std::cmp::max(total_dict_size, nkeys);
let offset_size = int_size(max_offset);
let offset_size = int_size(max_offset) as u8;

let offset_start = 1 + offset_size as usize;
let string_start = offset_start + (nkeys + 1) * offset_size as usize;
Expand Down
Loading
Loading