Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions packages/bun-usockets/src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,59 @@ int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int fl
#endif
}

#if defined(__linux__)
#include <linux/errqueue.h>

/* Dequeue one entry from the UDP socket's error queue (populated when
* IP_RECVERR/IPV6_RECVERR is enabled and an ICMP error arrives). Plain
* recv/recvmsg reports the pending error once but does NOT remove it from
* the error queue, so EPOLLERR stays level-triggered and the event loop
* busy-spins. Reading with MSG_ERRQUEUE actually drains it.
*
* Returns 1 and writes the error's errno to *err_out when an entry was
* dequeued; returns 0 when the queue is empty; returns -1 on unexpected
* failure (errno set). */
int bsd_udp_drain_errqueue(LIBUS_SOCKET_DESCRIPTOR fd, int *err_out) {
char control[256];
/* The error-queue message carries the offending packet's payload; we
* only care about the cmsg, so a 1-byte iov is enough (kernel sets
* MSG_TRUNC which we ignore). */
char scratch[1];
struct sockaddr_storage addr;
struct iovec iov = { .iov_base = scratch, .iov_len = sizeof(scratch) };
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_name = &addr;
msg.msg_namelen = sizeof(addr);
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_control = control;
msg.msg_controllen = sizeof(control);

ssize_t ret;
do {
ret = recvmsg(fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
} while (ret < 0 && errno == EINTR);

if (ret < 0) {
return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1;
}

for (struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if ((cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_RECVERR) ||
(cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_RECVERR)) {
struct sock_extended_err *serr = (struct sock_extended_err *) CMSG_DATA(cmsg);
*err_out = (int) serr->ee_errno;
return 1;
}
}
/* Dequeued something but no sock_extended_err cmsg — unexpected, but
* report it as drained so the caller keeps looping until EAGAIN. */
*err_out = 0;
return 1;
}
#endif

int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags) {
#if defined(_WIN32)
socklen_t addr_len = sizeof(struct sockaddr_storage);
Expand Down
3 changes: 3 additions & 0 deletions packages/bun-usockets/src/internal/networking/bsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ struct udp_sendbuf {

int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_sendbuf* sendbuf, int flags);
int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, struct udp_recvbuf *recvbuf, int flags);
#if defined(__linux__)
int bsd_udp_drain_errqueue(LIBUS_SOCKET_DESCRIPTOR fd, int *err_out);
#endif
void bsd_udp_setup_recvbuf(struct udp_recvbuf *recvbuf, void *databuf, size_t databuflen);
int bsd_udp_setup_sendbuf(struct udp_sendbuf *buf, size_t bufsize, void** payloads, size_t* lengths, void** addresses, int num);
int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index);
Expand Down
107 changes: 39 additions & 68 deletions packages/bun-usockets/src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
#ifndef WIN32
#include <sys/ioctl.h>
#endif
#ifdef __linux__
#include <netinet/in.h>
#include <linux/errqueue.h>
#endif

#if __has_include("wtf/Platform.h")
#include "wtf/Platform.h"
Expand Down Expand Up @@ -671,76 +667,60 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
}

