Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ default = ["deflate", "snappy", "zstd", "bzip2", "xz"]
deflate = ["flate2"]
snappy = ["snap", "crc"]
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
md5 = ["dep:md5"]
sha256 = ["dep:sha2"]

[dependencies]
arrow-schema = { workspace = true }
Expand All @@ -59,6 +61,8 @@ strum_macros = "0.27"
uuid = "1.17"
indexmap = "2.10"
rand = "0.9"
md5 = { version = "0.8", optional = true }
sha2 = { version = "0.10", optional = true }

[dev-dependencies]
arrow-data = { workspace = true }
Expand Down
265 changes: 149 additions & 116 deletions arrow-avro/benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,42 @@ extern crate uuid;

use apache_avro::types::Value;
use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
use arrow_avro::schema::{Fingerprint, FingerprintAlgorithm, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC};
use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput};
use once_cell::sync::Lazy;
use std::{hint::black_box, time::Duration};
use uuid::Uuid;

fn make_prefix(fp: Fingerprint) -> [u8; 10] {
let Fingerprint::Rabin(val) = fp;
let mut buf = [0u8; 10];
buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
buf
/// Builds the wire-format frame prefix for one encoded Avro datum.
///
/// The prefix depends on the fingerprint flavor:
/// - `Rabin`: Avro single-object magic (`C3 01`) + 8-byte little-endian fingerprint.
/// - `Id`: Confluent magic (`00`) + 4-byte big-endian schema id.
/// - `MD5` / `SHA256` (feature-gated): single-object magic + raw digest bytes.
fn make_prefix(fp: Fingerprint) -> Vec<u8> {
    match fp {
        // Avro single-object encoding stores the Rabin fingerprint little-endian.
        Fingerprint::Rabin(val) => {
            [SINGLE_OBJECT_MAGIC.as_slice(), &val.to_le_bytes()].concat()
        }
        // Confluent wire format stores the registry id big-endian.
        Fingerprint::Id(id) => [CONFLUENT_MAGIC.as_slice(), &id.to_be_bytes()].concat(),
        #[cfg(feature = "md5")]
        Fingerprint::MD5(digest) => [SINGLE_OBJECT_MAGIC.as_slice(), &digest].concat(),
        #[cfg(feature = "sha256")]
        Fingerprint::SHA256(digest) => [SINGLE_OBJECT_MAGIC.as_slice(), &digest].concat(),
    }
}

fn encode_records_with_prefix(
Expand Down Expand Up @@ -336,6 +359,27 @@ fn new_decoder(
.expect("failed to build decoder")
}

/// Builds a streaming Avro `Decoder` whose writer schema is registered under a
/// Confluent-style numeric id (wire format: `00` magic byte + big-endian u32 id).
///
/// * `schema_json` - writer schema as an Avro JSON string
/// * `batch_size` - rows per flushed `RecordBatch`
/// * `utf8view` - decode strings into `Utf8View` arrays when `true`
/// * `id` - Confluent schema-registry id to key the schema store with
fn new_decoder_id(
    schema_json: &'static str,
    batch_size: usize,
    utf8view: bool,
    id: u32,
) -> arrow_avro::reader::Decoder {
    let schema = AvroSchema::new(schema_json.parse().unwrap());
    // `FingerprintAlgorithm::None` disables fingerprint hashing; entries are
    // keyed purely by the id we register below.
    let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None);
    // Register the schema with a provided Confluent-style ID.
    // (The schema is moved in directly; the previous `schema.clone()` was a
    // redundant allocation since `schema` was never used again.)
    store
        .set(Fingerprint::Id(id), schema)
        .expect("failed to set schema with id");
    ReaderBuilder::new()
        .with_writer_schema_store(store)
        .with_active_fingerprint(Fingerprint::Id(id))
        .with_batch_size(batch_size)
        .with_utf8_view(utf8view)
        .build_decoder()
        .expect("failed to build decoder for id")
}

const SIZES: [usize; 3] = [100, 10_000, 1_000_000];

const INT_SCHEMA: &str =
Expand Down Expand Up @@ -373,7 +417,7 @@ macro_rules! dataset {
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
let schema =
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
let arrow_schema = AvroSchema::new($schema_json.to_string());
let arrow_schema = AvroSchema::new($schema_json.parse().unwrap());
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
let prefix = make_prefix(fingerprint);
SIZES
Expand All @@ -384,6 +428,24 @@ macro_rules! dataset {
};
}

/// Additional helper for Confluent's ID-based wire format (00 + BE u32).
///
/// Declares a `Lazy` static `$name: Vec<Vec<u8>>` with one encoded buffer per
/// entry in `SIZES`, generated by `$gen_fn` with a Confluent-style prefix built
/// from the fixed schema id `$id`.
macro_rules! dataset_id {
    ($name:ident, $schema_json:expr, $gen_fn:ident, $id:expr) => {
        static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
            // Parse with apache-avro only to drive the data generator.
            let schema =
                ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
            // Id-based framing: no fingerprint hashing, just the provided id.
            let prefix = make_prefix(Fingerprint::Id($id));
            SIZES
                .iter()
                .map(|&n| $gen_fn(&schema, n, &prefix))
                .collect()
        });
    };
}

/// Arbitrary Confluent schema id used for the id-framed Int32 dataset/benchmark.
const ID_BENCH_ID: u32 = 7;

