Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/bun-types/bun.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6400,12 +6400,25 @@ declare module "bun" {
namespace udp {
type Data = string | ArrayBufferView | ArrayBufferLike;

/**
* Extra metadata passed to the `data` callback for each received datagram.
*/
export interface ReceiveFlags {
/**
* `true` if the datagram was larger than the receive buffer and was
* truncated by the kernel (MSG_TRUNC). The `data` passed to the
* callback contains only the portion that fit in the buffer.
*/
truncated: boolean;
}

export interface SocketHandler<DataBinaryType extends BinaryType> {
data?(
socket: Socket<DataBinaryType>,
data: BinaryTypeList[DataBinaryType],
port: number,
address: string,
flags: ReceiveFlags,
): void | Promise<void>;
drain?(socket: Socket<DataBinaryType>): void | Promise<void>;
error?(socket: Socket<DataBinaryType>, error: Error): void | Promise<void>;
Expand All @@ -6417,6 +6430,7 @@ declare module "bun" {
data: BinaryTypeList[DataBinaryType],
port: number,
address: string,
flags: ReceiveFlags,
): void | Promise<void>;
drain?(socket: ConnectedSocket<DataBinaryType>): void | Promise<void>;
error?(socket: ConnectedSocket<DataBinaryType>, error: Error): void | Promise<void>;
Expand Down
25 changes: 25 additions & 0 deletions packages/bun-usockets/src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,16 @@ int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index)
#endif
}

int bsd_udp_packet_buffer_truncated(struct udp_recvbuf *msgvec, int index) {
#if defined(_WIN32)
/* On Windows, WSARecvFrom signals truncation via WSAEMSGSIZE on recv,
* which we don't currently surface here. */
return 0;
#else
return (((struct mmsghdr *) msgvec)[index].msg_hdr.msg_flags & MSG_TRUNC) ? 1 : 0;
#endif
}

LIBUS_SOCKET_DESCRIPTOR apple_no_sigpipe(LIBUS_SOCKET_DESCRIPTOR fd) {
#ifdef __APPLE__
if (fd != LIBUS_SOCKET_ERROR) {
Expand Down Expand Up @@ -1291,6 +1301,21 @@ LIBUS_SOCKET_DESCRIPTOR bsd_create_udp_socket(const char *host, int port, int op
}
}

#if defined(__linux__)
/* Linux suppresses ICMP errors (port unreachable, host unreachable, TTL
* exceeded, etc.) on unconnected UDP sockets by default. Enabling
* IP_RECVERR/IPV6_RECVERR surfaces them as errors on the next send/recv,
* rather than silently dropping them. Matches libuv. */
#ifdef IP_RECVERR
setsockopt(listenFd, IPPROTO_IP, IP_RECVERR, &enabled, sizeof(enabled));
#endif
#ifdef IPV6_RECVERR
if (listenAddr->ai_family == AF_INET6) {
setsockopt(listenFd, IPPROTO_IPV6, IPV6_RECVERR, &enabled, sizeof(enabled));
}
#endif
#endif

/* We bind here as well */
if (bind(listenFd, listenAddr->ai_addr, (socklen_t) listenAddr->ai_addrlen)) {
if (err != NULL) {
Expand Down
4 changes: 4 additions & 0 deletions packages/bun-usockets/src/internal/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ struct us_udp_socket_t {
void (*on_data)(struct us_udp_socket_t *, void *, int);
void (*on_drain)(struct us_udp_socket_t *);
void (*on_close)(struct us_udp_socket_t *);
/* Called when recvmmsg returns an error (other than EAGAIN). The socket
* is NOT closed — caller decides whether to close. Used to surface ICMP
* errors delivered via IP_RECVERR on Linux (ECONNREFUSED, etc.). */
void (*on_recv_error)(struct us_udp_socket_t *, int err);
void *user;
struct us_loop_t *loop;
/* An UDP socket can only ever be bound to one single port regardless of how
Expand Down
1 change: 1 addition & 0 deletions packages/bun-usockets/src/internal/networking/bsd.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ int bsd_udp_packet_buffer_payload_length(struct udp_recvbuf *msgvec, int index);
char *bsd_udp_packet_buffer_payload(struct udp_recvbuf *msgvec, int index);
char *bsd_udp_packet_buffer_peer(struct udp_recvbuf *msgvec, int index);
int bsd_udp_packet_buffer_local_ip(struct udp_recvbuf *msgvec, int index, char *ip);
int bsd_udp_packet_buffer_truncated(struct udp_recvbuf *msgvec, int index);
// int bsd_udp_packet_buffer_ecn(struct udp_recvbuf *msgvec, int index);

LIBUS_SOCKET_DESCRIPTOR apple_no_sigpipe(LIBUS_SOCKET_DESCRIPTOR fd);
Expand Down
6 changes: 5 additions & 1 deletion packages/bun-usockets/src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ struct us_cert_string_t {
char *us_udp_packet_buffer_payload(struct us_udp_packet_buffer_t *buf, int index);
int us_udp_packet_buffer_payload_length(struct us_udp_packet_buffer_t *buf, int index);

/* Returns 1 if the received datagram was truncated (larger than recv buffer),
* 0 otherwise. Backed by MSG_TRUNC in msg_hdr.msg_flags on POSIX. */
int us_udp_packet_buffer_truncated(struct us_udp_packet_buffer_t *buf, int index);

/* Copies out local (received destination) ip (4 or 16 bytes) of received packet */
int us_udp_packet_buffer_local_ip(struct us_udp_packet_buffer_t *buf, int index, char *ip);

Expand Down Expand Up @@ -161,7 +165,7 @@ struct us_udp_packet_buffer_t *us_create_udp_packet_buffer();

//struct us_udp_socket_t *us_create_udp_socket(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), char *host, unsigned short port);

struct us_udp_socket_t *us_create_udp_socket(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), const char *host, unsigned short port, int flags, int *err, void *user);
struct us_udp_socket_t *us_create_udp_socket(us_loop_r loop, void (*data_cb)(struct us_udp_socket_t *, void *, int), void (*drain_cb)(struct us_udp_socket_t *), void (*close_cb)(struct us_udp_socket_t *), void (*recv_error_cb)(struct us_udp_socket_t *, int), const char *host, unsigned short port, int flags, int *err, void *user);

void us_udp_socket_close(struct us_udp_socket_t *s);

Expand Down
34 changes: 29 additions & 5 deletions packages/bun-usockets/src/loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,21 +585,40 @@
case POLL_TYPE_UDP: {
struct us_udp_socket_t *u = (struct us_udp_socket_t *) p;
if (u->closed) {
break;
}

if (events & LIBUS_SOCKET_READABLE) {
// On Linux with IP_RECVERR, EPOLLERR fires when an ICMP error (port
// unreachable, host unreachable, TTL exceeded, ...) is queued on
// the socket. The kernel may or may not also set EPOLLIN. Calling
// recvmmsg on such a socket returns -1 with the ICMP errno
// (ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, EMSGSIZE, ...), which
// we surface via on_recv_error. The socket stays open.
int recv_error_surfaced = 0;

// recv_would_block_only means: we attempted to recv and the only
// outcome was EAGAIN (no packets, no real error). EPOLLERR in that
// state is a residual event from the kernel's socket error queue
// that has already been drained — not a fatal condition.
int recv_would_block_only = 0;

if ((events & LIBUS_SOCKET_READABLE) || error) {
do {
struct udp_recvbuf recvbuf;
bsd_udp_setup_recvbuf(&recvbuf, u->loop->data.recv_buf, LIBUS_RECV_BUFFER_LENGTH);

Check failure on line 608 in packages/bun-usockets/src/loop.c

View check run for this annotation

Claude / Claude Code Review

Recv loop entered on EPOLLERR on all platforms, not just Linux

The recv loop is now entered on any error event (EPOLLERR/EV_ERROR) regardless of platform, but the recv_would_block_only flag was designed only for Linux IP_RECVERR semantics. On macOS/kqueue, EV_ERROR means a fatal filter condition with no drainable error queue; if recvmmsg returns EAGAIN after EV_ERROR, recv_would_block_only=1 prevents socket closure, leaving the socket permanently open in a broken state. The recv-on-error path and recv_would_block_only logic should be guarded with #if define
Comment thread
robobun marked this conversation as resolved.
int npackets = bsd_recvmmsg(us_poll_fd(p), &recvbuf, MSG_DONTWAIT);
if (npackets > 0) {
u->on_data(u, &recvbuf, npackets);
} else {
if (npackets == LIBUS_SOCKET_ERROR) {
// If the error was not EAGAIN, mark the error
if (!bsd_would_block()) {
error = 1;
int recv_err = errno;
if (bsd_would_block()) {
recv_would_block_only = 1;
} else {
recv_error_surfaced = 1;
if (u->on_recv_error) {
u->on_recv_error(u, recv_err);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
}
} else {
// 0 messages received, we are done
Expand All @@ -623,7 +642,12 @@
us_poll_change(&u->p, u->loop, us_poll_events(&u->p) & LIBUS_SOCKET_READABLE);
}

if (error && !u->closed) {
// Only close on EPOLLERR if we didn't surface the real errno via
// recvmmsg + on_recv_error above AND recv wasn't just EAGAIN (which
// means the error queue is already drained, leaving a residual
// EPOLLERR). Otherwise the socket stays open so the user can keep
// sending/receiving after a transient ICMP error.
if (error && !recv_error_surfaced && !recv_would_block_only && !u->closed) {
us_udp_socket_close(u);
}
break;
Expand Down
6 changes: 6 additions & 0 deletions packages/bun-usockets/src/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ int us_udp_packet_buffer_payload_length(struct us_udp_packet_buffer_t *buf, int
return bsd_udp_packet_buffer_payload_length((struct udp_recvbuf *)buf, index);
}

int us_udp_packet_buffer_truncated(struct us_udp_packet_buffer_t *buf, int index) {
return bsd_udp_packet_buffer_truncated((struct udp_recvbuf *)buf, index);
}

int us_udp_socket_send(struct us_udp_socket_t *s, void** payloads, size_t* lengths, void** addresses, int num) {
if (num == 0) return 0;
int fd = us_poll_fd((struct us_poll_t *) s);
Expand Down Expand Up @@ -147,6 +151,7 @@ struct us_udp_socket_t *us_create_udp_socket(
void (*data_cb)(struct us_udp_socket_t *, void *, int),
void (*drain_cb)(struct us_udp_socket_t *),
void (*close_cb)(struct us_udp_socket_t *),
void (*recv_error_cb)(struct us_udp_socket_t *, int),
const char *host,
unsigned short port,
int flags,
Expand Down Expand Up @@ -182,6 +187,7 @@ struct us_udp_socket_t *us_create_udp_socket(
udp->on_data = data_cb;
udp->on_drain = drain_cb;
udp->on_close = close_cb;
udp->on_recv_error = recv_error_cb;
udp->next = NULL;

us_poll_start((struct us_poll_t *) udp, udp->loop, LIBUS_SOCKET_READABLE | LIBUS_SOCKET_WRITABLE);
Expand Down
20 changes: 20 additions & 0 deletions src/bun.js/api/bun/udp_socket.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ fn onClose(socket: *uws.udp.Socket) callconv(.c) void {
this.socket = null;
}

fn onRecvError(socket: *uws.udp.Socket, errno: c_int) callconv(.c) void {
jsc.markBinding(@src());

const this: *UDPSocket = bun.cast(*UDPSocket, socket.user().?);
// Build a SystemError from errno and dispatch through the existing error handler.
// Triggered on Linux via IP_RECVERR when the kernel surfaces ICMP errors
// (ECONNREFUSED, EHOSTUNREACH, ENETUNREACH, EMSGSIZE, ...) on recv.
const e: bun.sys.E = @enumFromInt(errno);
const maybe = bun.sys.Maybe(void).errno(e, .recv);
const globalThis = this.globalThis;
const err_value = maybe.err.toJS(globalThis) catch return;
this.callErrorHandler(.zero, err_value);
}

fn onDrain(socket: *uws.udp.Socket) callconv(.c) void {
jsc.markBinding(@src());

Expand Down Expand Up @@ -76,6 +90,7 @@ fn onData(socket: *uws.udp.Socket, buf: *uws.udp.PacketBuffer, packets: c_int) c
}

const slice = buf.getPayload(i);
const truncated = buf.getTruncated(i);

const span = std.mem.span(hostname.?);
var hostname_string = if (scope_id) |id| blk: {
Expand All @@ -94,11 +109,15 @@ fn onData(socket: *uws.udp.Socket, buf: *uws.udp.PacketBuffer, packets: c_int) c
defer loop.exit();
defer thisValue.ensureStillAlive();

const flags = jsc.JSValue.createEmptyObject(globalThis, 1);
flags.put(globalThis, jsc.ZigString.static("truncated"), .jsBoolean(truncated));

_ = callback.call(globalThis, thisValue, &.{
thisValue,
udpSocket.config.binary_type.toJS(slice, globalThis) catch return,
.jsNumber(port),
hostname_string.transferToJS(globalThis) catch return,
flags,
}) catch |err| {
udpSocket.callErrorHandler(.zero, globalThis.takeException(err));
};
Expand Down Expand Up @@ -295,6 +314,7 @@ pub const UDPSocket = struct {
onData,
onDrain,
onClose,
onRecvError,
hostname_z,
this.config.port,
this.config.flags,
Expand Down
11 changes: 8 additions & 3 deletions src/deps/uws/udp.zig
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
const udp = @This();

pub const Socket = opaque {
pub fn create(loop: *Loop, data_cb: *const fn (*udp.Socket, *PacketBuffer, c_int) callconv(.c) void, drain_cb: *const fn (*udp.Socket) callconv(.c) void, close_cb: *const fn (*udp.Socket) callconv(.c) void, host: [*c]const u8, port: c_ushort, options: c_int, err: ?*c_int, user_data: ?*anyopaque) ?*udp.Socket {
return us_create_udp_socket(loop, data_cb, drain_cb, close_cb, host, port, options, err, user_data);
pub fn create(loop: *Loop, data_cb: *const fn (*udp.Socket, *PacketBuffer, c_int) callconv(.c) void, drain_cb: *const fn (*udp.Socket) callconv(.c) void, close_cb: *const fn (*udp.Socket) callconv(.c) void, recv_error_cb: *const fn (*udp.Socket, c_int) callconv(.c) void, host: [*c]const u8, port: c_ushort, options: c_int, err: ?*c_int, user_data: ?*anyopaque) ?*udp.Socket {
return us_create_udp_socket(loop, data_cb, drain_cb, close_cb, recv_error_cb, host, port, options, err, user_data);
}

pub fn send(this: *udp.Socket, payloads: []const [*]const u8, lengths: []const usize, addresses: []const ?*const anyopaque) c_int {
Expand Down Expand Up @@ -71,7 +71,7 @@ pub const Socket = opaque {
return us_udp_socket_set_source_specific_membership(this, source, group, iface, @intFromBool(drop));
}

extern fn us_create_udp_socket(loop: ?*Loop, data_cb: *const fn (*udp.Socket, *PacketBuffer, c_int) callconv(.c) void, drain_cb: *const fn (*udp.Socket) callconv(.c) void, close_cb: *const fn (*udp.Socket) callconv(.c) void, host: [*c]const u8, port: c_ushort, options: c_int, err: ?*c_int, user_data: ?*anyopaque) ?*udp.Socket;
extern fn us_create_udp_socket(loop: ?*Loop, data_cb: *const fn (*udp.Socket, *PacketBuffer, c_int) callconv(.c) void, drain_cb: *const fn (*udp.Socket) callconv(.c) void, close_cb: *const fn (*udp.Socket) callconv(.c) void, recv_error_cb: *const fn (*udp.Socket, c_int) callconv(.c) void, host: [*c]const u8, port: c_ushort, options: c_int, err: ?*c_int, user_data: ?*anyopaque) ?*udp.Socket;
extern fn us_udp_socket_connect(socket: *udp.Socket, hostname: [*c]const u8, port: c_uint) c_int;
extern fn us_udp_socket_disconnect(socket: *udp.Socket) c_int;
extern fn us_udp_socket_send(socket: *udp.Socket, [*c]const [*c]const u8, [*c]const usize, [*c]const ?*const anyopaque, c_int) c_int;
Expand Down Expand Up @@ -101,9 +101,14 @@ pub const PacketBuffer = opaque {
return payload[0..@as(usize, @intCast(len))];
}

pub fn getTruncated(this: *PacketBuffer, index: c_int) bool {
return us_udp_packet_buffer_truncated(this, index) != 0;
}

extern fn us_udp_packet_buffer_peer(buf: ?*PacketBuffer, index: c_int) *std.posix.sockaddr.storage;
extern fn us_udp_packet_buffer_payload(buf: ?*PacketBuffer, index: c_int) [*]u8;
extern fn us_udp_packet_buffer_payload_length(buf: ?*PacketBuffer, index: c_int) c_int;
extern fn us_udp_packet_buffer_truncated(buf: ?*PacketBuffer, index: c_int) c_int;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
};

const bun = @import("bun");
Expand Down
96 changes: 96 additions & 0 deletions test/js/bun/udp/udp_socket_recv_flags.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Coverage for the fifth parameter of Bun.udpSocket's `data` callback
// (`ReceiveFlags.truncated` from MSG_TRUNC) and for Linux's IP_RECVERR
// surfacing ICMP errors as `error` events on the socket.

import { udpSocket } from "bun";
import { describe, expect, test } from "bun:test";
import { isLinux } from "harness";

describe("udpSocket() receive flags", () => {
test("data callback receives flags object with truncated=false for normal packets", async () => {
const { promise, resolve, reject } = Promise.withResolvers<unknown>();
const client = await udpSocket({});
const server = await udpSocket({
socket: {
data(_socket, _data, _port, _address, flags) {
resolve(flags);
},
error(_socket, err) {
reject(err);
},
},
});
function sendRec() {
if (!client.closed) {
client.send("hello", server.port, "127.0.0.1");
setTimeout(sendRec, 10);
}
}
sendRec();
Comment thread
coderabbitai[bot] marked this conversation as resolved.
try {
const flags = await promise;
expect(flags).toEqual({ truncated: false });
} finally {
client.close();
server.close();
}
});

// IP_RECVERR is Linux-specific. On BSDs and Windows, ICMP errors on
// unconnected UDP sockets either propagate by default or are delivered
// through different channels that we don't currently surface.
test.skipIf(!isLinux)(
"surfaces ECONNREFUSED from ICMP port unreachable (IP_RECVERR) and keeps the socket usable",
async () => {
const { promise: errPromise, resolve: resolveErr } = Promise.withResolvers<Error & { code?: string }>();
const { promise: msgPromise, resolve: resolveMsg } = Promise.withResolvers<string>();

const receiver = await udpSocket({
socket: {
data(_socket, data) {
resolveMsg(data.toString());
},
},
});

const sender = await udpSocket({
socket: {
error(err: Error & { code?: string }) {
resolveErr(err);
},
},
});

// Send to a closed port on localhost. The kernel replies with ICMP
// port unreachable; with IP_RECVERR the next recv surfaces ECONNREFUSED.
let gotError = false;
function sendDead() {
if (!gotError && !sender.closed) {
sender.send("dead", 1, "127.0.0.1");
setTimeout(sendDead, 10);
}
}
sendDead();

try {
const err = await errPromise;
gotError = true;
expect(err?.code).toBe("ECONNREFUSED");
// The sender socket must remain usable after an ICMP error.
expect(sender.closed).toBe(false);

function sendAlive() {
if (!sender.closed && !receiver.closed) {
sender.send("alive", receiver.port, "127.0.0.1");
setTimeout(sendAlive, 10);
}
}
sendAlive();
expect(await msgPromise).toBe("alive");
} finally {
sender.close();
receiver.close();
}
},
);
});
Loading