Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
8702bd2
double hbone (no inner encryption) working!!
Stevenjin8 Jan 14, 2025
e340d8c
hbone-ish
Stevenjin8 Jan 15, 2025
b381e34
cleanup
Stevenjin8 Jan 15, 2025
2192d67
graceful shutdowns
Stevenjin8 Jan 17, 2025
dff6c92
Some cleanup
Stevenjin8 Jan 17, 2025
f1cc535
Add some auth/tls logic back in
Stevenjin8 Jan 21, 2025
cab4849
inline double hbone code
Stevenjin8 Jan 21, 2025
565f41f
Use correct(?) identities
Stevenjin8 Jan 21, 2025
48fa773
Merge branch 'master' into feature/double-hbone
Stevenjin8 Mar 3, 2025
9f5609d
checkpoint
Stevenjin8 Mar 5, 2025
54dd3e3
another checkpoint
Stevenjin8 Mar 5, 2025
58399be
Use new xds
Stevenjin8 Mar 7, 2025
59eedf0
Merge branch 'master' into HEAD
Stevenjin8 Mar 7, 2025
89efd6c
Merge commit '59eedf0' into feature/double-hbone
Stevenjin8 Mar 7, 2025
b2a21bd
lint
Stevenjin8 Mar 7, 2025
896cef5
Fix type sizes
Stevenjin8 Mar 11, 2025
522fd35
Check workload potocol
Stevenjin8 Mar 11, 2025
a56b3ca
wip
Stevenjin8 Mar 14, 2025
b255c25
Merge branch 'master' into feature/double-hbone
Stevenjin8 Mar 24, 2025
08e7a58
initial impl of hostname
Stevenjin8 Mar 26, 2025
a4b8e4b
Merge branch 'master' into feature/double-hbone
Stevenjin8 Mar 27, 2025
4ef17cc
tests passing
Stevenjin8 Mar 28, 2025
11b4f87
svc addressing
Stevenjin8 Mar 28, 2025
8b506cb
Check workload network when using network gateways
Stevenjin8 Mar 28, 2025
862c6f8
Use dummy SNI
Stevenjin8 Mar 28, 2025
d54a820
remove extra derive
Stevenjin8 Mar 28, 2025
2d23fb0
Merge commit 'dc54d850a774e033cc0' into feature/double-hbone
Stevenjin8 Mar 31, 2025
6d398ef
lint
Stevenjin8 Mar 31, 2025
4874f08
more tests
Stevenjin8 Apr 1, 2025
00aa327
Merge branch 'master' into feature/double-hbone
Stevenjin8 Apr 2, 2025
d7cb313
Drop some uneeded changers
Stevenjin8 Apr 2, 2025
7cd4cf1
get rid of more stuff
Stevenjin8 Apr 2, 2025
fa46e46
lint
Stevenjin8 Apr 2, 2025
0638119
lint
Stevenjin8 Apr 2, 2025
04e7d30
Review:
Stevenjin8 Apr 7, 2025
0fbae22
Assume existing workload
Stevenjin8 Apr 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions benches/throughput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tokio::sync::Mutex;
use tracing::info;