#if defined(__linux__)
/* On Linux with IP_RECVERR, EPOLLERR fires when an ICMP error
* (port unreachable, host unreachable, TTL exceeded, ...) is
* queued on the socket's error queue. For an *unconnected* UDP
* socket regular recvmmsg does NOT dequeue these — only
* recvmsg(MSG_ERRQUEUE) does — so EPOLLERR stays level-triggered
* until we drain it explicitly. Do that here, surfacing each
* errno via on_recv_error; the socket stays open. On other
* platforms (kqueue EV_ERROR, Windows) an error event is fatal —
* preserve close-on-error there. */
int recv_error_surfaced = 0;
int recv_would_block_only = 0;
/* IP_RECVERR: EPOLLERR means the error queue and/or sk_err is
* set. Drain the queue with MSG_ERRQUEUE (plain recv doesn't
* dequeue, so EPOLLERR would stay asserted and busy-loop),
* then consume any residual sk_err via SO_ERROR. Surface each
* errno via on_recv_error and keep the socket open. */
if (error) {
struct msghdr eh; char ectrl[512]; char ebuf[1];
struct iovec eiov = { ebuf, sizeof(ebuf) };
while (!u->closed) {
memset(&eh, 0, sizeof(eh));
eh.msg_iov = &eiov; eh.msg_iovlen = 1;
eh.msg_control = ectrl; eh.msg_controllen = sizeof(ectrl);
if (recvmsg(us_poll_fd(p), &eh, MSG_ERRQUEUE) < 0) break;
recv_error_surfaced = 1;
if (u->on_recv_error) {
/* The queued ICMP error is in sock_extended_err,
* not errno. */
int ee = 0;
for (struct cmsghdr *cm = CMSG_FIRSTHDR(&eh); cm; cm = CMSG_NXTHDR(&eh, cm)) {
if ((cm->cmsg_level == IPPROTO_IP && cm->cmsg_type == IP_RECVERR) ||
(cm->cmsg_level == IPPROTO_IPV6 && cm->cmsg_type == IPV6_RECVERR)) {
ee = ((struct sock_extended_err *) CMSG_DATA(cm))->ee_errno;
break;
}
}
u->on_recv_error(u, ee ? ee : ECONNREFUSED);
int err = 0, drained = 0;
for (int budget = 32; budget > 0 && !u->closed; budget--) {
if ((drained = bsd_udp_drain_errqueue(us_poll_fd(p), &err)) <= 0) break;
if (err && u->on_recv_error) u->on_recv_error(u, err);
}
if (!u->closed && drained < 0) {
/* recvmsg(MSG_ERRQUEUE) itself failed — surface and
* close to avoid spinning on a stuck EPOLLERR. */
if (errno && u->on_recv_error) u->on_recv_error(u, errno);
if (!u->closed) us_udp_socket_close(u);
} else if (!u->closed && drained == 0) {
/* Queue empty — read-and-clear sk_err. Skip when budget
* ran out (drained>0): sk_err then holds the NEXT
* queue entry's errno and reading it now would
* double-report on the next tick. */
socklen_t len = sizeof(err);
if (getsockopt(us_poll_fd(p), SOL_SOCKET, SO_ERROR, (char *) &err, &len) == 0 && err) {
if (u->on_recv_error) u->on_recv_error(u, err);
}
}
if (u->closed) break;
error = 0; /* handled; don't close below */
}
Comment thread
claude[bot] marked this conversation as resolved.
#endif

if ((events & LIBUS_SOCKET_READABLE) && !u->closed) {

do {
struct udp_recvbuf recvbuf;
bsd_udp_setup_recvbuf(&recvbuf, u->loop->data.recv_buf, LIBUS_RECV_BUFFER_LENGTH);
int npackets = bsd_recvmmsg(us_poll_fd(p), &recvbuf, MSG_DONTWAIT);
if (npackets > 0) {
u->on_data(u, &recvbuf, npackets);
} else {
if (npackets == LIBUS_SOCKET_ERROR) {
if (!bsd_would_block()) {
if (npackets == LIBUS_SOCKET_ERROR && !bsd_would_block()) {
#if defined(__linux__)
int recv_err = errno;
recv_error_surfaced = 1;
if (u->on_recv_error) {
u->on_recv_error(u, recv_err);
}
#else
/* non-Linux: fall through and close below */
error = 1;
#endif
}
#if defined(__linux__)
else {
recv_would_block_only = 1;
/* A pending error on the normal recv path (not
* the error queue) — surface it but keep the
* socket open; UDP errors are per-datagram. */
int recv_err = errno;
if (u->on_recv_error) {
u->on_recv_error(u, recv_err);
}
#else
/* non-Linux: fall through and close below */
error = 1;
#endif
} else {
// 0 messages received, we are done
// this case can happen if either:
// - the total number of messages pending was not divisible by 8
// - recvmsg() was used instead of recvmmsg() and there was no message to read.
}
// else: 0 messages or EAGAIN — done for now.

break;
}
Expand All @@ -761,21 +741,12 @@ void us_internal_dispatch_ready_poll(struct us_poll_t *p, int error, int eof, in
}
}

#if defined(__linux__)
/* Only close on EPOLLERR if we didn't surface the real errno
* via recvmmsg + on_recv_error above AND recv wasn't just
* EAGAIN (which means the error queue is already drained,
* leaving a residual EPOLLERR). Otherwise the socket stays
* open so the user can keep sending/receiving after a
* transient ICMP error. */
if (error && !recv_error_surfaced && !recv_would_block_only && !u->closed) {
us_udp_socket_close(u);
}
#else
/* On non-Linux platforms (kqueue's EV_ERROR, Windows) an error
* event is a fatal socket condition, not a drainable queue. On
* Linux, EPOLLERR was handled above and error was cleared. */
if (error && !u->closed) {
us_udp_socket_close(u);
}
#endif
break;
}
}
Expand Down
150 changes: 150 additions & 0 deletions test/regression/issue/29436.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// https://github.com/oven-sh/bun/issues/29436
//
// Sending a UDP datagram to a port with no listener on Linux generates an
// ICMP "port unreachable". With IP_RECVERR enabled the kernel queues this on
// the socket's error queue and raises EPOLLERR. The error queue must be read
// with recvmsg(MSG_ERRQUEUE) — plain recvmsg reports the pending error once
// but does not dequeue it, so EPOLLERR stays level-triggered and epoll_wait
// busy-loops at 100% CPU forever.

import { expect, test } from "bun:test";
import { bunEnv, bunExe, isLinux } from "harness";

// Port 1 (tcpmux) is privileged (< 1024) so the kernel never auto-assigns it
// and no userspace process binds it in CI — guarantees ICMP port-unreachable
// without a bind→close→send TOCTOU race on an ephemeral port.
const deadPort = 1;

// Each test spawns a subprocess that sleeps up to ~3s; debug/ASAN builds add
// several seconds of startup, so budget well above the 5s default.
const timeout = 20_000;

async function run(script: string) {
await using proc = Bun.spawn({
cmd: [bunExe(), "-e", script],
env: bunEnv,
stdout: "pipe",
stderr: "inherit",
});
const [stdout, exitCode] = await Promise.all([proc.stdout.text(), proc.exited]);

const result = JSON.parse(stdout.trim());
// The error handler should fire exactly once per ICMP error, not zero
// (event swallowed) and not unbounded (re-fired every loop tick).
expect(result.errorCount).toBe(1);
expect(result.errorCode).toBe("ECONNREFUSED");
// The socket must remain open and usable after a transient ICMP error —
// a "fix" that closes it on error would also stop the busy-loop.
expect(result.closed).toBe(false);
// The buggy build burns ~100% CPU (cpuMs ≈ wallMs). A fixed build idles;
// even under debug/ASAN it stays well below 75% of wall time.
expect(result.cpuMs).toBeLessThan(result.wallMs * 0.75);
expect(exitCode).toBe(0);
}

// IP_RECVERR is Linux-only; on other platforms the send either silently
// succeeds (no ICMP surfaced on unconnected sockets) or errors synchronously.
test.skipIf(!isLinux)(
"Bun.udpSocket: ICMP error does not busy-loop the event loop",
() =>
run(/* js */ `
let errorCount = 0;
let errorCode;
const { promise: gotError, resolve } = Promise.withResolvers();
const socket = await Bun.udpSocket({
socket: {
error(err) {
errorCount++;
errorCode ??= err?.code;
resolve();
},
},
});
socket.send("x", ${deadPort}, "127.0.0.1");
await Promise.race([gotError, Bun.sleep(2000)]);

// Measure CPU time consumed while the process should be idle. With the
// bug, the event loop spins and CPU time ~= wall time.
const wallMs = 1000;
const before = process.cpuUsage();
await Bun.sleep(wallMs);
const after = process.cpuUsage(before);
const cpuMs = (after.user + after.system) / 1000;

const closed = socket.closed;
socket.close();
console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs }));
`),
timeout,
);

// Connected UDP: the kernel's udp_err() sets sk->sk_err AND enqueues to the
// error queue. Draining the error queue via MSG_ERRQUEUE clears sk_err (in
// sock_dequeue_err_skb) for the last ICMP entry; a follow-up SO_ERROR read
// consumes any residual sk_err so EPOLLERR deasserts.
test.skipIf(!isLinux)(
"Bun.udpSocket (connected): ICMP error does not busy-loop the event loop",
() =>
run(/* js */ `
let errorCount = 0;
let errorCode;
const { promise: gotError, resolve } = Promise.withResolvers();

const socket = await Bun.udpSocket({
connect: { hostname: "127.0.0.1", port: ${deadPort} },
socket: {
error(err) {
errorCount++;
errorCode ??= err?.code;
resolve();
},
},
});
socket.send("x");
await Promise.race([gotError, Bun.sleep(2000)]);

const wallMs = 1000;
const before = process.cpuUsage();
await Bun.sleep(wallMs);
const after = process.cpuUsage(before);
const cpuMs = (after.user + after.system) / 1000;

const closed = socket.closed;
socket.close();
console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs }));
`),
timeout,
);

test.skipIf(!isLinux)(
"node:dgram: ICMP error does not busy-loop the event loop",
() =>
run(/* js */ `
const dgram = require("node:dgram");
let errorCount = 0;
let errorCode;
const { promise: gotError, resolve } = Promise.withResolvers();
const sock = dgram.createSocket("udp4");
sock.on("error", err => {
errorCount++;
errorCode ??= err?.code;
resolve();
});
sock.send("x", ${deadPort}, "127.0.0.1");
await Promise.race([gotError, Bun.sleep(2000)]);

const wallMs = 1000;
const before = process.cpuUsage();
await Bun.sleep(wallMs);
const after = process.cpuUsage(before);
const cpuMs = (after.user + after.system) / 1000;

// Still bound and usable — address() throws ERR_SOCKET_DGRAM_NOT_RUNNING
// if the socket was torn down.
let closed;
try { sock.address(); closed = false; } catch { closed = true; }
sock.close();
console.log(JSON.stringify({ errorCount, errorCode, closed, cpuMs, wallMs }));
`),
timeout,
);
Loading