diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index 19e86539558f..30c23e1932ae 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -42,6 +42,7 @@ snappy = ["snap", "crc"] canonical_extension_types = ["arrow-schema/canonical_extension_types"] md5 = ["dep:md5"] sha256 = ["dep:sha2"] +small_decimals = [] [dependencies] arrow-schema = { workspace = true } diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs index d19e9b8cccd7..503fae3d6a75 100644 --- a/arrow-avro/src/codec.rs +++ b/arrow-avro/src/codec.rs @@ -21,10 +21,10 @@ use crate::schema::{ }; use arrow_schema::{ ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, - DECIMAL128_MAX_SCALE, + DECIMAL256_MAX_PRECISION, }; -use serde_json::Value; -use std::borrow::Cow; +#[cfg(feature = "small_decimals")] +use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION}; use std::collections::HashMap; use std::sync::Arc; @@ -401,7 +401,7 @@ pub enum Codec { /// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type /// The i32 parameter indicates the fixed binary size Fixed(i32), - /// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 data types + /// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types /// /// The fields are `(precision, scale, fixed_size)`. /// - `precision` (`usize`): Total number of digits. @@ -447,20 +447,28 @@ impl Codec { } Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano), Self::Fixed(size) => DataType::FixedSizeBinary(*size), - Self::Decimal(precision, scale, size) => { + Self::Decimal(precision, scale, _size) => { let p = *precision as u8; let s = scale.unwrap_or(0) as i8; - let too_large_for_128 = match *size { - Some(sz) => sz > 16, - None => { - (p as usize) > DECIMAL128_MAX_PRECISION as usize - || (s as usize) > DECIMAL128_MAX_SCALE as usize + #[cfg(feature = "small_decimals")] + { + if *precision <= DECIMAL32_MAX_PRECISION as usize { + DataType::Decimal32(p, s) + } else if *precision <= DECIMAL64_MAX_PRECISION as usize { + DataType::Decimal64(p, s) + } else if *precision <= DECIMAL128_MAX_PRECISION as usize { + DataType::Decimal128(p, s) + } else { + DataType::Decimal256(p, s) + } + } + #[cfg(not(feature = "small_decimals"))] + { + if *precision <= DECIMAL128_MAX_PRECISION as usize { + DataType::Decimal128(p, s) + } else { + DataType::Decimal256(p, s) } - }; - if too_large_for_128 { - DataType::Decimal256(p, s) - } else { - DataType::Decimal128(p, s) } } Self::Uuid => DataType::FixedSizeBinary(16), @@ -506,6 +514,29 @@ impl From for Codec { } } +/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro +/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian). +/// +/// Per Avro spec (Decimal logical type), for a fixed length `n`: +/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋. +/// +/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports +/// Decimal256, which is 32 bytes and has max precision 76). +const fn max_precision_for_fixed_bytes(n: usize) -> Option { + // Precomputed exact table for n = 1..=32 + // 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28, + // 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52, + // 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76 + const MAX_P: [usize; 32] = [ + 2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57, + 59, 62, 64, 67, 69, 71, 74, 76, + ]; + match n { + 1..=32 => Some(MAX_P[n - 1]), + _ => None, + } +} + fn parse_decimal_attributes( attributes: &Attributes, fallback_size: Option, @@ -529,6 +560,34 @@ fn parse_decimal_attributes( .and_then(|v| v.as_u64()) .map(|s| s as usize) .or(fallback_size); + if precision == 0 { + return Err(ArrowError::ParseError( + "Decimal requires precision > 0".to_string(), + )); + } + if scale > precision { + return Err(ArrowError::ParseError(format!( + "Decimal has invalid scale > precision: scale={scale}, precision={precision}" + ))); + } + if precision > DECIMAL256_MAX_PRECISION as usize { + return Err(ArrowError::ParseError(format!( + "Decimal precision {precision} exceeds maximum supported by Arrow ({})", + DECIMAL256_MAX_PRECISION + ))); + } + if let Some(sz) = size { + let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| { + ArrowError::ParseError(format!( + "Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes" + )) + })?; + if precision > max_p { + return Err(ArrowError::ParseError(format!( + "Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})" + ))); + } + } Ok((precision, scale, size)) } @@ -747,7 +806,7 @@ impl<'a> Maker<'a> { Ok(field) } ComplexType::Array(a) => { - let mut field = self.parse_type(a.items.as_ref(), namespace)?; + let field = self.parse_type(a.items.as_ref(), namespace)?; Ok(AvroDataType { nullability: None, metadata: a.attributes.field_metadata(), diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs index d1910790e56d..13e0f07b4544 100644 --- a/arrow-avro/src/reader/mod.rs +++ b/arrow-avro/src/reader/mod.rs @@ -697,7 +697,7 @@ mod test { }; use arrow_array::types::{Int32Type, IntervalMonthDayNanoType}; use arrow_array::*; - use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; + use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema}; use bytes::{Buf, BufMut, Bytes}; use futures::executor::block_on; @@ -2176,37 +2176,137 @@ mod test { #[test] fn test_decimal() { - let files = [ - ("avro/fixed_length_decimal.avro", 25, 2), - ("avro/fixed_length_decimal_legacy.avro", 13, 2), - ("avro/int32_decimal.avro", 4, 2), - ("avro/int64_decimal.avro", 10, 2), + // Choose expected Arrow types depending on the `small_decimals` feature flag. + // With `small_decimals` enabled, Decimal32/Decimal64 are used where their + // precision allows; otherwise, those cases resolve to Decimal128. + #[cfg(feature = "small_decimals")] + let files: [(&str, DataType); 8] = [ + ( + "avro/fixed_length_decimal.avro", + DataType::Decimal128(25, 2), + ), + ( + "avro/fixed_length_decimal_legacy.avro", + DataType::Decimal64(13, 2), + ), + ("avro/int32_decimal.avro", DataType::Decimal32(4, 2)), + ("avro/int64_decimal.avro", DataType::Decimal64(10, 2)), + ( + "test/data/int256_decimal.avro", + DataType::Decimal256(76, 10), + ), + ( + "test/data/fixed256_decimal.avro", + DataType::Decimal256(76, 10), + ), + ( + "test/data/fixed_length_decimal_legacy_32.avro", + DataType::Decimal32(9, 2), + ), + ("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)), + ]; + #[cfg(not(feature = "small_decimals"))] + let files: [(&str, DataType); 8] = [ + ( + "avro/fixed_length_decimal.avro", + DataType::Decimal128(25, 2), + ), + ( + "avro/fixed_length_decimal_legacy.avro", + DataType::Decimal128(13, 2), + ), + ("avro/int32_decimal.avro", DataType::Decimal128(4, 2)), + ("avro/int64_decimal.avro", DataType::Decimal128(10, 2)), + ( + "test/data/int256_decimal.avro", + DataType::Decimal256(76, 10), + ), + ( + "test/data/fixed256_decimal.avro", + DataType::Decimal256(76, 10), + ), + ( + "test/data/fixed_length_decimal_legacy_32.avro", + DataType::Decimal128(9, 2), + ), + ("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)), ]; - let decimal_values: Vec = (1..=24).map(|n| n as i128 * 100).collect(); - for (file, precision, scale) in files { - let file_path = arrow_test_data(file); + for (file, expected_dt) in files { + let (precision, scale) = match expected_dt { + DataType::Decimal32(p, s) + | DataType::Decimal64(p, s) + | DataType::Decimal128(p, s) + | DataType::Decimal256(p, s) => (p, s), + _ => unreachable!("Unexpected decimal type in test inputs"), + }; + assert!(scale >= 0, "test data uses non-negative scales only"); + let scale_u32 = scale as u32; + let file_path: String = if file.starts_with("avro/") { + arrow_test_data(file) + } else { + std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join(file) + .to_string_lossy() + .into_owned() + }; + let pow10: i128 = 10i128.pow(scale_u32); + let values_i128: Vec = (1..=24).map(|n| (n as i128) * pow10).collect(); + let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef { + match *dt { + DataType::Decimal32(p, s) => { + let it = values.iter().map(|&v| v as i32); + Arc::new( + Decimal32Array::from_iter_values(it) + .with_precision_and_scale(p, s) + .unwrap(), + ) + } + DataType::Decimal64(p, s) => { + let it = values.iter().map(|&v| v as i64); + Arc::new( + Decimal64Array::from_iter_values(it) + .with_precision_and_scale(p, s) + .unwrap(), + ) + } + DataType::Decimal128(p, s) => { + let it = values.iter().copied(); + Arc::new( + Decimal128Array::from_iter_values(it) + .with_precision_and_scale(p, s) + .unwrap(), + ) + } + DataType::Decimal256(p, s) => { + let it = values.iter().map(|&v| i256::from_i128(v)); + Arc::new( + Decimal256Array::from_iter_values(it) + .with_precision_and_scale(p, s) + .unwrap(), + ) + } + _ => unreachable!("Unexpected decimal type in test"), + } + }; let actual_batch = read_file(&file_path, 8, false); - let expected_array = Decimal128Array::from_iter_values(decimal_values.clone()) - .with_precision_and_scale(precision, scale) - .unwrap(); + let actual_nullable = actual_batch.schema().field(0).is_nullable(); + let expected_array = build_expected(&expected_dt, &values_i128); let mut meta = HashMap::new(); meta.insert("precision".to_string(), precision.to_string()); meta.insert("scale".to_string(), scale.to_string()); - let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true) - .with_metadata(meta); - let expected_schema = Arc::new(Schema::new(vec![field_with_meta])); + let field = + Field::new("value", expected_dt.clone(), actual_nullable).with_metadata(meta); + let expected_schema = Arc::new(Schema::new(vec![field])); let expected_batch = - RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)]) - .expect("Failed to build expected RecordBatch"); + RecordBatch::try_new(expected_schema.clone(), vec![expected_array]).unwrap(); assert_eq!( actual_batch, expected_batch, - "Decoded RecordBatch does not match the expected Decimal128 data for file {file}" + "Decoded RecordBatch does not match for {file}" ); let actual_batch_small = read_file(&file_path, 3, false); assert_eq!( - actual_batch_small, - expected_batch, - "Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3" + actual_batch_small, expected_batch, + "Decoded RecordBatch does not match for {file} with batch size 3" ); } } diff --git a/arrow-avro/src/reader/record.rs b/arrow-avro/src/reader/record.rs index 6e5756ef41ff..b6a74674ca9c 100644 --- a/arrow-avro/src/reader/record.rs +++ b/arrow-avro/src/reader/record.rs @@ -21,8 +21,8 @@ use crate::reader::cursor::AvroCursor; use crate::reader::header::Header; use crate::schema::*; use arrow_array::builder::{ - ArrayBuilder, Decimal128Builder, Decimal256Builder, IntervalMonthDayNanoBuilder, - PrimitiveBuilder, + ArrayBuilder, Decimal128Builder, Decimal256Builder, Decimal32Builder, Decimal64Builder, + IntervalMonthDayNanoBuilder, PrimitiveBuilder, }; use arrow_array::types::*; use arrow_array::*; @@ -31,6 +31,8 @@ use arrow_schema::{ ArrowError, DataType, Field as ArrowField, FieldRef, Fields, IntervalUnit, Schema as ArrowSchema, SchemaRef, DECIMAL128_MAX_PRECISION, DECIMAL256_MAX_PRECISION, }; +#[cfg(feature = "small_decimals")] +use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION}; use std::cmp::Ordering; use std::collections::HashMap; use std::io::Read; @@ -39,6 +41,25 @@ use uuid::Uuid; const DEFAULT_CAPACITY: usize = 1024; +/// Macro to decode a decimal payload for a given width and integer type. +macro_rules! decode_decimal { + ($size:expr, $buf:expr, $builder:expr, $N:expr, $Int:ty) => {{ + let bytes = read_decimal_bytes_be::<{ $N }>($buf, $size)?; + $builder.append_value(<$Int>::from_be_bytes(bytes)); + }}; +} + +/// Macro to finish a decimal builder into an array with precision/scale and nulls. +macro_rules! flush_decimal { + ($builder:expr, $precision:expr, $scale:expr, $nulls:expr, $ArrayTy:ty) => {{ + let (_, vals, _) = $builder.finish().into_parts(); + let dec = <$ArrayTy>::new(vals, $nulls) + .with_precision_and_scale(*$precision as u8, $scale.unwrap_or(0) as i8) + .map_err(|e| ArrowError::ParseError(e.to_string()))?; + Arc::new(dec) as ArrayRef + }}; +} + #[derive(Debug)] pub(crate) struct RecordDecoderBuilder<'a> { data_type: &'a AvroDataType, @@ -101,8 +122,6 @@ impl RecordDecoder { /// # Arguments /// * `data_type` - The Avro data type to decode. /// * `use_utf8view` - A flag indicating whether to use `Utf8View` for string types. - /// * `strict_mode` - A flag to enable strict decoding, returning an error if the data - /// does not conform to the schema. /// /// # Errors /// This function will return an error if the provided `data_type` is not a `Record`. @@ -245,6 +264,8 @@ enum Decoder { Enum(Vec, Arc<[String]>), Duration(IntervalMonthDayNanoBuilder), Uuid(Vec), + Decimal32(usize, Option, Option, Decimal32Builder), + Decimal64(usize, Option, Option, Decimal64Builder), Decimal128(usize, Option, Option, Decimal128Builder), Decimal256(usize, Option, Option, Decimal256Builder), Nullable(Nullability, NullBufferBuilder, Box), @@ -329,36 +350,43 @@ impl Decoder { (Codec::Decimal(precision, scale, size), _) => { let p = *precision; let s = *scale; - let sz = *size; let prec = p as u8; let scl = s.unwrap_or(0) as i8; - match (sz, p) { - (Some(fixed_size), _) if fixed_size <= 16 => { - let builder = - Decimal128Builder::new().with_precision_and_scale(prec, scl)?; - Self::Decimal128(p, s, sz, builder) - } - (Some(fixed_size), _) if fixed_size <= 32 => { - let builder = - Decimal256Builder::new().with_precision_and_scale(prec, scl)?; - Self::Decimal256(p, s, sz, builder) - } - (Some(fixed_size), _) => { + #[cfg(feature = "small_decimals")] + { + if p <= DECIMAL32_MAX_PRECISION as usize { + let builder = Decimal32Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal32(p, s, *size, builder) + } else if p <= DECIMAL64_MAX_PRECISION as usize { + let builder = Decimal64Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal64(p, s, *size, builder) + } else if p <= DECIMAL128_MAX_PRECISION as usize { + let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal128(p, s, *size, builder) + } else if p <= DECIMAL256_MAX_PRECISION as usize { + let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal256(p, s, *size, builder) + } else { return Err(ArrowError::ParseError(format!( - "Unsupported decimal size: {fixed_size:?}" + "Decimal precision {p} exceeds maximum supported" ))); } - (None, p) if p <= DECIMAL128_MAX_PRECISION as usize => { - let builder = - Decimal128Builder::new().with_precision_and_scale(prec, scl)?; - Self::Decimal128(p, s, sz, builder) - } - (None, p) if p <= DECIMAL256_MAX_PRECISION as usize => { - let builder = - Decimal256Builder::new().with_precision_and_scale(prec, scl)?; - Self::Decimal256(p, s, sz, builder) - } - (None, _) => { + } + #[cfg(not(feature = "small_decimals"))] + { + if p <= DECIMAL128_MAX_PRECISION as usize { + let builder = Decimal128Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal128(p, s, *size, builder) + } else if p <= DECIMAL256_MAX_PRECISION as usize { + let builder = Decimal256Builder::with_capacity(DEFAULT_CAPACITY) + .with_precision_and_scale(prec, scl)?; + Self::Decimal256(p, s, *size, builder) + } else { return Err(ArrowError::ParseError(format!( "Decimal precision {p} exceeds maximum supported" ))); @@ -473,6 +501,8 @@ impl Decoder { Self::Fixed(sz, accum) => { accum.extend(std::iter::repeat_n(0u8, *sz as usize)); } + Self::Decimal32(_, _, _, builder) => builder.append_value(0), + Self::Decimal64(_, _, _, builder) => builder.append_value(0), Self::Decimal128(_, _, _, builder) => builder.append_value(0), Self::Decimal256(_, _, _, builder) => builder.append_value(i256::ZERO), Self::Enum(indices, _) => indices.push(0), @@ -548,25 +578,17 @@ impl Decoder { let fx = buf.get_fixed(*sz as usize)?; accum.extend_from_slice(fx); } + Self::Decimal32(_, _, size, builder) => { + decode_decimal!(size, buf, builder, 4, i32); + } + Self::Decimal64(_, _, size, builder) => { + decode_decimal!(size, buf, builder, 8, i64); + } Self::Decimal128(_, _, size, builder) => { - let raw = if let Some(s) = size { - buf.get_fixed(*s)? - } else { - buf.get_bytes()? - }; - let ext = sign_extend_to::<16>(raw)?; - let val = i128::from_be_bytes(ext); - builder.append_value(val); + decode_decimal!(size, buf, builder, 16, i128); } Self::Decimal256(_, _, size, builder) => { - let raw = if let Some(s) = size { - buf.get_fixed(*s)? - } else { - buf.get_bytes()? - }; - let ext = sign_extend_to::<32>(raw)?; - let val = i256::from_be_bytes(ext); - builder.append_value(val); + decode_decimal!(size, buf, builder, 32, i256); } Self::Enum(indices, _) => { indices.push(buf.get_int()?); @@ -742,21 +764,17 @@ impl Decoder { .map_err(|e| ArrowError::ParseError(e.to_string()))?; Arc::new(arr) } + Self::Decimal32(precision, scale, _, builder) => { + flush_decimal!(builder, precision, scale, nulls, Decimal32Array) + } + Self::Decimal64(precision, scale, _, builder) => { + flush_decimal!(builder, precision, scale, nulls, Decimal64Array) + } Self::Decimal128(precision, scale, _, builder) => { - let (_, vals, _) = builder.finish().into_parts(); - let scl = scale.unwrap_or(0); - let dec = Decimal128Array::new(vals, nulls) - .with_precision_and_scale(*precision as u8, scl as i8) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; - Arc::new(dec) + flush_decimal!(builder, precision, scale, nulls, Decimal128Array) } Self::Decimal256(precision, scale, _, builder) => { - let (_, vals, _) = builder.finish().into_parts(); - let scl = scale.unwrap_or(0); - let dec = Decimal256Array::new(vals, nulls) - .with_precision_and_scale(*precision as u8, scl as i8) - .map_err(|e| ArrowError::ParseError(e.to_string()))?; - Arc::new(dec) + flush_decimal!(builder, precision, scale, nulls, Decimal256Array) } Self::Enum(indices, symbols) => flush_dict(indices, symbols, nulls)?, Self::EnumResolved { @@ -838,8 +856,6 @@ fn process_blockwise( match block_count.cmp(&0) { Ordering::Equal => break, Ordering::Less => { - // If block_count is negative, read the absolute value of count, - // then read the block size as a long and discard let count = (-block_count) as usize; // A negative count is followed by a long of the size in bytes let size_in_bytes = buf.get_long()? as usize; @@ -858,7 +874,6 @@ fn process_blockwise( total += count; } Ordering::Greater => { - // If block_count is positive, decode that many items let count = block_count as usize; for _ in 0..count { on_item(buf)?; @@ -888,29 +903,77 @@ fn flush_primitive( PrimitiveArray::new(flush_values(values).into(), nulls) } -/// Sign extends a byte slice to a fixed-size array of N bytes. -/// This is done by filling the leading bytes with 0x00 for positive numbers -/// or 0xFF for negative numbers. #[inline] -fn sign_extend_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { - if raw.len() > N { - return Err(ArrowError::ParseError(format!( - "Cannot extend a slice of length {} to {} bytes.", - raw.len(), - N - ))); - } - let mut arr = [0u8; N]; - let pad_len = N - raw.len(); - // Determine the byte to use for padding based on the sign bit of the raw data. - let extension_byte = if raw.is_empty() || (raw[0] & 0x80 == 0) { - 0x00 - } else { - 0xFF - }; - arr[..pad_len].fill(extension_byte); - arr[pad_len..].copy_from_slice(raw); - Ok(arr) +fn read_decimal_bytes_be( + buf: &mut AvroCursor<'_>, + size: &Option, +) -> Result<[u8; N], ArrowError> { + match size { + Some(n) if *n == N => { + let raw = buf.get_fixed(N)?; + let mut arr = [0u8; N]; + arr.copy_from_slice(raw); + Ok(arr) + } + Some(n) => { + let raw = buf.get_fixed(*n)?; + sign_cast_to::(raw) + } + None => { + let raw = buf.get_bytes()?; + sign_cast_to::(raw) + } + } +} + +/// Sign-extend or (when larger) validate-and-truncate a big-endian two's-complement +/// integer into exactly `N` bytes. This matches Avro's decimal binary encoding: +/// the payload is a big-endian two's-complement integer, and when narrowing it must +/// be representable without changing sign or value. +/// +/// If `raw.len() < N`, the value is sign-extended. +/// If `raw.len() > N`, all truncated leading bytes must match the sign-extension byte +/// and the MSB of the first kept byte must match the sign (to avoid silent overflow). +#[inline] +fn sign_cast_to(raw: &[u8]) -> Result<[u8; N], ArrowError> { + let len = raw.len(); + // Fast path: exact width, just copy + if len == N { + let mut out = [0u8; N]; + out.copy_from_slice(raw); + return Ok(out); + } + // Determine sign byte from MSB of first byte (empty => positive) + let first = raw.first().copied().unwrap_or(0u8); + let sign_byte = if (first & 0x80) == 0 { 0x00 } else { 0xFF }; + // Pre-fill with sign byte to support sign extension + let mut out = [sign_byte; N]; + if len > N { + // Validate truncation: all dropped leading bytes must equal sign_byte, + // and the MSB of the first kept byte must match the sign. + let extra = len - N; + // Any non-sign byte in the truncated prefix indicates overflow + if raw[..extra].iter().any(|&b| b != sign_byte) { + return Err(ArrowError::ParseError(format!( + "Decimal value with {} bytes cannot be represented in {} bytes without overflow", + len, N + ))); + } + if N > 0 { + let first_kept = raw[extra]; + let sign_bit_mismatch = ((first_kept ^ sign_byte) & 0x80) != 0; + if sign_bit_mismatch { + return Err(ArrowError::ParseError(format!( + "Decimal value with {} bytes cannot be represented in {} bytes without overflow", + len, N + ))); + } + } + out.copy_from_slice(&raw[extra..]); + return Ok(out); + } + out[N - len..].copy_from_slice(raw); + Ok(out) } /// Lightweight skipper for non‑projected writer fields @@ -1078,8 +1141,9 @@ mod tests { use super::*; use crate::codec::AvroField; use arrow_array::{ - cast::AsArray, Array, Decimal128Array, DictionaryArray, FixedSizeBinaryArray, - IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, StructArray, + cast::AsArray, Array, Decimal128Array, Decimal256Array, Decimal32Array, DictionaryArray, + FixedSizeBinaryArray, IntervalMonthDayNanoArray, ListArray, MapArray, StringArray, + StructArray, }; fn encode_avro_int(value: i32) -> Vec { @@ -1526,7 +1590,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed256() { - let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32))); + let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32))); let mut decoder = Decoder::try_new(&dt).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -1553,7 +1617,7 @@ mod tests { #[test] fn test_decimal_decoding_fixed128() { - let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16))); + let dt = avro_from_codec(Codec::Decimal(28, Some(2), Some(16))); let mut decoder = Decoder::try_new(&dt).unwrap(); let row1 = [ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, @@ -1576,6 +1640,79 @@ mod tests { assert_eq!(dec.value_as_string(1), "-1.23"); } + #[test] + fn test_decimal_decoding_fixed32_from_32byte_fixed_storage() { + let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(32))); + let mut decoder = Decoder::try_new(&dt).unwrap(); + let row1 = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x30, 0x39, + ]; + let row2 = [ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF, 0x85, + ]; + let mut data = Vec::new(); + data.extend_from_slice(&row1); + data.extend_from_slice(&row2); + let mut cursor = AvroCursor::new(&data); + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); + let arr = decoder.flush(None).unwrap(); + #[cfg(feature = "small_decimals")] + { + let dec = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec.len(), 2); + assert_eq!(dec.value_as_string(0), "123.45"); + assert_eq!(dec.value_as_string(1), "-1.23"); + } + #[cfg(not(feature = "small_decimals"))] + { + let dec = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec.len(), 2); + assert_eq!(dec.value_as_string(0), "123.45"); + assert_eq!(dec.value_as_string(1), "-1.23"); + } + } + + #[test] + fn test_decimal_decoding_fixed32_from_16byte_fixed_storage() { + let dt = avro_from_codec(Codec::Decimal(5, Some(2), Some(16))); + let mut decoder = Decoder::try_new(&dt).unwrap(); + let row1 = [ + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x30, 0x39, + ]; + let row2 = [ + 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0x85, + ]; + let mut data = Vec::new(); + data.extend_from_slice(&row1); + data.extend_from_slice(&row2); + let mut cursor = AvroCursor::new(&data); + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); + + let arr = decoder.flush(None).unwrap(); + #[cfg(feature = "small_decimals")] + { + let dec = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec.len(), 2); + assert_eq!(dec.value_as_string(0), "123.45"); + assert_eq!(dec.value_as_string(1), "-1.23"); + } + #[cfg(not(feature = "small_decimals"))] + { + let dec = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec.len(), 2); + assert_eq!(dec.value_as_string(0), "123.45"); + assert_eq!(dec.value_as_string(1), "-1.23"); + } + } + #[test] fn test_decimal_decoding_bytes_with_nulls() { let dt = avro_from_codec(Codec::Decimal(4, Some(1), None)); @@ -1592,21 +1729,34 @@ mod tests { data.extend_from_slice(&encode_avro_int(0)); data.extend_from_slice(&encode_avro_bytes(&[0xFB, 0x2E])); let mut cursor = AvroCursor::new(&data); - decoder.decode(&mut cursor).unwrap(); // row1 - decoder.decode(&mut cursor).unwrap(); // row2 - decoder.decode(&mut cursor).unwrap(); // row3 + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); + decoder.decode(&mut cursor).unwrap(); let arr = decoder.flush(None).unwrap(); - let dec_arr = arr.as_any().downcast_ref::().unwrap(); - assert_eq!(dec_arr.len(), 3); - assert!(dec_arr.is_valid(0)); - assert!(!dec_arr.is_valid(1)); - assert!(dec_arr.is_valid(2)); - assert_eq!(dec_arr.value_as_string(0), "123.4"); - assert_eq!(dec_arr.value_as_string(2), "-123.4"); + #[cfg(feature = "small_decimals")] + { + let dec_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec_arr.len(), 3); + assert!(dec_arr.is_valid(0)); + assert!(!dec_arr.is_valid(1)); + assert!(dec_arr.is_valid(2)); + assert_eq!(dec_arr.value_as_string(0), "123.4"); + assert_eq!(dec_arr.value_as_string(2), "-123.4"); + } + #[cfg(not(feature = "small_decimals"))] + { + let dec_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec_arr.len(), 3); + assert!(dec_arr.is_valid(0)); + assert!(!dec_arr.is_valid(1)); + assert!(dec_arr.is_valid(2)); + assert_eq!(dec_arr.value_as_string(0), "123.4"); + assert_eq!(dec_arr.value_as_string(2), "-123.4"); + } } #[test] - fn test_decimal_decoding_bytes_with_nulls_fixed_size() { + fn test_decimal_decoding_bytes_with_nulls_fixed_size_narrow_result() { let dt = avro_from_codec(Codec::Decimal(6, Some(2), Some(16))); let inner = Decoder::try_new(&dt).unwrap(); let mut decoder = Decoder::Nullable( @@ -1633,13 +1783,26 @@ mod tests { decoder.decode(&mut cursor).unwrap(); decoder.decode(&mut cursor).unwrap(); let arr = decoder.flush(None).unwrap(); - let dec_arr = arr.as_any().downcast_ref::().unwrap(); - assert_eq!(dec_arr.len(), 3); - assert!(dec_arr.is_valid(0)); - assert!(!dec_arr.is_valid(1)); - assert!(dec_arr.is_valid(2)); - assert_eq!(dec_arr.value_as_string(0), "1234.56"); - assert_eq!(dec_arr.value_as_string(2), "-1234.56"); + #[cfg(feature = "small_decimals")] + { + let dec_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec_arr.len(), 3); + assert!(dec_arr.is_valid(0)); + assert!(!dec_arr.is_valid(1)); + assert!(dec_arr.is_valid(2)); + assert_eq!(dec_arr.value_as_string(0), "1234.56"); + assert_eq!(dec_arr.value_as_string(2), "-1234.56"); + } + #[cfg(not(feature = "small_decimals"))] + { + let dec_arr = arr.as_any().downcast_ref::().unwrap(); + assert_eq!(dec_arr.len(), 3); + assert!(dec_arr.is_valid(0)); + assert!(!dec_arr.is_valid(1)); + assert!(dec_arr.is_valid(2)); + assert_eq!(dec_arr.value_as_string(0), "1234.56"); + assert_eq!(dec_arr.value_as_string(2), "-1234.56"); + } } #[test] @@ -1660,7 +1823,6 @@ mod tests { .as_any() .downcast_ref::>() .unwrap(); - assert_eq!(dict_array.len(), 3); let values = dict_array .values() diff --git a/arrow-avro/test/data/README.md b/arrow-avro/test/data/README.md new file mode 100644 index 000000000000..51416c8416d4 --- /dev/null +++ b/arrow-avro/test/data/README.md @@ -0,0 +1,147 @@ + + +# Avro test files for `arrow-avro` + +This directory contains small Avro Object Container Files (OCF) used by +`arrow-avro` tests to validate the `Reader` implementation. These files are generated from +a set of python scripts and will gradually be removed as they are merged into `arrow-testing`. + +## Decimal Files + +This directory contains OCF files used to exercise decoding of Avro’s `decimal` logical type +across both `bytes` and `fixed` encodings, and to cover Arrow decimal widths ranging +from `Decimal32` up through `Decimal256`. The files were generated from a +script (see **How these files were created** below). + +> **Avro decimal recap.** Avro’s `decimal` logical type annotates either a +> `bytes` or `fixed` primitive and stores the **two’s‑complement big‑endian +> representation of the unscaled integer** (value × 10^scale). Implementations +> should reject invalid combinations such as `scale > precision`. + +> **Arrow decimal recap.** Arrow defines `Decimal32`, `Decimal64`, `Decimal128`, +> and `Decimal256` types with maximum precisions of 9, 18, 38, and 76 digits, +> respectively. Tests here validate that the Avro reader selects compatible +> Arrow decimal widths given the Avro decimal’s precision and storage. + +--- + +All files are one‑column Avro OCFs with a field named `value`. Each contains 24 +rows with the sequence `1 … 24` rendered at the file’s declared `scale` +(i.e., at scale 10: `1.0000000000`, `2.0000000000`). + +| File | Avro storage | Decimal (precision, scale) | Intended Arrow width | +|---|---|---|---| +| `int256_decimal.avro` | `bytes` + `logicalType: decimal` | (76, 10) | `Decimal256` | +| `fixed256_decimal.avro` | `fixed[32]` + `logicalType: decimal` | (76, 10) | `Decimal256` | +| `fixed_length_decimal_legacy_32.avro` | `fixed[4]` + `logicalType: decimal` | (9, 2) | `Decimal32` (legacy fixed‑width path) | +| `int128_decimal.avro` | `bytes` + `logicalType: decimal` | (38, 2) | `Decimal128` | + +### Schemas (for reference) + +#### int256_decimal.avro + +```json +{ + "type": "record", + "name": "OneColDecimal256Bytes", + "fields": [{ + "name": "value", + "type": { "type": "bytes", "logicalType": "decimal", "precision": 76, "scale": 10 } + }] +} +``` + +#### fixed256_decimal.avro + +```json +{ + "type": "record", + "name": "OneColDecimal256Fixed", + "fields": [{ + "name": "value", + "type": { + "type": "fixed", "name": "Decimal256Fixed", "size": 32, + "logicalType": "decimal", "precision": 76, "scale": 10 + } + }] +} +``` + +#### fixed_length_decimal_legacy_32.avro + +```json +{ + "type": "record", + "name": "OneColDecimal32FixedLegacy", + "fields": [{ + "name": "value", + "type": { + "type": "fixed", "name": "Decimal32FixedLegacy", "size": 4, + "logicalType": "decimal", "precision": 9, "scale": 2 + } + }] +} +``` + +#### int128_decimal.avro + +```json +{ + "type": "record", + "name": "OneColDecimal128Bytes", + "fields": [{ + "name": "value", + "type": { "type": "bytes", "logicalType": "decimal", "precision": 38, "scale": 2 } + }] +} +``` + +### How these files were created + +All four files were generated by the Python script +`create_avro_decimal_files.py` authored for this purpose. The script uses +`fastavro` to write OCFs and encodes decimal values as required by the Avro +spec (two’s‑complement big‑endian of the unscaled integer). + +#### Re‑generation + +From the repository root (defaults write into arrow-avro/test/data): + +```bash +# 1) Ensure Python 3 is available, then install fastavro +python -m pip install --upgrade fastavro + +# 2) Fetch the script +curl -L -o create_avro_decimal_files.py \ +https://gist.githubusercontent.com/jecsand838/3890349bdb33082a3e8fdcae3257eef7/raw/create_avro_decimal_files.py + +# 3) Generate the files (prints a verification dump by default) +python create_avro_decimal_files.py -o arrow-avro/test/data +``` + +Options: +* --num-rows (default 24) — number of rows to emit per file +* --scale (default 10) — the decimal scale used for the 256 files +* --no-verify — skip reading the files back for printed verification + +## Other Files + +This directory contains other small OCF files used by `arrow-avro` tests. Details on these will be added in +follow-up PRs. \ No newline at end of file diff --git a/arrow-avro/test/data/fixed256_decimal.avro b/arrow-avro/test/data/fixed256_decimal.avro new file mode 100644 index 000000000000..d1fc97dd8c83 Binary files /dev/null and b/arrow-avro/test/data/fixed256_decimal.avro differ diff --git a/arrow-avro/test/data/fixed_length_decimal_legacy_32.avro b/arrow-avro/test/data/fixed_length_decimal_legacy_32.avro new file mode 100644 index 000000000000..b746df9619b5 Binary files /dev/null and b/arrow-avro/test/data/fixed_length_decimal_legacy_32.avro differ diff --git a/arrow-avro/test/data/int128_decimal.avro b/arrow-avro/test/data/int128_decimal.avro new file mode 100644 index 000000000000..bd54d20ba487 Binary files /dev/null and b/arrow-avro/test/data/int128_decimal.avro differ diff --git a/arrow-avro/test/data/int256_decimal.avro b/arrow-avro/test/data/int256_decimal.avro new file mode 100644 index 000000000000..62ad7ea4df08 Binary files /dev/null and b/arrow-avro/test/data/int256_decimal.avro differ