diff --git a/interface/inner_api/osal/task/blocking_queue.h b/interface/inner_api/osal/task/blocking_queue.h index db970dba98802c102312356c8b30b644a553ea47..e4316448cb0218d289b0ddd63cc8f4a9a06fe891 100644 --- a/interface/inner_api/osal/task/blocking_queue.h +++ b/interface/inner_api/osal/task/blocking_queue.h @@ -44,7 +44,7 @@ public: { return capacity_; } - size_t Empty() + bool Empty() { AutoLock lock(mutex_); return que_.empty(); diff --git a/interface/inner_api/osal/task/task.h b/interface/inner_api/osal/task/task.h index e30003137e6064c326dafd0b966c3560cdf32dea..171573ebd26951d3ba5975e75675cd6ed19cd1b6 100644 --- a/interface/inner_api/osal/task/task.h +++ b/interface/inner_api/osal/task/task.h @@ -75,6 +75,8 @@ private: void Run(); + Mutex stateMutex_{}; + ConditionVariable syncCond_{}; const std::string name_; const TaskPriority priority_; std::atomic runningState_{RunningState::STOPPED}; @@ -84,8 +86,6 @@ private: #else std::unique_ptr loop_; #endif - Mutex stateMutex_{}; - ConditionVariable syncCond_{}; }; } // namespace Media } // namespace OHOS diff --git a/interface/inner_api/osal/task/thread.h b/interface/inner_api/osal/task/thread.h index 1819982f0cb697fdbbdd44943aa6cec1aa707610..53f6488ab2a810a42d54cd475026587135b2d723 100644 --- a/interface/inner_api/osal/task/thread.h +++ b/interface/inner_api/osal/task/thread.h @@ -52,6 +52,8 @@ public: bool CreateThread(const std::function& func); + bool IsRunningInSelf(); + private: struct State { virtual ~State() = default; diff --git a/interface/inner_api/osal/utils/ring_buffer.h b/interface/inner_api/osal/utils/ring_buffer.h index 66adae1ea6e0fa33389ec162602a03d0acac1b71..aaf7d75823c8923a797f2c633ddfa995f672ca0d 100644 --- a/interface/inner_api/osal/utils/ring_buffer.h +++ b/interface/inner_api/osal/utils/ring_buffer.h @@ -44,14 +44,14 @@ public: size_t ReadBuffer(void* ptr, size_t readSize, int waitTimes = 0) { AutoLock lck(writeMutex_); - if (!isActive_) { + if (!isActive_ || !isReadBlockingAllowed_) { return 0; } auto available = tail_ - head_; while (waitTimes > 0 && available == 0) { MEDIA_LOG_DD("ReadBuffer wait , waitTimes is " PUBLIC_LOG_U64, waitTimes); writeCondition_.Wait(lck); - if (!isActive_) { + if (!isActive_ || !isReadBlockingAllowed_) { return 0; } available = tail_ - head_; @@ -70,7 +70,7 @@ public: mediaOffset_ += available; MEDIA_LOG_DD("ReadBuffer finish available is " PUBLIC_LOG_ZU ", mediaOffset_ " PUBLIC_LOG_U64, available, mediaOffset_); - writeCondition_.NotifyOne(); + writeCondition_.NotifyAll(); return available; } @@ -96,7 +96,7 @@ public: writeSize - (bufferSize_ - index)); } tail_ += writeSize; - writeCondition_.NotifyOne(); + writeCondition_.NotifyAll(); return true; } @@ -109,8 +109,17 @@ public: head_ = 0; tail_ = 0; } - writeCondition_.NotifyOne(); + writeCondition_.NotifyAll(); + } + } + + void SetReadBlocking(bool isReadBlockingAllowed) + { + { + AutoLock lck(writeMutex_); + isReadBlockingAllowed_ = isReadBlockingAllowed; } + writeCondition_.NotifyAll(); } size_t GetSize() @@ -133,7 +142,7 @@ public: AutoLock lck(writeMutex_); head_ = 0; tail_ = 0; - writeCondition_.NotifyOne(); + writeCondition_.NotifyAll(); } bool Seek(uint64_t offset) @@ -147,7 +156,7 @@ public: mediaOffset_ = offset; result = true; } - writeCondition_.NotifyOne(); + writeCondition_.NotifyAll(); return result; } private: @@ -159,6 +168,7 @@ private: ConditionVariable writeCondition_ {}; bool isActive_ {true}; uint64_t mediaOffset_ {0}; + bool isReadBlockingAllowed_ {true}; }; } // namespace Media } // namespace OHOS diff --git a/src/osal/task/pthread/task.cpp b/src/osal/task/pthread/task.cpp index 981277b662ca24b77788988d0e6e85d6bd8abac9..1381c61119b016a74ec20d1b55024b010a69b64f 100644 --- a/src/osal/task/pthread/task.cpp +++ b/src/osal/task/pthread/task.cpp @@ -53,16 +53,20 @@ Task::Task(std::string name, std::function job, TaskPriority priority) Task::~Task() { MEDIA_LOG_I("task " PUBLIC_LOG_S " dtor called", name_.c_str()); - runningState_ = RunningState::STOPPED; + { + AutoLock lock(stateMutex_); + runningState_ = RunningState::STOPPED; + } syncCond_.NotifyAll(); } void Task::Start() { - MEDIA_LOG_I("task " PUBLIC_LOG_S " start called", name_.c_str()); + MEDIA_LOG_I("task " PUBLIC_LOG_S " Start called", name_.c_str()); AutoLock lock(stateMutex_); if (loop_ && loop_->HasThread()) { - MEDIA_LOG_W("task " PUBLIC_LOG_S " has started", name_.c_str()); + MEDIA_LOG_W("task " PUBLIC_LOG_S " has created, current state: " PUBLIC_LOG_D32, + name_.c_str(), runningState_.load()); runningState_ = RunningState::STARTED; syncCond_.NotifyAll(); return; @@ -70,6 +74,7 @@ void Task::Start() if (!loop_) { // thread not exist loop_ = CppExt::make_unique(ConvertPriorityType(priority_)); + loop_->SetName(name_); } if (loop_->CreateThread([this] { Run(); })) { @@ -83,34 +88,54 @@ void Task::Start() void Task::Stop() { - MEDIA_LOG_W("task " PUBLIC_LOG_S " stop entered, current state: " PUBLIC_LOG_D32, - name_.c_str(), runningState_.load()); AutoLock lock(stateMutex_); + MEDIA_LOG_W("task " PUBLIC_LOG_S " Stop entered, current state: " PUBLIC_LOG_D32, + name_.c_str(), runningState_.load()); if (runningState_.load() != RunningState::STOPPED) { runningState_ = RunningState::STOPPING; - syncCond_.NotifyAll(); - syncCond_.Wait(lock, [this] { return runningState_.load() == RunningState::STOPPED; }); - if (loop_ && loop_->HasThread()) { - loop_ = nullptr; + if (loop_ && !(loop_->IsRunningInSelf())) { + // There is no need to perform notification in task's self thread, as no call would wait for STOPPING state. + // Perform notification to accelerate stopping when the task is already in PAUSED state. + syncCond_.NotifyAll(); + syncCond_.Wait(lock, [this] { return runningState_.load() == RunningState::STOPPED; }); + if (loop_->HasThread()) { + loop_ = nullptr; + } + MEDIA_LOG_W("task " PUBLIC_LOG_S " Stop done", name_.c_str()); + } else { + MEDIA_LOG_W("task " PUBLIC_LOG_S " can't use Task::Stop in self task, now replaced by Task::StopAsync", + name_.c_str()); } } - MEDIA_LOG_W("task " PUBLIC_LOG_S " stop exited", name_.c_str()); } void Task::StopAsync() { - MEDIA_LOG_D("task " PUBLIC_LOG_S " StopAsync called", name_.c_str()); - AutoLock lock(stateMutex_); - if (runningState_.load() != RunningState::STOPPED) { - runningState_ = RunningState::STOPPING; + { + AutoLock lock(stateMutex_); + MEDIA_LOG_W("task " PUBLIC_LOG_S " StopAsync called, current state: " PUBLIC_LOG_D32, + name_.c_str(), runningState_.load()); + if (runningState_.load() != RunningState::STOPPED) { + runningState_ = RunningState::STOPPING; + } } + // Perform notification to accelerate stopping when the task is already in PAUSED state. + syncCond_.NotifyAll(); } void Task::Pause() { AutoLock lock(stateMutex_); RunningState state = runningState_.load(); - MEDIA_LOG_I("task " PUBLIC_LOG_S " Pause called, running state = " PUBLIC_LOG_D32, name_.c_str(), state); + MEDIA_LOG_I("task " PUBLIC_LOG_S " Pause called, current state: " PUBLIC_LOG_D32, name_.c_str(), state); + if (loop_ && loop_->IsRunningInSelf()) { + if (state == RunningState::STARTED) { + runningState_ = RunningState::PAUSING; + } + MEDIA_LOG_W("task " PUBLIC_LOG_S " can't use Task::Pause in self task, now replaced by Task::PauseAsync", + name_.c_str()); + return; + } switch (state) { case RunningState::STARTED: { runningState_ = RunningState::PAUSING; @@ -124,7 +149,9 @@ void Task::Pause() break; } case RunningState::PAUSING: { - syncCond_.Wait(lock, [this] { return runningState_.load() == RunningState::PAUSED; }); + syncCond_.Wait(lock, [this] { + return runningState_.load() == RunningState::PAUSED || runningState_.load() == RunningState::STOPPED; + }); break; } default: @@ -133,6 +160,9 @@ void Task::Pause() MEDIA_LOG_I("task " PUBLIC_LOG_S " Pause done.", name_.c_str()); } + +// There is no need to perform notification, as no call would wait for PAUSING state. +// If perform notification may cause unnecessasy running when the task is already in PAUSED state. void Task::PauseAsync() { MEDIA_LOG_I("task " PUBLIC_LOG_S " PauseAsync called", name_.c_str()); @@ -156,8 +186,8 @@ void Task::DoTask() void Task::Run() { for (;;) { - MEDIA_LOG_DD("task " PUBLIC_LOG_S " is running on state : " PUBLIC_LOG_D32, - name_.c_str(), runningState_.load()); + MEDIA_LOG_DD("task " PUBLIC_LOG_S " is running on state: " PUBLIC_LOG_D32, + name_.c_str(), runningState_.load()); if (runningState_.load() == RunningState::STARTED) { job_(); } diff --git a/src/osal/task/pthread/thread.cpp b/src/osal/task/pthread/thread.cpp index c0e267b6d8476ca9b8284fc6df023c3f9f5e7777..5ee62c3f48c1d6b69deb5cc858242aa3009b4a72 100644 --- a/src/osal/task/pthread/thread.cpp +++ b/src/osal/task/pthread/thread.cpp @@ -97,6 +97,13 @@ bool Thread::CreateThread(const std::function& func) return rtv == 0; } +bool Thread::IsRunningInSelf() +{ + pthread_t tid = pthread_self(); + AutoLock lock(mutex_); + return tid == id_; +} + void Thread::SetNameInternal() { AutoLock lock(mutex_);