Skip to content

Commit

Permalink
fix(udp): make preview2_udp_sockopts apss
Browse files Browse the repository at this point in the history
  • Loading branch information
manekinekko committed Nov 23, 2023
1 parent 0a19e43 commit 20ac85e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 43 deletions.
7 changes: 7 additions & 0 deletions packages/preview2-shim/lib/nodejs/sockets/socket-common.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
export function cappedUint32(value) {
// Note: cap the value to the highest possible BigInt value that can be represented as a
// unsigned 32-bit integer.
const width = 32n;
return BigInt.asUintN(Number(width), value);
}

export function noop() {}

function tupleToIPv6(arr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export class TcpSocketImpl {
assert(err === -22, "address-in-use");
assert(err === -49, "address-not-bindable");
assert(err === -99, "address-not-bindable"); // EADDRNOTAVAIL
assert(true, "", err);
assert(true, "unknown", err);
}

this.#isBound = true;
Expand Down
87 changes: 45 additions & 42 deletions packages/preview2-shim/lib/nodejs/sockets/udp-socket-impl.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
const { UDP, SendWrap } = process.binding("udp_wrap");
import { isIP } from "node:net";
import { assert } from "../../common/assert.js";
import { deserializeIpAddress, serializeIpAddress } from "./socket-common.js";
import { deserializeIpAddress, cappedUint32, serializeIpAddress } from "./socket-common.js";
import { pollableCreate } from "../../io/worker-io.js";

const SocketConnectionState = {
Expand All @@ -26,17 +26,16 @@ const SocketConnectionState = {
const symbolState = Symbol("SocketInternalState");

// see https://github.com/libuv/libuv/blob/master/docs/src/udp.rst
const flags = {
const Flags = {
UV_UDP_IPV6ONLY: 1,
UV_UDP_REUSEADDR: 4,
};

const bufferSizeFlags = {
const BufferSizeFlags = {
SO_RCVBUF: true,
SO_SNDBUF: false,
};


export class IncomingDatagramStream {
static _create(socket) {
const stream = new IncomingDatagramStream(socket);
Expand Down Expand Up @@ -158,7 +157,6 @@ export class OutgoingDatagramStream {
const outgoingDatagramStreamCreate = OutgoingDatagramStream._create;
delete OutgoingDatagramStream._create;


export class UdpSocketImpl {
/** @type {UDP} */ #socket = null;
/** @type {Network} */ network = null;
Expand Down Expand Up @@ -238,9 +236,8 @@ export class UdpSocketImpl {
const { localAddress, localPort, family } = this.#socketOptions;

let flags = 0;

if (this[symbolState].ipv6Only) {
flags |= flags.UV_UDP_IPV6ONLY;
flags |= Flags.UV_UDP_IPV6ONLY;
}

let err = null;
Expand Down Expand Up @@ -285,18 +282,17 @@ export class UdpSocketImpl {
#startConnect(network, remoteAddress = undefined) {
this[symbolState].operationInProgress = false;

if (remoteAddress === undefined || this[symbolState].state === SocketConnectionState.Connected) {
// reusing a connected socket. See #finishConnect()
return;
}

const host = serializeIpAddress(remoteAddress, this.#socketOptions.family);
const ipFamily = `ipv${isIP(host)}`;

assert(ipFamily.toLocaleLowerCase() === "ipv0", "invalid-argument");
assert(this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), "invalid-argument");

if (this[symbolState].state === SocketConnectionState.Connected) {
// reusing a connected socket. See #finishConnect()
return;
} else {
}

const { port } = remoteAddress.val;
this.#socketOptions.remoteAddress = host;
this.#socketOptions.remotePort = port;
Expand Down Expand Up @@ -350,8 +346,12 @@ export class UdpSocketImpl {
*/
stream(remoteAddress = undefined) {
assert(this[symbolState].isBound === false, "invalid-state");

this.#connect(this.network, remoteAddress);

console.log({
state: this[symbolState],
})

return [incomingDatagramStreamCreate(this.#socket), outgoingDatagramStreamCreate(this.#socket)];
}

Expand Down Expand Up @@ -382,23 +382,22 @@ export class UdpSocketImpl {
* @throws {invalid-state} The socket is not streaming to a specific remote address. (ENOTCONN)
*/
remoteAddress() {
assert(this[symbolState].state !== SocketConnectionState.Connected, "invalid-state", "The socket is not streaming to a specific remote address");
assert(
this[symbolState].state !== SocketConnectionState.Connected,
"invalid-state",
"The socket is not streaming to a specific remote address"
);

const out = {};
this.#socket.getpeername(out);

// Note: getpeername() returns undefined if family is ipv6
// TODO: investigate
if (out.address === undefined) {
return {
tag: "ipv6",
val: {
address: "",
port: 0,
},
};
}
const r = this.#socket.getpeername(out);


assert(
out.address === undefined,
"invalid-state",
"The socket is not streaming to a specific remote address"
);

const { address, port, family } = out;
return {
tag: family.toLocaleLowerCase(),
Expand Down Expand Up @@ -437,7 +436,11 @@ export class UdpSocketImpl {
* @throws {not-supported} (set) Host does not support dual-stack sockets. (Implementations are not required to.)
*/
setIpv6Only(value) {
assert(value === true && this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported", "Socket is an IPv4 socket.");
assert(
value === true && this.#socketOptions.family.toLocaleLowerCase() === "ipv4",
"not-supported",
"Socket is an IPv4 socket."
);
assert(this[symbolState].isBound, "invalid-state", "The socket is already bound");

this[symbolState].ipv6Only = value;
Expand Down Expand Up @@ -470,14 +473,18 @@ export class UdpSocketImpl {
*/
receiveBufferSize() {
const exceptionInfo = {};
const value = this.#socket.bufferSize(0, bufferSizeFlags.SO_RCVBUF, exceptionInfo);
const value = this.#socket.bufferSize(0, BufferSizeFlags.SO_RCVBUF, exceptionInfo);

if (exceptionInfo.code === "EBADF") {
// TODO: handle the case where bad file descriptor is returned
// This happens when the socket is not bound
return this[symbolState].receiveBufferSize;
}

console.log({
value
})

return value;
}

Expand All @@ -490,12 +497,10 @@ export class UdpSocketImpl {
setReceiveBufferSize(value) {
assert(value === 0n, "invalid-argument", "The provided value was 0");

// Note: libuv expects a uint32. Passing a bigint will result in an assertion error in V8.
value = Number(value);

const cappedValue = cappedUint32(value);
const exceptionInfo = {};
this.#socket.bufferSize(Number(value), bufferSizeFlags.SO_RCVBUF, exceptionInfo);
this[symbolState].receiveBufferSize = value;
this.#socket.bufferSize(Number(cappedValue), BufferSizeFlags.SO_RCVBUF, exceptionInfo);
this[symbolState].receiveBufferSize = cappedValue;
}

/**
Expand All @@ -504,8 +509,8 @@ export class UdpSocketImpl {
*/
sendBufferSize() {
const exceptionInfo = {};
const value = this.#socket.bufferSize(0, bufferSizeFlags.SO_SNDBUF, exceptionInfo);
const value = this.#socket.bufferSize(0, BufferSizeFlags.SO_SNDBUF, exceptionInfo);

if (exceptionInfo.code === "EBADF") {
// TODO: handle the case where bad file descriptor is returned
// This happens when the socket is not bound
Expand All @@ -524,12 +529,10 @@ export class UdpSocketImpl {
setSendBufferSize(value) {
assert(value === 0n, "invalid-argument", "The provided value was 0");

// Note: libuv expects a uint32. Passing a bigint will result in an assertion error in V8.
value = Number(value);

const cappedValue = cappedUint32(value);
const exceptionInfo = {};
this.#socket.bufferSize(value, bufferSizeFlags.SO_SNDBUF, exceptionInfo);
this[symbolState].sendBufferSize = value;
this.#socket.bufferSize(Number(cappedValue), BufferSizeFlags.SO_SNDBUF, exceptionInfo);
this[symbolState].sendBufferSize = cappedValue;
}

/**
Expand Down

0 comments on commit 20ac85e

Please sign in to comment.