Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion arrow-avro/benches/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,9 @@ macro_rules! dataset {
let schema =
ApacheSchema::parse_str($schema_json).expect("invalid schema for generator");
let arrow_schema = AvroSchema::new($schema_json.parse().unwrap());
let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed");
let fingerprint = arrow_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.expect("fingerprint failed");
let prefix = make_prefix(fingerprint);
SIZES
.iter()
Expand Down
206 changes: 193 additions & 13 deletions arrow-avro/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
/// The Confluent "magic" byte (`0x00`)
pub const CONFLUENT_MAGIC: [u8; 1] = [0x00];

/// The maximum possible length of a prefix.
/// SHA256 (32) + single-object magic (2)
pub const MAX_PREFIX_LEN: usize = 34;

/// The metadata key used for storing the JSON encoded [`Schema`]
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";

Expand Down Expand Up @@ -349,9 +353,9 @@ impl AvroSchema {
.map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}")))
}

/// Returns the Rabin fingerprint of the schema.
pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
Self::generate_fingerprint_rabin(&self.schema()?)
/// Returns the fingerprint of the schema.
pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result<Fingerprint, ArrowError> {
Self::generate_fingerprint(&self.schema()?, hash_type)
}

/// Generates a fingerprint for the given `Schema` using the specified [`FingerprintAlgorithm`].
Expand Down Expand Up @@ -476,6 +480,68 @@ impl AvroSchema {
}
}

/// A stack-allocated, fixed-size buffer for the prefix.
#[derive(Debug, Copy, Clone)]
pub struct Prefix {
buf: [u8; MAX_PREFIX_LEN],
len: u8,
}

impl Prefix {
#[inline]
pub(crate) fn as_slice(&self) -> &[u8] {
&self.buf[..self.len as usize]
}
}

/// Defines the strategy for generating the per-record prefix for an Avro binary stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum FingerprintStrategy {
/// Use the 64-bit Rabin fingerprint (default for single-object encoding).
#[default]
Rabin,
/// Use a Confluent Schema Registry 32-bit ID.
Id(u32),
#[cfg(feature = "md5")]
/// Use the 128-bit MD5 fingerprint.
MD5,
#[cfg(feature = "sha256")]
/// Use the 256-bit SHA-256 fingerprint.
SHA256,
}

impl From<Fingerprint> for FingerprintStrategy {
fn from(f: Fingerprint) -> Self {
Self::from(&f)
}
}

impl From<FingerprintAlgorithm> for FingerprintStrategy {
fn from(f: FingerprintAlgorithm) -> Self {
match f {
FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
FingerprintAlgorithm::None => FingerprintStrategy::Id(0),
#[cfg(feature = "md5")]
FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
#[cfg(feature = "sha256")]
FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256,
}
}
}

impl From<&Fingerprint> for FingerprintStrategy {
fn from(f: &Fingerprint) -> Self {
match f {
Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
Fingerprint::Id(id) => FingerprintStrategy::Id(*id),
#[cfg(feature = "md5")]
Fingerprint::MD5(_) => FingerprintStrategy::MD5,
#[cfg(feature = "sha256")]
Fingerprint::SHA256(_) => FingerprintStrategy::SHA256,
}
}
}

/// Supported fingerprint algorithms for Avro schema identification.
/// For use with Confluent Schema Registry IDs, set to None.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
Expand Down Expand Up @@ -507,6 +573,25 @@ impl From<&Fingerprint> for FingerprintAlgorithm {
}
}

impl From<FingerprintStrategy> for FingerprintAlgorithm {
fn from(s: FingerprintStrategy) -> Self {
Self::from(&s)
}
}

impl From<&FingerprintStrategy> for FingerprintAlgorithm {
fn from(s: &FingerprintStrategy) -> Self {
match s {
FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
FingerprintStrategy::Id(_) => FingerprintAlgorithm::None,
#[cfg(feature = "md5")]
FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
#[cfg(feature = "sha256")]
FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256,
}
}
}

