Skip to content

Commit

Permalink
http2: wait for session socket writable end on close/destroy
Browse files Browse the repository at this point in the history
This slightly alters the behaviour of session close by first using
.end() on a session socket to finish writing the data and only then
calls .destroy() to make sure the Readable side is closed. This allows
the socket to finish transmitting data, receive proper FIN packet and
avoid ECONNRESET errors upon graceful close.

onStreamClose now directly calls stream.destroy() instead of
kMaybeDestroy because the latter will first check that the stream has
writableFinished set. And that may not be true as we have just
(synchronously) called .end() on the stream if it was not closed and
that doesn't give it enough time to finish. Furthermore there is no
point in waiting for 'finish' as the other party have already closed the
stream and we won't be able to write anyway.

This also changes a few tests to correctly handle graceful session
close. This includes:
* not reading request data (on client side)
* not reading push stream data (on client side)
* relying on socket.destroy() (on client) to finish server session
  due to the destroy of the socket without closing the server session.
  As the goaway itself is *not* a session close.

Added few 'close' event mustCall checks.

PR-URL: #30854
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Rich Trott <[email protected]>
  • Loading branch information
lundibundi authored and BridgeAR committed Jan 3, 2020
1 parent 86f2e86 commit ba0682e
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 45 deletions.
107 changes: 71 additions & 36 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,10 @@ function onStreamClose(code) {
if (!stream || stream.destroyed)
return false;

debugStreamObj(stream, 'closed with code %d', code);
debugStreamObj(
stream, 'closed with code %d, closed %s, readable %s',
code, stream.closed, stream.readable
);

if (!stream.closed)
closeStream(stream, code, kNoRstStream);
Expand All @@ -497,7 +500,7 @@ function onStreamClose(code) {
// Defer destroy we actually emit end.
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](code);
stream.destroy();
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -985,22 +988,76 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
debugSessionObj(session, 'finishSessionDestroy');

function cleanupSession(session) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

const handle = session[kHandle];
session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
session[kNativeFields] = new Uint8Array(kSessionUint8FieldCount);
socket[kSession] = undefined;
socket[kServer] = undefined;
if (handle)
handle.ondone = null;
if (socket) {
socket[kSession] = undefined;
socket[kServer] = undefined;
}
}

function finishSessionClose(session, error) {
debugSessionObj(session, 'finishSessionClose');

const socket = session[kSocket];
cleanupSession(session);

if (socket && !socket.destroyed) {
// Always wait for writable side to finish.
socket.end((err) => {
debugSessionObj(session, 'finishSessionClose socket end', err);
// Due to the way the underlying stream is handled in Http2Session we
// won't get graceful Readable end from the other side even if it was sent
// as the stream is already considered closed and will neither be read
// from nor keep the event loop alive.
// Therefore destroy the socket immediately.
// Fixing this would require some heavy juggling of ReadStart/ReadStop
// mostly on Windows as on Unix it will be fine with just ReadStart
// after this 'ondone' callback.
socket.destroy(error);
emitClose(session, error);
});
} else {
process.nextTick(emitClose, session, error);
}
}

function closeSession(session, code, error) {
debugSessionObj(session, 'start closing/destroying');

const state = session[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners.
session.setTimeout(0);
session.removeAllListeners('timeout');

// Destroy any pending and open streams
if (state.pendingStreams.size > 0 || state.streams.size > 0) {
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));
}

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
// Disassociate from the socket and server.
const socket = session[kSocket];
const handle = session[kHandle];

// Destroy the handle if it exists at this point.
if (handle !== undefined) {
handle.ondone = finishSessionClose.bind(null, session, error);
handle.destroy(code, socket.destroyed);
} else {
finishSessionClose(session, error);
}
}

// Upon creation, the Http2Session takes ownership of the socket. The session
Expand Down Expand Up @@ -1327,6 +1384,7 @@ class Http2Session extends EventEmitter {
destroy(error = NGHTTP2_NO_ERROR, code) {
if (this.destroyed)
return;

debugSessionObj(this, 'destroying');

if (typeof error === 'number') {
Expand All @@ -1338,30 +1396,7 @@ class Http2Session extends EventEmitter {
if (code === undefined && error != null)
code = NGHTTP2_INTERNAL_ERROR;

const state = this[kState];
state.flags |= SESSION_FLAGS_DESTROYED;
state.destroyCode = code;

// Clear timeout and remove timeout listeners
this.setTimeout(0);
this.removeAllListeners('timeout');

// Destroy any pending and open streams
const cancel = new ERR_HTTP2_STREAM_CANCEL(error);
state.pendingStreams.forEach((stream) => stream.destroy(cancel));
state.streams.forEach((stream) => stream.destroy(error));

// Disassociate from the socket and server
const socket = this[kSocket];
const handle = this[kHandle];

// Destroy the handle if it exists at this point
if (handle !== undefined) {
handle.ondone = finishSessionDestroy.bind(null, this, error);
handle.destroy(code, socket.destroyed);
} else {
finishSessionDestroy(this, error);
}
closeSession(this, code, error);
}

// Closing the session will:
Expand Down
2 changes: 1 addition & 1 deletion src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
Context::Scope context_scope(env()->context());
Http2Scope h2scope(this);
CHECK_NOT_NULL(stream_);
Debug(this, "receiving %d bytes", nread);
Debug(this, "receiving %d bytes, offset %d", nread, stream_buf_offset_);
AllocatedBuffer buf(env(), buf_);

// Only pass data on if nread > 0
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-http2-capture-rejection.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ events.captureRejections = true;
}));
}


