Skip to content

Commit bcbf459

Browse files
committed
WIP: fastpipe
1 parent 4974b2f commit bcbf459

File tree

5 files changed

+147
-1
lines changed

5 files changed

+147
-1
lines changed

lib/internal/http2/core.js

+4-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ const {
117117
kUpdateTimer,
118118
kHandle,
119119
kSession,
120-
setStreamTimeout
120+
setStreamTimeout,
121+
installFastpipe
121122
} = require('internal/stream_base_commons');
122123
const { kTimeout } = require('internal/timers');
123124
const { isArrayBufferView } = require('internal/util/types');
@@ -2119,6 +2120,8 @@ class Http2Stream extends Duplex {
21192120
}
21202121
}
21212122

2123+
installFastpipe(Http2Stream);
2124+
21222125
function processHeaders(oldHeaders) {
21232126
assertIsObject(oldHeaders, 'headers');
21242127
const headers = Object.create(null);

lib/internal/stream_base_commons.js

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ const {
2525
} = require('internal/timers');
2626
const { isUint8Array } = require('internal/util/types');
2727
const { clearTimeout } = require('timers');
28+
const {
29+
installFastpipe
30+
} = require('internal/stream_base_fastpipe');
2831

2932
const kMaybeDestroy = Symbol('kMaybeDestroy');
3033
const kUpdateTimer = Symbol('kUpdateTimer');
@@ -263,4 +266,5 @@ module.exports = {
263266
kBuffer,
264267
kBufferCb,
265268
kBufferGen
269+
installFastpipe
266270
};

lib/internal/stream_base_fastpipe.js

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
'use strict';
2+
const { Readable, Writable } = require('stream');
3+
const { owner_symbol } = require('internal/async_hooks').symbols;
4+
5+
const kFastpipeSupported = Symbol('kFastpipeSupported');
6+
const kFastpipeWritableBailoutMethods = {};
7+
const kFastpipeWritableBailoutMethodsOriginals = {};
8+
const kFastpipeReadableBailoutMethods = {};
9+
const kFastpipeReadableBailoutMethodsOriginals = {};
10+
11+
for (const name of ['cork', 'uncork', 'write', 'destroy']) {
12+
const original = Writable.prototype[name];
13+
kFastpipeWritableBailoutMethodsOriginals[name] = original;
14+
kFastpipeWritableBailoutMethods[name] = function(...args) {
15+
bailoutFromWritable(this);
16+
return original.call(this, ...args);
17+
};
18+
}
19+
20+
for (const name of [
21+
'pause', 'resume', 'setEncoding', 'unshift', 'push', 'destroy',
22+
'pipe', 'unpipe'
23+
]) {
24+
const original = Readable.prototype[name];
25+
kFastpipeReadableBailoutMethodsOriginals[name] = original;
26+
kFastpipeReadableBailoutMethods[name] = function(...args) {
27+
bailoutFromReadable(this);
28+
return original.call(this, ...args);
29+
};
30+
}
31+
32+
const OriginalPipe = Readable.prototype.pipe;
33+
34+
function installFastpipe(StreamClass) {
35+
StreamClass.prototype[kFastpipeSupported] = true;
36+
StreamClass.prototype.pipe = function(dest, opts) {
37+
// If one of the streams is already part of a pipe, we first need to unpipe
38+
// those streams. This will call the original `.pipe()` function, which
39+
// adds `data` listeners, which prevents a new fastpipe from being formed
40+
// later here.
41+
if (dest._handle && dest._handle.pipeSource)
42+
bailoutFromWritable(dest);
43+
44+
if (!dest[kFastpipeSupported] ||
45+
this.listenerCount('data') > 0 ||
46+
this.listenerCount('readable') > 0 ||
47+
dest.listenerCount('drain') > 0 ||
48+
this._readableState.decoder ||
49+
this._readableState.objectMode ||
50+
(dest._writableState && dest._writableState.objectMode)) {
51+
return OriginalPipe.call(this, dest, opts);
52+
}
53+
54+
if (this.connecting) {
55+
OriginalPipe.call(this, dest, opts);
56+
this.once('connect', () => {
57+
this.unpipe(dest);
58+
this.pipe(dest, opts);
59+
});
60+
return this;
61+
}
62+
63+
if (this._readableState.length > 0) {
64+
const data = this.read();
65+
dest.write(data);
66+
}
67+
68+
if (dest._writableState &&
69+
(dest._writableState.corked || dest._writableState.writing)) {
70+
OriginalPipe.call(this, dest, opts);
71+
dest.once('drain', () => {
72+
this.unpipe(dest);
73+
this.pipe(dest, opts);
74+
});
75+
return this;
76+
}
77+
78+
Object.assign(dest, kFastpipeWritableBailoutMethods);
79+
dest.on('newListener', writableNewListener);
80+
81+
Object.assign(this, kFastpipeReadableBailoutMethods);
82+
this.on('newListener', readableNewListener);
83+
84+
const { internalBinding } = require('internal/bootstrap/loaders');
85+
const { StreamPipe } = internalBinding('stream_pipe');
86+
const pipe = new StreamPipe(this._handle._externalStream,
87+
dest._handle._externalStream);
88+
pipe.onunpipe = onunpipe;
89+
pipe.start();
90+
return this;
91+
};
92+
}
93+
94+
function onunpipe() {
95+
const self = this.source[owner_symbol];
96+
bailoutFromReadable(self);
97+
}
98+
99+
function writableNewListener(name) {
100+
if (name === 'drain')
101+
bailoutFromWritable(this);
102+
}
103+
104+
function readableNewListener(name) {
105+
if (name === 'data' || name === 'readable')
106+
bailoutFromReadable(this);
107+
}
108+
109+
function bailoutFromReadable(self) {
110+
if (!self._handle)
111+
return;
112+
const pipe = self._handle.pipeTarget;
113+
const dest = pipe.sink[owner_symbol];
114+
self.removeListener('newListener', readableNewListener);
115+
Object.assign(self, kFastpipeReadableBailoutMethodsOriginals);
116+
dest.removeListener('newListener', writableNewListener);
117+
Object.assign(dest, kFastpipeWritableBailoutMethodsOriginals);
118+
pipe.unpipe();
119+
pipe.onunpipe = function() {};
120+
if (pipe.pendingWrites() > 0) {
121+
pipe.oncomplete = function() {
122+
OriginalPipe.call(self, dest);
123+
};
124+
} else {
125+
OriginalPipe.call(self, dest);
126+
}
127+
}
128+
129+
function bailoutFromWritable(dest) {
130+
bailoutFromReadable(dest._handle.pipeSource.source[owner_symbol]);
131+
}
132+
133+
module.exports = {
134+
installFastpipe
135+
};

lib/net.js

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const {
7171
kBuffer,
7272
kBufferCb,
7373
kBufferGen
74+
installFastpipe
7475
} = require('internal/stream_base_commons');
7576
const {
7677
codes: {
@@ -779,6 +780,8 @@ Socket.prototype._write = function(data, encoding, cb) {
779780
this._writeGeneric(false, data, encoding, cb);
780781
};
781782

783+
installFastpipe(Socket);
784+
782785

783786
// Legacy alias. Having this is probably being overly cautious, but it doesn't
784787
// really hurt anyone either. This can probably be removed safely if desired.

node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@
195195
'lib/internal/v8_prof_processor.js',
196196
'lib/internal/validators.js',
197197
'lib/internal/stream_base_commons.js',
198+
'lib/internal/stream_base_fastpipe.js',
198199
'lib/internal/vm/source_text_module.js',
199200
'lib/internal/worker.js',
200201
'lib/internal/worker/io.js',

0 commit comments

Comments
 (0)