Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::io::{self, Read};
use std::net::SocketAddr;
use std::time::Duration;
Expand All @@ -9,8 +10,8 @@ use codecs::{BytesDeserializerConfig, StreamDecodingError};
use flate2::read::MultiGzDecoder;
use lookup::lookup_v2::parse_value_path;
use lookup::{metadata_path, owned_value_path, path, OwnedValuePath, PathPrefix};
use rmp_serde::{decode, Deserializer};
use serde::Deserialize;
use rmp_serde::{decode, Deserializer, Serializer};
use serde::{Deserialize, Serialize};
use smallvec::{smallvec, SmallVec};
use tokio_util::codec::Decoder;
use vector_config::configurable_component;
Expand Down Expand Up @@ -532,15 +533,18 @@ impl TcpSourceAcker for FluentAcker {
return None;
}

let mut acks = String::new();
let mut buf = Vec::new();
let mut ser = Serializer::new(&mut buf);
let mut ack_map = HashMap::new();

for chunk in self.chunks {
let ack = match ack {
TcpSourceAck::Ack => format!(r#"{{"ack": "{}"}}"#, chunk),
_ => String::from("{}"),
ack_map.clear();
if let TcpSourceAck::Ack = ack {
ack_map.insert("ack", chunk);
};
acks.push_str(&ack);
ack_map.serialize(&mut ser).unwrap();
}
Some(acks.into())
Some(buf.into())
}
}

Expand Down Expand Up @@ -861,7 +865,8 @@ mod tests {
async fn ack_delivered_with_chunk() {
let (result, output) = check_acknowledgements(EventStatus::Delivered, true).await;
assert_eq!(result.unwrap().unwrap(), output.len());
assert!(output.starts_with(b"{\"ack\":"));
let expected: Vec<u8> = vec![0x81, 0xa3, 0x61, 0x63]; // { "ack": ...
assert_eq!(output[..expected.len()], expected);
}

#[tokio::test]
Expand All @@ -875,7 +880,8 @@ mod tests {
async fn ack_failed_with_chunk() {
let (result, output) = check_acknowledgements(EventStatus::Rejected, true).await;
assert_eq!(result.unwrap().unwrap(), output.len());
assert_eq!(output, &b"{}"[..]);
let expected: Vec<u8> = vec![0x80]; // { }
assert_eq!(output, expected);
}

async fn check_acknowledgements(
Expand Down