diff --git a/lang/csharp/src/apache/main/CodeGen/CodeGen.cs b/lang/csharp/src/apache/main/CodeGen/CodeGen.cs index 0aba034bb4c..e579d8bb07c 100644 --- a/lang/csharp/src/apache/main/CodeGen/CodeGen.cs +++ b/lang/csharp/src/apache/main/CodeGen/CodeGen.cs @@ -1197,7 +1197,7 @@ private static string ReplaceMappedNamespacesInSchema(string input, IEnumerable< if (namespaceMapping == null || input == null) return input; - // Replace namespace in "namespace" definitions: + // Replace namespace in "namespace" definitions: // "namespace": "originalnamespace" -> "namespace": "mappednamespace" // "namespace": "originalnamespace.whatever" -> "namespace": "mappednamespace.whatever" // Note: It keeps the original whitespaces diff --git a/lang/csharp/src/apache/test/Schema/SchemaTests.cs b/lang/csharp/src/apache/test/Schema/SchemaTests.cs index 89c7f18c2b6..021dde6275a 100644 --- a/lang/csharp/src/apache/test/Schema/SchemaTests.cs +++ b/lang/csharp/src/apache/test/Schema/SchemaTests.cs @@ -172,7 +172,7 @@ private static void testToString(Schema sc, string schema) { try { - //remove any excess spaces in the JSON to normalize the match with toString + //remove any excess spaces in the JSON to normalize the match with toString schema = schema.Replace("{ ", "{") .Replace("} ", "}") .Replace("\" ", "\"") @@ -575,7 +575,7 @@ public void TestUnion(string s, Schema.Type[] types) UnionSchema schema = UnionSchema.Create(types.Select(t => (Schema)PrimitiveSchema.Create(t)).ToList()); Assert.AreEqual(sc, schema); - + Assert.AreEqual(Schema.Type.Union, sc.Tag); UnionSchema us = (UnionSchema)sc; Assert.AreEqual(types.Length, us.Count); diff --git a/lang/csharp/src/apache/test/Util/LogicalTypeTests.cs b/lang/csharp/src/apache/test/Util/LogicalTypeTests.cs index d7443391c36..0129b2a5b45 100644 --- a/lang/csharp/src/apache/test/Util/LogicalTypeTests.cs +++ b/lang/csharp/src/apache/test/Util/LogicalTypeTests.cs @@ -67,7 +67,7 @@ public void TestDecimal( "-123456789123456789.56", "000000000000000001.01", "-000000000000000001.01" - )] string s, + )] string s, [Values( "\"bytes\"", "{\"type\": \"fixed\", \"size\": 16, \"name\": \"n\"}" @@ -94,7 +94,7 @@ public void TestDecimalScale( "-1234567891234567890123456789", "0000000000000000000000000001", "-0000000000000000000000000001" - )] string s, + )] string s, [Values(1, 2, 3, 4, 5, 6, 7, 8)] int scale, [Values( "\"bytes\"", @@ -313,7 +313,7 @@ public void TestLocalTimestampMicrosecond(string s, string e) } expectedDate = expectedDate.ToLocalTime(); - + var avroLocalTimestampMicro = new LocalTimestampMicrosecond(); var convertedDate = (DateTime)avroLocalTimestampMicro.ConvertToLogicalValue(avroLocalTimestampMicro.ConvertToBaseValue(date, schema), schema); Assert.AreEqual(expectedDate, convertedDate); @@ -342,7 +342,7 @@ public void TestTimeMillisecond(string s, string e, bool expectRangeError) var timeMilliSchema = (LogicalSchema)Schema.Parse("{\"type\": \"int\", \"logicalType\": \"time-millis\"}"); var time = TimeSpan.Parse(s); - + var avroTimeMilli = new TimeMillisecond(); if (expectRangeError) @@ -385,7 +385,7 @@ public void TestTimeMicrosecond(string s, string e, bool expectRangeError) var timeMicroSchema = (LogicalSchema)Schema.Parse("{\"type\": \"long\", \"logicalType\": \"time-micros\"}"); var time = TimeSpan.Parse(s); - + var avroTimeMicro = new TimeMicrosecond(); if (expectRangeError) diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestGenerateInteropSingleObjectEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestGenerateInteropSingleObjectEncoding.java new file mode 100644 index 00000000000..f41c69f8a01 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestGenerateInteropSingleObjectEncoding.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Arrays; + +/** + * Generates test_message.bin - a single + * object encoded Avro message. + */ +public class TestGenerateInteropSingleObjectEncoding { + private static final String RESOURCES_FOLDER = System.getProperty("share.dir", "../../../share") + + "/test/data/messageV1"; + private static final File SCHEMA_FILE = new File(RESOURCES_FOLDER + "/test_schema.avsc"); + private static final File MESSAGE_FILE = new File(RESOURCES_FOLDER + "/test_message.bin"); + private static Schema SCHEMA; + private static GenericRecordBuilder BUILDER; + + @BeforeClass + public static void setup() throws IOException { + try (FileInputStream fileInputStream = new FileInputStream(SCHEMA_FILE)) { + SCHEMA = new Schema.Parser().parse(fileInputStream); + BUILDER = new GenericRecordBuilder(SCHEMA); + } + } + + @Test + public void generateData() throws IOException { + MessageEncoder encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA); + BUILDER.set("id", 42L).set("name", "Bill").set("tags", Arrays.asList("dog_lover", "cat_hater")).build(); + ByteBuffer buffer = encoder.encode( + BUILDER.set("id", 42L).set("name", "Bill").set("tags", Arrays.asList("dog_lover", "cat_hater")).build()); + new FileOutputStream(MESSAGE_FILE).write(buffer.array()); + } +} diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestInteropSingleObjectEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestInteropSingleObjectEncoding.java new file mode 100644 index 00000000000..df16817f515 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestInteropSingleObjectEncoding.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.util.Arrays; + +/** + * Tests that test_message.bin is properly encoded single + * object + */ +public class TestInteropSingleObjectEncoding { + private static final String RESOURCES_FOLDER = System.getProperty("share.dir", "../../../share") + + "/test/data/messageV1"; + private static final File SCHEMA_FILE = new File(RESOURCES_FOLDER + "/test_schema.avsc"); + private static final File MESSAGE_FILE = new File(RESOURCES_FOLDER + "/test_message.bin"); + private static Schema SCHEMA; + private static GenericRecordBuilder BUILDER; + + @BeforeClass + public static void setup() throws IOException { + try (FileInputStream fileInputStream = new FileInputStream(SCHEMA_FILE)) { + SCHEMA = new Schema.Parser().parse(fileInputStream); + BUILDER = new GenericRecordBuilder(SCHEMA); + } + } + + @Test + public void checkSingleObjectEncoding() throws IOException { + MessageEncoder encoder = new BinaryMessageEncoder<>(GenericData.get(), SCHEMA); + ByteBuffer buffer = encoder.encode( + BUILDER.set("id", 42L).set("name", "Bill").set("tags", Arrays.asList("dog_lover", "cat_hater")).build()); + byte[] fileBuffer = Files.readAllBytes(MESSAGE_FILE.toPath()); + Assert.assertArrayEquals(fileBuffer, buffer.array()); + } +} diff --git a/lang/rust/avro/examples/test_interop_single_object_encoding.rs b/lang/rust/avro/examples/test_interop_single_object_encoding.rs new file mode 100644 index 00000000000..d5e3edafdee --- /dev/null +++ b/lang/rust/avro/examples/test_interop_single_object_encoding.rs @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use apache_avro::{schema::AvroSchema, types::Value}; + +const RESOURCES_FOLDER: &str = "../../share/test/data/messageV1"; + +struct InteropMessage; + +impl AvroSchema for InteropMessage { + fn get_schema() -> apache_avro::Schema { + let schema = std::fs::read_to_string(format!("{}/test_schema.avsc", RESOURCES_FOLDER)) + .expect("File should exist with schema inside"); + apache_avro::Schema::parse_str(schema.as_str()) + .expect("File should exist with schema inside") + } +} + +impl From for Value { + fn from(_: InteropMessage) -> Value { + Value::Record(vec![ + ("id".into(), 42i64.into()), + ("name".into(), "Bill".into()), + ( + "tags".into(), + Value::Array( + vec!["dog_lover", "cat_hater"] + .into_iter() + .map(|s| s.into()) + .collect(), + ), + ), + ]) + } +} + +fn main() { + let file_message = std::fs::read(format!("{}/test_message.bin", RESOURCES_FOLDER)) + .expect("File with single object not found or error occurred while reading"); + let mut generated_encoding: Vec = Vec::new(); + apache_avro::SingleObjectWriter::::with_capacity(1024) + .expect("resolve expected") + .write_value(InteropMessage, &mut generated_encoding) + .expect("Should encode"); + assert_eq!(file_message, generated_encoding) +} diff --git a/lang/rust/avro/src/encode.rs b/lang/rust/avro/src/encode.rs index 7af2c6976e8..cf4e0f5da02 100644 --- a/lang/rust/avro/src/encode.rs +++ b/lang/rust/avro/src/encode.rs @@ -16,12 +16,16 @@ // under the License. use crate::{ - schema::{NamesRef, Namespace, ResolvedSchema, Schema, SchemaKind}, + schema::{Name, Namespace, ResolvedSchema, Schema, SchemaKind}, types::{Value, ValueKind}, util::{zig_i32, zig_i64}, AvroResult, Error, }; -use std::convert::{TryFrom, TryInto}; +use std::{ + borrow::Borrow, + collections::HashMap, + convert::{TryFrom, TryInto}, +}; /// Encode a `Value` into avro format. /// @@ -47,19 +51,19 @@ fn encode_int(i: i32, buffer: &mut Vec) { zig_i32(i, buffer) } -pub(crate) fn encode_internal( +pub(crate) fn encode_internal>( value: &Value, schema: &Schema, - names: &NamesRef, + names: &HashMap, enclosing_namespace: &Namespace, buffer: &mut Vec, ) -> AvroResult<()> { if let Schema::Ref { ref name } = schema { let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - let resolved = *names + let resolved = names .get(&fully_qualified_name) .ok_or(Error::SchemaResolutionError(fully_qualified_name))?; - return encode_internal(value, resolved, names, enclosing_namespace, buffer); + return encode_internal(value, resolved.borrow(), names, enclosing_namespace, buffer); } match value { diff --git a/lang/rust/avro/src/error.rs b/lang/rust/avro/src/error.rs index d7483ea9126..670c6c4ec77 100644 --- a/lang/rust/avro/src/error.rs +++ b/lang/rust/avro/src/error.rs @@ -411,6 +411,10 @@ pub enum Error { value_kind: ValueKind, supported_schema: Vec, }, + #[error( + "Internal buffer not drained properly. Re-initialize the single object writer struct!" + )] + IllegalSingleObjectWriterState, } impl serde::ser::Error for Error { diff --git a/lang/rust/avro/src/lib.rs b/lang/rust/avro/src/lib.rs index 539707cd7aa..fc391f6ca3b 100644 --- a/lang/rust/avro/src/lib.rs +++ b/lang/rust/avro/src/lib.rs @@ -746,7 +746,7 @@ pub use reader::{from_avro_datum, Reader}; pub use schema::Schema; pub use ser::to_value; pub use util::max_allocation_bytes; -pub use writer::{to_avro_datum, Writer}; +pub use writer::{to_avro_datum, GenericSingleObjectWriter, SingleObjectWriter, Writer}; #[macro_use] extern crate log; diff --git a/lang/rust/avro/src/schema.rs b/lang/rust/avro/src/schema.rs index f111e69d3b6..52478890b4b 100644 --- a/lang/rust/avro/src/schema.rs +++ b/lang/rust/avro/src/schema.rs @@ -434,6 +434,86 @@ impl<'s> ResolvedSchema<'s> { } } +pub(crate) struct ResolvedOwnedSchema { + names: Names, + root_schema: Schema, +} + +impl TryFrom for ResolvedOwnedSchema { + type Error = Error; + + fn try_from(schema: Schema) -> AvroResult { + let names = HashMap::new(); + let mut rs = ResolvedOwnedSchema { + names, + root_schema: schema, + }; + Self::from_internal(&rs.root_schema, &mut rs.names, &None)?; + Ok(rs) + } +} + +impl ResolvedOwnedSchema { + pub(crate) fn get_root_schema(&self) -> &Schema { + &self.root_schema + } + pub(crate) fn get_names(&self) -> &Names { + &self.names + } + + fn from_internal( + schema: &Schema, + names: &mut Names, + enclosing_namespace: &Namespace, + ) -> AvroResult<()> { + match schema { + Schema::Array(schema) | Schema::Map(schema) => { + Self::from_internal(schema, names, enclosing_namespace) + } + Schema::Union(UnionSchema { schemas, .. }) => { + for schema in schemas { + Self::from_internal(schema, names, enclosing_namespace)? + } + Ok(()) + } + Schema::Enum { name, .. } | Schema::Fixed { name, .. } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names + .insert(fully_qualified_name.clone(), schema.clone()) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + Ok(()) + } + } + Schema::Record { name, fields, .. } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + if names + .insert(fully_qualified_name.clone(), schema.clone()) + .is_some() + { + Err(Error::AmbiguousSchemaDefinition(fully_qualified_name)) + } else { + let record_namespace = fully_qualified_name.namespace; + for field in fields { + Self::from_internal(&field.schema, names, &record_namespace)? + } + Ok(()) + } + } + Schema::Ref { name } => { + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + names + .get(&fully_qualified_name) + .map(|_| ()) + .ok_or(Error::SchemaResolutionError(fully_qualified_name)) + } + _ => Ok(()), + } + } +} + /// Represents a `field` in a `record` Avro schema. #[derive(Clone, Debug, PartialEq)] pub struct RecordField { diff --git a/lang/rust/avro/src/types.rs b/lang/rust/avro/src/types.rs index 89d66facaff..78b000552e9 100644 --- a/lang/rust/avro/src/types.rs +++ b/lang/rust/avro/src/types.rs @@ -20,7 +20,8 @@ use crate::{ decimal::Decimal, duration::Duration, schema::{ - NamesRef, Precision, RecordField, ResolvedSchema, Scale, Schema, SchemaKind, UnionSchema, + Name, NamesRef, Precision, RecordField, ResolvedSchema, Scale, Schema, SchemaKind, + UnionSchema, }, AvroResult, Error, }; @@ -360,7 +361,11 @@ impl Value { } } - pub(crate) fn validate_internal(&self, schema: &Schema, names: &NamesRef) -> Option { + pub(crate) fn validate_internal>( + &self, + schema: &Schema, + names: &HashMap, + ) -> Option { match (self, schema) { (_, &Schema::Ref { ref name }) => names.get(name).map_or_else( || { @@ -370,7 +375,7 @@ impl Value { names.keys() )); }, - |s| self.validate_internal(s, names), + |s| self.validate_internal(s.borrow(), names), ), (&Value::Null, &Schema::Null) => None, (&Value::Boolean(_), &Schema::Boolean) => None, @@ -1099,7 +1104,7 @@ mod tests { ]; for (value, schema, valid, expected_err_message) in value_schema_valid.into_iter() { - let err_message = value.validate_internal(&schema, &HashMap::default()); + let err_message = value.validate_internal::(&schema, &HashMap::default()); assert_eq!(valid, err_message.is_none()); if !valid { let full_err_message = format!( diff --git a/lang/rust/avro/src/writer.rs b/lang/rust/avro/src/writer.rs index e815f517f7c..849e91d974f 100644 --- a/lang/rust/avro/src/writer.rs +++ b/lang/rust/avro/src/writer.rs @@ -18,14 +18,15 @@ //! Logic handling writing in Avro format at user level. use crate::{ encode::{encode, encode_internal, encode_to_vec}, - schema::{ResolvedSchema, Schema}, + rabin::Rabin, + schema::{AvroSchema, ResolvedOwnedSchema, ResolvedSchema, Schema}, ser::Serializer, types::Value, AvroResult, Codec, Error, }; use rand::random; use serde::Serialize; -use std::{collections::HashMap, convert::TryFrom, io::Write}; +use std::{collections::HashMap, convert::TryFrom, io::Write, marker::PhantomData}; const DEFAULT_BLOCK_SIZE: usize = 16000; const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01"; @@ -352,6 +353,113 @@ fn write_avro_datum>( Ok(()) } +/// Writer that encodes messages according to the single object encoding v1 spec +/// Uses an API similar to the current File Writer +/// Writes all object bytes at once, and drains internal buffer +pub struct GenericSingleObjectWriter { + buffer: Vec, + resolved: ResolvedOwnedSchema, +} + +impl GenericSingleObjectWriter { + pub fn new_with_capacity( + schema: &Schema, + initial_buffer_cap: usize, + ) -> AvroResult { + let fingerprint = schema.fingerprint::(); + let mut buffer = Vec::with_capacity(initial_buffer_cap); + let header = [ + 0xC3, + 0x01, + fingerprint.bytes[0], + fingerprint.bytes[1], + fingerprint.bytes[2], + fingerprint.bytes[3], + fingerprint.bytes[4], + fingerprint.bytes[5], + fingerprint.bytes[6], + fingerprint.bytes[7], + ]; + buffer.extend_from_slice(&header); + + Ok(GenericSingleObjectWriter { + buffer, + resolved: ResolvedOwnedSchema::try_from(schema.clone())?, + }) + } + + /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header + pub fn write_value_ref(&mut self, v: &Value, writer: &mut W) -> AvroResult { + if self.buffer.len() != 10 { + Err(Error::IllegalSingleObjectWriterState) + } else { + write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?; + writer.write_all(&self.buffer).map_err(Error::WriteBytes)?; + let len = self.buffer.len(); + self.buffer.truncate(10); + Ok(len) + } + } + + /// Write the Value to the provided Write object. Returns a result with the number of bytes written including the header + pub fn write_value(&mut self, v: Value, writer: &mut W) -> AvroResult { + self.write_value_ref(&v, writer) + } +} + +/// Writer that encodes messages according to the single object encoding v1 spec +pub struct SingleObjectWriter +where + T: AvroSchema, +{ + inner: GenericSingleObjectWriter, + _model: PhantomData, +} + +impl SingleObjectWriter +where + T: AvroSchema, +{ + pub fn with_capacity(buffer_cap: usize) -> AvroResult> { + let schema = T::get_schema(); + Ok(SingleObjectWriter { + inner: GenericSingleObjectWriter::new_with_capacity(&schema, buffer_cap)?, + _model: PhantomData, + }) + } +} + +impl SingleObjectWriter +where + T: AvroSchema + Into, +{ + /// Write the Into to the provided Write object. Returns a result with the number + /// of bytes written including the header + pub fn write_value(&mut self, data: T, writer: &mut W) -> AvroResult { + let v: Value = data.into(); + self.inner.write_value_ref(&v, writer) + } +} + +impl SingleObjectWriter +where + T: AvroSchema + Serialize, +{ + /// Write the referenced Serialize object to the provided Write object. Returns a result with + /// the number of bytes written including the header + pub fn write_ref(&mut self, data: &T, writer: &mut W) -> AvroResult { + let mut serializer = Serializer::default(); + let v = data.serialize(&mut serializer)?; + self.inner.write_value_ref(&v, writer) + } + + /// Write the Serialize object to the provided Write object. Returns a result with the number + /// of bytes written including the header + pub fn write(&mut self, data: T, writer: &mut W) -> AvroResult { + self.write_ref(&data, writer) + } +} + fn write_value_ref_resolved( resolved_schema: &ResolvedSchema, value: &Value, @@ -373,6 +481,27 @@ fn write_value_ref_resolved( Ok(()) } +fn write_value_ref_owned_resolved( + resolved_schema: &ResolvedOwnedSchema, + value: &Value, + buffer: &mut Vec, +) -> AvroResult<()> { + if let Some(err) = value.validate_internal( + resolved_schema.get_root_schema(), + resolved_schema.get_names(), + ) { + return Err(Error::ValidationWithReason(err)); + } + encode_internal( + value, + resolved_schema.get_root_schema(), + resolved_schema.get_names(), + &None, + buffer, + )?; + Ok(()) +} + /// Encode a compatible value (implementing the `ToAvro` trait) into Avro format, also /// performing schema validation. /// @@ -943,4 +1072,123 @@ mod tests { assert_eq!(writer.user_metadata, user_meta_data); } + + #[derive(Serialize, Clone)] + struct TestSingleObjectWriter { + a: i64, + b: f64, + c: Vec, + } + + impl AvroSchema for TestSingleObjectWriter { + fn get_schema() -> Schema { + let schema = r#" + { + "type":"record", + "name":"TestSingleObjectWrtierSerialize", + "fields":[ + { + "name":"a", + "type":"long" + }, + { + "name":"b", + "type":"double" + }, + { + "name":"c", + "type":{ + "type":"array", + "items":"string" + } + } + ] + } + "#; + Schema::parse_str(schema).unwrap() + } + } + + impl From for Value { + fn from(obj: TestSingleObjectWriter) -> Value { + Value::Record(vec![ + ("a".into(), obj.a.into()), + ("b".into(), obj.b.into()), + ( + "c".into(), + Value::Array(obj.c.into_iter().map(|s| s.into()).collect()), + ), + ]) + } + } + + #[test] + fn test_single_object_writer() { + let mut buf: Vec = Vec::new(); + let obj = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + let mut writer = GenericSingleObjectWriter::new_with_capacity( + &TestSingleObjectWriter::get_schema(), + 1024, + ) + .expect("Should resolve schema"); + let value = obj.into(); + let written_bytes = writer + .write_value_ref(&value, &mut buf) + .expect("Error serializing properly"); + + assert!(buf.len() > 10, "no bytes written"); + assert_eq!(buf.len(), written_bytes); + assert_eq!(buf[0], 0xC3); + assert_eq!(buf[1], 0x01); + assert_eq!( + &buf[2..10], + &TestSingleObjectWriter::get_schema() + .fingerprint::() + .bytes[..] + ); + let mut msg_binary = Vec::new(); + encode( + &value, + &TestSingleObjectWriter::get_schema(), + &mut msg_binary, + ) + .expect("encode should have failed by here as a dependency of any writing"); + assert_eq!(&buf[10..], &msg_binary[..]) + } + + #[test] + fn test_writer_parity() { + let obj1 = TestSingleObjectWriter { + a: 300, + b: 34.555, + c: vec!["cat".into(), "dog".into()], + }; + + let mut buf1: Vec = Vec::new(); + let mut buf2: Vec = Vec::new(); + let mut buf3: Vec = Vec::new(); + + let mut generic_writer = GenericSingleObjectWriter::new_with_capacity( + &TestSingleObjectWriter::get_schema(), + 1024, + ) + .expect("Should resolve schema"); + let mut specific_writer = SingleObjectWriter::::with_capacity(1024) + .expect("Resolved should pass"); + specific_writer + .write(obj1.clone(), &mut buf1) + .expect("Serialization expected"); + specific_writer + .write_value(obj1.clone(), &mut buf2) + .expect("Serialization expected"); + generic_writer + .write_value(obj1.into(), &mut buf3) + .expect("Serialization expected"); + assert_eq!(buf1, buf2); + assert_eq!(buf1, buf3); + } } diff --git a/lang/rust/build.sh b/lang/rust/build.sh index 7b78acd5d5c..c44c05ad29e 100755 --- a/lang/rust/build.sh +++ b/lang/rust/build.sh @@ -17,7 +17,6 @@ set -e # exit on error -root_dir=$(pwd) build_dir="../../build/rust" dist_dir="../../dist/rust" @@ -35,7 +34,7 @@ function prepare_build { mkdir -p $build_dir } -cd `dirname "$0"` +cd $(dirname "$0") for target in "$@" do @@ -65,6 +64,7 @@ do interop-data-test) prepare_build cargo run --all-features --example test_interop_data + cargo run --all-features --example test_interop_single_object_encoding ;; *) echo "Usage: $0 {lint|test|dist|clean|interop-data-generate|interop-data-test}" >&2 diff --git a/share/test/data/messageV1/README.md b/share/test/data/messageV1/README.md new file mode 100644 index 00000000000..f0350288ce3 --- /dev/null +++ b/share/test/data/messageV1/README.md @@ -0,0 +1,45 @@ +BinaryMessage data in single object encoding https://avro.apache.org/docs/current/spec.html#single_object_encoding + +Ground truth data generated with Java Code + +The binary data will be the V1 single object encoding with the schema of +``` +{ + "type":"record", + "namespace":"org.apache.avro", + "name":"TestMessage", + "fields":[ + { + "name":"id", + "type":"long" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"tags", + "type":{ + "type":"array", + "items":"string" + } + }, + { + "name":"scores", + "type":{ + "type":"map", + "values":"double" + } + } + ] +} +``` + +The sample binary message will have the values equal to the json serialized version of the record shown below +``` +{ + "id": 42, + "name": "Bill", + "tags": ["dog_lover", "cat_hater"] +} +``` diff --git a/share/test/data/messageV1/test_message.bin b/share/test/data/messageV1/test_message.bin new file mode 100644 index 00000000000..609337eb914 Binary files /dev/null and b/share/test/data/messageV1/test_message.bin differ diff --git a/share/test/data/messageV1/test_schema.avsc b/share/test/data/messageV1/test_schema.avsc new file mode 100644 index 00000000000..cd49b72a456 --- /dev/null +++ b/share/test/data/messageV1/test_schema.avsc @@ -0,0 +1,22 @@ +{ + "type":"record", + "namespace":"org.apache.avro", + "name":"TestMessage", + "fields":[ + { + "name":"id", + "type":"long" + }, + { + "name":"name", + "type":"string" + }, + { + "name":"tags", + "type":{ + "type":"array", + "items":"string" + } + } + ] +} \ No newline at end of file