Ai
1 Star 0 Fork 1

rustlab/7-Zip-zstd

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
克隆/下载
MtDec.c 28.25 KB
一键复制 编辑 原始数据 按行查看 历史
Tino Reichardt 提交于 2023-01-15 19:42 +08:00 . Add other methods to compression dialog
12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139
/* MtDec.c -- Multi-thread Decoder
2021-12-21 : Igor Pavlov : Public domain */
#include "Precomp.h"
// #define SHOW_DEBUG_INFO
// #include <stdio.h>
#include <string.h>
#ifdef SHOW_DEBUG_INFO
#include <stdio.h>
#endif
#include "MtDec.h"
#ifndef _7ZIP_ST
#ifdef SHOW_DEBUG_INFO
#define PRF(x) x
#else
#define PRF(x)
#endif
#define PRF_STR_INT(s, d) PRF(printf("\n" s " %d\n", (unsigned)d))
/*
  MtProgress_Init() puts the progress tracker into its start state:
    - remembers the progress callback (other functions in this file test it
      against NULL before calling it),
    - clears the sticky result code to SZ_OK,
    - zeroes the accumulated input / output byte counters.
  The critical section p->cs is not touched here.
*/
void MtProgress_Init(CMtProgress *p, ICompressProgress *progress)
{
  p->totalInSize = 0;
  p->totalOutSize = 0;
  p->res = SZ_OK;
  p->progress = progress;
}
/*
  MtProgress_Progress_ST() reports the current totals to the progress callback
  without taking p->cs.
  The callback is invoked only while no error is recorded and a callback is set.
  A callback failure is recorded as SZ_ERROR_PROGRESS.
  Returns the (possibly updated) sticky result code p->res.
*/
SRes MtProgress_Progress_ST(CMtProgress *p)
{
  if (p->res != SZ_OK || !p->progress)
    return p->res;

  if (ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
    p->res = SZ_ERROR_PROGRESS;

  return p->res;
}
/*
  MtProgress_ProgressAdd() adds (inSize, outSize) to the running totals and
  reports the new totals to the progress callback, all under p->cs.
  The callback is skipped if an error is already recorded or no callback is set.
  A callback failure is recorded as SZ_ERROR_PROGRESS.
  Returns the sticky result code as it was while the lock was held.
*/
SRes MtProgress_ProgressAdd(CMtProgress *p, UInt64 inSize, UInt64 outSize)
{
  SRes status;

  CriticalSection_Enter(&p->cs);

  p->totalInSize += inSize;
  p->totalOutSize += outSize;

  if (p->res == SZ_OK
      && p->progress
      && ICompressProgress_Progress(p->progress, p->totalInSize, p->totalOutSize) != SZ_OK)
  {
    p->res = SZ_ERROR_PROGRESS;
  }
  status = p->res;

  CriticalSection_Leave(&p->cs);

  return status;
}
/*
  MtProgress_GetError() returns a copy of the sticky result code p->res,
  read while holding p->cs.
*/
SRes MtProgress_GetError(CMtProgress *p)
{
  SRes snapshot;

  CriticalSection_Enter(&p->cs);
  snapshot = p->res;
  CriticalSection_Leave(&p->cs);

  return snapshot;
}
/*
  MtProgress_SetError() records (res) as the sticky result code, under p->cs.
  Only the first error is kept: if p->res is already different from SZ_OK,
  the call changes nothing.
*/
void MtProgress_SetError(CMtProgress *p, SRes res)
{
  CriticalSection_Enter(&p->cs);
  if (p->res != SZ_OK)
  {
    /* an earlier error is already recorded - keep it */
  }
  else
    p->res = res;
  CriticalSection_Leave(&p->cs);
}
#define RINOK_THREAD(x) RINOK_WRes(x)
/*
  ArEvent_OptCreate_And_Reset() leaves the event (p) existing and non-signaled:
    - an already created event is reset,
    - otherwise a new auto-reset event is created in non-signaled state.
  Returns the WRes of whichever call was made.
*/
static WRes ArEvent_OptCreate_And_Reset(CEvent *p)
{
  return Event_IsCreated(p) ?
      Event_Reset(p) :
      AutoResetEvent_CreateNotSignaled(p);
}
/*
  CMtDecBufLink is the header stored at the start of every input buffer.
  Buffers are allocated as (MTDEC__LINK_DATA_OFFSET + inBufSize) bytes and the
  payload begins right after this header (see MTDEC__DATA_PTR_FROM_LINK).

    next - the following buffer of the same chain, or NULL for the last one.
    pad  - unused; it extends the header to the size of four pointers
           (presumably to keep the payload aligned - confirm before changing).

  The struct tag does not start with a double underscore: such identifiers are
  reserved for the implementation (C11 7.1.3). Only the typedef name
  CMtDecBufLink is used by the rest of the file.
*/
struct CMtDecBufLink_
{
  struct CMtDecBufLink_ *next;
  void *pad[3];
};
typedef struct CMtDecBufLink_ CMtDecBufLink;
#define MTDEC__LINK_DATA_OFFSET sizeof(CMtDecBufLink)
#define MTDEC__DATA_PTR_FROM_LINK(link) ((Byte *)(link) + MTDEC__LINK_DATA_OFFSET)
static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp);
/*
  MtDecThread_CreateEvents() prepares both events of thread (t):
  canWrite first, then canRead. Each one is created if needed and left
  non-signaled (see ArEvent_OptCreate_And_Reset).
  Returns 0 on success, or the WRes of the first call that failed;
  canRead is not touched if canWrite failed.
*/
static WRes MtDecThread_CreateEvents(CMtDecThread *t)
{
  const WRes writeRes = ArEvent_OptCreate_And_Reset(&t->canWrite);
  if (writeRes != 0)
    return writeRes;
  return ArEvent_OptCreate_And_Reset(&t->canRead);
}
/*
  MtDecThread_CreateAndStart() makes thread slot (t) ready to run:
    1) prepares its events (MtDecThread_CreateEvents),
    2) creates the system thread running ThreadFunc(t), unless the thread
       object was already created by an earlier call.
  Returns SZ_OK on success, or the failing WRes converted with
  MY_SRes_HRESULT_FROM_WRes().
*/
static SRes MtDecThread_CreateAndStart(CMtDecThread *t)
{
  WRes wres = MtDecThread_CreateEvents(t);

  if (wres == 0 && !Thread_WasCreated(&t->thread))
    wres = Thread_Create(&t->thread, ThreadFunc, t);

  if (wres == 0)
    return SZ_OK;
  return MY_SRes_HRESULT_FROM_WRes(wres);
}
/*
  MtDecThread_FreeInBufs() releases the whole chain of input buffers owned by
  thread (t) through the decoder's allocator and leaves t->inBuf == NULL.
  The chain head is detached before the first buffer is freed.
  Does nothing if the thread has no buffers.
*/
void MtDecThread_FreeInBufs(CMtDecThread *t)
{
  CMtDecBufLink *node = (CMtDecBufLink *)t->inBuf;

  if (!node)
    return;

  t->inBuf = NULL;
  while (node)
  {
    CMtDecBufLink *following = node->next;
    ISzAlloc_Free(t->mtDec->alloc, node);
    node = following;
  }
}
/*
  MtDecThread_CloseThread() stops the system thread of slot (t), if one was
  created, and closes both events of the slot.
  Both events are signaled before the join, so a thread blocked in Event_Wait()
  wakes up; callers in this file set p->exitThread before getting here
  (see MtDec_Free / ThreadFunc1), which makes the woken thread return.
*/
static void MtDecThread_CloseThread(CMtDecThread *t)
{
  CEvent *const readEvent = &t->canRead;
  CEvent *const writeEvent = &t->canWrite;

  if (Thread_WasCreated(&t->thread))
  {
    /* signaling canWrite could be disabled: no thread waits for canWrite in normal cases */
    Event_Set(writeEvent);
    Event_Set(readEvent);
    Thread_Wait_Close(&t->thread);
  }

  Event_Close(readEvent);
  Event_Close(writeEvent);
}
/*
  MtDec_CloseThreads() calls MtDecThread_CloseThread() for every one of the
  MTDEC__THREADS_MAX thread slots, in index order.
  Input buffers of the slots are not freed here.
*/
static void MtDec_CloseThreads(CMtDec *p)
{
  unsigned slot = 0;
  while (slot != MTDEC__THREADS_MAX)
  {
    MtDecThread_CloseThread(&p->threads[slot]);
    slot++;
  }
}
/*
  MtDecThread_Destruct() fully releases thread slot (t):
  the thread and its events are closed first, and only then
  the slot's input buffers are freed.
*/
static void MtDecThread_Destruct(CMtDecThread *t)
{
  /* 1) stop the thread, close the events */
  MtDecThread_CloseThread(t);

  /* 2) the thread is not running anymore: its buffers can be released */
  MtDecThread_FreeInBufs(t);
}
/*
  FullRead() keeps calling ISeqInStream_Read() until the requested number of
  bytes has been read, the stream returns 0 bytes, or a read fails.

    stream        - source stream
    data          - destination buffer
    processedSize - in:  number of bytes requested
                    out: number of bytes actually stored in (data)

  Bytes reported by a failing read call are still counted in *processedSize.
  Returns SZ_OK, or the first non-SZ_OK code returned by the stream.
*/
static SRes FullRead(ISeqInStream *stream, Byte *data, size_t *processedSize)
{
  size_t remaining = *processedSize;
  *processedSize = 0;

  for (; remaining != 0;)
  {
    size_t got = remaining;
    const SRes res = ISeqInStream_Read(stream, data, &got);

    *processedSize += got;
    data += got;
    remaining -= got;

    if (res != SZ_OK)
      return res;
    if (got == 0)
      break;
  }

  return SZ_OK;
}
/*
  MtDec_GetError_Spec() reads, under p->mtProgress.cs:
    - *wasInterrupted : set when an interrupt was requested (p->needInterrupt)
                        and (interruptIndex) is greater than p->interruptIndex,
    - return value    : the sticky result code p->mtProgress.res.
*/
static SRes MtDec_GetError_Spec(CMtDec *p, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  SRes status;

  CriticalSection_Enter(&p->mtProgress.cs);

  if (p->needInterrupt && interruptIndex > p->interruptIndex)
    *wasInterrupted = True;
  else
    *wasInterrupted = False;
  status = p->mtProgress.res;

  CriticalSection_Leave(&p->mtProgress.cs);

  return status;
}
/*
  MtDec_Progress_GetError_Spec() does, in one locked section (p->mtProgress.cs):
    1) adds (inSize, outSize) to the progress totals,
    2) calls the progress callback, if no error is recorded and a callback is set;
       a callback failure is recorded as SZ_ERROR_PROGRESS,
    3) sets *wasInterrupted when an interrupt was requested and
       (interruptIndex) is greater than p->interruptIndex.
  Returns the sticky result code p->mtProgress.res.
*/
static SRes MtDec_Progress_GetError_Spec(CMtDec *p, UInt64 inSize, UInt64 outSize, UInt64 interruptIndex, BoolInt *wasInterrupted)
{
  SRes status;
  CMtProgress *const prog = &p->mtProgress;

  CriticalSection_Enter(&prog->cs);

  prog->totalInSize += inSize;
  prog->totalOutSize += outSize;

  if (prog->res == SZ_OK
      && prog->progress
      && ICompressProgress_Progress(prog->progress, prog->totalInSize, prog->totalOutSize) != SZ_OK)
  {
    prog->res = SZ_ERROR_PROGRESS;
  }

  *wasInterrupted = (p->needInterrupt && interruptIndex > p->interruptIndex);
  status = prog->res;

  CriticalSection_Leave(&prog->cs);

  return status;
}
/*
  MtDec_Interrupt() requests an interrupt at (interruptIndex), under
  p->mtProgress.cs. The smallest requested index wins: an existing request
  is replaced only by a smaller index.
*/
static void MtDec_Interrupt(CMtDec *p, UInt64 interruptIndex)
{
  CriticalSection_Enter(&p->mtProgress.cs);

  if (!(p->needInterrupt && interruptIndex >= p->interruptIndex))
  {
    p->needInterrupt = True;
    p->interruptIndex = interruptIndex;
  }

  CriticalSection_Leave(&p->mtProgress.cs);
}
/*
  MtDec_GetCrossBuff() returns the data area of the cross block, allocating
  the block (link header + p->inBufSize bytes) on first use.
  Returns NULL if the allocation fails; p->crossBlock stays NULL in that case.
  The returned pointer is MTDEC__LINK_DATA_OFFSET bytes past p->crossBlock.
*/
Byte *MtDec_GetCrossBuff(CMtDec *p)
{
  if (!p->crossBlock)
  {
    Byte *fresh = (Byte *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
    if (!fresh)
      return NULL;
    p->crossBlock = fresh;
  }
  return MTDEC__DATA_PTR_FROM_LINK(p->crossBlock);
}
/*
ThreadFunc2() returns:
0 - in all normal cases (even for a stream error or a memory allocation error)
(!= 0) - WRes error code returned by a system threading function
*/
// #define MTDEC_ProgessStep (1 << 22)
#define MTDEC_ProgessStep (1 << 0)
/*
ThreadFunc2() is the worker loop executed by every decoder thread.
Each pass of the outer for(;;) handles one block (numbered from p->blockIndex):
  1) wait for t->canRead; return 0 if p->exitThread is set;
  2) READ / PARSE: fill the chain of input buffers at t->inBuf, either from the
     pending cross-block bytes (p->crossStart .. p->crossEnd) or from
     p->inStream via FullRead(), and pass each piece to mtCallback->Parse();
  3) call mtCallback->PreCode() if the block must be decoded;
  4) unless (finish) is set: optionally start one more thread, then signal
     canRead of the next thread;
  5) CODE: pass the buffered input to mtCallback->Code();
  6) WRITE: wait for t->canWrite, publish error / interrupt state into (*p),
     call mtCallback->Write(), then signal canWrite of the next thread
     (or canWrite / canRead of thread 0 on the (finish) path).
A failing event call leaves the function through RINOK_THREAD (defined above as
RINOK_WRes; presumably it returns the nonzero WRes - confirm in its header).
*/
static WRes ThreadFunc2(CMtDecThread *t)
{
CMtDec *p = t->mtDec;
PRF_STR_INT("ThreadFunc2", t->index);
// SetThreadAffinityMask(GetCurrentThread(), 1 << t->index);
for (;;)
{
SRes res, codeRes;
BoolInt wasInterrupted, isAllocError, overflow, finish;
SRes threadingErrorSRes;
BoolInt needCode, needWrite, needContinue;
size_t inDataSize_Start;
UInt64 inDataSize;
// UInt64 inDataSize_Full;
UInt64 blockIndex;
UInt64 inPrev = 0;
UInt64 outPrev = 0;
UInt64 inCodePos;
UInt64 outCodePos;
Byte *afterEndData = NULL;
size_t afterEndData_Size = 0;
BoolInt afterEndData_IsCross = False;
BoolInt canCreateNewThread = False;
// CMtDecCallbackInfo parse;
CMtDecThread *nextThread;
PRF_STR_INT("=============== Event_Wait(&t->canRead)", t->index);
RINOK_THREAD(Event_Wait(&t->canRead));
if (p->exitThread)
return 0;
PRF_STR_INT("after Event_Wait(&t->canRead)", t->index);
// if (t->index == 3) return 19; // for test
// take the index of the block handled by this pass
blockIndex = p->blockIndex++;
// PRF(printf("\ncanRead\n"))
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
finish = p->readWasFinished;
needCode = False;
needWrite = False;
isAllocError = False;
overflow = False;
inDataSize_Start = 0;
inDataSize = 0;
// inDataSize_Full = 0;
// ---------- READ / PARSE ----------
if (res == SZ_OK && !wasInterrupted)
{
// if (p->inStream)
{
CMtDecBufLink *prev = NULL;
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
// number of pending bytes in the cross block
size_t crossSize = p->crossEnd - p->crossStart;
PRF(printf("\ncrossSize = %d\n", crossSize));
// one pass per input buffer of this block
for (;;)
{
if (!link)
{
// no buffer left in the chain: allocate a new one and append it
link = (CMtDecBufLink *)ISzAlloc_Alloc(p->alloc, MTDEC__LINK_DATA_OFFSET + p->inBufSize);
if (!link)
{
finish = True;
// p->allocError_for_Read_BlockIndex = blockIndex;
isAllocError = True;
break;
}
link->next = NULL;
if (prev)
{
// static unsigned g_num = 0;
// printf("\n%6d : %x", ++g_num, (unsigned)(size_t)((Byte *)link - (Byte *)prev));
prev->next = link;
}
else
t->inBuf = (void *)link;
}
{
Byte *data = MTDEC__DATA_PTR_FROM_LINK(link);
Byte *parseData = data;
size_t size;
if (crossSize != 0)
{
// parse the pending cross-block bytes in place; they are copied to (data) after Parse()
inDataSize = crossSize;
// inDataSize_Full = inDataSize;
inDataSize_Start = crossSize;
size = crossSize;
parseData = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
PRF(printf("\ncross : crossStart = %7d crossEnd = %7d finish = %1d",
(int)p->crossStart, (int)p->crossEnd, (int)finish));
}
else
{
// no cross data: read up to p->inBufSize bytes from the input stream
size = p->inBufSize;
res = FullRead(p->inStream, data, &size);
// size = 10; // test
inDataSize += size;
// inDataSize_Full = inDataSize;
if (!prev)
inDataSize_Start = size;
p->readProcessed += size;
// a short read marks the end of the input
finish = (size != p->inBufSize);
if (finish)
p->readWasFinished = True;
// res = E_INVALIDARG; // test
if (res != SZ_OK)
{
// PRF(printf("\nRead error = %d\n", res))
// we want to decode all data before error
p->readRes = res;
// p->readError_BlockIndex = blockIndex;
p->readWasFinished = True;
finish = True;
res = SZ_OK;
// break;
}
if (inDataSize - inPrev >= MTDEC_ProgessStep)
{
res = MtDec_Progress_GetError_Spec(p, 0, 0, blockIndex, &wasInterrupted);
if (res != SZ_OK || wasInterrupted)
break;
inPrev = inDataSize;
}
}
{
CMtDecCallbackInfo parse;
parse.startCall = (prev == NULL);
parse.src = parseData;
parse.srcSize = size;
parse.srcFinished = finish;
parse.canCreateNewThread = True;
PRF(printf("\nParse size = %d\n", (unsigned)size));
p->mtCallback->Parse(p->mtCallbackObject, t->index, &parse);
PRF(printf(" Parse processed = %d, state = %d \n", (unsigned)parse.srcSize, (unsigned)parse.state));
needWrite = True;
canCreateNewThread = parse.canCreateNewThread;
// printf("\n\n%12I64u %12I64u", (UInt64)p->mtProgress.totalInSize, (UInt64)p->mtProgress.totalOutSize);
if (
// parseRes != SZ_OK ||
// inDataSize - (size - parse.srcSize) > p->inBlockMax
// ||
parse.state == MTDEC_PARSE_OVERFLOW
// || wasInterrupted
)
{
// Overflow or Parse error - switch from MT decoding to ST decoding
finish = True;
overflow = True;
{
PRF(printf("\n Overflow"));
// PRF(printf("\nisBlockFinished = %d", (unsigned)parse.blockWasFinished));
PRF(printf("\n inDataSize = %d", (unsigned)inDataSize));
}
if (crossSize != 0)
memcpy(data, parseData, size);
p->crossStart = 0;
p->crossEnd = 0;
break;
}
if (crossSize != 0)
{
memcpy(data, parseData, parse.srcSize);
p->crossStart += parse.srcSize;
}
if (parse.state != MTDEC_PARSE_CONTINUE || finish)
{
// we don't need to parse in current thread anymore
if (parse.state == MTDEC_PARSE_END)
finish = True;
needCode = True;
// p->crossFinished = finish;
if (parse.srcSize == size)
{
// full parsed - no cross transfer
p->crossStart = 0;
p->crossEnd = 0;
break;
}
if (parse.state == MTDEC_PARSE_END)
{
// bytes after the parsed part are kept aside and passed to Write() later
afterEndData = parseData + parse.srcSize;
afterEndData_Size = size - parse.srcSize;
if (crossSize != 0)
afterEndData_IsCross = True;
// we reduce data size to required bytes (parsed only)
inDataSize -= afterEndData_Size;
if (!prev)
inDataSize_Start = parse.srcSize;
break;
}
{
// partial parsed - need cross transfer
if (crossSize != 0)
inDataSize = parse.srcSize; // it's only parsed now
else
{
// partial parsed - is not in initial cross block - we need to copy new data to cross block
Byte *cr = MtDec_GetCrossBuff(p);
if (!cr)
{
{
PRF(printf("\ncross alloc error error\n"));
// res = SZ_ERROR_MEM;
finish = True;
// p->allocError_for_Read_BlockIndex = blockIndex;
isAllocError = True;
break;
}
}
{
size_t crSize = size - parse.srcSize;
inDataSize -= crSize;
p->crossEnd = crSize;
p->crossStart = 0;
memcpy(cr, parseData + parse.srcSize, crSize);
}
}
// inDataSize_Full = inDataSize;
if (!prev)
inDataSize_Start = parse.srcSize; // it's partial size (parsed only)
finish = False;
break;
}
}
// MTDEC_PARSE_CONTINUE and input is not finished: Parse() must have consumed the whole piece
if (parse.srcSize != size)
{
res = SZ_ERROR_FAIL;
PRF(printf("\nfinished error SZ_ERROR_FAIL = %d\n", res));
break;
}
}
}
// move to the next buffer of the chain; the cross data is consumed now
prev = link;
link = link->next;
if (crossSize != 0)
{
crossSize = 0;
p->crossStart = 0;
p->crossEnd = 0;
}
}
}
if (res == SZ_OK)
res = MtDec_GetError_Spec(p, blockIndex, &wasInterrupted);
}
// ---------- PRECODE ----------
codeRes = SZ_OK;
if (res == SZ_OK && needCode && !wasInterrupted)
{
codeRes = p->mtCallback->PreCode(p->mtCallbackObject, t->index);
if (codeRes != SZ_OK)
{
needCode = False;
finish = True;
// SZ_ERROR_MEM is expected error here.
// if (codeRes == SZ_ERROR_MEM) - we will try single-thread decoding later.
// if (codeRes != SZ_ERROR_MEM) - we can stop decoding or try single-thread decoding.
}
}
if (res != SZ_OK || wasInterrupted)
finish = True;
// ---------- START NEXT THREAD ----------
nextThread = NULL;
threadingErrorSRes = SZ_OK;
if (!finish)
{
if (p->numStartedThreads < p->numStartedThreads_Limit && canCreateNewThread)
{
SRes res2 = MtDecThread_CreateAndStart(&p->threads[p->numStartedThreads]);
if (res2 == SZ_OK)
{
// if (p->numStartedThreads % 1000 == 0) PRF(printf("\n numStartedThreads=%d\n", p->numStartedThreads));
p->numStartedThreads++;
}
else
{
PRF(printf("\nERROR: numStartedThreads=%d\n", p->numStartedThreads));
if (p->numStartedThreads == 1)
{
// if only one thread is possible, we leave multi-threading code
finish = True;
needCode = False;
threadingErrorSRes = res2;
}
else
p->numStartedThreads_Limit = p->numStartedThreads;
}
}
if (!finish)
{
// threads are used in ring order: after the last started thread comes thread 0
unsigned nextIndex = t->index + 1;
nextThread = &p->threads[nextIndex >= p->numStartedThreads ? 0 : nextIndex];
RINOK_THREAD(Event_Set(&nextThread->canRead))
// We have started executing for new iteration (with next thread)
// And that next thread now is responsible for possible exit from decoding (threading_code)
}
}
// each call of Event_Set(&nextThread->canRead) must be followed by call of Event_Set(&nextThread->canWrite)
// if ( !finish ) we must call Event_Set(&nextThread->canWrite) in any case
// if ( finish ) we switch to single-thread mode and there are 2 ways at the end of current iteration (current block):
// - if (needContinue) after Write(&needContinue), we restore decoding with new iteration
// - otherwise we stop decoding and exit from ThreadFunc2()
// Don't change (finish) variable in the further code
// ---------- CODE ----------
inPrev = 0;
outPrev = 0;
inCodePos = 0;
outCodePos = 0;
if (res == SZ_OK && needCode && codeRes == SZ_OK)
{
BoolInt isStartBlock = True;
CMtDecBufLink *link = (CMtDecBufLink *)t->inBuf;
// one pass per input buffer of this block
for (;;)
{
size_t inSize;
int stop;
if (isStartBlock)
inSize = inDataSize_Start;
else
{
UInt64 rem = inDataSize - inCodePos;
inSize = p->inBufSize;
if (inSize > rem)
inSize = (size_t)rem;
}
inCodePos += inSize;
stop = True;
codeRes = p->mtCallback->Code(p->mtCallbackObject, t->index,
(const Byte *)MTDEC__DATA_PTR_FROM_LINK(link), inSize,
(inCodePos == inDataSize), // srcFinished
&inCodePos, &outCodePos, &stop);
if (codeRes != SZ_OK)
{
PRF(printf("\nCode Interrupt error = %x\n", codeRes));
// we interrupt only later blocks
MtDec_Interrupt(p, blockIndex);
break;
}
if (stop || inCodePos == inDataSize)
break;
{
const UInt64 inDelta = inCodePos - inPrev;
const UInt64 outDelta = outCodePos - outPrev;
if (inDelta >= MTDEC_ProgessStep || outDelta >= MTDEC_ProgessStep)
{
// Sleep(1);
res = MtDec_Progress_GetError_Spec(p, inDelta, outDelta, blockIndex, &wasInterrupted);
if (res != SZ_OK || wasInterrupted)
break;
inPrev = inCodePos;
outPrev = outCodePos;
}
}
link = link->next;
isStartBlock = False;
}
}
// ---------- WRITE ----------
RINOK_THREAD(Event_Wait(&t->canWrite));
{
BoolInt isErrorMode = False;
BoolInt canRecode = True;
BoolInt needWriteToStream = needWrite;
if (p->exitThread) return 0; // it's never executed in normal cases
if (p->wasInterrupted)
wasInterrupted = True;
else
{
// publish the problems found by this thread into the shared decoder state
if (codeRes != SZ_OK) // || !needCode // check it !!!
{
p->wasInterrupted = True;
p->codeRes = codeRes;
if (codeRes == SZ_ERROR_MEM)
isAllocError = True;
}
if (threadingErrorSRes)
{
p->wasInterrupted = True;
p->threadingErrorSRes = threadingErrorSRes;
needWriteToStream = False;
}
if (isAllocError)
{
p->wasInterrupted = True;
p->isAllocError = True;
needWriteToStream = False;
}
if (overflow)
{
p->wasInterrupted = True;
p->overflow = True;
needWriteToStream = False;
}
}
if (needCode)
{
if (wasInterrupted)
{
inCodePos = 0;
outCodePos = 0;
}
{
// report the part of the block that was not reported during CODE
const UInt64 inDelta = inCodePos - inPrev;
const UInt64 outDelta = outCodePos - outPrev;
// if (inDelta != 0 || outDelta != 0)
res = MtProgress_ProgressAdd(&p->mtProgress, inDelta, outDelta);
}
}
needContinue = (!finish);
// if (res == SZ_OK && needWrite && !wasInterrupted)
if (needWrite)
{
// p->inProcessed += inCodePos;
PRF(printf("\n--Write afterSize = %d\n", (unsigned)afterEndData_Size));
res = p->mtCallback->Write(p->mtCallbackObject, t->index,
res == SZ_OK && needWriteToStream && !wasInterrupted, // needWrite
afterEndData, afterEndData_Size, afterEndData_IsCross,
&needContinue,
&canRecode);
// res = SZ_ERROR_FAIL; // for test
PRF(printf("\nAfter Write needContinue = %d\n", (unsigned)needContinue));
PRF(printf("\nprocessed = %d\n", (unsigned)p->inProcessed));
if (res != SZ_OK)
{
PRF(printf("\nWrite error = %d\n", res));
isErrorMode = True;
p->wasInterrupted = True;
}
if (res != SZ_OK
|| (!needContinue && !finish))
{
PRF(printf("\nWrite Interrupt error = %x\n", res));
MtDec_Interrupt(p, blockIndex);
}
}
// keep the input of this block as "filled" data (counted in p->numFilledThreads)
// when the block was not decoded normally; MtDec_Read() hands out such data later
if (canRecode)
if (!needCode
|| res != SZ_OK
|| p->wasInterrupted
|| codeRes != SZ_OK
|| wasInterrupted
|| p->numFilledThreads != 0
|| isErrorMode)
{
if (p->numFilledThreads == 0)
p->filledThreadStart = t->index;
if (inDataSize != 0 || !finish)
{
t->inDataSize_Start = inDataSize_Start;
t->inDataSize = inDataSize;
p->numFilledThreads++;
}
PRF(printf("\np->numFilledThreads = %d\n", p->numFilledThreads));
PRF(printf("p->filledThreadStart = %d\n", p->filledThreadStart));
}
if (!finish)
{
RINOK_THREAD(Event_Set(&nextThread->canWrite));
}
else
{
if (needContinue)
{
// we restore decoding with new iteration
RINOK_THREAD(Event_Set(&p->threads[0].canWrite));
}
else
{
// we exit from decoding
if (t->index == 0)
return SZ_OK;
p->exitThread = True;
}
RINOK_THREAD(Event_Set(&p->threads[0].canRead));
}
}
}
}
#ifdef _WIN32
#define USE_ALLOCA
#endif
#ifdef USE_ALLOCA
#ifdef _WIN32
#include <malloc.h>
#else
#include <stdlib.h>
#endif
#endif
/*
  ThreadFunc1() runs the worker loop ThreadFunc2() for thread (pp) and
  converts its outcome into the thread's return value.

  - ThreadFunc2() returned 0: the result is p->exitThreadWRes, i.e. the first
    threading error recorded by any thread, or 0.
  - ThreadFunc2() returned a WRes error: the error is stored in
    p->exitThreadWRes (unless one is already stored), p->exitThread is set,
    both events of thread 0 are signaled, and the error is recorded in
    p->mtProgress. The result is that WRes error.
*/
static THREAD_FUNC_DECL ThreadFunc1(void *pp)
{
  CMtDecThread *t = (CMtDecThread *)pp;
  CMtDec *p;
  WRes res;

  res = ThreadFunc2(t);
  p = t->mtDec;

  if (res == 0)
    return (THREAD_FUNC_RET_TYPE)(UINT_PTR)p->exitThreadWRes;

  /* unexpected situation: some threading function has failed */
  if (p->exitThreadWRes == 0)
    p->exitThreadWRes = res;
  PRF(printf("\nthread exit error = %d\n", res));

  p->exitThread = True;
  Event_Set(&p->threads[0].canRead);
  Event_Set(&p->threads[0].canWrite);
  MtProgress_SetError(&p->mtProgress, MY_SRes_HRESULT_FROM_WRes(res));

  return (THREAD_FUNC_RET_TYPE)(UINT_PTR)res;
}
/*
  ThreadFunc() is the thread entry point; it forwards to ThreadFunc1().
  If USE_ALLOCA is defined, it first reserves (index * 128) bytes on the stack
  with alloca() and stores the pointer in t->allocaPtr.
  NOTE(review): presumably this gives each thread a different stack offset -
  confirm the intent before changing the size or removing the call.
*/
static MY_NO_INLINE THREAD_FUNC_DECL ThreadFunc(void *pp)
{
  #ifdef USE_ALLOCA
  CMtDecThread *const self = (CMtDecThread *)pp;
  self->allocaPtr = alloca(self->index * 128);
  #endif

  return ThreadFunc1(pp);
}
/*
  MtDec_PrepareRead() releases memory that holds no pending input:
    - the cross block, if it contains no pending bytes
      (p->crossStart == p->crossEnd),
    - the input buffers of every thread slot that is not one of the
      p->numFilledThreads slots counted (with wrap-around at
      p->numStartedThreads) from p->filledThreadStart.
  Returns nonzero if pending input remains, either in filled threads
  or in the cross block.
*/
int MtDec_PrepareRead(CMtDec *p)
{
  unsigned i;

  if (p->crossBlock && p->crossStart == p->crossEnd)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    BoolInt isUnused = (i > p->numStartedThreads);

    if (!isUnused)
    {
      /* compare the number of filled slots with the ring distance
         from filledThreadStart to slot (i) */
      if (i >= p->filledThreadStart)
        isUnused = (p->numFilledThreads <= i - p->filledThreadStart);
      else
        isUnused = (p->numFilledThreads <= i + p->numStartedThreads - p->filledThreadStart);
    }

    if (isUnused)
      MtDecThread_FreeInBufs(&p->threads[i]);
  }

  return (p->numFilledThreads != 0) || (p->crossStart != p->crossEnd);
}
/*
  MtDec_Read() returns the next piece of pending input and stores its size
  in *inLim. Pieces come first from the filled threads (starting at
  p->filledThreadStart, one input buffer per call), then from the cross block.

  *inLim on entry: if nonzero and filled threads exist, the head buffer of the
  current filled thread is freed before the next piece is selected
  (presumably it is the piece returned by the previous call - confirm against callers).

  When the current thread has no data left, its buffers are freed and the next
  filled thread (ring order) becomes current.

  Returns NULL with *inLim == 0 when nothing is pending; the cross block is
  freed in that case.
*/
const Byte *MtDec_Read(CMtDec *p, size_t *inLim)
{
  if (p->numFilledThreads != 0)
  {
    CMtDecThread *cur = &p->threads[p->filledThreadStart];
    BoolInt drained = False;

    if (*inLim != 0)
    {
      /* drop the head buffer of the current thread */
      CMtDecBufLink *head = (CMtDecBufLink *)cur->inBuf;
      void *rest = head->next;
      ISzAlloc_Free(p->alloc, head);
      cur->inBuf = rest;

      if (cur->inDataSize == 0)
      {
        /* current thread is exhausted: switch to the next filled thread */
        MtDecThread_FreeInBufs(cur);
        if (--p->numFilledThreads == 0)
          drained = True;
        else
        {
          if (++p->filledThreadStart == p->numStartedThreads)
            p->filledThreadStart = 0;
          cur = &p->threads[p->filledThreadStart];
        }
      }
    }

    if (!drained)
    {
      /* the first piece of a thread has its own size; the following
         pieces are full buffers, except for the last one */
      size_t lim = cur->inDataSize_Start;
      if (lim == 0)
      {
        const UInt64 left = cur->inDataSize;
        lim = p->inBufSize;
        if (lim > left)
          lim = (size_t)left;
      }
      else
        cur->inDataSize_Start = 0;

      cur->inDataSize -= lim;
      *inLim = lim;
      return (const Byte *)MTDEC__DATA_PTR_FROM_LINK(cur->inBuf);
    }
  }

  {
    const size_t pending = p->crossEnd - p->crossStart;
    if (pending != 0)
    {
      const Byte *data = MTDEC__DATA_PTR_FROM_LINK(p->crossBlock) + p->crossStart;
      *inLim = pending;
      p->crossStart = 0;
      p->crossEnd = 0;
      return data;
    }
  }

  *inLim = 0;
  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
  return NULL;
}
/*
  MtDec_Construct() puts the decoder object into its initial empty state:
    - default input buffer size (1 << 18 bytes), thread limit 0,
    - no stream, progress callback, allocator or decoder callback,
    - no cross block, no filled threads, no allocated buffers,
    - every thread slot gets its back pointer, its index, an empty buffer
      chain and constructed (not created) events and thread object,
    - the critical section of p->mtProgress is initialized.
*/
void MtDec_Construct(CMtDec *p)
{
  unsigned i;

  p->inBufSize = (size_t)1 << 18;
  p->allocatedBufsSize = 0;
  p->numThreadsMax = 0;
  p->numFilledThreads = 0;

  p->inStream = NULL;
  p->progress = NULL;
  p->alloc = NULL;
  p->mtCallback = NULL;
  p->mtCallbackObject = NULL;

  p->crossBlock = NULL;
  p->crossStart = 0;
  p->crossEnd = 0;

  for (i = 0; i < MTDEC__THREADS_MAX; i++)
  {
    CMtDecThread *slot = &p->threads[i];

    slot->index = i;
    slot->mtDec = p;
    slot->inBuf = NULL;

    Event_Construct(&slot->canRead);
    Event_Construct(&slot->canWrite);
    Thread_Construct(&slot->thread);
  }

  CriticalSection_Init(&p->mtProgress.cs);
}
/*
  MtDec_Free() stops all threads and releases all buffers of the decoder.
  p->exitThread is set first, so a thread woken up by
  MtDecThread_CloseThread() returns from its loop.
  Then every slot is destructed and the cross block is freed.
*/
static void MtDec_Free(CMtDec *p)
{
  unsigned slot = 0;

  p->exitThread = True;

  while (slot != MTDEC__THREADS_MAX)
  {
    MtDecThread_Destruct(&p->threads[slot]);
    slot++;
  }

  if (p->crossBlock)
  {
    ISzAlloc_Free(p->alloc, p->crossBlock);
    p->crossBlock = NULL;
  }
}
/*
  MtDec_Destruct() is the counterpart of MtDec_Construct():
  it frees threads and buffers (MtDec_Free) and then deletes
  the critical section of p->mtProgress.
*/
void MtDec_Destruct(CMtDec *p)
{
  /* threads must be stopped before the critical section goes away */
  MtDec_Free(p);

  CriticalSection_Delete(&p->mtProgress.cs);
}
/*
  MtDec_Code() runs one multi-thread decoding session.

  Steps:
    1) reset the per-session state of (p);
    2) limit the number of threads to MTDEC__THREADS_MAX;
    3) if p->inBufSize differs from the size used for existing buffers,
       free all input buffers and the cross block;
    4) reset progress state, prepare the events of thread slot 0, signal both
       of them and run ThreadFunc() for slot 0 in the calling thread
       (further threads are started from inside the worker loop);
    5) if ThreadFunc() reports a threading error, close all threads.

  Return value:
    SZ_OK - if p->needContinue is still True at the end, which happens for
            an allocation error, a threading error reported by a worker or
            an overflow (p->isAllocError / p->threadingErrorSRes / p->overflow);
    sres  - otherwise: the WRes of this function's own threading calls
            converted with MY_SRes_HRESULT_FROM_WRes().
*/
SRes MtDec_Code(CMtDec *p)
{
  unsigned i;
  unsigned numThreads;
  WRes wres;
  SRes sres;
  CMtDecThread *first;

  /* ---- per-session state ---- */

  p->inProcessed = 0;
  p->readProcessed = 0;
  p->blockIndex = 1; /* it must be larger than not_defined index (0) */

  p->readRes = SZ_OK;
  p->codeRes = SZ_OK;
  p->threadingErrorSRes = SZ_OK;

  p->isAllocError = False;
  p->overflow = False;
  p->readWasFinished = False;
  p->wasInterrupted = False;
  p->needContinue = True;

  p->needInterrupt = False;
  p->interruptIndex = (UInt64)(Int64)-1;

  p->crossStart = 0;
  p->crossEnd = 0;
  p->filledThreadStart = 0;
  p->numFilledThreads = 0;

  /* ---- thread limits ---- */

  numThreads = p->numThreadsMax;
  if (numThreads > MTDEC__THREADS_MAX)
    numThreads = MTDEC__THREADS_MAX;
  p->numStartedThreads_Limit = numThreads;
  p->numStartedThreads = 0;

  /* ---- buffers of another size can't be reused ---- */

  if (p->inBufSize != p->allocatedBufsSize)
  {
    for (i = 0; i < MTDEC__THREADS_MAX; i++)
    {
      CMtDecThread *t = &p->threads[i];
      if (t->inBuf)
        MtDecThread_FreeInBufs(t);
    }
    if (p->crossBlock)
    {
      ISzAlloc_Free(p->alloc, p->crossBlock);
      p->crossBlock = NULL;
    }
    p->allocatedBufsSize = p->inBufSize;
  }

  MtProgress_Init(&p->mtProgress, p->progress);

  p->exitThread = False;
  p->exitThreadWRes = 0;

  /* ---- run slot 0 in the calling thread ---- */

  first = &p->threads[p->numStartedThreads++];

  wres = MtDecThread_CreateEvents(first);
  if (wres == 0)
    wres = Event_Set(&first->canWrite);
  if (wres == 0)
    wres = Event_Set(&first->canRead);
  if (wres == 0)
  {
    THREAD_FUNC_RET_TYPE res = ThreadFunc(first);
    wres = (WRes)(UINT_PTR)res;
    if (wres != 0)
    {
      p->needContinue = False;
      MtDec_CloseThreads(p);
    }
  }

  sres = MY_SRes_HRESULT_FROM_WRes(wres);
  if (sres != 0)
    p->threadingErrorSRes = sres;

  /* needContinue is kept only for alloc error, threading error or overflow */
  if (!p->isAllocError
      && p->threadingErrorSRes == SZ_OK
      && !p->overflow)
    p->needContinue = False;

  if (p->needContinue)
    return SZ_OK;
  return sres;
}
#endif
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/rustlab/7-Zip-zstd.git
git@gitee.com:rustlab/7-Zip-zstd.git
rustlab
7-Zip-zstd
7-Zip-zstd
master

搜索帮助