diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 96af73348156..19e86539558f 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -40,6 +40,8 @@ default = ["deflate", "snappy", "zstd", "bzip2", "xz"] deflate = ["flate2"] snappy = ["snap", "crc"] canonical_extension_types = ["arrow-schema/canonical_extension_types"] +md5 = ["dep:md5"] +sha256 = ["dep:sha2"] [dependencies] arrow-schema = { workspace = true } @@ -59,6 +61,8 @@ strum_macros = "0.27" uuid = "1.17" indexmap = "2.10" rand = "0.9" +md5 = { version = "0.8", optional = true } +sha2 = { version = "0.10", optional = true } [dev-dependencies] arrow-data = { workspace = true } diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs index df802daea154..0ca240d12fc9 100644 --- a/arrow-avro/benches/decoder.rs +++ b/arrow-avro/benches/decoder.rs @@ -27,19 +27,42 @@ extern crate uuid; use apache_avro::types::Value; use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema}; -use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC}; +use arrow_avro::schema::{Fingerprint, FingerprintAlgorithm, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC}; use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema}; use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, Criterion, Throughput}; use once_cell::sync::Lazy; use std::{hint::black_box, time::Duration}; use uuid::Uuid; -fn make_prefix(fp: Fingerprint) -> [u8; 10] { - let Fingerprint::Rabin(val) = fp; - let mut buf = [0u8; 10]; - buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01 - buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit - buf +fn make_prefix(fp: Fingerprint) -> Vec { + match fp { + Fingerprint::Rabin(val) => { + let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of::()); + buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01 + buf.extend_from_slice(&val.to_le_bytes()); // little-endian + buf + } + Fingerprint::Id(id) => { + let mut buf = Vec::with_capacity(CONFLUENT_MAGIC.len() + size_of::()); + buf.extend_from_slice(&CONFLUENT_MAGIC); // 00 + buf.extend_from_slice(&id.to_be_bytes()); // big-endian + buf + } + #[cfg(feature = "md5")] + Fingerprint::MD5(val) => { + let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val)); + buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01 + buf.extend_from_slice(&val); + buf + } + #[cfg(feature = "sha256")] + Fingerprint::SHA256(val) => { + let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() + size_of_val(&val)); + buf.extend_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01 + buf.extend_from_slice(&val); + buf + } + } } fn encode_records_with_prefix( @@ -336,6 +359,27 @@ fn new_decoder( .expect("failed to build decoder") } +fn new_decoder_id( + schema_json: &'static str, + batch_size: usize, + utf8view: bool, + id: u32, +) -> arrow_avro::reader::Decoder { + let schema = AvroSchema::new(schema_json.parse().unwrap()); + let mut store = arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None); + // Register the schema with a provided Confluent-style ID + store + .set(Fingerprint::Id(id), schema.clone()) + .expect("failed to set schema with id"); + ReaderBuilder::new() + .with_writer_schema_store(store) + .with_active_fingerprint(Fingerprint::Id(id)) + .with_batch_size(batch_size) + .with_utf8_view(utf8view) + .build_decoder() + .expect("failed to build decoder for id") +} + const SIZES: [usize; 3] = [100, 10_000, 1_000_000]; const INT_SCHEMA: &str = @@ -373,7 +417,7 @@ macro_rules! dataset { static $name: Lazy>> = Lazy::new(|| { let schema = ApacheSchema::parse_str($schema_json).expect("invalid schema for generator"); - let arrow_schema = AvroSchema::new($schema_json.to_string()); + let arrow_schema = AvroSchema::new($schema_json.parse().unwrap()); let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed"); let prefix = make_prefix(fingerprint); SIZES @@ -384,6 +428,24 @@ macro_rules! dataset { }; } +/// Additional helper for Confluent's ID-based wire format (00 + BE u32). +macro_rules! dataset_id { + ($name:ident, $schema_json:expr, $gen_fn:ident, $id:expr) => { + static $name: Lazy>> = Lazy::new(|| { + let schema = + ApacheSchema::parse_str($schema_json).expect("invalid schema for generator"); + let prefix = make_prefix(Fingerprint::Id($id)); + SIZES + .iter() + .map(|&n| $gen_fn(&schema, n, &prefix)) + .collect() + }); + }; +} + +const ID_BENCH_ID: u32 = 7; + +dataset_id!(INT_DATA_ID, INT_SCHEMA, gen_int, ID_BENCH_ID); dataset!(INT_DATA, INT_SCHEMA, gen_int); dataset!(LONG_DATA, LONG_SCHEMA, gen_long); dataset!(FLOAT_DATA, FLOAT_SCHEMA, gen_float); @@ -406,19 +468,20 @@ dataset!(ENUM_DATA, ENUM_SCHEMA, gen_enum); dataset!(MIX_DATA, MIX_SCHEMA, gen_mixed); dataset!(NEST_DATA, NEST_SCHEMA, gen_nested); -fn bench_scenario( +fn bench_with_decoder( c: &mut Criterion, name: &str, - schema_json: &'static str, data_sets: &[Vec], - utf8view: bool, - batch_size: usize, -) { + rows: &[usize], + mut new_decoder: F, +) where + F: FnMut() -> arrow_avro::reader::Decoder, +{ let mut group = c.benchmark_group(name); - for (idx, &rows) in SIZES.iter().enumerate() { + for (idx, &row_count) in rows.iter().enumerate() { let datum = &data_sets[idx]; group.throughput(Throughput::Bytes(datum.len() as u64)); - match rows { + match row_count { 10_000 => { group .sample_size(25) @@ -433,9 +496,9 @@ fn bench_scenario( } _ => {} } - group.bench_function(BenchmarkId::from_parameter(rows), |b| { + group.bench_function(BenchmarkId::from_parameter(row_count), |b| { b.iter_batched_ref( - || new_decoder(schema_json, batch_size, utf8view), + &mut new_decoder, |decoder| { black_box(decoder.decode(datum).unwrap()); black_box(decoder.flush().unwrap().unwrap()); @@ -449,105 +512,75 @@ fn bench_scenario( fn criterion_benches(c: &mut Criterion) { for &batch_size in &[SMALL_BATCH, LARGE_BATCH] { - bench_scenario( - c, - "Interval", - INTERVAL_SCHEMA, - &INTERVAL_DATA, - false, - batch_size, - ); - bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size); - bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size); - bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, batch_size); - bench_scenario(c, "Boolean", BOOL_SCHEMA, &BOOL_DATA, false, batch_size); - bench_scenario(c, "Float64", DOUBLE_SCHEMA, &DOUBLE_DATA, false, batch_size); - bench_scenario( - c, - "Binary(Bytes)", - BYTES_SCHEMA, - &BYTES_DATA, - false, - batch_size, - ); - bench_scenario(c, "String", STRING_SCHEMA, &STRING_DATA, false, batch_size); - bench_scenario( - c, - "StringView", - STRING_SCHEMA, - &STRING_DATA, - true, - batch_size, - ); - bench_scenario(c, "Date32", DATE_SCHEMA, &DATE_DATA, false, batch_size); - bench_scenario( - c, - "TimeMillis", - TMILLIS_SCHEMA, - &TMILLIS_DATA, - false, - batch_size, - ); - bench_scenario( - c, - "TimeMicros", - TMICROS_SCHEMA, - &TMICROS_DATA, - false, - batch_size, - ); - bench_scenario( - c, - "TimestampMillis", - TSMILLIS_SCHEMA, - &TSMILLIS_DATA, - false, - batch_size, - ); - bench_scenario( - c, - "TimestampMicros", - TSMICROS_SCHEMA, - &TSMICROS_DATA, - false, - batch_size, - ); - bench_scenario(c, "Map", MAP_SCHEMA, &MAP_DATA, false, batch_size); - bench_scenario(c, "Array", ARRAY_SCHEMA, &ARRAY_DATA, false, batch_size); - bench_scenario( - c, - "Decimal128", - DECIMAL_SCHEMA, - &DECIMAL_DATA, - false, - batch_size, - ); - bench_scenario(c, "UUID", UUID_SCHEMA, &UUID_DATA, false, batch_size); - bench_scenario( - c, - "FixedSizeBinary", - FIXED_SCHEMA, - &FIXED_DATA, - false, - batch_size, - ); - bench_scenario( - c, - "Enum(Dictionary)", - ENUM_SCHEMA, - &ENUM_DATA, - false, - batch_size, - ); - bench_scenario(c, "Mixed", MIX_SCHEMA, &MIX_DATA, false, batch_size); - bench_scenario( - c, - "Nested(Struct)", - NEST_SCHEMA, - &NEST_DATA, - false, - batch_size, - ); + bench_with_decoder(c, "Interval", &INTERVAL_DATA, &SIZES, || { + new_decoder(INTERVAL_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Int32", &INT_DATA, &SIZES, || { + new_decoder(INT_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Int32_Id", &INT_DATA_ID, &SIZES, || { + new_decoder_id(INT_SCHEMA, batch_size, false, ID_BENCH_ID) + }); + bench_with_decoder(c, "Int64", &LONG_DATA, &SIZES, || { + new_decoder(LONG_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Float32", &FLOAT_DATA, &SIZES, || { + new_decoder(FLOAT_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Boolean", &BOOL_DATA, &SIZES, || { + new_decoder(BOOL_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Float64", &DOUBLE_DATA, &SIZES, || { + new_decoder(DOUBLE_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Binary(Bytes)", &BYTES_DATA, &SIZES, || { + new_decoder(BYTES_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "String", &STRING_DATA, &SIZES, || { + new_decoder(STRING_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "StringView", &STRING_DATA, &SIZES, || { + new_decoder(STRING_SCHEMA, batch_size, true) + }); + bench_with_decoder(c, "Date32", &DATE_DATA, &SIZES, || { + new_decoder(DATE_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "TimeMillis", &TMILLIS_DATA, &SIZES, || { + new_decoder(TMILLIS_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "TimeMicros", &TMICROS_DATA, &SIZES, || { + new_decoder(TMICROS_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "TimestampMillis", &TSMILLIS_DATA, &SIZES, || { + new_decoder(TSMILLIS_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "TimestampMicros", &TSMICROS_DATA, &SIZES, || { + new_decoder(TSMICROS_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Map", &MAP_DATA, &SIZES, || { + new_decoder(MAP_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Array", &ARRAY_DATA, &SIZES, || { + new_decoder(ARRAY_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Decimal128", &DECIMAL_DATA, &SIZES, || { + new_decoder(DECIMAL_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "UUID", &UUID_DATA, &SIZES, || { + new_decoder(UUID_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "FixedSizeBinary", &FIXED_DATA, &SIZES, || { + new_decoder(FIXED_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Enum(Dictionary)", &ENUM_DATA, &SIZES, || { + new_decoder(ENUM_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Mixed", &MIX_DATA, &SIZES, || { + new_decoder(MIX_SCHEMA, batch_size, false) + }); + bench_with_decoder(c, "Nested(Struct)", &NEST_DATA, &SIZES, || { + new_decoder(NEST_SCHEMA, batch_size, false) + }); } } diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index 3f2daff0a3b1..9a77cd788c7a 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -91,8 +91,8 @@ //! use crate::codec::{AvroField, AvroFieldBuilder}; use crate::schema::{ - compare_schemas, generate_fingerprint, AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, - SchemaStore, SINGLE_OBJECT_MAGIC, + compare_schemas, AvroSchema, Fingerprint, FingerprintAlgorithm, Schema, SchemaStore, + CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC, }; use arrow_array::{Array, RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, SchemaRef}; @@ -185,7 +185,7 @@ impl Decoder { }; } match self.handle_prefix(&data[total_consumed..])? { - Some(0) => break, // insufficient bytes + Some(0) => break, // Insufficient bytes Some(n) => { total_consumed += n; self.apply_pending_schema_if_batch_empty(); @@ -201,31 +201,60 @@ impl Decoder { Ok(total_consumed) } - // Attempt to handle a single‑object‑encoding prefix at the current position. - // + // Attempt to handle a prefix at the current position. // * Ok(None) – buffer does not start with the prefix. // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller should await more bytes. // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and fingerprint). fn handle_prefix(&mut self, buf: &[u8]) -> Result, ArrowError> { - // Need at least the magic bytes to decide (2 bytes). - let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else { - return Ok(Some(0)); // Get more bytes - }; + match self.fingerprint_algorithm { + FingerprintAlgorithm::Rabin => { + self.handle_prefix_common(buf, &SINGLE_OBJECT_MAGIC, |bytes| { + Fingerprint::Rabin(u64::from_le_bytes(bytes)) + }) + } + FingerprintAlgorithm::None => { + self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| { + Fingerprint::Id(u32::from_be_bytes(bytes)) + }) + } + #[cfg(feature = "md5")] + FingerprintAlgorithm::MD5 => { + self.handle_prefix_common(buf, &SINGLE_OBJECT_MAGIC, |bytes| { + Fingerprint::MD5(bytes) + }) + } + #[cfg(feature = "sha256")] + FingerprintAlgorithm::SHA256 => { + self.handle_prefix_common(buf, &SINGLE_OBJECT_MAGIC, |bytes| { + Fingerprint::SHA256(bytes) + }) + } + } + } + + /// This method checks for the provided `magic` bytes at the start of `buf` and, if present, + /// attempts to read the following fingerprint of `N` bytes, converting it to a + /// [`Fingerprint`] using `fingerprint_from`. + fn handle_prefix_common( + &mut self, + buf: &[u8], + magic: &[u8; MAGIC_LEN], + fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint, + ) -> Result, ArrowError> { + // Need at least the magic bytes to decide + // 2 bytes for Avro Spec and 1 byte for Confluent Wire Protocol. + if buf.len() < MAGIC_LEN { + return Ok(Some(0)); + } // Bail out early if the magic does not match. - if magic_bytes != SINGLE_OBJECT_MAGIC { - return Ok(None); // Continue to decode the next record + if &buf[..MAGIC_LEN] != magic { + return Ok(None); } // Try to parse the fingerprint that follows the magic. - let fingerprint_size = match self.fingerprint_algorithm { - FingerprintAlgorithm::Rabin => self - .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| { - Fingerprint::Rabin(u64::from_le_bytes(bytes)) - })?, - }; + let consumed_fp = self.handle_fingerprint(&buf[MAGIC_LEN..], fingerprint_from)?; // Convert the inner result into a “bytes consumed” count. // NOTE: Incomplete fingerprint consumes no bytes. - let consumed = fingerprint_size.map_or(0, |n| n + SINGLE_OBJECT_MAGIC.len()); - Ok(Some(consumed)) + Ok(Some(consumed_fp.map_or(0, |n| n + MAGIC_LEN))) } // Attempts to read and install a new fingerprint of `N` bytes. @@ -239,7 +268,7 @@ impl Decoder { ) -> Result, ArrowError> { // Need enough bytes to get fingerprint (next N bytes) let Some(fingerprint_bytes) = buf.get(..N) else { - return Ok(None); // Insufficient bytes + return Ok(None); // insufficient bytes }; // SAFETY: length checked above. let new_fingerprint = fingerprint_from(fingerprint_bytes.try_into().unwrap()); @@ -658,7 +687,7 @@ mod test { use crate::reader::{read_header, Decoder, Reader, ReaderBuilder}; use crate::schema::{ AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema as AvroRaw, - SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, SINGLE_OBJECT_MAGIC, + SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, CONFLUENT_MAGIC, SINGLE_OBJECT_MAGIC, }; use crate::test_util::arrow_test_data; use arrow::array::ArrayDataBuilder; @@ -760,6 +789,17 @@ mod test { out.extend_from_slice(&v.to_le_bytes()); out } + Fingerprint::Id(v) => { + panic!("make_prefix expects a Rabin fingerprint, got ({v})"); + } + #[cfg(feature = "md5")] + Fingerprint::MD5(v) => { + panic!("make_prefix expects a Rabin fingerprint, got ({v:?})"); + } + #[cfg(feature = "sha256")] + Fingerprint::SHA256(id) => { + panic!("make_prefix expects a Rabin fingerprint, got ({id:?})"); + } } } @@ -773,6 +813,21 @@ mod test { .expect("decoder") } + fn make_id_prefix(id: u32, additional: usize) -> Vec { + let capacity = CONFLUENT_MAGIC.len() + size_of::() + additional; + let mut out = Vec::with_capacity(capacity); + out.extend_from_slice(&CONFLUENT_MAGIC); + out.extend_from_slice(&id.to_be_bytes()); + out + } + + fn make_message_id(id: u32, value: i64) -> Vec { + let encoded_value = encode_zigzag(value); + let mut msg = make_id_prefix(id, encoded_value.len()); + msg.extend_from_slice(&encoded_value); + msg + } + fn make_value_schema(pt: PrimitiveType) -> AvroSchema { let json_schema = format!( r#"{{"type":"record","name":"S","fields":[{{"name":"v","type":"{}"}}]}}"#, @@ -1258,6 +1313,11 @@ mod test { let mut decoder = make_decoder(&store, fp_int, &schema_long); let long_bytes = match fp_long { Fingerprint::Rabin(v) => v.to_le_bytes(), + Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"), + #[cfg(feature = "md5")] + Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"), + #[cfg(feature = "sha256")] + Fingerprint::SHA256(v) => panic!("expected Rabin fingerprint, got ({v:?})"), }; let mut buf = Vec::from(SINGLE_OBJECT_MAGIC); buf.extend_from_slice(&long_bytes[..4]); @@ -1276,8 +1336,14 @@ mod test { RecordDecoder::try_new_with_options(root_long.data_type(), decoder.utf8_view).unwrap(); let _ = decoder.cache.insert(fp_long, long_decoder); let mut buf = Vec::from(SINGLE_OBJECT_MAGIC); - let Fingerprint::Rabin(v) = fp_long; - buf.extend_from_slice(&v.to_le_bytes()); + match fp_long { + Fingerprint::Rabin(v) => buf.extend_from_slice(&v.to_le_bytes()), + Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got ({id})"), + #[cfg(feature = "md5")] + Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got ({v:?})"), + #[cfg(feature = "sha256")] + Fingerprint::SHA256(v) => panic!("expected Rabin fingerprint, got ({v:?})"), + } let consumed = decoder.handle_prefix(&buf).unwrap().unwrap(); assert_eq!(consumed, buf.len()); assert!(decoder.pending_schema.is_some()); @@ -1355,6 +1421,83 @@ mod test { } #[test] + fn test_two_messages_same_schema_id() { + let writer_schema = make_value_schema(PrimitiveType::Int); + let reader_schema = writer_schema.clone(); + let id = 100u32; + // Set up store with None fingerprint algorithm and register schema by id + let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); + let _ = store + .set(Fingerprint::Id(id), writer_schema.clone()) + .expect("set id schema"); + let msg1 = make_message_id(id, 21); + let msg2 = make_message_id(id, 22); + let input = [msg1.clone(), msg2.clone()].concat(); + let mut decoder = ReaderBuilder::new() + .with_batch_size(8) + .with_reader_schema(reader_schema) + .with_writer_schema_store(store) + .with_active_fingerprint(Fingerprint::Id(id)) + .build_decoder() + .unwrap(); + let _ = decoder.decode(&input).unwrap(); + let batch = decoder.flush().unwrap().expect("batch"); + assert_eq!(batch.num_rows(), 2); + let col = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.value(0), 21); + assert_eq!(col.value(1), 22); + } + + #[test] + fn test_unknown_id_fingerprint_is_error() { + let writer_schema = make_value_schema(PrimitiveType::Int); + let id_known = 7u32; + let id_unknown = 9u32; + let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); + let _ = store + .set(Fingerprint::Id(id_known), writer_schema.clone()) + .expect("set id schema"); + let mut decoder = ReaderBuilder::new() + .with_batch_size(8) + .with_reader_schema(writer_schema) + .with_writer_schema_store(store) + .with_active_fingerprint(Fingerprint::Id(id_known)) + .build_decoder() + .unwrap(); + let prefix = make_id_prefix(id_unknown, 0); + let err = decoder.decode(&prefix).expect_err("decode should error"); + let msg = err.to_string(); + assert!( + msg.contains("Unknown fingerprint"), + "unexpected message: {msg}" + ); + } + + #[test] + fn test_handle_prefix_id_incomplete_magic() { + let writer_schema = make_value_schema(PrimitiveType::Int); + let id = 5u32; + let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); + let _ = store + .set(Fingerprint::Id(id), writer_schema.clone()) + .expect("set id schema"); + let mut decoder = ReaderBuilder::new() + .with_batch_size(8) + .with_reader_schema(writer_schema) + .with_writer_schema_store(store) + .with_active_fingerprint(Fingerprint::Id(id)) + .build_decoder() + .unwrap(); + let buf = &crate::schema::CONFLUENT_MAGIC[..0]; // empty incomplete magic + let res = decoder.handle_prefix(buf).unwrap(); + assert_eq!(res, Some(0)); + assert!(decoder.pending_schema.is_none()); + } + fn test_split_message_across_chunks() { let writer_schema = make_value_schema(PrimitiveType::Int); let reader_schema = writer_schema.clone(); @@ -1791,18 +1934,18 @@ mod test { let expected = RecordBatch::try_from_iter_with_nullable([( "foo", Arc::new(BinaryArray::from_iter_values(vec![ - b"\x00".as_ref(), - b"\x01".as_ref(), - b"\x02".as_ref(), - b"\x03".as_ref(), - b"\x04".as_ref(), - b"\x05".as_ref(), - b"\x06".as_ref(), - b"\x07".as_ref(), - b"\x08".as_ref(), - b"\t".as_ref(), - b"\n".as_ref(), - b"\x0b".as_ref(), + b"\x00" as &[u8], + b"\x01" as &[u8], + b"\x02" as &[u8], + b"\x03" as &[u8], + b"\x04" as &[u8], + b"\x05" as &[u8], + b"\x06" as &[u8], + b"\x07" as &[u8], + b"\x08" as &[u8], + b"\t" as &[u8], + b"\n" as &[u8], + b"\x0b" as &[u8], ])) as Arc, true, )]) diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index a631119466bd..46ac30b495c6 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -20,6 +20,8 @@ use arrow_schema::{ }; use serde::{Deserialize, Serialize}; use serde_json::{json, Map as JsonMap, Value}; +#[cfg(feature = "sha256")] +use sha2::{Digest, Sha256}; use std::cmp::PartialEq; use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; @@ -31,6 +33,9 @@ pub const SCHEMA_METADATA_KEY: &str = "avro.schema"; /// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`) pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01]; +/// The Confluent "magic" byte (`0x00`) +pub const CONFLUENT_MAGIC: [u8; 1] = [0x00]; + /// Metadata key used to represent Avro enum symbols in an Arrow schema. pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols"; @@ -49,8 +54,8 @@ pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc"; /// Compare two Avro schemas for equality (identical schemas). /// Returns true if the schemas have the same parsing canonical form (i.e., logically identical). pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result { - let canon_writer = generate_canonical_form(writer)?; - let canon_reader = generate_canonical_form(reader)?; + let canon_writer = AvroSchema::generate_canonical_form(writer)?; + let canon_reader = AvroSchema::generate_canonical_form(reader)?; Ok(canon_writer == canon_reader) } @@ -368,17 +373,117 @@ impl AvroSchema { /// Returns the Rabin fingerprint of the schema. pub fn fingerprint(&self) -> Result { - generate_fingerprint_rabin(&self.schema()?) + Self::generate_fingerprint_rabin(&self.schema()?) + } + + /// Generates a fingerprint for the given `Schema` using the specified [`FingerprintAlgorithm`]. + /// + /// The fingerprint is computed over the schema's Parsed Canonical Form + /// as defined by the Avro specification. Depending on `hash_type`, this + /// will return one of the supported [`Fingerprint`] variants: + /// - [`Fingerprint::Rabin`] for [`FingerprintAlgorithm::Rabin`] + /// - [`Fingerprint::MD5`] for [`FingerprintAlgorithm::MD5`] + /// - [`Fingerprint::SHA256`] for [`FingerprintAlgorithm::SHA256`] + /// + /// Note: [`FingerprintAlgorithm::None`] cannot be used to generate a fingerprint + /// and will result in an error. If you intend to use a Schema Registry ID-based + /// wire format, load or set the [`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] + /// or [`SchemaStore::set`]. + /// + /// See also: + /// + /// # Errors + /// Returns an error if generating the canonical form of the schema fails, + /// or if `hash_type` is [`FingerprintAlgorithm::None`]. + /// + /// # Examples + /// ```no_run + /// use arrow_avro::schema::{AvroSchema, FingerprintAlgorithm}; + /// + /// let avro = AvroSchema::new("\"string\"".to_string()); + /// let schema = avro.schema().unwrap(); + /// let fp = AvroSchema::generate_fingerprint(&schema, FingerprintAlgorithm::Rabin).unwrap(); + /// ``` + pub fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, + ) -> Result { + let canonical = Self::generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + FingerprintAlgorithm::None => Err(ArrowError::SchemaError( + "FingerprintAlgorithm of None cannot be used to generate a fingerprint; \ + if using Fingerprint::Id, pass the registry ID in instead using the set method." + .to_string(), + )), + #[cfg(feature = "md5")] + FingerprintAlgorithm::MD5 => Ok(Fingerprint::MD5(compute_fingerprint_md5(&canonical))), + #[cfg(feature = "sha256")] + FingerprintAlgorithm::SHA256 => { + Ok(Fingerprint::SHA256(compute_fingerprint_sha256(&canonical))) + } + } + } + + /// Generates the 64-bit Rabin fingerprint for the given `Schema`. + /// + /// The fingerprint is computed from the canonical form of the schema. + /// This is also known as `CRC-64-AVRO`. + /// + /// # Returns + /// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. + pub fn generate_fingerprint_rabin(schema: &Schema) -> Result { + Self::generate_fingerprint(schema, FingerprintAlgorithm::Rabin) + } + + /// Generates the Parsed Canonical Form for the given [`Schema`]. + /// + /// The canonical form is a standardized JSON representation of the schema, + /// primarily used for generating a schema fingerprint for equality checking. + /// + /// This form strips attributes that do not affect the schema's identity, + /// such as `doc` fields, `aliases`, and any properties not defined in the + /// Avro specification. + /// + /// + pub fn generate_canonical_form(schema: &Schema) -> Result { + build_canonical(schema, None) } } /// Supported fingerprint algorithms for Avro schema identification. -/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +/// For use with Confluent Schema Registry IDs, set to None. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] pub enum FingerprintAlgorithm { /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. #[default] Rabin, + /// Represents a fingerprint not based on a hash algorithm, (e.g., a 32-bit Schema Registry ID.) + None, + #[cfg(feature = "md5")] + /// 128-bit MD5 message digest. + MD5, + #[cfg(feature = "sha256")] + /// 256-bit SHA-256 digest. + SHA256, +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + Fingerprint::Id(_) => FingerprintAlgorithm::None, + #[cfg(feature = "md5")] + Fingerprint::MD5(_) => FingerprintAlgorithm::MD5, + #[cfg(feature = "sha256")] + Fingerprint::SHA256(_) => FingerprintAlgorithm::SHA256, + } + } } /// A schema fingerprint in one of the supported formats. @@ -386,64 +491,36 @@ pub enum FingerprintAlgorithm { /// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` /// instance always stores only one variant, matching its configured /// `FingerprintAlgorithm`, but the enum makes the API uniform. -/// Currently only `Rabin` is supported /// /// +/// #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub enum Fingerprint { /// A 64-bit Rabin fingerprint. Rabin(u64), + /// A 32-bit Schema Registry ID. + Id(u32), + #[cfg(feature = "md5")] + /// A 128-bit MD5 fingerprint. + MD5([u8; 16]), + #[cfg(feature = "sha256")] + /// A 256-bit SHA-256 fingerprint. + SHA256([u8; 32]), } -/// Allow easy extraction of the algorithm used to create a fingerprint. -impl From<&Fingerprint> for FingerprintAlgorithm { - fn from(fp: &Fingerprint) -> Self { - match fp { - Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, - } - } -} - -/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. -pub(crate) fn generate_fingerprint( - schema: &Schema, - hash_type: FingerprintAlgorithm, -) -> Result { - let canonical = generate_canonical_form(schema).map_err(|e| { - ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) - })?; - match hash_type { - FingerprintAlgorithm::Rabin => { - Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) - } +impl Fingerprint { + /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID). + /// + /// The provided `id` is in big-endian wire order; this converts it to host order + /// and returns `Fingerprint::Id`. + /// + /// # Returns + /// A `Fingerprint::Id` variant containing the 32-bit fingerprint. + pub fn load_fingerprint_id(id: u32) -> Self { + Fingerprint::Id(u32::from_be(id)) } } -/// Generates the 64-bit Rabin fingerprint for the given `Schema`. -/// -/// The fingerprint is computed from the canonical form of the schema. -/// This is also known as `CRC-64-AVRO`. -/// -/// # Returns -/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. -pub fn generate_fingerprint_rabin(schema: &Schema) -> Result { - generate_fingerprint(schema, FingerprintAlgorithm::Rabin) -} - -/// Generates the Parsed Canonical Form for the given [`Schema`]. -/// -/// The canonical form is a standardized JSON representation of the schema, -/// primarily used for generating a schema fingerprint for equality checking. -/// -/// This form strips attributes that do not affect the schema's identity, -/// such as `doc` fields, `aliases`, and any properties not defined in the -/// Avro specification. -/// -/// -pub fn generate_canonical_form(schema: &Schema) -> Result { - build_canonical(schema, None) -} - /// An in-memory cache of Avro schemas, indexed by their fingerprint. /// /// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. @@ -478,17 +555,16 @@ pub struct SchemaStore { schemas: HashMap, } -impl TryFrom<&[AvroSchema]> for SchemaStore { +impl TryFrom> for SchemaStore { type Error = ArrowError; - /// Creates a `SchemaStore` from a slice of schemas. - /// Each schema in the slice is registered with the new store. - fn try_from(schemas: &[AvroSchema]) -> Result { - let mut store = SchemaStore::new(); - for schema in schemas { - store.register(schema.clone())?; - } - Ok(store) + /// Creates a `SchemaStore` from a HashMap of schemas. + /// Each schema in the HashMap is registered with the new store. + fn try_from(schemas: HashMap) -> Result { + Ok(Self { + schemas, + ..Self::default() + }) } } @@ -498,23 +574,35 @@ impl SchemaStore { Self::default() } - /// Registers a schema with the store and returns its fingerprint. + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new_with_type(fingerprint_algorithm: FingerprintAlgorithm) -> Self { + Self { + fingerprint_algorithm, + ..Self::default() + } + } + + /// Registers a schema with the store and the provided fingerprint. + /// Note: Confluent wire format implementations should leverage this method. /// - /// A fingerprint is calculated for the given schema using the store's configured - /// hash type. If a schema with the same fingerprint does not already exist in the - /// store, the new schema is inserted. If the fingerprint already exists, the - /// existing schema is not overwritten. + /// A schema is set in the store, using the provided fingerprint. If a schema + /// with the same fingerprint does not already exist in the store, the new schema + /// is inserted. If the fingerprint already exists, the existing schema is not overwritten. /// /// # Arguments /// + /// * `fingerprint` - A reference to the `Fingerprint` of the schema to register. /// * `schema` - The `AvroSchema` to register. /// /// # Returns /// - /// A `Result` containing the `Fingerprint` of the schema if successful, + /// A `Result` returning the provided `Fingerprint` of the schema if successful, /// or an `ArrowError` on failure. - pub fn register(&mut self, schema: AvroSchema) -> Result { - let fingerprint = generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?; + pub fn set( + &mut self, + fingerprint: Fingerprint, + schema: AvroSchema, + ) -> Result { match self.schemas.entry(fingerprint) { Entry::Occupied(entry) => { if entry.get() != &schema { @@ -530,6 +618,37 @@ impl SchemaStore { Ok(fingerprint) } + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the + /// existing schema is not overwritten. If FingerprintAlgorithm is set to None, this + /// method will return an error. Confluent wire format implementations should leverage the + /// set method instead. + /// + /// # Arguments + /// + /// * `schema` - The `AvroSchema` to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: AvroSchema) -> Result { + if self.fingerprint_algorithm == FingerprintAlgorithm::None { + return Err(ArrowError::SchemaError( + "Invalid FingerprintAlgorithm; unable to generate fingerprint. \ + Use the set method directly instead, providing a valid fingerprint" + .to_string(), + )); + } + let fingerprint = + AvroSchema::generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?; + self.set(fingerprint, schema)?; + Ok(fingerprint) + } + /// Looks up a schema by its `Fingerprint`. /// /// # Arguments @@ -715,6 +834,29 @@ pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 { fp } +#[cfg(feature = "md5")] +/// Compute the **128‑bit MD5** fingerprint of the canonical form. +/// +/// Returns a 16‑byte array (`[u8; 16]`) containing the full MD5 digest, +/// exactly as required by the Avro specification. +#[inline] +pub(crate) fn compute_fingerprint_md5(canonical_form: &str) -> [u8; 16] { + let digest = md5::compute(canonical_form.as_bytes()); + digest.0 +} + +#[cfg(feature = "sha256")] +/// Compute the **256‑bit SHA‑256** fingerprint of the canonical form. +/// +/// Returns a 32‑byte array (`[u8; 32]`) containing the full SHA‑256 digest. +#[inline] +pub(crate) fn compute_fingerprint_sha256(canonical_form: &str) -> [u8; 32] { + let mut hasher = Sha256::new(); + hasher.update(canonical_form.as_bytes()); + let digest = hasher.finalize(); + digest.into() +} + #[inline] fn is_internal_arrow_key(key: &str) -> bool { key.starts_with("ARROW:") || key == SCHEMA_METADATA_KEY @@ -1393,8 +1535,16 @@ mod tests { fn test_try_from_schemas_rabin() { let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); - let schemas = vec![int_avro_schema.clone(), record_avro_schema.clone()]; - let store = SchemaStore::try_from(schemas.as_slice()).unwrap(); + let mut schemas: HashMap = HashMap::new(); + schemas.insert( + int_avro_schema.fingerprint().unwrap(), + int_avro_schema.clone(), + ); + schemas.insert( + record_avro_schema.fingerprint().unwrap(), + record_avro_schema.clone(), + ); + let store = SchemaStore::try_from(schemas).unwrap(); let int_fp = int_avro_schema.fingerprint().unwrap(); assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); let rec_fp = record_avro_schema.fingerprint().unwrap(); @@ -1405,12 +1555,21 @@ mod tests { fn test_try_from_with_duplicates() { let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); - let schemas = vec![ + let mut schemas: HashMap = HashMap::new(); + schemas.insert( + int_avro_schema.fingerprint().unwrap(), int_avro_schema.clone(), - record_avro_schema, + ); + schemas.insert( + record_avro_schema.fingerprint().unwrap(), + record_avro_schema.clone(), + ); + // Insert duplicate of int schema + schemas.insert( + int_avro_schema.fingerprint().unwrap(), int_avro_schema.clone(), - ]; - let store = SchemaStore::try_from(schemas.as_slice()).unwrap(); + ); + let store = SchemaStore::try_from(schemas).unwrap(); assert_eq!(store.schemas.len(), 2); let int_fp = int_avro_schema.fingerprint().unwrap(); assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); @@ -1421,14 +1580,40 @@ mod tests { let mut store = SchemaStore::new(); let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); let fp_enum = store.register(schema.clone()).unwrap(); - let Fingerprint::Rabin(fp_val) = fp_enum; - assert_eq!( - store.lookup(&Fingerprint::Rabin(fp_val)).cloned(), - Some(schema.clone()) - ); - assert!(store - .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1))) - .is_none()); + match fp_enum { + Fingerprint::Rabin(fp_val) => { + assert_eq!( + store.lookup(&Fingerprint::Rabin(fp_val)).cloned(), + Some(schema.clone()) + ); + assert!(store + .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1))) + .is_none()); + } + Fingerprint::Id(id) => { + unreachable!("This test should only generate Rabin fingerprints") + } + #[cfg(feature = "md5")] + Fingerprint::MD5(id) => { + unreachable!("This test should only generate Rabin fingerprints") + } + #[cfg(feature = "sha256")] + Fingerprint::SHA256(id) => { + unreachable!("This test should only generate Rabin fingerprints") + } + } + } + + #[test] + fn test_set_and_lookup_id() { + let mut store = SchemaStore::new(); + let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let id = 42u32; + let fp = Fingerprint::Id(id); + let out_fp = store.set(fp, schema.clone()).unwrap(); + assert_eq!(out_fp, fp); + assert_eq!(store.lookup(&fp).cloned(), Some(schema.clone())); + assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none()); } #[test] @@ -1442,10 +1627,43 @@ mod tests { assert_eq!(store.schemas.len(), 1); } + #[test] + fn test_set_and_lookup_with_provided_fingerprint() { + let mut store = SchemaStore::new(); + let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let fp = schema.fingerprint().unwrap(); + let out_fp = store.set(fp, schema.clone()).unwrap(); + assert_eq!(out_fp, fp); + assert_eq!(store.lookup(&fp).cloned(), Some(schema)); + } + + #[test] + fn test_set_duplicate_same_schema_ok() { + let mut store = SchemaStore::new(); + let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let fp = schema.fingerprint().unwrap(); + let _ = store.set(fp, schema.clone()).unwrap(); + let _ = store.set(fp, schema.clone()).unwrap(); + assert_eq!(store.schemas.len(), 1); + } + + #[test] + fn test_set_duplicate_different_schema_collision_error() { + let mut store = SchemaStore::new(); + let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let schema2 = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); + // Use the same Fingerprint::Id to simulate a collision across different schemas + let fp = Fingerprint::Id(123); + let _ = store.set(fp, schema1).unwrap(); + let err = store.set(fp, schema2).unwrap_err(); + let msg = format!("{err}"); + assert!(msg.contains("Schema fingerprint collision")); + } + #[test] fn test_canonical_form_generation_primitive() { let schema = int_schema(); - let canonical_form = generate_canonical_form(&schema).unwrap(); + let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap(); assert_eq!(canonical_form, r#""int""#); } @@ -1453,7 +1671,7 @@ mod tests { fn test_canonical_form_generation_record() { let schema = record_schema(); let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#; - let canonical_form = generate_canonical_form(&schema).unwrap(); + let canonical_form = AvroSchema::generate_canonical_form(&schema).unwrap(); assert_eq!(canonical_form, expected_canonical_form); } @@ -1522,7 +1740,7 @@ mod tests { }, })); let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#; - let canonical_form = generate_canonical_form(&schema_with_attrs).unwrap(); + let canonical_form = AvroSchema::generate_canonical_form(&schema_with_attrs).unwrap(); assert_eq!(canonical_form, expected_canonical_form); }