diff --git a/lib/internal/quic/quic.js b/lib/internal/quic/quic.js index 6e996eb32f1c8e..e04ccd7f2640d0 100644 --- a/lib/internal/quic/quic.js +++ b/lib/internal/quic/quic.js @@ -143,6 +143,18 @@ const onEndpointErrorChannel = dc.channel('quic.endpoint.error'); const onEndpointBusyChangeChannel = dc.channel('quic.endpoint.busy.change'); const onEndpointClientSessionChannel = dc.channel('quic.session.created.client'); const onEndpointServerSessionChannel = dc.channel('quic.session.created.server'); +const onSessionOpenStreamChannel = dc.channel('quic.session.open.stream'); +const onSessionReceivedStreamChannel = dc.channel('quic.session.received.stream'); +const onSessionSendDatagramChannel = dc.channel('quic.session.send.datagram'); +const onSessionUpdateKeyChannel = dc.channel('quic.session.update.key'); +const onSessionClosingChannel = dc.channel('quic.session.closing'); +const onSessionClosedChannel = dc.channel('quic.session.closed'); +const onSessionReceiveDatagramChannel = dc.channel('quic.session.receive.datagram'); +const onSessionReceiveDatagramStatusChannel = dc.channel('quic.session.receive.datagram.status'); +const onSessionPathValidationChannel = dc.channel('quic.session.path.validation'); +const onSessionTicketChannel = dc.channel('quic.session.ticket'); +const onSessionVersionNegotiationChannel = dc.channel('quic.session.version.negotiation'); +const onSessionHandshakeChannel = dc.channel('quic.session.handshake'); /** * @typedef {import('../socketaddress.js').SocketAddress} SocketAddress @@ -873,6 +885,13 @@ class QuicSession { const stream = new QuicStream(kPrivateConstructor, this.#streamConfig, handle, this, 0 /* Bidirectional */); this.#streams.add(stream); + + if (onSessionOpenStreamChannel.hasSubscribers) { + onSessionOpenStreamChannel.publish({ + stream, + session: this, + }); + } return stream; } @@ -893,6 +912,14 @@ class QuicSession { const stream = new QuicStream(kPrivateConstructor, this.#streamConfig, handle, this, 1 /* Unidirectional */); this.#streams.add(stream); + + if (onSessionOpenStreamChannel.hasSubscribers) { + onSessionOpenStreamChannel.publish({ + stream, + session: this, + }); + } + return stream; } @@ -923,7 +950,17 @@ class QuicSession { datagram.byteOffset, datagram.byteLength); } - return this.#handle.sendDatagram(datagram); + const id = this.#handle.sendDatagram(datagram); + + if (onSessionSendDatagramChannel.hasSubscribers) { + onSessionSendDatagramChannel.publish({ + id, + length: datagram.byteLength, + session: this, + }); + } + + return id; } /** @@ -934,6 +971,11 @@ class QuicSession { throw new ERR_INVALID_STATE('Session is closed'); } this.#handle.updateKey(); + if (onSessionUpdateKeyChannel.hasSubscribers) { + onSessionUpdateKeyChannel.publish({ + session: this, + }); + } } /** @@ -950,6 +992,11 @@ class QuicSession { if (!this.#isClosedOrClosing) { this.#isPendingClose = true; this.#handle?.gracefulClose(); + if (onSessionClosingChannel.hasSubscribers) { + onSessionClosingChannel.publish({ + session: this, + }); + } } return this.closed; } @@ -1023,6 +1070,12 @@ class QuicSession { this.#handle.destroy(); this.#handle = undefined; + if (onSessionClosedChannel.hasSubscribers) { + onSessionClosedChannel.publish({ + session: this, + }); + } + return this.closed; } @@ -1069,6 +1122,14 @@ class QuicSession { assert(this.#ondatagram, 'Unexpected datagram event'); if (this.destroyed) return; this.#ondatagram(u8, early); + + if (onSessionReceiveDatagramChannel.hasSubscribers) { + onSessionReceiveDatagramChannel.publish({ + length: u8.byteLength, + early, + session: this, + }); + } } /** @@ -1080,6 +1141,14 @@ class QuicSession { // The ondatagramstatus callback may not have been specified. That's ok. // We'll just ignore the event in that case. this.#ondatagramstatus?.(id, status); + + if (onSessionReceiveDatagramStatusChannel.hasSubscribers) { + onSessionReceiveDatagramStatusChannel.publish({ + id, + status, + session: this, + }); + } } /** @@ -1098,6 +1167,18 @@ class QuicSession { if (this.destroyed) return; this.#onpathvalidation(result, newLocalAddress, newRemoteAddress, oldLocalAddress, oldRemoteAddress, preferredAddress); + + if (onSessionPathValidationChannel.hasSubscribers) { + onSessionPathValidationChannel.publish({ + result, + newLocalAddress, + newRemoteAddress, + oldLocalAddress, + oldRemoteAddress, + preferredAddress, + session: this, + }); + } } /** @@ -1109,6 +1190,12 @@ class QuicSession { assert(this.#onsessionticket, 'Unexpected session ticket event'); if (this.destroyed) return; this.#onsessionticket(ticket); + if (onSessionTicketChannel.hasSubscribers) { + onSessionTicketChannel.publish({ + ticket, + session: this, + }); + } } /** @@ -1123,6 +1210,15 @@ class QuicSession { if (this.destroyed) return; this.#onversionnegotiation(version, requestedVersions, supportedVersions); this.destroy(new ERR_QUIC_VERSION_NEGOTIATION_ERROR()); + + if (onSessionVersionNegotiationChannel.hasSubscribers) { + onSessionVersionNegotiationChannel.publish({ + version, + requestedVersions, + supportedVersions, + session: this, + }); + } } /** @@ -1141,6 +1237,19 @@ class QuicSession { // We'll just ignore the event in that case. this.#onhandshake?.(sni, alpn, cipher, cipherVersion, validationErrorReason, validationErrorCode, earlyDataAccepted); + + if (onSessionHandshakeChannel.hasSubscribers) { + onSessionHandshakeChannel.publish({ + sni, + alpn, + cipher, + cipherVersion, + validationErrorReason, + validationErrorCode, + earlyDataAccepted, + session: this, + }); + } } /** @@ -1161,6 +1270,13 @@ class QuicSession { this.#streams.add(stream); this.#onstream(stream); + + if (onSessionReceivedStreamChannel.hasSubscribers) { + onSessionReceivedStreamChannel.publish({ + stream, + session: this, + }); + } } [kRemoveStream](stream) {