4 changes: 4 additions & 0 deletions arrow-json/Cargo.toml
@@ -65,3 +65,7 @@ rand = { version = "0.9", default-features = false, features = ["std", "std_rng"
[[bench]]
name = "serde"
harness = false

[[bench]]
name = "reader"
harness = false
126 changes: 126 additions & 0 deletions arrow-json/benches/reader.rs
@@ -0,0 +1,126 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
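
//! Benchmarks for the arrow-json reader, currently focused on
//! projection-aware decoding of wide records. Run with
//! `cargo bench -p arrow-json --bench reader`.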

use arrow_json::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema};
use criterion::{
BenchmarkId, Criterion, SamplingMode, Throughput, criterion_group, criterion_main,
};
use std::fmt::Write;
use std::hint::black_box;
use std::sync::Arc;

// Projection benchmark constants
const WIDE_PROJECTION_ROWS: usize = 1 << 14; // 16K rows
const WIDE_PROJECTION_TOTAL_FIELDS: usize = 100; // 100 fields total, select only 3

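/// Decodes `data` (newline-delimited JSON) into batches of `rows` rows and
/// measures end-to-end throughput of tape parsing plus array building.
/// `projection` toggles projection-aware parsing via
/// `ReaderBuilder::with_projection`.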
fn bench_decode_schema(
c: &mut Criterion,
name: &str,
data: &[u8],
schema: Arc<Schema>,
rows: usize,
projection: bool,
) {
let mut group = c.benchmark_group(name);
group.throughput(Throughput::Bytes(data.len() as u64));
group.sample_size(50);
group.measurement_time(std::time::Duration::from_secs(5));
group.warm_up_time(std::time::Duration::from_secs(2));
group.sampling_mode(SamplingMode::Flat);
group.bench_function(BenchmarkId::from_parameter(rows), |b| {
b.iter(|| {
let mut decoder = ReaderBuilder::new(schema.clone())
.with_batch_size(rows)
.with_projection(projection)
.build_decoder()
.unwrap();

let mut offset = 0;
while offset < data.len() {
let read = decoder.decode(black_box(&data[offset..])).unwrap();
if read == 0 {
break;
}
offset += read;
}

let batch = decoder.flush().unwrap();
black_box(batch);
})
});
group.finish();
}

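/// Generates `rows` newline-delimited JSON objects, each containing
/// `total_fields` integer fields named `f0`, `f1`, ..., so the full and
/// projected benchmarks parse identical input bytes.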
fn build_wide_projection_json(rows: usize, total_fields: usize) -> Vec<u8> {
// Estimate: each field ~15 bytes ("fXX":VVVVVVV,), total ~15*100 + overhead
let per_row_size = total_fields * 15 + 10;
let mut data = String::with_capacity(rows * per_row_size);

for _row in 0..rows {
data.push('{');
for i in 0..total_fields {
if i > 0 {
data.push(',');
}
// Use fixed-width values for stable benchmarks: 7 digits
let _ = write!(data, "\"f{}\":{:07}", i, i);
}
data.push('}');
data.push('\n');
}
data.into_bytes()
}

fn criterion_benchmark(c: &mut Criterion) {
// Wide projection workload: tests overhead of parsing unused fields
let wide_projection_data =
build_wide_projection_json(WIDE_PROJECTION_ROWS, WIDE_PROJECTION_TOTAL_FIELDS);

// Full schema: all 100 fields
let mut full_fields = Vec::new();
for i in 0..WIDE_PROJECTION_TOTAL_FIELDS {
full_fields.push(Field::new(format!("f{}", i), DataType::Int64, false));
}
let full_schema = Arc::new(Schema::new(full_fields));
bench_decode_schema(
c,
"decode_wide_projection_full_json",
&wide_projection_data,
full_schema,
WIDE_PROJECTION_ROWS,
false,
);

// Projected schema: only 3 fields (f0, f10, f50) out of 100
let projected_schema = Arc::new(Schema::new(vec![
Field::new("f0", DataType::Int64, false),
Field::new("f10", DataType::Int64, false),
Field::new("f50", DataType::Int64, false),
]));
bench_decode_schema(
c,
"decode_wide_projection_narrow_json",
&wide_projection_data,
projected_schema,
WIDE_PROJECTION_ROWS,
true,
);
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
65 changes: 62 additions & 3 deletions arrow-json/src/reader/mod.rs
@@ -137,6 +137,7 @@ use crate::StructMode;
use crate::reader::binary_array::{
BinaryArrayDecoder, BinaryViewDecoder, FixedSizeBinaryArrayDecoder,
};
use std::collections::HashSet;
use std::io::BufRead;
use std::sync::Arc;

@@ -182,6 +183,7 @@ pub struct ReaderBuilder {
batch_size: usize,
coerce_primitive: bool,
strict_mode: bool,
projection: bool,
is_field: bool,
struct_mode: StructMode,

@@ -202,6 +204,7 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
projection: false,
is_field: false,
struct_mode: Default::default(),
schema,
@@ -243,6 +246,7 @@ impl ReaderBuilder {
batch_size: 1024,
coerce_primitive: false,
strict_mode: false,
projection: false,
is_field: true,
struct_mode: Default::default(),
schema: Arc::new(Schema::new([field.into()])),
@@ -275,6 +279,12 @@ impl ReaderBuilder {
}
}

/// Enables projection-aware parsing, skipping JSON fields that are not
/// present in the schema instead of parsing them.
///
/// This has no additional effect when `strict_mode` is enabled: strict
/// mode always tracks the schema's field set, but errors on unknown
/// fields rather than skipping them.
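///
/// A minimal usage sketch (the schema and data are illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_schema::{DataType, Field, Schema};
/// # use arrow_json::ReaderBuilder;
/// // Schema contains only "a"; with projection enabled, any other
/// // top-level JSON fields are skipped rather than parsed.
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let mut decoder = ReaderBuilder::new(schema)
///     .with_projection(true)
///     .build_decoder()
///     .unwrap();
/// decoder.decode(br#"{"a": 1, "skipped": "x"}"#).unwrap();
/// let batch = decoder.flush().unwrap().unwrap();
/// assert_eq!(batch.num_columns(), 1);
/// ```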
pub fn with_projection(self, projection: bool) -> Self {
Self { projection, ..self }
}

/// Set the [`StructMode`] for the reader, which determines whether structs
/// can be decoded from JSON as objects or lists. For more details refer to
/// the enum documentation. Default is to use `ObjectOnly`.
@@ -303,6 +313,19 @@ impl ReaderBuilder {
}
};

let num_fields = self.schema.flattened_fields().len();

// Extract projection field set from schema for projection-aware parsing
// - strict_mode: fail-fast on unknown fields during tape parsing
// - projection: skip JSON fields not present in the schema
let enable_projection = self.strict_mode || self.projection;
let projection: Option<HashSet<String>> = match &data_type {
DataType::Struct(fields) if enable_projection && !fields.is_empty() => {
Some(fields.iter().map(|f| f.name().clone()).collect())
}
_ => None,
};
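// Only top-level field names are collected; the projection set does not
// descend into nested structs.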

let decoder = make_decoder(
data_type,
self.coerce_primitive,
@@ -311,12 +334,15 @@ impl ReaderBuilder {
self.struct_mode,
)?;

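// Pass the projection set and strict flag down to the tape decoder, which
// skips (or, in strict mode, rejects) unknown top-level fields while
// building the tape.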
Ok(Decoder {
decoder,
is_field: self.is_field,
tape_decoder: TapeDecoder::new(
self.batch_size,
num_fields,
projection,
self.strict_mode,
),
batch_size: self.batch_size,
schema: self.schema,
})
@@ -1783,6 +1809,39 @@ mod tests {
);
}

#[test]
fn test_projection_skip_unknown_fields() {
// JSON has fields a, b, c but schema only has a, c
let buf = r#"
{"a": 1, "b": "ignored", "c": true}
{"a": 2, "b": "also ignored", "c": false}
"#;

let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::Boolean, true),
]));

// with_projection(true): skip unknown field "b" and succeed
let batch = ReaderBuilder::new(schema)
.with_projection(true)
.build(Cursor::new(buf.as_bytes()))
.unwrap()
.read()
.unwrap()
.unwrap();

assert_eq!(batch.num_rows(), 2);
assert_eq!(batch.num_columns(), 2);

let a = batch.column(0).as_primitive::<Int32Type>();
assert_eq!(a.values(), &[1, 2]);

let c = batch.column(1).as_boolean();
assert!(c.value(0));
assert!(!c.value(1));
}

fn read_file(path: &str, schema: Option<Schema>) -> Reader<BufReader<File>> {
let file = File::open(path).unwrap();
let mut reader = BufReader::new(file);