From 0dcc1704cc4ef99f3193affacf8465e760e45eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E7=AC=91=E4=BD=B3?= Date: Tue, 10 Jun 2025 15:06:14 +0800 Subject: [PATCH] Add rewrite stream Issue:https://gitee.com/openharmony/commonlibrary_ets_utils/issues/ICDZWT MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 王笑佳 --- js_util_module/stream/src/formatStream_js.ts | 2209 ++++++++++++++++++ 1 file changed, 2209 insertions(+) create mode 100644 js_util_module/stream/src/formatStream_js.ts diff --git a/js_util_module/stream/src/formatStream_js.ts b/js_util_module/stream/src/formatStream_js.ts new file mode 100644 index 00000000..cd44c38a --- /dev/null +++ b/js_util_module/stream/src/formatStream_js.ts @@ -0,0 +1,2209 @@ +/* + * Copyright (c) 2025 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +type AnyType = Object | null | undefined | unknown; +declare function requireNapi(napiModuleName: string): AnyType; +// @ts-ignore +const { TextEncoder, StringDecoder } = requireNapi('util'); +const Buffer:any = requireNapi('buffer') + +const DEFAULT_HIGH_WATER_MARK = 16 * 1024; +const DEFAULT_ENCODING = 'utf-8'; +const TypeErrorCodeId = 401; +class BusinessError extends Error { + code: number | string; + + constructor(msg: string, code?: number) { + super(msg); + this.name = 'BusinessError'; + this.code = code ? code : TypeErrorCodeId; + } +} +const ERR_DOWRITE_NOT_IMPLEMENTED:BusinessError = new BusinessError('The doWrite() method is not implemented', 10200035); +const ERR_DOWRITEV_NOT_IMPLEMENTED:BusinessError = new BusinessError('The doWritev() method is not implemented', 10200035); +const ERR_MULTIPLE_CALLBACK:BusinessError = new BusinessError('Callback called multiple times', 10200037); +const ERR_STREAM_ALREADY_FINISHED:BusinessError = new BusinessError('stream already finished', 10200036); +const ERR_WRITE_AFTER_END:BusinessError = new BusinessError('write after end', 10200036); +const ENCODING_SET: Array = ['ascii', 'utf-8', 'UTF-8', 'gbk', 'GBK', 'GB2312', 'gb2312', + 'GB18030', 'gb18030', 'ibm866', 'iso-8859-2', 'iso-8859-3', + 'iso-8859-4', 'iso-8859-5', 'iso-8859-6', 'iso-8859-7', + 'iso-8859-8', 'iso-8859-8-i', 'iso-8859-10', 'iso-8859-13', + 'iso-8859-14', 'iso-8859-15', 'koi8-r', 'koi8-u', 'macintosh', + 'windows-874', 'windows-1250', 'windows-1251', 'windows-1252', + 'windows-1253', 'windows-1254', 'windows-1255', 'windows-1256', + 'windows-1257', 'windows-1258', 'big5', 'euc-jp', 'iso-2022-jp', + 'shift_jis', 'euc-kr', 'x-mac-cyrillic', 'utf-16be', + 'utf-16le']; +class EventEmitter { + handlers: { [key: string]: Function[] }; + + constructor() { + this.handlers = {}; + } + + on(event: string, callback: Function): void { + if (!this.handlers[event]) { + this.handlers[event] = []; + } + this.handlers[event].push(callback); + } + + off(event: string, callback: Function): void { + if (this.handlers[event]) { + const idx = this.handlers[event].findIndex((value: Function): boolean => value === callback); + if (idx !== -1) { + this.handlers[event].splice(idx, 1); + } + } + } + + emit(event: string, ...args: AnyType[]): void { + if (this.handlers[event]) { + let handler = [...this.handlers[event]]; + let handlerLen = this.handlers[event].length; + for(let i = 0; i < handlerLen; i++) { + if (args.length > 0) { + handler[i]({ data: args[0] }); + } else { + handler[i](); + } + } + } + } + + isOn(event: string): boolean { + const handler:Function[] = this.handlers[event]; + return handler && handler.length > 0; + } + + listenerCount(event: string): number { + return this.handlers[event]?.length || 0; + } + + once(event: string, callback: Function): EventEmitter { + if (typeof callback !== 'function') { + throw new BusinessError(`Parameter error. The type of callback must be string or Function`, 401); + } + this.on(event, _onceWrap(this, event, callback)); + return this; + } +} + +function processErrOrClose(stream: Readable, err: Error, sync: boolean = false):void { + // If the readable stream is disabled, return directly + if (stream.closed) { + return; + } + + if (err) { + stream.errored = true; + stream.closed = true; + if (sync) { + Promise.resolve().then(()=>{ + emitError(stream, err); + emitClose(stream); + }) + } else { + emitError(stream, err); + emitClose(stream); + } + } +} + +function emitError(stream: Readable, err: Error) { + if(stream.errorEmitted) { + return; + } + stream.errorEmitted = true; + stream.listener?.emit(ReadableEvent.ERROR, err); +} + + +function emitClose(stream: Readable) { + if(stream.closedEmitted) { + return; + } + stream.closedEmitted = true; + stream.listener?.emit(ReadableEvent.CLOSE); +} +function onceWrapper(this: OnceState) { + if (!this.fired) { + this.target.off(this.event, this.wrapFn!); + this.fired = true; + + if (arguments.length === 0) { + return this.callback.call(this.target); + } + return this.callback.apply(this.target, arguments as any); + } +} + +interface OnceState { + fired: boolean; + wrapFn: Function | undefined; + target: EventEmitter; + event: string; + callback: Function; +} + +function _onceWrap(target: EventEmitter, event: string, callback: Function):Function{ + const state: OnceState = { + fired: false, + wrapFn: undefined, + target, + event, + callback + }; + + const wrapped = onceWrapper.bind(state) as Function; + (wrapped as any).callback = callback; + state.wrapFn = wrapped; + + return wrapped +} + +function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Error) => void): Function { + let executed = false; + return function (...args: Function[]) { + if (!executed) { + executed = true; + return runFn(...args); + } else { + if (callback) { + Promise.resolve().then(():void => { + // @ts-ignore + callback(); + }); + } + } + }; +} + +function asyncFn(asyncFn: Function) { + return function (...args: Function[]): void { + setTimeout(() => asyncFn(...args)); + }; +} + +function advanceListener(stream:Writable, event: WritableEvent, callback:Function) { + if (!stream.listener?.handlers || !stream.listener.handlers[event]) { + stream.on(event, callback); + } else if (Array.isArray(stream.listener.handlers[event])) { + const callbackFn = callback.bind(stream); + stream.listener.handlers[event].unshift(callbackFn); + } else { + const callbackFn = callback.bind(stream); + stream.listener.handlers[event] = [callbackFn, ...stream.listener.handlers[event]]; + } +} +enum ReadableEvent { + CLOSE = 'close', + DATA = 'data', + END = 'end', + ERROR = 'error', + READABLE = 'readable', + PAUSE = 'pause', + RESUME = 'resume', +} +enum WritableEvent { + CLOSE = 'close', + DRAIN = 'drain', + ERROR = 'error', + FINISH = 'finish', + PIPE = 'pipe', + UNPIPE = 'unpipe', +} + +interface ReadablePipeStream { + write: Writable; + dataCallback: Function; + drainCallback: Function; + endCallback: Function; +} +class Readable { + private buf: Array; + private bufIndex: number; + private awaitDrainWriters: Set; + public listener: EventEmitter | undefined; + private callbacks: { [key: string]: Function[] } = {}; + protected encoder = new TextEncoder(); + protected stringDecoderInner = new StringDecoder(); + private readableDecoder: boolean = false; + private isInitialized: boolean = false; + private pauseInner: boolean; + private readableHasPasued: boolean = false; + private pipeWritableArrayInner: Array = []; + private readableObjectModeInner: boolean = false; + private readableInner: boolean; + private readableHighWatermarkInner: number; + private readableFlowingInner: boolean | null = null; + private readableLengthInner: number; + private readableEncodingInner: string | null = null; + private readableEndedInner: boolean; + private erroredInner: boolean = false; + private erroredValueInner: Error | undefined; + private closedInner: boolean | undefined; + private dataListenning: boolean | undefined; + private dataEmmited: boolean = false; + private readableListenning: boolean | undefined; + private readableHasFlowingInner: boolean = false; + private readableEndEmitted: boolean = false; + private readableNeedReadable: boolean | undefined = false; + private readableEmittedReadable: boolean = false; + private isReading: boolean | undefined; + private readableEmittedResume: boolean = false; + private readableSync: boolean = true; + private isReadingMore: boolean = false; + private multiAwaitDrain: boolean = false; + public errorEmitted: boolean = false; + public closedEmitted: boolean = false; + /** + * The Readable constructor. + * + * @syscap SystemCapability.Utils.Lang + * @throws { BusinessError } 401 - Parameter error. Possible causes: + * 1.Incorrect parameter types. + * @crossplatform + * @since 12 + */ + constructor(options?: { encoding: string | null; highWatermark?: number; doRead?: (size: number) => void }) { + this.readableHighWatermarkInner = options?.highWatermark || DEFAULT_HIGH_WATER_MARK; + this.readableObjectModeInner = false; + this.readableLengthInner = 0; + this.pauseInner = false; + this.readableInner = true; + this.readableEndedInner = false; + this.dataListenning = false; + this.listener = new EventEmitter(); + this.buf = []; + this.bufIndex = 0; + this.awaitDrainWriters = new Set(); + if (arguments.length !== 0) { + if (typeof options?.doRead === 'function') { + this.doRead = options.doRead; + } + if (options?.encoding) { + this.readableEncodingInner = options.encoding; + if (this.readableEncodingInner.toLowerCase() === 'utf8') { + this.readableEncodingInner = DEFAULT_ENCODING; + } + if (ENCODING_SET.indexOf(this.readableEncodingInner.toLowerCase()) === -1) { + let error = new BusinessError('Parameter error. Incorrect parameter types.', 401); + throw error; + } + this.stringDecoder = new StringDecoder(this.readableEncodingInner) + } + } + } + + /** + * Returns boolean indicating whether it is in ObjectMode. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableObjectMode(): boolean | undefined { + return this.readableObjectModeInner; + } + + /** + * Is true if it is safe to call readable.read(), which means + * the stream has not been destroyed or emitted 'error' or 'end'. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readable(): boolean { + if (this.readableInner) { + return this.readableInner && !this.errorEmitted && !this.readableEndEmitted; + } else if (!this.readableInner && this.readableEndedInner) { + return false; + } + return true; + } + + /** + * Returns the value of highWatermark passed when creating this Readable. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableHighWatermark(): number { + return this.readableHighWatermarkInner; + } + + /** + * This property reflects the current state of the readable stream null/true/false. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableFlowing(): boolean | null { + return this.readableFlowingInner; + } + + /** + * Size of the data that can be read, in bytes or objects. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableLength(): number { + return this.readableLengthInner; + } + + /** + * Getter for the property encoding of a given Readable stream. The encoding property can be set using the + * readable.setEncoding() method. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableEncoding(): string | null { + return this.readableEncodingInner; + } + + /** + * Whether all data has been generated. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get readableEnded(): boolean { + return this.readableEndEmitted; + } + + /** + * Returns error if the stream has been destroyed with an error. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get errored(): boolean { + return this.erroredInner; + } + + set errored(value: boolean) { + this.erroredInner = value; + } + /** + * Readable completes destroyfile and returns true, otherwise returns false. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get closed(): boolean { + return this.closedInner || false; + } + set closed(value: boolean) { + this.closedInner = value; + } + get stringDecoder(): any { + return this.readableDecoder ? this.stringDecoderInner : null; + } + + set stringDecoder(value) { + if (value) { + this.stringDecoderInner = value; + this.readableDecoder = true; + } else { + this.readableDecoder = false; + } + } + + private computeNewReadableHighWatermark(readSize: number): number { + readSize--; + readSize |= readSize >>> 1; + readSize |= readSize >>> 2; + readSize |= readSize >>> 4; + readSize |= readSize >>> 8; + readSize |= readSize >>> 16; + readSize++; + return readSize; + } + + private flowInner(stream: Readable) { + while (this.readableFlowing && stream.read() !== null); + } + + private emitReadableNextCycle(stream: Readable) { + stream.read(0) + } + + private emitReadableNow(stream: Readable) { + stream.readableNeedReadable = false; + if (!stream.readableEmittedReadable) { + stream.readableEmittedReadable = true; + Promise.resolve().then(()=>{ + stream.emitReadableInner(stream); + }) + } + } + + private emitReadableEnd(stream: Readable) { + if (!stream.readableEndEmitted) { + stream.readableEndedInner = true; + Promise.resolve().then(()=>{ + this.emitReadableEndNextCycle(stream) + }) + } + } + + private emitReadableEndNextCycle(stream: Readable) { + if (!stream.erroredInner && !stream.closedInner + && !stream.readableEndEmitted && stream.readableLengthInner === 0) { + stream.readableEndEmitted = true; + stream.listener?.emit(ReadableEvent.END); + } + } + + private emitReadableInner(stream: Readable) { + if (!stream.erroredInner && (stream.readableLengthInner || stream.readableEndedInner)) { + stream.listener?.emit(ReadableEvent.READABLE); + stream.readableEmittedReadable = false; + } + if (!stream.readableFlowingInner && !stream.readableEndedInner + && stream.readableLengthInner <= stream.readableHighWatermarkInner) { + stream.readableNeedReadable = true; + } else { + stream.readableNeedReadable = false; + } + stream.flowInner(stream) + } + + private resumeInner(stream: Readable) { + if (!this.isReading) { + stream.read(0); + } + stream.readableEmittedResume = false; + stream.listener?.emit(ReadableEvent.RESUME); + this.flowInner(stream); + if (stream.readableFlowingInner && !stream.isReading) { + stream.read(0); + } + } + + private endOfFlow(stream: Readable) { + if (stream.readableEndedInner) { + return; + } + const stringDecoder = stream.readableDecoder ? stream.stringDecoderInner :null; + if (stringDecoder) { + const chunk = stringDecoder.end(); + if(chunk?.length) { + stream.buf.push(chunk); + stream.readableLengthInner += stream.readableObjectModeInner ? 1 : chunk.length; + } + } + stream.readableEndedInner = true; + + if (stream.readableSync) { + stream.emitReadableNow(stream); + } else { + stream.readableNeedReadable = false; + stream.readableEmittedReadable = true; + stream.emitReadableInner(stream); + } + } + + private canBeReadMore(stream: Readable) { + function canBeReadMoreInner() { + while ((!stream.isReading && !stream.readableEndedInner) && + (stream.readableLengthInner < stream.readableHighWatermarkInner + || (stream.readableFlowingInner && stream.readableLengthInner === 0))) { + const preLen = stream.readableLengthInner; + stream.read(0); + if (preLen === stream.readableLengthInner) { + break; + } + stream.isReadingMore = false; + } + } + + if (!stream.isReadingMore) { + stream.isReadingMore = true; + Promise.resolve().then(()=>{ + canBeReadMoreInner(); + }) + } + } + + private bufHasSpace(stream: Readable) { + return !stream.readableEndedInner && + (stream.readableLengthInner < stream.readableHighWatermarkInner || + stream.readableLengthInner === 0) + } + + pushChunkInner(stream: Readable, chunk: any) { + if (stream.readableFlowingInner && stream.dataListenning + && !stream.readableSync && stream.readableLengthInner === 0) { + this.awaitDrainWriters.clear(); + stream.dataEmmited = true; + stream.listener?.emit(ReadableEvent.DATA, chunk) + } else { + if (chunk?.length) { + stream.readableLengthInner += chunk.length; + stream.buf.push(chunk); + } + if (stream.readableNeedReadable) { + this.emitReadableNow(stream); + } + } + stream.canBeReadMore(stream); + } + + private pushByteModeChunk(stream: Readable, chunk: any, encoding?: string): boolean { + if (chunk === null) { + stream.isReading = false; + stream.endOfFlow(stream); + return false; + } + + if (typeof chunk === 'string') { + if (!encoding) { + encoding ||= DEFAULT_ENCODING; + } + if (stream.readableEncodingInner !== encoding) { + chunk = Buffer.from(chunk, encoding); + encoding = ''; + } + } else if (chunk instanceof Uint8Array) { + chunk = Buffer.from(chunk); + encoding = ''; + } else if (chunk !== undefined) { + stream.erroredValueInner = new BusinessError('ERR_INVALID_ARG_TYPE'); + processErrOrClose(stream, stream.erroredValueInner, false); + return false; + } + + if (!chunk || chunk.length < 0) { + stream.isReading = false; + stream.canBeReadMore(stream); + return stream.bufHasSpace(stream); + } + + if (stream.readableEndedInner) { + let error = new BusinessError('write after end'); + processErrOrClose(stream, error); + return false; + } + + if (stream.erroredInner) { + return false; + } + + stream.isReading = false; + if (stream.readableDecoder && !encoding) { + chunk = stream.stringDecoderInner.write(new Uint8Array(chunk.buffer)); + if (chunk?.length === 0) { + stream.canBeReadMore(stream); + return stream.bufHasSpace(stream); + } + } + stream.pushChunkInner(stream, chunk); + return stream.bufHasSpace(stream); + } + + private calculateReadSize(size: number): number { + if (size <= 0 || (this.readableLengthInner === 0 && this.readableEndedInner)) { + return 0; + } + + if (this.readableObjectModeInner) { + return 1; + } + + if (Number.isNaN(size)) { + if(this.readableFlowingInner && this.readableLengthInner) { + return this.buf[this.bufIndex].length; + } + return this.readableLengthInner; + } + + if (size <= this.readableLengthInner) { + return size; + } + return this.readableEndedInner ? this.readableLengthInner : 0; + } + + private readDataFromBuf(size: number): any{ + if (this.readableLengthInner === 0) { + return null; + } + + let num = this.bufIndex; + let res; + + const buf = this.buf; + const len = buf.length; + + if (this.readableObjectModeInner) { + res = buf[num]; + buf[num++] = null; + } else if (!size || size >= this.readableLengthInner) { + if (this.readableDecoder) { + res = ''; + while (num < len) { + res += buf[num]; + buf[num++] = null; + } + } else if (len - num === 0) { + res = Buffer.alloc(0); + } else if (len - num === 1) { + res = buf[num]; + buf[num++] = null; + } else { + res = Buffer.allocUninitializedFromPool(this.readableLengthInner); + let index = 0; + while (num < len) { + buf[num].copy(res, index, 0 ,buf[num].length); + index += buf[num].length; + buf[num++] = null; + } + } + } else if (size < buf[num].length) { + res = buf[num].subarray(0, size); + buf[num] = buf[num].subarray(size); + } else if (size === buf[num].length) { + res = buf[num]; + buf[num++] = null; + } else if (this.readableDecoder) { + res = ''; + while (num < len) { + const str = buf[num]; + if (size > str.length) { + res += str; + size -= str.length; + buf[num++] = null; + } else { + if (size === buf[num].length) { + res += str; + buf[num++] = null; + } else { + res += str.subarray(0, size); + buf[num] = str.subarray(size); + } + break; + } + } + } else { + res = Buffer.allocUninitializedFromPool(size); + + const nRetLen = size; + while (num < len) { + const data = buf[num]; + if (size > data.length) { + data.copy(res, nRetLen - size, 0, data.length); + size -= data.length; + buf[num++] = null; + } else { + if (size === data.length) { + data.copy(res, nRetLen - size, 0, data.length); + buf[num++] = null; + } else { + data.copy(res, nRetLen - size, 0, size); + let RemainBuf = Buffer.allocUninitializedFromPool(data.length - size); + data.copy(RemainBuf, nRetLen - size, 0, size); + buf[num++] = RemainBuf; + } + break; + } + } + } + + if (num === len) { + this.buf.length = 0; + this.bufIndex = 0; + } else if (num > 1024) { + this.buf.splice(0, num); + this.bufIndex = 0; + } else { + this.bufIndex = num; + } + + return res; + } + + setEndType(): void { + Promise.resolve().then((): void => { + this.readableInner = false; + this.readableEndedInner = true; + this.listener?.emit(ReadableEvent.END); + }); + } + + /** + * Reads a buffer of a specified size from the buffer. If the available buffer is sufficient, the result + * of the specified size is returned. Otherwise, if Readable has ended, all remaining buffers are returned. + * + * @param { number } size - Expected length of the data to be read. + * @returns { any } The return value is of the buffer and string types. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200038 - if the doRead method has not been implemented, an error will be thrown. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + read(size?: number): any{ + if (size === undefined) { + size = NaN; + } else if (size && typeof size !== 'number') { + size = parseInt(size, 10); + } + + const originSize = size; + + if (size > this.readableHighWatermarkInner) { + this.readableHighWatermarkInner = this.computeNewReadableHighWatermark(size); + } + + if (size !== 0) { + this.readableEmittedReadable = false; + } + + if (size === 0 && this.readableNeedReadable && + ((this.readableHighWatermarkInner ? + this.readableLengthInner >= this.readableHighWatermarkInner : + this.readableLengthInner > 0) || this.readableEndedInner)) { + if (this.readableLengthInner === 0 && this.readableEndedInner) { + this.emitReadableEnd(this); + } else { + this.emitReadableNow(this); + } + return null; + } + + size = this.calculateReadSize(size); + + if (size === 0 && this.readableEndedInner) { + if (this.readableLengthInner === 0) { + this.emitReadableEnd(this); + } + return null; + } + + let needDoRead = this.readableNeedReadable; + + if (this.readableLengthInner === 0 || this.readableLengthInner - size < this.readableHighWatermarkInner) { + needDoRead = true; + } + + if (this.isReading || this.readableEndedInner || this.erroredInner) { + needDoRead = false; + } else if (needDoRead) { + this.isReading = true; + this.readableSync = true; + if (this.readableLengthInner === 0) { + this.readableNeedReadable = true; + } + + try { + this.doRead(this.readableHighWatermarkInner); + } catch (error) { + processErrOrClose(this, error as Error, false) + } + this.readableSync = false; + + if (!this.isReading) { + size = this.calculateReadSize(originSize); + } + } + + let res; + if (size > 0) { + res = this.readDataFromBuf(size); + } else { + res = null; + } + + if (res === null) { + this.readableNeedReadable = this.readableLengthInner <= this.readableHighWatermarkInner ? true : false; + size = 0; + } else { + this.readableLengthInner -= size; + this.awaitDrainWriters.clear(); + } + + if (this.readableLengthInner === 0) { + if (!this.readableEndedInner) { + this.readableNeedReadable = true; + } + + if (originSize !== size && this.readableEndedInner) { + this.emitReadableEnd(this); + } + } + + if (res !== null && !this.erroredInner && !this.closedInner) { + this.dataEmmited = true; + this.listener?.emit(ReadableEvent.DATA, res); + } + + return res; + }; + + /** + * Switch Readable to Streaming Mode. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + resume(): Readable { + if (!this.readableFlowingInner) { + this.readableHasFlowingInner = true; + this.readableFlowingInner = this.readableListenning ? false : true; + if (!this.readableEmittedResume) { + this.readableEmittedResume = true; + Promise.resolve().then(()=>{ + this.resumeInner(this); + }) + } + } + this.readableHasPasued = true; + this.pauseInner = false; + return this; + } + + /** + * Toggle Readable to Suspend Mode. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + pause(): Readable { + // 1、 未初始化状态添加data事件【即this.readableHasFlowingInner为false】,自动触发resume + // 2、 显示暂停后【即this.readableHasFlowingInner && this.readableFlowingInner】添加data事件不会自动恢复流动,必须显示调用resume事件 + // 3、 即使流处于暂停状态,添加data事件 + if (!this.readableHasFlowingInner || (this.readableHasFlowingInner && this.readableFlowingInner)) { + this.readableHasFlowingInner = true; + this.readableFlowingInner = false; + this.listener?.emit(ReadableEvent.PAUSE); + } + this.pauseInner = true; + this.readableHasPasued = true; + return this; + } + + /** + * Sets the encoding format of the input binary data. Default: utf8. + * + * @param { string } [encoding] - Original Data Encoding Type. + * @returns { Readable } Returns the Readable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + setEncoding(encoding?: string): Readable { + if (!encoding) { + encoding = DEFAULT_ENCODING; + } + if (encoding.toLowerCase() === 'utf8') { + encoding = 'utf-8'; + } + let encodingLowCase = encoding.toLowerCase(); + if (ENCODING_SET.indexOf(encodingLowCase) !== -1) { + try { + this.stringDecoder = new StringDecoder(encoding); + this.readableEncodingInner = encodingLowCase; + // Iterate over current buffer to convert already stored Buffers: + let currentBuffer = ""; + for (let i = 0; i < this.buf.length; i++) { + if (typeof this.buf[i] !== 'string' && !(this.buf[i] instanceof Uint8Array)) { + this.buf[i] = new Uint8Array(this.buf[i].buffer); + } + currentBuffer += this.stringDecoder.write(this.buf[i]); + } + this.buf.length = 0; + this.bufIndex = 0; + + if (currentBuffer != "") { + this.buf.push(currentBuffer); + } + this.readableLengthInner = currentBuffer.length; + } catch (e) { + this.throwError(e as Error); + } + } else { + const err: BusinessError = new BusinessError(`Parameter error. The type of ${encoding} must be string.`); + this.throwError(err); + } + return this; + } + + /** + * Query whether it is in pause state. + * + * @returns { boolean } Pause state returns true, otherwise returns false. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + isPaused(): boolean { + return this.pauseInner || (this.readableHasFlowingInner && !this.readableFlowingInner); + } + + /** + * Concatenates a Writable to a Readable and switches the Readable to stream mode. + * + * @param { Writable } destination - Output writable stream. + * @param { Object } [option] - Pipeline Options. + * @returns { Writable } Returns the Writable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + pipe(destination: Writable, option?: Object): Writable { + const src: Readable = this; + + if (src.pipeWritableArrayInner.length === 1 && !src.multiAwaitDrain) { + src.multiAwaitDrain = true; + } + + src.pipeWritableArrayInner.push(destination); + + // Writeable streams are automatically turned off when readable streams are turned off + const endCallback = onEnd; + if (src.readableEndEmitted) { + Promise.resolve().then(()=>{ + endCallback(); + }) + } else { + src.listener?.once(ReadableEvent.END, endCallback); + } + + destination.on(WritableEvent.UNPIPE, unpipeCallback) + function unpipeCallback(data: any) { + if (data.data.readable === src) { + if (data.data.info && data.data.info.hasUnpiped === false) { + data.data.info.hasUnpiped = true; + cleanup(); + } + } + } + + function onEnd() { + destination.end(); + } + + let ondrainCallback: undefined | Function; + let cleand:boolean = false; + + function cleanup() { + destination.listener?.off(WritableEvent.CLOSE, closeCallBack); + destination.listener?.off(WritableEvent.FINISH, finishCallback); + destination.listener?.off(WritableEvent.ERROR, errorCallBack); + if (ondrainCallback) { + destination.listener?.off(WritableEvent.DRAIN, ondrainCallback); + } + destination.listener?.off(WritableEvent.UNPIPE, unpipeCallback); + src.listener?.off(ReadableEvent.END, endCallback); + src.listener?.off(ReadableEvent.DATA, dataCallback); + cleand = true; + if(ondrainCallback && src.awaitDrainWriters.size > 0 && destination.writableNeedDrain) { + ondrainCallback(); + } + } + + + function pause() { + if (!cleand) { + if (src.pipeWritableArrayInner.length === 1 && src.pipeWritableArrayInner[0] === destination) { + src.awaitDrainWriters.add(destination); + src.multiAwaitDrain = false; + } else if (src.pipeWritableArrayInner.length > 1) { + const objIdx: number = src.pipeWritableArrayInner.findIndex((value: Writable) => value === destination); + if(objIdx !== -1) { + src.awaitDrainWriters.add(destination); + } + } + src.pause(); + } + if (!ondrainCallback) { + ondrainCallback = pipeOnDrain(src, destination); + destination.on(WritableEvent.DRAIN, ondrainCallback) + } + } + + function pipeOnDrain(src:Readable, dest:Writable) { + return function pipeOnDrainInner() { + src.awaitDrainWriters.delete(dest); + if (src.awaitDrainWriters.size == 0 && src.dataListenning) { + src.resume(); + } + } + } + + function dataCallback(data: { data: any }) { + const writeData = new Uint8Array(data.data.buffer); + const res = destination.write(writeData); + if (res === false) { + pause(); + } + } + + src.on(ReadableEvent.DATA, dataCallback) + + function errorCallBack(){ + unpipe(); + destination.listener?.off(WritableEvent.ERROR, errorCallBack); + } + + advanceListener(destination, WritableEvent.ERROR, errorCallBack); + function closeCallBack() { + destination.listener?.off(WritableEvent.FINISH, finishCallback); + unpipe(); + } + destination.listener?.once(WritableEvent.CLOSE, closeCallBack); + function unpipe() { + src.unpipe(destination); + } + + function finishCallback() { + destination.listener?.off(WritableEvent.CLOSE, closeCallBack); + unpipe(); + } + destination.listener?.once(WritableEvent.FINISH, finishCallback); + destination.listener?.emit(WritableEvent.PIPE, src); + + if (destination.writableNeedDrain) { + pause(); + } else if (!src.readableFlowingInner) { + src.resume(); + } + return destination; + } + + /** + * Disconnect Writable from Readable. + * + * @param { Writable } [destination] - Writable Streams Needing to Be Disconnected. + * @returns { Readable } Returns the Readable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + unpipe(destination?: Writable): Readable { + const info = {hasUnpiped: false}; + + if (this.pipeWritableArrayInner.length === 0) { + return this; + } + + if (!destination) { + // remove all pipelines + const destinations = this.pipeWritableArrayInner; + this.pipeWritableArrayInner = []; + this.pause(); + + for (let i = 0; i < destinations.length; i++) { + destinations[i].listener?.emit(WritableEvent.UNPIPE, this, { hasUnpiped: false }); + } + return this; + } + + const objIdx: number = this.pipeWritableArrayInner.findIndex((value: Writable) => value === destination); + + if(objIdx === -1) + return this; + + this.pipeWritableArrayInner.splice(objIdx, 1); + if (this.pipeWritableArrayInner.length === 0) { + this.pause(); + } + destination.listener?.emit(WritableEvent.UNPIPE, {readable:this, info}); + + return this; + } + + /** + * Registering Event Messages. + * + * @param { string } event - Registering Events. + * @param { Callback } callback - Event callback. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + on(event: string, callback: Function): void { + if (!this.isInitialized) { + this.isInitialized = true; + this.doInitialize?.(() => { + }); + } + this.callbacks[event] = this.callbacks[event] ?? []; + const callbackFn = callback; + this.callbacks[event].push(callbackFn); + this.listener?.on(event, callbackFn); + if (event === ReadableEvent.DATA) { + this.dataListenning = true; + if(this.callbacks[ReadableEvent.READABLE] && + this.callbacks[ReadableEvent.READABLE].length > 0) { + this.readableListenning = true; + } else { + this.readableListenning = false; + } + // 1、 未初始化状态添加data事件【即this.readableHasFlowingInner为false】,自动触发resume + // 2、 显示暂停后【即this.readableHasFlowingInner && this.readableFlowingInner】添加data事件不会自动恢复流动,必须显示调用resume事件 + // 3、 即使流处于暂停状态,添加data事件 + if (!this.readableHasFlowingInner || (this.readableHasFlowingInner && this.readableFlowingInner)) { + this.resume(); + } + } else if (event === ReadableEvent.READABLE) { + if (!this.readableEndEmitted || !this.readableListenning) { + this.readableHasFlowingInner = true; + this.readableListenning = true; + this.readableNeedReadable = true; + this.readableFlowingInner = false; + this.readableEmittedReadable = false; + if (this.readableLengthInner) { + this.emitReadableNow(this); + } else if (this.isReading) { + Promise.resolve().then(()=>{ + this.emitReadableNextCycle(this) + }) + } + } + } + } + + /** + * Cancel event message. + * + * @param { string } event - Registering Events. + * @param { Callback } callback - Event callback. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + off(event: string, callback?: Function): void { + if (!event) { + this.throwError(new BusinessError(`Parameter error. The value of ${event} is null`, 401)); + return; + } + if (event && typeof event !== 'string') { + this.throwError(new BusinessError(`Parameter error. The type of ${event} must be string`, 401)); + return; + } + if (callback) { + this.callbacks[event]?.forEach((it: Function): void => { + if (callback === it) { + this.listener?.off(event, it); + } + }); + } else { + this.callbacks[event]?.forEach((it : Function) => this.listener?.off(event, it)); + } + } + + /** + * Called by the Readable during initialization. It should not be called actively. Call callback () after the + * resource has been initialized within the doInitialize, or call callback (err) when an error occurs. + * + * @param { Function } callback - Callback when the stream has completed the initial. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doInitialize(callback: Function): void { + } + + /** + * The specific implementation of data production should not be actively called. Readable.read controls the + * calling. After data production, Readable.push should be called to push the produced data into the buffer. + * If push is not called, doRead will not be called again. + * + * @param { number } size -Expected length of the data to be read. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doRead(size: number): void { + }; + + /** + * Adds the generated data to the buffer. The return value indicates whether the data in the buffer has not + * reached the highWaterMark (similar to Writable.write). If the chunk is null, all data has been generated. + * + * @param { Uint8Array | string | null } chunk - Binary data to be stored in the buffer. + * @param { string } [encoding] - Binary data encoding type. + * @returns { boolean } If true is returned, the data in the buffer reaches the highWaterMark. Otherwise, the + * data in the buffer does not reach the highWaterMark. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + push(chunk: Uint8Array | string | null, encoding?: string): boolean { + return this.pushByteModeChunk(this, chunk, encoding) + }; + + throwError(error: Error): void { + this.erroredValueInner = error; + if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { + setTimeout((): void => { + this.listener?.emit(WritableEvent.ERROR, error); + }); + } else { + throw error; + } + } +} + +// @ts-ignore +Readable.prototype.doRead = null; + +class Writable { + public listener: EventEmitter | undefined; + private callbacks: { [key: string]: Function[] } = {}; + private buffer: ({ encoding?: string, chunk: string | Uint8Array, callback: Function })[] = []; + private writing: boolean = false; + private encoding: string | undefined; + protected encoder = new TextEncoder(); + private ending: boolean = false; + private writableObjectModeInner: boolean | undefined; + private writableHighWatermarkInner: number; + private writableInner: boolean | undefined; + private writableLengthInner: number | undefined; + private writableNeedDrainInner: boolean | undefined; + private writableCorkedInner: number = 0; + private writableEndedInner: boolean | undefined; + private writableFinishedInner: boolean | undefined; + private erroredInner: Error | undefined | null; + private closedInner: boolean | undefined; + + /** + * The Writable constructor. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + constructor(options?: { + highWaterMark?: number | undefined; + objectMode?: boolean | undefined; + }) { + this.listener = new EventEmitter(); + if (!options) { + options = { + highWaterMark: DEFAULT_HIGH_WATER_MARK, + objectMode: false, + }; + } + this.writableHighWatermarkInner = options.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; + this.writableObjectModeInner = options.objectMode || false; + this.writableLengthInner = 0; + this.writableEndedInner = false; + this.writableNeedDrainInner = false; + this.writableInner = true; + this.writableCorkedInner = 0; + this.writableFinishedInner = false; + this.erroredInner = null; + this.encoding = 'utf8'; + this.closedInner = false; + this.doInitialize((error: Error): void => { + if (error) { + this.listener?.emit(WritableEvent.ERROR, error); + } + }); + } + + /** + * Returns boolean indicating whether it is in ObjectMode. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableObjectMode(): boolean | undefined { + return this.writableObjectModeInner; + } + + /** + * Value of highWaterMark. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableHighWatermark(): number | undefined { + return this.writableHighWatermarkInner; + } + + /** + * Is true if it is safe to call writable.write(), which means the stream has not been destroyed, errored, or ended. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writable(): boolean | undefined { + return this.writableInner; + } + + /** + * Size of data this can be flushed, in bytes or objects. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableLength(): number | undefined { + return this.writableLengthInner; + } + + /** + * If the buffer of the stream is full and true, otherwise it is false. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableNeedDrain(): boolean | undefined { + return this.writableNeedDrainInner; + } + + /** + * Number of times writable.uncork() needs to be called in order to fully uncork the stream. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableCorked(): number | undefined { + return this.writableCorkedInner; + }; + + /** + * Whether Writable.end has been called. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableEnded(): boolean | undefined { + return this.writableEndedInner; + } + + /** + * Whether Writable.end has been called and all buffers have been flushed. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get writableFinished(): boolean | undefined { + return this.writableFinishedInner; + } + + /** + * Returns error if the stream has been destroyed with an error. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get errored(): Error | undefined | null { + return this.erroredInner; + } + + /** + * Writable completes destroyfile and returns true, otherwise returns false. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + get closed(): boolean | undefined { + return this.closedInner; + } + + /** + * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates + * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer + * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function + * should be called after the drain event is triggered. If the write function is called continuously, + * the chunk is still added to the buffer until the memory overflows + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { boolean } Write success returns true, write failure returns false. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200035 - if doWrite not implemented, an exception will be thrown. + * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. + * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { + if (encoding) { + this.setDefaultEncoding(encoding); + } + if (chunk === null) { + throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); + } + if (typeof chunk !== 'string' && !(chunk instanceof Uint8Array)) { + throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); + } + if (this.ending && !this.writing) { + setTimeout((): void => { + this.erroredInner = new BusinessError('write after end', 10200036); + callback?.(this.erroredInner); + this.throwError(this.erroredInner); + }); + return false; + } + if (this.erroredInner) { + return false; + } + let flag = false; + if (chunk instanceof Uint8Array) { + flag = this.writeUint8Array(chunk, encoding ?? this.encoding, callback); + } else { + flag = this.writeString(chunk!, encoding ?? this.encoding, callback); + } + if (!flag) { + this.writableNeedDrainInner = true; + } + return flag; + } + + private getChunkLength(chunk: string | Uint8Array): number { + if (chunk instanceof Uint8Array) { + return chunk.byteLength; + } else { + return this.encoder.encodeInto(chunk).byteLength; + } + } + + private writeUint8Array(chunk: Uint8Array, encoding?: string, callback?: Function): boolean { + this.writableLengthInner! += this.getChunkLength(chunk); + const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; + const fnBack = runOnce((error?: Error): void => { + if (error && error instanceof Error) { + this.writableInner = false; + this.throwError(error); + return; + } + callback?.(error ?? null); + this.freshCache(); + }, (multipleTimes: boolean, err: Error): void => { + this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + }); + if (this.writableCorkedInner === 0) { + if (!this.writing) { + this.writing = true; + this.doWrite(chunk, encoding ?? 'utf8', fnBack); + } else { + this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + } + } else { + this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + } + return hasRemaining; + } + + private writeString(chunk: string, encoding?: string, callback?: Function): boolean { + this.writableLengthInner! += this.getChunkLength(chunk); + const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; + const fb = runOnce((error?: Error): void => { + if (error) { + this.erroredInner = error; + } + callback?.(error ?? null); + this.freshCache(); + if (error && error instanceof Error) { + this.writableInner = false; + this.erroredInner = error; + Promise.resolve().then((): void => { + if (this.isOnError()) { + this.emitErrorOnce(error); + } else { + this.emitErrorOnce(error); + throw error; + } + }); + return; + } + }, () => { + this.emitErrorOnce(ERR_MULTIPLE_CALLBACK, true); + }); + + if (this.writableCorkedInner === 0) { + if (!this.writing) { + this.writing = true; + this.doWrite?.(chunk, encoding ?? 'utf8', fb); + if (!this.doWrite && !hasRemaining) { + Promise.resolve().then(() => { + this.writableLengthInner = 0; + this.listener?.emit(WritableEvent.DRAIN); + }); + } + } else { + this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + } + } else { + this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + } + return this.erroredInner ? false : hasRemaining; + } + + private freshCache(): void { + const current = this.buffer.shift(); + if (current) { + this.doWrite?.(current.chunk, current.encoding ?? 'utf8', current.callback); + this.writableLengthInner! -= this.getChunkLength(current.chunk); + } else { + this.writing = false; + this.writableLengthInner = 0; + if (!this.finishMayBe()) { + this.writableNeedDrainInner = false; + this.listener?.emit(WritableEvent.DRAIN); + } + } + } + + private freshCacheV(): void { + if (this.buffer.length > 0) { + if (this.doWritev) { + const funCallback = runOnce((error?: Error): void => { + if (error && error instanceof Error) { + this.erroredInner = error; + this.listener?.emit(WritableEvent.ERROR, error); + return; + } + this.buffer = []; + }, () => { + this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + }); + // @ts-ignore + this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { + return item.chunk; + }), funCallback); + if (!this.finishMayBe()) { + this.writableNeedDrainInner = true; + this.listener?.emit(WritableEvent.DRAIN); + } + } else { + this.freshCache(); + } + } + } + + endInner(chunk?: string | Uint8Array, encoding?: string, callback?: Function): void { + if (chunk) { + if (this.writing) { + this.write(chunk, encoding, callback); + } else { + this.doWrite?.(chunk!, encoding ?? 'utf8', (error?: Error): void => { + if (error && error instanceof Error) { + this.erroredInner = error; + this.listener?.emit(WritableEvent.ERROR, error); + } else { + this.writableLengthInner! -= this.getChunkLength(chunk); + this.writing = false; + this.finishMayBe(); + } + this.writableEndedInner = true; + this.writableInner = false; + asyncFn((): void => { + callback?.(this.erroredInner ?? error ?? null); + })(); + if (!this.writableFinishedInner) { + this.writableFinishedInner = true; + asyncFn((): void => { + if ((!this.erroredInner || this.erroredInner.message === 'write after end') && !this.isOnError()) { + this.listener?.emit(WritableEvent.FINISH); + } + })(); + } + }); + } + } else { + if (this.writableEndedInner) { + this.erroredInner = new BusinessError('write after end', 10200036); + callback?.(this.erroredInner); + } else { + setTimeout(() => callback?.(this.erroredInner)); + } + if (!this.writableFinishedInner && !this.writableEndedInner) { + this.writableFinishedInner = true; + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.listener?.emit(WritableEvent.FINISH); + } + })(); + } + } + } + + /** + * Write the last chunk to Writable. + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { Writable } Returns the Writable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200035 - if doWrite not implemented, an exception will be thrown. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { + if (this.writableFinishedInner) { + this.erroredInner = ERR_STREAM_ALREADY_FINISHED; + setTimeout(() => callback?.(this.erroredInner)); + this.emitErrorOnce(this.erroredInner); + return this; + } else if (this.writableEndedInner) { + this.erroredInner = ERR_WRITE_AFTER_END; + setTimeout(() => callback?.(this.erroredInner)); + this.emitErrorOnce(this.erroredInner); + return this; + } + if (this.erroredInner) { + setTimeout(() => callback?.(this.erroredInner)); + return this; + } + this.writableNeedDrainInner = false; + this.closedInner = true; + this.ending = true; + this.writableInner = false; + if (!this.writableEndedInner) { + if (this.writableCorkedInner === 0) { + this.endInner(chunk, encoding, callback); + } else { + this.writableCorkedInner = 1; + this.uncork(); + } + } + this.writableEndedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + return this; + } + + private finishMayBe(): boolean { + return !this.writing && this.writableCorkedInner === 0 && this.ending; + } + + /** + * Set the default encoding mode. + * + * @param { string } [encoding] - Encoding type.Default: utf8. + * @returns { boolean } Setting successful returns true, setting failed returns false. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + + setDefaultEncoding(encoding?: string): boolean { + if (!encoding) { + return false; + } + if (encoding.toLowerCase() === 'utf8') { + encoding = 'utf-8'; + } + if (ENCODING_SET.indexOf(encoding.toLowerCase()) !== -1) { + this.encoding = encoding.toLowerCase(); + try { + if (encoding.toLowerCase() !== 'ascii') { + this.encoder = new TextEncoder(encoding); + } + } catch (e) { + this.throwError(e as Error); + return false; + } + return true; + } else { + const err: BusinessError = new BusinessError(`Unknown encoding: ${encoding}`); + this.throwError(err); + return false; + } + } + + /** + * After the call, all Write operations will be forced to write to the buffer instead of being flushed. + * + * @returns { boolean } Setting successful returns true, setting failed returns false. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + cork(): boolean { + this.writableCorkedInner += 1; + return true; + } + + /** + * After calling, flush all buffers. + * + * @returns { boolean } Setting successful returns true, setting failed returns false. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + uncork(): boolean { + if (this.writableCorkedInner > 0) { + this.writableCorkedInner -= 1; + } + if (this.writableCorkedInner === 0) { + this.freshCacheV(); + } + return true; + } + + /** + * Registering Event Messages. + * + * @param { string } event - Register Event. + * @param { Callback } callback - event callbacks. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + on(event: string, callback: Function): void { + this.callbacks[event] = this.callbacks[event] ?? []; + const callbackFn = callback.bind(this); + this.callbacks[event].push(callbackFn); + this.listener?.on(event, callbackFn); + } + + /** + * Cancel event message. + * + * @param { string } event - Register Event. + * @param { Callback } callback - event callbacks. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + off(event: string, callback?: Function): void { + if (!event) { + this.throwError(new BusinessError(`Parameter error. The value of event is null `, 401)); + return; + } + if (callback) { + this.callbacks[event]?.forEach((it: Function): void => { + if (callback && callback === it) { + this.listener?.off(event, it); + } + }); + } else { + this.callbacks[event]?.forEach((it: Function) => this.listener?.off(event, it)); + } + } + + noWriteOpes(chunk: string | Uint8Array, encoding: string, callback: Function): void { + if (this.doWritev === null) { + this.throwError(ERR_DOWRITE_NOT_IMPLEMENTED); + } else { + // @ts-ignore + this.doWritev([chunk], callback); + } + } + + /** + * This method is invoked by the Writable method during initialization and should not be invoked actively. + * After the resource is initialized in the doInitialize method, the callback () method is invoked. + * + * @param { Function } callback - Callback when the stream has completed the initial. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doInitialize(callback: Function): void { + } + + /** + * Implemented by subclass inheritance. The implementation logic of flushing chunks in the buffer should not be + * actively called. The call is controlled by Writable.write. + * + * @param { string | Uint8Array } chunk - Data to be written. + * @param { string } encoding - Encoding type. + * @param { Function } callback - Callback after writing. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doWrite(chunk: string | Uint8Array, encoding: string, callback: Function): void { + throw ERR_DOWRITE_NOT_IMPLEMENTED; + } + + /** + * The implementation logic of flushing chunks in the buffer in batches should not be actively called. + * The call is controlled by Writable.write. + * + * @param { string[] | Uint8Array[] } chunk - Data to be written. + * @param { Function } callback - Callback after writing. + * @returns { Writable } Returns the Writable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doWritev(chunk: string[] | Uint8Array[], callback: Function): void { + throw ERR_DOWRITEV_NOT_IMPLEMENTED; + } + + throwError(error: Error): void { + this.erroredInner = error; + if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { + setTimeout(() => { + this.listener?.emit(WritableEvent.ERROR, error); + }); + } else { + throw error; + } + } + + private isOnError(): boolean { + return this.listener?.isOn(WritableEvent.ERROR) || false; + } + + private emitErrorExecutedInner = false; + // @ts-ignore + private emitErrorIdInner: number; + + private emitErrorOnce(error: Error, reset?: boolean): void { + if (reset) { + this.emitErrorExecutedInner = false; + clearTimeout(this.emitErrorIdInner); + } + if (!this.emitErrorExecutedInner) { + this.emitErrorExecutedInner = true; + // @ts-ignore + this.emitErrorIdInner = setTimeout((): void => { + this.listener?.emit(WritableEvent.ERROR, this.erroredInner ?? error); + }); + } + } +} + +// @ts-ignore +Writable.prototype.doWritev = null; +Writable.prototype.doWrite = Writable.prototype.noWriteOpes; + +class Duplex extends Readable { + private _writable: Writable; + + constructor() { + super(); + this._writable = new Writable(); + const that = this; + if (this.doWrite) { + this._writable.doWrite = this.doWrite?.bind(that); + } + this._writable.doWritev = this.doWritev?.bind(that); + Object.defineProperties(that, { + doWrite: { + get(): Function { + return that._writable.doWrite?.bind(that); + }, + set(value: Function):void { + that._writable.doWrite = value.bind(that); + } + }, + doWritev: { + get(): Function { + return that._writable.doWritev?.bind(that); + }, + set(value: Function):void { + // @ts-ignore + that._writable.doWritev = value?.bind(that); + } + } + }); + } + + /** + * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates + * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer + * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function + * should be called after the drain event is triggered. If the write function is called continuously, + * the chunk is still added to the buffer until the memory overflows + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { boolean } Write success returns true, write failure returns false. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. + * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. + * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { + return this._writable.write(chunk, encoding, callback); + } + + /** + * Write the last chunk to Writable. + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { Writable } Returns the Writable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { + super.setEndType(); + return this._writable.end(chunk, encoding, callback); + } + + on(event: string, callback: Function): void { + super.on(event, callback); + this._writable.on(event, callback); + } + + off(event: string, callback?: Function): void { + super.off(event); + this._writable.off(event, callback); + } + + getListener(): EventEmitter | undefined { + return this._writable.listener; + } + + setDefaultEncoding(encoding?: string): boolean { + return this._writable.setDefaultEncoding(encoding); + } + + cork(): boolean { + return this._writable.cork(); + } + + uncork(): boolean { + return this._writable.uncork(); + } + + doInitialize(callback: Function): void { + super.doInitialize(callback); + this._writable.doInitialize(callback); + } + + doWrite(chunk: string | Uint8Array, encoding: string, callback: Function): void { + } + + doWritev(chunk: string[] | Uint8Array[], callback: Function): void { + this._writable.doWritev?.(chunk, callback); + } + + get writableObjectMode(): boolean { + return this._writable.writableObjectMode || false; + } + + get writableHighWatermark(): number { + return this._writable.writableHighWatermark || 0; + } + + get writable(): boolean { + return this._writable.writable || false; + } + + get writableLength(): number { + return this._writable.writableLength || 0; + } + + get writableNeedDrain(): boolean { + return this._writable.writableNeedDrain || false; + } + + get writableCorked(): number { + return this._writable.writableCorked || 0; + } + + get writableEnded(): boolean { + return this._writable.writableEnded || false; + } + + get writableFinished(): boolean { + return this._writable.writableFinished || false; + } +} + +// @ts-ignore +Duplex.prototype.doWrite = null; +// @ts-ignore +Duplex.prototype.doWritev = null; + +class Transform extends Duplex { + /** + * The Transform constructor. + * + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + constructor() { + super(); + } + + on(event: string, callback: Function): void { + super.on(event, callback); + } + + /** + * Write the last chunk to Writable. + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { Writable } Returns the Writable object. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { + if (!this.doTransform) { + throw new BusinessError('The doTransform() method is not implemented', 10200039); + } + if (chunk instanceof Uint8Array) { + const chunkString = this.stringDecoder.write(chunk); + this.doTransform(chunkString, encoding || 'utf8', callback || ((): void => { + })); + } else if (typeof chunk === 'string') { + this.doTransform(chunk, encoding || 'utf8', callback || ((): void => { + })); + } + this.doFlush?.((...args: (string | Uint8Array)[]) => { + args.forEach((it: string | Uint8Array) => { + if (it) { + this.push(it ?? '', encoding); + } + }); + }); + const write:Writable = super.end(chunk, encoding, callback); + return write; + } + + push(chunk: Uint8Array | string | null, encoding?: string): boolean { + return super.push(chunk, encoding); + } + + /** + * Convert the input data. After the conversion, Transform.push can be called to send the input to the read stream. + * Transform.push should not be called Transform.write to call. + * + * @param { string } chunk - Input data to be converted. + * @param { string } encoding - If the chunk is a string, then this is the encoding type. If chunk is a buffer, + * then this is the special value 'buffer'. Ignore it in that case. + * @param { Function } callback - Callback after conversion. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doTransform(chunk: string, encoding: string, callback: Function): void { + throw new BusinessError('The doTransform() method is not implemented'); + } + + /** + * After all data is flushed to the write stream, you can use the Transform.doFlush writes some extra data, should + * not be called actively, only called by Writable after flushing all data. + * + * @param { Function } callback - Callback after flush completion. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + doFlush(callback: Function): void { + } + + /** + * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates + * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer + * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function + * should be called after the drain event is triggered. If the write function is called continuously, + * the chunk is still added to the buffer until the memory overflows + * + * @param { string | Uint8Array } [chunk] - Data to be written. + * @param { string } [encoding] - Encoding type. + * @param { Function } [callback] - Callback after writing. + * @returns { boolean } Write success returns true, write failure returns false. + * @throws { BusinessError } 401 - if the input parameters are invalid. + * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. + * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. + * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. + * @syscap SystemCapability.Utils.Lang + * @crossplatform + * @since 12 + */ + write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { + if (typeof chunk === 'string') { + const callBackFunction = runOnce((error: Error) => { + if (error) { + this.getListener()?.emit(WritableEvent.ERROR, error); + } + }, () => { + const err:BusinessError = new BusinessError('Callback called multiple times', 10200037); + this.getListener()?.emit(WritableEvent.ERROR, err); + }); + this.doTransform?.(chunk ?? '', encoding ?? 'utf8', callBackFunction); + } + return super.write(chunk, encoding, callback); + } + + doRead(size: number): void { + } + + doWrite(chunk: string | Uint8Array, encoding: string, callback: Function):void { + super.doWrite?.(chunk, encoding, callback); + } +} + +// @ts-ignore +Transform.prototype.doTransform = null; +// @ts-ignore +Transform.prototype.doFlush = null; + +export default { + Readable: Readable, + Writable: Writable, + Duplex: Duplex, + Transform: Transform, +}; -- Gitee