From 5b946c2aafe9824568e7ee646c382480cb92878e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20GARNIER?= Date: Tue, 16 May 2023 15:29:41 +0200 Subject: [PATCH 1/2] fix(fluent source): fix ack message format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fluent ack messages were sent as plain JSON instead of msgpack format. Signed-off-by: Benoît GARNIER --- src/sources/fluent/mod.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 76e51ae7a066d..4c8b0e4446cee 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::io::{self, Read}; use std::net::SocketAddr; use std::time::Duration; @@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError}; use flate2::read::MultiGzDecoder; use lookup::lookup_v2::parse_value_path; use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix}; -use rmp_serde::{decode, Deserializer}; -use serde::Deserialize; +use rmp_serde::{decode, Deserializer, Serializer}; +use serde::{Deserialize, Serialize}; use smallvec::{smallvec, SmallVec}; use tokio_util::codec::Decoder; use vector_config::configurable_component; @@ -532,15 +533,17 @@ impl TcpSourceAcker for FluentAcker { return None; } - let mut acks = String::new(); + let mut buf = Vec::new(); + let mut ser = Serializer::new(&mut buf); + for chunk in self.chunks { - let ack = match ack { - TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk), - _ => String::from("{}"), + let mut ack_map = HashMap::new(); + if let TcpSourceAck::Ack = ack { + ack_map.insert("ack", chunk); }; - acks.push_str(&ack); + ack_map.serialize(&mut ser).unwrap(); } - Some(acks.into()) + Some(buf.into()) } } @@ -861,7 +864,8 @@ mod tests { async fn ack_delivered_with_chunk() { let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await; assert_eq!(result.unwrap().unwrap(), output.len()); - assert!(output.starts_with(b"{\"ack\":")); + let expected: Vec = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ... + assert_eq!(output[..expected.len()], expected); } #[tokio::test] @@ -875,7 +879,8 @@ mod tests { async fn ack_failed_with_chunk() { let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await; assert_eq!(result.unwrap().unwrap(), output.len()); - assert_eq!(output, &b"{}"[..]); + let expected: Vec = vec![0x80]; // { } + assert_eq!(output, expected); } async fn check_acknowledgements( From f03e5ac41dce29c968e43037e2961a5fbdad88ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Beno=C3=AEt=20GARNIER?= Date: Thu, 18 May 2023 10:36:12 +0200 Subject: [PATCH 2/2] fix(fluent source): don't allocate ack map on each interation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Benoît GARNIER --- src/sources/fluent/mod.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 4c8b0e4446cee..f25f413590324 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -535,9 +535,10 @@ impl TcpSourceAcker for FluentAcker { let mut buf = Vec::new(); let mut ser = Serializer::new(&mut buf); + let mut ack_map = HashMap::new(); for chunk in self.chunks { - let mut ack_map = HashMap::new(); + ack_map.clear(); if let TcpSourceAck::Ack = ack { ack_map.insert("ack", chunk); };