File tree 2 files changed +101
-0
lines changed
2 files changed +101
-0
lines changed Original file line number Diff line number Diff line change
1
+ 'use strict' ;
2
+
3
+ const pipeline = require ( 'internal/streams/pipeline' ) ;
4
+ const Duplex = require ( 'internal/streams/duplex' ) ;
5
+ const { destroyer } = require ( 'internal/streams/destroy' ) ;
6
+
7
+ module . exports = function pipe ( ...streams ) {
8
+ let ondrain ;
9
+ let onfinish ;
10
+ let onreadable ;
11
+ let onclose ;
12
+ let ret ;
13
+
14
+ const r = pipeline ( streams , function ( err ) {
15
+ if ( onclose ) {
16
+ const cb = onclose ;
17
+ onclose = null ;
18
+ cb ( err ) ;
19
+ } else {
20
+ ret . destroy ( err ) ;
21
+ }
22
+ } ) ;
23
+ const w = streams [ 0 ] ;
24
+
25
+ const writable = w . writable ;
26
+ const readable = r . readable ;
27
+ const objectMode = w . readableObjectMode ;
28
+
29
+ ret = new Duplex ( {
30
+ writable,
31
+ readable,
32
+ objectMode,
33
+ highWaterMark : 1
34
+ } ) ;
35
+
36
+ if ( writable ) {
37
+ ret . _write = function ( chunk , encoding , callback ) {
38
+ if ( w . write ( chunk , encoding ) ) {
39
+ callback ( ) ;
40
+ } else {
41
+ ondrain = callback ;
42
+ }
43
+ } ;
44
+
45
+ ret . _final = function ( callback ) {
46
+ w . end ( ) ;
47
+ onfinish = callback ;
48
+ } ;
49
+
50
+ ret . on ( 'drain' , function ( ) {
51
+ if ( ondrain ) {
52
+ const cb = ondrain ;
53
+ ondrain = null ;
54
+ cb ( ) ;
55
+ }
56
+ } ) ;
57
+
58
+ ret . on ( 'finish' , function ( ) {
59
+ if ( onfinish ) {
60
+ const cb = onfinish ;
61
+ onfinish = null ;
62
+ cb ( ) ;
63
+ }
64
+ } ) ;
65
+ }
66
+
67
+ if ( readable ) {
68
+ r . on ( 'readable' , function ( ) {
69
+ if ( onreadable ) {
70
+ const cb = onreadable ;
71
+ onreadable = null ;
72
+ cb ( ) ;
73
+ }
74
+ } ) ;
75
+
76
+ ret . _read = function ( ) {
77
+ while ( true ) {
78
+ const buf = r . read ( ) ;
79
+
80
+ if ( buf === null ) {
81
+ onreadable = ret . _read ;
82
+ return ;
83
+ }
84
+
85
+ if ( ! ret . push ( buf ) ) {
86
+ return ;
87
+ }
88
+ }
89
+ } ;
90
+ }
91
+
92
+ ret . _destroy = function ( err , callback ) {
93
+ onclose = callback ;
94
+ onreadable = null ;
95
+ ondrain = null ;
96
+ onfinish = null ;
97
+ destroyer ( r , err ) ;
98
+ } ;
99
+ } ;
Original file line number Diff line number Diff line change @@ -30,6 +30,7 @@ const {
30
30
} = require ( 'internal/util' ) ;
31
31
32
32
const pipeline = require ( 'internal/streams/pipeline' ) ;
33
+ const pipelinify = require ( 'internal/streams/pipelinify' ) ;
33
34
const eos = require ( 'internal/streams/end-of-stream' ) ;
34
35
const internalBuffer = require ( 'internal/buffer' ) ;
35
36
@@ -42,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
42
43
Stream . Transform = require ( 'internal/streams/transform' ) ;
43
44
Stream . PassThrough = require ( 'internal/streams/passthrough' ) ;
44
45
Stream . pipeline = pipeline ;
46
+ Stream . pipelinify = pipelinify ;
45
47
const { addAbortSignal } = require ( 'internal/streams/add-abort-signal' ) ;
46
48
Stream . addAbortSignal = addAbortSignal ;
47
49
Stream . finished = eos ;
You can’t perform that action at this time.
0 commit comments