diff --git a/examples/localhost.yaml b/examples/localhost.yaml index a75800680b..49f09f6956 100644 --- a/examples/localhost.yaml +++ b/examples/localhost.yaml @@ -14,20 +14,20 @@ workloads: 80: 8080 "default/example2.com": 80: 8080 -# Define another local address, but this one uses TCP. This allows testing HBONE and TCP with one config. -- uid: cluster1//v1/Pod/default/local-tcp - name: local-tcp +- uid: cluster1//v1/Pod/default/local2 + name: local2 namespace: default serviceAccount: default workloadIps: ["127.0.0.2"] - protocol: TCP + protocol: HBONE node: local network: "" services: "default/example.com": - 80: 8080 + 80: 8081 "default/example2.com": 80: 8080 + policies: - action: Allow rules: diff --git a/src/copy.rs b/src/copy.rs index e186424e44..7bf85604cd 100644 --- a/src/copy.rs +++ b/src/copy.rs @@ -30,15 +30,15 @@ use tracing::trace; // BufferedSplitter is a trait to expose splitting an IO object into a buffered reader and a writer pub trait BufferedSplitter: Unpin { - type R: ResizeBufRead + Unpin; - type W: AsyncWriteBuf + Unpin; + type R: ResizeBufRead + Unpin + Send + Sync; + type W: AsyncWriteBuf + Unpin + Send + Sync; fn split_into_buffered_reader(self) -> (Self::R, Self::W); } // Generic BufferedSplitter for anything that can Read/Write. impl BufferedSplitter for I where - I: AsyncRead + AsyncWrite + Unpin, + I: AsyncRead + AsyncWrite + Unpin + Send + Sync, { type R = BufReader>; type W = WriteAdapter>; diff --git a/src/proxy/outbound.rs b/src/proxy/outbound.rs index 19e24ff8b2..9d04e7a4f3 100644 --- a/src/proxy/outbound.rs +++ b/src/proxy/outbound.rs @@ -30,6 +30,7 @@ use crate::proxy::metrics::Reporter; use crate::proxy::{BAGGAGE_HEADER, Error, ProxyInputs, TRACEPARENT_HEADER, TraceParent, util}; use crate::proxy::{ConnectionOpen, ConnectionResult, DerivedWorkload, metrics, pool}; +use crate::copy::BufferedSplitter; use crate::drain::DrainWatcher; use crate::drain::run_with_drain; use crate::proxy::h2::H2Stream; @@ -119,7 +120,7 @@ impl Outbound { } } } - .in_current_span() + .in_current_span() }; run_with_drain( @@ -139,6 +140,11 @@ pub(super) struct OutboundConnection { pub(super) hbone_port: u16, } +enum Connection { + Tcp(TcpStream), + Hbone(H2Stream), +} + impl OutboundConnection { async fn proxy(&mut self, source_stream: TcpStream) { let source_addr = @@ -161,60 +167,80 @@ impl OutboundConnection { metrics::log_early_deny(source_addr, dest_addr, Reporter::source, Error::SelfCall); return; } - // First find the source workload of this traffic. If we don't know where the request is from - // we will reject it. - let build = self - .pi - .local_workload_information - .get_workload() - .and_then(|source| self.build_request(source, source_addr.ip(), dest_addr)); - let req = match Box::pin(build).await { - Ok(req) => Box::new(req), - Err(err) => { - metrics::log_early_deny(source_addr, dest_addr, Reporter::source, err); - return; - } - }; - // TODO: should we use the original address or the actual address? Both seems nice! - let _conn_guard = self.pi.connection_manager.track_outbound( - source_addr, - dest_addr, - req.actual_destination, - ); + for attempt in 0..5 { + // First find the source workload of this traffic. If we don't know where the request is from + // we will reject it. + let build = self + .pi + .local_workload_information + .get_workload() + .and_then(|source| self.build_request(source, source_addr.ip(), dest_addr)); + let req = match Box::pin(build).await { + Ok(req) => Box::new(req), + Err(err) => { + metrics::log_early_deny(source_addr, dest_addr, Reporter::source, err); + return; + } + }; + // TODO: should we use the original address or the actual address? Both seems nice! + let _conn_guard = self.pi.connection_manager.track_outbound( + source_addr, + dest_addr, + req.actual_destination, + ); - let metrics = self.pi.metrics.clone(); - let hbone_target = req.hbone_target_destination; - let result_tracker = Box::new(ConnectionResult::new( - source_addr, - req.actual_destination, - hbone_target, - start, - Self::conn_metrics_from_request(&req), - metrics, - )); + let metrics = self.pi.metrics.clone(); + let hbone_target = req.hbone_target_destination; + let result_tracker = Box::new(ConnectionResult::new( + source_addr, + req.actual_destination, + hbone_target, + start, + Self::conn_metrics_from_request(&req), + metrics, + )); - let res = match req.protocol { - Protocol::HBONE => { - self.proxy_to_hbone(source_stream, source_addr, &req, &result_tracker) - .await - } - Protocol::TCP => { - self.proxy_to_tcp(source_stream, &req, &result_tracker) - .await + let res = match req.protocol { + Protocol::HBONE => self.proxy_to_hbone(source_addr, &req).await, + Protocol::TCP => self.proxy_to_tcp(&req).await, + }; + + match res { + Err(e) => { + result_tracker.record(Err(e)); + tracing::error!("howardjohn: RETRY"); + } + Ok(upstream) => { + // Proxying data between downstream and upstream + let src = copy::TcpStreamSplitter(source_stream); + let res = match upstream { + Connection::Tcp(s) => { + copy::copy_bidirectional( + src, + copy::TcpStreamSplitter(s), + &result_tracker, + ) + .await + } + Connection::Hbone(s) => { + copy::copy_bidirectional(src, s, &result_tracker).await + } + }; + result_tracker.record(res); + return; + } } - }; - result_tracker.record(res) + } } async fn proxy_to_hbone( &mut self, - stream: TcpStream, remote_addr: SocketAddr, req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let upgraded = Box::pin(self.send_hbone_request(remote_addr, req)).await?; - copy::copy_bidirectional(copy::TcpStreamSplitter(stream), upgraded, connection_stats).await + ) -> Result { + self.send_hbone_request(remote_addr, req) + .await + .map(Connection::Hbone) } async fn send_hbone_request( @@ -253,26 +279,15 @@ impl OutboundConnection { Ok(upgraded) } - async fn proxy_to_tcp( - &mut self, - stream: TcpStream, - req: &Request, - connection_stats: &ConnectionResult, - ) -> Result<(), Error> { - let outbound = super::freebind_connect( - None, // No need to spoof source IP on outbound - req.actual_destination, - self.pi.socket_factory.as_ref(), - ) - .await?; - - // Proxying data between downstream and upstream - copy::copy_bidirectional( - copy::TcpStreamSplitter(stream), - copy::TcpStreamSplitter(outbound), - connection_stats, - ) - .await + async fn proxy_to_tcp(&mut self, req: &Request) -> Result { + Ok(Connection::Tcp( + super::freebind_connect( + None, // No need to spoof source IP on outbound + req.actual_destination, + self.pi.socket_factory.as_ref(), + ) + .await?, + )) } fn conn_metrics_from_request(req: &Request) -> ConnectionOpen {