Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3184,7 +3184,7 @@ impl Connection {

let packet: Packet = packet.into();

let mut qlog = QlogRecvPacket::new(len);
let mut qlog = QlogRecvPacket::new(len, remote, path_id);
#[cfg(feature = "qlog")]
qlog.header(&packet, Some(packet_number));

Expand Down Expand Up @@ -3429,7 +3429,7 @@ impl Connection {
ecn: Option<EcnCodepoint>,
partial_decode: PartialDecode,
) {
let qlog = QlogRecvPacket::new(partial_decode.len());
let qlog = QlogRecvPacket::new(partial_decode.len(), remote, path_id);
if let Some(decoded) = packet_crypto::unprotect_header(
partial_decode,
&self.spaces,
Expand Down
9 changes: 6 additions & 3 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
pub(super) fn finish(
self,
conn: &mut Connection,
now: Instant,
#[allow(unused_mut)] mut qlog: QlogSentPacket,
#[allow(unused)] now: Instant,
#[allow(unused)] mut qlog: QlogSentPacket,
) -> (usize, bool) {
debug_assert!(
self.buf.len() <= self.buf.datagram_max_offset() - self.tag_len,
Expand Down Expand Up @@ -319,7 +319,10 @@ impl<'a, 'b> PacketBuilder<'a, 'b> {
trace!(size = %packet_len, short_header = %self.short_header, "wrote packet");
#[cfg(feature = "qlog")]
qlog.finalize(packet_len);
conn.config.qlog_sink.emit_packet_sent(conn, qlog, now);
#[cfg(feature = "qlog")]
conn.config
.qlog_sink
.emit_packet_sent(conn, self.path, qlog, now);
(packet_len, pad)
}

Expand Down
55 changes: 46 additions & 9 deletions quinn-proto/src/connection/qlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#[cfg(feature = "qlog")]
use qlog::{
events::{
Event, EventData, RawInfo,
Event, EventData, ExData, RawInfo,
quic::{
PacketHeader, PacketLost, PacketLostTrigger, PacketReceived, PacketSent, PacketType,
QuicFrame, StreamType,
Expand All @@ -19,7 +19,7 @@ use std::sync::{Arc, Mutex};
use tracing::warn;

use crate::{
Connection, ConnectionId, Instant,
Connection, ConnectionId, Instant, PathId,
connection::{PathData, SentPacket},
packet::SpaceId,
};
Expand All @@ -36,16 +36,26 @@ pub struct QlogStream(pub(crate) Arc<Mutex<QlogStreamer>>);

#[cfg(feature = "qlog")]
impl QlogStream {
fn emit_event(&self, initial_dst_cid: ConnectionId, event: EventData, now: Instant) {
fn emit_event_ex(
&self,
initial_dst_cid: ConnectionId,
event: EventData,
now: Instant,
ex: ExData,
) {
// Time will be overwritten by `add_event_with_instant`
let mut event = Event::with_time(0.0, event);
let mut event = Event::with_time_ex(0.0, event, ex);
event.group_id = Some(initial_dst_cid.to_string());

let mut qlog_streamer = self.0.lock().unwrap();
if let Err(e) = qlog_streamer.add_event_with_instant(event, now) {
warn!("could not emit qlog event: {e}");
}
}

fn emit_event(&self, initial_dst_cid: ConnectionId, event: EventData, now: Instant) {
self.emit_event_ex(initial_dst_cid, event, now, Default::default());
}
}

/// A [`QlogStream`] that may be either dynamically disabled or compiled out entirely
Expand Down Expand Up @@ -158,16 +168,28 @@ impl QlogSink {
}
}

pub(super) fn emit_packet_sent(&self, conn: &Connection, packet: QlogSentPacket, now: Instant) {
pub(super) fn emit_packet_sent(
&self,
conn: &Connection,
path_id: PathId,
packet: QlogSentPacket,
now: Instant,
) {
#[cfg(feature = "qlog")]
{
let Some(stream) = self.stream.as_ref() else {
return;
};
stream.emit_event(
let remote = conn.path_data(path_id).remote;
let ex = [
("dst_addr", remote.to_string()),
("path_id", path_id.to_string()),
];
stream.emit_event_ex(
conn.initial_dst_cid,
EventData::PacketSent(packet.inner),
now,
ex.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
);
}
}
Expand All @@ -186,7 +208,16 @@ impl QlogSink {
let mut packet = packet;
packet.emit_padding();
let event = packet.inner;
stream.emit_event(conn.initial_dst_cid, EventData::PacketReceived(event), now);
let ex = [
("src_addr", packet.remote.to_string()),
("path_id", packet.path_id.to_string()),
];
stream.emit_event_ex(
conn.initial_dst_cid,
EventData::PacketReceived(event),
now,
ex.into_iter().map(|(k, v)| (k.into(), v.into())).collect(),
);
}
}
}
Expand Down Expand Up @@ -231,21 +262,27 @@ pub(crate) struct QlogRecvPacket {
inner: PacketReceived,
#[cfg(feature = "qlog")]
padding: usize,
#[cfg(feature = "qlog")]
remote: SocketAddr,
#[cfg(feature = "qlog")]
path_id: PathId,
}

#[cfg(not(feature = "qlog"))]
impl QlogRecvPacket {
pub(crate) fn new(_len: usize) -> Self {
pub(crate) fn new(_len: usize, _remote: SocketAddr, _path_id: PathId) -> Self {
Self {}
}
}

#[cfg(feature = "qlog")]
impl QlogRecvPacket {
pub(crate) fn new(len: usize) -> Self {
pub(crate) fn new(len: usize, remote: SocketAddr, path_id: PathId) -> Self {
let mut this = Self {
inner: Default::default(),
padding: 0,
remote,
path_id,
};
this.inner.header.length = Some(len as u16);
this
Expand Down
Loading