diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index c8f952b14ef..535cacfa810 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -17,5 +17,8 @@ jobs: uses: ./.github/actions/setup-bun with: bun-version: ${{ env.BUN_VERSION }} + - name: Setup Dependencies + run: | + bun install --frozen-lockfile - name: Lint run: bun lint diff --git a/src/http_jsc/websocket_client.rs b/src/http_jsc/websocket_client.rs index c68d890640d..4f911814f16 100644 --- a/src/http_jsc/websocket_client.rs +++ b/src/http_jsc/websocket_client.rs @@ -253,7 +253,7 @@ impl WebSocket { // the destructor's finalize() — does not leak. When reached via // fail(), outgoing_websocket is already None and this is a no-op. if had_tunnel { - this.dispatch_abrupt_close(ErrorCode::Ended); + this.dispatch_abrupt_close(ErrorCode::Ended, None); } } @@ -261,7 +261,14 @@ impl WebSocket { jsc::mark_binding!(); if let Some(ws) = self.outgoing_websocket.take() { log!("fail ({})", <&'static str>::from(code)); - CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code); + // Snapshot the unsent backlog before did_abrupt_close(): the JS + // close event fires synchronously inside it, yet the send buffer is + // not freed until cancel() below, so C++ must be told the amount now + // (it cannot query the connection across this &mut self borrow). + // SAFETY: `self` is a live `&mut Self`; buffered_amount only does + // short-lived raw-ptr field reads. + let buffered = unsafe { Self::buffered_amount(self) }; + CppWebSocket::opaque_ref(ws.as_ptr()).did_abrupt_close(code, buffered); // SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by // the socket/tunnel I/O ref (or by caller's guard). unsafe { Self::deref(self) }; @@ -338,10 +345,15 @@ impl WebSocket { pub fn handle_close(&mut self, _socket: Socket, _code: c_int, _reason: *mut c_void) { log!("onClose"); jsc::mark_binding!(); + // Snapshot the backlog before clear_data() frees it, so the close event + // does not see bufferedAmount reset to 0 (e.g. peer RST with unsent + // frames). SAFETY: `self` is a live `&mut Self`; buffered_amount only + // does short-lived raw-ptr field reads. + let buffered = unsafe { Self::buffered_amount(self) }; self.clear_data(); self.tcp.detach(); - self.dispatch_abrupt_close(ErrorCode::Ended); + self.dispatch_abrupt_close(ErrorCode::Ended, Some(buffered)); // For the socket. // SAFETY: `self: &mut Self` → `*mut Self`; this is the terminal @@ -1290,9 +1302,20 @@ impl WebSocket { true } Err(true) => { - // `terminate → clear_data` resets `send_buffer`; drop the - // taken fifo without restoring. - drop(buf); + // Restore the backlog before terminating: fail() snapshots + // send_buffer.readable_length() for bufferedAmount, so it must + // still be here. `terminate → cancel → clear_data` frees it + // immediately afterward, so this does not leak. + // + // KNOWN GAP (tunnel only): if the tunnel's SslWrapper::write_data + // hits a fatal SSL error it fires on_close → fail() *synchronously + // inside* the write above — before this restore — so that + // bufferedAmount snapshot reads 0. `self.send_buffer` cannot be + // kept populated across the write without either aliasing UB (the + // slice handed to write is borrowed from it) or an extra per-flush + // copy; the window is a fatal-handshake/close-notify error + // mid-flush, and 0 there is no worse than the pre-feature behavior. + self.send_buffer = buf; self.terminate(ErrorCode::FailedToWrite); false } @@ -1305,7 +1328,7 @@ impl WebSocket { fn send_pong(&mut self) -> bool { if !self.has_tcp() { - self.dispatch_abrupt_close(ErrorCode::Ended); + self.dispatch_abrupt_close(ErrorCode::Ended, None); return false; } @@ -1365,7 +1388,7 @@ impl WebSocket { let body_len = body_len.min(123); log!("Sending close with code {}", code); if !self.has_tcp() { - self.dispatch_abrupt_close(ErrorCode::Ended); + self.dispatch_abrupt_close(ErrorCode::Ended, None); self.clear_data(); return; } @@ -1417,8 +1440,14 @@ impl WebSocket { let slice = &final_body_bytes[..slice_len]; if self.enqueue_encoded_bytes(slice) { + // Snapshot the unsent backlog before clear_data() frees it, so the + // JS close event does not see bufferedAmount reset to 0 (spec: it + // does not reset once the connection closes). + // SAFETY: `self` is a live `&mut Self`; buffered_amount only does + // short-lived raw-ptr field reads. + let buffered = unsafe { Self::buffered_amount(self) }; self.clear_data(); - self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason); + self.dispatch_close(dispatch_code.unwrap_or(code), &mut reason, buffered); } } @@ -1476,7 +1505,7 @@ impl WebSocket { let this = unsafe { &mut *this_ptr }; if !this.has_tcp() || op > 0xF { - this.dispatch_abrupt_close(ErrorCode::Ended); + this.dispatch_abrupt_close(ErrorCode::Ended, None); return; } @@ -1522,7 +1551,7 @@ impl WebSocket { let this = unsafe { &mut *this_ptr }; if !this.has_tcp() || op > 0xF { - this.dispatch_abrupt_close(ErrorCode::Ended); + this.dispatch_abrupt_close(ErrorCode::Ended, None); return; } @@ -1565,7 +1594,7 @@ impl WebSocket { let _ = this.send_data(bytes, !this.has_backpressure(), opcode); } else { // Invalid blob, close connection - this.dispatch_abrupt_close(ErrorCode::Ended); + this.dispatch_abrupt_close(ErrorCode::Ended, None); } } @@ -1583,7 +1612,7 @@ impl WebSocket { // SAFETY: str_ is a valid pointer from C++ let str = unsafe { &*str_ }; if !this.has_tcp() { - this.dispatch_abrupt_close(ErrorCode::Ended); + this.dispatch_abrupt_close(ErrorCode::Ended, None); return; } @@ -1636,25 +1665,33 @@ impl WebSocket { ); } - fn dispatch_abrupt_close(&mut self, code: ErrorCode) { + /// `buffered_override` lets a caller that already cleared the send buffer + /// (e.g. `handle_close()` calls `clear_data()` first) pass the backlog it + /// captured beforehand. `None` snapshots the live send buffer here. + fn dispatch_abrupt_close(&mut self, code: ErrorCode, buffered_override: Option) { let Some(out) = self.outgoing_websocket.take() else { return; }; self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this)); jsc::mark_binding!(); - CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code); + // Capture the unsent backlog so C++ can keep bufferedAmount from + // resetting to 0 on abrupt close. + // SAFETY: `self` is a live `&mut Self`; buffered_amount only does + // short-lived raw-ptr field reads. + let buffered = buffered_override.unwrap_or_else(|| unsafe { Self::buffered_amount(self) }); + CppWebSocket::opaque_ref(out.as_ptr()).did_abrupt_close(code, buffered); // SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by // caller's ref guard (see cancel/handle_close). unsafe { Self::deref(self) }; } - fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String) { + fn dispatch_close(&mut self, code: u16, reason: &mut bun_core::String, buffered_amount: usize) { let Some(out) = self.outgoing_websocket.take() else { return; }; self.poll_ref.unref(Self::vm_loop_ctx(&self.global_this)); jsc::mark_binding!(); - CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason); + CppWebSocket::opaque_ref(out.as_ptr()).did_close(code, reason, buffered_amount); // SAFETY: `self: &mut Self` → `*mut Self`; allocation kept live by // caller's ref guard. unsafe { Self::deref(self) }; @@ -2097,6 +2134,45 @@ impl WebSocket { // This is under-estimated a little, as we don't include usockets context. cost } + + /// Bytes queued by `send()` that have not yet been written to the socket. + /// Backs the client `WebSocket.bufferedAmount` getter. Includes the framing + /// bytes of buffered frames (the send buffer holds fully framed messages), + /// plus any encrypted bytes the proxy tunnel still holds. + /// + /// Takes `*const Self` and projects to `send_buffer`/`proxy_tunnel` via + /// `addr_of!` rather than forming a whole-struct `&Self`: the C++ + /// `bufferedAmount` getter can run re-entrantly while a `&mut Self` is live + /// (JS reads `ws.bufferedAmount` inside an `onmessage` handler dispatched + /// from `dispatch_data(&mut self)`), and a whole-struct `&Self` would pop + /// that borrow's Unique tag (UB under Stacked Borrows). + /// + /// # Safety + /// `this` must point to a live `WebSocket`. + pub unsafe fn buffered_amount(this: *const Self) -> usize { + // SAFETY: `this` is live; short-lived shared borrows of the disjoint + // `send_buffer` and `proxy_tunnel` fields only (never a whole-struct + // `&Self`, which could overlap a live `&mut Self` on a re-entrant call). + let mut buffered = unsafe { (*core::ptr::addr_of!((*this).send_buffer)).readable_length() }; + // SAFETY: as above — `proxy_tunnel` is `Copy` (Option>). + let tunnel = unsafe { *core::ptr::addr_of!((*this).proxy_tunnel) }; + if let Some(tunnel) = tunnel { + // Raw-ptr accessor, not `tunnel.as_ref()`: reachable inside the + // tunnel's SSL-wrapper callbacks on abrupt close, where a + // whole-struct `&WebSocketProxyTunnel` would overlap the live + // `&mut SslWrapper` (see WebSocketProxyTunnel's Aliasing model doc). + // SAFETY: `tunnel` (NonNull) points to a live tunnel. + buffered += unsafe { WebSocketProxyTunnel::buffered_amount(tunnel.as_ptr()) }; + } + buffered + } + + // `extern "C"` entrypoint; `this` is non-null by C++ contract (see SAFETY comment below). + #[allow(clippy::not_unsafe_ptr_arg_deref)] + pub extern "C" fn get_buffered_amount(this: *const Self) -> usize { + // SAFETY: called from C++ with a valid pointer. + unsafe { Self::buffered_amount(this) } + } } // ────────────────────────────────────────────────────────────────────────── @@ -2110,6 +2186,7 @@ macro_rules! export_websocket_client { cancel = $cancel:ident, close = $close:ident, finalize = $finalize:ident, + get_buffered_amount = $get_buffered_amount:ident, init = $init:ident, init_with_tunnel = $init_with_tunnel:ident, memory_cost = $memory_cost:ident, @@ -2130,6 +2207,10 @@ macro_rules! export_websocket_client { WebSocket::<$ssl>::finalize(this) } #[unsafe(no_mangle)] + pub extern "C" fn $get_buffered_amount(this: *const WebSocket<$ssl>) -> usize { + WebSocket::<$ssl>::get_buffered_amount(this) + } + #[unsafe(no_mangle)] pub extern "C" fn $init( outgoing: *mut CppWebSocket, input_socket: *mut c_void, @@ -2200,6 +2281,7 @@ export_websocket_client!( cancel = Bun__WebSocketClient__cancel, close = Bun__WebSocketClient__close, finalize = Bun__WebSocketClient__finalize, + get_buffered_amount = Bun__WebSocketClient__getBufferedAmount, init = Bun__WebSocketClient__init, init_with_tunnel = Bun__WebSocketClient__initWithTunnel, memory_cost = Bun__WebSocketClient__memoryCost, @@ -2212,6 +2294,7 @@ export_websocket_client!( cancel = Bun__WebSocketClientTLS__cancel, close = Bun__WebSocketClientTLS__close, finalize = Bun__WebSocketClientTLS__finalize, + get_buffered_amount = Bun__WebSocketClientTLS__getBufferedAmount, init = Bun__WebSocketClientTLS__init, init_with_tunnel = Bun__WebSocketClientTLS__initWithTunnel, memory_cost = Bun__WebSocketClientTLS__memoryCost, diff --git a/src/http_jsc/websocket_client/CppWebSocket.rs b/src/http_jsc/websocket_client/CppWebSocket.rs index cb3fec21054..ec4e31d0437 100644 --- a/src/http_jsc/websocket_client/CppWebSocket.rs +++ b/src/http_jsc/websocket_client/CppWebSocket.rs @@ -44,8 +44,17 @@ unsafe extern "C" { buffered_len: usize, deflate_params: *const websocket_deflate::Params, ); - safe fn WebSocket__didAbruptClose(websocket_context: &CppWebSocket, reason: ErrorCode); - fn WebSocket__didClose(websocket_context: &CppWebSocket, code: u16, reason: *const BunString); + safe fn WebSocket__didAbruptClose( + websocket_context: &CppWebSocket, + reason: ErrorCode, + buffered_amount: usize, + ); + fn WebSocket__didClose( + websocket_context: &CppWebSocket, + code: u16, + reason: *const BunString, + buffered_amount: usize, + ); fn WebSocket__didReceiveText( websocket_context: &CppWebSocket, clone: bool, @@ -69,22 +78,28 @@ unsafe extern "C" { // borrows (often while `&mut WebSocket` is also live), so `&mut self` // would force needless `unsafe { &mut *ptr }` at every site. impl CppWebSocket { - pub(crate) fn did_abrupt_close(&self, reason: ErrorCode) { + /// `buffered_amount` is the sender's unsent backlog captured *before* this + /// call (the connection's send buffer may be freed during the abrupt-close + /// teardown), so C++ can keep `WebSocket.bufferedAmount` from resetting to 0. + pub(crate) fn did_abrupt_close(&self, reason: ErrorCode, buffered_amount: usize) { // SAFETY: VirtualMachine::get() returns the live current-thread VM; // event_loop() yields its raw event-loop pointer (live for VM lifetime). let event_loop = VirtualMachine::get().event_loop_mut(); event_loop.enter(); - WebSocket__didAbruptClose(self, reason); + WebSocket__didAbruptClose(self, reason, buffered_amount); event_loop.exit(); } - pub(crate) fn did_close(&self, code: u16, reason: &mut BunString) { + /// `buffered_amount` is the sender's unsent backlog captured *before* this + /// call (the send buffer is freed during close teardown), so C++ can keep + /// `WebSocket.bufferedAmount` from resetting to 0 once closed. + pub(crate) fn did_close(&self, code: u16, reason: &mut BunString, buffered_amount: usize) { // SAFETY: VirtualMachine::get() returns the live current-thread VM; // event_loop() yields its raw event-loop pointer (live for VM lifetime). let event_loop = VirtualMachine::get().event_loop_mut(); event_loop.enter(); // SAFETY: self is a valid C++ WebCore::WebSocket; reason outlives the call. - unsafe { WebSocket__didClose(self, code, reason) }; + unsafe { WebSocket__didClose(self, code, reason, buffered_amount) }; event_loop.exit(); } diff --git a/src/http_jsc/websocket_client/WebSocketProxyTunnel.rs b/src/http_jsc/websocket_client/WebSocketProxyTunnel.rs index 34264cef7e1..7894d8c62ef 100644 --- a/src/http_jsc/websocket_client/WebSocketProxyTunnel.rs +++ b/src/http_jsc/websocket_client/WebSocketProxyTunnel.rs @@ -599,6 +599,22 @@ impl WebSocketProxyTunnel { pub(crate) fn has_backpressure(&self) -> bool { self.write_buffer.is_not_empty() } + + /// Encrypted bytes still buffered in the tunnel awaiting a writable socket. + /// + /// Takes `*const Self` and projects to `write_buffer` via `addr_of!` rather + /// than forming a whole-struct `&Self`: this is reachable from inside the + /// SSL-wrapper callbacks (abrupt close during the connected phase), which + /// hold a `&mut SslWrapper` over the `wrapper` field — a whole-struct borrow + /// would overlap it (see the module's Aliasing model doc). + /// + /// # Safety + /// `this` must point to a live `WebSocketProxyTunnel`. + pub(crate) unsafe fn buffered_amount(this: *const Self) -> usize { + // SAFETY: `this` is live; short-lived shared borrow of the disjoint + // `write_buffer` field only (never touches `wrapper`). + unsafe { (*ptr::addr_of!((*this).write_buffer)).size() } + } } impl Drop for WebSocketProxyTunnel { diff --git a/src/http_jsc/websocket_client/WebSocketUpgradeClient.rs b/src/http_jsc/websocket_client/WebSocketUpgradeClient.rs index dad27580b5d..63f2c6c8558 100644 --- a/src/http_jsc/websocket_client/WebSocketUpgradeClient.rs +++ b/src/http_jsc/websocket_client/WebSocketUpgradeClient.rs @@ -686,7 +686,8 @@ impl HTTPClient { // SAFETY: short-lived `&mut` for the field take; ends before the FFI call. let ws = unsafe { (*this).outgoing_websocket.take() }; if let Some(ws) = ws { - CppWebSocket::opaque_ref(ws).did_abrupt_close(code); + // The upgrade handshake has no send buffer yet, so the backlog is 0. + CppWebSocket::opaque_ref(ws).did_abrupt_close(code, 0); // SAFETY: `this` carries root provenance; may free `this`. unsafe { Self::deref(this) }; } diff --git a/src/jsc/bindings/headers.h b/src/jsc/bindings/headers.h index c85e7379065..56cdf95298d 100644 --- a/src/jsc/bindings/headers.h +++ b/src/jsc/bindings/headers.h @@ -646,6 +646,7 @@ ZIG_DECL void* Bun__WebSocketClient__init(CppWebSocket* arg0, void* arg1, JSC::J ZIG_DECL void Bun__WebSocketClient__writeBinaryData(WebSocketClient* arg0, const unsigned char* arg1, size_t arg2, unsigned char arg3); ZIG_DECL void Bun__WebSocketClient__writeString(WebSocketClient* arg0, const ZigString* arg1, unsigned char arg2); ZIG_DECL size_t Bun__WebSocketClient__memoryCost(WebSocketClient* arg0); +ZIG_DECL size_t Bun__WebSocketClient__getBufferedAmount(WebSocketClient* arg0); #endif @@ -658,6 +659,7 @@ ZIG_DECL void* Bun__WebSocketClientTLS__init(CppWebSocket* arg0, void* arg1, JSC ZIG_DECL void Bun__WebSocketClientTLS__writeBinaryData(WebSocketClientTLS* arg0, const unsigned char* arg1, size_t arg2, unsigned char arg3); ZIG_DECL void Bun__WebSocketClientTLS__writeString(WebSocketClientTLS* arg0, const ZigString* arg1, unsigned char arg2); ZIG_DECL size_t Bun__WebSocketClientTLS__memoryCost(WebSocketClientTLS* arg0); +ZIG_DECL size_t Bun__WebSocketClientTLS__getBufferedAmount(WebSocketClientTLS* arg0); #endif #ifdef __cplusplus diff --git a/src/jsc/bindings/webcore/WebSocket.cpp b/src/jsc/bindings/webcore/WebSocket.cpp index 89203ec56d0..fbb185ccd49 100644 --- a/src/jsc/bindings/webcore/WebSocket.cpp +++ b/src/jsc/bindings/webcore/WebSocket.cpp @@ -174,6 +174,13 @@ static unsigned saturateAdd(unsigned a, unsigned b) return a + b; } +static unsigned clampToUnsigned(size_t value) +{ + return value > std::numeric_limits::max() + ? std::numeric_limits::max() + : static_cast(value); +} + ASCIILiteral WebSocket::subprotocolSeparator() { return ", "_s; @@ -835,6 +842,12 @@ WebCore::ExceptionOr WebCore::WebSocket::send(WebCore::JSBlob* blob) if (m_state == CONNECTING) return Exception { InvalidStateError }; if (m_state == CLOSING || m_state == CLOSED) { + // Mirror the String/ArrayBuffer/ArrayBufferView send() overloads: per + // spec, send() after close increases bufferedAmount by the size of the + // data instead of transmitting it. + size_t payloadSize = Blob__getSize(JSC::JSValue::encode(blob)); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); return {}; } @@ -857,8 +870,6 @@ void WebSocket::sendWebSocketData(const char* baseAddress, size_t length, const switch (m_connectedWebSocketKind) { case ConnectedWebSocketKind::Client: { Bun__WebSocketClient__writeBinaryData(this->m_connectedWebSocket.client, reinterpret_cast(baseAddress), length, static_cast(op)); - // this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode); - // this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount(); break; } case ConnectedWebSocketKind::ClientSSL: { @@ -887,8 +898,6 @@ void WebSocket::sendWebSocketString(const String& message, const Opcode op) case ConnectedWebSocketKind::Client: { auto zigStr = Zig::toZigString(message); Bun__WebSocketClient__writeString(this->m_connectedWebSocket.client, &zigStr, static_cast(op)); - // this->m_connectedWebSocket.client->send({ baseAddress, length }, opCode); - // this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount(); break; } case ConnectedWebSocketKind::ClientSSL: { @@ -991,17 +1000,19 @@ ExceptionOr WebSocket::close(std::optional optionalCode, c m_state = CLOSING; switch (m_connectedWebSocketKind) { case ConnectedWebSocketKind::Client: { + // Snapshot the backlog before the connection (and its send buffer) is + // torn down: per spec bufferedAmount must not reset to 0 on close. + m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client)); ZigString reasonZigStr = Zig::toZigString(reason); Bun__WebSocketClient__close(this->m_connectedWebSocket.client, code, &reasonZigStr); updateHasPendingActivity(); - // this->m_bufferedAmount = this->m_connectedWebSocket.client->getBufferedAmount(); break; } case ConnectedWebSocketKind::ClientSSL: { + m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL)); ZigString reasonZigStr = Zig::toZigString(reason); Bun__WebSocketClientTLS__close(this->m_connectedWebSocket.clientSSL, code, &reasonZigStr); updateHasPendingActivity(); - // this->m_bufferedAmount = this->m_connectedWebSocket.clientSSL->getBufferedAmount(); break; } // case ConnectedWebSocketKind::Server: { @@ -1036,11 +1047,15 @@ ExceptionOr WebSocket::terminate() m_state = CLOSING; switch (m_connectedWebSocketKind) { case ConnectedWebSocketKind::Client: { + // Snapshot the backlog before cancel() frees the send buffer, so + // bufferedAmount does not reset to 0 (see bufferedAmount()). + m_bufferedAmount = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client)); Bun__WebSocketClient__cancel(this->m_connectedWebSocket.client); updateHasPendingActivity(); break; } case ConnectedWebSocketKind::ClientSSL: { + m_bufferedAmount = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL)); Bun__WebSocketClientTLS__cancel(this->m_connectedWebSocket.clientSSL); updateHasPendingActivity(); break; @@ -1224,7 +1239,24 @@ WebSocket::State WebSocket::readyState() const unsigned WebSocket::bufferedAmount() const { - return saturateAdd(m_bufferedAmount, m_bufferedAmountAfterClose); + // While OPEN, query the live send-buffer size from the connection so + // backpressure is observable. Once closed the connection is gone, but the + // spec requires bufferedAmount not to reset to 0: close()/terminate() + // snapshot the final backlog into m_bufferedAmount, and send() after close + // adds to m_bufferedAmountAfterClose, so the total only ever increases. + unsigned buffered = m_bufferedAmount; + switch (m_connectedWebSocketKind) { + case ConnectedWebSocketKind::Client: + buffered = clampToUnsigned(Bun__WebSocketClient__getBufferedAmount(this->m_connectedWebSocket.client)); + break; + case ConnectedWebSocketKind::ClientSSL: + buffered = clampToUnsigned(Bun__WebSocketClientTLS__getBufferedAmount(this->m_connectedWebSocket.clientSSL)); + break; + case ConnectedWebSocketKind::None: + break; + } + + return saturateAdd(buffered, m_bufferedAmountAfterClose); } String WebSocket::protocol() const @@ -1568,7 +1600,7 @@ void WebSocket::didStartClosingHandshake() // }); } -void WebSocket::didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason) +void WebSocket::didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason, size_t bufferedAmountSnapshot) { // LOG(Network, "WebSocket %p didClose()", this); if (this->m_connectedWebSocketKind == ConnectedWebSocketKind::None) @@ -1588,7 +1620,15 @@ void WebSocket::didClose(unsigned unhandledBufferedAmount, unsigned short code, bool wasClean = m_state == CLOSING && !unhandledBufferedAmount && code != 0; // WebSocketChannel::CloseEventCodeAbnormalClosure; m_state = CLOSED; - m_bufferedAmount = unhandledBufferedAmount; + // Don't reset the backlog: the unsent bytes at close time were snapshotted + // (by close()/terminate() into m_bufferedAmount, or passed here via + // bufferedAmountSnapshot for the peer-initiated close handshake). The spec + // requires bufferedAmount not to drop to 0 once closed, so keep the largest. + unsigned snapshot = clampToUnsigned(bufferedAmountSnapshot); + if (unhandledBufferedAmount > m_bufferedAmount) + m_bufferedAmount = unhandledBufferedAmount; + if (snapshot > m_bufferedAmount) + m_bufferedAmount = snapshot; ASSERT(scriptExecutionContext()); this->m_connectedWebSocketKind = ConnectedWebSocketKind::None; this->m_upgradeClient = nullptr; @@ -1653,13 +1693,22 @@ void WebSocket::didConnect(us_socket_t* socket, char* bufferedData, size_t buffe this->didConnect(); } -void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code) +void WebSocket::didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount) { // from new WebSocket() -> connect() if (m_state == CLOSED) return; + // Keep the backlog reported before this abrupt close (captured on the Rust + // side, since the connection's send buffer is freed during teardown) so + // bufferedAmount does not reset to 0 — see bufferedAmount(). Keep the larger + // value; this also makes the socket-close path (buffer already cleared → 0) + // a no-op. + unsigned clamped = clampToUnsigned(bufferedAmount); + if (clamped > m_bufferedAmount) + m_bufferedAmount = clamped; + this->m_upgradeClient = nullptr; if (this->m_connectedWebSocketKind == ConnectedWebSocketKind::ClientSSL) { this->m_connectedWebSocket.clientSSL = nullptr; @@ -1886,11 +1935,11 @@ extern "C" void WebSocket__didConnectWithTunnel(WebCore::WebSocket* webSocket, v webSocket->didConnectWithTunnel(tunnel, bufferedData, len, deflate_params); } -extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode) +extern "C" void WebSocket__didAbruptClose(WebCore::WebSocket* webSocket, Bun::WebSocketErrorCode errorCode, size_t bufferedAmount) { - webSocket->didFailWithErrorCode(errorCode); + webSocket->didFailWithErrorCode(errorCode, bufferedAmount); } -extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t errorCode, BunString* reason) +extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t errorCode, BunString* reason, size_t bufferedAmount) { WTF::String wtf_reason = reason->transferToWTFString(); // The Rust client only calls this after a completed close handshake @@ -1898,8 +1947,13 @@ extern "C" void WebSocket__didClose(WebCore::WebSocket* webSocket, uint16_t erro // server-initiated close m_state is still OPEN here; transition to // CLOSING so didClose() reports wasClean = true. Abnormal closes go // through WebSocket__didAbruptClose instead. + // + // Pass the queued backlog as bufferedAmountSnapshot (not as + // unhandledBufferedAmount): this close handshake completed cleanly, so + // wasClean must stay true regardless of any application data still queued, + // but bufferedAmount must not reset to 0 (spec). webSocket->didStartClosingHandshake(); - webSocket->didClose(0, errorCode, WTF::move(wtf_reason)); + webSocket->didClose(0, errorCode, WTF::move(wtf_reason), bufferedAmount); } extern "C" void WebSocket__didReceiveText(WebCore::WebSocket* webSocket, bool clone, const ZigString* str) @@ -1943,6 +1997,11 @@ WebCore::ExceptionOr WebCore::WebSocket::ping(WebCore::JSBlob* blob) if (m_state == CONNECTING) return Exception { InvalidStateError }; if (m_state == CLOSING || m_state == CLOSED) { + // Match the String/ArrayBuffer/ArrayBufferView ping() overloads, which + // accumulate into bufferedAmount after close rather than transmitting. + size_t payloadSize = Blob__getSize(JSC::JSValue::encode(blob)); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); return {}; } @@ -1965,6 +2024,11 @@ WebCore::ExceptionOr WebCore::WebSocket::pong(WebCore::JSBlob* blob) if (m_state == CONNECTING) return Exception { InvalidStateError }; if (m_state == CLOSING || m_state == CLOSED) { + // Match the String/ArrayBuffer/ArrayBufferView pong() overloads, which + // accumulate into bufferedAmount after close rather than transmitting. + size_t payloadSize = Blob__getSize(JSC::JSValue::encode(blob)); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, payloadSize); + m_bufferedAmountAfterClose = saturateAdd(m_bufferedAmountAfterClose, getFramingOverhead(payloadSize)); return {}; } diff --git a/src/jsc/bindings/webcore/WebSocket.h b/src/jsc/bindings/webcore/WebSocket.h index a0c1ada560d..61478e7045d 100644 --- a/src/jsc/bindings/webcore/WebSocket.h +++ b/src/jsc/bindings/webcore/WebSocket.h @@ -199,10 +199,10 @@ class WebSocket final : public RefCounted, public EventTargetWithInli void didConnect(); void disablePendingActivity(); void didStartClosingHandshake(); - void didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason); + void didClose(unsigned unhandledBufferedAmount, unsigned short code, const String& reason, size_t bufferedAmountSnapshot = 0); void didConnect(us_socket_t* socket, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params, void* customSSLCtx); void didConnectWithTunnel(void* tunnel, char* bufferedData, size_t bufferedDataSize, const PerMessageDeflateParams* deflate_params); - void didFailWithErrorCode(Bun::WebSocketErrorCode code); + void didFailWithErrorCode(Bun::WebSocketErrorCode code, size_t bufferedAmount = 0); void didReceiveMessage(String&& message); void didReceiveData(const char* data, size_t length); diff --git a/test/js/web/websocket/websocket-buffered-amount.test.ts b/test/js/web/websocket/websocket-buffered-amount.test.ts new file mode 100644 index 00000000000..b42cd7e9702 --- /dev/null +++ b/test/js/web/websocket/websocket-buffered-amount.test.ts @@ -0,0 +1,259 @@ +import { describe, expect, test } from "bun:test"; +import crypto from "node:crypto"; +import net from "node:net"; + +const WS_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + +// Raw TCP server that completes the WebSocket handshake and then stops reading +// from the socket (`pause()`), so the client's outbound frames cannot drain to +// the peer and pile up in the in-process send buffer. `afterUpgrade`, when +// provided, runs once right after the handshake (read side still paused) to +// drive a specific close path, e.g. writing a frame or destroying the socket. +function nonDrainingServer(afterUpgrade?: (sock: net.Socket) => void): Promise<{ port: number; close: () => void }> { + return new Promise((resolve, reject) => { + const server = net.createServer(sock => { + let buf = ""; + let upgraded = false; + sock.on("data", d => { + if (upgraded) return; + buf += d.toString("latin1"); + if (!buf.includes("\r\n\r\n")) return; + const key = /sec-websocket-key:\s*(.+)\r\n/i.exec(buf)?.[1]?.trim() ?? ""; + const accept = crypto + .createHash("sha1") + .update(key + WS_MAGIC) + .digest("base64"); + sock.write( + "HTTP/1.1 101 Switching Protocols\r\n" + + "Upgrade: websocket\r\n" + + "Connection: Upgrade\r\n" + + `Sec-WebSocket-Accept: ${accept}\r\n\r\n`, + ); + upgraded = true; + sock.pause(); // never read the client's frames + afterUpgrade?.(sock); + }); + sock.on("error", () => {}); + }); + server.on("error", reject); + server.listen(0, "127.0.0.1", () => { + const address = server.address() as net.AddressInfo; + resolve({ port: address.port, close: () => server.close() }); + }); + }); +} + +// A server must not mask the frames it sends; a masked frame is a protocol +// violation that makes the client abort the connection via the abrupt-close +// (fail) path rather than a graceful close handshake. +function maskedServerFrame(): Buffer { + const payload = Buffer.from("x"); + // FIN + opcode 0x2 (binary), MASK bit set, 1-byte length, 4-byte mask key. + const header = Buffer.from([0x82, 0x80 | payload.length, 0x01, 0x02, 0x03, 0x04]); + const masked = Buffer.from(payload); + for (let i = 0; i < masked.length; i++) masked[i] ^= header[2 + (i % 4)]; + return Buffer.concat([header, masked]); +} + +// A valid (unmasked) server Close frame with status 1000. Triggers the client's +// graceful close handshake (echo Close), not the abrupt-close path. +function serverCloseFrame(): Buffer { + // FIN + opcode 0x8 (close), unmasked, 2-byte payload = status code 1000. + return Buffer.from([0x88, 0x02, 0x03, 0xe8]); +} + +describe("WebSocket.bufferedAmount (client)", () => { + test("reflects the backlog queued to a peer that stopped reading", async () => { + const { port, close } = await nonDrainingServer(); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve, reject } = Promise.withResolvers<{ atOpen: number; max: number }>(); + ws.onerror = () => reject(new Error("unexpected error event")); + ws.onopen = () => { + // Nothing queued yet: the baseline must be 0, not a constant. + const atOpen = ws.bufferedAmount; + const chunk = Buffer.alloc(64 * 1024, 0x79).toString(); + let max = atOpen; + // 4000 * 64 KiB = ~250 MiB — far more than any socket buffer can accept, + // so the excess must queue in-process. + for (let i = 0; i < 4000; i++) { + ws.send(chunk); + if (ws.bufferedAmount > max) max = ws.bufferedAmount; + } + resolve({ atOpen, max }); + }; + const { atOpen, max } = await promise; + ws.close(); + + // Baseline with nothing queued. + expect(atOpen).toBe(0); + // Before the fix, bufferedAmount was hard-wired to 0 for the client + // WebSocket. It must now track the unsent backlog — which is far larger + // than a single 64 KiB frame once the peer stops reading. + expect(max).toBeGreaterThan(64 * 1024); + } finally { + close(); + } + }); + + // Per the WHATWG spec, bufferedAmount "does not reset to zero once the + // connection closes" — after close() it only increases with further send(). + test("does not reset to 0 after close() while a backlog is queued", async () => { + const { port, close } = await nonDrainingServer(); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve, reject } = Promise.withResolvers<{ beforeClose: number; afterClose: number }>(); + ws.onerror = () => reject(new Error("unexpected error event")); + ws.onopen = () => { + const chunk = Buffer.alloc(64 * 1024, 0x7a).toString(); + for (let i = 0; i < 4000; i++) ws.send(chunk); + const beforeClose = ws.bufferedAmount; + ws.close(); + // Reading immediately after close() must retain the queued backlog, + // not snap back to 0. + const afterClose = ws.bufferedAmount; + resolve({ beforeClose, afterClose }); + }; + const { beforeClose, afterClose } = await promise; + + expect(beforeClose).toBeGreaterThan(64 * 1024); + // The backlog must survive the close() transition — per spec it does not + // reset to 0 and only increases afterward (close() itself queues an + // ~8-byte close frame, so afterClose is >= beforeClose, not exactly it). + expect(afterClose).toBeGreaterThanOrEqual(beforeClose); + } finally { + close(); + } + }); + + // Every send()/ping()/pong() overload must account for data queued after + // close() the same way the spec requires for send() ("increase the + // bufferedAmount attribute by the size of the data"). The Blob overloads were + // the only ones that returned without accounting; they must now match their + // String/ArrayBuffer/ArrayBufferView siblings. close() freezes m_bufferedAmount + // and drops the connection, so each post-close call adds deterministically. + test("send/ping/pong(Blob) after close() increase bufferedAmount like the other overloads", async () => { + const blobBytes = 4096; + const { port, close } = await nonDrainingServer(); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve, reject } = Promise.withResolvers(); + ws.onerror = () => reject(new Error("unexpected error event")); + ws.onopen = () => { + // After close() the state is CLOSING and the connection is released, so + // bufferedAmount is a frozen snapshot plus post-close accumulation only. + ws.close(); + const blob = () => new Blob([new Uint8Array(blobBytes)]); + const samples = [ws.bufferedAmount]; + ws.send(blob()); + samples.push(ws.bufferedAmount); + ws.ping(blob()); + samples.push(ws.bufferedAmount); + ws.pong(blob()); + samples.push(ws.bufferedAmount); + resolve(samples); + }; + const samples = await promise; + + // Each Blob overload must add at least the blob's raw size. Before the fix + // the Blob branch alone returned without touching bufferedAmount, so the + // value would not move between samples. + for (let i = 1; i < samples.length; i++) { + expect(samples[i] - samples[i - 1]).toBeGreaterThanOrEqual(blobBytes); + } + } finally { + close(); + } + }); + + // The abrupt-close path (protocol error / timeout / write failure) must also + // preserve the backlog: the spec's "does not reset to 0" guarantee is not + // limited to graceful close(). Here the server sends a masked frame (illegal + // from a server), which aborts the client via the fail() path. + test("does not reset to 0 on an abrupt close while a backlog is queued", async () => { + const { port, close } = await nonDrainingServer(sock => sock.write(maskedServerFrame())); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve } = Promise.withResolvers<{ beforeClose: number; onClose: number }>(); + let beforeClose = 0; + ws.onopen = () => { + const chunk = Buffer.alloc(64 * 1024, 0x7b).toString(); + // Synchronous flood: completes before the event loop processes the + // server's incoming masked frame, so the backlog is queued first. + for (let i = 0; i < 4000; i++) ws.send(chunk); + beforeClose = ws.bufferedAmount; + }; + // The illegal frame aborts the connection; bufferedAmount read in the + // close handler must still reflect the queued backlog. + ws.onclose = () => resolve({ beforeClose, onClose: ws.bufferedAmount }); + ws.onerror = () => {}; + const { beforeClose: queued, onClose } = await promise; + + expect(queued).toBeGreaterThan(64 * 1024); + // Must not reset to 0 on the abrupt close: the backlog is still queued. + // (Not an exact match: a few frames may drain between the read above and + // the close, so assert it stays a large backlog rather than an exact value.) + expect(onClose).toBeGreaterThan(64 * 1024); + } finally { + close(); + } + }); + + // The server-initiated graceful close (peer sends a Close frame, client echoes + // it) is a fourth close path. It must also preserve the backlog rather than + // reset bufferedAmount to 0. + test("does not reset to 0 on a server-initiated close while a backlog is queued", async () => { + // Stop reading so the client's sends pile up, then send a valid Close frame + // to initiate a graceful close handshake. + const { port, close } = await nonDrainingServer(sock => sock.write(serverCloseFrame())); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve } = Promise.withResolvers<{ beforeClose: number; onClose: number }>(); + let beforeClose = 0; + ws.onopen = () => { + const chunk = Buffer.alloc(64 * 1024, 0x7c).toString(); + for (let i = 0; i < 4000; i++) ws.send(chunk); + beforeClose = ws.bufferedAmount; + }; + ws.onclose = () => resolve({ beforeClose, onClose: ws.bufferedAmount }); + ws.onerror = () => {}; + const { beforeClose: queued, onClose } = await promise; + + expect(queued).toBeGreaterThan(64 * 1024); + // The backlog must survive the server-initiated close. + expect(onClose).toBeGreaterThan(64 * 1024); + } finally { + close(); + } + }); + + // An abrupt socket close (no WebSocket Close handshake) while a backlog is + // queued must also preserve bufferedAmount. Depending on the platform's event + // loop this routes through either handle_close() (socket-close callback) or + // handle_end() -> fail(); both snapshot the backlog before freeing it. + test("does not reset to 0 on an abrupt socket close while a backlog is queued", async () => { + // Stop reading so the client's sends pile up, then abruptly destroy the + // connection (sends FIN; the client's own writes to the closed peer may then + // draw an RST); no WebSocket Close handshake either way. + const { port, close } = await nonDrainingServer(sock => sock.destroy()); + try { + const ws = new WebSocket(`ws://127.0.0.1:${port}/`); + const { promise, resolve } = Promise.withResolvers<{ beforeClose: number; onClose: number }>(); + let beforeClose = 0; + ws.onopen = () => { + const chunk = Buffer.alloc(64 * 1024, 0x7d).toString(); + for (let i = 0; i < 4000; i++) ws.send(chunk); + beforeClose = ws.bufferedAmount; + }; + ws.onclose = () => resolve({ beforeClose, onClose: ws.bufferedAmount }); + ws.onerror = () => {}; + const { beforeClose: queued, onClose } = await promise; + + expect(queued).toBeGreaterThan(64 * 1024); + // The backlog must survive the abrupt socket close. + expect(onClose).toBeGreaterThan(64 * 1024); + } finally { + close(); + } + }); +});