File tree 2 files changed +89
-0
lines changed
2 files changed +89
-0
lines changed Original file line number Diff line number Diff line change
1
+ 'use strict' ;
2
+
3
+ const pipeline = require ( 'internal/streams/pipeline' ) ;
4
+ const Duplex = require ( 'internal/streams/duplex' ) ;
5
+
6
+ module . exports = function pipe ( ...streams ) {
7
+ let onclose ;
8
+ let ret ;
9
+
10
+ const r = pipeline ( streams , function ( err ) {
11
+ if ( onclose ) {
12
+ onclose ( err ) ;
13
+ } else {
14
+ ret . destroy ( err ) ;
15
+ }
16
+ } ) ;
17
+ const w = streams [ 0 ] ;
18
+
19
+ const writable = w . writable ;
20
+ const readable = r . readable ;
21
+ const objectMode = w . readableObjectMode ;
22
+
23
+ ret = new Duplex ( {
24
+ writable,
25
+ readable,
26
+ objectMode,
27
+ highWaterMark : 1
28
+ } ) ;
29
+
30
+ if ( writable ) {
31
+ let ondrain ;
32
+ let onfinish ;
33
+
34
+ ret . _write = function ( chunk , encoding , callback ) {
35
+ if ( w . write ( chunk , encoding ) ) {
36
+ callback ( ) ;
37
+ } else {
38
+ ondrain = callback ;
39
+ }
40
+ } ;
41
+
42
+ ret . _final = function ( chunk , encoding , callback ) {
43
+ w . end ( chunk , encoding ) ;
44
+ onfinish = callback ;
45
+ } ;
46
+
47
+ ret . on ( 'drain' , function ( ) {
48
+ if ( ondrain ) {
49
+ const cb = ondrain ;
50
+ ondrain = null ;
51
+ cb ( ) ;
52
+ }
53
+ } ) ;
54
+
55
+ ret . on ( 'finish' , function ( ) {
56
+ if ( onfinish ) {
57
+ const cb = onfinish
58
+ onfinish = null ;
59
+ cb ( ) ;
60
+ }
61
+ } ) ;
62
+ }
63
+
64
+ if ( readable ) {
65
+ const read = function ( ) {
66
+ while ( true ) {
67
+ const buf = r . read ( ) ;
68
+
69
+ if ( buf === null ) {
70
+ r . once ( 'readable' , read ) ;
71
+ return ;
72
+ }
73
+
74
+ if ( ! ret . push ( buf ) ) {
75
+ return ;
76
+ }
77
+ }
78
+ }
79
+
80
+ ret . _read = read ;
81
+ }
82
+
83
+ ret . _destroy = function ( err , callback ) {
84
+ onclose = callback ;
85
+ streams [ 0 ] . destroy ( err ) ;
86
+ } ;
87
+ }
Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ const {
30
30
} = require ( 'internal/util' ) ;
31
31
32
32
const pipeline = require ( 'internal/streams/pipeline' ) ;
33
+ const pipelinify = require ( 'internal/streams/pipelinify' ) ;
33
34
const eos = require ( 'internal/streams/end-of-stream' ) ;
34
35
const internalBuffer = require ( 'internal/buffer' ) ;
35
36
@@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
42
43
Stream . Transform = require ( 'internal/streams/transform' ) ;
43
44
Stream . PassThrough = require ( 'internal/streams/passthrough' ) ;
44
45
Stream . pipeline = pipeline ;
46
+ Stream . pipelinify = pipelinify ;
45
47
const { addAbortSignal } = require ( 'internal/streams/add-abort-signal' ) ;
46
48
Stream . addAbortSignal = addAbortSignal ;
47
49
Stream . finished = eos ;
You can’t perform that action at this time.
0 commit comments