From 4413d579be8d8e0ebb7add27c66149c4aec2f27b Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 19 Apr 2022 00:10:00 +0300 Subject: [PATCH 1/3] AVRO-3495: The order of the struct's fields and schema's fields should not matter Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/encode.rs | 27 +++++++++----- lang/rust/avro/src/error.rs | 3 ++ lang/rust/avro/src/types.rs | 66 +++++++++++++++++++++------------- lang/rust/avro/tests/schema.rs | 32 +++++++++++++++++ 4 files changed, 96 insertions(+), 32 deletions(-) diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index c4c0bd3feaa..04d220276b3 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -188,18 +188,29 @@ pub(crate) fn encode_internal( if let Schema::Record { ref name, fields: ref schema_fields, + ref lookup, .. } = *schema { let record_namespace = name.fully_qualified_name(enclosing_namespace).namespace; - for (i, &(_, ref value)) in fields.iter().enumerate() { - encode_internal( - value, - &schema_fields[i].schema, - names, - &record_namespace, - buffer, - )?; + for &(ref name, ref value) in fields.iter() { + match lookup.get(name) { + Some(idx) => { + encode_internal( + value, + &schema_fields[*idx].schema, + names, + &record_namespace, + buffer, + )?; + } + None => { + return Err(Error::NoEntryInLookupTable( + name.clone(), + format!("{:?}", lookup), + )); + } + } } } else { error!("invalid schema type for Record: {:?}", schema); diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index 908e040e9be..d7483ea9126 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -403,6 +403,9 @@ pub enum Error { #[error("Signed decimal bytes length {0} not equal to fixed schema size {1}.")] EncodeDecimalAsFixedError(usize, usize), + #[error("There is no entry for {0} in the lookup table: {1}.")] + NoEntryInLookupTable(String, String), + #[error("Can only encode value type {value_kind:?} as one of {supported_schema:?}")] EncodeValueAsSchemaError { value_kind: ValueKind, diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 25d9681209d..4016b84dd6f 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -457,7 +457,14 @@ impl Value { Value::accumulate(acc, value.validate_internal(inner, names)) }) } - (&Value::Record(ref record_fields), &Schema::Record { ref fields, .. }) => { + ( + &Value::Record(ref record_fields), + &Schema::Record { + ref fields, + ref lookup, + .. + }, + ) => { if fields.len() != record_fields.len() { return Some(format!( "The value's records length ({}) is different than the schema's ({})", @@ -466,19 +473,37 @@ impl Value { )); } - fields.iter().zip(record_fields.iter()).fold( - None, - |acc, (field, &(ref name, ref value))| { - if field.name != *name { - return Some(format!( - "Value's name '{}' does not match the expected field's name '{}'", - name, field.name - )); - } - let res = value.validate_internal(&field.schema, names); - Value::accumulate(acc, res) - }, - ) + let record_fields_by_name = record_fields + .iter() + .map(|(name, record_field)| (name.clone(), record_field.clone())) + .collect::>(); + + fields.iter().fold(None, |acc, field| { + match record_fields_by_name.get(&field.name) { + Some(record_field) => Value::accumulate( + acc, + record_field.validate_internal(&field.schema, names), + ), + None => Value::accumulate( + acc, + Some(format!("There is no value for field '{}'", field.name)), + ), + } + }) + + // fields.iter().zip(record_fields.iter()).fold( + // None, + // |acc, (field, &(ref name, ref value))| { + // if field.name != *name { + // return Some(format!( + // "Value's name '{}' does not match the expected field's name '{}'", + // name, field.name + // )); + // } + // let res = value.validate_internal(&field.schema, names); + // Value::accumulate(acc, res) + // }, + // ) } (&Value::Map(ref items), &Schema::Record { ref fields, .. }) => { fields.iter().fold(None, |acc, field| { @@ -1054,7 +1079,7 @@ mod tests { lookup: Default::default(), }, false, - "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: Value's name 'unknown_field_name' does not match the expected field's name 'field_name'", + "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: There is no value for field 'field_name'", ), ( Value::Record(vec![("field_name".to_string(), Value::Null)]), @@ -1247,14 +1272,7 @@ mod tests { ("b".to_string(), Value::String("foo".to_string())), ("a".to_string(), Value::Long(42i64)), ]); - assert!(!value.validate(&schema)); - assert_log_message( - format!( - "Invalid value: {:?} for schema: {:?}. Reason: {}", - value, schema, "Value's name 'a' does not match the expected field's name 'b'" - ) - .as_str(), - ); + assert!(value.validate(&schema)); let value = Value::Record(vec![ ("a".to_string(), Value::Boolean(false)), @@ -1269,7 +1287,7 @@ mod tests { ]); assert!(!value.validate(&schema)); assert_log_message( - "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Value's name 'c' does not match the expected field's name 'b'" + "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: There is no value for field 'b'" ); let value = Value::Record(vec![ diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs index f58b62f9e7e..33f62893cbf 100644 --- a/lang/rust/avro/tests/schema.rs +++ b/lang/rust/avro/tests/schema.rs @@ -17,6 +17,7 @@ use apache_avro::{ schema::{Name, RecordField}, + to_avro_datum, to_value, types::{Record, Value}, Codec, Error, Reader, Schema, Writer, }; @@ -1311,3 +1312,34 @@ fn test_decimal_valid_type_attributes() { assert_eq!(0, bytes_decimal.get_attribute("scale")); } */ + +#[test] +fn avro_old_issue_47() { + init(); + let schema_str = r#" + { + "type": "record", + "name": "my_record", + "fields": [ + {"name": "a", "type": "long"}, + {"name": "b", "type": "string"} + ] + }"#; + let schema = Schema::parse_str(schema_str).unwrap(); + + use serde::{Deserialize, Serialize}; + + #[derive(Deserialize, Serialize)] + pub struct MyRecord { + b: String, + a: i64, + } + + let record = MyRecord { + b: "hello".to_string(), + a: 1, + }; + + let res = to_avro_datum(&schema, to_value(record).unwrap()).unwrap(); + dbg!(res); +} From 91e32551d146ea1a5d08d2553a8c31d38874003b Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 19 Apr 2022 01:07:04 +0300 Subject: [PATCH 2/3] AVRO-3495: Use the lookup table when comparing values against fields by name Until now it was expected that both the schema fields and the input values are sorted the same way. Use BTreeMap instead of HashMap for the lookup table because otherwise the assertion on the validation error messages is impossible due to random printing of the map's entries Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/src/schema.rs | 22 ++++----- lang/rust/avro/src/types.rs | 83 ++++++++++++++++---------------- lang/rust/avro_derive/src/lib.rs | 2 +- 3 files changed, 54 insertions(+), 53 deletions(-) diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index 6134c8256ab..e0fcca23b44 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -106,7 +106,7 @@ pub enum Schema { aliases: Aliases, doc: Documentation, fields: Vec, - lookup: HashMap, + lookup: BTreeMap, }, /// An `enum` Avro schema. Enum { @@ -1063,7 +1063,7 @@ impl Parser { } } - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); self.register_resolving_schema(&fully_qualified_name, &aliases); @@ -1856,7 +1856,7 @@ mod tests { order: RecordFieldOrder::Ignore, position: 0, }], - lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]), + lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), }; assert_eq!(schema_c, schema_c_expected); @@ -1910,7 +1910,7 @@ mod tests { order: RecordFieldOrder::Ignore, position: 0, }], - lookup: HashMap::from_iter(vec![("field_one".to_string(), 0)]), + lookup: BTreeMap::from_iter(vec![("field_one".to_string(), 0)]), }; assert_eq!(schema_option_a, schema_option_a_expected); @@ -1932,7 +1932,7 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("a".to_owned(), 0); lookup.insert("b".to_owned(), 1); @@ -1988,10 +1988,10 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("recordField".to_owned(), 0); - let mut node_lookup = HashMap::new(); + let mut node_lookup = BTreeMap::new(); node_lookup.insert("children".to_owned(), 1); node_lookup.insert("label".to_owned(), 0); @@ -2159,7 +2159,7 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("value".to_owned(), 0); lookup.insert("next".to_owned(), 1); @@ -2225,7 +2225,7 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("value".to_owned(), 0); lookup.insert("next".to_owned(), 1); @@ -2289,7 +2289,7 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("enum".to_owned(), 0); lookup.insert("next".to_owned(), 1); @@ -2364,7 +2364,7 @@ mod tests { ) .unwrap(); - let mut lookup = HashMap::new(); + let mut lookup = BTreeMap::new(); lookup.insert("fixed".to_owned(), 0); lookup.insert("next".to_owned(), 1); diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 4016b84dd6f..2ecb23e0ac8 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -25,7 +25,12 @@ use crate::{ AvroResult, Error, }; use serde_json::{Number, Value as JsonValue}; -use std::{collections::HashMap, convert::TryFrom, hash::BuildHasher, str::FromStr, u8}; +use std::{ + collections::{BTreeMap, HashMap}, + convert::TryFrom, + hash::BuildHasher, + str::FromStr, +}; use uuid::Uuid; /// Compute the maximum decimal value precision of a byte array of length `len` could hold. @@ -206,7 +211,7 @@ pub struct Record<'a> { /// Ordered according to the fields in the schema given to create this /// `Record` object. Any unset field defaults to `Value::Null`. pub fields: Vec<(String, Value)>, - schema_lookup: &'a HashMap, + schema_lookup: &'a BTreeMap, } impl<'a> Record<'a> { @@ -473,37 +478,26 @@ impl Value { )); } - let record_fields_by_name = record_fields + record_fields .iter() - .map(|(name, record_field)| (name.clone(), record_field.clone())) - .collect::>(); - - fields.iter().fold(None, |acc, field| { - match record_fields_by_name.get(&field.name) { - Some(record_field) => Value::accumulate( - acc, - record_field.validate_internal(&field.schema, names), - ), - None => Value::accumulate( - acc, - Some(format!("There is no value for field '{}'", field.name)), - ), - } - }) - - // fields.iter().zip(record_fields.iter()).fold( - // None, - // |acc, (field, &(ref name, ref value))| { - // if field.name != *name { - // return Some(format!( - // "Value's name '{}' does not match the expected field's name '{}'", - // name, field.name - // )); - // } - // let res = value.validate_internal(&field.schema, names); - // Value::accumulate(acc, res) - // }, - // ) + .fold(None, |acc, (field_name, record_field)| { + match lookup.get(field_name) { + Some(idx) => { + let field = &fields[*idx]; + Value::accumulate( + acc, + record_field.validate_internal(&field.schema, names), + ) + } + None => Value::accumulate( + acc, + Some(format!( + "There is no schema field for field '{}'", + field_name + )), + ), + } + }) } (&Value::Map(ref items), &Schema::Record { ref fields, .. }) => { fields.iter().fold(None, |acc, field| { @@ -1079,7 +1073,7 @@ mod tests { lookup: Default::default(), }, false, - "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: There is no value for field 'field_name'", + "Invalid value: Record([(\"unknown_field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Int, order: Ignore, position: 0 }], lookup: {} }. Reason: There is no schema field for field 'unknown_field_name'", ), ( Value::Record(vec![("field_name".to_string(), Value::Null)]), @@ -1097,10 +1091,10 @@ mod tests { order: RecordFieldOrder::Ignore, position: 0, }], - lookup: Default::default(), + lookup: [("field_name".to_string(), 0)].iter().cloned().collect(), }, false, - "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []", + "Invalid value: Record([(\"field_name\", Null)]) for schema: Record { name: Name { name: \"record_name\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"field_name\", doc: None, default: None, schema: Ref { name: Name { name: \"missing\", namespace: None } }, order: Ignore, position: 0 }], lookup: {\"field_name\": 0} }. Reason: Unresolved schema reference: 'missing'. Parsed names: []", ), ]; @@ -1229,7 +1223,6 @@ mod tests { fn validate_record() { init(); - use std::collections::HashMap; // { // "type": "record", // "fields": [ @@ -1259,7 +1252,10 @@ mod tests { position: 1, }, ], - lookup: HashMap::new(), + lookup: [("a".to_string(), 0), ("b".to_string(), 1)] + .iter() + .cloned() + .collect(), }; assert!(Value::Record(vec![ @@ -1279,7 +1275,7 @@ mod tests { ("b".to_string(), Value::String("foo".to_string())), ]); assert!(!value.validate(&schema)); - assert_log_message("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Unsupported value-schema combination"); + assert_log_message("Invalid value: Record([(\"a\", Boolean(false)), (\"b\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: Unsupported value-schema combination"); let value = Value::Record(vec![ ("a".to_string(), Value::Long(42i64)), @@ -1287,7 +1283,7 @@ mod tests { ]); assert!(!value.validate(&schema)); assert_log_message( - "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: There is no value for field 'b'" + "Invalid value: Record([(\"a\", Long(42)), (\"c\", String(\"foo\"))]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {\"a\": 0, \"b\": 1} }. Reason: There is no schema field for field 'c'" ); let value = Value::Record(vec![ @@ -1296,7 +1292,9 @@ mod tests { ("c".to_string(), Value::Null), ]); assert!(!value.validate(&schema)); - assert_log_message("Invalid value: Record([(\"a\", Long(42)), (\"b\", String(\"foo\")), (\"c\", Null)]) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: The value's records length (3) is different than the schema's (2)"); + assert_log_message( + r#"Invalid value: Record([("a", Long(42)), ("b", String("foo")), ("c", Null)]) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: The value's records length (3) is different than the schema's (2)"#, + ); assert!(Value::Map( vec![ @@ -1314,7 +1312,10 @@ mod tests { .collect() ) .validate(&schema)); - assert_log_message("Invalid value: Map({\"c\": Long(123)}) for schema: Record { name: Name { name: \"some_record\", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: \"a\", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: \"b\", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {} }. Reason: Field with name '\"a\"' is not a member of the map items\nField with name '\"b\"' is not a member of the map items"); + assert_log_message( + r#"Invalid value: Map({"c": Long(123)}) for schema: Record { name: Name { name: "some_record", namespace: None }, aliases: None, doc: None, fields: [RecordField { name: "a", doc: None, default: None, schema: Long, order: Ascending, position: 0 }, RecordField { name: "b", doc: None, default: None, schema: String, order: Ascending, position: 1 }], lookup: {"a": 0, "b": 1} }. Reason: Field with name '"a"' is not a member of the map items +Field with name '"b"' is not a member of the map items"#, + ); let union_schema = Schema::Union(UnionSchema::new(vec![Schema::Null, schema]).unwrap()); diff --git a/lang/rust/avro_derive/src/lib.rs b/lang/rust/avro_derive/src/lib.rs index 0055249cb3b..a32f2be3348 100644 --- a/lang/rust/avro_derive/src/lib.rs +++ b/lang/rust/avro_derive/src/lib.rs @@ -152,7 +152,7 @@ fn get_data_struct_schema_def( Ok(quote! { let schema_fields = vec![#(#record_field_exprs),*]; let name = apache_avro::schema::Name::new(#full_schema_name).expect(&format!("Unable to parse struct name for schema {}", #full_schema_name)[..]); - let lookup: HashMap = schema_fields + let lookup: std::collections::BTreeMap = schema_fields .iter() .map(|field| (field.name.to_owned(), field.position)) .collect(); From 91c57ab24891bf9c4073fb80be94ec7dcd9f3641 Mon Sep 17 00:00:00 2001 From: Martin Tzvetanov Grigorov Date: Tue, 19 Apr 2022 01:12:53 +0300 Subject: [PATCH 3/3] AVRO-3495: Update the test case Signed-off-by: Martin Tzvetanov Grigorov --- lang/rust/avro/tests/schema.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lang/rust/avro/tests/schema.rs b/lang/rust/avro/tests/schema.rs index 33f62893cbf..62eeb2ddf3d 100644 --- a/lang/rust/avro/tests/schema.rs +++ b/lang/rust/avro/tests/schema.rs @@ -1313,6 +1313,7 @@ fn test_decimal_valid_type_attributes() { } */ +// https://github.com/flavray/avro-rs/issues/47 #[test] fn avro_old_issue_47() { init(); @@ -1340,6 +1341,5 @@ fn avro_old_issue_47() { a: 1, }; - let res = to_avro_datum(&schema, to_value(record).unwrap()).unwrap(); - dbg!(res); + let _ = to_avro_datum(&schema, to_value(record).unwrap()).unwrap(); }