From f624685d06ca16ce3c1db9d291a543b2dd11875a Mon Sep 17 00:00:00 2001 From: dev0 Date: Fri, 24 Apr 2026 16:52:27 -0700 Subject: [PATCH 1/2] some ss and obfs fixes --- clash-bin/src/main.rs | 22 +- .../src/app/dispatcher/dispatcher_impl.rs | 10 +- clash-lib/src/proxy/datagram.rs | 13 +- .../proxy/shadowsocks/outbound/datagram.rs | 15 +- .../src/proxy/shadowsocks/outbound/mod.rs | 9 +- .../src/proxy/transport/simple_obfs/tls.rs | 290 ++++++++++-------- 6 files changed, 203 insertions(+), 156 deletions(-) diff --git a/clash-bin/src/main.rs b/clash-bin/src/main.rs index 51e5fd5fe..866a97e63 100644 --- a/clash-bin/src/main.rs +++ b/clash-bin/src/main.rs @@ -184,7 +184,15 @@ fn main() -> anyhow::Result<()> { recommended to enable this if you are using clash verge." ); if let Some(dir) = &cli.directory { - std::env::set_current_dir(dir)?; + // Canonicalize to an absolute path before changing the process cwd. + // If `dir` is relative (e.g. `./clash-bin/tests/data/config`), + // calling set_current_dir then passing the same relative string as + // `cwd` to start_scaffold would cause paths like + // `cwd.join("Country.mmdb")` to be resolved from the *new* process + // cwd, doubling the directory segments and producing a path that + // doesn't exist (os error 2). + let abs = std::fs::canonicalize(dir)?; + std::env::set_current_dir(&abs)?; } if config.general.mmdb.is_none() { config.general.mmdb = Some("Country.mmdb".to_string()); @@ -194,9 +202,19 @@ fn main() -> anyhow::Result<()> { } } + // When compatibility mode called set_current_dir the process cwd is + // already correct; pass None so start_scaffold uses "." (= the new cwd) + // rather than the original relative cli.directory which would be resolved + // from the wrong base. + let cwd = if cli.compatibility && cli.directory.is_some() { + None + } else { + cli.directory.map(|x| x.to_string_lossy().to_string()) + }; + clash::start_scaffold(clash::Options { config: clash::Config::Internal(config), - cwd: cli.directory.map(|x| x.to_string_lossy().to_string()), + cwd, rt: Some(TokioRuntime::MultiThread), log_file: cli.log_file, }) diff --git a/clash-lib/src/app/dispatcher/dispatcher_impl.rs b/clash-lib/src/app/dispatcher/dispatcher_impl.rs index d00bd1836..55b2bc52d 100644 --- a/clash-lib/src/app/dispatcher/dispatcher_impl.rs +++ b/clash-lib/src/app/dispatcher/dispatcher_impl.rs @@ -30,7 +30,11 @@ use crate::app::dns::ThreadSafeDNSResolver; use super::statistics_manager::Manager; -const DEFAULT_BUFFER_SIZE: usize = 16 * 1024; +// SS2022 (AEAD-2022) MAX_PACKET_SIZE is 0xFFFF (65535 bytes). Using a relay +// buffer smaller than that forces the cipher to split every full packet into +// multiple smaller encrypted chunks, multiplying encrypt/decrypt overhead. +// Classic AEAD ciphers cap at 0x3FFF (16383 bytes) so they are unaffected. +const DEFAULT_BUFFER_SIZE: usize = 64 * 1024; pub struct Dispatcher { outbound_manager: ThreadSafeOutboundManager, @@ -251,7 +255,7 @@ impl Dispatcher { */ let (mut local_w, mut local_r) = udp_inbound.split(); let (remote_receiver_w, mut remote_receiver_r) = - tokio::sync::mpsc::channel(32); + tokio::sync::mpsc::channel(256); let s = sess.clone(); let ss = sess.clone(); @@ -363,7 +367,7 @@ impl Dispatcher { let (mut remote_w, mut remote_r) = outbound_datagram.split(); let (remote_sender, mut remote_forwarder) = - tokio::sync::mpsc::channel::(32); + tokio::sync::mpsc::channel::(256); // remote -> local let r_handle = tokio::spawn(async move { diff --git a/clash-lib/src/proxy/datagram.rs b/clash-lib/src/proxy/datagram.rs index a89d74ee1..fa20cbe33 100644 --- a/clash-lib/src/proxy/datagram.rs +++ b/clash-lib/src/proxy/datagram.rs @@ -73,6 +73,9 @@ pub struct OutboundDatagramImpl { resolver: ThreadSafeDNSResolver, flushed: bool, pkt: Option, + // Pre-allocated receive buffer; avoids a 65535-byte heap allocation on + // every poll_next call. + recv_buf: Vec, } impl OutboundDatagramImpl { @@ -82,6 +85,7 @@ impl OutboundDatagramImpl { resolver, flushed: true, pkt: None, + recv_buf: vec![0u8; 65535], } } } @@ -190,9 +194,12 @@ impl Stream for OutboundDatagramImpl { mut self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { - let Self { ref mut inner, .. } = *self; - let mut mem = vec![0u8; 65535]; - let mut buf = ReadBuf::new(&mut mem); + let Self { + ref mut inner, + ref mut recv_buf, + .. + } = *self; + let mut buf = ReadBuf::new(recv_buf.as_mut_slice()); match ready!(inner.poll_recv_from(cx, &mut buf)) { Ok(src) => { let data = buf.filled().to_vec(); diff --git a/clash-lib/src/proxy/shadowsocks/outbound/datagram.rs b/clash-lib/src/proxy/shadowsocks/outbound/datagram.rs index 583588405..42fcfe4fe 100644 --- a/clash-lib/src/proxy/shadowsocks/outbound/datagram.rs +++ b/clash-lib/src/proxy/shadowsocks/outbound/datagram.rs @@ -2,6 +2,7 @@ use std::{ io, net::SocketAddr, pin::Pin, + sync::Mutex, task::{Context, Poll}, }; @@ -200,16 +201,16 @@ where /// Shadowsocks UDP I/O that ProxySocket required pub(crate) struct ShadowsocksUdpIo { - w: tokio::sync::Mutex>, - r: tokio::sync::Mutex<(SplitStream, BytesMut)>, + w: Mutex>, + r: Mutex<(SplitStream, BytesMut)>, } impl ShadowsocksUdpIo { pub fn new(inner: AnyOutboundDatagram) -> Self { let (w, r) = inner.split(); Self { - w: tokio::sync::Mutex::new(w), - r: tokio::sync::Mutex::new((r, BytesMut::new())), + w: Mutex::new(w), + r: Mutex::new((r, BytesMut::new())), } } } @@ -225,7 +226,7 @@ impl DatagramSend for ShadowsocksUdpIo { buf: &[u8], target: std::net::SocketAddr, ) -> Poll> { - let mut w = self.w.try_lock().expect("must acquire"); + let mut w = self.w.lock().unwrap(); match w.start_send_unpin(UdpPacket { data: buf.to_vec(), src_addr: SocksAddr::any_ipv4(), @@ -243,7 +244,7 @@ impl DatagramSend for ShadowsocksUdpIo { } fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { - let mut w = self.w.try_lock().expect("must acquire"); + let mut w = self.w.lock().unwrap(); w.poll_ready_unpin(cx) .map_err(|e| new_io_error(e.to_string())) } @@ -255,7 +256,7 @@ impl DatagramReceive for ShadowsocksUdpIo { cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll> { - let mut g = self.r.try_lock().expect("must acquire"); + let mut g = self.r.lock().unwrap(); let (r, remained) = &mut *g; if !remained.is_empty() { diff --git a/clash-lib/src/proxy/shadowsocks/outbound/mod.rs b/clash-lib/src/proxy/shadowsocks/outbound/mod.rs index 5540d2c10..3c7a9b630 100644 --- a/clash-lib/src/proxy/shadowsocks/outbound/mod.rs +++ b/clash-lib/src/proxy/shadowsocks/outbound/mod.rs @@ -47,7 +47,7 @@ pub struct HandlerOptions { pub struct Handler { opts: HandlerOptions, - + ctx: Arc, connector: tokio::sync::RwLock>>, } @@ -65,6 +65,7 @@ impl Handler { pub fn new(opts: HandlerOptions) -> Self { Self { opts, + ctx: Context::new_shared(ServerType::Local), connector: tokio::sync::RwLock::new(None), } } @@ -80,11 +81,10 @@ impl Handler { None => s, }; - let ctx = Context::new_shared(ServerType::Local); let cfg = self.server_config()?; let stream = ProxyClientStream::from_stream( - ctx, + self.ctx.clone(), stream, &cfg, (sess.destination.host(), sess.destination.port()), @@ -198,7 +198,6 @@ impl OutboundHandler for Handler { resolver: ThreadSafeDNSResolver, connector: &dyn RemoteConnector, ) -> io::Result { - let ctx = Context::new_shared(ServerType::Local); let cfg = self.server_config()?; let socket = connector @@ -214,7 +213,7 @@ impl OutboundHandler for Handler { let socket = ProxySocket::from_socket( UdpSocketType::Client, - ctx, + self.ctx.clone(), &cfg, ShadowsocksUdpIo::new(socket), ); diff --git a/clash-lib/src/proxy/transport/simple_obfs/tls.rs b/clash-lib/src/proxy/transport/simple_obfs/tls.rs index d722db7d9..8788a9df9 100644 --- a/clash-lib/src/proxy/transport/simple_obfs/tls.rs +++ b/clash-lib/src/proxy/transport/simple_obfs/tls.rs @@ -1,21 +1,20 @@ -// a rust implementation of https://github.com/MetaCubeX/Clash.Meta/blob/Alpha/transport/simple-obfs/tls.go +// a rust implementation of https://github.com/shadowsocks/simple-obfs use async_trait::async_trait; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::BufMut; use chrono::Utc; -use futures::pin_mut; use std::{ borrow::Cow, - future::Future, + io, pin::Pin, task::{Context, Poll, ready}, }; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use crate::proxy::{AnyStream, transport::Transport}; -const CHUNK_SIZE: isize = 1 << 14; // 2 ** 14 == 16 * 1024 +const CHUNK_SIZE: usize = 1 << 14; // 16 KiB pub struct Client { server: String, @@ -34,27 +33,34 @@ impl Transport for Client { } } -#[derive(Debug)] +// The TLS obfs read framing works as follows: +// - First server response: skip 105 bytes of TLS handshake preamble +// (ServerHello 96 bytes + ChangeCipherSpec 6 bytes + 3 type/version bytes) +// - Each subsequent record: skip 3 bytes (type + version), then read 2-byte +// length, then deliver `length` bytes of payload. +// +// The read state machine stores its in-progress buffers in the struct so that +// a Poll::Pending return never loses bytes that were already consumed from the +// inner stream. (The previous implementation used `read_exact` with local +// buffers pinned on the stack; dropping the future on Pending lost those bytes, +// desynchronising the frame parser and causing AEAD tag failures.) enum ReadState { - Idle, - Parsing, - Reading(usize), // Length -} - -#[derive(Debug)] -enum WriteState { - Idle, - Writing(usize, usize), // current, total + SkippingHeader(Vec, usize), // (discard buffer, bytes already consumed) + ReadingLength([u8; 2], usize), // (length buffer, bytes already consumed) } pub struct TLSObfs { inner: AnyStream, server: String, + // bytes left to deliver from the current TLS payload record remain: usize, + // write side: current TLS-wrapped chunk being sent to the inner stream first_request: bool, - first_response: bool, + write_buf: Vec, + write_pos: usize, + // src bytes represented by write_buf; > 0 while a chunk is in flight + write_committed: usize, read_state: ReadState, - write_state: WriteState, } impl AsyncWrite for TLSObfs { @@ -62,157 +68,166 @@ impl AsyncWrite for TLSObfs { self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8], - ) -> Poll> { + ) -> Poll> { let this = self.get_mut(); - loop { - match this.write_state { - WriteState::Idle => { - this.write_state = WriteState::Writing(0, buf.len()); - } - WriteState::Writing(current, total) => { - let end = (current + CHUNK_SIZE as usize).min(total); - let chunk = &buf[current..end]; - ready!(writing(Pin::new(this), chunk, cx))?; - if end == total { - this.write_state = WriteState::Idle; - return Poll::Ready(Ok(total)); - } else { - this.write_state = WriteState::Writing(end, total); - } - } - } + + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + // If a previous chunk is still being sent, finish draining it first. + // Returning Ok(write_committed) signals to the caller how many source + // bytes that chunk covered; the caller then advances its window. + if this.write_committed > 0 { + ready!(drain_write_buf(this, cx))?; + let committed = this.write_committed; + this.write_committed = 0; + return Poll::Ready(Ok(committed)); } + + // Wrap the next chunk (at most CHUNK_SIZE source bytes) into a TLS + // Application Data record (or ClientHello for the very first write). + let end = CHUNK_SIZE.min(buf.len()); + let chunk = &buf[..end]; + this.write_buf = if this.first_request { + this.first_request = false; + make_client_hello_msg(chunk, &this.server).into_owned() + } else { + let mut v = Vec::with_capacity(5 + chunk.len()); + v.extend_from_slice(&[0x17, 0x03, 0x03]); + v.push((chunk.len() >> 8) as u8); + v.push((chunk.len() & 0xff) as u8); + v.extend_from_slice(chunk); + v + }; + this.write_pos = 0; + this.write_committed = end; + + ready!(drain_write_buf(this, cx))?; + this.write_committed = 0; + Poll::Ready(Ok(end)) } fn poll_flush( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.get_mut(); + ready!(drain_write_buf(this, cx))?; + this.write_committed = 0; Pin::new(&mut this.inner).poll_flush(cx) } fn poll_shutdown( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.get_mut(); + ready!(drain_write_buf(this, cx))?; + this.write_committed = 0; Pin::new(&mut this.inner).poll_shutdown(cx) } } +/// Send all remaining bytes of `this.write_buf[this.write_pos..]` to the inner +/// stream, advancing `this.write_pos` as each partial write completes. +fn drain_write_buf( + this: &mut TLSObfs, + cx: &mut Context<'_>, +) -> Poll> { + while this.write_pos < this.write_buf.len() { + let n = ready!( + Pin::new(&mut this.inner) + .poll_write(cx, &this.write_buf[this.write_pos..]) + )?; + if n == 0 { + return Poll::Ready(Err(io::Error::from(io::ErrorKind::WriteZero))); + } + this.write_pos += n; + } + Poll::Ready(Ok(())) +} + impl AsyncRead for TLSObfs { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.get_mut(); let mut inner = Pin::new(&mut this.inner); - if this.remain > 0 { - let before = buf.filled().len(); - ready!(inner.as_mut().poll_read(cx, buf))?; - let actually_read = buf.filled().len() - before; - this.remain -= actually_read; - return Poll::Ready(Ok(())); - } - if this.first_response { - // type + ver + lensize + 91 = 96 - // type + ver + lensize + 1 = 6 - // type + ver = 3 - ready!(reading(Pin::new(this), buf, cx, 105))?; - this.first_response = false; - return Poll::Ready(Ok(())); - } - // type + ver = 3 - ready!(reading(Pin::new(this), buf, cx, 3))?; - Poll::Ready(Ok(())) - } -} - -/// Maybe write one chunk, the return value doesn't really matter as it's not -/// necessarily related to the original data buffer length -fn writing( - this: std::pin::Pin<&mut TLSObfs>, - b: &[u8], - cx: &mut Context<'_>, -) -> Poll> { - let this = this.get_mut(); - let inner = Pin::new(&mut this.inner); - if this.first_request { - let hello_msg = make_client_hello_msg(b, &this.server); - match ready!(inner.poll_write(cx, &hello_msg)) { - Ok(_) => { - this.first_request = false; + loop { + // Phase 1: deliver bytes from the current TLS payload record. + if this.remain > 0 { + if buf.remaining() == 0 { + return Poll::Ready(Ok(())); + } + let limit = this.remain.min(buf.remaining()); + let spare = buf.initialize_unfilled_to(limit); + let mut sub = ReadBuf::new(&mut spare[..limit]); + ready!(inner.as_mut().poll_read(cx, &mut sub))?; + let n = sub.filled().len(); + if n == 0 { + return Poll::Ready(Err(io::Error::from( + io::ErrorKind::UnexpectedEof, + ))); + } + buf.advance(n); + this.remain -= n; return Poll::Ready(Ok(())); } - Err(e) => return Poll::Ready(Err(e)), - } - } - let mut buf = Vec::new(); - buf.put_slice(&[0x17, 0x03, 0x03]); - buf.write_u16::(b.len() as u16).unwrap(); - buf.put_slice(b); - inner.poll_write(cx, &buf).map(|_| Ok(())) -} -fn reading( - this: std::pin::Pin<&mut TLSObfs>, - buf: &mut ReadBuf<'_>, - cx: &mut Context<'_>, - discard_n: usize, -) -> Poll> { - let this = this.get_mut(); - let mut inner = Pin::new(&mut this.inner); - - loop { - match this.read_state { - ReadState::Idle => { - // 1. discard n bytes - let mut buffer = vec![0; discard_n]; - let fut = inner.read_exact(&mut buffer); - pin_mut!(fut); - match ready!(fut.poll(cx)) { - Ok(_) => { - this.read_state = ReadState::Parsing; - continue; - } - Err(e) => return Poll::Ready(Err(e)), - } - } - ReadState::Parsing => { - // 2. read 2 bytes as length - let mut buffer = vec![0; 2]; - let fut = inner.read_exact(&mut buffer); - pin_mut!(fut); - match ready!(fut.poll(cx)) { - Ok(_) => { - let length = BigEndian::read_u16(&buffer[..2]) as usize; - this.read_state = ReadState::Reading(length); - continue; + // Phase 2: parse the next TLS record header (type + version bytes) + // and 2-byte payload length, updating `this.remain` when done. + // Both states persist their partial buffers in `this.read_state` so + // that a Poll::Pending return never loses already-consumed bytes. + let to_reading_length = match &mut this.read_state { + ReadState::SkippingHeader(skip_buf, filled) => { + let total = skip_buf.len(); + while *filled < total { + let mut rb = ReadBuf::new(&mut skip_buf[*filled..]); + ready!(inner.as_mut().poll_read(cx, &mut rb))?; + let n = rb.filled().len(); + if n == 0 { + return Poll::Ready(Err(io::Error::from( + io::ErrorKind::UnexpectedEof, + ))); + } + *filled += n; } - Err(e) => return Poll::Ready(Err(e)), + true } - } - ReadState::Reading(length) => { - // 3. read length bytes - let remaining = buf.remaining(); - let len = length.min(remaining); - let mut buffer = vec![0; len]; - let fut = inner.read_exact(&mut buffer); - pin_mut!(fut); - match ready!(fut.poll(cx)) { - Ok(_) => { - buf.put_slice(&buffer); - if length > remaining { - this.remain = length - remaining; + ReadState::ReadingLength(len_buf, filled) => { + while *filled < 2 { + let mut rb = ReadBuf::new(&mut len_buf[*filled..]); + ready!(inner.as_mut().poll_read(cx, &mut rb))?; + let n = rb.filled().len(); + if n == 0 { + return Poll::Ready(Err(io::Error::from( + io::ErrorKind::UnexpectedEof, + ))); } - this.read_state = ReadState::Idle; - return Poll::Ready(Ok(())); + *filled += n; } - Err(e) => return Poll::Ready(Err(e)), + false } + }; + + if to_reading_length { + this.read_state = ReadState::ReadingLength([0u8; 2], 0); + } else { + // Extract length and prepare for the next record header. + let length = match &this.read_state { + ReadState::ReadingLength(len_buf, _) => { + BigEndian::read_u16(len_buf) as usize + } + _ => unreachable!(), + }; + this.remain = length; + // Subsequent records have a 3-byte header (type + version). + this.read_state = ReadState::SkippingHeader(vec![0u8; 3], 0); + // Loop back to Phase 1 to deliver payload bytes immediately. } } } @@ -307,10 +322,13 @@ impl TLSObfs { inner, server, remain: 0, - read_state: ReadState::Idle, - write_state: WriteState::Idle, first_request: true, - first_response: true, + write_buf: Vec::new(), + write_pos: 0, + write_committed: 0, + // First response: skip 105-byte TLS handshake preamble + // (ServerHello 96 B + ChangeCipherSpec 6 B + type/version 3 B). + read_state: ReadState::SkippingHeader(vec![0u8; 105], 0), } } } From 40522dd2635c5a2c5965119892084f73a1a8a1cb Mon Sep 17 00:00:00 2001 From: dev0 Date: Fri, 24 Apr 2026 17:25:03 -0700 Subject: [PATCH 2/2] f --- .../src/proxy/transport/simple_obfs/tls.rs | 39 +++++++++++++++---- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/clash-lib/src/proxy/transport/simple_obfs/tls.rs b/clash-lib/src/proxy/transport/simple_obfs/tls.rs index 8788a9df9..9fe2dba4e 100644 --- a/clash-lib/src/proxy/transport/simple_obfs/tls.rs +++ b/clash-lib/src/proxy/transport/simple_obfs/tls.rs @@ -44,9 +44,21 @@ impl Transport for Client { // inner stream. (The previous implementation used `read_exact` with local // buffers pinned on the stack; dropping the future on Pending lost those bytes, // desynchronising the frame parser and causing AEAD tag failures.) + +// Maximum header size to discard: 105 bytes for the first response preamble; +// 3 bytes (type + version) for every subsequent record. +const MAX_SKIP: usize = 105; + enum ReadState { - SkippingHeader(Vec, usize), // (discard buffer, bytes already consumed) - ReadingLength([u8; 2], usize), // (length buffer, bytes already consumed) + // Fixed-size scratch buffer avoids a heap allocation on every record boundary. + // `target` is 105 for the initial handshake preamble, 3 for all subsequent + // records. + SkippingHeader { + buf: [u8; MAX_SKIP], + target: usize, + filled: usize, + }, + ReadingLength([u8; 2], usize), // (length buffer, bytes already consumed) } pub struct TLSObfs { @@ -183,10 +195,13 @@ impl AsyncRead for TLSObfs { // Both states persist their partial buffers in `this.read_state` so // that a Poll::Pending return never loses already-consumed bytes. let to_reading_length = match &mut this.read_state { - ReadState::SkippingHeader(skip_buf, filled) => { - let total = skip_buf.len(); - while *filled < total { - let mut rb = ReadBuf::new(&mut skip_buf[*filled..]); + ReadState::SkippingHeader { + buf: skip_buf, + target, + filled, + } => { + while *filled < *target { + let mut rb = ReadBuf::new(&mut skip_buf[*filled..*target]); ready!(inner.as_mut().poll_read(cx, &mut rb))?; let n = rb.filled().len(); if n == 0 { @@ -226,7 +241,11 @@ impl AsyncRead for TLSObfs { }; this.remain = length; // Subsequent records have a 3-byte header (type + version). - this.read_state = ReadState::SkippingHeader(vec![0u8; 3], 0); + this.read_state = ReadState::SkippingHeader { + buf: [0u8; MAX_SKIP], + target: 3, + filled: 0, + }; // Loop back to Phase 1 to deliver payload bytes immediately. } } @@ -328,7 +347,11 @@ impl TLSObfs { write_committed: 0, // First response: skip 105-byte TLS handshake preamble // (ServerHello 96 B + ChangeCipherSpec 6 B + type/version 3 B). - read_state: ReadState::SkippingHeader(vec![0u8; 105], 0), + read_state: ReadState::SkippingHeader { + buf: [0u8; MAX_SKIP], + target: MAX_SKIP, + filled: 0, + }, } } }