diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index 0cac8c578680..3f94391c2511 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -16,8 +16,9 @@
 // under the License.

 use crate::schema::{
-    Attributes, AvroSchema, ComplexType, Enum, Nullability, PrimitiveType, Record, Schema, Type,
-    TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+    Array, Attributes, AvroSchema, ComplexType, Enum, Fixed, Map, Nullability, PrimitiveType,
+    Record, Schema, Type, TypeName, AVRO_ENUM_SYMBOLS_METADATA_KEY,
+    AVRO_FIELD_DEFAULT_METADATA_KEY,
 };
 use arrow_schema::{
     ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
@@ -25,6 +26,8 @@ use arrow_schema::{
 };
 #[cfg(feature = "small_decimals")]
 use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
+use indexmap::IndexMap;
+use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -33,11 +36,11 @@ use std::sync::Arc;
 pub(crate) enum ResolutionInfo {
     /// Indicates that the writer's type should be promoted to the reader's type.
     Promotion(Promotion),
-    /// Indicates that a default value should be used for a field. (Implemented in a Follow-up PR)
+    /// Indicates that a default value should be used for a field.
     DefaultValue(AvroLiteral),
     /// Provides mapping information for resolving enums.
     EnumMapping(EnumMapping),
-    /// Provides resolution information for record fields. (Implemented in a Follow-up PR)
+    /// Provides resolution information for record fields.
     Record(ResolvedRecord),
 }
@@ -64,6 +67,10 @@ pub(crate) enum AvroLiteral {
     String(String),
     /// Represents an enum symbol.
     Enum(String),
+    /// Represents a JSON array default for an Avro array, containing element literals.
+    Array(Vec<AvroLiteral>),
+    /// Represents a JSON object default for an Avro map/struct, mapping string keys to value literals.
+    Map(IndexMap<String, AvroLiteral>),
     /// Represents an unsupported literal type.
     Unsupported,
 }
@@ -193,6 +200,225 @@ impl AvroDataType {
     pub fn nullability(&self) -> Option<Nullability> {
         self.nullability
     }
+
+    #[inline]
+    fn parse_default_literal(&self, default_json: &Value) -> Result<AvroLiteral, ArrowError> {
+        fn expect_string<'v>(
+            default_json: &'v Value,
+            data_type: &str,
+        ) -> Result<&'v str, ArrowError> {
+            match default_json {
+                Value::String(s) => Ok(s.as_str()),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default value must be a JSON string for {data_type}"
+                ))),
+            }
+        }
+
+        fn parse_bytes_default(
+            default_json: &Value,
+            expected_len: Option<usize>,
+        ) -> Result<Vec<u8>, ArrowError> {
+            let s = expect_string(default_json, "bytes/fixed logical types")?;
+            let mut out = Vec::with_capacity(s.len());
+            for ch in s.chars() {
+                let cp = ch as u32;
+                if cp > 0xFF {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Invalid codepoint U+{cp:04X} in bytes/fixed default; must be ≤ 0xFF"
+                    )));
+                }
+                out.push(cp as u8);
+            }
+            if let Some(len) = expected_len {
+                if out.len() != len {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Default length {} does not match expected fixed size {len}",
+                        out.len(),
+                    )));
+                }
+            }
+            Ok(out)
+        }
+
+        fn parse_json_i64(default_json: &Value, data_type: &str) -> Result<i64, ArrowError> {
+            match default_json {
+                Value::Number(n) => n.as_i64().ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Default {data_type} must be an integer"))
+                }),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default {data_type} must be a JSON integer"
+                ))),
+            }
+        }
+
+        fn parse_json_f64(default_json: &Value, data_type: &str) -> Result<f64, ArrowError> {
+            match default_json {
+                Value::Number(n) => n.as_f64().ok_or_else(|| {
+                    ArrowError::SchemaError(format!("Default {data_type} must be a number"))
+                }),
+                _ => Err(ArrowError::SchemaError(format!(
+                    "Default {data_type} must be a JSON number"
+                ))),
+            }
+        }
+
+        // Handle JSON nulls per-spec: allowed only for `null` type or unions with null FIRST
+        if default_json.is_null() {
+            return match self.codec() {
+                Codec::Null => Ok(AvroLiteral::Null),
+                _ if
self.nullability() == Some(Nullability::NullFirst) => Ok(AvroLiteral::Null), + _ => Err(ArrowError::SchemaError( + "JSON null default is only valid for `null` type or for a union whose first branch is `null`" + .to_string(), + )), + }; + } + let lit = match self.codec() { + Codec::Null => { + return Err(ArrowError::SchemaError( + "Default for `null` type must be JSON null".to_string(), + )) + } + Codec::Boolean => match default_json { + Value::Bool(b) => AvroLiteral::Boolean(*b), + _ => { + return Err(ArrowError::SchemaError( + "Boolean default must be a JSON boolean".to_string(), + )) + } + }, + Codec::Int32 | Codec::Date32 | Codec::TimeMillis => { + let i = parse_json_i64(default_json, "int")?; + if i < i32::MIN as i64 || i > i32::MAX as i64 { + return Err(ArrowError::SchemaError(format!( + "Default int {i} out of i32 range" + ))); + } + AvroLiteral::Int(i as i32) + } + Codec::Int64 + | Codec::TimeMicros + | Codec::TimestampMillis(_) + | Codec::TimestampMicros(_) => AvroLiteral::Long(parse_json_i64(default_json, "long")?), + Codec::Float32 => { + let f = parse_json_f64(default_json, "float")?; + if !f.is_finite() || f < f32::MIN as f64 || f > f32::MAX as f64 { + return Err(ArrowError::SchemaError(format!( + "Default float {f} out of f32 range or not finite" + ))); + } + AvroLiteral::Float(f as f32) + } + Codec::Float64 => AvroLiteral::Double(parse_json_f64(default_json, "double")?), + Codec::Utf8 | Codec::Utf8View | Codec::Uuid => { + AvroLiteral::String(expect_string(default_json, "string/uuid")?.to_string()) + } + Codec::Binary => AvroLiteral::Bytes(parse_bytes_default(default_json, None)?), + Codec::Fixed(sz) => { + AvroLiteral::Bytes(parse_bytes_default(default_json, Some(*sz as usize))?) + } + Codec::Decimal(_, _, fixed_size) => { + AvroLiteral::Bytes(parse_bytes_default(default_json, *fixed_size)?) 
+ } + Codec::Enum(symbols) => { + let s = expect_string(default_json, "enum")?; + if symbols.iter().any(|sym| sym == s) { + AvroLiteral::Enum(s.to_string()) + } else { + return Err(ArrowError::SchemaError(format!( + "Default enum symbol {s:?} not found in reader enum symbols" + ))); + } + } + Codec::Interval => AvroLiteral::Bytes(parse_bytes_default(default_json, Some(12))?), + Codec::List(item_dt) => match default_json { + Value::Array(items) => AvroLiteral::Array( + items + .iter() + .map(|v| item_dt.parse_default_literal(v)) + .collect::>()?, + ), + _ => { + return Err(ArrowError::SchemaError( + "Default value must be a JSON array for Avro array type".to_string(), + )) + } + }, + Codec::Map(val_dt) => match default_json { + Value::Object(map) => { + let mut out = IndexMap::with_capacity(map.len()); + for (k, v) in map { + out.insert(k.clone(), val_dt.parse_default_literal(v)?); + } + AvroLiteral::Map(out) + } + _ => { + return Err(ArrowError::SchemaError( + "Default value must be a JSON object for Avro map type".to_string(), + )) + } + }, + Codec::Struct(fields) => match default_json { + Value::Object(obj) => { + let mut out: IndexMap = + IndexMap::with_capacity(fields.len()); + for f in fields.as_ref() { + let name = f.name().to_string(); + if let Some(sub) = obj.get(&name) { + out.insert(name, f.data_type().parse_default_literal(sub)?); + } else { + // Cache metadata lookup once + let stored_default = + f.data_type().metadata.get(AVRO_FIELD_DEFAULT_METADATA_KEY); + if stored_default.is_none() + && f.data_type().nullability() == Some(Nullability::default()) + { + out.insert(name, AvroLiteral::Null); + } else if let Some(default_json) = stored_default { + let v: Value = + serde_json::from_str(default_json).map_err(|e| { + ArrowError::SchemaError(format!( + "Failed to parse stored subfield default JSON for '{}': {e}", + f.name(), + )) + })?; + out.insert(name, f.data_type().parse_default_literal(&v)?); + } else { + return Err(ArrowError::SchemaError(format!( + 
"Record default missing required subfield '{}' with non-nullable type {:?}", + f.name(), + f.data_type().codec() + ))); + } + } + } + AvroLiteral::Map(out) + } + _ => { + return Err(ArrowError::SchemaError( + "Default value for record/struct must be a JSON object".to_string(), + )) + } + }, + }; + Ok(lit) + } + + fn store_default(&mut self, default_json: &Value) -> Result<(), ArrowError> { + let json_text = serde_json::to_string(default_json).map_err(|e| { + ArrowError::ParseError(format!("Failed to serialize default to JSON: {e}")) + })?; + self.metadata + .insert(AVRO_FIELD_DEFAULT_METADATA_KEY.to_string(), json_text); + Ok(()) + } + + fn parse_and_store_default(&mut self, default_json: &Value) -> Result { + let lit = self.parse_default_literal(default_json)?; + self.store_default(default_json)?; + Ok(lit) + } } /// A named [`AvroDataType`] @@ -625,7 +851,6 @@ impl<'a> Resolver<'a> { let (namespace, name) = name .rsplit_once('.') .unwrap_or_else(|| (namespace.unwrap_or(""), name)); - self.map .get(&(namespace, name)) .ok_or_else(|| ArrowError::ParseError(format!("Failed to resolve {namespace}.{name}"))) @@ -924,6 +1149,18 @@ impl<'a> Maker<'a> { return self.resolve_primitives(write_primitive, read_primitive, reader_schema); } match (writer_schema, reader_schema) { + ( + Schema::Complex(ComplexType::Array(writer_array)), + Schema::Complex(ComplexType::Array(reader_array)), + ) => self.resolve_array(writer_array, reader_array, namespace), + ( + Schema::Complex(ComplexType::Map(writer_map)), + Schema::Complex(ComplexType::Map(reader_map)), + ) => self.resolve_map(writer_map, reader_map, namespace), + ( + Schema::Complex(ComplexType::Fixed(writer_fixed)), + Schema::Complex(ComplexType::Fixed(reader_fixed)), + ) => self.resolve_fixed(writer_fixed, reader_fixed, reader_schema, namespace), ( Schema::Complex(ComplexType::Record(writer_record)), Schema::Complex(ComplexType::Record(reader_record)), @@ -940,20 +1177,71 @@ impl<'a> Maker<'a> { ), 
(Schema::TypeName(TypeName::Ref(_)), _) => self.parse_type(reader_schema, namespace), (_, Schema::TypeName(TypeName::Ref(_))) => self.parse_type(reader_schema, namespace), - // if both sides are the same complex kind (non-record), adopt the reader type. - // This aligns with Avro spec: arrays, maps, and enums resolve recursively; - // for identical shapes we can just parse the reader schema. - (Schema::Complex(ComplexType::Array(_)), Schema::Complex(ComplexType::Array(_))) - | (Schema::Complex(ComplexType::Map(_)), Schema::Complex(ComplexType::Map(_))) - | (Schema::Complex(ComplexType::Fixed(_)), Schema::Complex(ComplexType::Fixed(_))) => { - self.parse_type(reader_schema, namespace) - } _ => Err(ArrowError::NotYetImplemented( "Other resolutions not yet implemented".to_string(), )), } } + fn resolve_array( + &mut self, + writer_array: &Array<'a>, + reader_array: &Array<'a>, + namespace: Option<&'a str>, + ) -> Result { + Ok(AvroDataType { + nullability: None, + metadata: reader_array.attributes.field_metadata(), + codec: Codec::List(Arc::new(self.make_data_type( + writer_array.items.as_ref(), + Some(reader_array.items.as_ref()), + namespace, + )?)), + resolution: None, + }) + } + + fn resolve_map( + &mut self, + writer_map: &Map<'a>, + reader_map: &Map<'a>, + namespace: Option<&'a str>, + ) -> Result { + Ok(AvroDataType { + nullability: None, + metadata: reader_map.attributes.field_metadata(), + codec: Codec::Map(Arc::new(self.make_data_type( + &writer_map.values, + Some(&reader_map.values), + namespace, + )?)), + resolution: None, + }) + } + + fn resolve_fixed<'s>( + &mut self, + writer_fixed: &Fixed<'a>, + reader_fixed: &Fixed<'a>, + reader_schema: &'s Schema<'a>, + namespace: Option<&'a str>, + ) -> Result { + ensure_names_match( + "Fixed", + writer_fixed.name, + &writer_fixed.aliases, + reader_fixed.name, + &reader_fixed.aliases, + )?; + if writer_fixed.size != reader_fixed.size { + return Err(ArrowError::SchemaError(format!( + "Fixed size mismatch for {}: 
writer={}, reader={}", + reader_fixed.name, writer_fixed.size, reader_fixed.size + ))); + } + self.parse_type(reader_schema, namespace) + } + fn resolve_primitives( &mut self, write_primitive: PrimitiveType, @@ -1135,52 +1423,85 @@ impl<'a> Maker<'a> { )?; let writer_ns = writer_record.namespace.or(namespace); let reader_ns = reader_record.namespace.or(namespace); - // Map writer field name -> index - let mut writer_index_map = - HashMap::<&str, usize>::with_capacity(writer_record.fields.len()); - for (idx, write_field) in writer_record.fields.iter().enumerate() { - writer_index_map.insert(write_field.name, idx); - } - // Prepare outputs - let mut reader_fields: Vec = Vec::with_capacity(reader_record.fields.len()); + let reader_md = reader_record.attributes.field_metadata(); + let writer_index_map: HashMap<&str, usize> = writer_record + .fields + .iter() + .enumerate() + .map(|(idx, wf)| (wf.name, idx)) + .collect(); let mut writer_to_reader: Vec> = vec![None; writer_record.fields.len()]; - let mut skip_fields: Vec> = vec![None; writer_record.fields.len()]; - //let mut default_fields: Vec = Vec::new(); - // Build reader fields and mapping - for (reader_idx, r_field) in reader_record.fields.iter().enumerate() { - if let Some(&writer_idx) = writer_index_map.get(r_field.name) { - // Field exists in a writer: resolve types (including promotions and union-of-null) - let w_schema = &writer_record.fields[writer_idx].r#type; - let resolved_dt = - self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?; - reader_fields.push(AvroField { - name: r_field.name.to_string(), - data_type: resolved_dt, - }); - writer_to_reader[writer_idx] = Some(reader_idx); - } else { - return Err(ArrowError::NotYetImplemented( - "New fields from reader with default values not yet implemented".to_string(), - )); - } - } - // Any writer fields not mapped should be skipped - for (writer_idx, writer_field) in writer_record.fields.iter().enumerate() { - if 
writer_to_reader[writer_idx].is_none() { - // Parse writer field type to know how to skip data - let writer_dt = self.parse_type(&writer_field.r#type, writer_ns)?; - skip_fields[writer_idx] = Some(writer_dt); - } - } - // Implement writer-only fields to skip in Follow-up PR here - // Build resolved record AvroDataType + let reader_fields: Vec = reader_record + .fields + .iter() + .enumerate() + .map(|(reader_idx, r_field)| -> Result { + if let Some(&writer_idx) = writer_index_map.get(r_field.name) { + let w_schema = &writer_record.fields[writer_idx].r#type; + let dt = self.make_data_type(w_schema, Some(&r_field.r#type), reader_ns)?; + writer_to_reader[writer_idx] = Some(reader_idx); + Ok(AvroField { + name: r_field.name.to_string(), + data_type: dt, + }) + } else { + let mut dt = self.parse_type(&r_field.r#type, reader_ns)?; + match r_field.default.as_ref() { + Some(default_json) => { + dt.resolution = Some(ResolutionInfo::DefaultValue( + dt.parse_and_store_default(default_json)?, + )); + } + None => { + if dt.nullability() == Some(Nullability::NullFirst) { + dt.resolution = Some(ResolutionInfo::DefaultValue( + dt.parse_and_store_default(&Value::Null)?, + )); + } else { + return Err(ArrowError::SchemaError(format!( + "Reader field '{}' not present in writer schema must have a default value", + r_field.name + ))); + } + } + } + Ok(AvroField { + name: r_field.name.to_string(), + data_type: dt, + }) + } + }) + .collect::>()?; + let default_fields: Vec = reader_fields + .iter() + .enumerate() + .filter_map(|(index, field)| { + matches!( + field.data_type().resolution, + Some(ResolutionInfo::DefaultValue(_)) + ) + .then_some(index) + }) + .collect(); + let skip_fields: Vec> = writer_record + .fields + .iter() + .enumerate() + .map(|(writer_index, writer_field)| { + if writer_to_reader[writer_index].is_some() { + Ok(None) + } else { + self.parse_type(&writer_field.r#type, writer_ns).map(Some) + } + }) + .collect::>()?; let resolved = AvroDataType::new_with_resolution( 
Codec::Struct(Arc::from(reader_fields)), - reader_record.attributes.field_metadata(), + reader_md, None, Some(ResolutionInfo::Record(ResolvedRecord { writer_to_reader: Arc::from(writer_to_reader), - default_fields: Arc::default(), + default_fields: Arc::from(default_fields), skip_fields: Arc::from(skip_fields), })), ); @@ -1712,4 +2033,440 @@ mod tests { panic!("Top-level schema is not a struct"); } } + + fn json_string(s: &str) -> Value { + Value::String(s.to_string()) + } + + fn assert_default_stored(dt: &AvroDataType, default_json: &Value) { + let stored = dt + .metadata + .get(AVRO_FIELD_DEFAULT_METADATA_KEY) + .cloned() + .unwrap_or_default(); + let expected = serde_json::to_string(default_json).unwrap(); + assert_eq!(stored, expected, "stored default metadata should match"); + } + + #[test] + fn test_validate_and_store_default_null_and_nullability_rules() { + let mut dt_null = AvroDataType::new(Codec::Null, HashMap::new(), None); + let lit = dt_null.parse_and_store_default(&Value::Null).unwrap(); + assert_eq!(lit, AvroLiteral::Null); + assert_default_stored(&dt_null, &Value::Null); + let mut dt_int = AvroDataType::new(Codec::Int32, HashMap::new(), None); + let err = dt_int.parse_and_store_default(&Value::Null).unwrap_err(); + assert!( + err.to_string() + .contains("JSON null default is only valid for `null` type"), + "unexpected error: {err}" + ); + let mut dt_int_nf = + AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullFirst)); + let lit2 = dt_int_nf.parse_and_store_default(&Value::Null).unwrap(); + assert_eq!(lit2, AvroLiteral::Null); + assert_default_stored(&dt_int_nf, &Value::Null); + let mut dt_int_ns = + AvroDataType::new(Codec::Int32, HashMap::new(), Some(Nullability::NullSecond)); + let err2 = dt_int_ns.parse_and_store_default(&Value::Null).unwrap_err(); + assert!( + err2.to_string() + .contains("JSON null default is only valid for `null` type"), + "unexpected error: {err2}" + ); + } + + #[test] + fn 
test_validate_and_store_default_primitives_and_temporal() { + let mut dt_bool = AvroDataType::new(Codec::Boolean, HashMap::new(), None); + let lit = dt_bool.parse_and_store_default(&Value::Bool(true)).unwrap(); + assert_eq!(lit, AvroLiteral::Boolean(true)); + assert_default_stored(&dt_bool, &Value::Bool(true)); + let mut dt_i32 = AvroDataType::new(Codec::Int32, HashMap::new(), None); + let lit = dt_i32 + .parse_and_store_default(&serde_json::json!(123)) + .unwrap(); + assert_eq!(lit, AvroLiteral::Int(123)); + assert_default_stored(&dt_i32, &serde_json::json!(123)); + let err = dt_i32 + .parse_and_store_default(&serde_json::json!(i64::from(i32::MAX) + 1)) + .unwrap_err(); + assert!(format!("{err}").contains("out of i32 range")); + let mut dt_i64 = AvroDataType::new(Codec::Int64, HashMap::new(), None); + let lit = dt_i64 + .parse_and_store_default(&serde_json::json!(1234567890)) + .unwrap(); + assert_eq!(lit, AvroLiteral::Long(1234567890)); + assert_default_stored(&dt_i64, &serde_json::json!(1234567890)); + let mut dt_f32 = AvroDataType::new(Codec::Float32, HashMap::new(), None); + let lit = dt_f32 + .parse_and_store_default(&serde_json::json!(1.25)) + .unwrap(); + assert_eq!(lit, AvroLiteral::Float(1.25)); + assert_default_stored(&dt_f32, &serde_json::json!(1.25)); + let err = dt_f32 + .parse_and_store_default(&serde_json::json!(1e39)) + .unwrap_err(); + assert!(format!("{err}").contains("out of f32 range")); + let mut dt_f64 = AvroDataType::new(Codec::Float64, HashMap::new(), None); + let lit = dt_f64 + .parse_and_store_default(&serde_json::json!(std::f64::consts::PI)) + .unwrap(); + assert_eq!(lit, AvroLiteral::Double(std::f64::consts::PI)); + assert_default_stored(&dt_f64, &serde_json::json!(std::f64::consts::PI)); + let mut dt_str = AvroDataType::new(Codec::Utf8, HashMap::new(), None); + let l = dt_str + .parse_and_store_default(&json_string("hello")) + .unwrap(); + assert_eq!(l, AvroLiteral::String("hello".into())); + assert_default_stored(&dt_str, 
&json_string("hello")); + let mut dt_strv = AvroDataType::new(Codec::Utf8View, HashMap::new(), None); + let l = dt_strv + .parse_and_store_default(&json_string("view")) + .unwrap(); + assert_eq!(l, AvroLiteral::String("view".into())); + assert_default_stored(&dt_strv, &json_string("view")); + let mut dt_uuid = AvroDataType::new(Codec::Uuid, HashMap::new(), None); + let l = dt_uuid + .parse_and_store_default(&json_string("00000000-0000-0000-0000-000000000000")) + .unwrap(); + assert_eq!( + l, + AvroLiteral::String("00000000-0000-0000-0000-000000000000".into()) + ); + let mut dt_bin = AvroDataType::new(Codec::Binary, HashMap::new(), None); + let l = dt_bin.parse_and_store_default(&json_string("ABC")).unwrap(); + assert_eq!(l, AvroLiteral::Bytes(vec![65, 66, 67])); + let err = dt_bin + .parse_and_store_default(&json_string("€")) // U+20AC + .unwrap_err(); + assert!(format!("{err}").contains("Invalid codepoint")); + let mut dt_date = AvroDataType::new(Codec::Date32, HashMap::new(), None); + let ld = dt_date + .parse_and_store_default(&serde_json::json!(1)) + .unwrap(); + assert_eq!(ld, AvroLiteral::Int(1)); + let mut dt_tmill = AvroDataType::new(Codec::TimeMillis, HashMap::new(), None); + let lt = dt_tmill + .parse_and_store_default(&serde_json::json!(86_400_000)) + .unwrap(); + assert_eq!(lt, AvroLiteral::Int(86_400_000)); + let mut dt_tmicros = AvroDataType::new(Codec::TimeMicros, HashMap::new(), None); + let ltm = dt_tmicros + .parse_and_store_default(&serde_json::json!(1_000_000)) + .unwrap(); + assert_eq!(ltm, AvroLiteral::Long(1_000_000)); + let mut dt_ts_milli = AvroDataType::new(Codec::TimestampMillis(true), HashMap::new(), None); + let l1 = dt_ts_milli + .parse_and_store_default(&serde_json::json!(123)) + .unwrap(); + assert_eq!(l1, AvroLiteral::Long(123)); + let mut dt_ts_micro = + AvroDataType::new(Codec::TimestampMicros(false), HashMap::new(), None); + let l2 = dt_ts_micro + .parse_and_store_default(&serde_json::json!(456)) + .unwrap(); + assert_eq!(l2, 
AvroLiteral::Long(456)); + } + + #[test] + fn test_validate_and_store_default_fixed_decimal_interval() { + let mut dt_fixed = AvroDataType::new(Codec::Fixed(4), HashMap::new(), None); + let l = dt_fixed + .parse_and_store_default(&json_string("WXYZ")) + .unwrap(); + assert_eq!(l, AvroLiteral::Bytes(vec![87, 88, 89, 90])); + let err = dt_fixed + .parse_and_store_default(&json_string("TOO LONG")) + .unwrap_err(); + assert!(err.to_string().contains("Default length")); + let mut dt_dec_fixed = + AvroDataType::new(Codec::Decimal(10, Some(2), Some(3)), HashMap::new(), None); + let l = dt_dec_fixed + .parse_and_store_default(&json_string("abc")) + .unwrap(); + assert_eq!(l, AvroLiteral::Bytes(vec![97, 98, 99])); + let err = dt_dec_fixed + .parse_and_store_default(&json_string("toolong")) + .unwrap_err(); + assert!(err.to_string().contains("Default length")); + let mut dt_dec_bytes = + AvroDataType::new(Codec::Decimal(10, Some(2), None), HashMap::new(), None); + let l = dt_dec_bytes + .parse_and_store_default(&json_string("freeform")) + .unwrap(); + assert_eq!( + l, + AvroLiteral::Bytes("freeform".bytes().collect::>()) + ); + let mut dt_interval = AvroDataType::new(Codec::Interval, HashMap::new(), None); + let l = dt_interval + .parse_and_store_default(&json_string("ABCDEFGHIJKL")) + .unwrap(); + assert_eq!( + l, + AvroLiteral::Bytes("ABCDEFGHIJKL".bytes().collect::>()) + ); + let err = dt_interval + .parse_and_store_default(&json_string("short")) + .unwrap_err(); + assert!(err.to_string().contains("Default length")); + } + + #[test] + fn test_validate_and_store_default_enum_list_map_struct() { + let symbols: Arc<[String]> = ["RED".to_string(), "GREEN".to_string(), "BLUE".to_string()] + .into_iter() + .collect(); + let mut dt_enum = AvroDataType::new(Codec::Enum(symbols), HashMap::new(), None); + let l = dt_enum + .parse_and_store_default(&json_string("GREEN")) + .unwrap(); + assert_eq!(l, AvroLiteral::Enum("GREEN".into())); + let err = dt_enum + 
.parse_and_store_default(&json_string("YELLOW")) + .unwrap_err(); + assert!(err.to_string().contains("Default enum symbol")); + let item = AvroDataType::new(Codec::Int64, HashMap::new(), None); + let mut dt_list = AvroDataType::new(Codec::List(Arc::new(item)), HashMap::new(), None); + let val = serde_json::json!([1, 2, 3]); + let l = dt_list.parse_and_store_default(&val).unwrap(); + assert_eq!( + l, + AvroLiteral::Array(vec![ + AvroLiteral::Long(1), + AvroLiteral::Long(2), + AvroLiteral::Long(3) + ]) + ); + let err = dt_list + .parse_and_store_default(&serde_json::json!({"not":"array"})) + .unwrap_err(); + assert!(err.to_string().contains("JSON array")); + let val_dt = AvroDataType::new(Codec::Float64, HashMap::new(), None); + let mut dt_map = AvroDataType::new(Codec::Map(Arc::new(val_dt)), HashMap::new(), None); + let mv = serde_json::json!({"x": 1.5, "y": 2.5}); + let l = dt_map.parse_and_store_default(&mv).unwrap(); + let mut expected = IndexMap::new(); + expected.insert("x".into(), AvroLiteral::Double(1.5)); + expected.insert("y".into(), AvroLiteral::Double(2.5)); + assert_eq!(l, AvroLiteral::Map(expected)); + // Not object -> error + let err = dt_map + .parse_and_store_default(&serde_json::json!(123)) + .unwrap_err(); + assert!(err.to_string().contains("JSON object")); + let mut field_a = AvroField { + name: "a".into(), + data_type: AvroDataType::new(Codec::Int32, HashMap::new(), None), + }; + let field_b = AvroField { + name: "b".into(), + data_type: AvroDataType::new( + Codec::Int64, + HashMap::new(), + Some(Nullability::NullFirst), + ), + }; + let mut c_md = HashMap::new(); + c_md.insert(AVRO_FIELD_DEFAULT_METADATA_KEY.into(), "\"xyz\"".into()); + let field_c = AvroField { + name: "c".into(), + data_type: AvroDataType::new(Codec::Utf8, c_md, None), + }; + field_a.data_type.metadata.insert("doc".into(), "na".into()); + let struct_fields: Arc<[AvroField]> = Arc::from(vec![field_a, field_b, field_c]); + let mut dt_struct = 
AvroDataType::new(Codec::Struct(struct_fields), HashMap::new(), None); + let default_obj = serde_json::json!({"a": 7}); + let l = dt_struct.parse_and_store_default(&default_obj).unwrap(); + let mut expected = IndexMap::new(); + expected.insert("a".into(), AvroLiteral::Int(7)); + expected.insert("b".into(), AvroLiteral::Null); + expected.insert("c".into(), AvroLiteral::String("xyz".into())); + assert_eq!(l, AvroLiteral::Map(expected)); + assert_default_stored(&dt_struct, &default_obj); + let req_field = AvroField { + name: "req".into(), + data_type: AvroDataType::new(Codec::Boolean, HashMap::new(), None), + }; + let mut dt_bad = AvroDataType::new( + Codec::Struct(Arc::from(vec![req_field])), + HashMap::new(), + None, + ); + let err = dt_bad + .parse_and_store_default(&serde_json::json!({})) + .unwrap_err(); + assert!( + err.to_string().contains("missing required subfield 'req'"), + "unexpected error: {err}" + ); + let err = dt_struct + .parse_and_store_default(&serde_json::json!(10)) + .unwrap_err(); + err.to_string().contains("must be a JSON object"); + } + + #[test] + fn test_resolve_array_promotion_and_reader_metadata() { + let mut w_add: HashMap<&str, Value> = HashMap::new(); + w_add.insert("who", json_string("writer")); + let mut r_add: HashMap<&str, Value> = HashMap::new(); + r_add.insert("who", json_string("reader")); + let writer_schema = Schema::Complex(ComplexType::Array(Array { + items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))), + attributes: Attributes { + logical_type: None, + additional: w_add, + }, + })); + let reader_schema = Schema::Complex(ComplexType::Array(Array { + items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Long))), + attributes: Attributes { + logical_type: None, + additional: r_add, + }, + })); + let mut maker = Maker::new(false, false); + let dt = maker + .make_data_type(&writer_schema, Some(&reader_schema), None) + .unwrap(); + assert_eq!(dt.metadata.get("who"), 
Some(&"\"reader\"".to_string())); + if let Codec::List(inner) = dt.codec() { + assert!(matches!(inner.codec(), Codec::Int64)); + assert_eq!( + inner.resolution, + Some(ResolutionInfo::Promotion(Promotion::IntToLong)) + ); + } else { + panic!("expected list codec"); + } + } + + #[test] + fn test_resolve_fixed_success_name_and_size_match_and_alias() { + let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed { + name: "MD5", + namespace: None, + aliases: vec!["Hash16"], + size: 16, + attributes: Attributes::default(), + })); + let reader_schema = Schema::Complex(ComplexType::Fixed(Fixed { + name: "Hash16", + namespace: None, + aliases: vec![], + size: 16, + attributes: Attributes::default(), + })); + let mut maker = Maker::new(false, false); + let dt = maker + .make_data_type(&writer_schema, Some(&reader_schema), None) + .unwrap(); + assert!(matches!(dt.codec(), Codec::Fixed(16))); + } + + #[test] + fn test_resolve_records_mapping_default_fields_and_skip_fields() { + let writer = Schema::Complex(ComplexType::Record(Record { + name: "R", + namespace: None, + doc: None, + aliases: vec![], + fields: vec![ + crate::schema::Field { + name: "a", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)), + default: None, + }, + crate::schema::Field { + name: "skipme", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), + default: None, + }, + crate::schema::Field { + name: "b", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)), + default: None, + }, + ], + attributes: Attributes::default(), + })); + let reader = Schema::Complex(ComplexType::Record(Record { + name: "R", + namespace: None, + doc: None, + aliases: vec![], + fields: vec![ + crate::schema::Field { + name: "b", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)), + default: None, + }, + crate::schema::Field { + name: "a", + doc: None, + r#type: 
Schema::TypeName(TypeName::Primitive(PrimitiveType::Long)), + default: None, + }, + crate::schema::Field { + name: "name", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), + default: Some(json_string("anon")), + }, + crate::schema::Field { + name: "opt", + doc: None, + r#type: Schema::Union(vec![ + Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)), + Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)), + ]), + default: None, // should default to null because NullFirst + }, + ], + attributes: Attributes::default(), + })); + let mut maker = Maker::new(false, false); + let dt = maker + .make_data_type(&writer, Some(&reader), None) + .expect("record resolution"); + let fields = match dt.codec() { + Codec::Struct(f) => f, + other => panic!("expected struct, got {other:?}"), + }; + assert_eq!(fields.len(), 4); + assert_eq!(fields[0].name(), "b"); + assert_eq!(fields[1].name(), "a"); + assert_eq!(fields[2].name(), "name"); + assert_eq!(fields[3].name(), "opt"); + assert!(matches!( + fields[1].data_type().resolution, + Some(ResolutionInfo::Promotion(Promotion::IntToLong)) + )); + let rec = match dt.resolution { + Some(ResolutionInfo::Record(ref r)) => r.clone(), + other => panic!("expected record resolution, got {other:?}"), + }; + assert_eq!(rec.writer_to_reader.as_ref(), &[Some(1), None, Some(0)]); + assert_eq!(rec.default_fields.as_ref(), &[2usize, 3usize]); + assert!(rec.skip_fields[0].is_none()); + assert!(rec.skip_fields[2].is_none()); + let skip1 = rec.skip_fields[1].as_ref().expect("skip field present"); + assert!(matches!(skip1.codec(), Codec::Utf8)); + let name_md = &fields[2].data_type().metadata; + assert_eq!( + name_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY), + Some(&"\"anon\"".to_string()) + ); + let opt_md = &fields[3].data_type().metadata; + assert_eq!( + opt_md.get(AVRO_FIELD_DEFAULT_METADATA_KEY), + Some(&"null".to_string()) + ); + } }