@@ -8,7 +8,8 @@ const proc =
88 }
99const EE = require ( 'events' )
1010const Stream = require ( 'stream' )
11- const SD = require ( 'string_decoder' ) . StringDecoder
11+ const stringdecoder = require ( 'string_decoder' )
12+ const SD = stringdecoder . StringDecoder
1213
1314const EOF = Symbol ( 'EOF' )
1415const MAYBE_EMIT_END = Symbol ( 'maybeEmitEnd' )
@@ -38,6 +39,9 @@ const EMITDATA = Symbol('emitData')
3839const EMITEND = Symbol ( 'emitEnd' )
3940const EMITEND2 = Symbol ( 'emitEnd2' )
4041const ASYNC = Symbol ( 'async' )
42+ const ABORT = Symbol ( 'abort' )
43+ const ABORTED = Symbol ( 'aborted' )
44+ const SIGNAL = Symbol ( 'signal' )
4145
4246const defer = fn => Promise . resolve ( ) . then ( fn )
4347
@@ -93,7 +97,7 @@ class PipeProxyErrors extends Pipe {
9397 }
9498}
9599
96- module . exports = class Minipass extends Stream {
100+ class Minipass extends Stream {
97101 constructor ( options ) {
98102 super ( )
99103 this [ FLOWING ] = false
@@ -122,6 +126,14 @@ module.exports = class Minipass extends Stream {
122126 if ( options && options . debugExposePipes === true ) {
123127 Object . defineProperty ( this , 'pipes' , { get : ( ) => this [ PIPES ] } )
124128 }
129+ this [ SIGNAL ] = options && options . signal
130+ this [ ABORTED ] = false
131+ if ( this [ SIGNAL ] ) {
132+ this [ SIGNAL ] . addEventListener ( 'abort' , ( ) => this [ ABORT ] ( ) )
133+ if ( this [ SIGNAL ] . aborted ) {
134+ this [ ABORT ] ( )
135+ }
136+ }
125137 }
126138
127139 get bufferLength ( ) {
@@ -168,7 +180,20 @@ module.exports = class Minipass extends Stream {
168180 this [ ASYNC ] = this [ ASYNC ] || ! ! a
169181 }
170182
183+ // drop everything and get out of the flow completely
184+ [ ABORT ] ( ) {
185+ this [ ABORTED ] = true
186+ this . emit ( 'abort' , this [ SIGNAL ] . reason )
187+ this . destroy ( this [ SIGNAL ] . reason )
188+ }
189+
190+ get aborted ( ) {
191+ return this [ ABORTED ]
192+ }
193+ set aborted ( _ ) { }
194+
171195 write ( chunk , encoding , cb ) {
196+ if ( this [ ABORTED ] ) return false
172197 if ( this [ EOF ] ) throw new Error ( 'write after end' )
173198
174199 if ( this [ DESTROYED ] ) {
@@ -342,21 +367,20 @@ module.exports = class Minipass extends Stream {
342367 }
343368
344369 [ BUFFERSHIFT ] ( ) {
345- if ( this [ BUFFER ] . length ) {
346- if ( this [ OBJECTMODE ] ) this [ BUFFERLENGTH ] -= 1
347- else this [ BUFFERLENGTH ] -= this [ BUFFER ] [ 0 ] . length
348- }
370+ if ( this [ OBJECTMODE ] ) this [ BUFFERLENGTH ] -= 1
371+ else this [ BUFFERLENGTH ] -= this [ BUFFER ] [ 0 ] . length
349372 return this [ BUFFER ] . shift ( )
350373 }
351374
352375 [ FLUSH ] ( noDrain ) {
353- do { } while ( this [ FLUSHCHUNK ] ( this [ BUFFERSHIFT ] ( ) ) )
376+ do { } while ( this [ FLUSHCHUNK ] ( this [ BUFFERSHIFT ] ( ) ) && this [ BUFFER ] . length )
354377
355378 if ( ! noDrain && ! this [ BUFFER ] . length && ! this [ EOF ] ) this . emit ( 'drain' )
356379 }
357380
358381 [ FLUSHCHUNK ] ( chunk ) {
359- return chunk ? ( this . emit ( 'data' , chunk ) , this . flowing ) : false
382+ this . emit ( 'data' , chunk )
383+ return this . flowing
360384 }
361385
362386 pipe ( dest , opts ) {
@@ -437,7 +461,7 @@ module.exports = class Minipass extends Stream {
437461 if ( ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this [ DESTROYED ] )
438462 return
439463 else if ( ev === 'data' ) {
440- return ! data
464+ return ! this [ OBJECTMODE ] && ! data
441465 ? false
442466 : this [ ASYNC ]
443467 ? defer ( ( ) => this [ EMITDATA ] ( data ) )
@@ -454,7 +478,10 @@ module.exports = class Minipass extends Stream {
454478 } else if ( ev === 'error' ) {
455479 this [ EMITTED_ERROR ] = data
456480 super . emit ( ERROR , data )
457- const ret = super . emit ( 'error' , data )
481+ const ret =
482+ ! this [ SIGNAL ] || this . listeners ( 'error' ) . length
483+ ? super . emit ( 'error' , data )
484+ : false
458485 this [ MAYBE_EMIT_END ] ( )
459486 return ret
460487 } else if ( ev === 'resume' ) {
@@ -659,8 +686,12 @@ module.exports = class Minipass extends Stream {
659686 ( s instanceof Minipass ||
660687 s instanceof Stream ||
661688 ( s instanceof EE &&
662- ( typeof s . pipe === 'function' || // readable
663- ( typeof s . write === 'function' && typeof s . end === 'function' ) ) ) ) // writable
689+ // readable
690+ ( typeof s . pipe === 'function' ||
691+ // writable
692+ ( typeof s . write === 'function' && typeof s . end === 'function' ) ) ) )
664693 )
665694 }
666695}
696+
697+ module . exports = Minipass
0 commit comments