/// A schema fingerprint in one of the supported formats.
///
/// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore`
Expand All @@ -529,6 +614,38 @@ pub enum Fingerprint {
SHA256([u8; 32]),
}

impl From<FingerprintStrategy> for Fingerprint {
fn from(s: FingerprintStrategy) -> Self {
Self::from(&s)
}
}

impl From<&FingerprintStrategy> for Fingerprint {
fn from(s: &FingerprintStrategy) -> Self {
match s {
FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
#[cfg(feature = "md5")]
FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
#[cfg(feature = "sha256")]
FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]),
}
}
}

impl From<FingerprintAlgorithm> for Fingerprint {
fn from(s: FingerprintAlgorithm) -> Self {
match s {
FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
FingerprintAlgorithm::None => Fingerprint::Id(0),
#[cfg(feature = "md5")]
FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
#[cfg(feature = "sha256")]
FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]),
}
}
}

impl Fingerprint {
/// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID).
///
Expand All @@ -540,6 +657,53 @@ impl Fingerprint {
pub fn load_fingerprint_id(id: u32) -> Self {
Fingerprint::Id(u32::from_be(id))
}

/// Constructs a serialized prefix represented as a `Vec<u8>` based on the variant of the enum.
///
/// This method serializes data in different formats depending on the variant of `self`:
/// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`)
/// followed by the big-endian byte representation of the `id`.
/// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header
/// (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`.
/// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the
/// `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`.
/// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is
/// a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`.
///
/// # Returns
///
/// A `Prefix` containing the serialized prefix data.
///
/// # Features
///
/// - You can optionally enable the `md5` feature to include the `MD5` variant.
/// - You can optionally enable the `sha256` feature to include the `SHA256` variant.
///
pub fn make_prefix(&self) -> Prefix {
let mut buf = [0u8; MAX_PREFIX_LEN];
let len = match self {
Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()),
Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()),
#[cfg(feature = "md5")]
Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
#[cfg(feature = "sha256")]
Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val),
};
Prefix { buf, len }
}
}

fn write_prefix<const MAGIC_LEN: usize, const PAYLOAD_LEN: usize>(
buf: &mut [u8; MAX_PREFIX_LEN],
magic: &[u8; MAGIC_LEN],
payload: &[u8; PAYLOAD_LEN],
) -> u8 {
debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN);
let total = MAGIC_LEN + PAYLOAD_LEN;
let prefix_slice = &mut buf[..total];
prefix_slice[..MAGIC_LEN].copy_from_slice(magic);
prefix_slice[MAGIC_LEN..total].copy_from_slice(payload);
total as u8
}

/// An in-memory cache of Avro schemas, indexed by their fingerprint.
Expand Down Expand Up @@ -1744,17 +1908,25 @@ mod tests {
let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
schemas.insert(
int_avro_schema.fingerprint().unwrap(),
int_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap(),
int_avro_schema.clone(),
);
schemas.insert(
record_avro_schema.fingerprint().unwrap(),
record_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap(),
record_avro_schema.clone(),
);
let store = SchemaStore::try_from(schemas).unwrap();
let int_fp = int_avro_schema.fingerprint().unwrap();
let int_fp = int_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap();
assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
let rec_fp = record_avro_schema.fingerprint().unwrap();
let rec_fp = record_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap();
assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
}

Expand All @@ -1764,21 +1936,29 @@ mod tests {
let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
let mut schemas: HashMap<Fingerprint, AvroSchema> = HashMap::new();
schemas.insert(
int_avro_schema.fingerprint().unwrap(),
int_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap(),
int_avro_schema.clone(),
);
schemas.insert(
record_avro_schema.fingerprint().unwrap(),
record_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap(),
record_avro_schema.clone(),
);
// Insert duplicate of int schema
schemas.insert(
int_avro_schema.fingerprint().unwrap(),
int_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap(),
int_avro_schema.clone(),
);
let store = SchemaStore::try_from(schemas).unwrap();
assert_eq!(store.schemas.len(), 2);
let int_fp = int_avro_schema.fingerprint().unwrap();
let int_fp = int_avro_schema
.fingerprint(FingerprintAlgorithm::Rabin)
.unwrap();
assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
}

