From 1d82020aca67a4c2608ed033326e73e18a5ac2ee Mon Sep 17 00:00:00 2001 From: lalitcap23 Date: Tue, 25 Nov 2025 16:58:44 +0530 Subject: [PATCH 1/3] refactor- moved closeRead and reset tests to compliance suite --- .../src/stream-muxer/stream-test.ts | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts index 96735cfedb..0fe7b629dc 100644 --- a/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts +++ b/packages/interface-compliance-tests/src/stream-muxer/stream-test.ts @@ -200,6 +200,15 @@ export default (common: TestSetup): void => { expect(inboundStream).to.have.property('writeStatus', 'writable', 'inbound stream writeStatus was incorrect') expect(inboundStream).to.have.property('readStatus', 'readable', 'inbound stream readStatus was incorrect') }) + + it('closes read only', async () => { + expect(outboundStream).to.not.have.nested.property('timeline.close') + + await outboundStream.closeRead() + + expect(outboundStream).to.have.property('writeStatus', 'writable') + expect(outboundStream).to.have.property('readStatus', 'closed') + }) it('aborts', async () => { const eventPromises = Promise.all([ @@ -237,6 +246,19 @@ export default (common: TestSetup): void => { expect(inboundEvent).to.have.nested.property('error.name', 'StreamResetError') }) + it('resets when remote aborts', async () => { + expect(outboundStream).to.not.have.nested.property('timeline.close') + + const closePromise = pEvent(outboundStream, 'close') + inboundStream.abort(new Error('Urk!')) + + await closePromise + + expect(outboundStream).to.have.property('status', 'reset') + expect(isValidTick(outboundStream.timeline.close)).to.equal(true) + expect(outboundStream.timeline.close).to.be.greaterThanOrEqual(outboundStream.timeline.open) + }) + it('does not send close read when remote closes write', async () => { // @ts-expect-error internal method of AbstractMessageStream const sendCloseReadSpy = Sinon.spy(outboundStream, 'sendCloseRead') From 7a8a01601c881a81da7857a4f2e58617f03fd596 Mon Sep 17 00:00:00 2001 From: lalitcap23 Date: Tue, 25 Nov 2025 17:53:03 +0530 Subject: [PATCH 2/3] chore: remove resolved TODO comment --- packages/transport-webrtc/test/stream.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index fc4388c83f..bb629857f6 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -117,7 +117,6 @@ function generatePbByFlag (flag?: Message.Flag): Uint8Array { return lengthPrefixed.encode.single(buf).subarray() } -// TODO: move to transport interface compliance suite describe.skip('Stream Stats', () => { let stream: WebRTCStream let peerConnection: RTCPeerConnection From f7a61b68a283c4cddd0b46b126e1c4b058329e59 Mon Sep 17 00:00:00 2001 From: lalitcap23 Date: Thu, 11 Dec 2025 00:33:00 +0530 Subject: [PATCH 3/3] chore: remove redundant skipped stream tests --- packages/transport-webrtc/test/stream.spec.ts | 184 ------------------ 1 file changed, 184 deletions(-) diff --git a/packages/transport-webrtc/test/stream.spec.ts b/packages/transport-webrtc/test/stream.spec.ts index bb629857f6..ce9efd1dee 100644 --- a/packages/transport-webrtc/test/stream.spec.ts +++ b/packages/transport-webrtc/test/stream.spec.ts @@ -1,18 +1,11 @@ import { defaultLogger } from '@libp2p/logger' import { expect } from 'aegir/chai' -import delay from 'delay' import * as lengthPrefixed from 'it-length-prefixed' -import { bytes } from 'multiformats' -import { pEvent } from 'p-event' import { stubInterface } from 'sinon-ts' import { MAX_MESSAGE_SIZE, PROTOBUF_OVERHEAD } from '../src/constants.js' import { Message } from '../src/private-to-public/pb/message.js' import { createStream } from '../src/stream.js' import { isFirefox } from '../src/util.ts' -import { RTCPeerConnection } from '../src/webrtc/index.js' -import { receiveFinAck, receiveRemoteCloseWrite } from './util.js' -import type { WebRTCStream } from '../src/stream.js' -import type { Stream } from '@libp2p/interface' describe('Max message size', () => { it(`sends messages smaller or equal to ${MAX_MESSAGE_SIZE} bytes in one`, async () => { @@ -84,180 +77,3 @@ describe('Max message size', () => { } }) }) - -const TEST_MESSAGE = 'test_message' - -async function setup (): Promise<{ peerConnection: RTCPeerConnection, dataChannel: RTCDataChannel, stream: WebRTCStream }> { - const peerConnection = new RTCPeerConnection() - const dataChannel = peerConnection.createDataChannel('whatever', { negotiated: true, id: 91 }) - - await pEvent(dataChannel, 'open', { - rejectionEvents: [ - 'close', - 'error' - ] - }) - - const stream = createStream({ - channel: dataChannel, - direction: 'outbound', - closeTimeout: 1, - log: defaultLogger().forComponent('test') - }) - - return { peerConnection, dataChannel, stream } -} - -function generatePbByFlag (flag?: Message.Flag): Uint8Array { - const buf = Message.encode({ - flag, - message: bytes.fromString(TEST_MESSAGE) - }) - - return lengthPrefixed.encode.single(buf).subarray() -} - -describe.skip('Stream Stats', () => { - let stream: WebRTCStream - let peerConnection: RTCPeerConnection - let dataChannel: RTCDataChannel - - beforeEach(async () => { - ({ stream, peerConnection, dataChannel } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('can construct', () => { - expect(stream.timeline.close).to.not.exist() - }) - - it('close marks it closed', async () => { - expect(stream.timeline.close).to.not.exist() - expect(stream.writeStatus).to.equal('writable') - - receiveFinAck(dataChannel) - receiveRemoteCloseWrite(dataChannel) - - await Promise.all([ - pEvent(stream, 'close'), - stream.close() - ]) - - expect(stream.timeline.close).to.be.a('number') - expect(stream.writeStatus).to.equal('closed') - }) - - it('closeRead marks it read-closed only', async () => { - expect(stream.timeline.close).to.not.exist() - await stream.closeRead() - - expect(stream).to.have.property('writeStatus', 'writable') - expect(stream).to.have.property('readStatus', 'closed') - }) - - it('closeWrite marks it write-closed only', async () => { - expect(stream.timeline.close).to.not.exist() - - receiveFinAck(dataChannel) - await stream.close() - - expect(stream).to.have.property('writeStatus', 'closed') - expect(stream).to.have.property('readStatus', 'readable') - }) - - it('abort = close', () => { - expect(stream.timeline.close).to.not.exist() - stream.abort(new Error('Oh no!')) - expect(stream.timeline.close).to.be.a('number') - }) - - it('reset = close', () => { - expect(stream.timeline.close).to.not.exist() - stream.onRemoteReset() // only resets the write side - expect(stream.timeline.close).to.be.a('number') - expect(stream.timeline.close).to.be.greaterThanOrEqual(stream.timeline.open) - }) -}) - -// TODO: move to transport interface compliance suite -describe.skip('Stream Read Stats Transition By Incoming Flag', () => { - let dataChannel: RTCDataChannel - let stream: Stream - let peerConnection: RTCPeerConnection - - beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('no flag, no transition', () => { - expect(stream.timeline.close).to.not.exist() - const data = generatePbByFlag() - dataChannel.onmessage?.(new MessageEvent('message', { data })) - - expect(stream.timeline.close).to.not.exist() - }) - - it('open to read-close by flag:FIN', async () => { - const data = generatePbByFlag(Message.Flag.FIN) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.readStatus).to.equal('closed') - }) - - it('read-close to close by flag:STOP_SENDING', async () => { - const data = generatePbByFlag(Message.Flag.STOP_SENDING) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteReadStatus).to.equal('closed') - }) -}) - -// TODO: move to transport interface compliance suite -describe.skip('Stream Write Stats Transition By Incoming Flag', () => { - let dataChannel: RTCDataChannel - let stream: Stream - let peerConnection: RTCPeerConnection - - beforeEach(async () => { - ({ dataChannel, stream, peerConnection } = await setup()) - }) - - afterEach(() => { - if (peerConnection != null) { - peerConnection.close() - } - }) - - it('open to write-close by flag:STOP_SENDING', async () => { - const data = generatePbByFlag(Message.Flag.STOP_SENDING) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteReadStatus).to.equal('closed') - }) - - it('write-close to close by flag:FIN', async () => { - const data = generatePbByFlag(Message.Flag.FIN) - dataChannel.dispatchEvent(new MessageEvent('message', { data })) - - await delay(100) - - expect(stream.remoteWriteStatus).to.equal('closed') - }) -})