use ztunnel::rbac::{Authorization, RbacMatch, StringMatch};
use ztunnel::state::workload::{Protocol, Workload};
use ztunnel::state::workload::{InboundProtocol, Workload};
use ztunnel::state::{DemandProxyState, ProxyRbacContext, ProxyState};
use ztunnel::test_helpers::app::{DestinationAddr, TestApp};
use ztunnel::test_helpers::linux::{TestMode, WorkloadManager};
Expand Down Expand Up @@ -457,7 +457,7 @@ fn hbone_connection_config() -> ztunnel::config::ConfigSource {
let lwl = LocalWorkload {
workload: Workload {
workload_ips: vec![hbone_connection_ip(i)],
protocol: Protocol::HBONE,
protocol: InboundProtocol::HBONE,
uid: strng::format!("cluster1//v1/Pod/default/remote{}", i),
name: strng::format!("workload-{}", i),
namespace: strng::format!("namespace-{}", i),
Expand All @@ -471,7 +471,7 @@ fn hbone_connection_config() -> ztunnel::config::ConfigSource {
let lwl = LocalWorkload {
workload: Workload {
workload_ips: vec![],
protocol: Protocol::HBONE,
protocol: InboundProtocol::HBONE,
uid: "cluster1//v1/Pod/default/local-source".into(),
name: "local-source".into(),
namespace: "default".into(),
Expand Down
4 changes: 2 additions & 2 deletions src/cert_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::config;
use crate::config::ProxyMode;
use crate::identity::Priority::Warmup;
use crate::identity::{Identity, Request, SecretManager};
use crate::state::workload::{Protocol, Workload};
use crate::state::workload::{InboundProtocol, Workload};
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::{debug, error, info};
Expand Down Expand Up @@ -96,7 +96,7 @@ impl CertFetcherImpl {
// We only get certs for our own node
Some(w.node.as_ref()) == self.local_node.as_deref() &&
// If it doesn't support HBONE it *probably* doesn't need a cert.
(w.native_tunnel || w.protocol == Protocol::HBONE)
(w.native_tunnel || w.protocol == InboundProtocol::HBONE)
}
}

Expand Down
3 changes: 3 additions & 0 deletions src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,9 @@ pub enum Error {
#[error("unknown waypoint: {0}")]
UnknownWaypoint(String),

#[error("unknown network gateway: {0}")]
UnknownNetworkGateway(String),

#[error("no service or workload for hostname: {0}")]
NoHostname(String),

Expand Down
12 changes: 6 additions & 6 deletions src/proxy/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::net::SocketAddr;

use crate::drain;
use crate::drain::{DrainTrigger, DrainWatcher};
use crate::state::workload::Protocol;
use crate::state::workload::{InboundProtocol, OutboundProtocol};
use std::sync::Arc;
use std::sync::RwLock;
use tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -134,7 +134,7 @@ pub struct OutboundConnection {
pub src: SocketAddr,
pub original_dst: SocketAddr,
pub actual_dst: SocketAddr,
pub protocol: Protocol,
pub protocol: OutboundProtocol,
}

#[derive(Debug, Clone, Eq, Hash, Ord, PartialEq, PartialOrd, serde::Serialize)]
Expand All @@ -143,7 +143,7 @@ pub struct InboundConnectionDump {
pub src: SocketAddr,
pub original_dst: Option<String>,
pub actual_dst: SocketAddr,
pub protocol: Protocol,
pub protocol: InboundProtocol,
}

#[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize)]
Expand All @@ -160,7 +160,7 @@ impl ConnectionManager {
src: SocketAddr,
original_dst: SocketAddr,
actual_dst: SocketAddr,
protocol: Protocol,
protocol: OutboundProtocol,
) -> OutboundConnectionGuard {
let c = OutboundConnection {
src,
Expand Down Expand Up @@ -284,9 +284,9 @@ impl Serialize for ConnectionManager {
original_dst: c.dest_service,
actual_dst: c.ctx.conn.dst,
protocol: if c.ctx.conn.src_identity.is_some() {
Protocol::HBONE
InboundProtocol::HBONE
} else {
Protocol::TCP
InboundProtocol::TCP
},
})
.collect();
Expand Down
67 changes: 66 additions & 1 deletion src/proxy/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use crate::copy;
use bytes::Bytes;
use bytes::{BufMut, Bytes};
use futures_core::ready;
use h2::Reason;
use std::io::Error;
Expand Down Expand Up @@ -85,6 +85,8 @@ pub struct H2StreamWriteHalf {
_dropped: Option<DropCounter>,
}

pub struct TokioH2Stream(H2Stream);

struct DropCounter {
// Whether the other end of this shared counter has already dropped.
// We only decrement if they have, so we do not double count
Expand Down Expand Up @@ -138,6 +140,69 @@ impl Drop for DropCounter {
}
}

// We can't directly implement tokio::io::{AsyncRead, AsyncWrite} for H2Stream because
// then the specific implementation will conflict with the generic one.
impl TokioH2Stream {
pub fn new(stream: H2Stream) -> Self {
Self(stream)
}
}

impl tokio::io::AsyncRead for TokioH2Stream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let pinned = std::pin::Pin::new(&mut self.0.read);
copy::ResizeBufRead::poll_bytes(pinned, cx).map(|r| match r {
Ok(bytes) => {
if buf.remaining() < bytes.len() {
Err(Error::new(
std::io::ErrorKind::Other,
format!(
"kould overflow buffer of with {} remaining",
buf.remaining()
),
))
} else {
buf.put(bytes);
Ok(())
}
}
Err(e) => Err(e),
})
}
}

impl tokio::io::AsyncWrite for TokioH2Stream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, tokio::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write);
let buf = Bytes::copy_from_slice(buf);
copy::AsyncWriteBuf::poll_write_buf(pinned, cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write);
copy::AsyncWriteBuf::poll_flush(pinned, cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
let pinned = std::pin::Pin::new(&mut self.0.write);
copy::AsyncWriteBuf::poll_shutdown(pinned, cx)
}
}

impl copy::ResizeBufRead for H2StreamReadHalf {
fn poll_bytes(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<Bytes>> {
let this = self.get_mut();
Expand Down
4 changes: 1 addition & 3 deletions src/proxy/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU16, Ordering};
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;
use tokio_rustls::client::TlsStream;
use tracing::{Instrument, debug, error, trace, warn};

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -147,7 +145,7 @@ impl H2ConnectClient {

pub async fn spawn_connection(
cfg: Arc<config::Config>,
s: TlsStream<TcpStream>,
s: impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
driver_drain: Receiver<bool>,
wl_key: WorkloadKey,
) -> Result<H2ConnectClient, Error> {
Expand Down
4 changes: 2 additions & 2 deletions src/proxy/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ mod tests {
self, DemandProxyState,
service::{Endpoint, EndpointSet, Service},
workload::{
ApplicationTunnel, GatewayAddress, NetworkAddress, Protocol, Workload,
ApplicationTunnel, GatewayAddress, InboundProtocol, NetworkAddress, Workload,
application_tunnel::Protocol as AppProtocol, gatewayaddress::Destination,
},
},
Expand Down Expand Up @@ -922,7 +922,7 @@ mod tests {
.map(|(name, ip, waypoint, app_tunnel)| Workload {
workload_ips: vec![ip.parse().unwrap()],
waypoint: waypoint.workload_attached(),
protocol: Protocol::HBONE,
protocol: InboundProtocol::HBONE,
uid: strng::format!("cluster1//v1/Pod/default/{name}"),
name: strng::format!("workload-{name}"),
namespace: "default".into(),
Expand Down
Loading