Expand Down Expand Up @@ -1838,7 +2018,7 @@ mod tests {
fn test_set_and_lookup_with_provided_fingerprint() {
let mut store = SchemaStore::new();
let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
let fp = schema.fingerprint().unwrap();
let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
let out_fp = store.set(fp, schema.clone()).unwrap();
assert_eq!(out_fp, fp);
assert_eq!(store.lookup(&fp).cloned(), Some(schema));
Expand All @@ -1848,7 +2028,7 @@ mod tests {
fn test_set_duplicate_same_schema_ok() {
let mut store = SchemaStore::new();
let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
let fp = schema.fingerprint().unwrap();
let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap();
let _ = store.set(fp, schema.clone()).unwrap();
let _ = store.set(fp, schema.clone()).unwrap();
assert_eq!(store.schemas.len(), 1);
Expand Down
37 changes: 32 additions & 5 deletions arrow-avro/src/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
//! Avro Encoder for Arrow types.

use crate::codec::{AvroDataType, AvroField, Codec};
use crate::schema::Nullability;
use crate::schema::{Fingerprint, Nullability, Prefix};
use arrow_array::cast::AsArray;
use arrow_array::types::{
ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType,
Expand All @@ -33,6 +33,7 @@ use arrow_array::{
use arrow_array::{Decimal32Array, Decimal64Array};
use arrow_buffer::NullBuffer;
use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit};
use serde::Serialize;
use std::io::Write;
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -522,6 +523,7 @@ struct FieldBinding {
pub struct RecordEncoderBuilder<'a> {
avro_root: &'a AvroField,
arrow_schema: &'a ArrowSchema,
fingerprint: Option<Fingerprint>,
}

impl<'a> RecordEncoderBuilder<'a> {
Expand All @@ -530,9 +532,15 @@ impl<'a> RecordEncoderBuilder<'a> {
Self {
avro_root,
arrow_schema,
fingerprint: None,
}
}

pub(crate) fn with_fingerprint(mut self, fingerprint: Option<Fingerprint>) -> Self {
self.fingerprint = fingerprint;
self
}

/// Build the `RecordEncoder` by walking the Avro **record** root in Avro order,
/// resolving each field to an Arrow index by name.
pub fn build(self) -> Result<RecordEncoder, ArrowError> {
Expand All @@ -557,7 +565,10 @@ impl<'a> RecordEncoderBuilder<'a> {
)?,
});
}
Ok(RecordEncoder { columns })
Ok(RecordEncoder {
columns,
prefix: self.fingerprint.map(|fp| fp.make_prefix()),
})
}
}

Expand All @@ -569,6 +580,8 @@ impl<'a> RecordEncoderBuilder<'a> {
#[derive(Debug, Clone)]
pub struct RecordEncoder {
columns: Vec<FieldBinding>,
/// Optional pre-built, variable-length prefix written before each record.
prefix: Option<Prefix>,
}

impl RecordEncoder {
Expand Down Expand Up @@ -602,9 +615,23 @@ impl RecordEncoder {
/// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes.
pub fn encode<W: Write>(&self, out: &mut W, batch: &RecordBatch) -> Result<(), ArrowError> {
let mut column_encoders = self.prepare_for_batch(batch)?;
for row in 0..batch.num_rows() {
for encoder in column_encoders.iter_mut() {
encoder.encode(out, row)?;
let n = batch.num_rows();
match self.prefix {
Some(prefix) => {
for row in 0..n {
out.write_all(prefix.as_slice())
.map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?;
for enc in column_encoders.iter_mut() {
enc.encode(out, row)?;
}
}
}
None => {
for row in 0..n {
for enc in column_encoders.iter_mut() {
enc.encode(out, row)?;
}
}
}
}
Ok(())
Expand Down
Loading
Loading