diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index bce236cb4c061a5d8451384b8962fbfd438850e8..223a328eb97a59de0115079c72622b0d859b774b 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -16,6 +16,7 @@ type AnyType = Object | null | undefined | unknown; declare function requireNapi(napiModuleName: string): AnyType; // @ts-ignore const { TextEncoder, StringDecoder } = requireNapi('util'); +const Buffer:any = requireNapi('buffer') const DEFAULT_HIGH_WATER_MARK = 16 * 1024; const DEFAULT_ENCODING = 'utf-8'; @@ -69,13 +70,15 @@ class EventEmitter { emit(event: string, ...args: AnyType[]): void { if (this.handlers[event]) { - this.handlers[event].forEach((item: any) => { + let handler = [...this.handlers[event]]; + let handlerLen = this.handlers[event].length; + for(let i = 0; i < handlerLen; i++) { if (args.length > 0) { - item({ data: args[0] }); + handler[i]({ data: args[0] }); } else { - item({}); + handler[i](); } - }); + } } } @@ -87,7 +90,89 @@ class EventEmitter { listenerCount(event: string): number { return this.handlers[event]?.length || 0; } + + once(event: string, callback: Function): EventEmitter { + if (typeof callback !== 'function') { + throw new BusinessError(`Parameter error. The type of callback must be string or Function`, 401); + } + this.on(event, _onceWrap(this, event, callback)); + return this; + } +} + +function processErrOrClose(stream: Readable, err: Error, sync: boolean = false):void { + // If the readable stream is disabled, return directly + if (stream.closed) { + return; + } + + if (err) { + stream.errored = true; + stream.closed = true; + if (sync) { + Promise.resolve().then(()=>{ + emitError(stream, err); + emitClose(stream); + }) + } else { + emitError(stream, err); + emitClose(stream); + } + } +} + +function emitError(stream: Readable, err: Error) { + if(stream.errorEmitted) { + return; + } + stream.errorEmitted = true; + stream.listener?.emit(ReadableEvent.ERROR, err); } + + +function emitClose(stream: Readable) { + if(stream.closedEmitted) { + return; + } + stream.closedEmitted = true; + stream.listener?.emit(ReadableEvent.CLOSE); +} +function onceWrapper(this: OnceState) { + if (!this.fired) { + this.target.off(this.event, this.wrapFn!); + this.fired = true; + + if (arguments.length === 0) { + return this.callback.call(this.target); + } + return this.callback.apply(this.target, arguments as any); + } +} + +interface OnceState { + fired: boolean; + wrapFn: Function | undefined; + target: EventEmitter; + event: string; + callback: Function; +} + +function _onceWrap(target: EventEmitter, event: string, callback: Function):Function{ + const state: OnceState = { + fired: false, + wrapFn: undefined, + target, + event, + callback + }; + + const wrapped = onceWrapper.bind(state) as Function; + (wrapped as any).callback = callback; + state.wrapFn = wrapped; + + return wrapped +} + function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Error) => void): Function { let executed = false; return function (...args: Function[]) { @@ -104,11 +189,24 @@ function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Err } }; } + function asyncFn(asyncFn: Function) { return function (...args: Function[]): void { setTimeout(() => asyncFn(...args)); }; } + +function advanceListener(stream:Writable, event: WritableEvent, callback:Function) { + if (!stream.listener?.handlers || !stream.listener.handlers[event]) { + stream.on(event, callback); + } else if (Array.isArray(stream.listener.handlers[event])) { + const callbackFn = callback.bind(stream); + stream.listener.handlers[event].unshift(callbackFn); + } else { + const callbackFn = callback.bind(stream); + stream.listener.handlers[event] = [callbackFn, ...stream.listener.handlers[event]]; + } +} enum ReadableEvent { CLOSE = 'close', DATA = 'data', @@ -134,24 +232,42 @@ interface ReadablePipeStream { endCallback: Function; } class Readable { - private buf: Array; - private listener: EventEmitter | undefined; + private buf: Array; + private bufIndex: number; + private awaitDrainWriters: Set; + public listener: EventEmitter | undefined; private callbacks: { [key: string]: Function[] } = {}; protected encoder = new TextEncoder(); - protected stringDecoder = new StringDecoder(); + protected stringDecoderInner = new StringDecoder(); + private readableDecoder: boolean = false; private isInitialized: boolean = false; private pauseInner: boolean; - private pipeWritableArrayInner: ReadablePipeStream[] = []; - private readableObjectModeInner: boolean | undefined; + private readableHasPasued: boolean = false; + private pipeWritableArrayInner: Array = []; + private readableObjectModeInner: boolean = false; private readableInner: boolean; private readableHighWatermarkInner: number; - private readableFlowingInner: boolean; + private readableFlowingInner: boolean | null = null; private readableLengthInner: number; - private readableEncodingInner: string; + private readableEncodingInner: string | null = null; private readableEndedInner: boolean; - private erroredInner: Error | undefined; + private erroredInner: boolean = false; + private erroredValueInner: Error | undefined; private closedInner: boolean | undefined; - + private dataListenning: boolean | undefined; + private dataEmmited: boolean = false; + private readableListenning: boolean | undefined; + private readableHasFlowingInner: boolean = false; + private readableEndEmitted: boolean = false; + private readableNeedReadable: boolean | undefined = false; + private readableEmittedReadable: boolean = false; + private isReading: boolean | undefined; + private readableEmittedResume: boolean = false; + private readableSync: boolean = true; + private isReadingMore: boolean = false; + private multiAwaitDrain: boolean = false; + public errorEmitted: boolean = false; + public closedEmitted: boolean = false; /** * The Readable constructor. * @@ -162,29 +278,32 @@ class Readable { * @since 12 */ constructor(options?: { encoding: string | null; highWatermark?: number; doRead?: (size: number) => void }) { - this.readableEncodingInner = options?.encoding || DEFAULT_ENCODING; this.readableHighWatermarkInner = options?.highWatermark || DEFAULT_HIGH_WATER_MARK; this.readableObjectModeInner = false; this.readableLengthInner = 0; this.pauseInner = false; - this.readableFlowingInner = true; this.readableInner = true; this.readableEndedInner = false; + this.dataListenning = false; this.listener = new EventEmitter(); this.buf = []; + this.bufIndex = 0; + this.awaitDrainWriters = new Set(); if (arguments.length !== 0) { if (typeof options?.doRead === 'function') { this.doRead = options.doRead; } - if (this.readableEncodingInner.toLowerCase() === 'utf8') { - this.readableEncodingInner = DEFAULT_ENCODING; - } - if (ENCODING_SET.indexOf(this.readableEncodingInner.toLowerCase()) === -1) { - let error = new BusinessError('Parameter error. Incorrect parameter types.', 401); - throw error; + if (options?.encoding) { + this.readableEncodingInner = options.encoding; + if (this.readableEncodingInner.toLowerCase() === 'utf8') { + this.readableEncodingInner = DEFAULT_ENCODING; + } + if (ENCODING_SET.indexOf(this.readableEncodingInner.toLowerCase()) === -1) { + let error = new BusinessError('Parameter error. Incorrect parameter types.', 401); + throw error; + } + this.stringDecoder = new StringDecoder(this.readableEncodingInner) } - this.stringDecoder = new StringDecoder(this.readableEncodingInner); - this.encoder = new TextEncoder(this.readableEncodingInner); } } @@ -209,7 +328,7 @@ class Readable { */ get readable(): boolean { if (this.readableInner) { - return true; + return this.readableInner && !this.errorEmitted && !this.readableEndEmitted; } else if (!this.readableInner && this.readableEndedInner) { return false; } @@ -234,7 +353,7 @@ class Readable { * @crossplatform * @since 12 */ - get readableFlowing(): boolean { + get readableFlowing(): boolean | null { return this.readableFlowingInner; } @@ -269,7 +388,7 @@ class Readable { * @since 12 */ get readableEnded(): boolean { - return this.readableEndedInner; + return this.readableEndEmitted; } /** @@ -279,10 +398,13 @@ class Readable { * @crossplatform * @since 12 */ - get errored(): Error | undefined { + get errored(): boolean { return this.erroredInner; } + set errored(value: boolean) { + this.erroredInner = value; + } /** * Readable completes destroyfile and returns true, otherwise returns false. * @@ -293,6 +415,21 @@ class Readable { get closed(): boolean { return this.closedInner || false; } + set closed(value: boolean) { + this.closedInner = value; + } + get stringDecoder(): any { + return this.readableDecoder ? this.stringDecoderInner : null; + } + + set stringDecoder(value) { + if (value) { + this.stringDecoderInner = value; + this.readableDecoder = true; + } else { + this.readableDecoder = false; + } + } private computeNewReadableHighWatermark(readSize: number): number { readSize--; @@ -305,6 +442,308 @@ class Readable { return readSize; } + private flowInner(stream: Readable) { + while (this.readableFlowing && stream.read() !== null); + } + + private emitReadableNextCycle(stream: Readable) { + stream.read(0) + } + + private emitReadableNow(stream: Readable) { + stream.readableNeedReadable = false; + if (!stream.readableEmittedReadable) { + stream.readableEmittedReadable = true; + Promise.resolve().then(()=>{ + stream.emitReadableInner(stream); + }) + } + } + + private emitReadableEnd(stream: Readable) { + if (!stream.readableEndEmitted) { + stream.readableEndedInner = true; + Promise.resolve().then(()=>{ + this.emitReadableEndNextCycle(stream) + }) + } + } + + private emitReadableEndNextCycle(stream: Readable) { + if (!stream.erroredInner && !stream.closedInner + && !stream.readableEndEmitted && stream.readableLengthInner === 0) { + stream.readableEndEmitted = true; + stream.listener?.emit(ReadableEvent.END); + } + } + + private emitReadableInner(stream: Readable) { + if (!stream.erroredInner && (stream.readableLengthInner || stream.readableEndedInner)) { + stream.listener?.emit(ReadableEvent.READABLE); + stream.readableEmittedReadable = false; + } + if (!stream.readableFlowingInner && !stream.readableEndedInner + && stream.readableLengthInner <= stream.readableHighWatermarkInner) { + stream.readableNeedReadable = true; + } else { + stream.readableNeedReadable = false; + } + stream.flowInner(stream) + } + + private resumeInner(stream: Readable) { + if (!this.isReading) { + stream.read(0); + } + stream.readableEmittedResume = false; + stream.listener?.emit(ReadableEvent.RESUME); + this.flowInner(stream); + if (stream.readableFlowingInner && !stream.isReading) { + stream.read(0); + } + } + + private endOfFlow(stream: Readable) { + if (stream.readableEndedInner) { + return; + } + const stringDecoder = stream.readableDecoder ? stream.stringDecoderInner :null; + if (stringDecoder) { + const chunk = stringDecoder.end(); + if(chunk?.length) { + stream.buf.push(chunk); + stream.readableLengthInner += stream.readableObjectModeInner ? 1 : chunk.length; + } + } + stream.readableEndedInner = true; + + if (stream.readableSync) { + stream.emitReadableNow(stream); + } else { + stream.readableNeedReadable = false; + stream.readableEmittedReadable = true; + stream.emitReadableInner(stream); + } + } + + private canBeReadMore(stream: Readable) { + function canBeReadMoreInner() { + while ((!stream.isReading && !stream.readableEndedInner) && + (stream.readableLengthInner < stream.readableHighWatermarkInner + || (stream.readableFlowingInner && stream.readableLengthInner === 0))) { + const preLen = stream.readableLengthInner; + stream.read(0); + if (preLen === stream.readableLengthInner) { + break; + } + stream.isReadingMore = false; + } + } + + if (!stream.isReadingMore) { + stream.isReadingMore = true; + Promise.resolve().then(()=>{ + canBeReadMoreInner(); + }) + } + } + + private bufHasSpace(stream: Readable) { + return !stream.readableEndedInner && + (stream.readableLengthInner < stream.readableHighWatermarkInner || + stream.readableLengthInner === 0) + } + + pushChunkInner(stream: Readable, chunk: any) { + if (stream.readableFlowingInner && stream.dataListenning + && !stream.readableSync && stream.readableLengthInner === 0) { + this.awaitDrainWriters.clear(); + stream.dataEmmited = true; + stream.listener?.emit(ReadableEvent.DATA, chunk) + } else { + if (chunk?.length) { + stream.readableLengthInner += chunk.length; + stream.buf.push(chunk); + } + if (stream.readableNeedReadable) { + this.emitReadableNow(stream); + } + } + stream.canBeReadMore(stream); + } + + private pushByteModeChunk(stream: Readable, chunk: any, encoding?: string): boolean { + if (chunk === null) { + stream.isReading = false; + stream.endOfFlow(stream); + return false; + } + + if (typeof chunk === 'string') { + if (!encoding) { + encoding ||= DEFAULT_ENCODING; + } + if (stream.readableEncodingInner !== encoding) { + chunk = Buffer.from(chunk, encoding); + encoding = ''; + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + encoding = ''; + } else if (chunk !== undefined) { + stream.erroredValueInner = new BusinessError('ERR_INVALID_ARG_TYPE'); + processErrOrClose(stream, stream.erroredValueInner, false); + return false; + } + + if (!chunk || chunk.length < 0) { + stream.isReading = false; + stream.canBeReadMore(stream); + return stream.bufHasSpace(stream); + } + + if (stream.readableEndedInner) { + let error = new BusinessError('write after end'); + processErrOrClose(stream, error); + return false; + } + + if (stream.erroredInner) { + return false; + } + + stream.isReading = false; + if (stream.readableDecoder && !encoding) { + chunk = stream.stringDecoderInner.write(new Uint8Array(chunk.buffer)); + if (chunk?.length === 0) { + stream.canBeReadMore(stream); + return stream.bufHasSpace(stream); + } + } + stream.pushChunkInner(stream, chunk); + return stream.bufHasSpace(stream); + } + + private calculateReadSize(size: number): number { + if (size <= 0 || (this.readableLengthInner === 0 && this.readableEndedInner)) { + return 0; + } + + if (this.readableObjectModeInner) { + return 1; + } + + if (Number.isNaN(size)) { + if(this.readableFlowingInner && this.readableLengthInner) { + return this.buf[this.bufIndex].length; + } + return this.readableLengthInner; + } + + if (size <= this.readableLengthInner) { + return size; + } + return this.readableEndedInner ? this.readableLengthInner : 0; + } + + private readDataFromBuf(size: number): any{ + if (this.readableLengthInner === 0) { + return null; + } + + let num = this.bufIndex; + let res; + + const buf = this.buf; + const len = buf.length; + + if (this.readableObjectModeInner) { + res = buf[num]; + buf[num++] = null; + } else if (!size || size >= this.readableLengthInner) { + if (this.readableDecoder) { + res = ''; + while (num < len) { + res += buf[num]; + buf[num++] = null; + } + } else if (len - num === 0) { + res = Buffer.alloc(0); + } else if (len - num === 1) { + res = buf[num]; + buf[num++] = null; + } else { + res = Buffer.allocUninitializedFromPool(this.readableLengthInner); + let index = 0; + while (num < len) { + buf[num].copy(res, index, 0 ,buf[num].length); + index += buf[num].length; + buf[num++] = null; + } + } + } else if (size < buf[num].length) { + res = buf[num].subarray(0, size); + buf[num] = buf[num].subarray(size); + } else if (size === buf[num].length) { + res = buf[num]; + buf[num++] = null; + } else if (this.readableDecoder) { + res = ''; + while (num < len) { + const str = buf[num]; + if (size > str.length) { + res += str; + size -= str.length; + buf[num++] = null; + } else { + if (size === buf[num].length) { + res += str; + buf[num++] = null; + } else { + res += str.subarray(0, size); + buf[num] = str.subarray(size); + } + break; + } + } + } else { + res = Buffer.allocUninitializedFromPool(size); + + const nRetLen = size; + while (num < len) { + const data = buf[num]; + if (size > data.length) { + data.copy(res, nRetLen - size, 0, data.length); + size -= data.length; + buf[num++] = null; + } else { + if (size === data.length) { + data.copy(res, nRetLen - size, 0, data.length); + buf[num++] = null; + } else { + data.copy(res, nRetLen - size, 0, size); + let RemainBuf = Buffer.allocUninitializedFromPool(data.length - size); + data.copy(RemainBuf, nRetLen - size, 0, size); + buf[num++] = RemainBuf; + } + break; + } + } + } + + if (num === len) { + this.buf.length = 0; + this.bufIndex = 0; + } else if (num > 1024) { + this.buf.splice(0, num); + this.bufIndex = 0; + } else { + this.bufIndex = num; + } + + return res; + } + setEndType(): void { Promise.resolve().then((): void => { this.readableInner = false; @@ -318,59 +757,109 @@ class Readable { * of the specified size is returned. Otherwise, if Readable has ended, all remaining buffers are returned. * * @param { number } size - Expected length of the data to be read. - * @returns { string | null } If no data is available to read, null is returned. + * @returns { any } The return value is of the buffer and string types. * @throws { BusinessError } 401 - if the input parameters are invalid. * @throws { BusinessError } 10200038 - if the doRead method has not been implemented, an error will be thrown. * @syscap SystemCapability.Utils.Lang * @crossplatform * @since 12 */ - read(size?: number): string | null { - if (size && typeof size !== 'number') { - this.throwError(new BusinessError(`Parameter error. The type of ${size} must be object`, 401)); - return null; + read(size?: number): any{ + if (size === undefined) { + size = NaN; + } else if (size && typeof size !== 'number') { + size = parseInt(size, 10); } - if (this.doRead === null && this.readableInner) { - this.readableInner = false; - Promise.resolve().then(() => { - this.closedInner = true; - this.erroredInner = new BusinessError('The doRead() method is not implemented', 10200038); - this.listener?.emit(ReadableEvent.ERROR, this.erroredInner); - this.listener?.emit(ReadableEvent.CLOSE); - }); - return null; - } - size = size ?? this.readableLengthInner; + + const originSize = size; + if (size > this.readableHighWatermarkInner) { this.readableHighWatermarkInner = this.computeNewReadableHighWatermark(size); } - if (size > this.readableLengthInner) { - if (!this.readableFlowingInner) { - return null; - } else { - size = this.readableLengthInner; - } + + if (size !== 0) { + this.readableEmittedReadable = false; } - let buffer = null; - if (size > 0 && size <= this.readableLengthInner) { - this.readableLengthInner -= size; - buffer = this.stringDecoder.write(new Uint8Array(this.buf.splice(0, size))); - this.doRead !== null && this.listener?.emit(ReadableEvent.DATA, buffer); + + if (size === 0 && this.readableNeedReadable && + ((this.readableHighWatermarkInner ? + this.readableLengthInner >= this.readableHighWatermarkInner : + this.readableLengthInner > 0) || this.readableEndedInner)) { + if (this.readableLengthInner === 0 && this.readableEndedInner) { + this.emitReadableEnd(this); + } else { + this.emitReadableNow(this); + } + return null; } - if ((!this.readableInner || size <= -1) && this.readableFlowingInner) { + + size = this.calculateReadSize(size); + + if (size === 0 && this.readableEndedInner) { + if (this.readableLengthInner === 0) { + this.emitReadableEnd(this); + } return null; } - if (this.readableFlowingInner) { + + let needDoRead = this.readableNeedReadable; + + if (this.readableLengthInner === 0 || this.readableLengthInner - size < this.readableHighWatermarkInner) { + needDoRead = true; + } + + if (this.isReading || this.readableEndedInner || this.erroredInner) { + needDoRead = false; + } else if (needDoRead) { + this.isReading = true; + this.readableSync = true; + if (this.readableLengthInner === 0) { + this.readableNeedReadable = true; + } + try { this.doRead(this.readableHighWatermarkInner); } catch (error) { - this.readableInner = false; - this.readableEndedInner = true; - this.listener?.emit(ReadableEvent.ERROR, error); - this.listener?.emit(ReadableEvent.CLOSE); + processErrOrClose(this, error as Error, false) + } + this.readableSync = false; + + if (!this.isReading) { + size = this.calculateReadSize(originSize); } } - return buffer; + + let res; + if (size > 0) { + res = this.readDataFromBuf(size); + } else { + res = null; + } + + if (res === null) { + this.readableNeedReadable = this.readableLengthInner <= this.readableHighWatermarkInner ? true : false; + size = 0; + } else { + this.readableLengthInner -= size; + this.awaitDrainWriters.clear(); + } + + if (this.readableLengthInner === 0) { + if (!this.readableEndedInner) { + this.readableNeedReadable = true; + } + + if (originSize !== size && this.readableEndedInner) { + this.emitReadableEnd(this); + } + } + + if (res !== null && !this.erroredInner && !this.closedInner) { + this.dataEmmited = true; + this.listener?.emit(ReadableEvent.DATA, res); + } + + return res; }; /** @@ -381,14 +870,18 @@ class Readable { * @since 12 */ resume(): Readable { - if (this.readableLengthInner === 0) { - Promise.resolve().then((): void => { - this.read(this.readableHighWatermarkInner < this.readableLengthInner ? -1 : this.readableLengthInner); - }); + if (!this.readableFlowingInner) { + this.readableHasFlowingInner = true; + this.readableFlowingInner = this.readableListenning ? false : true; + if (!this.readableEmittedResume) { + this.readableEmittedResume = true; + Promise.resolve().then(()=>{ + this.resumeInner(this); + }) + } } + this.readableHasPasued = true; this.pauseInner = false; - this.readableFlowingInner = true; - this.listener?.emit(ReadableEvent.RESUME); return this; } @@ -400,11 +893,16 @@ class Readable { * @since 12 */ pause(): Readable { - this.pauseInner = true; - Promise.resolve().then((): void => { + // 1、 未初始化状态添加data事件【即this.readableHasFlowingInner为false】,自动触发resume + // 2、 显示暂停后【即this.readableHasFlowingInner && this.readableFlowingInner】添加data事件不会自动恢复流动,必须显示调用resume事件 + // 3、 即使流处于暂停状态,添加data事件 + if (!this.readableHasFlowingInner || (this.readableHasFlowingInner && this.readableFlowingInner)) { + this.readableHasFlowingInner = true; this.readableFlowingInner = false; - }); - this.listener?.emit(ReadableEvent.PAUSE); + this.listener?.emit(ReadableEvent.PAUSE); + } + this.pauseInner = true; + this.readableHasPasued = true; return this; } @@ -412,45 +910,47 @@ class Readable { * Sets the encoding format of the input binary data. Default: utf8. * * @param { string } [encoding] - Original Data Encoding Type. - * @returns { boolean } Setting successful returns true, setting failed returns false. + * @returns { Readable } Returns the Readable object. * @throws { BusinessError } 401 - if the input parameters are invalid. * @syscap SystemCapability.Utils.Lang * @crossplatform * @since 12 */ - setEncoding(encoding?: string): boolean { - if(this.readableEncodingInner === encoding) { - return true; - } + setEncoding(encoding?: string): Readable { if (!encoding) { - this.readableEncodingInner = DEFAULT_ENCODING; - this.encoder = new TextEncoder(this.readableEncodingInner); - this.stringDecoder = new StringDecoder(this.readableEncodingInner); - return false; + encoding = DEFAULT_ENCODING; } if (encoding.toLowerCase() === 'utf8') { encoding = 'utf-8'; } - if (this.buf.length !== 0) { - console.error('stream: The buffer also has data, and encoding is not allowed'); - return false; - } let encodingLowCase = encoding.toLowerCase(); if (ENCODING_SET.indexOf(encodingLowCase) !== -1) { try { - this.encoder = new TextEncoder(encoding); this.stringDecoder = new StringDecoder(encoding); this.readableEncodingInner = encodingLowCase; + // Iterate over current buffer to convert already stored Buffers: + let currentBuffer = ""; + for (let i = 0; i < this.buf.length; i++) { + if (typeof this.buf[i] !== 'string' && !(this.buf[i] instanceof Uint8Array)) { + this.buf[i] = new Uint8Array(this.buf[i].buffer); + } + currentBuffer += this.stringDecoder.write(this.buf[i]); + } + this.buf.length = 0; + this.bufIndex = 0; + + if (currentBuffer != "") { + this.buf.push(currentBuffer); + } + this.readableLengthInner = currentBuffer.length; } catch (e) { this.throwError(e as Error); - return false; } - return true; } else { const err: BusinessError = new BusinessError(`Parameter error. The type of ${encoding} must be string.`); this.throwError(err); - return false; } + return this; } /** @@ -462,7 +962,7 @@ class Readable { * @since 12 */ isPaused(): boolean { - return this.pauseInner; + return this.pauseInner || (this.readableHasFlowingInner && !this.readableFlowingInner); } /** @@ -477,36 +977,123 @@ class Readable { * @since 12 */ pipe(destination: Writable, option?: Object): Writable { - this.pauseInner = false; - const obj: ReadablePipeStream = { - write: destination, - dataCallback: (data: { data: string | Uint8Array }): void => { - destination.write(data.data); - if ((destination.writableLength || 0) > (destination.writableHighWatermark || DEFAULT_HIGH_WATER_MARK)) { - this.pauseInner = true; - this.readableFlowingInner = false; + const src: Readable = this; + + if (src.pipeWritableArrayInner.length === 1 && !src.multiAwaitDrain) { + src.multiAwaitDrain = true; + } + + src.pipeWritableArrayInner.push(destination); + + // Writeable streams are automatically turned off when readable streams are turned off + const endCallback = onEnd; + if (src.readableEndEmitted) { + Promise.resolve().then(()=>{ + endCallback(); + }) + } else { + src.listener?.once(ReadableEvent.END, endCallback); + } + + destination.on(WritableEvent.UNPIPE, unpipeCallback) + function unpipeCallback(data: any) { + if (data.data.readable === src) { + if (data.data.info && data.data.info.hasUnpiped === false) { + data.data.info.hasUnpiped = true; + cleanup(); } - }, - drainCallback: (): void => { - this.pauseInner = false; - this.readableFlowingInner = true; - this.read(this.readableLengthInner); - }, - endCallback: (): void => { - destination.end(); } - }; - this.pipeWritableArrayInner.push(obj); - this.on(ReadableEvent.DATA, (data: { data: Function }) => { - obj.dataCallback(data); - }); - destination.on('drain', (): void => { - obj.drainCallback(); - }); - this.on(ReadableEvent.END, (): void => { - obj.endCallback(); - }); - destination?.listener?.emit('pipe', this); + } + + function onEnd() { + destination.end(); + } + + let ondrainCallback: undefined | Function; + let cleand:boolean = false; + + function cleanup() { + destination.listener?.off(WritableEvent.CLOSE, closeCallBack); + destination.listener?.off(WritableEvent.FINISH, finishCallback); + destination.listener?.off(WritableEvent.ERROR, errorCallBack); + if (ondrainCallback) { + destination.listener?.off(WritableEvent.DRAIN, ondrainCallback); + } + destination.listener?.off(WritableEvent.UNPIPE, unpipeCallback); + src.listener?.off(ReadableEvent.END, endCallback); + src.listener?.off(ReadableEvent.DATA, dataCallback); + cleand = true; + if(ondrainCallback && src.awaitDrainWriters.size > 0 && destination.writableNeedDrain) { + ondrainCallback(); + } + } + + + function pause() { + if (!cleand) { + if (src.pipeWritableArrayInner.length === 1 && src.pipeWritableArrayInner[0] === destination) { + src.awaitDrainWriters.add(destination); + src.multiAwaitDrain = false; + } else if (src.pipeWritableArrayInner.length > 1) { + const objIdx: number = src.pipeWritableArrayInner.findIndex((value: Writable) => value === destination); + if(objIdx !== -1) { + src.awaitDrainWriters.add(destination); + } + } + src.pause(); + } + if (!ondrainCallback) { + ondrainCallback = pipeOnDrain(src, destination); + destination.on(WritableEvent.DRAIN, ondrainCallback) + } + } + + function pipeOnDrain(src:Readable, dest:Writable) { + return function pipeOnDrainInner() { + src.awaitDrainWriters.delete(dest); + if (src.awaitDrainWriters.size == 0 && src.dataListenning) { + src.resume(); + } + } + } + + function dataCallback(data: { data: any }) { + const writeData = new Uint8Array(data.data.buffer); + const res = destination.write(writeData); + if (res === false) { + pause(); + } + } + + src.on(ReadableEvent.DATA, dataCallback) + + function errorCallBack(){ + unpipe(); + destination.listener?.off(WritableEvent.ERROR, errorCallBack); + } + + advanceListener(destination, WritableEvent.ERROR, errorCallBack); + function closeCallBack() { + destination.listener?.off(WritableEvent.FINISH, finishCallback); + unpipe(); + } + destination.listener?.once(WritableEvent.CLOSE, closeCallBack); + function unpipe() { + src.unpipe(destination); + } + + function finishCallback() { + destination.listener?.off(WritableEvent.CLOSE, closeCallBack); + unpipe(); + } + destination.listener?.once(WritableEvent.FINISH, finishCallback); + destination.listener?.emit(WritableEvent.PIPE, src); + + if (destination.writableNeedDrain) { + pause(); + } else if (!src.readableFlowingInner) { + src.resume(); + } return destination; } @@ -521,15 +1108,35 @@ class Readable { * @since 12 */ unpipe(destination?: Writable): Readable { - const objIdx: number = this.pipeWritableArrayInner.findIndex((value: ReadablePipeStream) => value.write === destination); - if (objIdx !== -1) { - this.readableInner = false; - const obj: ReadablePipeStream = this.pipeWritableArrayInner[objIdx]; - this.listener?.off(ReadableEvent.DATA, obj.dataCallback); - destination?.listener?.off('drain', obj.drainCallback); - this.listener?.off(ReadableEvent.END, obj.endCallback); - destination?.listener?.emit('unpipe', this); + const info = {hasUnpiped: false}; + + if (this.pipeWritableArrayInner.length === 0) { + return this; + } + + if (!destination) { + // remove all pipelines + const destinations = this.pipeWritableArrayInner; + this.pipeWritableArrayInner = []; + this.pause(); + + for (let i = 0; i < destinations.length; i++) { + destinations[i].listener?.emit(WritableEvent.UNPIPE, this, { hasUnpiped: false }); + } + return this; + } + + const objIdx: number = this.pipeWritableArrayInner.findIndex((value: Writable) => value === destination); + + if(objIdx === -1) + return this; + + this.pipeWritableArrayInner.splice(objIdx, 1); + if (this.pipeWritableArrayInner.length === 0) { + this.pause(); } + destination.listener?.emit(WritableEvent.UNPIPE, {readable:this, info}); + return this; } @@ -544,7 +1151,6 @@ class Readable { * @since 12 */ on(event: string, callback: Function): void { - const that = this; if (!this.isInitialized) { this.isInitialized = true; this.doInitialize?.(() => { @@ -554,19 +1160,35 @@ class Readable { const callbackFn = callback; this.callbacks[event].push(callbackFn); this.listener?.on(event, callbackFn); - Promise.resolve().then((): void => { - if (event === ReadableEvent.READABLE) { - this.readableFlowingInner = false; - if (this.readableInner) { - this.doRead?.(this.readableHighWatermarkInner); + if (event === ReadableEvent.DATA) { + this.dataListenning = true; + if(this.listener && this.listener.listenerCount(ReadableEvent.READABLE) > 0) { + this.readableListenning = true; + } else { + this.readableListenning = false; } - } else if (event === ReadableEvent.DATA) { - this.readableFlowingInner = true; - if (!this.pauseInner) { - this.read(); + // 1、 未初始化状态添加data事件【即this.readableHasFlowingInner为false】,自动触发resume + // 2、 显示暂停后【即this.readableHasFlowingInner && this.readableFlowingInner】添加data事件不会自动恢复流动,必须显示调用resume事件 + // 3、 即使流处于暂停状态,添加data事件 + if (!this.readableHasFlowingInner || (this.readableHasFlowingInner && this.readableFlowingInner)) { + this.resume(); } - } - }); + } else if (event === ReadableEvent.READABLE) { + if (!this.readableEndEmitted || !this.readableListenning) { + this.readableHasFlowingInner = true; + this.readableListenning = true; + this.readableNeedReadable = true; + this.readableFlowingInner = false; + this.readableEmittedReadable = false; + if (this.readableLengthInner) { + this.emitReadableNow(this); + } else if (!this.isReading) { + Promise.resolve().then(()=>{ + this.emitReadableNextCycle(this) + }) + } + } + } } /** @@ -597,6 +1219,28 @@ class Readable { } else { this.callbacks[event]?.forEach((it : Function) => this.listener?.off(event, it)); } + + if (event === ReadableEvent.READABLE) { + Promise.resolve().then(() => { + if (this.listener!.listenerCount(ReadableEvent.READABLE) > 0) { + this.readableListenning = true; + } else { + this.readableListenning = false; + } + + if (!this.pauseInner && (this.readableHasPasued || this.readableEmittedResume)) { + this.readableFlowingInner = true; + this.readableHasFlowingInner = true; + } else if (this.dataListenning) { + this.resume(); + } else if (!this.readableListenning) { + this.readableFlowingInner = false; + this.readableHasFlowingInner = false; + } + }) + } else if (event === ReadableEvent.DATA && this.listener?.listenerCount(ReadableEvent.DATA) === 0) { + this.dataListenning = false; + } } /** @@ -640,59 +1284,11 @@ class Readable { * @since 12 */ push(chunk: Uint8Array | string | null, encoding?: string): boolean { - let bufferArr: Uint8Array; - if (encoding) { - this.setEncoding(encoding); - } - if (typeof chunk === 'string' || chunk instanceof Uint8Array) { - if (typeof chunk === 'string') { - bufferArr = this.encoder.encodeInto(chunk); - this.buf.push(...bufferArr); - this.readableLengthInner += bufferArr.length; - } else if (chunk instanceof Uint8Array) { - this.buf.push(...chunk); - this.readableLengthInner += chunk.length; - } - const highWaterMark = this.readableLengthInner <= this.readableHighWatermarkInner; - Promise.resolve().then((): void => { - try { - if (this.readableFlowingInner) { - !this.pauseInner && this.read(highWaterMark ? this.readableLengthInner : -1); - } else { - if (highWaterMark) { - this.doRead?.(this.readableHighWatermarkInner); - } - } - } catch (error) { - this.listener?.emit(ReadableEvent.ERROR, error); - this.listener?.emit(ReadableEvent.CLOSE); - } - this.listener?.emit(ReadableEvent.READABLE); - }); - return this.readableLengthInner < this.readableHighWatermarkInner; - } else if (chunk === null) { - if (!this.readableEndedInner && this.readableInner) { - !this.readableFlowingInner && this.listener?.emit(ReadableEvent.READABLE); - this.readableInner = false; - Promise.resolve().then((): void => { - this.readableEndedInner = true; - this.pauseInner = true; - this.closedInner = true; - this.listener?.emit(ReadableEvent.END); - this.listener?.emit(ReadableEvent.CLOSE); - }); - } - return false; - } else { - this.readableInner = false; - this.erroredInner = new BusinessError('ERR_INVALID_ARG_TYPE'); - this.listener?.emit(ReadableEvent.ERROR, this.erroredInner); - return false; - } + return this.pushByteModeChunk(this, chunk, encoding) }; throwError(error: Error): void { - this.erroredInner = error; + this.erroredValueInner = error; if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { setTimeout((): void => { this.listener?.emit(WritableEvent.ERROR, error);