Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -1243,6 +1243,23 @@ Emitted when an error occurs during the processing of a stream on the client.

Emitted when a stream is received on the client.

##### Event: `'http2.client.stream.bodyChunkSent'`

* `stream` {ClientHttp2Stream}
* `writev` {boolean}
* `data` {Buffer | string | Buffer\[] | Object\[]}
* `chunk` {Buffer|string}
* `encoding` {string}
* `encoding` {string}

Emitted when a chunk of the client stream body is being sent.

##### Event: `'http2.client.stream.bodySent'`

* `stream` {ClientHttp2Stream}

Emitted after the client stream body has been fully sent.

##### Event: `'http2.client.stream.close'`

* `stream` {ClientHttp2Stream}
Expand Down
15 changes: 15 additions & 0 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ const dc = require('diagnostics_channel');
const onClientStreamCreatedChannel = dc.channel('http2.client.stream.created');
const onClientStreamStartChannel = dc.channel('http2.client.stream.start');
const onClientStreamErrorChannel = dc.channel('http2.client.stream.error');
const onClientStreamBodyChunkSentChannel = dc.channel('http2.client.stream.bodyChunkSent');
const onClientStreamBodySentChannel = dc.channel('http2.client.stream.bodySent');
const onClientStreamFinishChannel = dc.channel('http2.client.stream.finish');
const onClientStreamCloseChannel = dc.channel('http2.client.stream.close');
const onServerStreamCreatedChannel = dc.channel('http2.server.stream.created');
Expand Down Expand Up @@ -2300,6 +2302,15 @@ class Http2Stream extends Duplex {
req = writeGeneric(this, data, encoding, writeCallback);

trackWriteState(this, req.bytes);

if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodyChunkSentChannel.hasSubscribers) {
onClientStreamBodyChunkSentChannel.publish({
stream: this,
writev,
data,
encoding,
});
}
}

_write(data, encoding, cb) {
Expand All @@ -2317,6 +2328,10 @@ class Http2Stream extends Duplex {
}
debugStreamObj(this, 'shutting down writable on _final');
ReflectApply(shutdownWritable, this, [cb]);

if (this.session[kType] === NGHTTP2_SESSION_CLIENT && onClientStreamBodySentChannel.hasSubscribers) {
onClientStreamBodySentChannel.publish({ stream: this });
}
}

_read(nread) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
// being sent with multiple Buffers and strings.

const assert = require('assert');
const dc = require('diagnostics_channel');
const http2 = require('http2');
const { Duplex } = require('stream');

let bodyChunkSent = false;

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {
// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

assert.strictEqual(writev, true);

assert.ok(Array.isArray(data));
assert.strictEqual(data.length, 3);

assert.strictEqual(data[0].chunk, 'héllo');
assert.strictEqual(data[0].encoding, 'latin1');

assert.ok(Buffer.from('foo').equals(data[1].chunk));
assert.strictEqual(data[1].encoding, 'buffer');

assert.ok(Buffer.from('bar').equals(data[2].chunk));
assert.strictEqual(data[2].encoding, 'buffer');

assert.strictEqual(encoding, '');

bodyChunkSent = true;
}));

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
// 'http2.client.stream.bodyChunkSent' must run first.
assert.ok(bodyChunkSent);

// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
}));

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({}, { endStream: true });
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
stream.write('héllo', 'latin1');
stream.write(Buffer.from('foo'));
stream.write(new TextEncoder().encode('bar'));
stream.end();

stream.on('response', common.mustCall(() => {
client.close();
server.close();
}));
}, 1));
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
// being sent with multiple Buffers.

const assert = require('assert');
const dc = require('diagnostics_channel');
const http2 = require('http2');
const { Duplex } = require('stream');

let bodyChunkSent = false;

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {
// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

assert.strictEqual(writev, true);

assert.ok(Array.isArray(data));
assert.strictEqual(data.length, 2);

assert.ok(Buffer.from('foo').equals(data[0]));
assert.ok(Buffer.from('bar').equals(data[1]));

assert.strictEqual(encoding, '');

bodyChunkSent = true;
}));

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
// 'http2.client.stream.bodyChunkSent' must run first.
assert.ok(bodyChunkSent);

// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
}));

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({}, { endStream: true });
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
stream.write(Buffer.from('foo'));
stream.write(Buffer.from('bar'));
stream.end();

stream.on('response', common.mustCall(() => {
client.close();
server.close();
}));
}, 1));
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
// being sent with no chunks.

const assert = require('assert');
const dc = require('diagnostics_channel');
const http2 = require('http2');
const { Duplex } = require('stream');

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustNotCall());

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
}));

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({}, { endStream: true });
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
stream.end();

stream.on('response', common.mustCall(() => {
client.close();
server.close();
}));
}, 1));
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
// being sent with a single Buffer.

const assert = require('assert');
const dc = require('diagnostics_channel');
const http2 = require('http2');
const { Duplex } = require('stream');

let bodyChunkSent = false;

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {
// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

assert.strictEqual(writev, false);
assert.ok(Buffer.from('foo').equals(data));
assert.strictEqual(encoding, 'buffer');

bodyChunkSent = true;
}));

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
// 'http2.client.stream.bodyChunkSent' must run first.
assert.ok(bodyChunkSent);

// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
}));

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({}, { endStream: true });
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
stream.write(Buffer.from('foo'));
stream.end();

stream.on('response', common.mustCall(() => {
client.close();
server.close();
}));
}, 1));
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
'use strict';

const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

// This test ensures that the built-in HTTP/2 diagnostics channels are reporting
// the diagnostics messages for the 'http2.client.stream.bodyChunkSent' and
// 'http2.client.stream.bodySent' channels when ClientHttp2Streams bodies are
// being sent with a single string.

const assert = require('assert');
const dc = require('diagnostics_channel');
const http2 = require('http2');
const { Duplex } = require('stream');

let bodyChunkSent = false;

dc.subscribe('http2.client.stream.bodyChunkSent', common.mustCall(({ stream, writev, data, encoding }) => {
// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');

assert.strictEqual(writev, false);
assert.strictEqual(data, 'foo');
assert.strictEqual(encoding, 'utf8');

bodyChunkSent = true;
}));

dc.subscribe('http2.client.stream.bodySent', common.mustCall(({ stream }) => {
// 'http2.client.stream.bodyChunkSent' must run first.
assert.ok(bodyChunkSent);

// Since ClientHttp2Stream is not exported from any module, this just checks
// if the stream is an instance of Duplex.
assert.ok(stream instanceof Duplex);
assert.strictEqual(stream.constructor.name, 'ClientHttp2Stream');
}));

const server = http2.createServer();
server.on('stream', common.mustCall((stream) => {
stream.respond({}, { endStream: true });
}));

server.listen(0, common.mustCall(() => {
const port = server.address().port;
const client = http2.connect(`http://localhost:${port}`);

const stream = client.request({ [http2.constants.HTTP2_HEADER_METHOD]: 'POST' });
stream.write('foo');
stream.end();

stream.on('response', common.mustCall(() => {
client.close();
server.close();
}));
}, 1));
Loading