diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 76e51ae7a066d..f25f413590324 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io::{self, Read}; use std::net::SocketAddr; use std::time::Duration; @@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError}; use flate2::read::MultiGzDecoder; use lookup::lookup_v2::parse_value_path; use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix}; -use rmp_serde::{decode, Deserializer}; -use serde::Deserialize; +use rmp_serde::{decode, Deserializer, Serializer}; +use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use tokio_util::codec::Decoder; use vector_config::configurable_component; @@ -532,15 +533,18 @@ impl TcpSourceAcker for FluentAcker { return None; } - let mut acks = String::new(); + let mut buf = Vec::new(); + let mut ser = Serializer::new(&mut buf); + let mut ack_map = HashMap::new(); + for chunk in self.chunks { - let ack = match ack { - TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk), - _ => String::from("{}"), + ack_map.clear(); + if let TcpSourceAck::Ack = ack { + ack_map.insert("ack", chunk); }; - acks.push_str(&ack); + ack_map.serialize(&mut ser).unwrap(); } - Some(acks.into()) + Some(buf.into()) } } @@ -861,7 +865,8 @@ mod tests { async fn ack_delivered_with_chunk() { let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await; assert_eq!(result.unwrap().unwrap(), output.len()); - assert!(output.starts_with(b"{\"ack\":")); + let expected: Vec = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ... + assert_eq!(output[..expected.len()], expected); } #[tokio::test] @@ -875,7 +880,8 @@ mod tests { async fn ack_failed_with_chunk() { let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await; assert_eq!(result.unwrap().unwrap(), output.len()); - assert_eq!(output, &b"{}"[..]); + let expected: Vec = vec![0x80]; // { } + assert_eq!(output, expected); } async fn check_acknowledgements(