diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs index 0ca240d12fc9..5ab0f847efcc 100644 --- a/arrow-avro/benches/decoder.rs +++ b/arrow-avro/benches/decoder.rs @@ -418,7 +418,9 @@ macro_rules! dataset { let schema = ApacheSchema::parse_str($schema_json).expect("invalid schema for generator"); let arrow_schema = AvroSchema::new($schema_json.parse().unwrap()); - let fingerprint = arrow_schema.fingerprint().expect("fingerprint failed"); + let fingerprint = arrow_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .expect("fingerprint failed"); let prefix = make_prefix(fingerprint); SIZES .iter() diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index 1df012f2926c..42c6d8a6c305 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -34,6 +34,10 @@ pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01]; /// The Confluent "magic" byte (`0x00`) pub const CONFLUENT_MAGIC: [u8; 1] = [0x00]; +/// The maximum possible length of a prefix. +/// SHA256 (32) + single-object magic (2) +pub const MAX_PREFIX_LEN: usize = 34; + /// The metadata key used for storing the JSON encoded [`Schema`] pub const SCHEMA_METADATA_KEY: &str = "avro.schema"; @@ -349,9 +353,9 @@ impl AvroSchema { .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}"))) } - /// Returns the Rabin fingerprint of the schema. - pub fn fingerprint(&self) -> Result { - Self::generate_fingerprint_rabin(&self.schema()?) + /// Returns the fingerprint of the schema. + pub fn fingerprint(&self, hash_type: FingerprintAlgorithm) -> Result { + Self::generate_fingerprint(&self.schema()?, hash_type) } /// Generates a fingerprint for the given `Schema` using the specified [`FingerprintAlgorithm`]. @@ -476,6 +480,68 @@ impl AvroSchema { } } +/// A stack-allocated, fixed-size buffer for the prefix. +#[derive(Debug, Copy, Clone)] +pub struct Prefix { + buf: [u8; MAX_PREFIX_LEN], + len: u8, +} + +impl Prefix { + #[inline] + pub(crate) fn as_slice(&self) -> &[u8] { + &self.buf[..self.len as usize] + } +} + +/// Defines the strategy for generating the per-record prefix for an Avro binary stream. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum FingerprintStrategy { + /// Use the 64-bit Rabin fingerprint (default for single-object encoding). + #[default] + Rabin, + /// Use a Confluent Schema Registry 32-bit ID. + Id(u32), + #[cfg(feature = "md5")] + /// Use the 128-bit MD5 fingerprint. + MD5, + #[cfg(feature = "sha256")] + /// Use the 256-bit SHA-256 fingerprint. + SHA256, +} + +impl From for FingerprintStrategy { + fn from(f: Fingerprint) -> Self { + Self::from(&f) + } +} + +impl From for FingerprintStrategy { + fn from(f: FingerprintAlgorithm) -> Self { + match f { + FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin, + FingerprintAlgorithm::None => FingerprintStrategy::Id(0), + #[cfg(feature = "md5")] + FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5, + #[cfg(feature = "sha256")] + FingerprintAlgorithm::SHA256 => FingerprintStrategy::SHA256, + } + } +} + +impl From<&Fingerprint> for FingerprintStrategy { + fn from(f: &Fingerprint) -> Self { + match f { + Fingerprint::Rabin(_) => FingerprintStrategy::Rabin, + Fingerprint::Id(id) => FingerprintStrategy::Id(*id), + #[cfg(feature = "md5")] + Fingerprint::MD5(_) => FingerprintStrategy::MD5, + #[cfg(feature = "sha256")] + Fingerprint::SHA256(_) => FingerprintStrategy::SHA256, + } + } +} + /// Supported fingerprint algorithms for Avro schema identification. /// For use with Confluent Schema Registry IDs, set to None. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] @@ -507,6 +573,25 @@ impl From<&Fingerprint> for FingerprintAlgorithm { } } +impl From for FingerprintAlgorithm { + fn from(s: FingerprintStrategy) -> Self { + Self::from(&s) + } +} + +impl From<&FingerprintStrategy> for FingerprintAlgorithm { + fn from(s: &FingerprintStrategy) -> Self { + match s { + FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin, + FingerprintStrategy::Id(_) => FingerprintAlgorithm::None, + #[cfg(feature = "md5")] + FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5, + #[cfg(feature = "sha256")] + FingerprintStrategy::SHA256 => FingerprintAlgorithm::SHA256, + } + } +} + /// A schema fingerprint in one of the supported formats. /// /// This is used as the key inside `SchemaStore` `HashMap`. Each `SchemaStore` @@ -529,6 +614,38 @@ pub enum Fingerprint { SHA256([u8; 32]), } +impl From for Fingerprint { + fn from(s: FingerprintStrategy) -> Self { + Self::from(&s) + } +} + +impl From<&FingerprintStrategy> for Fingerprint { + fn from(s: &FingerprintStrategy) -> Self { + match s { + FingerprintStrategy::Rabin => Fingerprint::Rabin(0), + FingerprintStrategy::Id(id) => Fingerprint::Id(*id), + #[cfg(feature = "md5")] + FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]), + #[cfg(feature = "sha256")] + FingerprintStrategy::SHA256 => Fingerprint::SHA256([0; 32]), + } + } +} + +impl From for Fingerprint { + fn from(s: FingerprintAlgorithm) -> Self { + match s { + FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0), + FingerprintAlgorithm::None => Fingerprint::Id(0), + #[cfg(feature = "md5")] + FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]), + #[cfg(feature = "sha256")] + FingerprintAlgorithm::SHA256 => Fingerprint::SHA256([0; 32]), + } + } +} + impl Fingerprint { /// Loads the 32-bit Schema Registry fingerprint (Confluent Schema Registry ID). /// @@ -540,6 +657,53 @@ impl Fingerprint { pub fn load_fingerprint_id(id: u32) -> Self { Fingerprint::Id(u32::from_be(id)) } + + /// Constructs a serialized prefix represented as a `Vec` based on the variant of the enum. + /// + /// This method serializes data in different formats depending on the variant of `self`: + /// - **`Id(id)`**: Uses the Confluent wire format, which includes a predefined magic header (`CONFLUENT_MAGIC`) + /// followed by the big-endian byte representation of the `id`. + /// - **`Rabin(val)`**: Uses the Avro single-object specification format. This includes a different magic header + /// (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte representation of the `val`. + /// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard extension that adds the + /// `SINGLE_OBJECT_MAGIC` header followed by the provided `bytes`. + /// - **`SHA256(bytes)`** (optional, `sha256` feature enabled): Similar to the `MD5` variant, this is + /// a non-standard extension that attaches the `SINGLE_OBJECT_MAGIC` header followed by the given `bytes`. + /// + /// # Returns + /// + /// A `Prefix` containing the serialized prefix data. + /// + /// # Features + /// + /// - You can optionally enable the `md5` feature to include the `MD5` variant. + /// - You can optionally enable the `sha256` feature to include the `SHA256` variant. + /// + pub fn make_prefix(&self) -> Prefix { + let mut buf = [0u8; MAX_PREFIX_LEN]; + let len = match self { + Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC, &val.to_be_bytes()), + Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, &val.to_le_bytes()), + #[cfg(feature = "md5")] + Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val), + #[cfg(feature = "sha256")] + Self::SHA256(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC, val), + }; + Prefix { buf, len } + } +} + +fn write_prefix( + buf: &mut [u8; MAX_PREFIX_LEN], + magic: &[u8; MAGIC_LEN], + payload: &[u8; PAYLOAD_LEN], +) -> u8 { + debug_assert!(MAGIC_LEN + PAYLOAD_LEN <= MAX_PREFIX_LEN); + let total = MAGIC_LEN + PAYLOAD_LEN; + let prefix_slice = &mut buf[..total]; + prefix_slice[..MAGIC_LEN].copy_from_slice(magic); + prefix_slice[MAGIC_LEN..total].copy_from_slice(payload); + total as u8 } /// An in-memory cache of Avro schemas, indexed by their fingerprint. @@ -1744,17 +1908,25 @@ mod tests { let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); let mut schemas: HashMap = HashMap::new(); schemas.insert( - int_avro_schema.fingerprint().unwrap(), + int_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(), int_avro_schema.clone(), ); schemas.insert( - record_avro_schema.fingerprint().unwrap(), + record_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(), record_avro_schema.clone(), ); let store = SchemaStore::try_from(schemas).unwrap(); - let int_fp = int_avro_schema.fingerprint().unwrap(); + let int_fp = int_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(); assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); - let rec_fp = record_avro_schema.fingerprint().unwrap(); + let rec_fp = record_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(); assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema)); } @@ -1764,21 +1936,29 @@ mod tests { let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); let mut schemas: HashMap = HashMap::new(); schemas.insert( - int_avro_schema.fingerprint().unwrap(), + int_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(), int_avro_schema.clone(), ); schemas.insert( - record_avro_schema.fingerprint().unwrap(), + record_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(), record_avro_schema.clone(), ); // Insert duplicate of int schema schemas.insert( - int_avro_schema.fingerprint().unwrap(), + int_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(), int_avro_schema.clone(), ); let store = SchemaStore::try_from(schemas).unwrap(); assert_eq!(store.schemas.len(), 2); - let int_fp = int_avro_schema.fingerprint().unwrap(); + let int_fp = int_avro_schema + .fingerprint(FingerprintAlgorithm::Rabin) + .unwrap(); assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); } @@ -1838,7 +2018,7 @@ mod tests { fn test_set_and_lookup_with_provided_fingerprint() { let mut store = SchemaStore::new(); let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); - let fp = schema.fingerprint().unwrap(); + let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap(); let out_fp = store.set(fp, schema.clone()).unwrap(); assert_eq!(out_fp, fp); assert_eq!(store.lookup(&fp).cloned(), Some(schema)); @@ -1848,7 +2028,7 @@ mod tests { fn test_set_duplicate_same_schema_ok() { let mut store = SchemaStore::new(); let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); - let fp = schema.fingerprint().unwrap(); + let fp = schema.fingerprint(FingerprintAlgorithm::Rabin).unwrap(); let _ = store.set(fp, schema.clone()).unwrap(); let _ = store.set(fp, schema.clone()).unwrap(); assert_eq!(store.schemas.len(), 1); diff --git a/arrow-avro/src/writer/encoder.rs b/arrow-avro/src/writer/encoder.rs index fd619249617e..518179530f3d 100644 --- a/arrow-avro/src/writer/encoder.rs +++ b/arrow-avro/src/writer/encoder.rs @@ -18,7 +18,7 @@ //! Avro Encoder for Arrow types. use crate::codec::{AvroDataType, AvroField, Codec}; -use crate::schema::Nullability; +use crate::schema::{Fingerprint, Nullability, Prefix}; use arrow_array::cast::AsArray; use arrow_array::types::{ ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, IntervalDayTimeType, @@ -33,6 +33,7 @@ use arrow_array::{ use arrow_array::{Decimal32Array, Decimal64Array}; use arrow_buffer::NullBuffer; use arrow_schema::{ArrowError, DataType, Field, IntervalUnit, Schema as ArrowSchema, TimeUnit}; +use serde::Serialize; use std::io::Write; use std::sync::Arc; use uuid::Uuid; @@ -522,6 +523,7 @@ struct FieldBinding { pub struct RecordEncoderBuilder<'a> { avro_root: &'a AvroField, arrow_schema: &'a ArrowSchema, + fingerprint: Option, } impl<'a> RecordEncoderBuilder<'a> { @@ -530,9 +532,15 @@ impl<'a> RecordEncoderBuilder<'a> { Self { avro_root, arrow_schema, + fingerprint: None, } } + pub(crate) fn with_fingerprint(mut self, fingerprint: Option) -> Self { + self.fingerprint = fingerprint; + self + } + /// Build the `RecordEncoder` by walking the Avro **record** root in Avro order, /// resolving each field to an Arrow index by name. pub fn build(self) -> Result { @@ -557,7 +565,10 @@ impl<'a> RecordEncoderBuilder<'a> { )?, }); } - Ok(RecordEncoder { columns }) + Ok(RecordEncoder { + columns, + prefix: self.fingerprint.map(|fp| fp.make_prefix()), + }) } } @@ -569,6 +580,8 @@ impl<'a> RecordEncoderBuilder<'a> { #[derive(Debug, Clone)] pub struct RecordEncoder { columns: Vec, + /// Optional pre-built, variable-length prefix written before each record. + prefix: Option, } impl RecordEncoder { @@ -602,9 +615,23 @@ impl RecordEncoder { /// Tip: Wrap `out` in a `std::io::BufWriter` to reduce the overhead of many small writes. pub fn encode(&self, out: &mut W, batch: &RecordBatch) -> Result<(), ArrowError> { let mut column_encoders = self.prepare_for_batch(batch)?; - for row in 0..batch.num_rows() { - for encoder in column_encoders.iter_mut() { - encoder.encode(out, row)?; + let n = batch.num_rows(); + match self.prefix { + Some(prefix) => { + for row in 0..n { + out.write_all(prefix.as_slice()) + .map_err(|e| ArrowError::IoError(format!("write prefix: {e}"), e))?; + for enc in column_encoders.iter_mut() { + enc.encode(out, row)?; + } + } + } + None => { + for row in 0..n { + for enc in column_encoders.iter_mut() { + enc.encode(out, row)?; + } + } } } Ok(()) diff --git a/arrow-avro/src/writer/format.rs b/arrow-avro/src/writer/format.rs index 6fac9e8286a2..a6ddba38d24b 100644 --- a/arrow-avro/src/writer/format.rs +++ b/arrow-avro/src/writer/format.rs @@ -16,7 +16,10 @@ // under the License. use crate::compression::{CompressionCodec, CODEC_METADATA_KEY}; -use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY}; +use crate::schema::{ + AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, CONFLUENT_MAGIC, + SCHEMA_METADATA_KEY, SINGLE_OBJECT_MAGIC, +}; use crate::writer::encoder::write_long; use arrow_schema::{ArrowError, Schema}; use rand::RngCore; @@ -25,7 +28,13 @@ use std::io::Write; /// Format abstraction implemented by each container‐level writer. pub trait AvroFormat: Debug + Default { + /// If `true`, the writer for this format will query `single_object_prefix()` + /// and write the prefix before each record. If `false`, the writer can + /// skip this step. This is a performance hint for the writer. + const NEEDS_PREFIX: bool; + /// Write any bytes required at the very beginning of the output stream + /// (file header, etc.). /// Implementations **must not** write any record data. fn start_stream( &mut self, @@ -45,6 +54,7 @@ pub struct AvroOcfFormat { } impl AvroFormat for AvroOcfFormat { + const NEEDS_PREFIX: bool = false; fn start_stream( &mut self, writer: &mut W, @@ -53,10 +63,15 @@ impl AvroFormat for AvroOcfFormat { ) -> Result<(), ArrowError> { let mut rng = rand::rng(); rng.fill_bytes(&mut self.sync_marker); + // Choose the Avro schema JSON that the file will advertise. + // If `schema.metadata[SCHEMA_METADATA_KEY]` exists, AvroSchema::try_from + // uses it verbatim; otherwise it is generated from the Arrow schema. let avro_schema = AvroSchema::try_from(schema)?; + // Magic writer .write_all(b"Obj\x01") .map_err(|e| ArrowError::IoError(format!("write OCF magic: {e}"), e))?; + // File metadata map: { "avro.schema": , "avro.codec": } let codec_str = match compression { Some(CompressionCodec::Deflate) => "deflate", Some(CompressionCodec::Snappy) => "snappy", @@ -65,6 +80,7 @@ impl AvroFormat for AvroOcfFormat { Some(CompressionCodec::Xz) => "xz", None => "null", }; + // Map block: count=2, then key/value pairs, then terminating count=0 write_long(writer, 2)?; write_string(writer, SCHEMA_METADATA_KEY)?; write_bytes(writer, avro_schema.json_string.as_bytes())?; @@ -75,7 +91,6 @@ impl AvroFormat for AvroOcfFormat { writer .write_all(&self.sync_marker) .map_err(|e| ArrowError::IoError(format!("write OCF sync marker: {e}"), e))?; - Ok(()) } @@ -84,20 +99,31 @@ impl AvroFormat for AvroOcfFormat { } } -/// Raw Avro binary streaming format (no header or footer). +/// Raw Avro binary streaming format using **Single-Object Encoding** per record. +/// +/// Each record written by the stream writer is framed with a prefix determined +/// by the schema fingerprinting algorithm. +/// +/// See: +/// See: #[derive(Debug, Default)] -pub struct AvroBinaryFormat; +pub struct AvroBinaryFormat {} impl AvroFormat for AvroBinaryFormat { + const NEEDS_PREFIX: bool = true; fn start_stream( &mut self, _writer: &mut W, _schema: &Schema, - _compression: Option, + compression: Option, ) -> Result<(), ArrowError> { - Err(ArrowError::NotYetImplemented( - "avro binary format not yet implemented".to_string(), - )) + if compression.is_some() { + return Err(ArrowError::InvalidArgumentError( + "Compression not supported for Avro binary streaming".to_string(), + )); + } + + Ok(()) } fn sync_marker(&self) -> Option<&[u8; 16]> { diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs index f5e84eeb50bb..7a7b0d283750 100644 --- a/arrow-avro/src/writer/mod.rs +++ b/arrow-avro/src/writer/mod.rs @@ -34,7 +34,9 @@ pub mod format; use crate::codec::AvroFieldBuilder; use crate::compression::CompressionCodec; -use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY}; +use crate::schema::{ + AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY, +}; use crate::writer::encoder::{write_long, RecordEncoder, RecordEncoderBuilder}; use crate::writer::format::{AvroBinaryFormat, AvroFormat, AvroOcfFormat}; use arrow_array::RecordBatch; @@ -48,6 +50,7 @@ pub struct WriterBuilder { schema: Schema, codec: Option, capacity: usize, + fingerprint_strategy: Option, } impl WriterBuilder { @@ -57,9 +60,17 @@ impl WriterBuilder { schema, codec: None, capacity: 1024, + fingerprint_strategy: None, } } + /// Set the fingerprinting strategy for the stream writer. + /// This determines the per-record prefix format. + pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self { + self.fingerprint_strategy = Some(strategy); + self + } + /// Change the compression codec. pub fn with_compression(mut self, codec: Option) -> Self { self.codec = codec; @@ -84,6 +95,22 @@ impl WriterBuilder { Some(json) => AvroSchema::new(json.clone()), None => AvroSchema::try_from(&self.schema)?, }; + + let maybe_fingerprint = if F::NEEDS_PREFIX { + match self.fingerprint_strategy { + Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)), + Some(strategy) => { + Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?) + } + None => Some( + avro_schema + .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?, + ), + } + } else { + None + }; + let mut md = self.schema.metadata().clone(); md.insert( SCHEMA_METADATA_KEY.to_string(), @@ -92,7 +119,9 @@ impl WriterBuilder { let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md)); format.start_stream(&mut writer, &schema, self.codec)?; let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?; - let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref()).build()?; + let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref()) + .with_fingerprint(maybe_fingerprint) + .build()?; Ok(Writer { writer, schema, @@ -194,7 +223,8 @@ impl Writer { } fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> { - self.encoder.encode(&mut self.writer, batch) + self.encoder.encode(&mut self.writer, batch)?; + Ok(()) } } @@ -203,9 +233,9 @@ mod tests { use super::*; use crate::compression::CompressionCodec; use crate::reader::ReaderBuilder; - use crate::schema::{AvroSchema, SchemaStore}; + use crate::schema::{AvroSchema, SchemaStore, CONFLUENT_MAGIC}; use crate::test_util::arrow_test_data; - use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch}; + use arrow_array::{ArrayRef, BinaryArray, Int32Array, Int64Array, RecordBatch}; use arrow_schema::{DataType, Field, IntervalUnit, Schema}; use std::fs::File; use std::io::{BufReader, Cursor}; @@ -230,6 +260,73 @@ mod tests { .expect("failed to build test RecordBatch") } + #[test] + fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef], + )?; + let buf: Vec = Vec::new(); + let mut writer = AvroStreamWriter::new(buf, schema.clone())?; + writer.write(&batch)?; + let encoded = writer.into_inner(); + let mut store = SchemaStore::new(); // Rabin by default + let avro_schema = AvroSchema::try_from(&schema)?; + let _fp = store.register(avro_schema)?; + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .build_decoder()?; + let _consumed = decoder.decode(&encoded)?; + let decoded = decoder + .flush()? + .expect("expected at least one batch from decoder"); + assert_eq!(decoded.num_columns(), 1); + assert_eq!(decoded.num_rows(), 2); + let col = decoded + .column(0) + .as_any() + .downcast_ref::() + .expect("int column"); + assert_eq!(col, &Int32Array::from(vec![10, 20])); + Ok(()) + } + + #[test] + fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> { + let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]); + let batch = RecordBatch::try_new( + Arc::new(schema.clone()), + vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef], + )?; + let schema_id: u32 = 42; + let mut writer = WriterBuilder::new(schema.clone()) + .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id)) + .build::<_, AvroBinaryFormat>(Vec::new())?; + writer.write(&batch)?; + let encoded = writer.into_inner(); + let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None); + let avro_schema = AvroSchema::try_from(&schema)?; + let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?; + let mut decoder = ReaderBuilder::new() + .with_writer_schema_store(store) + .build_decoder()?; + let _ = decoder.decode(&encoded)?; + let decoded = decoder + .flush()? + .expect("expected at least one batch from decoder"); + assert_eq!(decoded.num_columns(), 1); + assert_eq!(decoded.num_rows(), 3); + let col = decoded + .column(0) + .as_any() + .downcast_ref::() + .expect("int column"); + assert_eq!(col, &Int32Array::from(vec![1, 2, 3])); + Ok(()) + } + #[test] fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> { let batch = make_batch(); diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 04c01f18e1d8..1e4fefbc28eb 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -187,7 +187,7 @@ pub type SchemaRef = Arc; pub struct Schema { /// A sequence of fields that describe the schema. pub fields: Fields, - /// A map of key-value pairs containing additional meta data. + /// A map of key-value pairs containing additional metadata. pub metadata: HashMap, }