Better-behaved HBONE pooling #931

Merged: 46 commits, Apr 26, 2024

Changes from 1 commit

Commits (46)
- c7ec54e  Attempt at smarter HBONE pooling between ztunnels (bleggett, Apr 9, 2024)
- af0d232  Lints (bleggett, Apr 16, 2024)
- 58e7ab9  lints 2 (bleggett, Apr 16, 2024)
- 5f8e182  Hmm (bleggett, Apr 16, 2024)
- a585659  Fixup (bleggett, Apr 16, 2024)
- 05e3401  Fixup (bleggett, Apr 16, 2024)
- 2a2fa32  More comments (bleggett, Apr 16, 2024)
- add0dce  cleanup (bleggett, Apr 16, 2024)
- b290040  fixup (bleggett, Apr 16, 2024)
- 17de2be  Clean (bleggett, Apr 16, 2024)
- 6ebfc30  Fix jemalloc (bleggett, Apr 16, 2024)
- ed2a284  WIP: move out of proxyinfo (bleggett, Apr 16, 2024)
- 5abf00f  Evict pooled conns after $INTERVAL (bleggett, Apr 17, 2024)
- 86d458c  Update src/proxy/pool.rs (bleggett, Apr 17, 2024)
- 417ee87  Evict pooled conns after $INTERVAL (bleggett, Apr 17, 2024)
- 996ac4b  For now, just do the foolproof collision check (bleggett, Apr 17, 2024)
- c413650  Don't be silly (bleggett, Apr 17, 2024)
- cf6b7c1  Naming, review comments (bleggett, Apr 17, 2024)
- 036a2e6  Tidy Arcs+drains (bleggett, Apr 18, 2024)
- b04e5e9  Cleanups (bleggett, Apr 18, 2024)
- 53e790a  Format (bleggett, Apr 18, 2024)
- 6be0b55  Use the fancy lockless outer map, drop realm-io (bleggett, Apr 18, 2024)
- b461cf3  Cleanup comments (bleggett, Apr 18, 2024)
- d4b4d30  Fix outdent (review comment) (bleggett, Apr 18, 2024)
- fe5ea2c  Fixups/review comments (bleggett, Apr 19, 2024)
- 6ff22cc  resync (bleggett, Apr 22, 2024)
- 2025a0f  Droptests (bleggett, Apr 22, 2024)
- 4a0dc47  fix testhang (bleggett, Apr 22, 2024)
- 852100c  add smarter evict test (bleggett, Apr 22, 2024)
- 45f23ac  Interesting failure (bleggett, Apr 22, 2024)
- 9ec19b2  No, it's not (bleggett, Apr 22, 2024)
- c2f9015  Make this a bit simpler (bleggett, Apr 22, 2024)
- bde5b45  Separate out the connspawner (bleggett, Apr 22, 2024)
- 22840ae  Tidy logging a bit (bleggett, Apr 22, 2024)
- f1b102a  Add serverside keepalive (bleggett, Apr 23, 2024)
- 795b4ae  fixup (bleggett, Apr 23, 2024)
- e16368a  Just for kicks (bleggett, Apr 24, 2024)
- 969b99a  D'oh - use mthread runtime for tests (bleggett, Apr 25, 2024)
- bdb1c86  Fix none race (bleggett, Apr 25, 2024)
- bd2ca75  Propagate connection establish errors (bleggett, Apr 25, 2024)
- fb872fd  Cleanup (bleggett, Apr 25, 2024)
- 3105377  Work around local test server getting overloaded (bleggett, Apr 26, 2024)
- e713b17  Move the rest to multi_thread, chill out on iterations, work around test (bleggett, Apr 26, 2024)
- 4495fc2  Tidy comments (bleggett, Apr 26, 2024)
- 7e794b6  lints (bleggett, Apr 26, 2024)
- 8737cd7  Clarify comment (bleggett, Apr 26, 2024)
Commit 036a2e626017bc62ce34bb47e349974da4fe6e5d: Tidy Arcs+drains
Signed-off-by: Benjamin Leggett <benjamin.leggett@solo.io>
bleggett committed Apr 26, 2024 (verified: signed with GitHub's verified signature)
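The heart of this commit, as the pool.rs diff below shows, is moving the pool's shared internals out of the cloneable `WorkloadHBONEPool` handle into a single `Arc<PoolState>`, whose `Drop` impl fires the eviction-timeout watch channel once the last clone of the handle is gone. A minimal sketch of that ownership pattern, using simplified stand-in types rather than the real ztunnel structs:

```rust
use std::sync::Arc;
use tokio::sync::watch;

// Simplified stand-in for PoolState: shared internals live behind one Arc,
// so every cloned pool handle sees the same state.
struct PoolState {
    timeout_tx: watch::Sender<bool>,
    timeout_rx: watch::Receiver<bool>,
}

impl Drop for PoolState {
    // Runs only when the *last* Arc<PoolState> is dropped, i.e. when the last
    // cloned pool handle goes away; cloning handles never triggers it.
    fn drop(&mut self) {
        let _ = self.timeout_tx.send(true);
    }
}

#[derive(Clone)]
struct Pool {
    state: Arc<PoolState>,
}

impl Pool {
    fn new() -> Self {
        let (timeout_tx, timeout_rx) = watch::channel(false);
        Pool {
            state: Arc::new(PoolState { timeout_tx, timeout_rx }),
        }
    }

    // Spawned eviction/idle tasks clone this receiver; when the last pool
    // handle drops, PoolState::drop flips the watch and they all resolve.
    fn shutdown_watch(&self) -> watch::Receiver<bool> {
        self.state.timeout_rx.clone()
    }
}

#[tokio::main]
async fn main() {
    let pool = Pool::new();
    let mut rx = pool.shutdown_watch();
    let handle = pool.clone();

    let waiter = tokio::spawn(async move {
        // Resolves once PoolState drops, either via the sent value or because
        // the sender itself goes away - either way the task can exit.
        let _ = rx.changed().await;
        println!("all pool handles dropped; eviction tasks can exit");
    });

    drop(pool);
    drop(handle); // last handle: PoolState::drop sends the shutdown signal
    waiter.await.unwrap();
}
```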
7 changes: 2 additions & 5 deletions src/proxy/outbound.rs
@@ -82,8 +82,7 @@ impl Outbound {
let pool = proxy::pool::WorkloadHBONEPool::new(
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone(),
sub_drain.clone());
self.pi.cert_manager.clone());
let accept = async move {
loop {
// Asynchronously wait for an inbound socket.
@@ -674,9 +673,7 @@ mod tests {
pool: pool::WorkloadHBONEPool::new(
cfg,
sock_fact,
cert_mgr.clone(),
sub_drain,
),
cert_mgr.clone()),
};

let req = outbound
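The pool.rs diff that follows keeps the existing two-level locking scheme (an outer `RwLock` over a map of per-key locks, so connection attempts only serialize per destination key rather than globally) and simply moves that map into `PoolState` as `established_conn_writelock`. A rough sketch of the per-key locking idea, with hypothetical helper names and the `Option<Mutex<()>>` values simplified to plain `Mutex<()>`:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock};

// The outer RwLock guards only the *map* of per-key locks; the inner Mutex
// serializes connection setup for a single key, so dialing one destination
// never blocks connection attempts to a different one.
type ShardedLocks = Arc<RwLock<HashMap<u64, Mutex<()>>>>;

async fn connect_for_key(locks: ShardedLocks, key: u64) {
    // Briefly take the write lock only to insert the per-key mutex if missing.
    {
        let mut map = locks.write().await;
        map.entry(key).or_insert_with(|| Mutex::new(()));
    }
    // From here on a read lock is enough: we only need this key's mutex.
    let map = locks.read().await;
    let key_lock = map.get(&key).expect("inserted above");
    let _guard = key_lock.lock().await;
    // ... check the pool for an existing connection, or dial a new one,
    // while holding only this key's lock ...
}

#[tokio::main]
async fn main() {
    let locks: ShardedLocks = Arc::new(RwLock::new(HashMap::new()));
    // Two different keys proceed concurrently; the same key serializes.
    let a = tokio::spawn(connect_for_key(locks.clone(), 1));
    let b = tokio::spawn(connect_for_key(locks.clone(), 2));
    let _ = tokio::join!(a, b);
}
```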
207 changes: 112 additions & 95 deletions src/proxy/pool.rs
@@ -15,6 +15,7 @@
use super::{Error, SocketFactory};
use bytes::Bytes;
use drain::Watch;
use std::time::Duration;
use http_body_util::Empty;
use hyper::body::Incoming;
use hyper::client::conn::http2;
@@ -53,27 +54,71 @@ static GLOBAL_CONN_COUNT: AtomicI32 = AtomicI32::new(0);
// by flow control throttling.
#[derive(Clone)]
pub struct WorkloadHBONEPool {
pool_notifier: Arc<watch::Sender<bool>>, // This is already impl clone? rustc complains that it isn't, tho
pool_watcher: watch::Receiver<bool>,
timeout_send: Arc<watch::Sender<bool>>, // This is already impl clone? rustc complains that it isn't, tho
timeout_recv: watch::Receiver<bool>,
max_streamcount: u16,
// this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
// and has no actual hyper/http/connection logic.
connected_pool: Arc<pingora_pool::ConnectionPool<ConnClient>>,
cfg: config::Config,
socket_factory: Arc<dyn SocketFactory + Send + Sync>,
cert_manager: Arc<SecretManager>,
drainer: Watch,
state: Arc<PoolState>,
pool_watcher: watch::Receiver<bool>,
max_streamcount: u16,
}

struct PoolState {
pool_notifier: watch::Sender<bool>, // This is already impl clone? rustc complains that it isn't, tho
timeout_tx: watch::Sender<bool>, // This is already impl clone? rustc complains that it isn't, tho
timeout_rx: watch::Receiver<bool>,
// this is effectively just a convenience data type - a rwlocked hashmap with keying and LRU drops
// and has no actual hyper/http/connection logic.
connected_pool: Arc<pingora_pool::ConnectionPool<ConnClient>>,
// this must be a readlockable list-of-locks, so we can lock per-key, not globally, and avoid holding up all conn attempts
established_conn_writelock: Arc<RwLock<HashMap<u64, Option<Mutex<()>>>>>,
close_pollers: Arc<futures::stream::FuturesUnordered<task::JoinHandle<()>>>,
established_conn_writelock: RwLock<HashMap<u64, Option<Mutex<()>>>>,
close_pollers: futures::stream::FuturesUnordered<task::JoinHandle<()>>,
pool_unused_release_timeout: Duration,
}

impl Drop for WorkloadHBONEPool {
impl PoolState {
// This simply puts the connection back into the inner pool,
// and sets up a timed popper, which will resolve
// - when this reference is popped back out of the inner pool (doing nothing)
// - when this reference is evicted from the inner pool (doing nothing)
// - when the timeout_idler is drained (will pop)
// - when the timeout is hit (will pop)
//
// Idle poppers are safe to invoke if the conn they are popping is already gone
// from the inner queue, so we will start one for every insert, let them run or terminate on their own,
// and poll them to completion on shutdown.
//
// Note that "idle" in the context of this pool means "no one has asked for it or dropped it in X time, so prune it".
//
// Pruning the idle connection from the pool does not close it - it simply ensures the pool stops holding a ref.
// hyper self-closes client conns when all refs are dropped and streamcount is 0, so pool consumers must
// drop their checked out conns and/or terminate their streams as well.
//
// Note that this simply removes the client ref from this pool - if other things hold client/streamrefs refs,
// they must also drop those before the underlying connection is fully closed.
fn checkin_conn(&self, conn: ConnClient, pool_key: pingora_pool::ConnectionMeta) {
let (evict, pickup) = self.connected_pool.put(&pool_key, conn);
let rx = self.timeout_rx.clone();
let pool_ref = self.connected_pool.clone();
let pool_key_ref = pool_key.clone();
let release_timeout = self.pool_unused_release_timeout;
self.close_pollers.push(tokio::spawn(async move {
pool_ref.idle_timeout(&pool_key_ref, release_timeout, evict, rx, pickup).await;
debug!("connection {:#?} was removed/checked out/timed out of the pool", pool_key_ref)
}));
let _ = self.pool_notifier.send(true);
}

}
impl Drop for PoolState {
fn drop(&mut self) {
println!("pool dropping, cancelling all outstanding pool eviction timeout spawns");
let _ = self.timeout_send.send(true);
// let sc = Arc::strong_count(&self.timeout_tx);
// debug!("pool dropping, strong count is {sc}");
// if sc == 1 {
debug!("poolstate dropping, cancelling all outstanding pool eviction timeout spawns");
let _ = self.timeout_tx.send(true);
// }
// No need to wait for all `close_pollers` to resolve,
// since this is a drop - the receivers will either get the notification, or
// return an error if their sender drops first - either way they will resolve.
@@ -88,29 +133,35 @@ impl WorkloadHBONEPool {
cfg: crate::config::Config,
socket_factory: Arc<dyn SocketFactory + Send + Sync>,
cert_manager: Arc<SecretManager>,
drainer: Watch, //when signaled, will stop driving all conns in the pool, effectively draining the pool.
// drainer: Watch, //when signaled, will stop driving all conns in the pool, effectively draining the pool.
) -> WorkloadHBONEPool {
let (tx, rx) = watch::channel(false);
let (timeout_tx, timeout_rx) = watch::channel(false);
let (timeout_send, timeout_recv) = watch::channel(false);
let (server_drain_signal, server_drain) = drain::channel();
let max_count = cfg.pool_max_streams_per_conn;
let pool_duration = cfg.pool_unused_release_timeout;
debug!(
"constructing pool with {:#?} streams per conn",
cfg.pool_max_streams_per_conn
max_count
);
Self {
pool_notifier: Arc::new(tx),
pool_watcher: rx,
timeout_send: Arc::new(timeout_send),
timeout_recv,
max_streamcount: cfg.pool_max_streams_per_conn,
// the number here is simply the number of unique src/dest keys
// the pool is expected to track before the inner hashmap resizes.
connected_pool: Arc::new(pingora_pool::ConnectionPool::new(500)),
state: Arc::new(PoolState{
pool_notifier: timeout_tx,
timeout_tx: timeout_send,
timeout_rx: timeout_recv,
// the number here is simply the number of unique src/dest keys
// the pool is expected to track before the inner hashmap resizes.
connected_pool: Arc::new(pingora_pool::ConnectionPool::new(500)),
established_conn_writelock: RwLock::new(HashMap::new()),
close_pollers: futures::stream::FuturesUnordered::new(),
pool_unused_release_timeout: pool_duration,
}),
cfg,
socket_factory,
cert_manager,
drainer,
established_conn_writelock: Arc::new(RwLock::new(HashMap::new())),
close_pollers: Arc::new(futures::stream::FuturesUnordered::new()),
pool_watcher: timeout_rx,
max_streamcount: max_count,
// drainer,
}
}

@@ -169,7 +220,7 @@ impl WorkloadHBONEPool {
// for the rest, a readlock (nonexclusive) is sufficient.
{
debug!("pool connect MAP OUTER WRITELOCK START");
let mut map_write_lock = self.established_conn_writelock.write().await;
let mut map_write_lock = self.state.established_conn_writelock.write().await;
debug!("pool connect MAP OUTER WRITELOCK END");
match map_write_lock.get(&hash_key) {
Some(_) => {
@@ -205,7 +256,7 @@ impl WorkloadHBONEPool {
//
// BEGIN take outer readlock
debug!("pool connect MAP OUTER READLOCK START");
let map_read_lock = self.established_conn_writelock.read().await;
let map_read_lock = self.state.established_conn_writelock.read().await;
debug!("pool connect MAP OUTER READLOCK END");
let exist_conn_lock = map_read_lock.get(&hash_key).unwrap();
// BEGIN take inner writelock
@@ -254,7 +305,7 @@ impl WorkloadHBONEPool {
);
// The sharded mutex for this connkey is already locked - someone else must be making a conn
// if they are, try to wait for it, but bail if we find one and it's got a maxed streamcount.
let existing_conn = self.connected_pool.get(&hash_key);
let existing_conn = self.state.connected_pool.get(&hash_key);
match existing_conn {
None => {
debug!("got nothing");
@@ -302,7 +353,7 @@ impl WorkloadHBONEPool {
// After waiting, we found an available conn we can use, so no need to start another.
// Clone the underlying client, return a copy, and put the other back in the pool.
Some(f_conn) => {
self.checkin_conn(f_conn.clone(), pool_key.clone());
self.state.checkin_conn(f_conn.clone(), pool_key.clone());
Ok(f_conn)
}

@@ -318,7 +369,7 @@ impl WorkloadHBONEPool {
stream_count_max: self.max_streamcount,
wl_key: workload_key.clone(),
};
self.checkin_conn(r_conn.clone(), pool_key.clone());
self.state.checkin_conn(r_conn.clone(), pool_key.clone());
Ok(r_conn)
}
}
@@ -347,20 +398,20 @@ impl WorkloadHBONEPool {
pool_key: &pingora_pool::ConnectionMeta,
) -> Option<ConnClient> {
debug!("first checkout READLOCK");
let map_read_lock = self.established_conn_writelock.read().await;
let map_read_lock = self.state.established_conn_writelock.read().await;
match map_read_lock.get(&pool_key.key) {
Some(exist_conn_lock) => {
debug!("first checkout INNER WRITELOCK");
let _conn_lock = exist_conn_lock.as_ref().unwrap().lock().await;

debug!("getting conn for key {:#?} and hash {:#?}", workload_key, pool_key.key);
self.connected_pool.get(&pool_key.key).and_then(|e_conn| {
self.state.connected_pool.get(&pool_key.key).and_then(|e_conn| {
debug!("got existing conn for key {:#?}", workload_key);
if e_conn.at_max_streamcount() {
debug!("got conn for key {:#?}, but streamcount is maxed", workload_key);
None
} else {
self.checkin_conn(e_conn.clone(), pool_key.clone());
self.state.checkin_conn(e_conn.clone(), pool_key.clone());
Some(e_conn)
}
})
@@ -369,37 +420,6 @@ impl WorkloadHBONEPool {
}
}

// This simply puts the connection back into the inner pool,
// and sets up a timed popper, which will resolve
// - when this reference is popped back out of the inner pool (doing nothing)
// - when this reference is evicted from the inner pool (doing nothing)
// - when the timeout_idler is drained (will pop)
// - when the timeout is hit (will pop)
//
// Idle poppers are safe to invoke if the conn they are popping is already gone
// from the inner queue, so we will start one for every insert, let them run or terminate on their own,
// and poll them to completion on shutdown.
//
// Note that "idle" in the context of this pool means "no one has asked for it or dropped it in X time, so prune it".
//
// Pruning the idle connection from the pool does not close it - it simply ensures the pool stops holding a ref.
// hyper self-closes client conns when all refs are dropped and streamcount is 0, so pool consumers must
// drop their checked out conns and/or terminate their streams as well.
//
// Note that this simply removes the client ref from this pool - if other things hold client/streamrefs refs,
// they must also drop those before the underlying connection is fully closed.
fn checkin_conn(&self, conn: ConnClient, pool_key: pingora_pool::ConnectionMeta) {
let (evict, pickup) = self.connected_pool.put(&pool_key, conn);
let rx = self.timeout_recv.clone();
let pool_ref = self.connected_pool.clone();
let pool_key_ref = pool_key.clone();
let release_timeout = self.cfg.pool_unused_release_timeout;
self.close_pollers.push(tokio::spawn(async move {
pool_ref.idle_timeout(&pool_key_ref, release_timeout, evict, rx, pickup).await;
}));
let _ = self.pool_notifier.send(true);
}

async fn spawn_new_pool_conn(
&self,
key: WorkloadKey,
@@ -430,12 +450,12 @@ impl WorkloadHBONEPool {
// spawn a task to poll the connection and drive the HTTP state
// if we got a drain for that connection, respect it in a race
// it is important to have a drain here, or this connection will never terminate
let driver_drain = self.drainer.clone();
let mut driver_drain = self.state.timeout_rx.clone();
tokio::spawn(async move {
debug!("starting a connection driver for {:?}", clone_key);
tokio::select! {
_ = driver_drain.signaled() => {
debug!("draining outer HBONE connection");
_ = driver_drain.changed() => {
debug!("draining outer HBONE connection {:?}", clone_key);
}
res = connection=> {
match res {
@@ -494,12 +514,6 @@ impl ConnClient {

}

impl Drop for ConnClient {
fn drop(&mut self) {
println!("Dropping Client!");
}
}

#[derive(PartialEq, Eq, Hash, Clone, Debug)]
pub struct WorkloadKey {
pub src_id: Identity,
@@ -541,11 +555,11 @@ mod test {

#[tokio::test]
async fn test_pool_reuses_conn_for_same_key() {
// crate::telemetry::setup_logging();
crate::telemetry::setup_logging();

let (server_drain_signal, server_drain) = drain::channel();

let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -555,7 +569,7 @@ mod test {
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));

let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -567,8 +581,12 @@ mod test {
let client2 = spawn_client(pool.clone(), key1.clone(), server_addr, 2).await;
let client3 = spawn_client(pool.clone(), key1, server_addr, 2).await;

drop(pool);
assert!(client1.is_ok());
assert!(client2.is_ok());
assert!(client3.is_ok());

server_drain_signal.drain().await;
drop(pool);
let real_conncount = server_handle.await.unwrap();
assert!(real_conncount == 1, "actual conncount was {real_conncount}");

@@ -580,7 +598,7 @@ mod test {
#[tokio::test]
async fn test_pool_does_not_reuse_conn_for_diff_key() {
let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

// crate::telemetry::setup_logging();

@@ -591,7 +609,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -609,8 +627,8 @@ mod test {
let client1 = spawn_client(pool.clone(), key1, server_addr, 2).await;
let client2 = spawn_client(pool.clone(), key2, server_addr, 2).await;

drop(pool);
server_drain_signal.drain().await;
drop(pool);
let real_conncount = server_handle.await.unwrap();
assert!(real_conncount == 2, "actual conncount was {real_conncount}");

@@ -621,7 +639,7 @@ mod test {
#[tokio::test]
async fn test_pool_respects_per_conn_stream_limit() {
let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -630,7 +648,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -641,8 +659,8 @@ mod test {
let client1 = spawn_client(pool.clone(), key1.clone(), server_addr, 4).await;
let client2 = spawn_client(pool.clone(), key1, server_addr, 2).await;

drop(pool);
server_drain_signal.drain().await;
drop(pool);

let real_conncount = server_handle.await.unwrap();
assert!(real_conncount == 2, "actual conncount was {real_conncount}");
@@ -654,7 +672,7 @@ mod test {
#[tokio::test]
async fn test_pool_handles_many_conns_per_key() {
let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -664,7 +682,7 @@ mod test {
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));

let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -690,7 +708,7 @@ mod test {
// crate::telemetry::setup_logging();

let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -699,7 +717,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -724,7 +742,6 @@ mod test {
}

drop(pool);

server_drain_signal.drain().await;
let real_conncount = server_handle.await.unwrap();
assert!(real_conncount == 3, "actual conncount was {real_conncount}");
@@ -735,7 +752,7 @@ mod test {
// crate::telemetry::setup_logging();

let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -744,7 +761,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let key1 = WorkloadKey {
src_id: Identity::default(),
@@ -780,7 +797,7 @@ mod test {
// crate::telemetry::setup_logging();

let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -789,7 +806,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let client_count = 100;
let mut count = 0u8;
@@ -832,7 +849,7 @@ mod test {
// crate::telemetry::setup_logging();

let (server_drain_signal, server_drain) = drain::channel();
let (server_addr, server_handle) = spawn_server(server_drain.clone()).await;
let (server_addr, server_handle) = spawn_server(server_drain).await;

let cfg = crate::config::Config {
local_node: Some("local-node".to_string()),
@@ -841,7 +858,7 @@ mod test {
};
let sock_fact = Arc::new(crate::proxy::DefaultSocketFactory);
let cert_mgr = identity::mock::new_secret_manager(Duration::from_secs(10));
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr, server_drain);
let pool = WorkloadHBONEPool::new(cfg.clone(), sock_fact, cert_mgr);

let mut key1 = WorkloadKey {
src_id: Identity::default(),
3 changes: 1 addition & 2 deletions src/proxy/socks5.rs
@@ -70,8 +70,7 @@ impl Socks5 {
let pool = crate::proxy::pool::WorkloadHBONEPool::new(
self.pi.cfg.clone(),
self.pi.socket_factory.clone(),
self.pi.cert_manager.clone(),
stream_drain.clone());
self.pi.cert_manager.clone());
match socket {
Ok((stream, remote)) => {
info!("accepted outbound connection from {}", remote);