diff --git a/Cargo.lock b/Cargo.lock index 9891e00ba..525fe64be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1007,6 +1007,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -1254,6 +1260,7 @@ dependencies = [ "rand_chacha 0.3.1", "regex", "register-count", + "reqwest 0.13.2", "russh", "rustls", "security-framework", @@ -1269,6 +1276,8 @@ dependencies = [ "sock2proc", "socket2 0.6.3", "ssh-key", + "sysinfo 0.38.3", + "tar", "tempfile", "thiserror 2.0.18", "time", @@ -1349,6 +1358,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "compact_str" version = "0.8.1" @@ -3579,9 +3598,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2 0.6.3", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -3820,6 +3841,7 @@ dependencies = [ "ed25519-dalek", "hex", "hmac", + "num-bigint-dig", "p256", "p384", "p521", @@ -4008,6 +4030,28 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "jni" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a87aa2bb7d2af34197c04845522473242e1aa17c12f4935d5856491a7fb8c97" +dependencies = [ + "cesu8", + "cfg-if", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.34" @@ -4535,7 +4579,7 @@ dependencies = [ "netlink-sys", "nix 0.30.1", "scopeguard", - "system-configuration-sys", + "system-configuration-sys 0.5.0", "thiserror 2.0.18", "widestring", "windows 0.61.3", @@ -4815,6 +4859,7 @@ dependencies = [ "num-iter", "num-traits", "rand 0.8.5", + "serde", "smallvec 1.15.1", "zeroize", ] @@ -4981,7 +5026,7 @@ dependencies = [ "bytes", "http", "opentelemetry", - "reqwest", + "reqwest 0.12.28", ] [[package]] @@ -4996,7 +5041,7 @@ dependencies = [ "opentelemetry-proto", "opentelemetry_sdk", "prost", - "reqwest", + "reqwest 0.12.28", "thiserror 2.0.18", "tracing", ] @@ -6126,6 +6171,44 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "reqwest" +version = "0.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab3f43e3283ab1488b624b44b0e988d0acea0b3214e694730a055cb6b2efa801" +dependencies = [ + "base64 0.22.1", + "bytes", + "encoding_rs", + "futures-core", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "js-sys", + "log", + "mime", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-pki-types", + "rustls-platform-verifier", + "sync_wrapper", + "tokio", + "tokio-rustls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "resolv-conf" version = "0.7.5" @@ -6232,8 +6315,9 @@ dependencies = [ "pkcs1 0.8.0-rc.4", "pkcs8 0.11.0-rc.8", "rand_core 0.10.0-rc-3", - "sha2 0.11.0-rc.3", + "sha2 0.11.0-rc.5", "signature 3.0.0-rc.6", + "spki 0.8.0-rc.4", "zeroize", ] @@ -6311,11 +6395,13 @@ dependencies = [ "p521", "pageant", "pbkdf2", + "pkcs1 0.8.0-rc.4", "pkcs5", "pkcs8 0.10.2", "rand 0.8.5", "rand_core 0.6.4", "ring", + "rsa 0.10.0-rc.12", "russh-cryptovec", "russh-util", "sec1", @@ -6467,6 +6553,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784" +dependencies = [ + "core-foundation 0.10.1", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls", + "rustls-native-certs", + "rustls-platform-verifier-android", + "rustls-webpki 0.103.5", + "security-framework", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.61.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.102.8" @@ -6703,7 +6816,7 @@ checksum = "d92d893ba7469d361a6958522fa440e4e2bc8bf4c5803cd1bf40b9af63f8f9a8" dependencies = [ "cfg_aliases", "httpdate", - "reqwest", + "reqwest 0.12.28", "rustls", "sentry-backtrace", "sentry-contexts", @@ -7059,9 +7172,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.11.0-rc.3" +version = "0.11.0-rc.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d43dc0354d88b791216bb5c1bfbb60c0814460cc653ae0ebd71f286d0bd927" +checksum = "7c5f3b1e2dc8aad28310d8410bd4d7e180eca65fca176c52ab00d364475d0024" dependencies = [ "cfg-if", "cpufeatures", @@ -7638,6 +7751,31 @@ dependencies = [ "windows 0.61.3", ] +[[package]] +name = "sysinfo" +version = "0.38.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d03c61d2a49c649a15c407338afe7accafde9dac869995dccb73e5f7ef7d9034" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows 0.62.2", +] + +[[package]] +name = "system-configuration" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" +dependencies = [ + "bitflags 2.9.4", + "core-foundation 0.9.4", + "system-configuration-sys 0.6.0", +] + [[package]] name = "system-configuration-sys" version = "0.5.0" @@ -7648,6 +7786,16 @@ dependencies = [ "libc", ] +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tabwriter" version = "1.4.1" @@ -7669,6 +7817,17 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "tar" +version = "0.4.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d863878d212c87a19c1a610eb53bb01fe12951c0501cf5a0d65f724914a667a" +dependencies = [ + "filetime", + "libc", + "xattr", +] + [[package]] name = "tempfile" version = "3.24.0" @@ -9907,6 +10066,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-root-certs" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804f18a4ac2676ffb4e8b5b5fa9ae38af06df08162314f96a68d2a363e21a8ca" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "1.0.6" @@ -10098,6 +10266,17 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "windows-registry" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" +dependencies = [ + "windows-link 0.2.1", + "windows-result 0.4.1", + "windows-strings 0.5.1", +] + [[package]] name = "windows-result" version = "0.3.4" @@ -10134,6 +10313,15 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -10179,6 +10367,21 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -10245,6 +10448,12 @@ dependencies = [ "windows-link 0.2.1", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -10263,6 +10472,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -10281,6 +10496,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -10311,6 +10532,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -10329,6 +10556,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -10347,6 +10580,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -10365,6 +10604,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -10533,6 +10778,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "xattr" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32e45ad4206f6d2479085147f02bc2ef834ac85886624a23575ae137c8aa8156" +dependencies = [ + "libc", + "rustix", +] + [[package]] name = "yaml-rust2" version = "0.11.0" diff --git a/clash-bin/Cargo.toml b/clash-bin/Cargo.toml index a25bec4c0..05a1198a5 100644 --- a/clash-bin/Cargo.toml +++ b/clash-bin/Cargo.toml @@ -47,4 +47,4 @@ sentry = { version = "0.46", default-features = false, features = ["backtrace", human-panic = "2.0" -aws-lc-rs = { version = "1", optional = true, default-features = false } +aws-lc-rs = { version = "1.16", optional = true, default-features = false } diff --git a/clash-bin/tests/data/config/socks5-auth.json b/clash-bin/tests/data/config/socks5-auth.json new file mode 100644 index 000000000..1f8177d31 --- /dev/null +++ b/clash-bin/tests/data/config/socks5-auth.json @@ -0,0 +1,28 @@ +{ + "log": { + "loglevel": "debug" + }, + "inbounds": [ + { + "port": 10002, + "listen": "0.0.0.0", + "protocol": "socks", + "settings": { + "auth": "password", + "accounts": [ + { + "user": "user", + "pass": "password" + } + ], + "udp": true, + "ip": "0.0.0.0" + } + } + ], + "outbounds": [ + { + "protocol": "freedom" + } + ] +} diff --git a/clash-bin/tests/data/config/socks5-noauth.json b/clash-bin/tests/data/config/socks5-noauth.json new file mode 100644 index 000000000..ebec78d36 --- /dev/null +++ b/clash-bin/tests/data/config/socks5-noauth.json @@ -0,0 +1,22 @@ +{ + "log": { + "loglevel": "debug" + }, + "inbounds": [ + { + "port": 10002, + "listen": "0.0.0.0", + "protocol": "socks", + "settings": { + "auth": "noauth", + "udp": true, + "ip": "0.0.0.0" + } + } + ], + "outbounds": [ + { + "protocol": "freedom" + } + ] +} diff --git a/clash-bin/tests/data/config/tuic.toml b/clash-bin/tests/data/config/tuic.toml index c45946d32..ddf8410bd 100644 --- a/clash-bin/tests/data/config/tuic.toml +++ b/clash-bin/tests/data/config/tuic.toml @@ -6,7 +6,8 @@ zero_rtt_handshake = false dual_stack = false acl = ''' -direct localhost +direct 0.0.0.0/0 +direct ::/0 ''' [users] diff --git a/clash-dns/src/handler.rs b/clash-dns/src/handler.rs index 7727ed4c1..7a8815146 100644 --- a/clash-dns/src/handler.rs +++ b/clash-dns/src/handler.rs @@ -362,7 +362,7 @@ mod tests { tls::{self, global_root_store}, }; use futures::FutureExt; - use hickory_client::client::{self, Client, ClientHandle}; + use hickory_client::client::{Client, ClientHandle}; use hickory_proto::{ h2::HttpsClientStreamBuilder, h3::H3ClientStreamBuilder, @@ -374,18 +374,11 @@ mod tests { }; use rustls::ClientConfig; use std::{sync::Arc, time::Duration}; - use tokio::task::JoinHandle; - mod addr { - use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - - const LOCAL: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + use tokio::{ + net::{TcpListener, UdpSocket}, + task::JoinHandle, + }; - pub(super) const UDP: SocketAddr = SocketAddr::new(LOCAL, 53553); - pub(super) const TCP: SocketAddr = SocketAddr::new(LOCAL, 53554); - pub(super) const DOT: SocketAddr = SocketAddr::new(LOCAL, 53555); - pub(super) const DOH: SocketAddr = SocketAddr::new(LOCAL, 53556); - pub(super) const DOH3: SocketAddr = SocketAddr::new(LOCAL, 53557); - } async fn send_query(client: &mut Client) -> anyhow::Result<()> { let name = Name::from_ascii("www.example.com.").unwrap(); @@ -441,22 +434,52 @@ mod tests { .boxed() }); + // Bind to port 0 to get OS-assigned available ports + let udp_sock = UdpSocket::bind("127.0.0.1:0").await?; + let udp_addr = udp_sock.local_addr()?; + drop(udp_sock); + + let tcp_sock = TcpListener::bind("127.0.0.1:0").await?; + let tcp_addr = tcp_sock.local_addr()?; + drop(tcp_sock); + + let dot_sock = TcpListener::bind("127.0.0.1:0").await?; + let dot_addr = dot_sock.local_addr()?; + drop(dot_sock); + + let doh_sock = TcpListener::bind("127.0.0.1:0").await?; + let doh_addr = doh_sock.local_addr()?; + drop(doh_sock); + + let doh3_sock = UdpSocket::bind("127.0.0.1:0").await?; + let doh3_addr = doh3_sock.local_addr()?; + drop(doh3_sock); + + eprintln!( + "Test using ports - UDP:{}, TCP:{}, DoT:{}, DoH:{}, DoH3:{}", + udp_addr.port(), + tcp_addr.port(), + dot_addr.port(), + doh_addr.port(), + doh3_addr.port() + ); + let cfg = DNSListenAddr { - udp: Some(addr::UDP), - tcp: Some(addr::TCP), + udp: Some(udp_addr), + tcp: Some(tcp_addr), dot: Some(DoTConfig { - addr: addr::DOT, + addr: dot_addr, ca_key: None, ca_cert: None, }), doh: Some(DoHConfig { - addr: addr::DOH, + addr: doh_addr, hostname: Some("dns.example.com".to_string()), ca_key: None, ca_cert: None, }), doh3: Some(DoH3Config { - addr: addr::DOH3, + addr: doh3_addr, hostname: Some("dns.example.com".to_string()), ca_key: None, ca_cert: None, @@ -473,18 +496,21 @@ mod tests { Ok(()) }); + // Wait for servers to start + tokio::time::sleep(Duration::from_millis(100)).await; + let stream = - UdpClientStream::builder(addr::UDP, TokioRuntimeProvider::new()).build(); + UdpClientStream::builder(udp_addr, TokioRuntimeProvider::new()).build(); - let (mut client, handle) = client::Client::connect(stream).await?; + let (mut client, handle) = Client::connect(stream).await?; tokio::spawn(handle); send_query(&mut client).await?; let (stream, sender) = - TcpClientStream::new(addr::TCP, None, None, TokioRuntimeProvider::new()); + TcpClientStream::new(tcp_addr, None, None, TokioRuntimeProvider::new()); - let (mut client, handle) = client::Client::new(stream, sender, None).await?; + let (mut client, handle) = Client::new(stream, sender, None).await?; tokio::spawn(handle); send_query(&mut client).await?; @@ -498,22 +524,18 @@ mod tests { .set_certificate_verifier(Arc::new(tls::DummyTlsVerifier::new())); let (stream, sender) = tls_client_connect( - addr::DOT, + dot_addr, "dns.example.com".to_owned(), Arc::new(tls_config), TokioRuntimeProvider::new(), ); - let (mut client, handle) = client::Client::with_timeout( - stream, - sender, - Duration::from_secs(5), - None, - ) - .await - .inspect_err(|e| { - assert!(false, "Failed to connect to DoT server: {}", e); - })?; + let (mut client, handle) = + Client::with_timeout(stream, sender, Duration::from_secs(5), None) + .await + .inspect_err(|e| { + assert!(false, "Failed to connect to DoT server: {}", e); + })?; tokio::spawn(handle); send_query(&mut client).await?; @@ -532,12 +554,12 @@ mod tests { TokioRuntimeProvider::new(), ) .build( - addr::DOH, + doh_addr, "dns.example.com".to_owned(), "/dns-query".to_owned(), ); - let (mut client, handle) = client::Client::connect(stream).await?; + let (mut client, handle) = Client::connect(stream).await?; tokio::spawn(handle); send_query(&mut client).await?; @@ -555,12 +577,12 @@ mod tests { .crypto_config(tls_config) .clone() .build( - addr::DOH3, + doh3_addr, "dns.example.com".to_owned(), "/dns-query".to_owned(), ); - let (mut client, handle) = client::Client::connect(stream).await?; + let (mut client, handle) = Client::connect(stream).await?; tokio::spawn(handle); send_query(&mut client).await?; diff --git a/clash-lib/Cargo.toml b/clash-lib/Cargo.toml index a2a0f6a17..5f6c38a48 100644 --- a/clash-lib/Cargo.toml +++ b/clash-lib/Cargo.toml @@ -181,7 +181,7 @@ criterion = { version = "0.8", features = ["html_reports", "async_tokio"], optio memory-stats = "1.0.0" # ssh -russh = { version = "0.56", default-features = false, features = ["async-trait"], optional = true } +russh = { version = "0.56", default-features = false, features = ["async-trait","rsa"], optional = true } dirs = { version = "6.0", optional = true } totp-rs = { version = "^5.7", features = ["serde_support"], optional = true } @@ -202,6 +202,7 @@ mockall = "0.14.0" tokio-test = "0.4.5" axum-macros = "0.5.0" bollard = "0.20" +tar = "0.4.44" serial_test = "3.3" env_logger = "0.11" # donnot change the version, russh is not compatible with the latest version of rand_core @@ -209,6 +210,8 @@ rand_chacha = "=0.3" httpmock = "0.8.2" # TODO replace with wiremock tracing-test = "0.2" http-body-util = "0.1" +reqwest = { version = "0.13", features = ["socks"] } +sysinfo = { version = "0.38", features = ["network"]} [build-dependencies] prost-build = "0.14" diff --git a/clash-lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs b/clash-lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs index 741e7801e..d6409e387 100644 --- a/clash-lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs +++ b/clash-lib/src/app/remote_content_manager/providers/proxy_provider/proxy_set_provider.rs @@ -291,12 +291,10 @@ mod tests { mock_vehicle.expect_read().returning(|| { Ok(r#" proxies: - - name: "ss" - type: ss + - name: "socks5" + type: socks5 server: localhost - port: 8388 - cipher: aes-256-gcm - password: "password" + port: 1080 udp: true "# .as_bytes() diff --git a/clash-lib/src/proxy/group/relay/mod.rs b/clash-lib/src/proxy/group/relay/mod.rs index 7660209ea..0de61872b 100644 --- a/clash-lib/src/proxy/group/relay/mod.rs +++ b/clash-lib/src/proxy/group/relay/mod.rs @@ -208,18 +208,20 @@ mod tests { use tokio::sync::RwLock; - use crate::proxy::{ - mocks::MockDummyProxyProvider, - utils::test_utils::{ - Suite, - consts::*, - docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, - run_test_suites_and_cleanup, + use super::*; + use crate::{ + proxy::{ + mocks::MockDummyProxyProvider, + utils::test_utils::{ + Suite, + consts::*, + docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, + run_test_suites_and_cleanup, + }, }, + tests::initialize, }; - use super::*; - const PASSWORD: &str = "FzcLbKs2dY9mhL"; const CIPHER: &str = "aes-256-gcm"; @@ -236,17 +238,24 @@ mod tests { #[tokio::test] #[serial_test::serial] async fn test_relay_1() -> anyhow::Result<()> { + initialize(); + let port = 10002; + let container = get_ss_runner(port).await?; + + let container_ip = container.container_ip(); + + debug!("container ip: {:?}", container_ip); let ss_opts = crate::proxy::shadowsocks::outbound::HandlerOptions { name: "test-ss".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 10002, + server: container_ip.unwrap_or(LOCAL_ADDR.to_owned()), + port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), plugin: Default::default(), udp: false, }; - let port = ss_opts.port; + let ss_handler: AnyOutboundHandler = Arc::new(crate::proxy::shadowsocks::outbound::Handler::new(ss_opts)) as _; @@ -262,28 +271,29 @@ mod tests { let handler = Handler::new(Default::default(), vec![Arc::new(RwLock::new(provider))]); - run_test_suites_and_cleanup( - handler, - get_ss_runner(port).await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } #[tokio::test] #[serial_test::serial] async fn test_relay_2() -> anyhow::Result<()> { + initialize(); + let port = 10002; + let container = get_ss_runner(port).await?; + + let container_ip = container.container_ip(); + let ss_opts = crate::proxy::shadowsocks::outbound::HandlerOptions { name: "test-ss".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 10002, + server: container_ip.unwrap_or(LOCAL_ADDR.to_owned()), + port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), plugin: Default::default(), udp: false, }; - let port = ss_opts.port; + let ss_handler: AnyOutboundHandler = Arc::new(crate::proxy::shadowsocks::outbound::Handler::new(ss_opts)) as _; @@ -299,11 +309,6 @@ mod tests { let handler = Handler::new(Default::default(), vec![Arc::new(RwLock::new(provider))]); - run_test_suites_and_cleanup( - handler, - get_ss_runner(port).await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } } diff --git a/clash-lib/src/proxy/group/smart/mod.rs b/clash-lib/src/proxy/group/smart/mod.rs index 7be20dc4f..0fbd2b53f 100644 --- a/clash-lib/src/proxy/group/smart/mod.rs +++ b/clash-lib/src/proxy/group/smart/mod.rs @@ -761,6 +761,7 @@ mod tests { run_test_suites_and_cleanup, }, }, + tests::initialize, }; use tempfile::tempdir; use tokio::sync::RwLock; @@ -783,11 +784,17 @@ mod tests { #[tokio::test] #[serial_test::serial] async fn test_smart_group_smoke() -> anyhow::Result<()> { - let ss_port = 10003; + initialize(); + let ss_port = 10002; + + let docker_runner = get_ss_runner(ss_port).await?; + let ss_opts = crate::proxy::shadowsocks::outbound::HandlerOptions { name: "test-ss-for-smart".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), + server: docker_runner + .container_ip() + .unwrap_or(LOCAL_ADDR.to_owned()), port: ss_port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), @@ -838,8 +845,6 @@ mod tests { ); let any_smart_handler: AnyOutboundHandler = Arc::new(smart_handler_instance); - let docker_runner = get_ss_runner(ss_port).await?; - run_test_suites_and_cleanup(any_smart_handler, docker_runner, Suite::all()) .await } diff --git a/clash-lib/src/proxy/hysteria2/datagram.rs b/clash-lib/src/proxy/hysteria2/datagram.rs index 9a7f4d5e6..9c14bb6ef 100644 --- a/clash-lib/src/proxy/hysteria2/datagram.rs +++ b/clash-lib/src/proxy/hysteria2/datagram.rs @@ -39,6 +39,12 @@ impl HysteriaDatagramOutbound { conn: Arc, local_addr: SocksAddr, ) -> Self { + tracing::trace!( + "HysteriaDatagramOutbound::new: session_id={}, local_addr={:?}", + session_id, + local_addr + ); + let (send_tx, send_rx) = tokio::sync::mpsc::channel::(32); let (recv_tx, recv_rx) = tokio::sync::mpsc::channel::(32); let udp_sessions = conn.udp_sessions.clone(); @@ -46,10 +52,15 @@ impl HysteriaDatagramOutbound { session_id, UdpSession { incoming: recv_tx, - local_addr, + local_addr: local_addr.clone(), defragger: Defragger::default(), }, ); + tracing::trace!( + "HysteriaDatagramOutbound: UDP session {} registered", + session_id + ); + tokio::spawn(async move { // capture vars let mut send_rx = send_rx; @@ -60,6 +71,13 @@ impl HysteriaDatagramOutbound { let pkt_id = next_pkt_id.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let pkt_id = (pkt_id % u16::MAX as u32) as u16; + tracing::trace!( + "HysteriaDatagramOutbound: sending packet for session {}, \ + pkt_id={}, dst={:?}", + session_id, + pkt_id, + next_send.dst_addr + ); if let Err(e) = conn.send_packet( next_send.data.into(), next_send.dst_addr, diff --git a/clash-lib/src/proxy/hysteria2/mod.rs b/clash-lib/src/proxy/hysteria2/mod.rs index d8df83ec4..a24de7489 100644 --- a/clash-lib/src/proxy/hysteria2/mod.rs +++ b/clash-lib/src/proxy/hysteria2/mod.rs @@ -177,6 +177,10 @@ impl Handler { sess: &Session, resolver: ThreadSafeDNSResolver, ) -> anyhow::Result<(Connection, SendRequest)> { + tracing::trace!( + "hysteria2 new_authed_connection_inner: starting connection to {:?}", + self.opts.addr + ); // Everytime we enstablish a new session, we should lookup the server // address. maybe it changed since it use ddns let server_socket_addr = match self.opts.addr.clone() { @@ -245,10 +249,13 @@ impl Handler { ep.set_default_client_config(self.client_config.clone()); + tracing::trace!("hysteria2 connecting to server: {:?}", server_socket_addr); let session = ep .connect(server_socket_addr, self.opts.sni.as_deref().unwrap_or(""))? .await?; + tracing::trace!("hysteria2 QUIC connection established"); let (guard, _rx, udp) = Self::auth(&session, &self.opts.passwd).await?; + tracing::trace!("hysteria2 authentication successful, udp={}", udp); *self.support_udp.write().unwrap() = udp; // todo set congestion controller according to cc_rx @@ -424,11 +431,15 @@ impl HysteriaConnection { } async fn spawn_tasks(self: Arc) { + tracing::trace!("hysteria2 spawn_tasks: starting datagram receive loop"); let err = loop { tokio::select! { res = self.conn.read_datagram() => { match res { - Ok(pkt) => self.clone().recv_packet(pkt).await, + Ok(pkt) => { + tracing::trace!("hysteria2 received datagram: {} bytes", pkt.len()); + self.clone().recv_packet(pkt).await + }, Err(e) => { tracing::error!("hysteria2 read datagram error: {}", e); break e; @@ -498,9 +509,22 @@ impl HysteriaConnection { session_id: u32, pkt_id: u16, ) -> std::io::Result<()> { + tracing::trace!( + "hysteria2 send_packet: session_id={}, pkt_id={}, addr={:?}, \ + data_len={}", + session_id, + pkt_id, + addr, + pkt.len() + ); + let max_frag_size = match self.udp_mtu.or(self.conn.max_datagram_size()) { - Some(x) => x, + Some(x) => { + tracing::trace!("hysteria2 max_frag_size={}", x); + x + } None => { + tracing::error!("hysteria2 udp mtu not set"); return Err(std::io::Error::other( "hysteria2 udp mtu not set, please check your \ disable_mtu_discovery and udp_mtu option", @@ -508,22 +532,46 @@ impl HysteriaConnection { } }; let fragments = Fragments::new(session_id, pkt_id, addr, max_frag_size, pkt); + let mut frag_count = 0; for frag in fragments { + frag_count += 1; + tracing::trace!( + "hysteria2 sending fragment #{} for session_id={}", + frag_count, + session_id + ); self.conn .send_datagram(frag) .map_err(std::io::Error::other)?; } + tracing::trace!( + "hysteria2 sent {} fragments for session_id={}", + frag_count, + session_id + ); Ok(()) } pub async fn recv_packet(self: Arc, pkt: Bytes) { + tracing::trace!("hysteria2 recv_packet: {} bytes", pkt.len()); let mut buf: BytesMut = pkt.into(); let pkt = codec::HysUdpPacket::decode(&mut buf).unwrap(); let session_id = pkt.session_id; let mut udp_sessions = self.udp_sessions.lock().await; match udp_sessions.get_mut(&session_id) { Some(session) => { + tracing::trace!( + "hysteria2 found session {}, feeding packet", + session_id + ); if let Some(pkt) = session.feed(pkt) { + tracing::trace!( + "hysteria2 complete packet received for session {}: {} \ + bytes to {:?}", + session_id, + pkt.data.len(), + session.local_addr + ); let _ = session .incoming .send(UdpPacket { @@ -532,6 +580,11 @@ impl HysteriaConnection { dst_addr: session.local_addr.clone(), }) .await; + } else { + tracing::trace!( + "hysteria2 packet fragment buffered for session {}", + session_id + ); } } _ => { @@ -634,7 +687,14 @@ mod tests { #[serial_test::serial] async fn test_hysteria() -> anyhow::Result<()> { initialize(); - let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); + + let container = get_hysteria_runner().await?; + + let container_ip = + container.container_ip().unwrap_or("127.0.0.1".to_owned()); + + let ip = IpAddr::from_str(&container_ip) + .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))); let port = 10002; let obfs = Some(Obfs::Salamander(SalamanderObfs { @@ -669,11 +729,6 @@ mod tests { handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup( - handler, - get_hysteria_runner().await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } } diff --git a/clash-lib/src/proxy/hysteria2/salamander.rs b/clash-lib/src/proxy/hysteria2/salamander.rs index 95d4250bf..c3eb21e7c 100644 --- a/clash-lib/src/proxy/hysteria2/salamander.rs +++ b/clash-lib/src/proxy/hysteria2/salamander.rs @@ -105,31 +105,133 @@ impl AsyncUdpSocket for Salamander { bufs: &mut [IoSliceMut<'_>], meta: &mut [RecvMeta], ) -> Poll> { - // the number of udp packets received let packet_nums = ready!(self.inner.poll_recv(cx, bufs, meta))?; - meta.iter().take(packet_nums).for_each(|v| { - tracing::trace!("meta addr {:?}, dst_ip: {:?}", v.addr, v.dst_ip); - }); - bufs.iter_mut() - .zip(meta.iter_mut()) - // first step take and then filter - .take(packet_nums) - .filter(|(_, meta)| meta.len > 8) - .for_each(|(v, meta)| { - let x = &mut v.deref_mut()[..meta.len]; - // decrypt in place, and drop first 8 bytes - self.obfs.decrypt(x); - let data = &mut x[8..]; - unsafe { - // because IoSliceMut is transparent and .0 is also transparent, so it is a &[u8] - let b: IoSliceMut<'_> = std::mem::transmute(data); - *v = b; + + let mut valid_count = 0; + + for i in 0..packet_nums { + tracing::trace!( + "meta addr {:?}, dst_ip: {:?}, len: {}, stride: {}", + meta[i].addr, + meta[i].dst_ip, + meta[i].len, + meta[i].stride, + ); + + let total_len = meta[i].len; + let stride = meta[i].stride; + let buf = bufs[i].deref_mut(); + let buf_len = buf.len(); + + // Validate buffer bounds + if total_len > buf_len { + tracing::error!( + "invalid buffer: total_len={} > buf_len={}, addr={:?}", + total_len, + buf_len, + meta[i].addr + ); + continue; + } + + // Salamander packets must have at least 8 bytes (salt) + 1 byte + // (data) + if total_len <= 8 || stride <= 8 { + tracing::debug!( + "invalid salamander packet: len={}, stride={}, addr={:?}", + total_len, + stride, + meta[i].addr + ); + continue; + } + + // Fast path: single packet (no GRO, typical on Windows/Mac) + if total_len == stride { + // Decrypt and strip the 8-byte salt prefix + self.obfs.decrypt(&mut buf[..total_len]); + buf.copy_within(8..total_len, 0); + + // Compact valid packets to the front + if i != valid_count { + meta[valid_count] = meta[i]; + bufs.swap(i, valid_count); + } + meta[valid_count].len = total_len - 8; + meta[valid_count].stride = stride - 8; + valid_count += 1; + continue; + } + + // Slow path: GRO-merged packets (Linux with GRO enabled) + // When GRO is enabled, a single buffer may contain multiple + // datagrams concatenated together, each of size `stride` (the last + // one may be smaller). Each sub-datagram has its own 8-byte + // salamander salt prefix that must be decrypted and stripped + // independently. + let mut read_offset = 0; + let mut write_offset = 0; + while read_offset < total_len { + let seg_len = stride.min(total_len - read_offset); + if seg_len <= 8 { + // Remaining segment too small to be valid + break; } - // MUST update meta.len - meta.len -= 8; - }); - Poll::Ready(Ok(packet_nums)) + // Ensure we don't read beyond buffer + if read_offset + seg_len > buf_len { + tracing::error!( + "GRO segment out of bounds: read_offset={}, seg_len={}, \ + buf_len={}", + read_offset, + seg_len, + buf_len + ); + break; + } + + // Decrypt this segment in place + self.obfs + .decrypt(&mut buf[read_offset..read_offset + seg_len]); + + // Ensure we don't write beyond valid range + let payload_len = seg_len - 8; + if write_offset + payload_len > buf_len { + tracing::error!( + "GRO write out of bounds: write_offset={}, payload_len={}, \ + buf_len={}", + write_offset, + payload_len, + buf_len + ); + break; + } + + // Copy decrypted payload (skip 8-byte salt) to compacted + // position + buf.copy_within( + read_offset + 8..read_offset + seg_len, + write_offset, + ); + + read_offset += seg_len; + write_offset += payload_len; + } + + // Only add to valid_count if we processed something + if write_offset > 0 { + // Compact valid packets to the front + if i != valid_count { + meta[valid_count] = meta[i]; + bufs.swap(i, valid_count); + } + meta[valid_count].len = write_offset; + meta[valid_count].stride = stride - 8; + valid_count += 1; + } + } + + Poll::Ready(Ok(valid_count)) } fn local_addr(&self) -> std::io::Result { diff --git a/clash-lib/src/proxy/shadowquic/mod.rs b/clash-lib/src/proxy/shadowquic/mod.rs index 7619510a4..ed2c25be2 100644 --- a/clash-lib/src/proxy/shadowquic/mod.rs +++ b/clash-lib/src/proxy/shadowquic/mod.rs @@ -263,9 +263,16 @@ mod tests { const PORT: u16 = 10002; - fn gen_options(over_stream: bool) -> anyhow::Result { + fn gen_options( + opt_ip: Option, + over_stream: bool, + ) -> anyhow::Result { Ok(HandlerOptions { - addr: SocketAddr::new(LOCAL_ADDR.parse().unwrap(), PORT).to_string(), + addr: SocketAddr::new( + opt_ip.unwrap_or(LOCAL_ADDR.to_owned()).parse().unwrap(), + PORT, + ) + .to_string(), password: "12345678".into(), username: "87654321".into(), server_name: "echo.free.beeceptor.com".into(), @@ -281,35 +288,34 @@ mod tests { #[serial_test::serial] async fn test_shadowquic_over_datagram() -> anyhow::Result<()> { initialize(); - let opts = gen_options(false)?; + + let container = get_shadowquic_runner().await?; + + let container_ip = container.container_ip(); + + let opts = gen_options(container_ip, false)?; let handler = Arc::new(Handler::new("test-shadowquic".into(), opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup( - handler, - get_shadowquic_runner().await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } #[tokio::test] #[serial_test::serial] async fn test_shadowquic_over_stream() -> anyhow::Result<()> { initialize(); - let mut opts = gen_options(true)?; + let container = get_shadowquic_runner().await?; + + let container_ip = container.container_ip(); + + let mut opts = gen_options(container_ip, true)?; opts.over_stream = true; let handler = Arc::new(Handler::new("test-shadowquic".into(), opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup( - handler, - get_shadowquic_runner().await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } } diff --git a/clash-lib/src/proxy/shadowsocks/outbound/mod.rs b/clash-lib/src/proxy/shadowsocks/outbound/mod.rs index 35a163429..c21f8cf22 100644 --- a/clash-lib/src/proxy/shadowsocks/outbound/mod.rs +++ b/clash-lib/src/proxy/shadowsocks/outbound/mod.rs @@ -238,7 +238,6 @@ impl OutboundHandler for Handler { #[cfg(all(test, docker_test))] mod tests { - use crate::{ proxy::{ transport::*, @@ -265,6 +264,7 @@ mod tests { let host = format!("0.0.0.0:{}", port); DockerTestRunnerBuilder::new() .image(IMAGE_SS_RUST) + .port(port) .entrypoint(&["ssserver"]) .cmd(&["-s", &host, "-m", CIPHER, "-k", PASSWORD, "-U", "-vvv"]) .build() @@ -280,6 +280,7 @@ mod tests { let host = format!("0.0.0.0:{}", port); DockerTestRunnerBuilder::new() .image(IMAGE_SS_RUST) + .port(port) .entrypoint(&["ssserver"]) .cmd(&[ "-s", @@ -313,34 +314,39 @@ mod tests { #[serial_test::serial] async fn test_ss_plain() -> anyhow::Result<()> { initialize(); + let port = 10002; + let container = get_ss_runner(port).await?; + let opts = HandlerOptions { name: "test-ss".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 10002, + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), + port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), plugin: Default::default(), udp: false, }; - let port = opts.port; + let handler = Arc::new(Handler::new(opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup( - handler, - get_ss_runner(port).await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } async fn get_shadowtls_runner( + ss_ip: Option, ss_port: u16, stls_port: u16, ) -> anyhow::Result { - let ss_server_env = format!("SERVER=127.0.0.1:{}", ss_port); + // Use host.docker.internal to access SS server running in another + // container via host port mapping + let ss_server_env = format!( + "SERVER={}:{}", + ss_ip.unwrap_or("host.docker.internal".to_owned()), + ss_port + ); let listen_env = format!("LISTEN=0.0.0.0:{}", stls_port); let password = format!("PASSWORD={}", SHADOW_TLS_PASSWORD); DockerTestRunnerBuilder::new() @@ -363,17 +369,28 @@ mod tests { #[tokio::test] #[serial_test::serial] async fn test_shadowtls() -> anyhow::Result<()> { + initialize(); // the real port that used for communication let shadow_tls_port = 10002; // not important, you can assign any port that is not conflict with // others let ss_port = 10004; + + let container1 = get_ss_runner(ss_port).await?; + + let container2 = get_shadowtls_runner( + container1.container_ip(), + ss_port, + shadow_tls_port, + ) + .await?; + let client = Shadowtls::new("www.feishu.cn".to_owned(), "password".to_owned(), true); let opts = HandlerOptions { name: "test-shadowtls".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), + server: container2.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: shadow_tls_port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), @@ -384,21 +401,24 @@ mod tests { // we need to store all the runners in a container, to make sure all of // them can be destroyed after the test let mut chained = MultiDockerTestRunner::default(); - chained.add(get_ss_runner(ss_port)).await?; - chained - .add(get_shadowtls_runner(ss_port, shadow_tls_port)) - .await?; + chained.add_with_runner(container1); + chained.add_with_runner(container2); // currently, shadow-tls does't support udp proxy // see: https://github.com/ihciah/shadow-tls/issues/54 run_test_suites_and_cleanup(handler, chained, Suite::tcp_tests()).await } async fn get_obfs_runner( + ss_ip: Option, ss_port: u16, obfs_port: u16, mode: SimpleOBFSMode, ) -> anyhow::Result { - let ss_server_env = format!("127.0.0.1:{}", ss_port); + let ss_server_env = format!( + "{}:{}", + ss_ip.unwrap_or("host.docker.internal".to_owned()), + ss_port + ); let port = format!("{}", obfs_port); let mode = match mode { SimpleOBFSMode::Http => "http", @@ -423,6 +443,12 @@ mod tests { async fn test_ss_obfs_inner(mode: SimpleOBFSMode) -> anyhow::Result<()> { let obfs_port = 10002; let ss_port = 10004; + + let container1 = get_ss_runner(ss_port).await?; + let container2 = + get_obfs_runner(container1.container_ip(), ss_port, obfs_port, mode) + .await?; + let host = "www.bing.com".to_owned(); let plugin = match mode { SimpleOBFSMode::Http => { @@ -433,7 +459,7 @@ mod tests { let opts = HandlerOptions { name: "test-obfs".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), + server: container2.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: obfs_port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), @@ -443,16 +469,15 @@ mod tests { let handler: Arc = Arc::new(Handler::new(opts)); let mut chained = MultiDockerTestRunner::default(); - chained.add(get_ss_runner(ss_port)).await?; - chained - .add(get_obfs_runner(ss_port, obfs_port, mode)) - .await?; + chained.add_with_runner(container1); + chained.add_with_runner(container2); run_test_suites_and_cleanup(handler, chained, Suite::tcp_tests()).await } #[tokio::test] #[serial_test::serial] async fn test_ss_obfs_http() -> anyhow::Result<()> { + initialize(); test_ss_obfs_inner(SimpleOBFSMode::Http).await } @@ -468,6 +493,7 @@ mod tests { async fn test_ss_v2ray_plugin() -> anyhow::Result<()> { initialize(); let ss_port = 10004; + let container = get_ss_runner_with_plugin(ss_port).await?; let host = "example.org".to_owned(); let plugin = V2rayWsClient::try_new( host, @@ -481,7 +507,7 @@ mod tests { let opts = HandlerOptions { name: "test-obfs".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: ss_port, password: PASSWORD.to_owned(), cipher: CIPHER.to_owned(), @@ -490,11 +516,6 @@ mod tests { }; let handler: Arc = Arc::new(Handler::new(opts)); - run_test_suites_and_cleanup( - handler, - get_ss_runner_with_plugin(ss_port).await?, - Suite::tcp_tests(), - ) - .await + run_test_suites_and_cleanup(handler, container, Suite::tcp_tests()).await } } diff --git a/clash-lib/src/proxy/socks/outbound/mod.rs b/clash-lib/src/proxy/socks/outbound/mod.rs index 0e62689d0..2967ce104 100644 --- a/clash-lib/src/proxy/socks/outbound/mod.rs +++ b/clash-lib/src/proxy/socks/outbound/mod.rs @@ -262,102 +262,87 @@ mod tests { use std::sync::Arc; - use crate::proxy::{ - socks::outbound::{Handler, HandlerOptions}, - utils::{ - GLOBAL_DIRECT_CONNECTOR, - test_utils::{ - Suite, - consts::{IMAGE_SOCKS5, LOCAL_ADDR}, - docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, - run_test_suites_and_cleanup, + use crate::{ + proxy::{ + socks::outbound::{Handler, HandlerOptions}, + utils::{ + GLOBAL_DIRECT_CONNECTOR, + test_utils::{ + Suite, + consts::{IMAGE_SOCKS5, LOCAL_ADDR}, + docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, + run_test_suites_and_cleanup, + }, }, }, + tests::initialize, }; + use super::super::super::utils::test_utils::docker_utils::config_helper::test_config_base_dir; + const USER: &str = "user"; const PASSWORD: &str = "password"; - async fn get_socks5_runner( - port: u16, - username: Option, - password: Option, - ) -> anyhow::Result { - let host = format!("0.0.0.0:{}", port); - let username = username.unwrap_or_default(); - let password = password.unwrap_or_default(); - let cmd = if !username.is_empty() && !password.is_empty() { - vec![ - "-a", - &host, - "-u", - username.as_str(), - "-p", - password.as_str(), - ] + async fn get_socks5_runner(auth: bool) -> anyhow::Result { + let test_config_dir = test_config_base_dir(); + let conf = if auth { + test_config_dir.join("socks5-auth.json") } else { - vec!["-a", &host] + test_config_dir.join("socks5-noauth.json") }; + DockerTestRunnerBuilder::new() .image(IMAGE_SOCKS5) - .cmd(&cmd) + .mounts(&[(conf.to_str().unwrap(), "/etc/v2ray/config.json")]) .build() .await } + fn server_addr(runner: &DockerTestRunner) -> String { + runner.container_ip().unwrap_or(LOCAL_ADDR.to_owned()) + } + #[tokio::test] #[serial_test::serial] async fn test_socks5_no_auth() -> anyhow::Result<()> { + initialize(); + let port = 10002; + let runner = get_socks5_runner(false).await?; let opts = HandlerOptions { name: "test-socks5-no-auth".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 10002, + server: server_addr(&runner), + port, user: None, password: None, udp: true, ..Default::default() }; - let port = opts.port; let handler = Arc::new(Handler::new(opts)); - run_test_suites_and_cleanup( - handler, - get_socks5_runner(port, None, None).await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, runner, Suite::all()).await } #[tokio::test] #[serial_test::serial] async fn test_socks5_auth() -> anyhow::Result<()> { use crate::proxy::DialWithConnector; - + initialize(); + let port = 10002; + let runner = get_socks5_runner(true).await?; let opts = HandlerOptions { - name: "test-socks5-no-auth".to_owned(), + name: "test-socks5-auth".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 10002, + server: server_addr(&runner), + port, user: Some(USER.to_owned()), password: Some(PASSWORD.to_owned()), udp: true, ..Default::default() }; - let port = opts.port; let handler = Arc::new(Handler::new(opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup( - handler, - get_socks5_runner( - port, - Some(USER.to_owned()), - Some(PASSWORD.to_owned()), - ) - .await?, - Suite::all(), - ) - .await + run_test_suites_and_cleanup(handler, runner, Suite::all()).await } } diff --git a/clash-lib/src/proxy/ssh/mod.rs b/clash-lib/src/proxy/ssh/mod.rs index f214d7867..681082464 100644 --- a/clash-lib/src/proxy/ssh/mod.rs +++ b/clash-lib/src/proxy/ssh/mod.rs @@ -245,24 +245,24 @@ async fn auth0( #[cfg(all(test, docker_test))] mod tests { - use std::path::PathBuf; + use std::{future::Future, path::PathBuf}; use aead::rand_core::SeedableRng; use russh::keys::HashAlg; use tempfile::tempdir; - use super::super::utils::test_utils::{ - consts::*, docker_runner::DockerTestRunner, + use super::{ + super::utils::test_utils::{consts::*, docker_runner::DockerTestRunner}, + *, }; - use crate::proxy::utils::test_utils::{ - Suite, - config_helper::test_config_base_dir, - docker_runner::{DockerTestRunnerBuilder, MultiDockerTestRunner}, - run_test_suites_and_cleanup, + use crate::{ + proxy::utils::test_utils::{ + Suite, config_helper::test_config_base_dir, + docker_runner::DockerTestRunnerBuilder, run_test_suites_and_cleanup, + }, + tests::initialize, }; - use super::*; - const PASSWORD: &str = "123456789"; /// equals to: @@ -288,6 +288,7 @@ mod tests { /// `/config/sshd/sshd_config` in the container. /// before starting the container, we need to generate host key pairs in /// /tmp/.xxx/ssh/ssh_host_keys. + #[allow(unused)] async fn get_openssh_server_runner( ssh_config_path: PathBuf, ) -> anyhow::Result { @@ -308,6 +309,7 @@ mod tests { .await } + #[allow(unused)] fn gen_ssh_key_pair( algo: russh::keys::Algorithm, ) -> anyhow::Result<(String, String)> { @@ -324,60 +326,77 @@ mod tests { } #[derive(Debug)] + #[allow(dead_code)] struct TestOption { password: bool, // password or private key rsa: bool, // rsa or ed25519 host_key: Option>, // host key } + #[allow(unused)] async fn test_ssh_inner(opt: TestOption) -> anyhow::Result<()> { tracing::info!("testing ssh, using option: {:?}", opt); - // dirty works: prepare ssh config directory for the docker container & - // generate host key pairs - // under /tmp - // it's ok for cross test's docker in docker, since we declared volume of - // /tmp + + // Prepare SSH config directory for the docker container + // We need a writable temp directory because: + // 1. Host keys must be generated at runtime + // 2. Container needs to write logs let temp_dir = tempdir()?; let test_config_base_dir = test_config_base_dir(); let ssh_config_path = test_config_base_dir.join("ssh"); let ssh_config_tmp_path = temp_dir.path().join("ssh"); - // cp files under ssh_config_path to temp_dir - tokio::process::Command::new("cp") - .args([ - "-r", - ssh_config_path.to_str().unwrap(), - ssh_config_tmp_path.to_str().unwrap(), - ]) - .output() - .await - .expect("failed to copy ssh config files"); - tokio::process::Command::new("chmod") - .args(["-R", "777", ssh_config_tmp_path.to_str().unwrap()]) - .output() - .await - .expect("failed to chmod ssh config files"); + // Copy SSH config files using Rust APIs (cross-platform) + copy_dir_recursive(&ssh_config_path, &ssh_config_tmp_path).await?; + + // IMPORTANT: Container expects sshd_config at /config/sshd/sshd_config + // Our source has it at ssh_host_keys/sshd_config, but the container + // startup script will ignore/delete it from there and generate a default + // config if /config/sshd/sshd_config doesn't exist. + // So we need to copy it to the correct location. + let source_sshd_config = ssh_config_tmp_path + .join("ssh_host_keys") + .join("sshd_config"); + let target_sshd_dir = ssh_config_tmp_path.join("sshd"); + tokio::fs::create_dir_all(&target_sshd_dir).await?; + let target_sshd_config = target_sshd_dir.join("sshd_config"); + tokio::fs::copy(&source_sshd_config, &target_sshd_config).await?; + tracing::info!( + "Copied sshd_config from {:?} to {:?}", + source_sshd_config, + target_sshd_config + ); + + // Debug: print directory structure + tracing::debug!("SSH config directory structure after copy:"); + print_dir_structure(&ssh_config_tmp_path, 0).await?; + tracing::info!("ssh_config tmp mounting path: {:?}", ssh_config_tmp_path); + + // Create logs directory tokio::fs::create_dir_all(&ssh_config_tmp_path.join("logs").join("openssh")) .await?; - // generate host key pairs - // ignore rsa, it's too slow + // Generate host key pairs (ecdsa, ed25519, and rsa for test_ssh2) + // Note: RSA key generation doesn't need hash parameter (hash is only for + // signing) let name_and_key_pairs = [ ( "ecdsa", - russh::keys::Algorithm::Ecdsa { + Algorithm::Ecdsa { curve: russh::keys::EcdsaCurve::NistP256, }, ), - ("ed25519", russh::keys::Algorithm::Ed25519), + ("ed25519", Algorithm::Ed25519), ] .into_iter() .map(|(name, algo)| { - let (private_key, public_key) = gen_ssh_key_pair(algo).unwrap(); + let (private_key, public_key) = + gen_ssh_key_pair(algo).expect("Key generation failed"); (name, private_key, public_key) }) .collect::>(); + let host_key_path = ssh_config_tmp_path.join("ssh_host_keys"); for (name, private_key, public_key) in name_and_key_pairs { let private_key_path = @@ -388,12 +407,18 @@ mod tests { tokio::fs::write(public_key_path, public_key).await?; } - // now we are fine, real test starts + // Start the container + let container = + get_openssh_server_runner(ssh_config_tmp_path.clone()).await?; - let ssh_private_key_path = ssh_config_tmp_path - .join(".ssh") - .join(if opt.rsa { "test_rsa" } else { "test_ed25519" }); + // Configure client to connect to container + let ssh_private_key_path = ssh_config_path.join(".ssh").join(if opt.rsa { + "test_rsa" + } else { + "test_ed25519" + }); let ssh_private_key_path = ssh_private_key_path.to_str().unwrap(); + let password = if opt.password { Some(PASSWORD.to_owned()) } else { @@ -408,8 +433,8 @@ mod tests { let opts = HandlerOptions { name: "test-ssh".to_owned(), common_opts: Default::default(), - server: LOCAL_ADDR.to_owned(), - port: 2222, // in accordance with sshd_config'sport + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), + port: 2222, password, private_key, private_key_passphrase: None, @@ -417,7 +442,6 @@ mod tests { host_key: opt.host_key.clone(), host_key_algorithms: Some(vec![ Algorithm::Ed25519, - Algorithm::Rsa { hash: None }, Algorithm::Rsa { hash: Some(HashAlg::Sha256), }, @@ -428,19 +452,66 @@ mod tests { totp: None, }; let handler: Arc = Arc::new(Handler::new(opts)); - // we need to store all the runners in a container, to make sure all of - // them can be destroyed after the test - let mut chained = MultiDockerTestRunner::default(); - chained - .add(get_openssh_server_runner(ssh_config_tmp_path)) - .await?; - run_test_suites_and_cleanup(handler, chained, Suite::tcp_tests()).await + + run_test_suites_and_cleanup(handler, container, Suite::tcp_tests()).await + } + + /// Recursively copy a directory using async Rust APIs + #[allow(unused)] + fn copy_dir_recursive<'a>( + src: &'a std::path::Path, + dst: &'a std::path::Path, + ) -> Pin> + 'a>> { + Box::pin(async move { + tokio::fs::create_dir_all(dst).await?; + + let mut entries = tokio::fs::read_dir(src).await?; + while let Some(entry) = entries.next_entry().await? { + let src_path = entry.path(); + let dst_path = dst.join(entry.file_name()); + + if entry.file_type().await?.is_dir() { + copy_dir_recursive(&src_path, &dst_path).await?; + } else { + tokio::fs::copy(&src_path, &dst_path).await?; + } + } + + Ok(()) + }) + } + + /// Print directory structure for debugging + #[allow(unused)] + fn print_dir_structure<'a>( + path: &'a std::path::Path, + indent: usize, + ) -> Pin> + 'a>> { + Box::pin(async move { + let mut entries = tokio::fs::read_dir(path).await?; + while let Some(entry) = entries.next_entry().await? { + let file_name = entry.file_name(); + let indent_str = " ".repeat(indent); + + if entry.file_type().await?.is_dir() { + tracing::debug!( + "{}{}/", + indent_str, + file_name.to_string_lossy() + ); + print_dir_structure(&entry.path(), indent + 1).await?; + } else { + tracing::debug!("{}{}", indent_str, file_name.to_string_lossy()); + } + } + Ok(()) + }) } - #[cfg(target_os = "linux")] #[tokio::test] #[serial_test::serial] async fn test_ssh1() -> anyhow::Result<()> { + initialize(); test_ssh_inner(TestOption { password: true, rsa: false, @@ -449,11 +520,10 @@ mod tests { .await } - #[cfg(target_os = "linux")] #[tokio::test] #[serial_test::serial] - #[ignore = "this does pass locally, but not in CI. TODO: #720"] async fn test_ssh2() -> anyhow::Result<()> { + initialize(); test_ssh_inner(TestOption { password: false, rsa: true, @@ -462,11 +532,10 @@ mod tests { .await } - #[cfg(target_os = "linux")] #[tokio::test] #[serial_test::serial] - #[ignore = "this does pass locally, but not in CI. TODO: #720"] async fn test_ssh3() -> anyhow::Result<()> { + initialize(); test_ssh_inner(TestOption { password: false, rsa: false, @@ -475,10 +544,10 @@ mod tests { .await } - #[cfg(target_os = "linux")] #[tokio::test] #[serial_test::serial] async fn test_ssh4() -> anyhow::Result<()> { + initialize(); // config wrong host key, expect failure let host_key = Some( vec![ diff --git a/clash-lib/src/proxy/trojan/mod.rs b/clash-lib/src/proxy/trojan/mod.rs index 9f698b3bf..75fcbe755 100644 --- a/clash-lib/src/proxy/trojan/mod.rs +++ b/clash-lib/src/proxy/trojan/mod.rs @@ -217,19 +217,21 @@ mod tests { use std::collections::HashMap; - use crate::proxy::{ - transport, - utils::test_utils::{ - Suite, - config_helper::test_config_base_dir, - consts::*, - docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, - run_test_suites_and_cleanup, + use super::*; + use crate::{ + proxy::{ + transport, + utils::test_utils::{ + Suite, + config_helper::test_config_base_dir, + consts::*, + docker_runner::{DockerTestRunner, DockerTestRunnerBuilder}, + run_test_suites_and_cleanup, + }, }, + tests::initialize, }; - use super::*; - async fn get_ws_runner() -> anyhow::Result { let test_config_dir = test_config_base_dir(); let trojan_conf = test_config_dir.join("trojan-ws.json"); @@ -266,10 +268,12 @@ mod tests { let tls = transport::TlsClient::new(true, "example.org".to_owned(), None, None); + let container = get_ws_runner().await?; + let opts = HandlerOptions { name: "test-trojan-ws".to_owned(), common_opts: Default::default(), - server: "127.0.0.1".to_owned(), + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 10002, password: "example".to_owned(), udp: true, @@ -281,8 +285,7 @@ mod tests { .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; // ignore the udp test - run_test_suites_and_cleanup(handler, get_ws_runner().await?, Suite::all()) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } async fn get_grpc_runner() -> anyhow::Result { @@ -305,6 +308,7 @@ mod tests { #[tokio::test] #[serial_test::serial] async fn test_trojan_grpc() -> anyhow::Result<()> { + initialize(); let transport = transport::GrpcClient::new( "example.org".to_owned(), "example" @@ -319,10 +323,12 @@ mod tests { None, ); + let runner = get_grpc_runner().await?; + let opts = HandlerOptions { name: "test-trojan-grpc".to_owned(), common_opts: Default::default(), - server: "127.0.0.1".to_owned(), + server: runner.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 10002, password: "example".to_owned(), udp: true, @@ -333,7 +339,6 @@ mod tests { handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup(handler, get_grpc_runner().await?, Suite::all()) - .await + run_test_suites_and_cleanup(handler, runner, Suite::all()).await } } diff --git a/clash-lib/src/proxy/tuic/mod.rs b/clash-lib/src/proxy/tuic/mod.rs index e234ea467..9e45c33c8 100644 --- a/clash-lib/src/proxy/tuic/mod.rs +++ b/clash-lib/src/proxy/tuic/mod.rs @@ -405,10 +405,13 @@ mod tests { const PORT: u16 = 10002; - fn gen_options(skip_cert_verify: bool) -> anyhow::Result { + fn gen_options( + container_ip: Option, + skip_cert_verify: bool, + ) -> anyhow::Result { Ok(HandlerOptions { name: "test-tuic".to_owned(), - server: LOCAL_ADDR.into(), + server: container_ip.unwrap_or(LOCAL_ADDR.to_owned()), port: PORT, common_opts: Default::default(), uuid: "00000000-0000-0000-0000-000000000001".parse()?, @@ -437,32 +440,32 @@ mod tests { #[serial_test::serial] async fn test_tuic_skip_cert_verify() -> anyhow::Result<()> { initialize(); - let opts = gen_options(true)?; + + let container = get_tuic_runner().await?; + let opts = gen_options(container.container_ip(), true)?; let handler = Arc::new(Handler::new(opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup(handler, get_tuic_runner().await?, Suite::all()) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } #[tokio::test] #[serial_test::serial] async fn test_tuic_cert_verify_expect_fail() -> anyhow::Result<()> { initialize(); - let opts = gen_options(false)?; + + let container = get_tuic_runner().await?; + + let opts = gen_options(container.container_ip(), false)?; let handler = Arc::new(Handler::new(opts)); handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - let res = run_test_suites_and_cleanup( - handler, - get_tuic_runner().await?, - Suite::all(), - ) - .await; + let res = + run_test_suites_and_cleanup(handler, container, Suite::all()).await; assert!(res.is_err()); assert!(res.unwrap_err().to_string().contains( "the cryptographic handshake failed: error 45: invalid peer \ diff --git a/clash-lib/src/proxy/utils/socket_helpers.rs b/clash-lib/src/proxy/utils/socket_helpers.rs index f4cb5a18d..8b51c3b5a 100644 --- a/clash-lib/src/proxy/utils/socket_helpers.rs +++ b/clash-lib/src/proxy/utils/socket_helpers.rs @@ -147,6 +147,23 @@ pub async fn new_udp_socket( trace!(src = ?src, "udp socket bound: {socket:?}"); } (None, None) => { + // On Windows, UDP sockets must be bound to get a valid local_addr + // which is required for some operations (e.g., quinn/QUIC) + #[cfg(target_os = "windows")] + { + let bind_addr = match family { + socket2::Domain::IPV4 => { + "0.0.0.0:0".parse::().unwrap() + } + socket2::Domain::IPV6 => { + "[::]:0".parse::().unwrap() + } + _ => "0.0.0.0:0".parse::().unwrap(), + }; + socket.bind(&socket2::SockAddr::from(bind_addr))?; + trace!(addr = ?bind_addr, "udp socket bound to default address on Windows: {socket:?}"); + } + #[cfg(not(target_os = "windows"))] trace!("udp socket not bound to any specific address: {socket:?}"); } } diff --git a/clash-lib/src/proxy/utils/test_utils/docker_utils/consts.rs b/clash-lib/src/proxy/utils/test_utils/docker_utils/consts.rs index 9a3c589c8..28e0742db 100644 --- a/clash-lib/src/proxy/utils/test_utils/docker_utils/consts.rs +++ b/clash-lib/src/proxy/utils/test_utils/docker_utils/consts.rs @@ -1,5 +1,6 @@ pub const LOCAL_ADDR: &str = "127.0.0.1"; +#[allow(dead_code)] pub const IMAGE_WG: &str = "lscr.io/linuxserver/wireguard:1.0.20210914-legacy"; // image with v2ray-plugin pre-installed #[cfg(feature = "shadowsocks")] @@ -12,8 +13,9 @@ pub const IMAGE_TROJAN_GO: &str = "p4gefau1t/trojan-go:latest"; pub const IMAGE_VMESS: &str = "v2fly/v2fly-core:v4.45.2"; pub const IMAGE_VLESS: &str = "v2fly/v2fly-core:v4.45.2"; pub const IMAGE_XRAY: &str = "teddysun/xray:latest"; -pub const IMAGE_SOCKS5: &str = "ghcr.io/wzshiming/socks5/socks5:v0.4.3"; +pub const IMAGE_SOCKS5: &str = "v2fly/v2fly-core:v4.45.2"; #[cfg(feature = "ssh")] +#[allow(unused)] pub const IMAGE_OPENSSH: &str = "docker.io/linuxserver/openssh-server:latest"; pub const IMAGE_HYSTERIA: &str = "tobyxdd/hysteria:latest"; #[cfg(feature = "tuic")] diff --git a/clash-lib/src/proxy/utils/test_utils/docker_utils/docker_runner.rs b/clash-lib/src/proxy/utils/test_utils/docker_utils/docker_runner.rs index fea911a7b..f7af5dc9c 100644 --- a/clash-lib/src/proxy/utils/test_utils/docker_utils/docker_runner.rs +++ b/clash-lib/src/proxy/utils/test_utils/docker_utils/docker_runner.rs @@ -1,53 +1,284 @@ -use std::collections::HashMap; +use std::{collections::HashMap, path::Path}; +use anyhow; use bollard::{ - Docker, + API_DEFAULT_VERSION, Docker, body_full, + config::ContainerInspectResponse, models::ContainerCreateBody, - query_parameters::{CreateImageOptions, LogsOptions}, + query_parameters::{ + CreateContainerOptions, CreateImageOptions, CreateImageOptionsBuilder, + LogsOptions, RemoveContainerOptions, StartContainerOptions, + UploadToContainerOptions, + }, secret::{HostConfig, Mount, PortBinding}, }; - -use bollard::query_parameters::{ - CreateContainerOptions, CreateImageOptionsBuilder, RemoveContainerOptions, - StartContainerOptions, -}; -use futures::{Future, TryStreamExt}; +use bytes::Bytes; +use futures::{Future, StreamExt, TryStreamExt}; +use tar; const TIMEOUT_DURATION: u64 = 30; +/// Creates a tar archive from a source path with the given target path. +/// This is a blocking operation and should be called from `spawn_blocking`. +fn create_tar_archive(source: &str, target: &str) -> anyhow::Result> { + let mut ar = tar::Builder::new(Vec::new()); + + // Remove leading slash for tar path + let tar_path = if target.starts_with('/') { + &target[1..] + } else { + target + }; + + let source_path = Path::new(source); + let metadata = std::fs::metadata(source_path)?; + + if metadata.is_file() { + // Handle single file + let content = std::fs::read(source_path)?; + let mut header = tar::Header::new_gnu(); + header.set_size(content.len() as u64); + header.set_mode(0o644); + ar.append_data(&mut header, tar_path, &content[..])?; + } else if metadata.is_dir() { + // Handle directory recursively + ar.append_dir_all(tar_path, source_path)?; + } else { + anyhow::bail!("Unsupported file type for source: {}", source); + } + + let tar_data = ar.into_inner()?; + + // Debug: Print all files in the tar archive + tracing::trace!( + "=== TAR Archive Contents for mount {} -> {} ===", + source, + target + ); + let mut archive = tar::Archive::new(&tar_data[..]); + for (idx, entry) in archive.entries()?.enumerate() { + match entry { + Ok(e) => { + let path = e.path().ok(); + let size = e.header().size().ok(); + tracing::trace!(" [{}] {:?} (size: {:?})", idx, path, size); + } + Err(e) => { + tracing::warn!(" [{}] Error reading entry: {}", idx, e); + } + } + } + tracing::trace!("=== End TAR Archive Contents ==="); + + Ok(tar_data) +} + pub struct DockerTestRunner { instance: Docker, id: String, + inspect: ContainerInspectResponse, } impl DockerTestRunner { pub async fn try_new( image_conf: Option, - container_conf: ContainerCreateBody, + mut container_conf: ContainerCreateBody, ) -> anyhow::Result { - let docker: Docker = Docker::connect_with_socket_defaults()?; + let docker: Docker = if let Some(url) = std::env::var("DOCKER_HOST").ok() { + if url.starts_with("http://") + || url.starts_with("https://") + || url.starts_with("tcp://") + { + Docker::connect_with_http(&url, 60, API_DEFAULT_VERSION)? + } else if url.starts_with("unix://") || url.starts_with("npipe://") { + Docker::connect_with_socket(&url, 60, API_DEFAULT_VERSION)? + } else { + anyhow::bail!("invalid DOCKER_HOST url: {}", url); + } + } else { + Docker::connect_with_socket_defaults()? + }; docker .create_image(image_conf, None, None) .try_collect::>() .await?; - let id = docker + // For remote Docker, we need to handle mounts differently + let mounts = container_conf + .host_config + .as_mut() + .and_then(|hc| hc.mounts.take()); + let files_to_copy = if std::env::var("DOCKER_HOST") + .ok() + .map(|url| { + url.starts_with("http://") + || url.starts_with("https://") + || url.starts_with("tcp://") + }) + .unwrap_or(false) + { + // Remote Docker - collect files to copy via API + mounts + } else { + // Local Docker - keep mounts in config + if let Some(mounts) = mounts { + container_conf.host_config.as_mut().unwrap().mounts = Some(mounts); + } + None + }; + + let container = docker .create_container( Some(CreateContainerOptions::default()), container_conf, ) - .await? - .id; - docker - .start_container(&id, Some(StartContainerOptions::default())) .await?; + let id = container.id; + + // Copy files to container if needed (for remote Docker) + if let Some(mounts) = files_to_copy { + for mount in mounts { + if let (Some(source), Some(target)) = + (mount.source.as_deref(), mount.target.as_deref()) + { + // Create tar archive in blocking context + let source = source.to_string(); + let target = target.to_string(); + let tar_data = tokio::task::spawn_blocking(move || { + create_tar_archive(&source, &target) + }) + .await??; + + // Upload to container root directory + docker + .upload_to_container( + &id, + Some(UploadToContainerOptions { + path: "/".to_string(), + ..Default::default() + }), + body_full(Bytes::from(tar_data)), + ) + .await?; + } + } + } + + // Try to start the container, cleanup if it fails + if let Err(e) = docker + .start_container(&id, Some(StartContainerOptions::default())) + .await + { + // Cleanup the created container before returning error + let _ = docker + .remove_container( + &id, + Some(RemoveContainerOptions { + force: true, + ..Default::default() + }), + ) + .await; + return Err(e.into()); + } + let inspect = docker.inspect_container(&id, None).await?; Ok(Self { instance: docker, id, + inspect, }) } + #[allow(unused)] + pub fn container_ip(&self) -> Option { + self.inspect + .network_settings + .as_ref() + .and_then(|i| i.networks.as_ref()) + .and_then(|b| { + b.values().find_map(|j| { + [ + (&j.gateway, &j.ip_address), + (&j.ipv6_gateway, &j.global_ipv6_address), + ] + .into_iter() + .find(|(gateway, _)| { + gateway.as_ref().map_or(false, |g| !g.is_empty()) + }) + .and_then(|(_, ip)| ip.as_ref()) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.to_string()) + }) + }) + .inspect(|e| { + tracing::trace!("container_ip: {:?}", e); + }) + } + + #[allow(unused)] + pub fn gateway_ip(&self) -> Option { + self.inspect + .network_settings + .as_ref() + .and_then(|i| i.networks.as_ref()) + .and_then(|b| { + b.values().find_map(|j| { + [(&j.gateway), (&j.ipv6_gateway)] + .into_iter() + .find(|(gateway)| { + gateway.as_ref().map_or(false, |g| !g.is_empty()) + }) + .and_then(|(gateway)| gateway.as_ref()) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.to_string()) + }) + }) + .inspect(|e| { + tracing::trace!("gateway_ip: {:?}", e); + }) + } + + /// For debugging use + #[allow(unused)] + pub async fn exec_command(&self, cmd: &[&str]) -> anyhow::Result { + use bollard::exec::{CreateExecOptions, StartExecResults}; + + let exec = self + .instance + .create_exec( + &self.id, + CreateExecOptions { + cmd: Some(cmd.iter().map(|s| s.to_string()).collect()), + attach_stdout: Some(true), + attach_stderr: Some(true), + ..Default::default() + }, + ) + .await?; + + let start_result = self.instance.start_exec(&exec.id, None).await?; + + match start_result { + StartExecResults::Attached { mut output, .. } => { + let mut result = String::new(); + while let Some(log) = output.next().await { + match log { + Ok(log_output) => { + result.push_str(&log_output.to_string()); + } + Err(e) => { + tracing::warn!("Error reading exec output: {}", e); + break; + } + } + } + Ok(result) + } + StartExecResults::Detached => Ok(String::new()), + } + } + // you can run the cleanup manually pub async fn cleanup(self) -> anyhow::Result<()> { let logs = self @@ -104,14 +335,25 @@ impl MultiDockerTestRunner { error: {:?}", e ); + // Cleanup all previously added containers before returning error + for runner in std::mem::take(&mut self.runners) { + let _ = runner.cleanup().await; + } Err(e) } } } + + #[allow(unused)] + pub fn add_with_runner(&mut self, runners: DockerTestRunner) { + self.runners.push(runners); + } } #[async_trait::async_trait] pub trait RunAndCleanup { + /// Get the docker gateway IP address. + fn docker_gateway_ip(&self) -> Option; async fn run_and_cleanup( self, f: impl Future> + Send + 'static, @@ -120,6 +362,10 @@ pub trait RunAndCleanup { #[async_trait::async_trait] impl RunAndCleanup for DockerTestRunner { + fn docker_gateway_ip(&self) -> Option { + self.gateway_ip() + } + async fn run_and_cleanup( self, f: impl Future> + Send + 'static, @@ -145,6 +391,10 @@ impl RunAndCleanup for DockerTestRunner { #[async_trait::async_trait] impl RunAndCleanup for MultiDockerTestRunner { + fn docker_gateway_ip(&self) -> Option { + self.runners.iter().find_map(|d| d.gateway_ip()) + } + async fn run_and_cleanup( self, f: impl Future> + Send + 'static, @@ -230,6 +480,7 @@ impl DockerTestRunnerBuilder { self } + #[allow(dead_code)] pub fn env(mut self, env: &[&str]) -> Self { self.env = Some(env.iter().map(|x| x.to_string()).collect()); self @@ -259,6 +510,7 @@ impl DockerTestRunnerBuilder { self } + #[allow(dead_code)] pub fn sysctls(mut self, sysctls: &[(&str, &str)]) -> Self { self.host_config.sysctls = Some( sysctls @@ -270,12 +522,14 @@ impl DockerTestRunnerBuilder { self } + #[allow(dead_code)] pub fn cap_add(mut self, caps: &[&str]) -> Self { self.host_config.cap_add = Some(caps.iter().map(|x| x.to_string()).collect()); self } + #[allow(dead_code)] pub fn net_mode(mut self, mode: &str) -> Self { self.host_config.network_mode = Some(mode.to_string()); self @@ -328,9 +582,8 @@ pub fn get_host_config(port: u16) -> HostConfig { .into_iter() .collect::>(), ), - // we need to use the host mode to enable the benchmark function - #[cfg(not(target_os = "macos"))] - network_mode: Some("host".to_owned()), + // #[cfg(not(target_os = "macos"))] + // network_mode: Some("host".to_owned()), ..Default::default() } } diff --git a/clash-lib/src/proxy/utils/test_utils/docker_utils/mod.rs b/clash-lib/src/proxy/utils/test_utils/docker_utils/mod.rs index 643767980..7c34f637f 100644 --- a/clash-lib/src/proxy/utils/test_utils/docker_utils/mod.rs +++ b/clash-lib/src/proxy/utils/test_utils/docker_utils/mod.rs @@ -13,37 +13,77 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use sysinfo::Networks; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, split}, net::{TcpListener, UdpSocket}, }; -use tracing::info; +use tracing::{debug, info, trace}; pub mod config_helper; pub mod consts; pub mod docker_runner; +fn destination_list(gateway_ip: Option) -> Vec { + let mut destination_list = vec!["host.docker.internal".to_owned()]; + if let Some(ip) = gateway_ip { + debug!("gateway_ip Ip: {}", ip); + destination_list.push(ip); + } + if let Some(ip) = std::env::var("CLIENT_IP").ok() { + debug!("client Ip: {}", &ip); + destination_list.insert(0, ip); + } else { + debug!("CLIENT_IP env not set, "); + let mut networks = Networks::new_with_refreshed_list(); + networks.refresh(true); + + trace!("networks: {:?}", networks); + // 收集所有有流量的网卡的 IPv4 地址 + let mut active_interfaces = networks + .iter() + .filter(|(_, data)| { + data.mac_address().to_string() != "00:00:00:00:00:00" + }) + .collect::>(); + + // 按流量排序:优先按发送流量降序,其次按接收流量降序 + active_interfaces.sort_by(|a, b| { + b.1.total_transmitted() + .cmp(&a.1.total_transmitted()) + .then_with(|| b.1.total_received().cmp(&a.1.total_received())) + }); + for (iface_name, data) in active_interfaces { + trace!("Processing interface: {}, {:#?}", iface_name, data); + + // 获取该网卡的所有 IP 地址 + for ip_network in data.ip_networks() { + let addr = ip_network.addr; + // 只添加 IPv4 地址,排除 loopback + if addr.is_ipv4() && !addr.is_loopback() { + let ip_str = addr.to_string(); + // 跳过已存在的 IP + if !destination_list.contains(&ip_str) { + debug!("Found IPv4 address on {}: {}", iface_name, ip_str); + destination_list.push(ip_str); + } + } + } + } + } + destination_list +} + // TODO: add the throughput metrics pub async fn ping_pong_test( handler: Arc, + gateway_ip: Option, port: u16, ) -> anyhow::Result<()> { // PATH: our proxy handler -> proxy-server(container) -> target local // server(127.0.0.1:port) - let sess = Session { - destination: ( - if cfg!(any(target_os = "linux", target_os = "android")) { - "127.0.0.1".to_owned() - } else { - "host.docker.internal".to_owned() - }, - port, - ) - .try_into() - .unwrap_or_else(|_| panic!("")), - ..Default::default() - }; + let destination_list = destination_list(gateway_ip); let resolver = config_helper::build_dns_resolver().await?; @@ -60,33 +100,49 @@ pub async fn ping_pong_test( let chunk = "world"; let mut buf = vec![0; 5]; - tracing::info!("destination_fn start read"); + info!("destination_fn(tcp) start read"); for _ in 0..100 { read_half.read_exact(&mut buf).await?; assert_eq!(&buf, b"hello"); } - tracing::info!("destination_fn start write"); - + info!("destination_fn(tcp) start write"); for _ in 0..100 { write_half.write_all(chunk.as_bytes()).await?; write_half.flush().await?; } - tracing::info!("destination_fn end"); + info!("destination_fn(tcp) end"); Ok(()) } - + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); let target_local_server_handler = tokio::spawn(async move { + let mut rx = rx; loop { - let (stream, _) = listener.accept().await?; - - tracing::info!( - "Accepted connection from: {}", - stream.peer_addr().unwrap() - ); - destination_fn(stream).await? + tokio::select! { + data = listener.accept() => { + match data { + Ok((stream, _)) => { + info!( + "Accepted connection(tcp) from: {:?}", + stream.peer_addr().ok() + ); + if let Err(e) = destination_fn(stream).await { + info!("Error handling connection(tcp): {}", e); + } + }, + Err(e) => { + info!("Error accepting connection(tcp): {}", e); + continue; + } + } + } + _ = &mut rx => { + info!("target_local_server_handler(tcp) received shutdown signal, exiting..."); + return Ok(()); + } + } } }); @@ -96,28 +152,34 @@ pub async fn ping_pong_test( let chunk = "hello"; let mut buf = vec![0; 5]; - tracing::info!("proxy_fn start write"); + info!("proxy_fn(tcp) start write"); - for _ in 0..100 { + for i in 0..100 { write_half .write_all(chunk.as_bytes()) .await .inspect_err(|x| { - tracing::error!("proxy_fn write error: {x:?}"); + tracing::error!( + "proxy_fn(tcp) write error at iteration {}: {x:?}", + i + ); })?; } write_half.flush().await?; - tracing::info!("proxy_fn start read"); + info!("proxy_fn start(tcp) read"); - for _ in 0..100 { + for i in 0..100 { read_half.read_exact(&mut buf).await.inspect_err(|x| { - tracing::error!("proxy_fn read error: {x:?}"); + tracing::error!( + "proxy_fn(tcp) read error at iteration {}: {x:?}", + i + ); })?; assert_eq!(buf, "world".as_bytes().to_owned()); } - tracing::info!("proxy_fn end"); + info!("proxy_fn(tcp) end"); Ok(()) } @@ -126,72 +188,168 @@ pub async fn ping_pong_test( // give some time for the target local server to start tokio::time::sleep(Duration::from_secs(3)).await; - match handler.connect_stream(&sess, resolver).await { - Ok(stream) => proxy_fn(stream).await, - Err(e) => { - tracing::error!("Failed to proxy connection: {}", e); - Err(anyhow!("Failed to proxy connection: {}", e)) + let mut first_error: Option = None; + + for destination in &destination_list { + tracing::trace!("Attempting TCP connection(tcp) to: {}", destination); + + let dst: SocksAddr = match (destination.clone(), port).try_into() { + Ok(addr) => addr, + Err(e) => { + tracing::error!( + "Failed to parse destination address(tcp): {}", + e + ); + continue; + } + }; + + let sess = Session { + destination: dst.clone(), + ..Default::default() + }; + + let stream = match tokio::time::timeout( + Duration::from_secs(3), + handler.connect_stream(&sess, resolver.clone()), + ) + .await + { + Ok(Ok(stream)) => { + tracing::info!("Successfully connected(tcp) to: {:?}", dst); + stream + } + Ok(Err(e)) => { + tracing::error!( + "Failed to proxy connection(tcp) to {:?}: {}", + dst, + e + ); + if first_error.is_none() { + first_error = Some(e.into()); + } + continue; + } + Err(_) => { + tracing::error!( + "connect_stream timeout (5s) for destination(tcp): {}", + destination + ); + continue; + } + }; + + match tokio::time::timeout(Duration::from_secs(3), proxy_fn(stream)) + .await + { + Ok(Ok(())) => { + tracing::info!( + "proxy_fn succeeded for destination(tcp): {}", + destination + ); + return Ok(()); + } + Ok(Err(e)) => { + tracing::error!( + "proxy_fn failed for destination(tcp) {}: {}", + destination, + e + ); + continue; + } + Err(_) => { + tracing::error!( + "proxy_fn timeout (3s) for destination(tcp): {}", + destination + ); + continue; + } } } + + // Return the first connection error if available, otherwise return generic + // error + if let Some(err) = first_error { + Err(err) + } else { + Err(anyhow!( + "all destination test error(tcp): [{:?}]", + destination_list + )) + } }); let futs = vec![proxy_task, target_local_server_handler]; - select_all(futs).await.0? + let res = select_all(futs).await.0?; + tx.send(()).ok(); // signal the target local server to shutdown + res } pub async fn ping_pong_udp_test( handler: Arc, + gateway_ip: Option, port: u16, ) -> anyhow::Result<()> { // PATH: our proxy handler -> proxy-server(container) -> target local // server(127.0.0.1:port) - let src = ("127.0.0.1".to_owned(), 10005) - .try_into() - .unwrap_or_else(|_| panic!("")); - let dst: SocksAddr = ( - if cfg!(any(target_os = "linux", target_os = "android")) { - "127.0.0.1".to_owned() - } else { - "host.docker.internal".to_owned() - }, - port, - ) - .try_into() - .unwrap_or_else(|_| panic!("")); - - let sess = Session { - destination: dst.clone(), - ..Default::default() - }; + let destination_list = destination_list(gateway_ip); let resolver = config_helper::build_dns_resolver().await?; let listener = UdpSocket::bind(format!("0.0.0.0:{}", port).as_str()).await?; info!("target local server started at: {}", listener.local_addr()?); - async fn destination_fn(listener: UdpSocket) -> anyhow::Result<()> { + async fn destination_fn( + mut rx: tokio::sync::oneshot::Receiver<()>, + listener: UdpSocket, + ) -> anyhow::Result<()> { // Use inbound_stream here let chunk = "world"; let mut buf = vec![0; 5]; + info!( + "destination_fn(udp) waiting for data on {}", + listener.local_addr()? + ); tracing::trace!("destination_fn start read"); - let (_, src) = listener.recv_from(&mut buf).await?; - assert_eq!(&buf, b"hello"); - - tracing::trace!("destination_fn start write"); - - listener.send_to(chunk.as_bytes(), src).await?; - - tracing::trace!("destination_fn end"); - Ok(()) + loop { + tokio::select! { + data = listener.recv_from(&mut buf) => { + match data { + Ok((len, src) ) => { + info!( + "destination_fn(udp) received {} bytes from {}: {:?}", + len, + src, + &buf[..len] + ); + assert_eq!(&buf, b"hello"); + info!("destination_fn(udp) sending response to {}", src); + tracing::trace!("destination_fn start write"); + let sent = listener.send_to(chunk.as_bytes(), src).await?; + info!("destination_fn(udp) sent {} bytes", sent); + tracing::trace!("destination_fn end"); + }, + Err(e) => { + info!("Error accepting connection(tcp): {}", e); + continue; + } + } + } + _ = &mut rx => { + info!("target_local_server_handler(tcp) received shutdown signal, exiting..."); + return Ok(()); + } + } + } } - + let (tx, rx) = tokio::sync::oneshot::channel::<()>(); let target_local_server_handler: tokio::task::JoinHandle< Result<(), anyhow::Error>, - > = tokio::spawn(async move { destination_fn(listener).await }); + > = tokio::spawn(async move { destination_fn(rx, listener).await }); async fn proxy_fn( mut datagram: BoxedChainedDatagram, @@ -199,42 +357,121 @@ pub async fn ping_pong_udp_test( dst_addr: SocksAddr, ) -> anyhow::Result<()> { // let (mut sink, mut stream) = datagram.split(); - let packet = UdpPacket::new(b"hello".to_vec(), src_addr, dst_addr); + let packet = + UdpPacket::new(b"hello".to_vec(), src_addr.clone(), dst_addr.clone()); - tracing::trace!("proxy_fn start write"); + info!( + "proxy_fn(udp) sending packet: src={:?}, dst={:?}, data={:?}", + src_addr, dst_addr, b"hello" + ); + trace!("proxy_fn(udp) start write"); datagram.send(packet.clone()).await.map_err(|x| { - tracing::error!("proxy_fn write error: {}", x); + tracing::error!("proxy_fn(udp) write error: {}", x); anyhow::Error::new(x) })?; - tracing::trace!("proxy_fn start read"); - - let pkt = datagram.next().await; - let pkt = pkt.ok_or_else(|| anyhow!("no packet received"))?; - assert_eq!(pkt.data, b"world"); - - tracing::trace!("proxy_fn end"); - - Ok(()) + info!("proxy_fn(udp) packet sent successfully, waiting for response..."); + trace!("proxy_fn(udp) start read"); + + let pkt = + tokio::time::timeout(Duration::from_secs(5), datagram.next()).await; + + match pkt { + Ok(Some(pkt)) => { + tracing::info!( + "proxy_fn(udp) received response: {} bytes, data={:?}", + pkt.data.len(), + pkt.data + ); + assert_eq!(pkt.data, b"world"); + tracing::trace!("proxy_fn(udp) end"); + Ok(()) + } + Ok(None) => { + tracing::error!( + "proxy_fn(udp) datagram stream closed without response" + ); + Err(anyhow!("datagram stream closed")) + } + Err(_) => { + tracing::error!("proxy_fn(udp) timeout waiting for response (5s)"); + Err(anyhow!("timeout waiting for UDP response")) + } + } } let proxy_task = tokio::spawn(async move { // give some time for the target local server to start tokio::time::sleep(Duration::from_secs(3)).await; - match handler.connect_datagram(&sess, resolver).await { - Ok(stream) => proxy_fn(stream, src, dst).await, - Err(e) => { - tracing::error!("Failed to proxy connection: {}", e); - Err(anyhow!("Failed to proxy connection: {}", e)) + for destination in &destination_list { + let src = ("127.0.0.1".to_owned(), 10005) + .try_into() + .expect("Failed to parse source address"); + + let dst: SocksAddr = match (destination.clone(), port).try_into() { + Ok(addr) => addr, + Err(e) => { + tracing::error!("Failed to parse destination address: {}", e); + continue; + } + }; + + let sess = Session { + destination: dst.clone(), + ..Default::default() + }; + + let datagram = + match handler.connect_datagram(&sess, resolver.clone()).await { + Ok(datagram) => datagram, + Err(e) => { + tracing::error!("Failed to proxy connection(udp): {}", e); + continue; + } + }; + + match tokio::time::timeout( + Duration::from_secs(3), + proxy_fn(datagram, src, dst), + ) + .await + { + Ok(Ok(())) => { + tracing::info!( + "proxy_fn(udp) succeeded for destination: {}", + destination + ); + return Ok(()); + } + Ok(Err(e)) => { + tracing::error!( + "proxy_fn(udp) failed for destination {}: {}", + destination, + e + ); + continue; + } + Err(_) => { + tracing::error!( + "proxy_fn(udp) timeout (3s) for destination: {}", + destination + ); + continue; + } } } + Err(anyhow!( + "all destination test error(udp): [{:?}]", + destination_list + )) }); let futs = vec![proxy_task, target_local_server_handler]; - - select_all(futs).await.0? + let res = select_all(futs).await.0?; + tx.send(()).ok(); + res } // latency test of the proxy, will reuse the `url_test` ability @@ -243,8 +480,8 @@ pub async fn latency_test( ) -> anyhow::Result<(Duration, Duration)> { let resolver = config_helper::build_dns_resolver().await?; let proxy_manager = ProxyManager::new(resolver.clone(), None); - let mut retries = 3; - let latency = loop { + + for attempt in 1..=3 { match proxy_manager .url_test( handler.clone(), @@ -253,22 +490,27 @@ pub async fn latency_test( ) .await { - Ok(v) => break v, - Err(e) => { - retries -= 1; - if retries == 0 { - return Err(e.into()); - } - tokio::time::sleep(Duration::from_millis(100)).await; + Ok(latency) => return Ok(latency), + Err(_) if attempt < 3 => { + tokio::time::sleep(Duration::from_secs(1)).await; } + Err(e) => return Err(e.into()), } - }; - Ok(latency) + } + unreachable!() } pub async fn dns_test(handler: Arc) -> anyhow::Result<()> { - let src = SocksAddr::Ip("127.0.0.1:0".parse().unwrap()); - let dst = SocksAddr::Ip("1.0.0.1:53".parse().unwrap()); + let src = SocksAddr::Ip( + "127.0.0.1:0" + .parse() + .expect("Failed to parse source address"), + ); + let dst = SocksAddr::Ip( + "1.0.0.1:53" + .parse() + .expect("Failed to parse destination address"), + ); let sess = Session { destination: dst.clone(), @@ -276,35 +518,26 @@ pub async fn dns_test(handler: Arc) -> anyhow::Result<()> { }; let resolver = config_helper::build_dns_resolver().await?; - - // we don't need the resolver, so it doesn't matter to create a casual one let stream = handler.connect_datagram(&sess, resolver).await?; - let (mut sink, mut stream) = stream.split(); - // send dns request to domain + // DNS request for www.google.com A record let dns_req = b"\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01"; - let udp_packet: UdpPacket = UdpPacket::new(dns_req.to_vec(), src, dst); + let udp_packet = UdpPacket::new(dns_req.to_vec(), src, dst); let start_time = Instant::now(); - let max_retry = 3; - for _ in 0..max_retry { + for _ in 0..3 { sink.send(udp_packet.clone()).await?; - let pkt = stream.next().await; - if pkt.is_none() { - continue; + + if let Some(pkt) = stream.next().await { + assert!(!pkt.data.is_empty()); + tracing::debug!("dns test time cost: {:?}", start_time.elapsed()); + return Ok(()); } - let pkt = pkt.unwrap(); - assert!(!pkt.data.is_empty()); - let end_time = Instant::now(); - tracing::debug!( - "dns test time cost:{:?}", - end_time.duration_since(start_time) - ); - return Ok(()); } - bail!("fail to receive dns response"); + + bail!("Failed to receive DNS response after 3 attempts") } #[derive(Clone, Copy)] @@ -338,12 +571,18 @@ pub async fn run_test_suites_and_cleanup( suites: &[Suite], ) -> anyhow::Result<()> { let suites = suites.to_owned(); + let gateway_ip = docker_test_runner.docker_gateway_ip(); docker_test_runner .run_and_cleanup(async move { for suite in suites { match suite { Suite::PingPongTcp => { - let rv = ping_pong_test(handler.clone(), 10001).await; + let rv = ping_pong_test( + handler.clone(), + gateway_ip.clone(), + 10001, + ) + .await; if rv.is_err() { tracing::error!("ping_pong_test failed: {:?}", rv); return rv; @@ -352,7 +591,12 @@ pub async fn run_test_suites_and_cleanup( } } Suite::PingPongUdp => { - let rv = ping_pong_udp_test(handler.clone(), 10001).await; + let rv = ping_pong_udp_test( + handler.clone(), + gateway_ip.clone(), + 10001, + ) + .await; if rv.is_err() { tracing::error!("ping_pong_udp_test failed: {:?}", rv); return rv; diff --git a/clash-lib/src/proxy/vless/mod.rs b/clash-lib/src/proxy/vless/mod.rs index 62e4afe53..b3ab63279 100644 --- a/clash-lib/src/proxy/vless/mod.rs +++ b/clash-lib/src/proxy/vless/mod.rs @@ -232,6 +232,7 @@ mod tests { DockerTestRunnerBuilder::new() .image(IMAGE_VLESS) + .port(8443) .mounts(&[ (conf.to_str().unwrap(), "/etc/v2ray/config.json"), (cert.to_str().unwrap(), "/etc/ssl/v2ray/fullchain.pem"), @@ -258,11 +259,11 @@ mod tests { 0, "".to_owned(), ); - + let runner = get_ws_runner().await?; let opts = HandlerOptions { name: "test-vless-ws".into(), common_opts: Default::default(), - server: LOCAL_ADDR.into(), + server: runner.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 8443, uuid: "b831381d-6324-4d53-ad4f-8cda48b30811".into(), udp: true, @@ -270,7 +271,7 @@ mod tests { transport: Some(Box::new(ws_client)), }; let handler = Arc::new(Handler::new(opts)); - let runner = get_ws_runner().await?; + run_test_suites_and_cleanup(handler, runner, Suite::all()).await } } diff --git a/clash-lib/src/proxy/vmess/mod.rs b/clash-lib/src/proxy/vmess/mod.rs index 7d27b4222..0ca2c7e12 100644 --- a/clash-lib/src/proxy/vmess/mod.rs +++ b/clash-lib/src/proxy/vmess/mod.rs @@ -267,10 +267,12 @@ mod tests { "".to_owned(), ); + let runner = get_ws_runner().await?; + let opts = HandlerOptions { name: "test-vmess-ws".into(), common_opts: Default::default(), - server: LOCAL_ADDR.into(), + server: runner.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 10002, uuid: "b831381d-6324-4d53-ad4f-8cda48b30811".into(), alter_id: 0, @@ -280,7 +282,7 @@ mod tests { transport: Some(Box::new(ws_client)), }; let handler = Arc::new(Handler::new(opts)); - let runner = get_ws_runner().await?; + run_test_suites_and_cleanup(handler, runner, Suite::all()).await } @@ -309,10 +311,11 @@ mod tests { "example.org".to_owned(), "example!".to_owned().try_into()?, ); + let container = get_grpc_runner().await?; let opts = HandlerOptions { name: "test-vmess-grpc".into(), common_opts: Default::default(), - server: LOCAL_ADDR.into(), + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 10002, uuid: "b831381d-6324-4d53-ad4f-8cda48b30811".into(), alter_id: 0, @@ -322,8 +325,7 @@ mod tests { transport: Some(Box::new(grpc_client)), }; let handler = Arc::new(Handler::new(opts)); - run_test_suites_and_cleanup(handler, get_grpc_runner().await?, Suite::all()) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } async fn get_h2_runner() -> anyhow::Result { @@ -353,10 +355,11 @@ mod tests { http::Method::POST, "/test".to_owned().try_into()?, ); + let container = get_h2_runner().await?; let opts = HandlerOptions { name: "test-vmess-h2".into(), common_opts: Default::default(), - server: LOCAL_ADDR.into(), + server: container.container_ip().unwrap_or(LOCAL_ADDR.to_owned()), port: 10002, uuid: "b831381d-6324-4d53-ad4f-8cda48b30811".into(), alter_id: 0, @@ -369,7 +372,6 @@ mod tests { handler .register_connector(GLOBAL_DIRECT_CONNECTOR.clone()) .await; - run_test_suites_and_cleanup(handler, get_h2_runner().await?, Suite::all()) - .await + run_test_suites_and_cleanup(handler, container, Suite::all()).await } } diff --git a/clash-lib/src/proxy/wg/device.rs b/clash-lib/src/proxy/wg/device.rs index eb6d6fea2..15536e6df 100644 --- a/clash-lib/src/proxy/wg/device.rs +++ b/clash-lib/src/proxy/wg/device.rs @@ -5,7 +5,7 @@ use std::{ time::Duration, }; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{Bytes, BytesMut}; use futures::{SinkExt, StreamExt}; use rand::seq::IndexedRandom; @@ -697,13 +697,38 @@ impl Device for VirtualIpDevice { let next = self.packet_receiver.try_recv().ok(); match next { Some((_proto, data)) => { - let rx_token = RxToken { - buffer: { - let mut buffer = BytesMut::new(); - buffer.put(data); - buffer - }, - }; + // Convert to mutable buffer for potential checksum fix + let mut buffer = BytesMut::from(&data[..]); + + // Fix UDP checksum if needed + // Some environments (NAT, checksum offload, virtualization) may + // corrupt the checksum We recalculate it here since + // WireGuard AEAD already guarantees data integrity + // Note: An alternative approach is to skip RX checksum verification + // by setting `caps.checksum.udp = + // smoltcp::phy::Checksum::Tx` in capabilities(), but + // recalculating feels cleaner than disabling verification entirely + use smoltcp::wire::*; + if let Ok(IpVersion::Ipv4) = IpVersion::of_packet(&buffer) + && let Ok(ipv4) = Ipv4Packet::new_checked(&buffer[..]) + && ipv4.next_header() == IpProtocol::Udp + { + let src_addr = ipv4.src_addr(); + let dst_addr = ipv4.dst_addr(); + let ip_header_len = ipv4.header_len() as usize; + + // Recalculate UDP checksum + if let Ok(mut udp) = + UdpPacket::new_checked(&mut buffer[ip_header_len..]) + { + udp.fill_checksum( + &IpAddress::Ipv4(src_addr), + &IpAddress::Ipv4(dst_addr), + ); + } + } + + let rx_token = RxToken { buffer }; let tx_token = TxToken { sender: self.packet_sender.clone(), }; diff --git a/clash-lib/src/proxy/wg/mod.rs b/clash-lib/src/proxy/wg/mod.rs index 37a80fcd6..a263042c7 100644 --- a/clash-lib/src/proxy/wg/mod.rs +++ b/clash-lib/src/proxy/wg/mod.rs @@ -317,12 +317,13 @@ mod tests { }, }; - use super::super::utils::test_utils::{ - consts::*, docker_runner::DockerTestRunner, + use super::{ + super::utils::test_utils::{consts::*, docker_runner::DockerTestRunner}, + *, + }; + use crate::{ + proxy::utils::test_utils::run_test_suites_and_cleanup, tests::initialize, }; - use crate::proxy::utils::test_utils::run_test_suites_and_cleanup; - - use super::*; // see: https://github.com/linuxserver/docker-wireguard?tab=readme-ov-file#usage // we shouldn't run the wireguard server with host mode, or @@ -356,10 +357,14 @@ mod tests { #[tokio::test] #[serial_test::serial] async fn test_wg() -> anyhow::Result<()> { + initialize(); + + let runner = get_runner().await?; + let opts = HandlerOptions { name: "wg".to_owned(), common_opts: Default::default(), - server: "127.0.0.1".to_owned(), + server: runner.container_ip().unwrap_or("127.0.0.1".to_owned()), port: 10002, ip: Ipv4Addr::new(10, 13, 13, 2), ipv6: None, @@ -384,7 +389,7 @@ mod tests { // on bridge network mode and the `net.ipv4.conf.all. // src_valid_mark` is not supported in the host network mode the // latency test should be enough - let runner = get_runner().await?; + // FIXME: wait for the startup of the test runner in a more elegant way tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; run_test_suites_and_cleanup( diff --git a/clash-lib/tests/api_tests.rs b/clash-lib/tests/api_tests.rs index 17e9b31aa..4d6d3a542 100644 --- a/clash-lib/tests/api_tests.rs +++ b/clash-lib/tests/api_tests.rs @@ -1,4 +1,4 @@ -use crate::common::{send_http_request, start_clash, wait_port_ready}; +use crate::common::{ClashInstance, send_http_request}; use bytes::{Buf, Bytes}; use clash_lib::{Config, Options}; use http_body_util::BodyExt; @@ -6,6 +6,7 @@ use std::{path::PathBuf, time::Duration}; mod common; +#[cfg(feature = "shadowsocks")] #[tokio::test(flavor = "current_thread")] #[serial_test::serial] async fn test_get_set_allow_lan() { @@ -18,17 +19,17 @@ async fn test_get_set_allow_lan() { config_path.to_string_lossy() ); - std::thread::spawn(move || { - start_clash(Options { + // Start Clash instance with RAII guard - will auto-cleanup on drop + let _clash = ClashInstance::start( + Options { config: Config::File(config_path.to_string_lossy().to_string()), cwd: Some(wd.to_string_lossy().to_string()), rt: None, log_file: None, - }) - .expect("Failed to start clash"); - }); - - wait_port_ready(9090).expect("Clash server is not ready"); + }, + vec![9090, 8888, 8889, 8899, 53553, 53554, 53555], + ) + .expect("Failed to start clash"); async fn get_allow_lan() -> bool { let get_configs_url = "http://127.0.0.1:9090/configs"; @@ -81,8 +82,11 @@ async fn test_get_set_allow_lan() { !get_allow_lan().await, "'allow_lan' should be false after update" ); + + // _clash will be dropped here, automatically cleaning up } +#[cfg(feature = "shadowsocks")] #[tokio::test(flavor = "current_thread")] #[serial_test::serial] async fn test_connections_returns_proxy_chain_names() { @@ -104,51 +108,57 @@ async fn test_connections_returns_proxy_chain_names() { client_config.to_string_lossy() ); - std::thread::spawn(move || { - start_clash(Options { + // Start server instance with RAII guard + let _server = ClashInstance::start( + Options { config: Config::File(server_config.to_string_lossy().to_string()), cwd: Some(wd_server.to_string_lossy().to_string()), rt: None, log_file: None, - }) - .expect("Failed to start server"); - }); - - std::thread::spawn(move || { - start_clash(Options { + }, + vec![9091, 8901], + ) + .expect("Failed to start server"); + + // Start client instance with RAII guard + let _client = ClashInstance::start( + Options { config: Config::File(client_config.to_string_lossy().to_string()), cwd: Some(wd_client.to_string_lossy().to_string()), rt: None, log_file: None, - }) - .expect("Failed to start client"); - }); - - wait_port_ready(8899).expect("Proxy port is not ready"); - - std::thread::spawn(move || { - // NOTE: use curl here for easy socks5h testing - let curl_args = vec![ - "-s", - "-x", - "socks5h://127.0.0.1:8899", - "https://httpbin.yba.dev/drip?duration=100&delay=1&numbytes=1000", - ]; - - let output = std::process::Command::new("curl") - .args(curl_args) - .output() - .expect("Failed to execute curl command"); + }, + vec![9090, 8888, 8889, 8899, 53553, 53554, 53555], + ) + .expect("Failed to start client"); + + let request_handle = tokio::spawn(async { + let proxy = reqwest::Proxy::all("socks5h://127.0.0.1:8899") + .expect("Failed to create proxy"); + + let client = reqwest::Client::builder() + .proxy(proxy) + .timeout(Duration::from_secs(30)) + .build() + .expect("Failed to build reqwest client"); + + let response = client + .get("https://httpbin.yba.dev/drip?duration=2&delay=1&numbytes=500") + .send() + .await + .expect("Failed to send request through proxy"); assert!( - output.status.success(), - "Curl command failed with output: {}, stderr: {}", - String::from_utf8_lossy(&output.stdout), - String::from_utf8_lossy(&output.stderr) + response.status().is_success(), + "Request failed with status: {}", + response.status() ); }); - tokio::time::sleep(Duration::from_secs(1)).await; + // Yield to allow the spawned task to start, then wait for connection to + // establish + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(1500)).await; let connections_url = "http://127.0.0.1:9090/connections"; @@ -189,4 +199,11 @@ async fn test_connections_returns_proxy_chain_names() { &["DIRECT", "url-test", "test 🌏"], "Chains do not match expected values" ); + + // Ensure the request task completed successfully + request_handle + .await + .expect("Request task panicked or failed"); + + // Both _server and _client will be dropped here, automatically cleaning up } diff --git a/clash-lib/tests/common/mod.rs b/clash-lib/tests/common/mod.rs index 72e2aceba..470b3c909 100644 --- a/clash-lib/tests/common/mod.rs +++ b/clash-lib/tests/common/mod.rs @@ -25,6 +25,69 @@ pub fn wait_port_ready(port: u16) -> Result<(), clash_lib::Error> { ))) } +#[allow(dead_code)] +fn wait_port_closed(port: u16) -> Result<(), clash_lib::Error> { + let addr = format!("127.0.0.1:{}", port); + let mut attempts = 0; + while attempts < 30 { + if TcpStream::connect(&addr).is_err() { + return Ok(()); + } + attempts += 1; + std::thread::sleep(std::time::Duration::from_millis(500)); + } + Err(clash_lib::Error::Io(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("Port {} is still open after 15 seconds", port), + ))) +} + +/// RAII guard for Clash instance that ensures proper cleanup +#[allow(dead_code)] +pub struct ClashInstance { + ports: Vec, +} + +impl ClashInstance { + #[allow(dead_code)] + + pub fn start( + options: clash_lib::Options, + ports: Vec, + ) -> Result { + std::thread::spawn(move || { + start_clash(options).expect("Failed to start clash"); + }); + + // Wait for the main port (usually API port) to be ready + if let Some(&main_port) = ports.first() { + wait_port_ready(main_port)?; + } + + Ok(Self { ports }) + } +} + +impl Drop for ClashInstance { + fn drop(&mut self) { + // Trigger shutdown + clash_lib::shutdown(); + + // Wait for all ports to be released + for &port in &self.ports { + if let Err(e) = wait_port_closed(port) { + eprintln!( + "Warning: Failed to wait for port {} to close: {}", + port, e + ); + } + } + + // Give a bit more time for full cleanup + std::thread::sleep(std::time::Duration::from_millis(500)); + } +} + /// Sends an HTTP request to the specified URL using a TCP connection. /// Don't use any domain name in the URL, which will trigger DNS resolution. /// And libnss_files will likely cause a coredump(in static crt build). diff --git a/clash-lib/tests/data/config/client/rules.yaml b/clash-lib/tests/data/config/client/rules.yaml index 91ef2e4f3..af550c653 100644 --- a/clash-lib/tests/data/config/client/rules.yaml +++ b/clash-lib/tests/data/config/client/rules.yaml @@ -14,17 +14,17 @@ dns: tcp: 127.0.0.1:53553 dot: addr: 127.0.0.1:53554 - ca-cert: dns.crt - ca-key: dns.key + ca-cert: ../../../../../clash-bin/tests/data/config/dns.crt + ca-key: ../../../../../clash-bin/tests/data/config/dns.key doh: addr: 127.0.0.1:53555 - ca-cert: dns.crt - ca-key: dns.key + ca-cert: ../../../../../clash-bin/tests/data/config/dns.crt + ca-key: ../../../../../clash-bin/tests/data/config/dns.key hostname: dns.example.com doh3: addr: 127.0.0.1:53555 - ca-cert: dns.crt - ca-key: dns.key + ca-cert: ../../../../../clash-bin/tests/data/config/dns.crt + ca-key: ../../../../../clash-bin/tests/data/config/dns.key hostname: dns.example.com # ipv6: false # when the false, response to AAAA questions will be empty diff --git a/clash-lib/tests/smoke_tests.rs b/clash-lib/tests/smoke_tests.rs index 53049089f..c18a5f078 100644 --- a/clash-lib/tests/smoke_tests.rs +++ b/clash-lib/tests/smoke_tests.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; mod common; +#[cfg(feature = "shadowsocks")] #[tokio::test(flavor = "current_thread")] #[serial_test::serial] /// Test Shadowsocks inbound and outbound functionality @@ -52,49 +53,57 @@ async fn smoke_test() { then.status(200).body("Mock response for testing"); }); - let curl_cmd = format!("curl -s {}", mock_server.url("/")); - let output = tokio::process::Command::new("sh") - .arg("-c") - .arg(curl_cmd) - .output() + // 使用 reqwest 客户端发送请求 + let client = reqwest::Client::new(); + + let response = client + .get(mock_server.url("/")) + .send() .await - .expect("Failed to execute curl command"); + .expect("Failed to execute HTTP request"); assert!( - output.status.success(), - "Curl command failed with output: {}", - String::from_utf8_lossy(&output.stderr) + response.status().is_success(), + "HTTP request failed with status: {}", + response.status() ); + + let body_str = response.text().await.expect("Failed to read response body"); + assert_eq!(mock.calls(), 1, "Mock server was not hit exactly once"); assert_eq!( - String::from_utf8_lossy(&output.stdout), - "Mock response for testing", + body_str, "Mock response for testing", "Unexpected response from mock server" ); wait_port_ready(8899).expect("Proxy port is not ready"); - let curl_cmd = format!( - "curl -s -x socks5h://127.0.0.1:8899 {}", - mock_server.url("/") - ); - let output = tokio::process::Command::new("sh") - .arg("-c") - .arg(curl_cmd) - .output() + // 使用 reqwest 通过 SOCKS5 代理发送请求 + let proxy = reqwest::Proxy::all("socks5://127.0.0.1:8899") + .expect("Failed to create proxy"); + + let client = reqwest::Client::builder() + .proxy(proxy) + .build() + .expect("Failed to build client with proxy"); + + let response = client + .get(mock_server.url("/")) + .send() .await - .expect("Failed to execute curl command"); + .expect("Failed to send request through proxy"); assert!( - output.status.success(), - "Curl command failed with output: {}", - String::from_utf8_lossy(&output.stderr) + response.status().is_success(), + "HTTP request through proxy failed with status: {}", + response.status() ); + let body_str = response.text().await.expect("Failed to read response body"); + assert_eq!(mock.calls(), 2, "Mock server was not hit exactly twice"); assert_eq!( - String::from_utf8_lossy(&output.stdout), - "Mock response for testing", + body_str, "Mock response for testing", "Unexpected response from mock server" ); }