@@ -107,7 +107,9 @@ pub async fn request_ephemeral_peer(
107
107
enable_post_quantum : bool ,
108
108
enable_daita : bool ,
109
109
) -> Result < EphemeralPeer , Error > {
110
+ log:: debug!( "Connecting to relay config service at {service_address}" ) ;
110
111
let client = connect_relay_config_client ( service_address) . await ?;
112
+ log:: debug!( "Connected to relay config service at {service_address}" ) ;
111
113
112
114
request_ephemeral_peer_with (
113
115
client,
@@ -128,6 +130,7 @@ pub async fn request_ephemeral_peer_with(
128
130
) -> Result < EphemeralPeer , Error > {
129
131
let ( pq_request, kem_secrets) = if enable_quantum_resistant {
130
132
let ( pq_request, kem_secrets) = post_quantum_secrets ( ) . await ;
133
+ log:: debug!( "Generated PQ secrets" ) ;
131
134
( Some ( pq_request) , Some ( kem_secrets) )
132
135
} else {
133
136
( None , None )
@@ -275,20 +278,92 @@ fn xor_assign(dst: &mut [u8; 32], src: &[u8; 32]) {
275
278
/// value has been speficically lowered, to avoid MTU issues. See the `socket` module.
276
279
#[ cfg( not( target_os = "ios" ) ) ]
277
280
async fn connect_relay_config_client ( ip : Ipv4Addr ) -> Result < RelayConfigService , Error > {
278
- use futures :: TryFutureExt ;
281
+ use hyper_util :: rt :: tokio :: TokioIo ;
279
282
280
283
let endpoint = Endpoint :: from_static ( "tcp://0.0.0.0:0" ) ;
281
284
let addr = SocketAddr :: new ( IpAddr :: V4 ( ip) , CONFIG_SERVICE_PORT ) ;
282
285
283
286
let connection = endpoint
284
287
. connect_with_connector ( service_fn ( move |_| async move {
285
288
let sock = socket:: TcpSocket :: new ( ) ?;
286
- sock. connect ( addr)
287
- . map_ok ( hyper_util:: rt:: tokio:: TokioIo :: new)
288
- . await
289
+ let stream = sock. connect ( addr) . await ?;
290
+ let sniffer = socket_sniffer:: SocketSniffer {
291
+ s : stream,
292
+ rx_bytes : 0 ,
293
+ tx_bytes : 0 ,
294
+ start_time : std:: time:: Instant :: now ( ) ,
295
+ } ;
296
+ Ok :: < _ , std:: io:: Error > ( TokioIo :: new ( sniffer) )
289
297
} ) )
290
298
. await
291
299
. map_err ( Error :: GrpcConnectError ) ?;
292
300
293
301
Ok ( RelayConfigService :: new ( connection) )
294
302
}
303
+
304
+ mod socket_sniffer {
305
+ pub struct SocketSniffer < S > {
306
+ pub s : S ,
307
+ pub rx_bytes : usize ,
308
+ pub tx_bytes : usize ,
309
+ pub start_time : std:: time:: Instant ,
310
+ }
311
+ use std:: {
312
+ io,
313
+ pin:: Pin ,
314
+ task:: { Context , Poll } ,
315
+ } ;
316
+
317
+ use tokio:: io:: AsyncWrite ;
318
+
319
+ use tokio:: io:: { AsyncRead , ReadBuf } ;
320
+
321
+ impl < S > Drop for SocketSniffer < S > {
322
+ fn drop ( & mut self ) {
323
+ let duration = self . start_time . elapsed ( ) ;
324
+ log:: debug!(
325
+ "Tunnel config client connection ended. RX: {} bytes, TX: {} bytes, duration: {} s" ,
326
+ self . rx_bytes,
327
+ self . tx_bytes,
328
+ duration. as_secs( )
329
+ ) ;
330
+ }
331
+ }
332
+
333
+ impl < S : AsyncRead + AsyncWrite + Unpin > AsyncRead for SocketSniffer < S > {
334
+ fn poll_read (
335
+ mut self : Pin < & mut Self > ,
336
+ cx : & mut Context < ' _ > ,
337
+ buf : & mut ReadBuf < ' _ > ,
338
+ ) -> Poll < io:: Result < ( ) > > {
339
+ let initial_data = buf. filled ( ) . len ( ) ;
340
+ let bytes = std:: task:: ready!( Pin :: new( & mut self . s) . poll_read( cx, buf) ) ;
341
+ if bytes. is_ok ( ) {
342
+ self . rx_bytes += buf. filled ( ) . len ( ) . saturating_sub ( initial_data) ;
343
+ }
344
+ Poll :: Ready ( bytes)
345
+ }
346
+ }
347
+
348
+ impl < S : AsyncRead + AsyncWrite + Unpin > AsyncWrite for SocketSniffer < S > {
349
+ fn poll_write (
350
+ mut self : Pin < & mut Self > ,
351
+ cx : & mut Context < ' _ > ,
352
+ buf : & [ u8 ] ,
353
+ ) -> Poll < io:: Result < usize > > {
354
+ let bytes = std:: task:: ready!( Pin :: new( & mut self . s) . poll_write( cx, buf) ) ;
355
+ if let Ok ( bytes) = bytes {
356
+ self . tx_bytes += bytes;
357
+ }
358
+ Poll :: Ready ( bytes)
359
+ }
360
+
361
+ fn poll_flush ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < io:: Result < ( ) > > {
362
+ Pin :: new ( & mut self . s ) . poll_flush ( cx)
363
+ }
364
+
365
+ fn poll_shutdown ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < io:: Result < ( ) > > {
366
+ Pin :: new ( & mut self . s ) . poll_shutdown ( cx)
367
+ }
368
+ }
369
+ }
0 commit comments