From 430fc29facdb40079b2fcb2c1b862b7e19ade53d Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Tue, 25 Mar 2025 11:10:26 +0800 Subject: [PATCH 01/14] =?UTF-8?q?20250325=20=E7=9B=AE=E5=89=8D=E7=9A=84?= =?UTF-8?q?=E5=85=A8=E9=83=A8=E4=BF=AE=E6=94=B9=E6=B1=87=E6=80=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 207 +++++++++++++++++-------- 1 file changed, 141 insertions(+), 66 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index bce236cb..da5a039b 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -718,12 +718,15 @@ class Writable { private writableHighWatermarkInner: number; private writableInner: boolean | undefined; private writableLengthInner: number | undefined; + private writableBufferLength: number | undefined;// 记录buffer[]中存的chunk长度 private writableNeedDrainInner: boolean | undefined; private writableCorkedInner: number = 0; private writableEndedInner: boolean | undefined; private writableFinishedInner: boolean | undefined; private erroredInner: Error | undefined | null; private closedInner: boolean | undefined; + private writableCb: number = 0;// 记录doWrite、doWritev中还未执行的callback回调函数的数量,用于finish事件检查 + private writableSync: boolean = true;// 同步回调标志,true表示doWrite、doWritev中的回调采用了同步回调模式 /** * The Writable constructor. @@ -746,6 +749,7 @@ class Writable { this.writableHighWatermarkInner = options.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; this.writableObjectModeInner = options.objectMode || false; this.writableLengthInner = 0; + this.writableBufferLength = 0; this.writableEndedInner = false; this.writableNeedDrainInner = false; this.writableInner = true; @@ -754,6 +758,8 @@ class Writable { this.erroredInner = null; this.encoding = 'utf8'; this.closedInner = false; + this.writableCb = 0; + this.writableSync = true; this.doInitialize((error: Error): void => { if (error) { this.listener?.emit(WritableEvent.ERROR, error); @@ -900,7 +906,7 @@ class Writable { if (typeof chunk !== 'string' && !(chunk instanceof Uint8Array)) { throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); } - if (this.ending && !this.writing) { + if (this.ending) { setTimeout((): void => { this.erroredInner = new BusinessError('write after end', 10200036); callback?.(this.erroredInner); @@ -932,7 +938,8 @@ class Writable { } private writeUint8Array(chunk: Uint8Array, encoding?: string, callback?: Function): boolean { - this.writableLengthInner! += this.getChunkLength(chunk); + const chunkLength = this.getChunkLength(chunk); + this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fnBack = runOnce((error?: Error): void => { if (error && error instanceof Error) { @@ -941,32 +948,53 @@ class Writable { return; } callback?.(error ?? null); - this.freshCache(); + this.writableLengthInner! -= chunkLength; + this.writing = false; + this.writableCb --; + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 + // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 + if (!this.writableSync) { + this.freshCacheV(); + } }, (multipleTimes: boolean, err: Error): void => { this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); }); if (this.writableCorkedInner === 0) { if (!this.writing) { this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite(chunk, encoding ?? 'utf8', fnBack); + // 异步回调场景下,writableSync = false 会比回调函数fnBack先执行;同步回调场景下这里会晚于回调函数fnBack + this.writableSync = false; } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + this.writableBufferLength! += chunkLength; } return hasRemaining; } private writeString(chunk: string, encoding?: string, callback?: Function): boolean { - this.writableLengthInner! += this.getChunkLength(chunk); + const chunkLength = this.getChunkLength(chunk); + this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fb = runOnce((error?: Error): void => { if (error) { this.erroredInner = error; } callback?.(error ?? null); - this.freshCache(); + this.writableLengthInner! -= chunkLength; + this.writing = false; + this.writableCb --; + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 + // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 + if (!this.writableSync) { + this.freshCacheV(); + } if (error && error instanceof Error) { this.writableInner = false; this.erroredInner = error; @@ -987,7 +1015,11 @@ class Writable { if (this.writableCorkedInner === 0) { if (!this.writing) { this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite?.(chunk, encoding ?? 'utf8', fb); + // 异步回调场景下,writableSync = false 会比回调函数fnBack先执行;同步回调场景下这里会晚于回调函数fnBack + this.writableSync = false; if (!this.doWrite && !hasRemaining) { Promise.resolve().then(() => { this.writableLengthInner = 0; @@ -996,99 +1028,140 @@ class Writable { } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + this.writableBufferLength! += chunkLength; } return this.erroredInner ? false : hasRemaining; } private freshCache(): void { - const current = this.buffer.shift(); - if (current) { + if (this.writableCorkedInner != 0) { + return; + } + let current = this.buffer.shift(); + while (current) { + this.writableBufferLength! -= this.getChunkLength(current.chunk); + this.writing = true; + this.writableCb ++; + this.writableSync = true; this.doWrite?.(current.chunk, current.encoding ?? 'utf8', current.callback); - this.writableLengthInner! -= this.getChunkLength(current.chunk); + this.writableSync = false; + // 同步回调场景,回调中writing已置false;异步回调场景下writing还为true。 + // 同步回调场景,需要循环处理缓冲区数据,异步回调场景通过回调函数请求下一份缓冲区数据,直接退出循环 + if (!this.writing) { + current = this.buffer.shift(); + } else { + break; + } + } + + if (!this.finishMayBe()) { + this.writableNeedDrainInner = false; + this.listener?.emit(WritableEvent.DRAIN); } else { - this.writing = false; - this.writableLengthInner = 0; - if (!this.finishMayBe()) { - this.writableNeedDrainInner = false; - this.listener?.emit(WritableEvent.DRAIN); + if (!this.writableFinishedInner && this.writableCb == 0 && + (!this.erroredInner || this.erroredInner.message === 'write after end')) { + this.writableFinishedInner = true; + this.listener?.emit(WritableEvent.FINISH); + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.listener?.emit(WritableEvent.CLOSE); + } + })(); } } } private freshCacheV(): void { + if (this.writableCorkedInner != 0) { + return; + } if (this.buffer.length > 0) { if (this.doWritev) { + const bufferChunkLength = this.writableBufferLength!; + this.writableBufferLength = 0; const funCallback = runOnce((error?: Error): void => { if (error && error instanceof Error) { this.erroredInner = error; this.listener?.emit(WritableEvent.ERROR, error); return; } - this.buffer = []; + this.writableLengthInner! -= bufferChunkLength; + this.writing = false; + this.writableCb --; + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据,因为可能有新写入的数据 + // 同步回调场景不会有未输出的缓冲区数据,不需要再次执行freshCacheV() + // 且同步回调场景下buffer置空操作还未进行,不能再次执行freshCacheV() + if (!this.writableSync) { + this.freshCacheV(); + } }, () => { this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); }); + this.writing = true; + this.writableCb ++; + this.writableSync = true; // @ts-ignore this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { return item.chunk; }), funCallback); + this.writableSync = false; + this.buffer = []; if (!this.finishMayBe()) { this.writableNeedDrainInner = true; this.listener?.emit(WritableEvent.DRAIN); + } else { + if (!this.writableFinishedInner && this.writableCb == 0 && + (!this.erroredInner || this.erroredInner.message === 'write after end')) { + this.writableFinishedInner = true; + this.listener?.emit(WritableEvent.FINISH); + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.listener?.emit(WritableEvent.CLOSE); + } + })(); + } } } else { this.freshCache(); } + } else { + if (!this.finishMayBe()) { + this.writableNeedDrainInner = true; + this.listener?.emit(WritableEvent.DRAIN); + } else { + if (!this.writableFinishedInner && this.writableCb == 0 && + (!this.erroredInner || this.erroredInner.message === 'write after end')) { + this.writableFinishedInner = true; + this.listener?.emit(WritableEvent.FINISH); + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.listener?.emit(WritableEvent.CLOSE); + } + })(); + } + } } } - endInner(chunk?: string | Uint8Array, encoding?: string, callback?: Function): void { - if (chunk) { - if (this.writing) { - this.write(chunk, encoding, callback); - } else { - this.doWrite?.(chunk!, encoding ?? 'utf8', (error?: Error): void => { - if (error && error instanceof Error) { - this.erroredInner = error; - this.listener?.emit(WritableEvent.ERROR, error); - } else { - this.writableLengthInner! -= this.getChunkLength(chunk); - this.writing = false; - this.finishMayBe(); - } - this.writableEndedInner = true; - this.writableInner = false; - asyncFn((): void => { - callback?.(this.erroredInner ?? error ?? null); - })(); - if (!this.writableFinishedInner) { - this.writableFinishedInner = true; - asyncFn((): void => { - if ((!this.erroredInner || this.erroredInner.message === 'write after end') && !this.isOnError()) { - this.listener?.emit(WritableEvent.FINISH); - } - })(); - } - }); - } + endInner(callback?: Function): void { + if (this.writableEndedInner) { + this.erroredInner = new BusinessError('write after end', 10200036); + callback?.(this.erroredInner); } else { - if (this.writableEndedInner) { - this.erroredInner = new BusinessError('write after end', 10200036); - callback?.(this.erroredInner); - } else { - setTimeout(() => callback?.(this.erroredInner)); - } - if (!this.writableFinishedInner && !this.writableEndedInner) { - this.writableFinishedInner = true; - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.listener?.emit(WritableEvent.FINISH); - } - })(); - } + setTimeout(() => callback?.(this.erroredInner)); + } + if (!this.writableFinishedInner && !this.writableEndedInner) { + this.writableFinishedInner = true; + this.listener?.emit(WritableEvent.FINISH); + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.listener?.emit(WritableEvent.CLOSE); + } + })(); } } @@ -1123,18 +1196,20 @@ class Writable { } this.writableNeedDrainInner = false; this.closedInner = true; - this.ending = true; this.writableInner = false; - if (!this.writableEndedInner) { - if (this.writableCorkedInner === 0) { - this.endInner(chunk, encoding, callback); - } else { - this.writableCorkedInner = 1; - this.uncork(); - } + + if (chunk) { + this.write(chunk, encoding); } + + if (this.writableCorkedInner > 0) { + this.writableCorkedInner = 1; + this.uncork(); + } + + this.ending = true; + this.endInner(callback); this.writableEndedInner = true; - this.listener?.emit(WritableEvent.CLOSE); return this; } -- Gitee From 8d714da56cce0807571c35d13f2cc5cad9244b1f Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Tue, 25 Mar 2025 14:40:34 +0800 Subject: [PATCH 02/14] =?UTF-8?q?add=20endInner=E5=88=A4=E6=96=AD=E6=9D=A1?= =?UTF-8?q?=E4=BB=B6=E5=B0=91=E5=90=8C=E6=AD=A5=E4=B8=80=E4=B8=AA=EF=BC=8C?= =?UTF-8?q?finish=E6=97=A9=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index da5a039b..4f0c8b38 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1154,7 +1154,7 @@ class Writable { } else { setTimeout(() => callback?.(this.erroredInner)); } - if (!this.writableFinishedInner && !this.writableEndedInner) { + if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { this.writableFinishedInner = true; this.listener?.emit(WritableEvent.FINISH); asyncFn((): void => { -- Gitee From fb8d84b8332f247f4f9a61ba46d7d4c2c3dc1d77 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Wed, 9 Apr 2025 17:17:45 +0800 Subject: [PATCH 03/14] add note Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 4f0c8b38..5210a8ce 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1062,6 +1062,7 @@ class Writable { this.writableNeedDrainInner = false; this.listener?.emit(WritableEvent.DRAIN); } else { + // 补齐不同场景下finish、close事件相关处理 if (!this.writableFinishedInner && this.writableCb == 0 && (!this.erroredInner || this.erroredInner.message === 'write after end')) { this.writableFinishedInner = true; @@ -1114,6 +1115,7 @@ class Writable { this.writableNeedDrainInner = true; this.listener?.emit(WritableEvent.DRAIN); } else { + // 补齐不同场景下finish、close事件相关处理 if (!this.writableFinishedInner && this.writableCb == 0 && (!this.erroredInner || this.erroredInner.message === 'write after end')) { this.writableFinishedInner = true; @@ -1129,6 +1131,7 @@ class Writable { this.freshCache(); } } else { + // 补齐不同场景下drain、finish、close事件相关处理 if (!this.finishMayBe()) { this.writableNeedDrainInner = true; this.listener?.emit(WritableEvent.DRAIN); @@ -1147,6 +1150,7 @@ class Writable { } } + // 简化endInner,end写入部分全部提前到end()中,只保留部分后续处理。 endInner(callback?: Function): void { if (this.writableEndedInner) { this.erroredInner = new BusinessError('write after end', 10200036); @@ -1198,15 +1202,18 @@ class Writable { this.closedInner = true; this.writableInner = false; + // end的写入从endInner提前到这里统一传给write处理 if (chunk) { this.write(chunk, encoding); } + // uncork处理,且处理后依然需要进行endInner等处理 if (this.writableCorkedInner > 0) { this.writableCorkedInner = 1; this.uncork(); } + // 后移到uncork之后,配合write()进行'write after end'检查 this.ending = true; this.endInner(callback); this.writableEndedInner = true; -- Gitee From 884474f7e6970430762bce79c6687a9b81d1bb03 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Thu, 10 Apr 2025 15:47:48 +0800 Subject: [PATCH 04/14] =?UTF-8?q?next=20stage=20fix=201=E3=80=81drain?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E6=B7=BB=E5=8A=A0needDrain=E5=88=A4=E6=96=AD?= =?UTF-8?q?=202=E3=80=81setEncoding=E5=88=86=E7=A6=BB=E9=BB=98=E8=AE=A4Enc?= =?UTF-8?q?oding=E5=92=8Cwrite=E4=B8=B4=E6=97=B6=E4=BD=BF=E7=94=A8Encoding?= =?UTF-8?q?=203=E3=80=81uncork=E4=B8=8D=E5=86=8D=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E5=8D=95=E7=8B=AC=E4=BD=BF=E7=94=A8=EF=BC=8C=E5=BF=85=E9=A1=BB?= =?UTF-8?q?=E5=92=8Ccork=E6=90=AD=E9=85=8D=EF=BC=8C=E6=B2=A1cork=E8=AE=A1?= =?UTF-8?q?=E6=95=B0=E6=97=B6=E6=97=A0=E6=95=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit drain fix drain fix 2 1、drain再增加cb-num的判断 2、修正finishMaybe前漏删的取反 drain fix 2.5 cb-num换成wsLength add !writing to uncork finish emit add asyncFn when callback sync 加注释 end fix 1、同步回调场景 异步顺次调整 2、end-cb触发时机修改 add writev callback 添加批量写出时的callback处理 fix callback forEach fix callbacks forEach 添加中途回调为空时的处理 log log2 log3 add writeCallbackBuffer to save write callback del log` Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 183 +++++++++++++++++-------- 1 file changed, 127 insertions(+), 56 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 5210a8ce..66a0e6f1 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -727,6 +727,9 @@ class Writable { private closedInner: boolean | undefined; private writableCb: number = 0;// 记录doWrite、doWritev中还未执行的callback回调函数的数量,用于finish事件检查 private writableSync: boolean = true;// 同步回调标志,true表示doWrite、doWritev中的回调采用了同步回调模式 + private defaultEncoding: string | undefined; + private endCallback: Function | undefined; + private writeCallbackBuffer: Function[] = []; /** * The Writable constructor. @@ -760,6 +763,7 @@ class Writable { this.closedInner = false; this.writableCb = 0; this.writableSync = true; + this.defaultEncoding = 'utf8'; this.doInitialize((error: Error): void => { if (error) { this.listener?.emit(WritableEvent.ERROR, error); @@ -898,7 +902,8 @@ class Writable { */ write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { if (encoding) { - this.setDefaultEncoding(encoding); + // 设置临时encoder + this.setEncoding(encoding); } if (chunk === null) { throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); @@ -937,8 +942,15 @@ class Writable { } } + // 用于在write没写入callback时填充writeCallbackBuffer + private doNothing(){} + private writeUint8Array(chunk: Uint8Array, encoding?: string, callback?: Function): boolean { const chunkLength = this.getChunkLength(chunk); + // 检查encoding和当前设置的defaultEncoding是否一致,不一致说明本次写入使用了临时encoder,需要还原成默认值 + if (this.encoding !== this.defaultEncoding) { + this.setEncoding(this.defaultEncoding); + } this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fnBack = runOnce((error?: Error): void => { @@ -956,6 +968,8 @@ class Writable { if (!this.writableSync) { this.freshCacheV(); } + // 处理drain + this.afterWrite(); }, (multipleTimes: boolean, err: Error): void => { this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); }); @@ -969,10 +983,20 @@ class Writable { this.writableSync = false; } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } this.writableBufferLength! += chunkLength; } return hasRemaining; @@ -980,6 +1004,10 @@ class Writable { private writeString(chunk: string, encoding?: string, callback?: Function): boolean { const chunkLength = this.getChunkLength(chunk); + // 检查encoding和当前设置的defaultEncoding是否一致,不一致说明本次写入使用了临时encoder,需要还原成默认值 + if (this.encoding !== this.defaultEncoding) { + this.setEncoding(this.defaultEncoding); + } this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fb = runOnce((error?: Error): void => { @@ -1008,6 +1036,8 @@ class Writable { }); return; } + // 处理drain + this.afterWrite(); }, () => { this.emitErrorOnce(ERR_MULTIPLE_CALLBACK, true); }); @@ -1028,10 +1058,20 @@ class Writable { } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } this.writableBufferLength! += chunkLength; } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); + if (typeof callback === 'function') { + this.writeCallbackBuffer.push(callback); + } else { + this.writeCallbackBuffer.push(this.doNothing); + } this.writableBufferLength! += chunkLength; } return this.erroredInner ? false : hasRemaining; @@ -1042,6 +1082,10 @@ class Writable { return; } let current = this.buffer.shift(); + // cb配合current同步取出writeCallbackBuffer里的callback + // 由于单项处理时buffer里存的callback已经包含了write写入的callback + // 这里cb只取callback但不执行 + let cb = this.writeCallbackBuffer.shift(); while (current) { this.writableBufferLength! -= this.getChunkLength(current.chunk); this.writing = true; @@ -1053,26 +1097,15 @@ class Writable { // 同步回调场景,需要循环处理缓冲区数据,异步回调场景通过回调函数请求下一份缓冲区数据,直接退出循环 if (!this.writing) { current = this.buffer.shift(); + cb = this.writeCallbackBuffer.shift(); } else { break; } } - if (!this.finishMayBe()) { - this.writableNeedDrainInner = false; - this.listener?.emit(WritableEvent.DRAIN); - } else { + if (this.finishMayBe()) { // 补齐不同场景下finish、close事件相关处理 - if (!this.writableFinishedInner && this.writableCb == 0 && - (!this.erroredInner || this.erroredInner.message === 'write after end')) { - this.writableFinishedInner = true; - this.listener?.emit(WritableEvent.FINISH); - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.listener?.emit(WritableEvent.CLOSE); - } - })(); - } + this.finishWrite(); } } @@ -1084,6 +1117,7 @@ class Writable { if (this.doWritev) { const bufferChunkLength = this.writableBufferLength!; this.writableBufferLength = 0; + let writeCallbacks: Function[] = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 const funCallback = runOnce((error?: Error): void => { if (error && error instanceof Error) { this.erroredInner = error; @@ -1093,74 +1127,75 @@ class Writable { this.writableLengthInner! -= bufferChunkLength; this.writing = false; this.writableCb --; + writeCallbacks.forEach(callback => { + if (typeof callback === 'function') { + callback(); + } else { + } + }); + writeCallbacks = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据,因为可能有新写入的数据 // 同步回调场景不会有未输出的缓冲区数据,不需要再次执行freshCacheV() // 且同步回调场景下buffer置空操作还未进行,不能再次执行freshCacheV() if (!this.writableSync) { this.freshCacheV(); } + // 处理drain + this.afterWrite(); }, () => { this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); }); this.writing = true; this.writableCb ++; this.writableSync = true; + this.writeCallbackBuffer.forEach((callback: Function ) => { + writeCallbacks.push(callback); + }); // @ts-ignore this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { return item.chunk; }), funCallback); this.writableSync = false; this.buffer = []; - if (!this.finishMayBe()) { - this.writableNeedDrainInner = true; - this.listener?.emit(WritableEvent.DRAIN); - } else { + this.writeCallbackBuffer = []; + if (this.finishMayBe()) { // 补齐不同场景下finish、close事件相关处理 - if (!this.writableFinishedInner && this.writableCb == 0 && - (!this.erroredInner || this.erroredInner.message === 'write after end')) { - this.writableFinishedInner = true; - this.listener?.emit(WritableEvent.FINISH); - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.listener?.emit(WritableEvent.CLOSE); - } - })(); - } + this.finishWrite(); } } else { this.freshCache(); } } else { // 补齐不同场景下drain、finish、close事件相关处理 - if (!this.finishMayBe()) { - this.writableNeedDrainInner = true; - this.listener?.emit(WritableEvent.DRAIN); - } else { - if (!this.writableFinishedInner && this.writableCb == 0 && - (!this.erroredInner || this.erroredInner.message === 'write after end')) { - this.writableFinishedInner = true; - this.listener?.emit(WritableEvent.FINISH); - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.listener?.emit(WritableEvent.CLOSE); - } - })(); - } + if (this.finishMayBe()) { + this.finishWrite(); } } } - // 简化endInner,end写入部分全部提前到end()中,只保留部分后续处理。 - endInner(callback?: Function): void { - if (this.writableEndedInner) { - this.erroredInner = new BusinessError('write after end', 10200036); - callback?.(this.erroredInner); - } else { - setTimeout(() => callback?.(this.erroredInner)); + private afterWrite() { + if (!this.finishMayBe() && this.writableNeedDrainInner && this.writableLengthInner == 0) { + this.writableNeedDrainInner = false; + this.listener?.emit(WritableEvent.DRAIN); } - if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { - this.writableFinishedInner = true; - this.listener?.emit(WritableEvent.FINISH); + } + + // 提取finish步骤,参考node对同步回调场景添加finish事件异步 + private finishWrite() { + if (!this.writableFinishedInner && this.writableCb == 0 && + (!this.erroredInner || this.erroredInner.message === 'write after end')) { + if (this.writableSync) { + asyncFn((): void => { + this.writableFinishedInner = true; + this.endCallback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); + })(); + } else { + this.writableFinishedInner = true; + this.endCallback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); + } asyncFn((): void => { if (!this.erroredInner || this.erroredInner.message === 'write after end') { this.listener?.emit(WritableEvent.CLOSE); @@ -1168,6 +1203,26 @@ class Writable { })(); } } + + // 简化endInner,end写入部分全部提前到end()中,只保留部分后续处理。------------话说第一段if是不是重复多余了 + endInner(callback?: Function): void { + if (this.writableEndedInner) { + this.erroredInner = new BusinessError('write after end', 10200036); + callback?.(this.erroredInner); + } else { + if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { + // 目前仅同步回调场景会进这段处理,finish和close直接一起异步处理即可 + asyncFn((): void => { + if (!this.erroredInner || this.erroredInner.message === 'write after end') { + this.writableFinishedInner = true; + callback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); + this.listener?.emit(WritableEvent.CLOSE); + } + })(); + } + } + } /** * Write the last chunk to Writable. @@ -1214,6 +1269,7 @@ class Writable { } // 后移到uncork之后,配合write()进行'write after end'检查 + this.endCallback = callback; this.ending = true; this.endInner(callback); this.writableEndedInner = true; @@ -1235,7 +1291,17 @@ class Writable { * @since 12 */ + // setEncoding改为两段,用于兼容默认encoding和临时encoding。 + // 原流程改名为setEncoding用来设置encoder,外置一层用于记录defaultEncoding,用于write临时设置encoder后进行还原 setDefaultEncoding(encoding?: string): boolean { + const ret = this.setEncoding(encoding); + if (ret) { + this.defaultEncoding = encoding; + } + return ret; + } + + setEncoding(encoding?: string): boolean { if (!encoding) { return false; } @@ -1260,6 +1326,8 @@ class Writable { } } + + /** * After the call, all Write operations will be forced to write to the buffer instead of being flushed. * @@ -1284,10 +1352,13 @@ class Writable { uncork(): boolean { if (this.writableCorkedInner > 0) { this.writableCorkedInner -= 1; + + // uncork不再能单独生效,必须搭配cork使用;对标node添加writing限制 + if (this.writableCorkedInner === 0 && !this.writing) { + this.freshCacheV(); + } } - if (this.writableCorkedInner === 0) { - this.freshCacheV(); - } + return true; } -- Gitee From ffae96406077cfd5f8b4c3b58ef9f9e904847d9d Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Tue, 6 May 2025 20:41:29 +0800 Subject: [PATCH 05/14] =?UTF-8?q?=E6=95=B4=E7=90=86=E7=A9=BA=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 3 --- 1 file changed, 3 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 66a0e6f1..31664671 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1130,7 +1130,6 @@ class Writable { writeCallbacks.forEach(callback => { if (typeof callback === 'function') { callback(); - } else { } }); writeCallbacks = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 @@ -1326,8 +1325,6 @@ class Writable { } } - - /** * After the call, all Write operations will be forced to write to the buffer instead of being flushed. * -- Gitee From 917060b65d93ec2db933c79986cca3342b86f3d9 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Wed, 7 May 2025 16:10:50 +0800 Subject: [PATCH 06/14] ascii-utf8 Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 31664671..4c4c9fcb 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1310,7 +1310,10 @@ class Writable { if (ENCODING_SET.indexOf(encoding.toLowerCase()) !== -1) { this.encoding = encoding.toLowerCase(); try { - if (encoding.toLowerCase() !== 'ascii') { + if (encoding.toLowerCase() === 'ascii') { + // TextEncoder不支持ascii,用utf-8替代 + this.encoder = new TextEncoder('utf-8'); + } else { this.encoder = new TextEncoder(encoding); } } catch (e) { -- Gitee From 4c45c1998c986c0e3d4eb2ecf6bdca8908481b2c Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Wed, 7 May 2025 17:48:42 +0800 Subject: [PATCH 07/14] fix ascii Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 4c4c9fcb..455c5d13 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1310,11 +1310,11 @@ class Writable { if (ENCODING_SET.indexOf(encoding.toLowerCase()) !== -1) { this.encoding = encoding.toLowerCase(); try { - if (encoding.toLowerCase() === 'ascii') { + if (encoding.toLowerCase() !== 'ascii') { + this.encoder = new TextEncoder(encoding); + } else { // TextEncoder不支持ascii,用utf-8替代 this.encoder = new TextEncoder('utf-8'); - } else { - this.encoder = new TextEncoder(encoding); } } catch (e) { this.throwError(e as Error); -- Gitee From 16f616791231f0e0c888bb975ee5866992b7c6ff Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Tue, 13 May 2025 09:36:16 +0800 Subject: [PATCH 08/14] del 'let cb =' Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 455c5d13..8a02f9e6 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1082,10 +1082,9 @@ class Writable { return; } let current = this.buffer.shift(); - // cb配合current同步取出writeCallbackBuffer里的callback - // 由于单项处理时buffer里存的callback已经包含了write写入的callback - // 这里cb只取callback但不执行 - let cb = this.writeCallbackBuffer.shift(); + // 配合current同步取出writeCallbackBuffer里的callback。 + // 由于单项处理时buffer里存的callback已经包含了write写入的callback,这里只取callback但不执行。 + this.writeCallbackBuffer.shift(); while (current) { this.writableBufferLength! -= this.getChunkLength(current.chunk); this.writing = true; @@ -1094,10 +1093,10 @@ class Writable { this.doWrite?.(current.chunk, current.encoding ?? 'utf8', current.callback); this.writableSync = false; // 同步回调场景,回调中writing已置false;异步回调场景下writing还为true。 - // 同步回调场景,需要循环处理缓冲区数据,异步回调场景通过回调函数请求下一份缓冲区数据,直接退出循环 + // 同步回调场景,需要循环处理缓冲区数据,异步回调场景通过回调函数请求下一份缓冲区数据,直接退出循环。 if (!this.writing) { current = this.buffer.shift(); - cb = this.writeCallbackBuffer.shift(); + this.writeCallbackBuffer.shift(); } else { break; } -- Gitee From 27235e57238569d85b9e8118586586dd97d5b0c6 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Tue, 13 May 2025 11:14:16 +0800 Subject: [PATCH 09/14] =?UTF-8?q?del=20=E5=A4=9A=E4=BD=99=E7=9A=84drain?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 6 ------ 1 file changed, 6 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 8a02f9e6..a0170133 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1050,12 +1050,6 @@ class Writable { this.doWrite?.(chunk, encoding ?? 'utf8', fb); // 异步回调场景下,writableSync = false 会比回调函数fnBack先执行;同步回调场景下这里会晚于回调函数fnBack this.writableSync = false; - if (!this.doWrite && !hasRemaining) { - Promise.resolve().then(() => { - this.writableLengthInner = 0; - this.listener?.emit(WritableEvent.DRAIN); - }); - } } else { this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); if (typeof callback === 'function') { -- Gitee From 9688ef73165b7b004d9587b42064479add26c135 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Thu, 29 May 2025 21:59:41 +0800 Subject: [PATCH 10/14] =?UTF-8?q?=E8=BF=9B=E4=B8=80=E6=AD=A5=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E4=B8=AD=EF=BC=9A=201=E3=80=81Promise.resolve().then(?= =?UTF-8?q?)=E5=85=A8=E9=9D=A2=E6=9B=BF=E6=8D=A2setTimeout=202=E3=80=81?= =?UTF-8?q?=E6=8A=A5=E9=94=99=E4=BD=93=E7=B3=BB=E7=BB=9F=E4=B8=80=E5=8C=96?= =?UTF-8?q?=E6=95=B4=E6=94=B9=EF=BC=8C=E4=B8=BB=E8=A6=81=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E6=89=BF=E8=BD=BD=E5=9C=A8throwError=E5=92=8CerrorFresh?= =?UTF-8?q?=E3=80=82=20=E4=BC=98=E5=85=88=E4=BF=9D=E7=95=99=E9=A6=96?= =?UTF-8?q?=E6=AC=A1=E6=8A=A5=E9=94=99=E4=BF=A1=E6=81=AF=EF=BC=9B=E6=97=A0?= =?UTF-8?q?on'error'=E6=97=B6throw=20err=EF=BC=9B=E6=8A=A5=E9=94=99?= =?UTF-8?q?=E6=97=B6Callback=E3=80=81=E7=BC=93=E5=86=B2=E5=8C=BA=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=8F=8A=E7=9B=B8=E5=85=B3=E5=B1=9E=E6=80=A7=E5=A4=84?= =?UTF-8?q?=E7=90=86=203=E3=80=81error=E4=BA=8B=E4=BB=B6=E5=90=8E=E8=B7=9F?= =?UTF-8?q?=E9=9A=8Fclose=E4=BA=8B=E4=BB=B6=204=E3=80=81=E5=88=A0=E6=8E=89?= =?UTF-8?q?=E9=83=A8=E5=88=86=E5=A4=9A=E4=BD=99=E4=BB=A3=E7=A0=81=205?= =?UTF-8?q?=E3=80=81=E5=85=B6=E4=BB=96=E5=8F=AF=E8=83=BD=E4=B8=80=E6=97=B6?= =?UTF-8?q?=E6=B2=A1=E6=83=B3=E8=B5=B7=E6=9D=A5=E7=9A=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 222 ++++++++++++++----------- 1 file changed, 129 insertions(+), 93 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index a0170133..f37ccb66 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -911,15 +911,28 @@ class Writable { if (typeof chunk !== 'string' && !(chunk instanceof Uint8Array)) { throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); } - if (this.ending) { - setTimeout((): void => { - this.erroredInner = new BusinessError('write after end', 10200036); + if (this.ending && !this.erroredInner) { + this.erroredInner = new BusinessError('write after end', 10200036); + this.writableInner = false; + this.errorFresh(); + // 统一改用微任务Promise + Promise.resolve().then((): void => { callback?.(this.erroredInner); - this.throwError(this.erroredInner); + if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { + this.listener?.emit(WritableEvent.ERROR, this.erroredInner); + if (!this.closedInner) { + this.closedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + } + } else { + throw this.erroredInner; + } }); return false; } if (this.erroredInner) { + this.errorFresh(); + callback?.(this.erroredInner); return false; } let flag = false; @@ -954,15 +967,21 @@ class Writable { this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fnBack = runOnce((error?: Error): void => { - if (error && error instanceof Error) { - this.writableInner = false; + callback?.(error ?? null); + this.writing = false; + this.writableCb --; + + if (error && error instanceof Error && !this.erroredInner) { this.throwError(error); + this.errorFresh(); return; } - callback?.(error ?? null); + if (this.erroredInner) { + this.errorFresh(); + return; + } + this.writableLengthInner! -= chunkLength; - this.writing = false; - this.writableCb --; // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 if (!this.writableSync) { @@ -971,7 +990,10 @@ class Writable { // 处理drain this.afterWrite(); }, (multipleTimes: boolean, err: Error): void => { - this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); if (this.writableCorkedInner === 0) { if (!this.writing) { @@ -1011,35 +1033,31 @@ class Writable { this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fb = runOnce((error?: Error): void => { - if (error) { - this.erroredInner = error; - } callback?.(error ?? null); - this.writableLengthInner! -= chunkLength; this.writing = false; this.writableCb --; + if (error && error instanceof Error && !this.erroredInner) { + this.throwError(error); + this.errorFresh(); + return; + } + if (this.erroredInner) { + this.errorFresh(); + return; + } + this.writableLengthInner! -= chunkLength; // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据 // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 if (!this.writableSync) { this.freshCacheV(); } - if (error && error instanceof Error) { - this.writableInner = false; - this.erroredInner = error; - Promise.resolve().then((): void => { - if (this.isOnError()) { - this.emitErrorOnce(error); - } else { - this.emitErrorOnce(error); - throw error; - } - }); - return; - } // 处理drain this.afterWrite(); }, () => { - this.emitErrorOnce(ERR_MULTIPLE_CALLBACK, true); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); if (this.writableCorkedInner === 0) { @@ -1071,13 +1089,28 @@ class Writable { return this.erroredInner ? false : hasRemaining; } + private errorFresh(): void { + this.writeCallbackBuffer.forEach(callback => { + if (typeof callback === 'function') { + callback(this.erroredInner); + } + }); + this.endCallback?.(this.erroredInner); // end-cb不在writeCallbackBuffer里 + this.endCallback = undefined; + this.buffer = []; + this.writeCallbackBuffer = []; + this.writableBufferLength = 0; + this.writableLengthInner = 0; + } + private freshCache(): void { if (this.writableCorkedInner != 0) { return; } let current = this.buffer.shift(); - // 配合current同步取出writeCallbackBuffer里的callback。 - // 由于单项处理时buffer里存的callback已经包含了write写入的callback,这里只取callback但不执行。 + // cb配合current同步取出writeCallbackBuffer里的callback + // 由于单项处理时buffer里存的callback已经包含了write写入的callback + // 这里cb只取callback但不执行 this.writeCallbackBuffer.shift(); while (current) { this.writableBufferLength! -= this.getChunkLength(current.chunk); @@ -1112,14 +1145,28 @@ class Writable { this.writableBufferLength = 0; let writeCallbacks: Function[] = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 const funCallback = runOnce((error?: Error): void => { - if (error && error instanceof Error) { - this.erroredInner = error; - this.listener?.emit(WritableEvent.ERROR, error); + if (error && error instanceof Error && !this.erroredInner) { + // 异步回调场景部分writeCallbackBuffer已重置过,部分Callback需要通过writeCallbacks执行 + // 同步回调场景writeCallbackBuffer还没清,统一通过errorFresh处理 + if (!this.writableSync) { + writeCallbacks.forEach(callback => { + if (typeof callback === 'function') { + callback(error ?? null); + } + }); + } + writeCallbacks = []; + this.throwError(error); + this.errorFresh(); return; } - this.writableLengthInner! -= bufferChunkLength; this.writing = false; this.writableCb --; + if (this.erroredInner) { + this.errorFresh(); + return; + } + this.writableLengthInner! -= bufferChunkLength; writeCallbacks.forEach(callback => { if (typeof callback === 'function') { callback(); @@ -1136,7 +1183,10 @@ class Writable { // 处理drain this.afterWrite(); }, () => { - this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); + if (!this.erroredInner) { + this.throwError(ERR_MULTIPLE_CALLBACK); + } + this.errorFresh(); }); this.writing = true; this.writableCb ++; @@ -1175,44 +1225,40 @@ class Writable { // 提取finish步骤,参考node对同步回调场景添加finish事件异步 private finishWrite() { - if (!this.writableFinishedInner && this.writableCb == 0 && - (!this.erroredInner || this.erroredInner.message === 'write after end')) { + if (!this.writableFinishedInner && this.writableCb == 0 && !this.erroredInner) { if (this.writableSync) { - asyncFn((): void => { + Promise.resolve().then((): void => { this.writableFinishedInner = true; this.endCallback?.(this.erroredInner); this.listener?.emit(WritableEvent.FINISH); - })(); + }); } else { this.writableFinishedInner = true; this.endCallback?.(this.erroredInner); this.listener?.emit(WritableEvent.FINISH); } - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { + Promise.resolve().then((): void => { + if (!this.erroredInner) { + this.closedInner = true; this.listener?.emit(WritableEvent.CLOSE); } - })(); + }); } } // 简化endInner,end写入部分全部提前到end()中,只保留部分后续处理。------------话说第一段if是不是重复多余了 endInner(callback?: Function): void { - if (this.writableEndedInner) { - this.erroredInner = new BusinessError('write after end', 10200036); - callback?.(this.erroredInner); - } else { - if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { - // 目前仅同步回调场景会进这段处理,finish和close直接一起异步处理即可 - asyncFn((): void => { - if (!this.erroredInner || this.erroredInner.message === 'write after end') { - this.writableFinishedInner = true; - callback?.(this.erroredInner); - this.listener?.emit(WritableEvent.FINISH); - this.listener?.emit(WritableEvent.CLOSE); - } - })(); - } + if (!this.writableFinishedInner && !this.writableEndedInner && this.writableCb == 0) { + // 目前仅同步回调场景会进这段处理,finish和close直接一起异步处理即可 + Promise.resolve().then((): void => { + if (!this.erroredInner) { + this.writableFinishedInner = true; + callback?.(this.erroredInner); + this.listener?.emit(WritableEvent.FINISH); + this.closedInner = true; + this.listener?.emit(WritableEvent.CLOSE); + } + }); } } @@ -1230,23 +1276,20 @@ class Writable { * @since 12 */ end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { + if (this.erroredInner) { + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + return this; + } if (this.writableFinishedInner) { - this.erroredInner = ERR_STREAM_ALREADY_FINISHED; - setTimeout(() => callback?.(this.erroredInner)); - this.emitErrorOnce(this.erroredInner); + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + this.throwError(ERR_STREAM_ALREADY_FINISHED); return this; } else if (this.writableEndedInner) { - this.erroredInner = ERR_WRITE_AFTER_END; - setTimeout(() => callback?.(this.erroredInner)); - this.emitErrorOnce(this.erroredInner); - return this; - } - if (this.erroredInner) { - setTimeout(() => callback?.(this.erroredInner)); + Promise.resolve().then((): void => {callback?.(this.erroredInner)}); + this.throwError(ERR_WRITE_AFTER_END); return this; } this.writableNeedDrainInner = false; - this.closedInner = true; this.writableInner = false; // end的写入从endInner提前到这里统一传给write处理 @@ -1311,12 +1354,14 @@ class Writable { } } catch (e) { this.throwError(e as Error); + this.errorFresh(); return false; } return true; } else { const err: BusinessError = new BusinessError(`Unknown encoding: ${encoding}`); this.throwError(err); + this.errorFresh(); return false; } } @@ -1385,6 +1430,7 @@ class Writable { off(event: string, callback?: Function): void { if (!event) { this.throwError(new BusinessError(`Parameter error. The value of event is null `, 401)); + this.errorFresh(); return; } if (callback) { @@ -1398,9 +1444,11 @@ class Writable { } } + // 没写doWrite的情况下切到doWritev noWriteOpes(chunk: string | Uint8Array, encoding: string, callback: Function): void { if (this.doWritev === null) { this.throwError(ERR_DOWRITE_NOT_IMPLEMENTED); + this.errorFresh(); } else { // @ts-ignore this.doWritev([chunk], callback); @@ -1452,36 +1500,24 @@ class Writable { throw ERR_DOWRITEV_NOT_IMPLEMENTED; } + // 原则上要做到产生error必须抛错并记录this.erroredInner,有this.erroredInner就搁置后续error全力收尾, throwError(error: Error): void { + this.writableInner = false; this.erroredInner = error; + // throw还是得扔Promise外面,抛错时机也更合理 if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { - setTimeout(() => { - this.listener?.emit(WritableEvent.ERROR, error); + Promise.resolve().then((): void => { + this.listener?.emit(WritableEvent.ERROR, this.erroredInner); + if (!this.closedInner) { + this.closedInner = true; + // Promise.resolve().then((): void => { + this.listener?.emit(WritableEvent.CLOSE); + // }); + } }); } else { - throw error; - } - } - - private isOnError(): boolean { - return this.listener?.isOn(WritableEvent.ERROR) || false; - } - - private emitErrorExecutedInner = false; - // @ts-ignore - private emitErrorIdInner: number; - - private emitErrorOnce(error: Error, reset?: boolean): void { - if (reset) { - this.emitErrorExecutedInner = false; - clearTimeout(this.emitErrorIdInner); - } - if (!this.emitErrorExecutedInner) { - this.emitErrorExecutedInner = true; - // @ts-ignore - this.emitErrorIdInner = setTimeout((): void => { - this.listener?.emit(WritableEvent.ERROR, this.erroredInner ?? error); - }); + throw this.erroredInner; + // throw放Promise.resolve().then里会被拒绝并忽略,导致不抛错,但是放外面会导致后置的on'error'来不及捕获error,遇错直接中断 } } } -- Gitee From 45212c13ea2659f5fc574ed591404f2cfbe89fbc Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Fri, 30 May 2025 11:22:39 +0800 Subject: [PATCH 11/14] =?UTF-8?q?errorFresh=E7=9A=84writeCallbacks?= =?UTF-8?q?=E5=A4=84=E7=90=86=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 33 +++++++++++++++----------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index f37ccb66..2f674f90 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -730,6 +730,8 @@ class Writable { private defaultEncoding: string | undefined; private endCallback: Function | undefined; private writeCallbackBuffer: Function[] = []; + private writeCallbacks: Function[] = []; + private writaCallbackMove: boolean = false; /** * The Writable constructor. @@ -1090,6 +1092,15 @@ class Writable { } private errorFresh(): void { + // writeCallbackBuffer完成批量转移到writeCallbacks的情况下先清writeCallbacks + // 如果没有完成转移(writeCallbackBuffer在转移后还没重置),则跳过,统一在writeCallbackBuffer处理 + if (this.writaCallbackMove) { + this.writeCallbacks.forEach(callback => { + if (typeof callback === 'function') { + callback(this.erroredInner ?? null); + } + }); + } this.writeCallbackBuffer.forEach(callback => { if (typeof callback === 'function') { callback(this.erroredInner); @@ -1098,6 +1109,7 @@ class Writable { this.endCallback?.(this.erroredInner); // end-cb不在writeCallbackBuffer里 this.endCallback = undefined; this.buffer = []; + this.writeCallbacks = []; this.writeCallbackBuffer = []; this.writableBufferLength = 0; this.writableLengthInner = 0; @@ -1143,19 +1155,10 @@ class Writable { if (this.doWritev) { const bufferChunkLength = this.writableBufferLength!; this.writableBufferLength = 0; - let writeCallbacks: Function[] = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + // uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + this.writeCallbacks = []; const funCallback = runOnce((error?: Error): void => { if (error && error instanceof Error && !this.erroredInner) { - // 异步回调场景部分writeCallbackBuffer已重置过,部分Callback需要通过writeCallbacks执行 - // 同步回调场景writeCallbackBuffer还没清,统一通过errorFresh处理 - if (!this.writableSync) { - writeCallbacks.forEach(callback => { - if (typeof callback === 'function') { - callback(error ?? null); - } - }); - } - writeCallbacks = []; this.throwError(error); this.errorFresh(); return; @@ -1167,12 +1170,12 @@ class Writable { return; } this.writableLengthInner! -= bufferChunkLength; - writeCallbacks.forEach(callback => { + this.writeCallbacks.forEach(callback => { if (typeof callback === 'function') { callback(); } }); - writeCallbacks = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 + this.writeCallbacks = [];// uncork开了writing限制,前一批彻底处理完前都老实在缓冲区里呆着,备份出的cb不会被干扰 // 仅异步回调场景通过freshCacheV()请求缓冲区后续数据,因为可能有新写入的数据 // 同步回调场景不会有未输出的缓冲区数据,不需要再次执行freshCacheV() @@ -1192,8 +1195,9 @@ class Writable { this.writableCb ++; this.writableSync = true; this.writeCallbackBuffer.forEach((callback: Function ) => { - writeCallbacks.push(callback); + this.writeCallbacks.push(callback); }); + this.writaCallbackMove = false; // @ts-ignore this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { return item.chunk; @@ -1201,6 +1205,7 @@ class Writable { this.writableSync = false; this.buffer = []; this.writeCallbackBuffer = []; + this.writaCallbackMove = true; if (this.finishMayBe()) { // 补齐不同场景下finish、close事件相关处理 this.finishWrite(); -- Gitee From cc9d4e2fc9b9673c2e308b243d790dc2a1f9c4ab Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Fri, 30 May 2025 17:15:34 +0800 Subject: [PATCH 12/14] =?UTF-8?q?runOnce=20callback=E6=94=B9=E5=90=8C?= =?UTF-8?q?=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 2f674f90..45a2d4ea 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -96,10 +96,8 @@ function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Err return runFn(...args); } else { if (callback) { - Promise.resolve().then(():void => { - // @ts-ignore - callback(); - }); + // @ts-ignore + callback(); } } }; -- Gitee From 96ddbf5de95f5f493b4dc726ada9edc1b2d86b6d Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Wed, 4 Jun 2025 09:48:42 +0800 Subject: [PATCH 13/14] =?UTF-8?q?Uint8Array=20write=20=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E5=80=BC=E8=A1=A5=E5=85=85error=E5=9C=BA=E6=99=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index 45a2d4ea..a26c7358 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -1021,7 +1021,7 @@ class Writable { } this.writableBufferLength! += chunkLength; } - return hasRemaining; + return this.erroredInner ? false : hasRemaining; } private writeString(chunk: string, encoding?: string, callback?: Function): boolean { -- Gitee From 5600450249403a719d2a61ba12d63d58e0986764 Mon Sep 17 00:00:00 2001 From: RainbowseaExplore Date: Thu, 19 Jun 2025 20:52:45 +0800 Subject: [PATCH 14/14] sync drain fix Signed-off-by: RainbowseaExplore --- js_util_module/stream/src/stream_js.ts | 29 +++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/js_util_module/stream/src/stream_js.ts b/js_util_module/stream/src/stream_js.ts index a26c7358..a8bba48c 100644 --- a/js_util_module/stream/src/stream_js.ts +++ b/js_util_module/stream/src/stream_js.ts @@ -986,9 +986,14 @@ class Writable { // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 if (!this.writableSync) { this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain + Promise.resolve().then((): void => { + this.afterWrite(); + }); } - // 处理drain - this.afterWrite(); }, (multipleTimes: boolean, err: Error): void => { if (!this.erroredInner) { this.throwError(ERR_MULTIPLE_CALLBACK); @@ -1033,7 +1038,7 @@ class Writable { this.writableLengthInner! += chunkLength; const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; const fb = runOnce((error?: Error): void => { - callback?.(error ?? null); + callback?.(error ?? null); // 异步回调情况下,首次write的cb需不需要感知主任务同步流程产生的error,替换掉error,如:error ?? this.erroredInner ?? null this.writing = false; this.writableCb --; if (error && error instanceof Error && !this.erroredInner) { @@ -1050,9 +1055,14 @@ class Writable { // 同步回调场景通过freshCache中的循环获取缓冲区后续数据 if (!this.writableSync) { this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain + Promise.resolve().then((): void => { + this.afterWrite(); + }); } - // 处理drain - this.afterWrite(); }, () => { if (!this.erroredInner) { this.throwError(ERR_MULTIPLE_CALLBACK); @@ -1180,9 +1190,14 @@ class Writable { // 且同步回调场景下buffer置空操作还未进行,不能再次执行freshCacheV() if (!this.writableSync) { this.freshCacheV(); + // 处理drain + this.afterWrite(); + } else { + // 同步回调场景下异步处理drain + Promise.resolve().then((): void => { + this.afterWrite(); + }); } - // 处理drain - this.afterWrite(); }, () => { if (!this.erroredInner) { this.throwError(ERR_MULTIPLE_CALLBACK); -- Gitee