{
// Test error thrown in 'request' event

Expand Down Expand Up @@ -136,6 +135,7 @@ events.captureRejections = true;
const session = connect(`http://localhost:${port}`);

const req = session.request();
req.resume();

session.on('stream', common.mustCall(async (stream) => {
session.close();
Expand Down
1 change: 1 addition & 0 deletions test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ const Countdown = require('../common/countdown');
server.on('stream', common.mustNotCall());
server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
client.on('close', common.mustCall());
const socket = client[kSocket];
socket.on('close', common.mustCall(() => {
assert(socket.destroyed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ if (!common.hasCrypto)
const assert = require('assert');
const h2 = require('http2');
const NGHTTP2_INTERNAL_ERROR = h2.constants.NGHTTP2_INTERNAL_ERROR;
const Countdown = require('../common/countdown');

const server = h2.createServer();

Expand All @@ -27,6 +28,11 @@ server.on('stream', (stream) => {

server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
const countdown = new Countdown(2, () => {
server.close();
client.close();
});
client.on('connect', () => countdown.dec());

const req = client.request();
req.destroy(new Error('test'));
Expand All @@ -39,8 +45,7 @@ server.listen(0, common.mustCall(() => {
req.on('close', common.mustCall(() => {
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
assert.strictEqual(req.rstCode, NGHTTP2_INTERNAL_ERROR);
server.close();
client.close();
countdown.dec();
}));

req.on('response', common.mustNotCall());
Expand Down
2 changes: 2 additions & 0 deletions test/parallel/test-http2-compat-client-upload-reject.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ fs.readFile(loc, common.mustCall((err, data) => {
res.end();
});
}));
server.on('close', common.mustCall());

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
client.on('close', common.mustCall());

const req = client.request({ ':method': 'POST' });
req.on('response', common.mustCall((headers) => {
Expand Down
4 changes: 3 additions & 1 deletion test/parallel/test-http2-create-client-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ const URL = url.URL;
const client =
h2.connect.apply(null, i)
.on('connect', common.mustCall(() => maybeClose(client)));
client.on('close', common.mustCall());
});

// Will fail because protocol does not match the server.
h2.connect({ port: port, protocol: 'https:' })
const client = h2.connect({ port: port, protocol: 'https:' })
.on('error', common.mustCall(() => serverClose.dec()));
client.on('close', common.mustCall());
}));
}

Expand Down
8 changes: 6 additions & 2 deletions test/parallel/test-http2-goaway-opaquedata.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,24 @@ const http2 = require('http2');

const server = http2.createServer();
const data = Buffer.from([0x1, 0x2, 0x3, 0x4, 0x5]);
let session;

server.on('stream', common.mustCall((stream) => {
stream.session.goaway(0, 0, data);
session = stream.session;
session.on('close', common.mustCall());
session.goaway(0, 0, data);
stream.respond();
stream.end();
}));
server.on('close', common.mustCall());

server.listen(0, () => {

const client = http2.connect(`http://localhost:${server.address().port}`);
client.once('goaway', common.mustCall((code, lastStreamID, buf) => {
assert.deepStrictEqual(code, 0);
assert.deepStrictEqual(lastStreamID, 1);
assert.deepStrictEqual(data, buf);
session.close();
server.close();
}));
const req = client.request();
Expand Down
9 changes: 7 additions & 2 deletions test/parallel/test-http2-ping-settings-heapdump.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ for (const variant of ['ping', 'settings']) {
}));

server.listen(0, common.mustCall(() => {
http2.connect(`http://localhost:${server.address().port}`,
common.mustCall());
const client = http2.connect(`http://localhost:${server.address().port}`,
common.mustCall());
client.on('error', (err) => {
// We destroy the session so it's possible to get ECONNRESET here.
if (err.code !== 'ECONNRESET')
throw err;
});
}));
}
2 changes: 2 additions & 0 deletions test/parallel/test-http2-server-push-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ server.listen(0, common.mustCall(() => {
assert.strictEqual(headers['x-push-data'], 'pushed by server');
}));
stream.on('aborted', common.mustNotCall());
// We have to read the data of the push stream to end gracefully.
stream.resume();
}));

let data = '';
Expand Down

0 comments on commit ba0682e

Please sign in to comment.