Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better-behaved HBONE pooling #931

Merged
merged 46 commits into from
Apr 26, 2024
Merged
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
c7ec54e
Attempt at smarter HBONE pooling between ztunnels
bleggett Apr 9, 2024
af0d232
Lints
bleggett Apr 16, 2024
58e7ab9
lints 2
bleggett Apr 16, 2024
5f8e182
Hmm
bleggett Apr 16, 2024
a585659
Fixup
bleggett Apr 16, 2024
05e3401
Fixup
bleggett Apr 16, 2024
2a2fa32
More comments
bleggett Apr 16, 2024
add0dce
cleanup
bleggett Apr 16, 2024
b290040
fixup
bleggett Apr 16, 2024
17de2be
Clean
bleggett Apr 16, 2024
6ebfc30
Fix jemalloc
bleggett Apr 16, 2024
ed2a284
WIP: move out of proxyinfo
bleggett Apr 16, 2024
5abf00f
Evict pooled conns after $INTERVAL
bleggett Apr 17, 2024
86d458c
Update src/proxy/pool.rs
bleggett Apr 17, 2024
417ee87
Evict pooled conns after $INTERVAL
bleggett Apr 17, 2024
996ac4b
For now, just do the foolproof collision check
bleggett Apr 17, 2024
c413650
Don't be silly
bleggett Apr 17, 2024
cf6b7c1
Naming, review comments
bleggett Apr 17, 2024
036a2e6
Tidy Arcs+drains
bleggett Apr 18, 2024
b04e5e9
Cleanups
bleggett Apr 18, 2024
53e790a
Format
bleggett Apr 18, 2024
6be0b55
Use the fancy lockless outer map, drop realm-io
bleggett Apr 18, 2024
b461cf3
Cleanup comments
bleggett Apr 18, 2024
d4b4d30
Fix outdent (review comment)
bleggett Apr 18, 2024
fe5ea2c
Fixups/review comments
bleggett Apr 19, 2024
6ff22cc
resync
bleggett Apr 22, 2024
2025a0f
Droptests
bleggett Apr 22, 2024
4a0dc47
fix testhang
bleggett Apr 22, 2024
852100c
add smarter evict test
bleggett Apr 22, 2024
45f23ac
Interesting failure
bleggett Apr 22, 2024
9ec19b2
No, it's not
bleggett Apr 22, 2024
c2f9015
Make this a bit simpler
bleggett Apr 22, 2024
bde5b45
Separate out the connspawner
bleggett Apr 22, 2024
22840ae
Tidy logging a bit
bleggett Apr 22, 2024
f1b102a
Add serverside keepalive
bleggett Apr 23, 2024
795b4ae
fixup
bleggett Apr 23, 2024
e16368a
Just for kicks
bleggett Apr 24, 2024
969b99a
D'oh - use mthread runtime for tests
bleggett Apr 25, 2024
bdb1c86
Fix none race
bleggett Apr 25, 2024
bd2ca75
Propagate connection establish errors
bleggett Apr 25, 2024
fb872fd
Cleanup
bleggett Apr 25, 2024
3105377
Work around local test server getting overloaded
bleggett Apr 26, 2024
e713b17
Move the rest to multi_thread, chill out on iterations, work around test
bleggett Apr 26, 2024
4495fc2
Tidy comments
bleggett Apr 26, 2024
7e794b6
lints
bleggett Apr 26, 2024
8737cd7
Clarify comment
bleggett Apr 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Format
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
bleggett committed Apr 26, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit 53e790abdadfe91dc416343017e895832f61cc1a
6 changes: 5 additions & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
@@ -395,7 +395,11 @@ fn change_log_level(reset: bool, level: &str) -> Response<Full<Bytes>> {
async fn handle_jemalloc_pprof_heapgen(
_req: Request<Incoming>,
) -> anyhow::Result<Response<Full<Bytes>>> {
let mut prof_ctl = jemalloc_pprof::PROF_CTL.as_ref().expect("should init").lock().await;
let mut prof_ctl = jemalloc_pprof::PROF_CTL
.as_ref()
.expect("should init")
.lock()
.await;
if !prof_ctl.activated() {
return Ok(Response::builder()
.status(hyper::StatusCode::INTERNAL_SERVER_ERROR)
45 changes: 18 additions & 27 deletions src/proxy/outbound.rs
Original file line number Diff line number Diff line change
@@ -80,9 +80,10 @@ impl Outbound {
let (sub_drain_signal, sub_drain) = drain::channel();

let pool = proxy::pool::WorkloadHBONEPool::new(
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone());
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone(),
);
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
@@ -147,13 +148,8 @@ impl OutboundConnection {
let source_addr =
socket::to_canonical(source_stream.peer_addr().expect("must receive peer addr"));
let dst_addr = socket::orig_dst_addr_or_default(&source_stream);
self.proxy_to(
source_stream,
source_addr,
dst_addr,
false,
)
.await;
self.proxy_to(source_stream, source_addr, dst_addr, false)
.await;
}

// this is a cancellable outbound proxy. If `out_drain` is a Watch drain, will resolve
@@ -251,13 +247,8 @@ impl OutboundConnection {

let res = match req.protocol {
Protocol::HBONE => {
self.proxy_to_hbone(
&mut source_stream,
source_addr,
&req,
&result_tracker,
)
.await
self.proxy_to_hbone(&mut source_stream, source_addr, &req, &result_tracker)
.await
}
Protocol::TCP => {
self.proxy_to_tcp(&mut source_stream, &req, &result_tracker)
@@ -304,9 +295,11 @@ impl OutboundConnection {
};

debug!("outbound - connection get START");
let mut connection = self.pool.connect(pool_key.clone())
.instrument(trace_span!("get pool conn"))
.await?;
let mut connection = self
.pool
.connect(pool_key.clone())
.instrument(trace_span!("get pool conn"))
.await?;
debug!("outbound - connection get END");

let mut f = http_types::proxies::Forwarded::new();
@@ -329,9 +322,10 @@ impl OutboundConnection {
// There are scenarios (upstream hangup, etc) where this "send" will simply get stuck.
// As in, stream processing deadlocks, and `send_request` never resolves to anything.
// Probably related to https://github.com/hyperium/hyper/issues/3623
let response = connection.send_request(request)
.instrument(trace_span!("send pool conn"))
.await?;
let response = connection
.send_request(request)
.instrument(trace_span!("send pool conn"))
.await?;
debug!("outbound - connection send END");

let code = response.status();
@@ -669,10 +663,7 @@ mod tests {
connection_manager: ConnectionManager::default(),
},
id: TraceParent::new(),
pool: pool::WorkloadHBONEPool::new(
cfg,
sock_fact,
cert_mgr.clone()),
pool: pool::WorkloadHBONEPool::new(cfg, sock_fact, cert_mgr.clone()),
};

let req = outbound
43 changes: 26 additions & 17 deletions src/proxy/pool.rs
Original file line number Diff line number Diff line change
@@ -14,11 +14,11 @@

use super::{Error, SocketFactory};
use bytes::Bytes;
use std::time::Duration;
use http_body_util::Empty;
use hyper::body::Incoming;
use hyper::client::conn::http2;
use hyper::http::{Request, Response};
use std::time::Duration;

use std::collections::hash_map::DefaultHasher;
use std::future::Future;
@@ -103,8 +103,13 @@ impl PoolState {
let pool_key_ref = pool_key.clone();
let release_timeout = self.pool_unused_release_timeout;
self.close_pollers.push(tokio::spawn(async move {
pool_ref.idle_timeout(&pool_key_ref, release_timeout, evict, rx, pickup).await;
debug!("connection {:#?} was removed/checked out/timed out of the pool", pool_key_ref)
pool_ref
.idle_timeout(&pool_key_ref, release_timeout, evict, rx, pickup)
.await;
debug!(
"connection {:#?} was removed/checked out/timed out of the pool",
pool_key_ref
)
}));
let _ = self.pool_notifier.send(true);
}
@@ -121,11 +126,17 @@ impl PoolState {
debug!("first checkout INNER WRITELOCK");
let _conn_lock = exist_conn_lock.as_ref().unwrap().lock().await;

debug!("getting conn for key {:#?} and hash {:#?}", workload_key, pool_key.key);
debug!(
"getting conn for key {:#?} and hash {:#?}",
workload_key, pool_key.key
);
self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
debug!("got existing conn for key {:#?}", workload_key);
if e_conn.at_max_streamcount() {
debug!("got conn for key {:#?}, but streamcount is maxed", workload_key);
debug!(
"got conn for key {:#?}, but streamcount is maxed",
workload_key
);
None
} else {
self.checkin_conn(e_conn.clone(), pool_key.clone());
@@ -136,7 +147,6 @@ impl PoolState {
None => None,
}
}

}

impl Drop for PoolState {
@@ -159,12 +169,9 @@ impl WorkloadHBONEPool {
let (timeout_send, timeout_recv) = watch::channel(false);
let max_count = cfg.pool_max_streams_per_conn;
let pool_duration = cfg.pool_unused_release_timeout;
debug!(
"constructing pool with {:#?} streams per conn",
max_count
);
debug!("constructing pool with {:#?} streams per conn", max_count);
Self {
state: Arc::new(PoolState{
state: Arc::new(PoolState {
pool_notifier: timeout_tx,
timeout_tx: timeout_send,
timeout_rx: timeout_recv,
@@ -211,7 +218,10 @@ impl WorkloadHBONEPool {
//
// This is so we can backpressure correctly if 1000 tasks all demand a new connection
// to the same key at once, and not eagerly open 1000 tunnel connections.
let existing_conn = self.state.first_checkout_conn_from_pool(&workload_key, &pool_key).await;
let existing_conn = self
.state
.first_checkout_conn_from_pool(&workload_key, &pool_key)
.await;

debug!("pool connect GOT EXISTING");
if existing_conn.is_some() {
@@ -406,7 +416,6 @@ impl WorkloadHBONEPool {
}
}
})

}

async fn spawn_new_pool_conn(
@@ -461,7 +470,6 @@ impl WorkloadHBONEPool {

Ok(request_sender)
}

}

#[derive(Debug, Clone)]
@@ -470,7 +478,7 @@ impl WorkloadHBONEPool {
pub struct ConnClient {
sender: http2::SendRequest<Empty<Bytes>>,
stream_count: Arc<AtomicU16>, // the current streamcount for this client conn.
stream_count_max: u16, // the max streamcount associated with this client.
stream_count_max: u16, // the max streamcount associated with this client.
// A WL key may have many clients, but every client has no more than one WL key
wl_key: WorkloadKey, // the WL key associated with this client.
}
@@ -496,12 +504,13 @@ impl ConnClient {

pub fn is_for_workload(&self, wl_key: WorkloadKey) -> Result<(), crate::proxy::Error> {
if !(self.wl_key == wl_key) {
Err(crate::proxy::Error::Generic("fetched connection does not match workload key!".into()))
Err(crate::proxy::Error::Generic(
"fetched connection does not match workload key!".into(),
))
} else {
Ok(())
}
}

}

#[derive(PartialEq, Eq, Hash, Clone, Debug)]
7 changes: 4 additions & 3 deletions src/proxy/socks5.rs
Original file line number Diff line number Diff line change
@@ -68,9 +68,10 @@ impl Socks5 {
// TODO creating a new HBONE pool for SOCKS5 here may not be ideal,
// but ProxyInfo is overloaded and only `outbound` should ever use the pool.
let pool = crate::proxy::pool::WorkloadHBONEPool::new(
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone());
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone(),
);
match socket {
Ok((stream, remote)) => {
info!("accepted outbound connection from {}", remote);