From 4752d7073d0f727c90bde2182f08817e0a674d35 Mon Sep 17 00:00:00 2001 From: robobun Date: Mon, 27 Apr 2026 19:42:35 +0000 Subject: [PATCH 1/2] udp: drain IP_RECVERR error queue with MSG_ERRQUEUE on EPOLLERR On Linux with IP_RECVERR enabled, an ICMP error (e.g. port unreachable) is queued on the socket's error queue and EPOLLERR is raised. Plain recvmsg/recvmmsg reports the pending error once but does not remove it from the error queue, so EPOLLERR stays level-triggered and epoll_wait busy-loops at 100% CPU. Drain the error queue with recvmsg(MSG_ERRQUEUE) (capped at 32 entries per dispatch), surfacing each sock_extended_err.ee_errno via on_recv_error and keeping the socket open. When the queue drains to empty, also read-and-clear sk_err via SO_ERROR to handle non-ICMP async errors that set sk_err without an error-queue entry. If the drain itself fails, surface that errno and close to avoid spinning on a stuck EPOLLERR. Fixes #29436 --- packages/bun-usockets/src/bsd.c | 53 +++++++ .../src/internal/networking/bsd.h | 3 + packages/bun-usockets/src/loop.c | 107 +++++-------- test/regression/issue/29436.test.ts | 150 ++++++++++++++++++ 4 files changed, 245 insertions(+), 68 deletions(-) create mode 100644 test/regression/issue/29436.test.ts diff --git a/packages/bun-usockets/src/bsd.c b/packages/bun-usockets/src/bsd.c index 69ef7539588..12d8ad70765 100644 --- a/packages/bun-usockets/src/bsd.c +++ b/packages/bun-usockets/src/bsd.c @@ -126,6 +126,59 @@ int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int fl #endif } +#if defined(__linux__) +#include + +/* Dequeue one entry from the UDP socket's error queue (populated when + * IP_RECVERR/IPV6_RECVERR is enabled and an ICMP error arrives). Plain + * recv/recvmsg reports the pending error once but does NOT remove it from + * the error queue, so EPOLLERR stays level-triggered and the event loop + * busy-spins. Reading with MSG_ERRQUEUE actually drains it. + * + * Returns 1 and writes the error's errno to *err_out when an entry was + * dequeued; returns 0 when the queue is empty; returns -1 on unexpected + * failure (errno set). */ +int bsd_udp_drain_errqueue(LIBUS_SOCKET_DESCRIPTOR fd, int *err_out) { + char control[256]; + /* The error-queue message carries the offending packet's payload; we + * only care about the cmsg, so a 1-byte iov is enough (kernel sets + * MSG_TRUNC which we ignore). */ + char scratch[1]; + struct sockaddr_storage addr; + struct iovec iov = { .iov_base = scratch, .iov_len = sizeof(scratch) }; + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + msg.msg_name = &addr; + msg.msg_namelen = sizeof(addr); + msg.msg_iov = &iov; + msg.msg_iovlen = 1; + msg.msg_control = control; + msg.msg_controllen = sizeof(control); + + ssize_t ret; + do { + ret = recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT); + } while (ret < 0 && errno == EINTR); + + if (ret < 0) { + return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1; + } + + for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if ((cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_RECVERR) || + (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_RECVERR)) { + struct sock_extended_err *serr = (struct sock_extended_err *) CMSG_DATA(cmsg); + *err_out = (int) serr->ee_errno; + return 1; + } + } + /* Dequeued something but no sock_extended_err cmsg — unexpected, but + * report it as drained so the caller keeps looping until EAGAIN. */ + *err_out = 0; + return 1; +} +#endif + int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags) { #if defined(_WIN32) socklen_t addr_len = sizeof(struct sockaddr_storage); diff --git a/packages/bun-usockets/src/internal/networking/bsd.h b/packages/bun-usockets/src/internal/networking/bsd.h index 0b57bf11e04..c94ace487ed 100644 --- a/packages/bun-usockets/src/internal/networking/bsd.h +++ b/packages/bun-usockets/src/internal/networking/bsd.h @@ -167,6 +167,9 @@ struct udp_sendbuf { int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int flags); int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags); +#if defined(__linux__) +int bsd_udp_drain_errqueue(LIBUS_SOCKET_DESCRIPTOR fd, int *err_out); +#endif void bsd_udp_setup_recvbuf(struct udp_recvbuf *recvbuf, void *databuf, size_t databuflen); int bsd_udp_setup_sendbuf(struct udp_sendbuf *buf, size_t bufsize, void** payloads, size_t* lengths, void** addresses, int num); int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index); diff --git a/packages/bun-usockets/src/loop.c b/packages/bun-usockets/src/loop.c index 2d626e540f7..fa65ec7d8aa 100644 --- a/packages/bun-usockets/src/loop.c +++ b/packages/bun-usockets/src/loop.c @@ -24,10 +24,6 @@ #ifndef WIN32 #include #endif -#ifdef __linux__ -#include -#include -#endif #if __has_include("wtf/Platform.h") #include "wtf/Platform.h" @@ -671,45 +667,38 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in } #if defined(__linux__) - /* On Linux with IP_RECVERR, EPOLLERR fires when an ICMP error - * (port unreachable, host unreachable, TTL exceeded, ...) is - * queued on the socket's error queue. For an *unconnected* UDP - * socket regular recvmmsg does NOT dequeue these — only - * recvmsg(MSG_ERRQUEUE) does — so EPOLLERR stays level-triggered - * until we drain it explicitly. Do that here, surfacing each - * errno via on_recv_error; the socket stays open. On other - * platforms (kqueue EV_ERROR, Windows) an error event is fatal — - * preserve close-on-error there. */ - int recv_error_surfaced = 0; - int recv_would_block_only = 0; + /* IP_RECVERR: EPOLLERR means the error queue and/or sk_err is + * set. Drain the queue with MSG_ERRQUEUE (plain recv doesn't + * dequeue, so EPOLLERR would stay asserted and busy-loop), + * then consume any residual sk_err via SO_ERROR. Surface each + * errno via on_recv_error and keep the socket open. */ if (error) { - struct msghdr eh; char ectrl[512]; char ebuf[1]; - struct iovec eiov = { ebuf, sizeof(ebuf) }; - while (!u->closed) { - memset(&eh, 0, sizeof(eh)); - eh.msg_iov = &eiov; eh.msg_iovlen = 1; - eh.msg_control = ectrl; eh.msg_controllen = sizeof(ectrl); - if (recvmsg(us_poll_fd(p), &eh, MSG_ERRQUEUE) < 0) break; - recv_error_surfaced = 1; - if (u->on_recv_error) { - /* The queued ICMP error is in sock_extended_err, - * not errno. */ - int ee = 0; - for (struct cmsghdr *cm = CMSG_FIRSTHDR(&eh); cm; cm = CMSG_NXTHDR(&eh, cm)) { - if ((cm->cmsg_level == IPPROTO_IP && cm->cmsg_type == IP_RECVERR) || - (cm->cmsg_level == IPPROTO_IPV6 && cm->cmsg_type == IPV6_RECVERR)) { - ee = ((struct sock_extended_err *) CMSG_DATA(cm))->ee_errno; - break; - } - } - u->on_recv_error(u, ee ? ee : ECONNREFUSED); + int err = 0, drained = 0; + for (int budget = 32; budget > 0 && !u->closed; budget--) { + if ((drained = bsd_udp_drain_errqueue(us_poll_fd(p), &err)) <= 0) break; + if (err && u->on_recv_error) u->on_recv_error(u, err); + } + if (!u->closed && drained < 0) { + /* recvmsg(MSG_ERRQUEUE) itself failed — surface and + * close to avoid spinning on a stuck EPOLLERR. */ + if (errno && u->on_recv_error) u->on_recv_error(u, errno); + if (!u->closed) us_udp_socket_close(u); + } else if (!u->closed && drained == 0) { + /* Queue empty — read-and-clear sk_err. Skip when budget + * ran out (drained>0): sk_err then holds the NEXT + * queue entry's errno and reading it now would + * double-report on the next tick. */ + socklen_t len = sizeof(err); + if (getsockopt(us_poll_fd(p), SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0 && err) { + if (u->on_recv_error) u->on_recv_error(u, err); } } + if (u->closed) break; + error = 0; /* handled; don't close below */ } #endif if ((events & LIBUS_SOCKET_READABLE) && !u->closed) { - do { struct udp_recvbuf recvbuf; bsd_udp_setup_recvbuf(&recvbuf, u->loop->data.recv_buf, LIBUS_RECV_BUFFER_LENGTH); @@ -717,30 +706,21 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in if (npackets > 0) { u->on_data(u, &recvbuf, npackets); } else { - if (npackets == LIBUS_SOCKET_ERROR) { - if (!bsd_would_block()) { + if (npackets == LIBUS_SOCKET_ERROR && !bsd_would_block()) { #if defined(__linux__) - int recv_err = errno; - recv_error_surfaced = 1; - if (u->on_recv_error) { - u->on_recv_error(u, recv_err); - } -#else - /* non-Linux: fall through and close below */ - error = 1; -#endif - } -#if defined(__linux__) - else { - recv_would_block_only = 1; + /* A pending error on the normal recv path (not + * the error queue) — surface it but keep the + * socket open; UDP errors are per-datagram. */ + int recv_err = errno; + if (u->on_recv_error) { + u->on_recv_error(u, recv_err); } +#else + /* non-Linux: fall through and close below */ + error = 1; #endif - } else { - // 0 messages received, we are done - // this case can happen if either: - // - the total number of messages pending was not divisible by 8 - // - recvmsg() was used instead of recvmmsg() and there was no message to read. } + // else: 0 messages or EAGAIN — done for now. break; } @@ -761,21 +741,12 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in } } -#if defined(__linux__) - /* Only close on EPOLLERR if we didn't surface the real errno - * via recvmmsg + on_recv_error above AND recv wasn't just - * EAGAIN (which means the error queue is already drained, - * leaving a residual EPOLLERR). Otherwise the socket stays - * open so the user can keep sending/receiving after a - * transient ICMP error. */ - if (error && !recv_error_surfaced && !recv_would_block_only && !u->closed) { - us_udp_socket_close(u); - } -#else + /* On non-Linux platforms (kqueue's EV_ERROR, Windows) an error + * event is a fatal socket condition, not a drainable queue. On + * Linux, EPOLLERR was handled above and error was cleared. */ if (error && !u->closed) { us_udp_socket_close(u); } -#endif break; } } diff --git a/test/regression/issue/29436.test.ts b/test/regression/issue/29436.test.ts new file mode 100644 index 00000000000..c16c5b8156d --- /dev/null +++ b/test/regression/issue/29436.test.ts @@ -0,0 +1,150 @@ +// https://github.com/oven-sh/bun/issues/29436 +// +// Sending a UDP datagram to a port with no listener on Linux generates an +// ICMP "port unreachable". With IP_RECVERR enabled the kernel queues this on +// the socket's error queue and raises EPOLLERR. The error queue must be read +// with recvmsg(MSG_ERRQUEUE) — plain recvmsg reports the pending error once +// but does not dequeue it, so EPOLLERR stays level-triggered and epoll_wait +// busy-loops at 100% CPU forever. + +import { expect, test } from "bun:test"; +import { bunEnv, bunExe, isLinux } from "harness"; + +// Port 1 (tcpmux) is privileged (< 1024) so the kernel never auto-assigns it +// and no userspace process binds it in CI — guarantees ICMP port-unreachable +// without a bind→close→send TOCTOU race on an ephemeral port. +const deadPort = 1; + +// Each test spawns a subprocess that sleeps up to ~3s; debug/ASAN builds add +// several seconds of startup, so budget well above the 5s default. +const timeout = 20_000; + +async function run(script: string) { + await using proc = Bun.spawn({ + cmd: [bunExe(), "-e", script], + env: bunEnv, + stdout: "pipe", + stderr: "inherit", + }); + const [stdout, exitCode] = await Promise.all([proc.stdout.text(), proc.exited]); + + const result = JSON.parse(stdout.trim()); + // The error handler should fire exactly once per ICMP error, not zero + // (event swallowed) and not unbounded (re-fired every loop tick). + expect(result.errorCount).toBe(1); + expect(result.errorCode).toBe("ECONNREFUSED"); + // The socket must remain open and usable after a transient ICMP error — + // a "fix" that closes it on error would also stop the busy-loop. + expect(result.closed).toBe(false); + // The buggy build burns ~100% CPU (cpuMs ≈ wallMs). A fixed build idles; + // even under debug/ASAN it stays well below 75% of wall time. + expect(result.cpuMs).toBeLessThan(result.wallMs * 0.75); + expect(exitCode).toBe(0); +} + +// IP_RECVERR is Linux-only; on other platforms the send either silently +// succeeds (no ICMP surfaced on unconnected sockets) or errors synchronously. +test.skipIf(!isLinux)( + "Bun.udpSocket: ICMP error does not busy-loop the event loop", + () => + run(/* js */ ` + let errorCount = 0; + let errorCode; + const { promise: gotError, resolve } = Promise.withResolvers(); + const socket = await Bun.udpSocket({ + socket: { + error(err) { + errorCount++; + errorCode ??= err?.code; + resolve(); + }, + }, + }); + socket.send("x", ${deadPort}, "127.0.0.1"); + await Promise.race([gotError, Bun.sleep(2000)]); + + // Measure CPU time consumed while the process should be idle. With the + // bug, the event loop spins and CPU time ~= wall time. + const wallMs = 1000; + const before = process.cpuUsage(); + await Bun.sleep(wallMs); + const after = process.cpuUsage(before); + const cpuMs = (after.user + after.system) / 1000; + + const closed = socket.closed; + socket.close(); + console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs })); + `), + timeout, +); + +// Connected UDP: the kernel's udp_err() sets sk->sk_err AND enqueues to the +// error queue. Draining the error queue via MSG_ERRQUEUE clears sk_err (in +// sock_dequeue_err_skb) for the last ICMP entry; a follow-up SO_ERROR read +// consumes any residual sk_err so EPOLLERR deasserts. +test.skipIf(!isLinux)( + "Bun.udpSocket (connected): ICMP error does not busy-loop the event loop", + () => + run(/* js */ ` + let errorCount = 0; + let errorCode; + const { promise: gotError, resolve } = Promise.withResolvers(); + + const socket = await Bun.udpSocket({ + connect: { hostname: "127.0.0.1", port: ${deadPort} }, + socket: { + error(err) { + errorCount++; + errorCode ??= err?.code; + resolve(); + }, + }, + }); + socket.send("x"); + await Promise.race([gotError, Bun.sleep(2000)]); + + const wallMs = 1000; + const before = process.cpuUsage(); + await Bun.sleep(wallMs); + const after = process.cpuUsage(before); + const cpuMs = (after.user + after.system) / 1000; + + const closed = socket.closed; + socket.close(); + console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs })); + `), + timeout, +); + +test.skipIf(!isLinux)( + "node:dgram: ICMP error does not busy-loop the event loop", + () => + run(/* js */ ` + const dgram = require("node:dgram"); + let errorCount = 0; + let errorCode; + const { promise: gotError, resolve } = Promise.withResolvers(); + const sock = dgram.createSocket("udp4"); + sock.on("error", err => { + errorCount++; + errorCode ??= err?.code; + resolve(); + }); + sock.send("x", ${deadPort}, "127.0.0.1"); + await Promise.race([gotError, Bun.sleep(2000)]); + + const wallMs = 1000; + const before = process.cpuUsage(); + await Bun.sleep(wallMs); + const after = process.cpuUsage(before); + const cpuMs = (after.user + after.system) / 1000; + + // Still bound and usable — address() throws ERR_SOCKET_DGRAM_NOT_RUNNING + // if the socket was torn down. + let closed; + try { sock.address(); closed = false; } catch { closed = true; } + sock.close(); + console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs })); + `), + timeout, +); From 3c78ddc43ceb0b444b139622cd98ca4dc1f3188c Mon Sep 17 00:00:00 2001 From: robobun Date: Wed, 13 May 2026 04:26:55 +0000 Subject: [PATCH 2/2] ci: retrigger