diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index bce236cb4c061a5d8451384b8962fbfd438850e8..a8bba48c7f626c111f91842a71512f9ca56895a9 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -96,10 +96,8 @@ function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Err return runFn(...args); } else { if (callback) { - Promise.resolve().then(():void => { - // @ts-ignore - callback(); - }); + // @ts-ignore + callback(); } } }; @@ -718,12 +716,20 @@ class Writable { private writableHighWatermarkInner: number; private writableInner: boolean | undefined; private writableLengthInner: number | undefined; + private writableBufferLength: number | undefined;// 记录buffer[]中存的chunk长度 private writableNeedDrainInner: boolean | undefined; private writableCorkedInner: number = 0; private writableEndedInner: boolean | undefined; private writableFinishedInner: boolean | undefined; private erroredInner: Error | undefined | null; private closedInner: boolean | undefined; + private writableCb: number = 0;// 记录doWrite、doWritev中还未执行的callback回调函数的数量,用于finish事件检查 + private writableSync: boolean = true;// 同步回调标志,true表示doWrite、doWritev中的回调采用了同步回调模式 + private defaultEncoding: string | undefined; + private endCallback: Function | undefined; + private writeCallbackBuffer: Function[] = []; + private writeCallbacks: Function[] = []; + private writaCallbackMove: boolean = false; /** * The Writable constructor. @@ -746,6 +752,7 @@ class Writable { this.writableHighWatermarkInner = options.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; this.writableObjectModeInner = options.objectMode || false; this.writableLengthInner = 0; + this.writableBufferLength = 0; this.writableEndedInner = false; this.writableNeedDrainInner = false; this.writableInner = true; @@ -754,6 +761,9 @@ class Writable { this.erroredInner = null; this.encoding = 'utf8'; this.closedInner = false; + this.writableCb = 0; + this.writableSync = true; + this.defaultEncoding = 'utf8'; this.doInitialize((error: Error): void => { if (error) { this.listener?.emit(WritableEvent.ERROR, error); @@ -892,7 +902,8 @@ class Writable { */ write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { if (encoding) { - this.setDefaultEncoding(encoding); + // 设置临时encoder + this.setEncoding(encoding); } if (chunk === null) { throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); @@ -900,15 +911,28 @@ class Writable { if (typeof chunk !== 'string' && !(chunk instanceof Uint8Array)) { throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); } - if (this.ending && !this.writing) { - setTimeout((): void => { - this.erroredInner = new BusinessError('write after end', 10200036); + if (this.ending && !this.erroredInner) { + this.erroredInner = new BusinessError('write after end', 10200036); + this.writableInner = false; + this.errorFresh(); + // 统一改用微任务Promise + Promise.resolve().then((): void => { callback?.(this.erroredInner); - this.throwError(this.erroredInner); + if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { + this.listener?.emit(WritableEvent.ERROR, this.erroredInner); + if (!this.closedInner) { + this.closedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + } + } else { + throw this.erroredInner; + } }); return false; } if (this.erroredInner) { + this.errorFresh(); + callback?.(this.erroredInner); return false; } let flag = false; @@ -931,164 +955,328 @@ class Writable { } } + // 用于在write没写入callback时填充writeCallbackBuffer + private doNothing(){} + private writeUint8Array(chunk: Uint8Array, encoding?: string, callback?: Function): boolean { - this.writableLengthInner! += this.getChunkLength(chunk); + const chunkLength = this.getChunkLength(chunk); + // 检查encoding和当前设置的defaultEncoding是否一致,不一致说明本次写入使用了临时encoder,需要还原成默认值 + if (this.encoding !== this.defaultEncoding) { + this.setEncoding(this.defaultEncoding); + } + this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fnBack = runOnce((error?: Error): void => { - if (error && error instanceof Error) { - this.writableInner = false; + callback?.(error ?? null); + this.writing = false; + this.writableCb --; + + if (error && error instanceof Error && !this.erroredInner) { this.throwError(error); + this.errorFresh(); return; } - callback?.(error ?? null); - this.freshCache(); + if (this.erroredInner) { + this.errorFresh(); + return; + } + + this.writableLengthInner! -= chunkLength; + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 + // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 + if (!this.writableSync) { + this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain + Promise.resolve().then((): void => { + this.afterWrite(); + }); + } }, (multipleTimes: boolean, err: Error): void => { - this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); if (this.writableCorkedInner === 0) { if (!this.writing) { this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite(chunk, encoding ?? 'utf8', fnBack); + // 异步回调场景下,writableSync = false 会比回调函数fnBack先执行;同步回调场景下这里会晚于回调函数fnBack + this.writableSync = false; } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } + this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } + this.writableBufferLength! += chunkLength; } - return hasRemaining; + return this.erroredInner ? false : hasRemaining; } private writeString(chunk: string, encoding?: string, callback?: Function): boolean { - this.writableLengthInner! += this.getChunkLength(chunk); + const chunkLength = this.getChunkLength(chunk); + // 检查encoding和当前设置的defaultEncoding是否一致,不一致说明本次写入使用了临时encoder,需要还原成默认值 + if (this.encoding !== this.defaultEncoding) { + this.setEncoding(this.defaultEncoding); + } + this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fb = runOnce((error?: Error): void => { - if (error) { - this.erroredInner = error; + callback?.(error ?? null); // 异步回调情况下,首次write的cb需不需要感知主任务同步流程产生的error,替换掉error,如:error ?? this.erroredInner ?? null + this.writing = false; + this.writableCb --; + if (error && error instanceof Error && !this.erroredInner) { + this.throwError(error); + this.errorFresh(); + return; } - callback?.(error ?? null); - this.freshCache(); - if (error && error instanceof Error) { - this.writableInner = false; - this.erroredInner = error; + if (this.erroredInner) { + this.errorFresh(); + return; + } + this.writableLengthInner! -= chunkLength; + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 + // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 + if (!this.writableSync) { + this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain Promise.resolve().then((): void => { - if (this.isOnError()) { - this.emitErrorOnce(error); - } else { - this.emitErrorOnce(error); - throw error; - } + this.afterWrite(); }); - return; } }, () => { - this.emitErrorOnce(ERR_MULTIPLE_CALLBACK, true); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); if (this.writableCorkedInner === 0) { if (!this.writing) { this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite?.(chunk, encoding ?? 'utf8', fb); - if (!this.doWrite && !hasRemaining) { - Promise.resolve().then(() => { - this.writableLengthInner = 0; - this.listener?.emit(WritableEvent.DRAIN); - }); - } + // 异步回调场景下,writableSync = false 会比回调函数fnBack先执行;同步回调场景下这里会晚于回调函数fnBack + this.writableSync = false; } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } + this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } + this.writableBufferLength! += chunkLength; } return this.erroredInner ? false : hasRemaining; } + private errorFresh(): void { + // writeCallbackBuffer完成批量转移到writeCallbacks的情况下先清writeCallbacks + // 如果没有完成转移(writeCallbackBuffer在转移后还没重置),则跳过,统一在writeCallbackBuffer处理 + if (this.writaCallbackMove) { + this.writeCallbacks.forEach(callback => { + if (typeof callback === 'function') { + callback(this.erroredInner ?? null); + } + }); + } + this.writeCallbackBuffer.forEach(callback => { + if (typeof callback === 'function') { + callback(this.erroredInner); + } + }); + this.endCallback?.(this.erroredInner); // end-cb不在writeCallbackBuffer里 + this.endCallback = undefined; + this.buffer = []; + this.writeCallbacks = []; + this.writeCallbackBuffer = []; + this.writableBufferLength = 0; + this.writableLengthInner = 0; + } + private freshCache(): void { - const current = this.buffer.shift(); - if (current) { + if (this.writableCorkedInner != 0) { + return; + } + let current = this.buffer.shift(); + // cb配合current同步取出writeCallbackBuffer里的callback + // 由于单项处理时buffer里存的callback已经包含了write写入的callback + // 这里cb只取callback但不执行 + this.writeCallbackBuffer.shift(); + while (current) { + this.writableBufferLength! -= this.getChunkLength(current.chunk); + this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite?.(current.chunk, current.encoding ?? 'utf8', current.callback); - this.writableLengthInner! -= this.getChunkLength(current.chunk); - } else { - this.writing = false; - this.writableLengthInner = 0; - if (!this.finishMayBe()) { - this.writableNeedDrainInner = false; - this.listener?.emit(WritableEvent.DRAIN); + this.writableSync = false; + // 同步回调场景,回调中writing已置false;异步回调场景下writing还为true。 + // 同步回调场景,需要循环处理缓冲区数据,异步回调场景通过回调函数请求下一份缓冲区数据,直接退出循环。 + if (!this.writing) { + current = this.buffer.shift(); + this.writeCallbackBuffer.shift(); + } else { + break; } } + + if (this.finishMayBe()) { + // 补齐不同场景下finish、close事件相关处理 + this.finishWrite(); + } } private freshCacheV(): void { + if (this.writableCorkedInner != 0) { + return; + } if (this.buffer.length > 0) { if (this.doWritev) { + const bufferChunkLength = this.writableBufferLength!; + this.writableBufferLength = 0; + // uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + this.writeCallbacks = []; const funCallback = runOnce((error?: Error): void => { - if (error && error instanceof Error) { - this.erroredInner = error; - this.listener?.emit(WritableEvent.ERROR, error); + if (error && error instanceof Error && !this.erroredInner) { + this.throwError(error); + this.errorFresh(); + return; + } + this.writing = false; + this.writableCb --; + if (this.erroredInner) { + this.errorFresh(); return; } - this.buffer = []; + this.writableLengthInner! -= bufferChunkLength; + this.writeCallbacks.forEach(callback => { + if (typeof callback === 'function') { + callback(); + } + }); + this.writeCallbacks = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据,因为可能有新写入的数据 + // 同步回调场景不会有未输出的缓冲区数据,不需要再次执行freshCacheV() + // 且同步回调场景下buffer置空操作还未进行,不能再次执行freshCacheV() + if (!this.writableSync) { + this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain + Promise.resolve().then((): void => { + this.afterWrite(); + }); + } }, () => { - this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); + this.writing = true; + this.writableCb ++; + this.writableSync = true; + this.writeCallbackBuffer.forEach((callback: Function ) => { + this.writeCallbacks.push(callback); + }); + this.writaCallbackMove = false; // @ts-ignore this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { return item.chunk; }), funCallback); - if (!this.finishMayBe()) { - this.writableNeedDrainInner = true; - this.listener?.emit(WritableEvent.DRAIN); + this.writableSync = false; + this.buffer = []; + this.writeCallbackBuffer = []; + this.writaCallbackMove = true; + if (this.finishMayBe()) { + // 补齐不同场景下finish、close事件相关处理 + this.finishWrite(); } } else { this.freshCache(); } + } else { + // 补齐不同场景下drain、finish、close事件相关处理 + if (this.finishMayBe()) { + this.finishWrite(); + } } } - endInner(chunk?: string | Uint8Array, encoding?: string, callback?: Function): void { - if (chunk) { - if (this.writing) { - this.write(chunk, encoding, callback); - } else { - this.doWrite?.(chunk!, encoding ?? 'utf8', (error?: Error): void => { - if (error && error instanceof Error) { - this.erroredInner = error; - this.listener?.emit(WritableEvent.ERROR, error); - } else { - this.writableLengthInner! -= this.getChunkLength(chunk); - this.writing = false; - this.finishMayBe(); - } - this.writableEndedInner = true; - this.writableInner = false; - asyncFn((): void => { - callback?.(this.erroredInner ?? error ?? null); - })(); - if (!this.writableFinishedInner) { - this.writableFinishedInner = true; - asyncFn((): void => { - if ((!this.erroredInner || this.erroredInner.message === 'write after end') && !this.isOnError()) { - this.listener?.emit(WritableEvent.FINISH); - } - })(); - } + private afterWrite() { + if (!this.finishMayBe() && this.writableNeedDrainInner && this.writableLengthInner == 0) { + this.writableNeedDrainInner = false; + this.listener?.emit(WritableEvent.DRAIN); + } + } + + // 提取finish步骤,参考node对同步回调场景添加finish事件异步 + private finishWrite() { + if (!this.writableFinishedInner && this.writableCb == 0 && !this.erroredInner) { + if (this.writableSync) { + Promise.resolve().then((): void => { + this.writableFinishedInner = true; + this.endCallback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); }); - } - } else { - if (this.writableEndedInner) { - this.erroredInner = new BusinessError('write after end', 10200036); - callback?.(this.erroredInner); } else { - setTimeout(() => callback?.(this.erroredInner)); - } - if (!this.writableFinishedInner && !this.writableEndedInner) { this.writableFinishedInner = true; - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.listener?.emit(WritableEvent.FINISH); - } - })(); + this.endCallback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); } + Promise.resolve().then((): void => { + if (!this.erroredInner) { + this.closedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + } + }); + } + } + + // 简化endInner,end写入部分全部提前到end()中,只保留部分后续处理。------------话说第一段if是不是重复多余了 + endInner(callback?: Function): void { + if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { + // 目前仅同步回调场景会进这段处理,finish和close直接一起异步处理即可 + Promise.resolve().then((): void => { + if (!this.erroredInner) { + this.writableFinishedInner = true; + callback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); + this.closedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + } + }); } } @@ -1106,35 +1294,38 @@ class Writable { * @since 12 */ end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { + if (this.erroredInner) { + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + return this; + } if (this.writableFinishedInner) { - this.erroredInner = ERR_STREAM_ALREADY_FINISHED; - setTimeout(() => callback?.(this.erroredInner)); - this.emitErrorOnce(this.erroredInner); + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + this.throwError(ERR_STREAM_ALREADY_FINISHED); return this; } else if (this.writableEndedInner) { - this.erroredInner = ERR_WRITE_AFTER_END; - setTimeout(() => callback?.(this.erroredInner)); - this.emitErrorOnce(this.erroredInner); - return this; - } - if (this.erroredInner) { - setTimeout(() => callback?.(this.erroredInner)); + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + this.throwError(ERR_WRITE_AFTER_END); return this; } this.writableNeedDrainInner = false; - this.closedInner = true; - this.ending = true; this.writableInner = false; - if (!this.writableEndedInner) { - if (this.writableCorkedInner === 0) { - this.endInner(chunk, encoding, callback); - } else { - this.writableCorkedInner = 1; - this.uncork(); - } + + // end的写入从endInner提前到这里统一传给write处理 + if (chunk) { + this.write(chunk, encoding); + } + + // uncork处理,且处理后依然需要进行endInner等处理 + if (this.writableCorkedInner > 0) { + this.writableCorkedInner = 1; + this.uncork(); } + + // 后移到uncork之后,配合write()进行'write after end'检查 + this.endCallback = callback; + this.ending = true; + this.endInner(callback); this.writableEndedInner = true; - this.listener?.emit(WritableEvent.CLOSE); return this; } @@ -1153,7 +1344,17 @@ class Writable { * @since 12 */ + // setEncoding改为两段,用于兼容默认encoding和临时encoding。 + // 原流程改名为setEncoding用来设置encoder,外置一层用于记录defaultEncoding,用于write临时设置encoder后进行还原 setDefaultEncoding(encoding?: string): boolean { + const ret = this.setEncoding(encoding); + if (ret) { + this.defaultEncoding = encoding; + } + return ret; + } + + setEncoding(encoding?: string): boolean { if (!encoding) { return false; } @@ -1165,15 +1366,20 @@ class Writable { try { if (encoding.toLowerCase() !== 'ascii') { this.encoder = new TextEncoder(encoding); + } else { + // TextEncoder不支持ascii,用utf-8替代 + this.encoder = new TextEncoder('utf-8'); } } catch (e) { this.throwError(e as Error); + this.errorFresh(); return false; } return true; } else { const err: BusinessError = new BusinessError(`Unknown encoding: ${encoding}`); this.throwError(err); + this.errorFresh(); return false; } } @@ -1202,10 +1408,13 @@ class Writable { uncork(): boolean { if (this.writableCorkedInner > 0) { this.writableCorkedInner -= 1; + + // uncork不再能单独生效,必须搭配cork使用;对标node添加writing限制 + if (this.writableCorkedInner === 0 && !this.writing) { + this.freshCacheV(); + } } - if (this.writableCorkedInner === 0) { - this.freshCacheV(); - } + return true; } @@ -1239,6 +1448,7 @@ class Writable { off(event: string, callback?: Function): void { if (!event) { this.throwError(new BusinessError(`Parameter error. The value of event is null `, 401)); + this.errorFresh(); return; } if (callback) { @@ -1252,9 +1462,11 @@ class Writable { } } + // 没写doWrite的情况下切到doWritev noWriteOpes(chunk: string | Uint8Array, encoding: string, callback: Function): void { if (this.doWritev === null) { this.throwError(ERR_DOWRITE_NOT_IMPLEMENTED); + this.errorFresh(); } else { // @ts-ignore this.doWritev([chunk], callback); @@ -1306,36 +1518,24 @@ class Writable { throw ERR_DOWRITEV_NOT_IMPLEMENTED; } + // 原则上要做到产生error必须抛错并记录this.erroredInner,有this.erroredInner就搁置后续error全力收尾, throwError(error: Error): void { + this.writableInner = false; this.erroredInner = error; + // throw还是得扔Promise外面,抛错时机也更合理 if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { - setTimeout(() => { - this.listener?.emit(WritableEvent.ERROR, error); + Promise.resolve().then((): void => { + this.listener?.emit(WritableEvent.ERROR, this.erroredInner); + if (!this.closedInner) { + this.closedInner = true; + // Promise.resolve().then((): void => { + this.listener?.emit(WritableEvent.CLOSE); + // }); + } }); } else { - throw error; - } - } - - private isOnError(): boolean { - return this.listener?.isOn(WritableEvent.ERROR) || false; - } - - private emitErrorExecutedInner = false; - // @ts-ignore - private emitErrorIdInner: number; - - private emitErrorOnce(error: Error, reset?: boolean): void { - if (reset) { - this.emitErrorExecutedInner = false; - clearTimeout(this.emitErrorIdInner); - } - if (!this.emitErrorExecutedInner) { - this.emitErrorExecutedInner = true; - // @ts-ignore - this.emitErrorIdInner = setTimeout((): void => { - this.listener?.emit(WritableEvent.ERROR, this.erroredInner ?? error); - }); + throw this.erroredInner; + // throw放Promise.resolve().then里会被拒绝并忽略,导致不抛错,但是放外面会导致后置的on'error'来不及捕获error,遇错直接中断 } } }