Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 72 additions & 20 deletions src/async/posix_event_loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -833,13 +833,25 @@ pub const FilePoll = struct {
if (comptime Environment.isLinux) {
const one_shot_flag: u32 = if (!this.flags.contains(.one_shot)) 0 else linux.EPOLL.ONESHOT;

const flags: u32 = switch (flag) {
var flags: u32 = switch (flag) {
.process,
.readable,
=> linux.EPOLL.IN | linux.EPOLL.HUP | one_shot_flag,
.writable => linux.EPOLL.OUT | linux.EPOLL.HUP | linux.EPOLL.ERR | one_shot_flag,
else => unreachable,
};
// epoll keys on fd alone; if the other direction is already
// registered on this poll, preserve it in the CTL_MOD mask.
// (EPOLLONESHOT disarms the whole fd after the first event in
// either direction, so bidirectional one-shot is not supported.)
if (flag == .readable and this.flags.contains(.poll_writable)) {
bun.debugAssert(!this.flags.contains(.one_shot));
flags |= linux.EPOLL.OUT | linux.EPOLL.ERR;
}
if (flag == .writable and this.flags.contains(.poll_readable)) {
bun.debugAssert(!this.flags.contains(.one_shot));
flags |= linux.EPOLL.IN;
Comment thread
robobun marked this conversation as resolved.
}

var event = linux.epoll_event{ .events = flags, .data = .{ .u64 = @intFromPtr(Pollable.init(this).ptr()) } };

Expand Down Expand Up @@ -936,8 +948,9 @@ pub const FilePoll = struct {
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
// with EV_ERROR set in flags and the system error in data.
if (changelist[0].flags == std.c.EV.ERROR and changelist[0].data != 0) {
// with EV_ERROR set in flags and the system error in data. xnu ORs
// EV_ERROR into the existing action bits, so test the bit.
if ((changelist[0].flags & std.c.EV.ERROR) != 0 and changelist[0].data != 0) {
return bun.sys.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
// Otherwise, -1 will be returned, and errno will be set to
// indicate the error condition.
Expand Down Expand Up @@ -1049,6 +1062,7 @@ pub const FilePoll = struct {

bun.assert(fd != invalid_fd);
const watcher_fd = loop.fd;
const both_directions = this.flags.contains(.poll_readable) and this.flags.contains(.poll_writable);
const flag: Flags = brk: {
if (this.flags.contains(.poll_readable))
break :brk .readable;
Expand All @@ -1066,14 +1080,15 @@ pub const FilePoll = struct {
log("unregister: {s} ({f}) skipped due to needs_rearm", .{ @tagName(flag), fd });
this.flags.remove(.poll_process);
this.flags.remove(.poll_readable);
this.flags.remove(.poll_process);
this.flags.remove(.poll_writable);
this.flags.remove(.poll_machport);
return .success;
}

log("unregister: FilePoll(0x{x}, generation_number={d}) {s} ({f})", .{ @intFromPtr(this), this.generation_number, @tagName(flag), fd });
log("unregister: FilePoll(0x{x}, generation_number={d}) {s}{s} ({f})", .{ @intFromPtr(this), this.generation_number, @tagName(flag), if (both_directions) "+writable" else "", fd });

if (comptime Environment.isLinux) {
// CTL_DEL keys on fd alone, so both directions are removed together.
const ctl = linux.epoll_ctl(
watcher_fd,
linux.EPOLL.CTL_DEL,
Expand Down Expand Up @@ -1127,6 +1142,21 @@ pub const FilePoll = struct {
else => unreachable,
};

var nchanges: c_int = 1;
if (both_directions) {
// kqueue keys on (fd, filter); delete EVFILT_WRITE as a second change.
changelist[1] = .{
.ident = @intCast(fd.cast()),
.filter = std.posix.system.EVFILT.WRITE,
.data = 0,
.fflags = 0,
.udata = @intFromPtr(Pollable.init(this).ptr()),
.flags = std.c.EV.DELETE,
.ext = .{ 0, 0 },
};
nchanges = 2;
}

// output events only include change errors
const KEVENT_FLAG_ERROR_EVENTS = 0x000002;

Expand All @@ -1136,30 +1166,38 @@ pub const FilePoll = struct {
const rc = std.posix.system.kevent64(
watcher_fd,
&changelist,
1,
nchanges,
// The same array may be used for the changelist and eventlist.
&changelist,
1,
nchanges,
KEVENT_FLAG_ERROR_EVENTS,
&timeout,
);
// If an error occurs while
// processing an element of the changelist and there is enough room
// in the eventlist, then the event will be placed in the eventlist
// with EV_ERROR set in flags and the system error in data.
if (changelist[0].flags == std.c.EV.ERROR) {
return bun.sys.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
// Otherwise, -1 will be returned, and errno will be set to
// indicate the error condition.
}

const errno = bun.sys.getErrno(rc);
switch (rc) {
// Global failure (e.g. EBADF on the kqueue fd): the eventlist
// was not written, so per-entry checks below would read our
// own input. Report errno and stop.
std.math.minInt(@TypeOf(rc))...-1 => return bun.sys.Maybe(void).errnoSys(@intFromEnum(errno), .kevent).?,
else => {},
}

// If an error occurs while processing an element of the changelist
// and there is enough room in the eventlist, then the event will be
// placed in the eventlist with EV_ERROR set in flags and the system
// error in data. With KEVENT_FLAG_ERROR_EVENTS, rc is the count of
// such error events; they are packed from index 0 regardless of
// which change failed. xnu ORs EV_ERROR into the existing action
// bits (EV_DELETE|EV_ERROR = 0x4002), so test the bit, not equality.
if (rc >= 1 and (changelist[0].flags & std.c.EV.ERROR) != 0 and changelist[0].data != 0) {
return bun.sys.Maybe(void).errnoSys(changelist[0].data, .kevent).?;
}
if (rc >= 2 and (changelist[1].flags & std.c.EV.ERROR) != 0 and changelist[1].data != 0) {
return bun.sys.Maybe(void).errnoSys(changelist[1].data, .kevent).?;
}
} else if (comptime Environment.isFreeBSD) {
var changelist = std.mem.zeroes([1]std.c.Kevent);
var changelist = std.mem.zeroes([2]std.c.Kevent);
changelist[0] = switch (flag) {
.readable => .{
.ident = @intCast(fd.cast()),
Expand Down Expand Up @@ -1189,10 +1227,26 @@ pub const FilePoll = struct {
else => unreachable,
};

var nchanges: c_int = 1;
if (both_directions) {
changelist[1] = .{
.ident = @intCast(fd.cast()),
.filter = std.c.EVFILT.WRITE,
.data = 0,
.fflags = 0,
.udata = @intFromPtr(Pollable.init(this).ptr()),
.flags = std.c.EV.DELETE,
};
nchanges = 2;
}

// nevents=0: per-entry errors surface as rc=-1/errno for the
// first failing change. For EV_DELETE (typically ENOENT) a silent
// miss on the second entry is harmless.
const rc = std.c.kevent(
watcher_fd,
&changelist,
1,
nchanges,
@constCast(&changelist),
0,
null,
Expand All @@ -1206,8 +1260,6 @@ pub const FilePoll = struct {

this.flags.remove(.needs_rearm);
this.flags.remove(.one_shot);
// we don't support both right now
bun.assert(!(this.flags.contains(.poll_readable) and this.flags.contains(.poll_writable)));
this.flags.remove(.poll_readable);
this.flags.remove(.poll_writable);
this.flags.remove(.poll_process);
Expand Down
38 changes: 32 additions & 6 deletions src/bun.js/api/bun/dns.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2527,7 +2527,7 @@ pub const Resolver = struct {

const poll: *UvDnsPoll = poll_entry.value_ptr.*;

const uv_events = if (readable) uv.UV_READABLE else 0 | if (writable) uv.UV_WRITABLE else 0;
const uv_events = (if (readable) uv.UV_READABLE else 0) | (if (writable) uv.UV_WRITABLE else 0);
if (uv.uv_poll_start(&poll.poll, uv_events, onDNSPollUv) < 0) {
_ = this.polls.swapRemove(fd);
uv.uv_close(@ptrCast(&poll.poll), onCloseUv);
Expand All @@ -2554,11 +2554,37 @@ pub const Resolver = struct {

var poll = poll_entry.value_ptr.*;

if (readable and !poll.flags.contains(.poll_readable))
_ = poll.register(vm.event_loop_handle.?, .readable, false);

if (writable and !poll.flags.contains(.poll_writable))
_ = poll.register(vm.event_loop_handle.?, .writable, false);
// c-ares reports the full desired (readable, writable) set for this
// fd; sync the poll's registration to match. FilePoll now supports
// both directions on one poll (epoll: combined mask via CTL_MOD;
// kqueue: two filters on the same ident, both EV_DELETEd on
// unregister).
const loop = vm.event_loop_handle.?;
Comment thread
robobun marked this conversation as resolved.
const have_readable = poll.flags.contains(.poll_readable);
const have_writable = poll.flags.contains(.poll_writable);

if ((have_readable and !readable) or (have_writable and !writable)) {
// Dropping a direction. FilePoll has no per-direction
// unregister (epoll CTL_DEL removes both; a targeted kqueue
// EV_DELETE would need a new API), and leaving the unwanted
// direction armed would busy-loop on level-triggered writable
// once the socket connects. Full resync is the simplest
// correct path and c-ares DNS fds are short-lived.
_ = poll.unregister(loop, false);
if (readable)
_ = poll.register(loop, .readable, false);
if (writable)
_ = poll.register(loop, .writable, false);
} else {
// Only adding directions (or no change). register() issues a
// single CTL_MOD on epoll that preserves the other direction;
// on kqueue EV_ADD creates a separate (ident, filter) knote
// without disturbing the existing one.
if (readable and !have_readable)
_ = poll.register(loop, .readable, false);
if (writable and !have_writable)
_ = poll.register(loop, .writable, false);
}
}
}

Expand Down
86 changes: 86 additions & 0 deletions test/js/node/dns/dns-tcp-bidirectional-poll-fixture.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions test/js/node/dns/dns-tcp-bidirectional-poll.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// c-ares opens a TCP socket when the UDP response has TC=1. At
// ares_open_connection() it reports (readable=1, writable=1) for that fd via
// the sock_state_cb so it learns when the nonblocking connect() completes.
// onDNSSocketState registers both directions on one FilePoll; before the fix,
// unregisterWithFd asserted !(poll_readable && poll_writable) and crashed the
// debug build, and on epoll the second register()'s CTL_MOD silently dropped
// the first direction's mask so the response was never read.
//
// The server and resolver run together in a subprocess: the assertion aborts
// the whole process, and the busy-loop on EPOLLOUT (pre-fix) would otherwise
// starve the test runner.
import { expect, test } from "bun:test";
import { bunEnv, bunExe, isWindows } from "harness";
import { join } from "node:path";

test.skipIf(isWindows)("c-ares TCP DNS fd registers readable+writable on one FilePoll without asserting", async () => {
await using proc = Bun.spawn({
cmd: [bunExe(), join(import.meta.dir, "dns-tcp-bidirectional-poll-fixture.ts")],
env: bunEnv,
stdout: "pipe",
stderr: "pipe",
});

const [stdout, stderr, exitCode] = await Promise.all([proc.stdout.text(), proc.stderr.text(), proc.exited]);

expect(stderr.trim()).toBe("");
expect(stdout.trim()).toBe('[["hello"]]');
expect(exitCode).toBe(0);
});
Loading