// Int32 payloads framed with the Confluent id prefix (see `dataset_id!`).
dataset_id!(INT_DATA_ID, INT_SCHEMA, gen_int, ID_BENCH_ID);
dataset!(INT_DATA, INT_SCHEMA, gen_int);
dataset!(LONG_DATA, LONG_SCHEMA, gen_long);
dataset!(FLOAT_DATA, FLOAT_SCHEMA, gen_float);
Expand All @@ -406,19 +468,20 @@ dataset!(ENUM_DATA, ENUM_SCHEMA, gen_enum);
dataset!(MIX_DATA, MIX_SCHEMA, gen_mixed);
dataset!(NEST_DATA, NEST_SCHEMA, gen_nested);

fn bench_scenario(
fn bench_with_decoder<F>(
c: &mut Criterion,
name: &str,
schema_json: &'static str,
data_sets: &[Vec<u8>],
utf8view: bool,
batch_size: usize,
) {
rows: &[usize],
mut new_decoder: F,
) where
F: FnMut() -> arrow_avro::reader::Decoder,
{
let mut group = c.benchmark_group(name);
for (idx, &rows) in SIZES.iter().enumerate() {
for (idx, &row_count) in rows.iter().enumerate() {
let datum = &data_sets[idx];
group.throughput(Throughput::Bytes(datum.len() as u64));
match rows {
match row_count {
10_000 => {
group
.sample_size(25)
Expand All @@ -433,9 +496,9 @@ fn bench_scenario(
}
_ => {}
}
group.bench_function(BenchmarkId::from_parameter(rows), |b| {
group.bench_function(BenchmarkId::from_parameter(row_count), |b| {
b.iter_batched_ref(
|| new_decoder(schema_json, batch_size, utf8view),
&mut new_decoder,
|decoder| {
black_box(decoder.decode(datum).unwrap());
black_box(decoder.flush().unwrap().unwrap());
Expand All @@ -449,105 +512,75 @@ fn bench_scenario(

/// Registers every decoder benchmark scenario with Criterion.
///
/// Each scenario runs once per batch size (`SMALL_BATCH`, `LARGE_BATCH`) and,
/// inside `bench_with_decoder`, once per row count in `SIZES`. The closure
/// builds a fresh decoder for each measurement batch so decoder construction
/// is excluded from the timed region via `iter_batched_ref`.
fn criterion_benches(c: &mut Criterion) {
    for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
        bench_with_decoder(c, "Interval", &INTERVAL_DATA, &SIZES, || {
            new_decoder(INTERVAL_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Int32", &INT_DATA, &SIZES, || {
            new_decoder(INT_SCHEMA, batch_size, false)
        });
        // Same Int32 values, but framed with the Confluent id prefix and
        // decoded through an id-keyed schema store.
        bench_with_decoder(c, "Int32_Id", &INT_DATA_ID, &SIZES, || {
            new_decoder_id(INT_SCHEMA, batch_size, false, ID_BENCH_ID)
        });
        bench_with_decoder(c, "Int64", &LONG_DATA, &SIZES, || {
            new_decoder(LONG_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Float32", &FLOAT_DATA, &SIZES, || {
            new_decoder(FLOAT_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Boolean", &BOOL_DATA, &SIZES, || {
            new_decoder(BOOL_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Float64", &DOUBLE_DATA, &SIZES, || {
            new_decoder(DOUBLE_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Binary(Bytes)", &BYTES_DATA, &SIZES, || {
            new_decoder(BYTES_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "String", &STRING_DATA, &SIZES, || {
            new_decoder(STRING_SCHEMA, batch_size, false)
        });
        // Same payloads as "String" but decoded into Utf8View arrays.
        bench_with_decoder(c, "StringView", &STRING_DATA, &SIZES, || {
            new_decoder(STRING_SCHEMA, batch_size, true)
        });
        bench_with_decoder(c, "Date32", &DATE_DATA, &SIZES, || {
            new_decoder(DATE_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "TimeMillis", &TMILLIS_DATA, &SIZES, || {
            new_decoder(TMILLIS_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "TimeMicros", &TMICROS_DATA, &SIZES, || {
            new_decoder(TMICROS_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "TimestampMillis", &TSMILLIS_DATA, &SIZES, || {
            new_decoder(TSMILLIS_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "TimestampMicros", &TSMICROS_DATA, &SIZES, || {
            new_decoder(TSMICROS_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Map", &MAP_DATA, &SIZES, || {
            new_decoder(MAP_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Array", &ARRAY_DATA, &SIZES, || {
            new_decoder(ARRAY_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Decimal128", &DECIMAL_DATA, &SIZES, || {
            new_decoder(DECIMAL_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "UUID", &UUID_DATA, &SIZES, || {
            new_decoder(UUID_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "FixedSizeBinary", &FIXED_DATA, &SIZES, || {
            new_decoder(FIXED_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Enum(Dictionary)", &ENUM_DATA, &SIZES, || {
            new_decoder(ENUM_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Mixed", &MIX_DATA, &SIZES, || {
            new_decoder(MIX_SCHEMA, batch_size, false)
        });
        bench_with_decoder(c, "Nested(Struct)", &NEST_DATA, &SIZES, || {
            new_decoder(NEST_SCHEMA, batch_size, false)
        });
    }
}

Expand Down
Loading
Loading