diff --git a/packages/preview2-shim/lib/common/assert.js b/packages/preview2-shim/lib/common/assert.js new file mode 100644 index 000000000..0f151be53 --- /dev/null +++ b/packages/preview2-shim/lib/common/assert.js @@ -0,0 +1,7 @@ +export function assert(condition, tag, _val) { + if (condition) { + // TODO: throw meaningful errors + // NOTE: wasmtime conformance tests are expecting a string here (a tag) + throw tag; + } +} diff --git a/packages/preview2-shim/lib/io/calls.js b/packages/preview2-shim/lib/io/calls.js index bcb54090e..003ad708f 100644 --- a/packages/preview2-shim/lib/io/calls.js +++ b/packages/preview2-shim/lib/io/calls.js @@ -45,3 +45,8 @@ export const HTTP_CREATE_REQUEST = ++call_id << 24; export const CLOCKS_NOW = ++call_id << 24; export const CLOCKS_DURATION_SUBSCRIBE = ++call_id << 24; export const CLOCKS_INSTANT_SUBSCRIBE = ++call_id << 24; + +// Sockets +export const SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST = ++call_id << 24; +export const SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST = ++call_id << 24; +export const SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST = ++call_id << 24; diff --git a/packages/preview2-shim/lib/io/stream-types.js b/packages/preview2-shim/lib/io/stream-types.js index 0b75e43b3..1185e1300 100644 --- a/packages/preview2-shim/lib/io/stream-types.js +++ b/packages/preview2-shim/lib/io/stream-types.js @@ -4,5 +4,6 @@ export const STDIN = ++cnt; export const STDOUT = ++cnt; export const STDERR = ++cnt; export const FILE = ++cnt; +export const SOCKET = ++cnt; export const INCOMING_BODY = ++cnt; export const OUTGOING_BODY = ++cnt; diff --git a/packages/preview2-shim/lib/io/worker-thread.js b/packages/preview2-shim/lib/io/worker-thread.js index 9e7d0ff4f..e8b339037 100644 --- a/packages/preview2-shim/lib/io/worker-thread.js +++ b/packages/preview2-shim/lib/io/worker-thread.js @@ -1,8 +1,8 @@ -import { runAsWorker } from "../synckit/index.js"; -import { FILE, STDOUT, STDERR, STDIN } from "./stream-types.js"; +import { resolve } from "node:dns/promises"; import { createReadStream, createWriteStream } from "node:fs"; -import { Readable } from "node:stream"; import { hrtime } from "node:process"; +import { Readable } from "node:stream"; +import { runAsWorker } from "../synckit/index.js"; import { CALL_MASK, CALL_SHIFT, @@ -10,8 +10,8 @@ import { CLOCKS_DURATION_SUBSCRIBE, CLOCKS_INSTANT_SUBSCRIBE, CLOCKS_NOW, - FUTURE_DISPOSE_AND_GET_VALUE, FUTURE_DISPOSE, + FUTURE_DISPOSE_AND_GET_VALUE, HTTP_CREATE_REQUEST, INPUT_STREAM_BLOCKING_READ, INPUT_STREAM_BLOCKING_SKIP, @@ -30,12 +30,16 @@ import { OUTPUT_STREAM_FLUSH, OUTPUT_STREAM_SPLICE, OUTPUT_STREAM_SUBSCRIBE, - OUTPUT_STREAM_WRITE_ZEROES, OUTPUT_STREAM_WRITE, - POLL_POLL_LIST, + OUTPUT_STREAM_WRITE_ZEROES, POLL_POLLABLE_BLOCK, POLL_POLLABLE_READY, + POLL_POLL_LIST, + SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, + SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, + SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, } from "./calls.js"; +import { FILE, SOCKET, STDERR, STDIN, STDOUT } from "./stream-types.js"; let streamCnt = 0, pollCnt = 0; @@ -120,6 +124,30 @@ function handle(call, id, payload) { return createFuture(createHttpRequest(method, url, headers, body)); } + // Sockets + case SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST: + return createFuture(resolve(payload.hostname)); + case SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST: + return void unfinishedFutures.delete(id); + case SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST: { + const future = unfinishedFutures.get(id); + if (!future) { + // future not ready yet + if (unfinishedPolls.get(id)) { + throw 'would-block'; + } + throw new Error("future already got and dropped"); + } + unfinishedFutures.delete(id); + return future; + } + case OUTPUT_STREAM_CREATE | SOCKET: { + // TODO: implement + } + case INPUT_STREAM_CREATE | SOCKET: { + // TODO: implement + } + // Stdio case OUTPUT_STREAM_DISPOSE | STDOUT: case OUTPUT_STREAM_DISPOSE | STDERR: @@ -434,14 +462,16 @@ function handle(call, id, payload) { } // poll promises must always resolve and never error +// once the future is resolved, it is removed from unfinishedPolls function createPoll(promise) { const pollId = ++pollCnt; unfinishedPolls.set( pollId, promise.then( () => void unfinishedPolls.delete(pollId), - () => { + (err) => { process._rawDebug("Unexpected poll error"); + process._rawDebug(err); process.exit(1); } ) diff --git a/packages/preview2-shim/lib/nodejs/index.js b/packages/preview2-shim/lib/nodejs/index.js index 45205c3dc..8b00fa92b 100644 --- a/packages/preview2-shim/lib/nodejs/index.js +++ b/packages/preview2-shim/lib/nodejs/index.js @@ -15,3 +15,6 @@ export { sockets, cli } + +export { WasiSockets } from "./sockets/wasi-sockets.js"; + diff --git a/packages/preview2-shim/lib/nodejs/sockets.js b/packages/preview2-shim/lib/nodejs/sockets.js index 11897d1e6..2874e82f6 100644 --- a/packages/preview2-shim/lib/nodejs/sockets.js +++ b/packages/preview2-shim/lib/nodejs/sockets.js @@ -1,200 +1,11 @@ -export const instanceNetwork = { - instanceNetwork () { - console.log(`[sockets] instance network`); - } -}; - -export const ipNameLookup = { - dropResolveAddressStream () { - - }, - subscribe () { - - }, - resolveAddresses () { - - }, - resolveNextAddress () { - - }, - nonBlocking () { - - }, - setNonBlocking () { - - }, -}; - -export const network = { - dropNetwork () { - - } -}; - -export const tcpCreateSocket = { - createTcpSocket () { - - } -}; - -export const tcp = { - subscribe () { - - }, - dropTcpSocket() { - - }, - bind() { - - }, - connect() { - - }, - listen() { - - }, - accept() { - - }, - localAddress() { - - }, - remoteAddress() { - - }, - addressFamily() { - - }, - ipv6Only() { - - }, - setIpv6Only() { - - }, - setListenBacklogSize() { - - }, - keepAlive() { - - }, - setKeepAlive() { - - }, - noDelay() { - - }, - setNoDelay() { - - }, - unicastHopLimit() { - - }, - setUnicastHopLimit() { - - }, - receiveBufferSize() { - - }, - setReceiveBufferSize() { - - }, - sendBufferSize() { - - }, - setSendBufferSize() { - - }, - nonBlocking() { - - }, - setNonBlocking() { - - }, - shutdown() { - - } -}; - -export const udp = { - subscribe () { - - }, - - dropUdpSocket () { - - }, - - bind () { - - }, - - connect () { - - }, - - receive () { - - }, - - send () { - - }, - - localAddress () { - - }, - - remoteAddress () { - - }, - - addressFamily () { - - }, - - ipv6Only () { - - }, - - setIpv6Only () { - - }, - - unicastHopLimit () { - - }, - - setUnicastHopLimit () { - - }, - - receiveBufferSize () { - - }, - - setReceiveBufferSize () { - - }, - - sendBufferSize () { - - }, - - setSendBufferSize () { - - }, - - nonBlocking () { - - }, - - setNonBlocking () { - - } -}; - -export const udpCreateSocket = { - createTcpSocket () { - - } -}; +import { WasiSockets } from "./sockets/wasi-sockets.js"; + +export const { + ipNameLookup, + instanceNetwork, + network, + tcpCreateSocket, + udpCreateSocket, + tcp, + udp, +} = new WasiSockets(); \ No newline at end of file diff --git a/packages/preview2-shim/lib/nodejs/sockets/socket-common.js b/packages/preview2-shim/lib/nodejs/sockets/socket-common.js new file mode 100644 index 000000000..facfe9944 --- /dev/null +++ b/packages/preview2-shim/lib/nodejs/sockets/socket-common.js @@ -0,0 +1,116 @@ +export function cappedUint32(value) { + // Note: cap the value to the highest possible BigInt value that can be represented as a + // unsigned 32-bit integer. + const width = 32n; + return BigInt.asUintN(Number(width), value); +} + +export function noop() {} + +function tupleToIPv6(arr) { + if (arr.length !== 8) { + return null; + } + return arr.map((segment) => segment.toString(16)).join(":"); +} + +function tupleToIpv4(arr) { + if (arr.length !== 4) { + return null; + } + return arr.map((segment) => segment.toString(10)).join("."); +} + +// TODO: write a better (faste?) parser for ipv6 +function ipv6ToTuple(ipv6) { + if (ipv6 === "::1") { + return [0, 0, 0, 0, 0, 0, 0, 1]; + } else if (ipv6 === "::") { + return [0, 0, 0, 0, 0, 0, 0, 0]; + } else if (ipv6.includes("::")) { + const [head, tail] = ipv6.split("::"); + const headSegments = head.split(":").map((segment) => parseInt(segment, 16)); + const tailSegments = tail.split(":").map((segment) => parseInt(segment, 16)); + const missingSegments = 8 - headSegments.length - tailSegments.length; + const middleSegments = Array(missingSegments).fill(0); + return headSegments.concat(middleSegments).concat(tailSegments); + } + return ipv6.split(":").map((segment) => parseInt(segment, 16)); +} + +function ipv4ToTuple(ipv4) { + return ipv4.split(".").map((segment) => parseInt(segment, 10)); +} + +export function serializeIpAddress(addr = undefined, includePort = false) { + if (addr === undefined) { + return undefined; + } + + const family = addr.tag; + + let { address } = addr.val; + if (family.toLocaleLowerCase() === "ipv4") { + address = tupleToIpv4(address); + } else if (family.toLocaleLowerCase() === "ipv6") { + address = tupleToIPv6(address); + } + + if (includePort) { + address = `${address}:${addr.val.port}`; + } + + return address; +} + +export function deserializeIpAddress(addr, family) { + let address = []; + if (family.toLocaleLowerCase() === "ipv4") { + address = ipv4ToTuple(addr); + } else if (family.toLocaleLowerCase() === "ipv6") { + address = ipv6ToTuple(addr); + } + return address; +} + +export function findUnsuedLocalAddress(family) { + let address = [127, 0, 0, 1]; + if (family.toLocaleLowerCase() === "ipv6") { + address = [0, 0, 0, 0, 0, 0, 0, 1]; + } + return { + tag: family, + val: { + address, + port: 0, + }, + }; +} + +export function isUnicastIpAddress(ipSocketAddress) { + return !isMulticastIpAddress(ipSocketAddress) && !isBroadcastIpAddress(ipSocketAddress); +} + +export function isMulticastIpAddress(ipSocketAddress) { + // ipv6: [0xff00, 0, 0, 0, 0, 0, 0, 0] + // ipv4: [224, 0, 0, 0] + return ipSocketAddress.val.address[0] === 224 || ipSocketAddress.val.address[0] === 0xff00; +} + +export function isBroadcastIpAddress(ipSocketAddress) { + // ipv4: [255, 255, 255, 255] + return ( + ipSocketAddress.val.address[0] === 0xff && // 255 + ipSocketAddress.val.address[1] === 0xff && // 255 + ipSocketAddress.val.address[2] === 0xff && // 255 + ipSocketAddress.val.address[3] === 0xff // 255 + ); +} + +export function isIPv4MappedAddress(ipSocketAddress) { + // ipv6: [0, 0, 0, 0, 0, 0xffff, 0, 0] + if (ipSocketAddress.val.address.length !== 8) { + return false; + } + return ipSocketAddress.val.address[5] === 0xffff; +} diff --git a/packages/preview2-shim/lib/nodejs/sockets/socketopts-bindings.js b/packages/preview2-shim/lib/nodejs/sockets/socketopts-bindings.js new file mode 100644 index 000000000..ee9bb27f7 --- /dev/null +++ b/packages/preview2-shim/lib/nodejs/sockets/socketopts-bindings.js @@ -0,0 +1,94 @@ +import { platform } from "node:os"; +import { _errnoException } from "node:util"; +import { types, refType } from "ref-napi"; +import { Library, errno as _errno } from "ffi-napi"; + +const tryGetUV = (() => { + let UV = null; + return () => { + if (UV === null) { + try { + UV = typeof process.binding === "function" ? process.binding("uv") : undefined; + } catch (ex) { + // Continue regardless + } + } + return UV; + }; +})(); + +const uvErrName = (errno) => { + const UV = tryGetUV(); + return UV && UV.errname ? UV.errname(errno) : "UNKNOWN"; +}; + +const errnoException = (errno, syscall, original) => { + if (_errnoException) { + return _errnoException(-errno, syscall, original); + } + + const errname = uvErrName(-errno), + message = original ? `${syscall} ${errname} (${errno}) ${original}` : `${syscall} ${errname} (${errno})`; + + const e = new Error(message); + e.code = errname; + e.errno = errname; + e.syscall = syscall; + return e; +}; + +const createFFI = () => { + const cInt = types.int; + const cVoid = types.void; + + return Library(null, { + //name ret 1 2 3 4 5 + setsockopt: [cInt, [cInt, cInt, cInt, refType(cVoid), cInt]], + getsockopt: [cInt, [cInt, cInt, cInt, refType(cVoid), refType(cInt)]], + }); +}; + +const ffi = (() => { + let instance; + return () => { + if (!instance) { + instance = createFFI(); + } + return instance; + }; +})(); + +const _setsockopt = (fd, level, name, value, valueLength) => { + if (fd == null) { + return false; + } + + const err = ffi().setsockopt(fd, level, name, value, valueLength); + + if (err !== 0) { + const errno = _errno(); + throw errnoException(errno, "setsockopt"); + } + + return true; +}; + +const _getsockopt = (fd, level, name, value, valueLength) => { + if (fd == null) { + return false; + } + + const err = ffi().getsockopt(fd, level, name, value, valueLength); + + if (err !== 0) { + const errno = _errno(); + throw errnoException(errno, "getsockopt"); + } + return true; +}; + +const noop = () => false; +const isWin32 = platform() === "win32"; + +export const setsockopt = isWin32 ? noop : _setsockopt; +export const getsockopt = isWin32 ? noop : _getsockopt; diff --git a/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js b/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js new file mode 100644 index 000000000..ac994976a --- /dev/null +++ b/packages/preview2-shim/lib/nodejs/sockets/tcp-socket-impl.js @@ -0,0 +1,795 @@ +/** + * @typedef {import("../../../types/interfaces/wasi-sockets-network.js").Network} Network + * @typedef {import("../../../types/interfaces/wasi-sockets-network.js").IpSocketAddress} IpSocketAddress + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").TcpSocket} TcpSocket + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").InputStream} InputStream + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").OutputStream} OutputStream + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").IpAddressFamily} IpAddressFamily + * @typedef {import("../../../types/interfaces/wasi-io-poll-poll").Pollable} Pollable + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp.js").ShutdownType} ShutdownType + * @typedef {import("../../../types/interfaces/wasi-clocks-monotonic-clock.js").Duration} Duration + */ + +import { isIP, Socket as NodeSocket } from "node:net"; +import { platform } from "node:os"; +import { assert } from "../../common/assert.js"; +import { streams } from "../io.js"; +const { InputStream, OutputStream } = streams; + +const symbolDispose = Symbol.dispose || Symbol.for("dispose"); +const symbolSocketState = Symbol.SocketInternalState || Symbol.for("SocketInternalState"); +const symbolOperations = Symbol.SocketOperationsState || Symbol.for("SocketOperationsState"); + +// See: https://github.com/nodejs/node/blob/main/src/tcp_wrap.cc +const { TCP, TCPConnectWrap, constants: TCPConstants } = process.binding("tcp_wrap"); +const { ShutdownWrap } = process.binding("stream_wrap"); + +import { INPUT_STREAM_CREATE, OUTPUT_STREAM_CREATE } from "../../io/calls.js"; +import { SOCKET } from "../../io/stream-types.js"; +import { inputStreamCreate, ioCall, outputStreamCreate, pollableCreate } from "../../io/worker-io.js"; +import { + deserializeIpAddress, + findUnsuedLocalAddress, + isIPv4MappedAddress, + isMulticastIpAddress, + isUnicastIpAddress, + serializeIpAddress, +} from "./socket-common.js"; + +// TODO: move to a common +const ShutdownType = { + Receive: "receive", + Send: "send", + Both: "both", +}; + +// TODO: move to a common +const SocketConnectionState = { + Error: "Error", + Closed: "Closed", + Connecting: "Connecting", + Connected: "Connected", + Listening: "Listening", +}; + +// As a workaround, we store the bound address in a global map +// this is needed because 'address-in-use' is not always thrown when binding +// more than one socket to the same address +// TODO: remove this workaround when we figure out why! +const globalBoundAddresses = new Map(); + +// TODO: implement would-block exceptions +// TODO: implement concurrency-conflict exceptions +export class TcpSocketImpl { + id = 1; + /** @type {TCP.TCPConstants.SOCKET} */ #socket = null; + /** @type {Network} */ network = null; + + #connections = 0; + + #pollId = null; + + // track in-progress operations + // counter must be 0 for the operation to be considered complete + // we increment the counter when the operation starts + // and decrement it when the operation finishes + [symbolOperations] = { + bind: 0, + connect: 0, + listen: 0, + accept: 0, + }; + + [symbolSocketState] = { + lastErrorState: null, + isBound: false, + ipv6Only: false, + connectionState: SocketConnectionState.Closed, + acceptedClient: null, + canReceive: true, + canSend: true, + + // See: https://github.com/torvalds/linux/blob/fe3cfe869d5e0453754cf2b4c75110276b5e8527/net/core/request_sock.c#L19-L31 + backlogSize: 128, + + // TODO: what these default values should be? + keepAlive: false, + keepAliveCount: 1, + keepAliveIdleTime: 1, + keepAliveInterval: 1, + hopLimit: 1, + receiveBufferSize: 1, + sendBufferSize: 1, + }; + + #socketOptions = { + family: "ipv4", + localAddress: "", + localPort: 0, + remoteAddress: "", + remotePort: 0, + }; + + // this is set by the TcpSocket child class + #tcpSocketChildClassType = null; + + /** + * @param {IpAddressFamily} addressFamily + * @param {TcpSocket} childClassType + * @param {number} id + */ + constructor(addressFamily, childClassType, id) { + this.id = id; + + this.#socketOptions.family = addressFamily.toLocaleLowerCase(); + this.#tcpSocketChildClassType = childClassType; + + this.#socket = new TCP(TCPConstants.SOCKET | TCPConstants.SERVER); + } + + #handleConnection(err, newClientSocket) { + if (err) { + assert(true, "unknown", err); + } + + this.#connections++; + + this[symbolSocketState].acceptedClient = new NodeSocket({ + handle: newClientSocket, + }); + this[symbolSocketState].acceptedClient.server = this.#socket; + this[symbolSocketState].acceptedClient._server = this.#socket; + + // TODO: handle data received from the client + this[symbolSocketState].acceptedClient._handle.onread = (nread, buffer) => { + if (nread > 0) { + const data = buffer.toString("utf8", 0, nread); + console.log("accepted socket on read:", data); + } + }; + } + + #handleDisconnect(err) { + if (err) { + assert(true, "unknown", err); + } + + this.#connections--; + } + + #onClientConnectComplete(err) { + if (err) { + // TODO: figure out what theis error mean and why it is thrown + assert(err === -89, "-89"); // on macos + + assert(err === -99, "ephemeral-ports-exhausted"); + assert(err === -104, "connection-reset"); + assert(err === -110, "timeout"); + assert(err === -111, "connection-refused"); + assert(err === -113, "remote-unreachable"); + assert(err === -125, "operation-cancelled"); + + throw new Error(err); + } + + this[symbolSocketState].connectionState = SocketConnectionState.Connected; + } + + // TODO: is this needed? + #handleAfterShutdown() {} + + #autoBind(network, ipFamily) { + const unsusedLocalAddress = findUnsuedLocalAddress(ipFamily); + this.#socketOptions.localAddress = serializeIpAddress(unsusedLocalAddress, this.#socketOptions.family); + this.#socketOptions.localPort = unsusedLocalAddress.val.port; + this.startBind(network, unsusedLocalAddress); + this.finishBind(); + } + + #cacheBoundAddress() { + let { localIpSocketAddress: boundAddress, localPort } = this.#socketOptions; + // when port is 0, the OS will assign an ephemeral port + // we need to get the actual port assigned by the OS + if (localPort === 0) { + boundAddress = this.localAddress(); + } + globalBoundAddresses.set(serializeIpAddress(boundAddress, true), this.#socket); + } + + /** + * @param {Network} network + * @param {IpSocketAddress} localAddress + * @returns {void} + * @throws {invalid-argument} The `local-address` has the wrong address family. (EAFNOSUPPORT, EFAULT on Windows) + * @throws {invalid-argument} `local-address` is not a unicast address. (EINVAL) + * @throws {invalid-argument} `local-address` is an IPv4-mapped IPv6 address, but the socket has `ipv6-only` enabled. (EINVAL) + * @throws {invalid-state} The socket is already bound. (EINVAL) + */ + startBind(network, localAddress) { + try { + assert(this[symbolSocketState].isBound, "invalid-state", "The socket is already bound"); + + const address = serializeIpAddress(localAddress); + const ipFamily = `ipv${isIP(address)}`; + + assert( + this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), + "invalid-argument", + "The `local-address` has the wrong address family" + ); + + assert(isUnicastIpAddress(localAddress) === false, "invalid-argument"); + assert(isIPv4MappedAddress(localAddress) && this.ipv6Only(), "invalid-argument"); + + const { port } = localAddress.val; + this.#socketOptions.localIpSocketAddress = localAddress; + this.#socketOptions.localAddress = address; + this.#socketOptions.localPort = port; + this.network = network; + this[symbolOperations].bind++; + this[symbolSocketState].lastErrorState = null; + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + } + + /** + * @returns {void} + * @throws {address-in-use} No ephemeral ports available. (EADDRINUSE, ENOBUFS on Windows) + * @throws {address-in-use} Address is already in use. (EADDRINUSE) + * @throws {address-not-bindable} `local-address` is not an address that the `network` can bind to. (EADDRNOTAVAIL) + * @throws {not-in-progress} A `bind` operation is not in progress. + * @throws {would-block} Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + **/ + finishBind() { + try { + assert(this[symbolOperations].bind === 0, "not-in-progress"); + + const { localAddress, localIpSocketAddress, localPort, family } = this.#socketOptions; + assert(isIP(localAddress) === 0, "address-not-bindable"); + assert(globalBoundAddresses.has(serializeIpAddress(localIpSocketAddress, true)), "address-in-use"); + + let err = null; + let bind = "bind"; // ipv4 + if (family.toLocaleLowerCase() === "ipv6") { + bind = "bind6"; + } + + err = this.#socket[bind](localAddress, localPort); + + if (err) { + this.#socket.close(); + assert(err === -22, "address-in-use"); + assert(err === -49, "address-not-bindable"); + assert(err === -99, "address-not-bindable"); // EADDRNOTAVAIL + assert(true, "unknown", err); + } + + this[symbolSocketState].lastErrorState = null; + this[symbolSocketState].isBound = true; + this[symbolOperations].bind--; + + this.#cacheBoundAddress(); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + } + + /** + * @param {Network} network + * @param {IpSocketAddress} remoteAddress + * @returns {void} + * @throws {invalid-argument} The `remote-address` has the wrong address family. (EAFNOSUPPORT) + * @throws {invalid-argument} `remote-address` is not a unicast address. (EINVAL, ENETUNREACH on Linux, EAFNOSUPPORT on MacOS) + * @throws {invalid-argument} `remote-address` is an IPv4-mapped IPv6 address, but the socket has `ipv6-only` enabled. (EINVAL, EADDRNOTAVAIL on Illumos) + * @throws {invalid-argument} `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + * @throws {invalid-argument} The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EADDRNOTAVAIL on Windows) + * @throws {invalid-argument} The port in `remote-address` is set to 0. (EADDRNOTAVAIL on Windows) + * @throws {invalid-argument} The socket is already attached to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. + * @throws {invalid-state} The socket is already in the Connection state. (EISCONN) + * @throws {invalid-state} The socket is already in the Listener state. (EOPNOTSUPP, EINVAL on Windows) + */ + startConnect(network, remoteAddress) { + const host = serializeIpAddress(remoteAddress); + const ipFamily = `ipv${isIP(host)}`; + try { + assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + assert(this[symbolSocketState].connectionState === SocketConnectionState.Connecting, "invalid-state"); + assert(this[symbolSocketState].connectionState === SocketConnectionState.Listening, "invalid-state"); + + assert(host === "0.0.0.0" || host === "0:0:0:0:0:0:0:0", "invalid-argument"); + assert(this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), "invalid-argument"); + assert(isUnicastIpAddress(remoteAddress) === false, "invalid-argument"); + assert(isMulticastIpAddress(remoteAddress), "invalid-argument"); + assert(isIPv4MappedAddress(remoteAddress) && this.ipv6Only(), "invalid-argument"); + assert(remoteAddress.val.port === 0, "invalid-argument"); + + if (this[symbolSocketState].isBound === false) { + this.#autoBind(network, ipFamily); + } + + assert(network !== this.network, "invalid-argument"); + assert(ipFamily.toLocaleLowerCase() === "ipv0", "invalid-argument"); + assert(remoteAddress.val.port === 0 && platform() === "win32", "invalid-argument"); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + + this[symbolSocketState].lastErrorState = null; + + this.#socketOptions.remoteIpSocketAddress = remoteAddress; + this.#socketOptions.remoteAddress = host; + this.#socketOptions.remotePort = remoteAddress.val.port; + this.network = network; + this[symbolOperations].connect++; + } + + /** + * @returns {Array} + * @throws {timeout} Connection timed out. (ETIMEDOUT) + * @throws {connection-refused} The connection was forcefully rejected. (ECONNREFUSED) + * @throws {connection-reset} The connection was reset. (ECONNRESET) + * @throws {connection-aborted} The connection was aborted. (ECONNABORTED) + * @throws {remote-unreachable} The remote address is not reachable. (EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + * @throws {address-in-use} Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) + * @throws {not-in-progress} A `connect` operation is not in progress. + * @throws {would-block} Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + */ + finishConnect() { + try { + assert(this[symbolOperations].connect === 0, "not-in-progress"); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + + this[symbolSocketState].lastErrorState = null; + + const { localAddress, localPort, remoteAddress, remotePort, family } = this.#socketOptions; + const connectReq = new TCPConnectWrap(); + + let err = null; + let connect = "connect"; // ipv4 + if (family.toLocaleLowerCase() === "ipv6") { + connect = "connect6"; + } + + err = this.#socket[connect](connectReq, remoteAddress, remotePort); + + if (err) { + console.error(`[tcp] connect error on socket: ${err}`); + this[symbolSocketState].connectionState = SocketConnectionState.Error; + } + + connectReq.oncomplete = this.#onClientConnectComplete.bind(this); + connectReq.address = remoteAddress; + connectReq.port = remotePort; + connectReq.localAddress = localAddress; + connectReq.localPort = localPort; + + this.#socket.onread = (buffer) => { + // TODO: handle data received from the server + }; + + this.#socket.readStart(); + + const inputStream = inputStreamCreate(SOCKET, ioCall(INPUT_STREAM_CREATE | SOCKET, null, {})); + const outputStream = outputStreamCreate(SOCKET, ioCall(OUTPUT_STREAM_CREATE | SOCKET, null, {})); + + this[symbolOperations].connect--; + this[symbolSocketState].connectionState = SocketConnectionState.Connecting; + + // TODO: this is a temporary workaround, move this to the connection callback + // when the connection is actually established + this[symbolSocketState].connectionState = SocketConnectionState.Connected; + + return [inputStream, outputStream]; + } + + /** + * @returns {void} + * @throws {invalid-state} The socket is not bound to any local address. (EDESTADDRREQ) + * @throws {invalid-state} The socket is already in the Connection state. (EISCONN, EINVAL on BSD) + * @throws {invalid-state} The socket is already in the Listener state. + */ + startListen() { + try { + assert(this[symbolSocketState].lastErrorState !== null, "invalid-state"); + assert(this[symbolSocketState].isBound === false, "invalid-state"); + assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + assert(this[symbolSocketState].connectionState === SocketConnectionState.Listening, "invalid-state"); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + + this[symbolSocketState].lastErrorState = null; + this[symbolOperations].listen++; + } + + /** + * @returns {void} + * @throws {address-in-use} Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE) + * @throws {not-in-progress} A `listen` operation is not in progress. + * @throws {would-block} Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + */ + finishListen() { + try { + assert(this[symbolOperations].listen === 0, "not-in-progress"); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + + this[symbolSocketState].lastErrorState = null; + + const err = this.#socket.listen(this[symbolSocketState].backlogSize); + if (err) { + console.error(`[tcp] listen error on socket: ${err}`); + this.#socket.close(); + + // TODO: handle errors + throw new Error(err); + } + + this[symbolSocketState].connectionState = SocketConnectionState.Listening; + this[symbolOperations].listen--; + } + + /** + * @returns {Array} + * @throws {invalid-state} Socket is not in the Listener state. (EINVAL) + * @throws {would-block} No pending connections at the moment. (EWOULDBLOCK, EAGAIN) + * @throws {connection-aborted} An incoming connection was pending, but was terminated by the client before this listener could accept it. (ECONNABORTED) + * @throws {new-socket-limit} The new socket resource could not be created because of a system limit. (EMFILE, ENFILE) + */ + accept() { + this[symbolOperations].accept++; + + try { + assert(this[symbolSocketState].connectionState !== SocketConnectionState.Listening, "invalid-state"); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + + this[symbolSocketState].lastErrorState = null; + + if (this[symbolSocketState].isBound === false) { + this.#autoBind(this.network, this.addressFamily()); + } + const inputStream = inputStreamCreate(SOCKET, ioCall(INPUT_STREAM_CREATE | SOCKET, null, {})); + const outputStream = outputStreamCreate(SOCKET, ioCall(OUTPUT_STREAM_CREATE | SOCKET, null, {})); + + // Because we have to return a valid TcpSocket resrouce type, + // we need to instantiate the correct child class + // TODO: figure out a more elegant way to do this + const socket = new this.#tcpSocketChildClassType(this.addressFamily()); + + // The returned socket is bound and in the Connection state. + // The following properties are inherited from the listener socket: + // - `address-family` + // - `ipv6-only` + // - `keep-alive-enabled` + // - `keep-alive-idle-time` + // - `keep-alive-interval` + // - `keep-alive-count` + // - `hop-limit` + // - `receive-buffer-size` + // - `send-buffer-size` + // + socket[symbolSocketState].ipv6Only = this[symbolSocketState].ipv6Only; + socket[symbolSocketState].keepAlive = this[symbolSocketState].keepAlive; + socket[symbolSocketState].keepAliveIdleTime = this[symbolSocketState].keepAliveIdleTime; + socket[symbolSocketState].keepAliveInterval = this[symbolSocketState].keepAliveInterval; + socket[symbolSocketState].keepAliveCount = this[symbolSocketState].keepAliveCount; + socket[symbolSocketState].hopLimit = this[symbolSocketState].hopLimit; + socket[symbolSocketState].receiveBufferSize = this[symbolSocketState].receiveBufferSize; + socket[symbolSocketState].sendBufferSize = this[symbolSocketState].sendBufferSize; + + this[symbolOperations].accept--; + + return [socket, inputStream, outputStream]; + } + + /** + * @returns {IpSocketAddress} + * @throws {invalid-state} The socket is not bound to any local address. + */ + localAddress() { + assert(this[symbolSocketState].isBound === false, "invalid-state"); + + const out = {}; + this.#socket.getsockname(out); + + const { address, port, family } = out; + this.#socketOptions.localAddress = address; + this.#socketOptions.localPort = port; + this.#socketOptions.family = family.toLocaleLowerCase(); + + return { + tag: family.toLocaleLowerCase(), + val: { + address: deserializeIpAddress(address, family), + port, + }, + }; + } + + /** + * @returns {IpSocketAddress} + * @throws {invalid-state} The socket is not connected to a remote address. (ENOTCONN) + */ + remoteAddress() { + assert(this[symbolSocketState].connectionState !== SocketConnectionState.Connected, "invalid-state"); + + const out = {}; + this.#socket.getpeername(out); + + const { address, port, family } = out; + this.#socketOptions.remoteAddress = address; + this.#socketOptions.remotePort = port; + this.#socketOptions.family = family.toLocaleLowerCase(); + + return { + tag: family.toLocaleLowerCase(), + val: { + address: deserializeIpAddress(address, family), + port, + }, + }; + } + + isListening() { + return this[symbolSocketState].connectionState === SocketConnectionState.Listening; + } + + /** + * @returns {IpAddressFamily} + */ + addressFamily() { + return this.#socketOptions.family; + } + + /** + * @returns {boolean} + * @throws {not-supported} (get/set) `this` socket is an IPv4 socket. + */ + ipv6Only() { + assert(this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported"); + + return this[symbolSocketState].ipv6Only; + } + + /** + * @param {boolean} value + * @returns {void} + * @throws {invalid-state} (set) The socket is already bound. + * @throws {invalid-state} (get/set) `this` socket is an IPv4 socket. + * @throws {not-supported} (set) Host does not support dual-stack sockets. (Implementations are not required to.) + */ + setIpv6Only(value) { + assert(this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported"); + assert(this[symbolSocketState].isBound, "invalid-state"); + + this[symbolSocketState].ipv6Only = value; + } + + /** + * @param {bigint} value + * @returns {void} + * @throws {not-supported} (set) The platform does not support changing the backlog size after the initial listen. + * @throws {invalid-argument} (set) The provided value was 0. + * @throws {invalid-state} (set) The socket is already in the Connection state. + */ + setListenBacklogSize(value) { + assert(value === 0n, "invalid-argument", "The provided value was 0."); + assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + + this[symbolSocketState].backlogSize = Number(value); + } + + /** + * @returns {boolean} + */ + keepAliveEnabled() { + return this[symbolSocketState].keepAlive; + } + + /** + * @param {boolean} value + * @returns {void} + */ + setKeepAliveEnabled(value) { + this.#socket.setKeepAlive(value); + this[symbolSocketState].keepAlive = value; + + if (value) { + this.setKeepAliveIdleTime(this.keepAliveIdleTime()); + this.setKeepAliveInterval(this.keepAliveInterval()); + this.setKeepAliveCount(this.keepAliveCount()); + } + } + + /** + * + * @returns {Duration} + */ + keepAliveIdleTime() { + return this[symbolSocketState].keepAliveIdleTime; + } + + /** + * + * @param {Duration} value + * @returns {void} + * @throws {invalid-argument} (set) The idle time must be 1 or higher. + */ + setKeepAliveIdleTime(value) { + assert(value < 1, "invalid-argument", "The idle time must be 1 or higher."); + + this[symbolSocketState].keepAliveIdleTime = value; + } + + /** + * + * @returns {Duration} + */ + keepAliveInterval() { + return this[symbolSocketState].keepAliveInterval; + } + + /** + * + * @param {Duration} value + * @returns {void} + * @throws {invalid-argument} (set) The interval must be 1 or higher. + */ + setKeepAliveInterval(value) { + assert(value < 1, "invalid-argument", "The interval must be 1 or higher."); + + this[symbolSocketState].keepAliveInterval = value; + } + + /** + * + * @returns {Duration} + */ + keepAliveCount() { + return this[symbolSocketState].keepAliveCount; + } + + /** + * + * @param {Duration} value + * @returns {void} + * @throws {invalid-argument} (set) The count must be 1 or higher. + */ + setKeepAliveCount(value) { + assert(value < 1, "invalid-argument", "The count must be 1 or higher."); + + // TODO: set this on the client socket as well + this[symbolSocketState].keepAliveCount = value; + } + + /** + * @returns {number} + * @description Not available on Node.js (see https://github.com/WebAssembly/wasi-sockets/blob/main/Posix-compatibility.md#socket-options) + */ + hopLimit() { + return this[symbolSocketState].hopLimit; + } + + /** + * @param {number} value + * @returns {void} + * @throws {invalid-argument} (set) The TTL value must be 1 or higher. + * @throws {invalid-state} (set) The socket is already in the Connection state. + * @throws {invalid-state} (set) The socket is already in the Listener state. + * @description Not available on Node.js (see https://github.com/WebAssembly/wasi-sockets/blob/main/Posix-compatibility.md#socket-options) + */ + setHopLimit(value) { + assert(value < 1, "invalid-argument", "The TTL value must be 1 or higher."); + + this[symbolSocketState].hopLimit = value; + } + + /** + * @returns {bigint} + */ + receiveBufferSize() { + return this[symbolSocketState].receiveBufferSize; + } + + /** + * @param {number} value + * @returns {void} + * @throws {not-supported} (set) The platform does not support changing the backlog size after the initial listen. + * @throws {invalid-argument} (set) The provided value was 0. + * @throws {invalid-state} (set) The socket is already in the Connection state. + */ + setReceiveBufferSize(value) { + // TODO: review these assertions based on WIT specs + // assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + assert(value === 0n, "invalid-argument", "The provided value was 0."); + + // TODO: set this on the client socket as well + this[symbolSocketState].receiveBufferSize = value; + } + + /** + * @returns {bigint} + */ + sendBufferSize() { + return this[symbolSocketState].sendBufferSize; + } + + /** + * @param {bigint} value + * @returns {void} + * @throws {invalid-argument} (set) The provided value was 0. + * @throws {invalid-state} (set) The socket is already in the Connection state. + * @throws {invalid-state} (set) The socket is already in the Listener state. + */ + setSendBufferSize(value) { + // TODO: review these assertions based on WIT specs + // assert(this[symbolSocketState].connectionState === SocketConnectionState.Connected, "invalid-state"); + assert(value === 0n, "invalid-argument", "The provided value was 0."); + + // TODO: set this on the client socket as well + this[symbolSocketState].sendBufferSize = value; + } + + /** + * @returns {Pollable} + */ + subscribe() { + if (this.#pollId) return pollableCreate(this.#pollId); + // 0 poll is immediately resolving + return pollableCreate(0); + } + + /** + * @param {ShutdownType} shutdownType + * @returns {void} + * @throws {invalid-state} The socket is not in the Connection state. (ENOTCONN) + */ + shutdown(shutdownType) { + assert(this[symbolSocketState].connectionState !== SocketConnectionState.Connected, "invalid-state"); + + // TODO: figure out how to handle shutdownTypes + if (shutdownType === ShutdownType.Receive) { + this[symbolSocketState].canReceive = false; + } else if (shutdownType === ShutdownType.Send) { + this[symbolSocketState].canSend = false; + } else if (shutdownType === ShutdownType.Both) { + this[symbolSocketState].canReceive = false; + this[symbolSocketState].canSend = false; + } + + const req = new ShutdownWrap(); + req.oncomplete = this.#handleAfterShutdown.bind(this); + req.handle = this._handle; + req.callback = () => {}; + const err = this._handle.shutdown(req); + + assert(err === 1, "invalid-state"); + } + + [symbolDispose]() { + this.#socket.close(); + + // we only need to remove the bound address from the global map + // if the socket was already bound + if (this[symbolSocketState].isBound) { + globalBoundAddresses.delete(serializeIpAddress(this.#socketOptions.localIpSocketAddress, true)); + } + } + + handle() { + return this.#socket; + } +} diff --git a/packages/preview2-shim/lib/nodejs/sockets/udp-socket-impl.js b/packages/preview2-shim/lib/nodejs/sockets/udp-socket-impl.js new file mode 100644 index 000000000..8e39f4e82 --- /dev/null +++ b/packages/preview2-shim/lib/nodejs/sockets/udp-socket-impl.js @@ -0,0 +1,628 @@ +/** + * @typedef {import("../../types/interfaces/wasi-sockets-network").Network} Network + * @typedef {import("../../types/interfaces/wasi-sockets-network").IpSocketAddress} IpSocketAddress + * @typedef {import("../../types/interfaces/wasi-sockets-network").IpAddressFamily} IpAddressFamily + * @typedef {import("../../types/interfaces/wasi-sockets-udp").Datagram} Datagram + * @typedef {import("../../types/interfaces/wasi-io-poll-poll").Pollable} Pollable + */ + +// See: https://github.com/nodejs/node/blob/main/src/udp_wrap.cc +const { UDP, SendWrap } = process.binding("udp_wrap"); +import { isIP } from "node:net"; +import { assert } from "../../common/assert.js"; +import { pollableCreate } from "../../io/worker-io.js"; +import { cappedUint32, deserializeIpAddress, serializeIpAddress } from "./socket-common.js"; + +const symbolDispose = Symbol.dispose || Symbol.for("dispose"); +const symbolSocketState = Symbol.SocketInternalState || Symbol.for("SocketInternalState"); +const symbolOperations = Symbol.SocketOperationsState || Symbol.for("SocketOperationsState"); + +// TODO: move to a common +const SocketConnectionState = { + Error: "Error", + Closed: "Closed", + Connecting: "Connecting", + Connected: "Connected", + Listening: "Listening", +}; + +// see https://github.com/libuv/libuv/blob/master/docs/src/udp.rst +// TODO: move to a common +const Flags = { + UV_UDP_IPV6ONLY: 1, + UV_UDP_REUSEADDR: 4, +}; + +// TODO: move to a common +const BufferSizeFlags = { + SO_RCVBUF: true, + SO_SNDBUF: false, +}; + +// As a workaround, we store the bound address in a global map +// this is needed because 'address-in-use' is not always thrown when binding +// more than one socket to the same address +// TODO: remove this workaround when we figure out why! +const globalBoundAddresses = new Map(); + +export class IncomingDatagramStream { + static _create(socket) { + const stream = new IncomingDatagramStream(socket); + return stream; + } + + #socket = null; + constructor(socket) { + this.#socket = socket; + } + + /** + * + * @param {bigint} maxResults + * @returns {Datagram[]} + * @throws {invalid-state} The socket is not bound to any local address. (EINVAL) + * @throws {not-in-progress} The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + * @throws {remote-unreachable} The remote address is not reachable. (ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN, ENONET) + * @throws {connection-refused} The connection was refused. (ECONNREFUSED) + * @throws {would-block} There is no pending data available to be read at the moment. (EWOULDBLOCK, EAGAIN) + */ + receive(maxResults) { + assert(self[symbolSocketState].isBound === false, "invalid-state"); + assert(self[symbolOperations].receive === 0, "not-in-progress"); + + if (maxResults === 0n) { + return []; + } + + // TODO: not sure this is the right API to use! + const socket = this.#socket; + socket.onmessage = (...args) => console.log("recv onmessage", args[2].toString()); + socket.onerror = (err) => console.log("recv error", err); + socket.recvStart(); + const datagrams = []; + return datagrams; + } + + /** + * + * @returns {Pollable} A pollable which will resolve once the stream is ready to receive again. + */ + subscribe() { + throw new Error("Not implemented"); + } + + [symbolDispose]() { + // TODO: stop receiving + } +} +const incomingDatagramStreamCreate = IncomingDatagramStream._create; +delete IncomingDatagramStream._create; + +export class OutgoingDatagramStream { + static _create(socket) { + const stream = new OutgoingDatagramStream(socket); + return stream; + } + + #socket = null; + constructor(socket) { + this.#socket = socket; + } + + /** + * + * @returns {bigint} + * @throws {invalid-state} The socket is not bound to any local address. (EINVAL) + */ + checkSend() { + throw new Error("Not implemented"); + } + + /** + * + * @param {Datagram[]} datagrams + * @returns {bigint} + * @throws {invalid-argument} The `remote-address` has the wrong address family. (EAFNOSUPPORT) + * @throws {invalid-argument} `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + * @throws {invalid-argument} The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + * @throws {invalid-argument} The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + * @throws {invalid-argument} The socket is in "connected" mode and the `datagram.remote-address` does not match the address passed to `connect`. (EISCONN) + * @throws {invalid-argument} The socket is not "connected" and no value for `remote-address` was provided. (EDESTADDRREQ) + * @throws {remote-unreachable} The remote address is not reachable. (ECONNREFUSED, ECONNRESET, ENETRESET on Windows, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN) + * @throws {connection-refused} The connection was refused. (ECONNREFUSED) + * @throws {datagram-too-large} The datagram is too large. (EMSGSIZE) + */ + send(datagrams) { + const req = new SendWrap(); + const doSend = (data, port, host, family) => { + // setting hasCallback to false will make send() synchronous + // TODO: handle async send + const hasCallback = false; + const socket = this.#socket; + + let err = null; + if (family.toLocaleLowerCase() === "ipv4") { + err = socket.send(req, data, data.length, port, host, hasCallback); + } else if (family.toLocaleLowerCase() === "ipv6") { + err = socket.send6(req, data, data.length, port, host, hasCallback); + } + return err; + }; + + datagrams.forEach((datagram) => { + const { data, remoteAddress } = datagram; + const { tag: family, val } = remoteAddress; + const { address, port } = val; + const err = doSend(data, port, serializeIpAddress(remoteAddress), family); + console.error({ + err, + }); + }); + } + + /** + * + * @returns {Pollable} A pollable which will resolve once the stream is ready to send again. + */ + subscribe() { + throw new Error("Not implemented"); + } + + [symbolDispose]() { + // TODO: stop sending + } +} +const outgoingDatagramStreamCreate = OutgoingDatagramStream._create; +delete OutgoingDatagramStream._create; + +export class UdpSocketImpl { + id = 1; + /** @type {UDP} */ #socket = null; + /** @type {Network} */ network = null; + + // track in-progress operations + // counter must be 0 for the operation to be considered complete + // we increment the counter when the operation starts + // and decrement it when the operation finishes + [symbolOperations] = { + bind: 0, + connect: 0, + listen: 0, + accept: 0, + receive: 0, + send: 0, + }; + + [symbolSocketState] = { + lastErrorState: null, + isBound: false, + ipv6Only: false, + connectionState: SocketConnectionState.Closed, + + // TODO: what these default values should be? + unicastHopLimit: 1, + receiveBufferSize: 1, + sendBufferSize: 1, + }; + + #socketOptions = { + family: "ipv4", + localAddress: undefined, + localPort: 0, + remoteAddress: undefined, + remotePort: 0, + }; + + /** + * @param {IpAddressFamily} addressFamily + * @returns {void} + */ + constructor(addressFamily, id) { + this.id = id; + this.#socketOptions.family = addressFamily; + + this.#socket = new UDP(); + } + + #cacheBoundAddress() { + let { localIpSocketAddress: boundAddress, localPort } = this.#socketOptions; + // when port is 0, the OS will assign an ephemeral port + // we need to get the actual port assigned by the OS + if (localPort === 0) { + boundAddress = this.localAddress(); + } + globalBoundAddresses.set(serializeIpAddress(boundAddress, true), this.#socket); + } + + /** + * + * @param {Network} network + * @param {IpAddressFamily} localAddress + * @returns {void} + * @throws {invalid-argument} The `local-address` has the wrong address family. (EAFNOSUPPORT, EFAULT on Windows) + * @throws {invalid-state} The socket is already bound. (EINVAL) + */ + startBind(network, localAddress) { + try { + assert(this[symbolSocketState].isBound, "invalid-state", "The socket is already bound"); + + const address = serializeIpAddress(localAddress); + const ipFamily = `ipv${isIP(address)}`; + + assert( + this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), + "invalid-argument", + "The `local-address` has the wrong address family" + ); + assert(this[symbolSocketState].ipv6Only, "invalid-argument", "The `local-address` has the wrong address family"); + + const { port } = localAddress.val; + this.#socketOptions.localIpSocketAddress = localAddress; + this.#socketOptions.localAddress = address; + this.#socketOptions.localPort = port; + this.network = network; + this[symbolOperations].bind++; + this[symbolSocketState].lastErrorState = null; + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + } + + /** + * + * @returns {void} + * @throws {address-in-use} No ephemeral ports available. (EADDRINUSE, ENOBUFS on Windows) + * @throws {address-in-use} Address is already in use. (EADDRINUSE) + * @throws {address-not-bindable} `local-address` is not an address that the `network` can bind to. (EADDRNOTAVAIL) + * @throws {not-in-progress} A `bind` operation is not in progress. + * @throws {would-block} Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + **/ + finishBind() { + try { + assert(this[symbolOperations].bind === 0, "not-in-progress"); + + const { localAddress, localIpSocketAddress, localPort, family } = this.#socketOptions; + assert(isIP(localAddress) === 0, "address-not-bindable"); + assert(globalBoundAddresses.has(serializeIpAddress(localIpSocketAddress, true)), "address-in-use"); + + let flags = 0; + if (this[symbolSocketState].ipv6Only) { + flags |= Flags.UV_UDP_IPV6ONLY; + } + + let err = null; + let bind = "bind"; // ipv4 + if (family.toLocaleLowerCase() === "ipv6") { + bind = "bind6"; + } + + err = this.#socket[bind](localAddress, localPort, flags); + + if (err === 0) { + this[symbolSocketState].isBound = true; + } else { + assert(err === -22, "address-in-use"); + assert(err === -48, "address-in-use"); // macos + assert(err === -49, "address-not-bindable"); + assert(err === -98, "address-in-use"); // WSL + assert(err === -99, "address-not-bindable"); // EADDRNOTAVAIL + assert(true, "unknown", err); + } + + this[symbolSocketState].lastErrorState = null; + this[symbolSocketState].isBound = true; + this[symbolOperations].bind--; + + this.#cacheBoundAddress(); + } catch (err) { + this[symbolSocketState].lastErrorState = err; + throw err; + } + } + + /** + * Alias for startBind() and finishBind() + */ + bind(network, localAddress) { + this.startBind(network, localAddress); + this.finishBind(); + } + + /** + * + * @param {Network} network + * @param {IpAddressFamily | undefined} remoteAddress + * @returns {void} + * @throws {invalid-argument} The `remote-address` has the wrong address family. (EAFNOSUPPORT) + * @throws {invalid-argument} `remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + * @throws {invalid-argument} The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / `::`). (EDESTADDRREQ, EADDRNOTAVAIL) + * @throws {invalid-argument} The port in `remote-address` is set to 0. (EADDRNOTAVAIL on Windows) + * @throws {invalid-argument} The socket is already bound to a different network. The `network` passed to `connect` must be identical to the one passed to `bind`. + */ + #startConnect(network, remoteAddress = undefined) { + this[symbolOperations].connect++; + + if (remoteAddress === undefined || this[symbolSocketState].connectionState === SocketConnectionState.Connected) { + // TODO: should we reuse a connected socket if remoteAddress is undefined? + // See #finishConnect() + return; + } + + const host = serializeIpAddress(remoteAddress); + const ipFamily = `ipv${isIP(host)}`; + + assert(ipFamily.toLocaleLowerCase() === "ipv0", "invalid-argument"); + assert(this.#socketOptions.family.toLocaleLowerCase() !== ipFamily.toLocaleLowerCase(), "invalid-argument"); + + const { port } = remoteAddress.val; + this.#socketOptions.remoteAddress = host; // can be undefined + this.#socketOptions.remotePort = port; + + this.network = network; + } + + /** + * + * @returns {void} + * @throws {address-in-use} Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) + * @throws {not-in-progress} A `connect` operation is not in progress. + * @throws {would-block} Can't finish the operation, it is still in progress. (EWOULDBLOCK, EAGAIN) + */ + #finishConnect() { + // Note: remoteAddress can be undefined + const { remoteAddress, remotePort } = this.#socketOptions; + this[symbolSocketState].connectionState = SocketConnectionState.Connecting; + + // TODO: figure out how to reuse a connected socket + const err = this.#socket.connect(remoteAddress ?? null, remotePort); + + if (!err) { + this[symbolSocketState].connectionState = SocketConnectionState.Connected; + } else { + assert(err === -22, "invalid-argument"); + assert(true, "unknown", err); + } + + this[symbolOperations].connect--; + } + + /** + * Alias for startBind() and finishBind() + */ + #connect(network, remoteAddress = undefined) { + this.#startConnect(network, remoteAddress); + this.#finishConnect(); + } + + /** + * + * @param {IpSocketAddress | undefined} remoteAddress + * @returns {Array} + * @throws {invalid-argument} The `remote-address` has the wrong address family. (EAFNOSUPPORT) + * @throws {invalid-argument} remote-address` is a non-IPv4-mapped IPv6 address, but the socket was bound to a specific IPv4-mapped IPv6 address. (or vice versa) + * @throws {invalid-argument} The IP address in `remote-address` is set to INADDR_ANY (`0.0.0.0` / :`). (EDESTADDRREQ, EADDRNOTAVAIL) + * @throws {invalid-argument} The port in `remote-address` is set to 0. (EDESTADDRREQ, EADDRNOTAVAIL) + * @throws {invalid-state} The socket is not bound. + * @throws {address-in-use} Tried to perform an implicit bind, but there were no ephemeral ports available. (EADDRINUSE, EADDRNOTAVAIL on Linux, EAGAIN on BSD) + * @throws {remote-unreachable} The remote address is not reachable. (ECONNRESET, ENETRESET, EHOSTUNREACH, EHOSTDOWN, ENETUNREACH, ENETDOWN, ENONET) + * @throws {connection-refused} The connection was refused. (ECONNREFUSED) + */ + stream(remoteAddress = undefined) { + assert(this[symbolSocketState].isBound === false, "invalid-state"); + this.#connect(this.network, remoteAddress); + return [incomingDatagramStreamCreate(this.#socket), outgoingDatagramStreamCreate(this.#socket)]; + } + + /** + * + * Note: Concurrent invocations of this test can yield port to be 0 on Windows/WSL. + * @returns {IpSocketAddress} + * @throws {invalid-state} The socket is not bound to any local address. + */ + localAddress() { + assert(this[symbolSocketState].isBound === false, "invalid-state"); + + const out = {}; + this.#socket.getsockname(out); + + const { address, port, family } = out; + this.#socketOptions.localAddress = address; + this.#socketOptions.localPort = port; + this.#socketOptions.family = family.toLocaleLowerCase(); + + return { + tag: family.toLocaleLowerCase(), + val: { + address: deserializeIpAddress(address, family), + port, + }, + }; + } + + /** + * + * @returns {IpSocketAddress} + * @throws {invalid-state} The socket is not streaming to a specific remote address. (ENOTCONN) + */ + remoteAddress() { + console.log("remoteAddress", this[symbolSocketState]); + assert( + this[symbolSocketState].connectionState !== SocketConnectionState.Connected, + "invalid-state", + "The socket is not streaming to a specific remote address" + ); + + const out = {}; + this.#socket.getpeername(out); + + assert(out.address === undefined, "invalid-state", "The socket is not streaming to a specific remote address"); + + const { address, port, family } = out; + this.#socketOptions.remoteAddress = address; + this.#socketOptions.remotePort = port; + this.#socketOptions.family = family.toLocaleLowerCase(); + + return { + tag: family.toLocaleLowerCase(), + val: { + address: deserializeIpAddress(address), + port, + }, + }; + } + + /** + * + * @returns {IpAddressFamily} + */ + addressFamily() { + return this.#socketOptions.family; + } + + /** + * + * @returns {boolean} + * @throws {not-supported} (get/set) `this` socket is an IPv4 socket. + */ + ipv6Only() { + assert(this.#socketOptions.family.toLocaleLowerCase() === "ipv4", "not-supported", "Socket is an IPv4 socket."); + + return this[symbolSocketState].ipv6Only; + } + + /** + * + * @param {boolean} value + * @returns {void} + * @throws {not-supported} (get/set) `this` socket is an IPv4 socket. + * @throws {invalid-state} (set) The socket is already bound. + * @throws {not-supported} (set) Host does not support dual-stack sockets. (Implementations are not required to.) + */ + setIpv6Only(value) { + assert( + value === true && this.#socketOptions.family.toLocaleLowerCase() === "ipv4", + "not-supported", + "Socket is an IPv4 socket." + ); + assert(this[symbolSocketState].isBound, "invalid-state", "The socket is already bound"); + + this[symbolSocketState].ipv6Only = value; + } + + /** + * + * @returns {number} + */ + unicastHopLimit() { + return this[symbolSocketState].unicastHopLimit; + } + + /** + * + * @param {number} value + * @returns {void} + * @throws {invalid-argument} The TTL value must be 1 or higher. + */ + setUnicastHopLimit(value) { + assert(value < 1, "invalid-argument", "The TTL value must be 1 or higher"); + + this.#socket.setTTL(value); + this[symbolSocketState].unicastHopLimit = value; + } + + /** + * + * @returns {bigint} + */ + receiveBufferSize() { + const exceptionInfo = {}; + const value = this.#socket.bufferSize(0, BufferSizeFlags.SO_RCVBUF, exceptionInfo); + + if (exceptionInfo.code === "EBADF") { + // TODO: handle the case where bad file descriptor is returned + // This happens when the socket is not bound + return this[symbolSocketState].receiveBufferSize; + } + + console.log({ + value, + }); + + return value; + } + + /** + * + * @param {bigint} value + * @returns {void} + * @throws {invalid-argument} The provided value was 0. + */ + setReceiveBufferSize(value) { + assert(value === 0n, "invalid-argument", "The provided value was 0"); + + const cappedValue = cappedUint32(value); + const exceptionInfo = {}; + this.#socket.bufferSize(Number(cappedValue), BufferSizeFlags.SO_RCVBUF, exceptionInfo); + this[symbolSocketState].receiveBufferSize = cappedValue; + } + + /** + * + * @returns {bigint} + */ + sendBufferSize() { + const exceptionInfo = {}; + const value = this.#socket.bufferSize(0, BufferSizeFlags.SO_SNDBUF, exceptionInfo); + + if (exceptionInfo.code === "EBADF") { + // TODO: handle the case where bad file descriptor is returned + // This happens when the socket is not bound + return this[symbolSocketState].sendBufferSize; + } + + return value; + } + + /** + * + * @param {bigint} value + * @returns {void} + * @throws {invalid-argument} The provided value was 0. + */ + setSendBufferSize(value) { + assert(value === 0n, "invalid-argument", "The provided value was 0"); + + const cappedValue = cappedUint32(value); + const exceptionInfo = {}; + this.#socket.bufferSize(Number(cappedValue), BufferSizeFlags.SO_SNDBUF, exceptionInfo); + this[symbolSocketState].sendBufferSize = cappedValue; + } + + /** + * + * @returns {Pollable} + */ + subscribe() { + return pollableCreate(0); + } + + [symbolDispose]() { + let err = null; + err = this.#socket.recvStop((...args) => { + console.log("stop recv", args); + }); + + if (err) { + assert(err === -9, "invalid-state", "Interface is not currently Up"); + assert(err === -11, "not-in-progress"); + assert(true, "", err); + } + + this.#socket.close(); + } + + handle() { + return this.#socket; + } +} diff --git a/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js b/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js new file mode 100644 index 000000000..0d83f19e3 --- /dev/null +++ b/packages/preview2-shim/lib/nodejs/sockets/wasi-sockets.js @@ -0,0 +1,320 @@ +/** + * @typedef {import("../../../types/interfaces/wasi-sockets-network").Network} Network + * @typedef {import("../../../types/interfaces/wasi-sockets-network").ErrorCode} ErrorCode + * @typedef {import("../../../types/interfaces/wasi-sockets-network").IpAddressFamily} IpAddressFamily + * @typedef {import("../../../types/interfaces/wasi-sockets-network").IpAddress} IpAddress + * @typedef {import("../../../types/interfaces/wasi-sockets-tcp").TcpSocket} TcpSocket + * @typedef {import("../../../types/interfaces/wasi-sockets-udp").UdpSocket} UdpSocket + */ + +import { isIP } from "net"; +import { assert } from "../../common/assert.js"; +import { + SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, + SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST, + SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, +} from "../../io/calls.js"; +import { ioCall, pollableCreate } from "../../io/worker-io.js"; +import { deserializeIpAddress } from "./socket-common.js"; +import { TcpSocketImpl } from "./tcp-socket-impl.js"; +import { IncomingDatagramStream, OutgoingDatagramStream, UdpSocketImpl } from "./udp-socket-impl.js"; + +const symbolDispose = Symbol.dispose || Symbol.for("dispose"); + +/** @type {ErrorCode} */ +export const errorCode = { + // ### GENERAL ERRORS ### + + /// Unknown error + unknown: "unknown", + + /// Access denied. + /// + /// POSIX equivalent: EACCES, EPERM + accessDenied: "access-denied", + + /// The operation is not supported. + /// + /// POSIX equivalent: EOPNOTSUPP + notSupported: "not-supported", + + /// One of the arguments is invalid. + /// + /// POSIX equivalent: EINVAL + invalidArgument: "invalid-argument", + + /// Not enough memory to complete the operation. + /// + /// POSIX equivalent: ENOMEM, ENOBUFS, EAI_MEMORY + outOfMemory: "out-of-memory", + + /// The operation timed out before it could finish completely. + timeout: "timeout", + + /// This operation is incompatible with another asynchronous operation that is already in progress. + /// + /// POSIX equivalent: EALREADY + concurrencyConflict: "concurrency-conflict", + + /// Trying to finish an asynchronous operation that: + /// - has not been started yet, or: + /// - was already finished by a previous `finish-*` call. + /// + /// Note: this is scheduled to be removed when `future`s are natively supported. + notInProgress: "not-in-progress", + + /// The operation has been aborted because it could not be completed immediately. + /// + /// Note: this is scheduled to be removed when `future`s are natively supported. + wouldBlock: "would-block", + + // ### TCP & UDP SOCKET ERRORS ### + + /// The operation is not valid in the socket's current state. + invalidState: "invalid-state", + + /// A new socket resource could not be created because of a system limit. + newSocketLimit: "new-socket-limit", + + /// A bind operation failed because the provided address is not an address that the `network` can bind to. + addressNotBindable: "address-not-bindable", + + /// A bind operation failed because the provided address is already in use or because there are no ephemeral ports available. + addressInUse: "address-in-use", + + /// The remote address is not reachable + remoteUnreachable: "remote-unreachable", + + // ### TCP SOCKET ERRORS ### + + /// The connection was forcefully rejected + connectionRefused: "connection-refused", + + /// The connection was reset. + connectionReset: "connection-reset", + + /// A connection was aborted. + connectionAborted: "connection-aborted", + + // ### UDP SOCKET ERRORS ### + datagramTooLarge: "datagram-too-large", + + // ### NAME LOOKUP ERRORS ### + + /// Name does not exist or has no suitable associated IP addresses. + nameUnresolvable: "name-unresolvable", + + /// A temporary failure in name resolution occurred. + temporaryResolverFailure: "temporary-resolver-failure", + + /// A permanent failure in name resolution occurred. + permanentResolverFailure: "permanent-resolver-failure", +}; + +/** @type {IpAddressFamily[]} */ +const supportedAddressFamilies = ["ipv4", "ipv6"]; + +export const IpAddressFamily = { + ipv4: "ipv4", + ipv6: "ipv6", +}; + +export class WasiSockets { + networkCnt = 1; + socketCnt = 1; + + // TODO: figure out what the max number of sockets should be + maxSockets = 100; + + /** @type {Network} */ networkInstance = null; + /** @type {Map} */ networks = new Map(); + /** @type {Map net.maxSockets, + errorCode.newSocketLimit, + "The new socket resource could not be created because of a system limit" + ); + + try { + return new UdpSocket(addressFamily); + } catch (err) { + assert(true, errorCode.notSupported, err); + } + }, + }; + + this.tcpCreateSocket = { + /** + * @param {IpAddressFamily} addressFamily + * @returns {TcpSocket} + * @throws {not-supported} The specified `address-family` is not supported. (EAFNOSUPPORT) + * @throws {new-socket-limit} The new socket resource could not be created because of a system limit. (EMFILE, ENFILE) + */ + createTcpSocket(addressFamily) { + assert( + supportedAddressFamilies.includes(addressFamily) === false, + errorCode.notSupported, + "The specified `address-family` is not supported." + ); + + assert( + net.socketCnt + 1 > net.maxSockets, + errorCode.newSocketLimit, + "The new socket resource could not be created because of a system limit" + ); + + try { + return new TcpSocket(addressFamily); + } catch (err) { + // assert(true, errorCode.unknown, err); + throw err; + } + }, + }; + + class ResolveAddressStream { + #pollId; + #data; + #curItem = 0; + #error; + resolveNextAddress() { + if (this.#error) throw this.#error; + if (!this.#data) { + const { value: addresses, error } = ioCall(SOCKET_RESOLVE_ADDRESS_GET_AND_DISPOSE_REQUEST, this.#pollId); + if (error) throw (this.#error = convertResolveAddressError(error)); + this.#data = addresses.map((address) => { + const family = `ipv${isIP(address)}`; + return { + tag: family, + val: deserializeIpAddress(address), + }; + }); + } + if (this.#curItem < this.#data.length) return this.#data[this.#curItem++]; + return undefined; + } + subscribe() { + if (this.#data) return pollableCreate(0); + return pollableCreate(this.#pollId); + } + [symbolDispose]() { + if (!this.#data) ioCall(SOCKET_RESOLVE_ADDRESS_DISPOSE_REQUEST); + } + static _create(hostname) { + const res = new ResolveAddressStream(); + if (hostname === "0.0.0.0") { + res.#pollId = 0; + res.#data = { tag: "ipv4", val: [0, 0, 0, 0] }; + return res; + } else if (hostname === "::") { + res.#pollId = 0; + res.#data = { tag: "ipv6", val: [0, 0, 0, 0, 0, 0, 0, 0] }; + return res; + } else if (hostname === "::1") { + res.#pollId = 0; + res.#data = { tag: "ipv6", val: [0, 0, 0, 0, 0, 0, 0, 1] }; + return res; + } + res.#pollId = ioCall(SOCKET_RESOLVE_ADDRESS_CREATE_REQUEST, null, { + hostname, + }); + return res; + } + } + + const resolveAddressStreamCreate = ResolveAddressStream._create; + delete ResolveAddressStream._create; + + this.ipNameLookup = { + ResolveAddressStream, + + /** + * + * @param {Network} network + * @param {string} name + * @returns {ResolveAddressStream} + * @throws {invalid-argument} `name` is a syntactically invalid domain name or IP address. + */ + resolveAddresses(network, name) { + // TODO: bind to network + return resolveAddressStreamCreate(name); + }, + }; + } +} + +function convertResolveAddressError(err) { + switch (err.code) { + default: + return "unknown"; + } +} diff --git a/packages/preview2-shim/test/test.js b/packages/preview2-shim/test/test.js index 0dc82dabd..b2bc260af 100644 --- a/packages/preview2-shim/test/test.js +++ b/packages/preview2-shim/test/test.js @@ -1,16 +1,13 @@ -import { ok, strictEqual } from "node:assert"; +import { deepEqual, deepStrictEqual, equal, notEqual, ok, strictEqual, throws } from "node:assert"; +import { mock } from "node:test"; import { fileURLToPath } from "node:url"; suite("Node.js Preview2", () => { test("Stdio", async () => { const { cli } = await import("@bytecodealliance/preview2-shim"); // todo: wrap in a process call to not spill to test output - cli.stdout - .getStdout() - .blockingWriteAndFlush(new TextEncoder().encode("test stdout")); - cli.stderr - .getStderr() - .blockingWriteAndFlush(new TextEncoder().encode("test stderr")); + cli.stdout.getStdout().blockingWriteAndFlush(new TextEncoder().encode("test stdout")); + cli.stderr.getStderr().blockingWriteAndFlush(new TextEncoder().encode("test stderr")); }); suite("Clocks", () => { @@ -94,17 +91,11 @@ suite("Node.js Preview2", () => { test("FS read", async () => { const { filesystem } = await import("@bytecodealliance/preview2-shim"); const [[rootDescriptor]] = filesystem.preopens.getDirectories(); - const childDescriptor = rootDescriptor.openAt( - {}, - fileURLToPath(import.meta.url), - {}, - {} - ); + const childDescriptor = rootDescriptor.openAt({}, fileURLToPath(import.meta.url), {}, {}); const stream = childDescriptor.readViaStream(0); stream.subscribe().block(); let buf = stream.read(10000n); - while (buf.byteLength === 0) - buf = stream.read(10000n); + while (buf.byteLength === 0) buf = stream.read(10000n); const source = new TextDecoder().decode(buf); ok(source.includes("UNIQUE STRING")); stream[Symbol.dispose](); @@ -139,9 +130,7 @@ suite("Node.js Preview2", () => { const responseHeaders = incomingResponse.headers().entries(); const decoder = new TextDecoder(); - const headers = Object.fromEntries( - responseHeaders.map(([k, v]) => [k, decoder.decode(v)]) - ); + const headers = Object.fromEntries(responseHeaders.map(([k, v]) => [k, decoder.decode(v)])); let responseBody; const incomingBody = incomingResponse.consume(); @@ -149,8 +138,7 @@ suite("Node.js Preview2", () => { const bodyStream = incomingBody.stream(); bodyStream.subscribe().block(); let buf = bodyStream.read(5000n); - while (buf.byteLength === 0) - buf = bodyStream.read(5000n); + while (buf.byteLength === 0) buf = bodyStream.read(5000n); responseBody = new TextDecoder().decode(buf); } @@ -158,4 +146,365 @@ suite("Node.js Preview2", () => { ok(headers["content-type"].startsWith("text/html")); ok(responseBody.includes("WebAssembly")); }); + + suite("WASI Sockets (TCP)", async () => { + test("sockets.instanceNetwork() should be a singleton", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network1 = sockets.instanceNetwork.instanceNetwork(); + equal(network1.id, 1); + const network2 = sockets.instanceNetwork.instanceNetwork(); + equal(network2.id, 1); + }); + + test("sockets.tcpCreateSocket() should throw not-supported", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const socket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + notEqual(socket, null); + + throws( + () => { + sockets.tcpCreateSocket.createTcpSocket("abc"); + }, + (err) => err === sockets.network.errorCode.notSupported + ); + }); + test("tcp.bind(): should bind to a valid ipv4 address", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 1337, + }, + }; + tcpSocket.startBind(network, localAddress); + tcpSocket.finishBind(); + + equal(tcpSocket.network.id, network.id); + deepEqual(tcpSocket.localAddress(), { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 1337, + }, + }); + equal(tcpSocket.addressFamily(), "ipv4"); + }); + + test("tcp.bind(): should bind to a valid ipv6 address and port=0", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv6); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + port: 0, + }, + }; + tcpSocket.startBind(network, localAddress); + tcpSocket.finishBind(); + + equal(tcpSocket.network.id, network.id); + equal(tcpSocket.addressFamily(), "ipv6"); + + const boundAddress = tcpSocket.localAddress(); + const expectedAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + // port will be assigned by the OS, so it should be > 0 + // port: 0, + }, + }; + + strictEqual(boundAddress.tag, expectedAddress.tag); + deepStrictEqual(boundAddress.val.address, expectedAddress.val.address); + strictEqual(boundAddress.val.port > 0, true); + }); + + test("tcp.bind(): should throw invalid-argument when invalid address family", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + // invalid address family + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0xffff, 0xc0a8, 0x0001], + port: 0, + }, + }; + throws( + () => { + tcpSocket.startBind(network, localAddress); + }, + (err) => err === sockets.network.errorCode.invalidArgument + ); + }); + + test("tcp.bind(): should throw invalid-state when already bound", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + throws( + () => { + tcpSocket.startBind(network, localAddress); + tcpSocket.finishBind(); + // already bound + tcpSocket.startBind(network, localAddress); + }, + (err) => err === sockets.network.errorCode.invalidState + ); + }); + + test("tcp.listen(): should listen to an ipv4 address", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + + mock.method(tcpSocket.handle(), "listen", () => { + // mock listen + }); + + tcpSocket.startBind(network, localAddress); + tcpSocket.finishBind(); + tcpSocket.startListen(); + tcpSocket.finishListen(); + + strictEqual(tcpSocket.handle().listen.mock.calls.length, 1); + + mock.reset(); + }); + + test("tcp.connect(): should connect to a valid ipv4 address and port=0", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const tcpSocket = sockets.tcpCreateSocket.createTcpSocket(sockets.network.IpAddressFamily.ipv4); + + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + const remoteAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [192, 168, 0, 1], + port: 80, + }, + }; + + mock.method(tcpSocket.handle(), "connect", () => { + // mock connect + }); + + tcpSocket.startBind(network, localAddress); + tcpSocket.finishBind(); + tcpSocket.startConnect(network, remoteAddress); + tcpSocket.finishConnect(); + + strictEqual(tcpSocket.handle().connect.mock.calls.length, 1); + + equal(tcpSocket.network.id, network.id); + equal(tcpSocket.addressFamily(), "ipv4"); + + const boundAddress = tcpSocket.localAddress(); + const expectedAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + + strictEqual(boundAddress.tag, expectedAddress.tag); + deepStrictEqual(boundAddress.val.address, expectedAddress.val.address); + strictEqual(boundAddress.val.port > 0, true); + }); + }); + + suite("WASI Sockets (UDP)", async () => { + test("sockets.udpCreateSocket() should be a singleton", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const socket1 = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv4); + notEqual(socket1.id, 1); + const socket2 = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv4); + notEqual(socket2.id, 1); + }); + test("sockets.udpCreateSocket() should not-support on invalid ip family", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + + throws( + () => { + sockets.udpCreateSocket.createUdpSocket("xyz"); + }, + (err) => err === sockets.network.errorCode.notSupported + ); + }); + test("udp.bind(): should bind to a valid ipv4 address and port=0", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const socket = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + socket.startBind(network, localAddress); + socket.finishBind(); + + equal(socket.network.id, network.id); + + const boundAddress = socket.localAddress(); + const expectedAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + strictEqual(boundAddress.tag, expectedAddress.tag); + deepStrictEqual(boundAddress.val.address, expectedAddress.val.address); + strictEqual(boundAddress.val.port > 0, true); + equal(socket.addressFamily(), "ipv4"); + }); + test("udp.bind(): should bind to a valid ipv6 address and port=0", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const socket = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv6); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + port: 0, + }, + }; + socket.startBind(network, localAddress); + socket.finishBind(); + + equal(socket.network.id, network.id); + + const boundAddress = socket.localAddress(); + const expectedAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + // port will be assigned by the OS, so it should be > 0 + // port: 0, + }, + }; + strictEqual(boundAddress.tag, expectedAddress.tag); + deepStrictEqual(boundAddress.val.address, expectedAddress.val.address); + strictEqual(boundAddress.val.port > 0, true); + equal(socket.addressFamily(), "ipv6"); + }); + test("udp.stream(): should connect to a valid ipv4 address", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const socket = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv4); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + port: 0, + }, + }; + const remoteAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [192, 168, 0, 1], + port: 80, + }, + }; + + mock.method(socket.handle(), "connect", () => { + // mock connect + }); + + socket.startBind(network, localAddress); + socket.finishBind(); + socket.stream(remoteAddress); + + strictEqual(socket.handle().connect.mock.calls.length, 1); + + strictEqual(socket.network.id, network.id); + strictEqual(socket.addressFamily(), "ipv4"); + + const boundAddress = socket.localAddress(); + const expectedAddress = { + tag: sockets.network.IpAddressFamily.ipv4, + val: { + address: [0, 0, 0, 0], + // port will be assigned by the OS, so it should be > 0 + // port: 0, + }, + }; + + strictEqual(boundAddress.tag, expectedAddress.tag); + deepStrictEqual(boundAddress.val.address, expectedAddress.val.address); + strictEqual(boundAddress.val.port > 0, true); + }); + test("udp.stream(): should connect to a valid ipv6 address", async () => { + const { sockets } = await import("@bytecodealliance/preview2-shim"); + const network = sockets.instanceNetwork.instanceNetwork(); + const socket = sockets.udpCreateSocket.createUdpSocket(sockets.network.IpAddressFamily.ipv6); + const localAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + port: 1337, + }, + }; + const remoteAddress = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + port: 1336, + }, + }; + + mock.method(socket.handle(), "connect", () => { + // mock connect + }); + + socket.startBind(network, localAddress); + socket.finishBind(); + socket.stream(remoteAddress); + + strictEqual(socket.handle().connect.mock.calls.length, 1); + + strictEqual(socket.network.id, network.id); + strictEqual(socket.addressFamily(), "ipv6"); + deepEqual(socket.localAddress(), { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0], + port: 1337, + }, + }); + }); + }); }); diff --git a/test/preview2-wasi-sockets-tcp.js b/test/preview2-wasi-sockets-tcp.js new file mode 100644 index 000000000..c3a6d598a --- /dev/null +++ b/test/preview2-wasi-sockets-tcp.js @@ -0,0 +1,41 @@ +const { sockets } = await import("@bytecodealliance/preview2-shim"); +const network = sockets.instanceNetwork.instanceNetwork(); + +// server +const serverAddressIpv6 = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0x1], + port: 3000, + }, +}; +const server = sockets.tcpCreateSocket.createTcpSocket( + sockets.network.IpAddressFamily.ipv6 +); +server.startBind(network, serverAddressIpv6); +server.finishBind(); +server.startListen(); +server.finishListen(); +const { address, port } = server.localAddress().val; +console.log(`[wasi-sockets-tcp] Server listening on: ${address}:${port}`); + +// client +const client = sockets.tcpCreateSocket.createTcpSocket( + sockets.network.IpAddressFamily.ipv6 +); + +client.setKeepAlive(true); +client.setNoDelay(true); +client.startConnect(network, serverAddressIpv6); +client.finishConnect(); + +setTimeout(() => { + // const [socket, input, output] = server.accept(); + // output.write('hello world'); + // const buff = input.read(2); + // console.log(`[wasi-sockets] Server received: ${buff}`); + + // client.shutdown("send"); + // server.shutdown("receive"); + // process.exit(0); +}, 2000); diff --git a/test/preview2-wasi-sockets-udp.js b/test/preview2-wasi-sockets-udp.js new file mode 100644 index 000000000..66318e84d --- /dev/null +++ b/test/preview2-wasi-sockets-udp.js @@ -0,0 +1,46 @@ +const { sockets } = await import("@bytecodealliance/preview2-shim"); +const network = sockets.instanceNetwork.instanceNetwork(); + +// server +const serverAddressIpv6 = { + tag: sockets.network.IpAddressFamily.ipv6, + val: { + address: [0, 0, 0, 0, 0, 0, 0, 0x1], + port: 3000, + }, +}; +const server = sockets.udpCreateSocket.createUdpSocket( + sockets.network.IpAddressFamily.ipv6 +); +server.startBind(network, serverAddressIpv6); +server.finishBind(); +const { address, port } = server.localAddress().val; +console.log(`[wasi-sockets-udp] Server listening on: ${address}:${port}`); + +// client +const client = sockets.udpCreateSocket.createUdpSocket( + sockets.network.IpAddressFamily.ipv6 +); + +client.startConnect(network, serverAddressIpv6); +client.finishConnect(); + +setTimeout(() => { + client.send([ + { + data: [Buffer.from('hello world')], + remoteAddress: serverAddressIpv6, + } + ]); + + const data = server.receive(); + console.log(`[wasi-sockets-udp] Server received`); + console.log({ + data + }); +}, 2000); + +setTimeout(() => { + server[Symbol.dispose](); + client[Symbol.dispose](); +}, 5000); \ No newline at end of file