diff --git a/libpandabase/BUILD.gn b/libpandabase/BUILD.gn index 26604c2155b60375ab321dde29037fd6512256e8..bc8b3b6b4450e7468beea8e7dc3e4940ebf35b88 100644 --- a/libpandabase/BUILD.gn +++ b/libpandabase/BUILD.gn @@ -84,7 +84,7 @@ if (is_mingw) { "$ark_root/libpandabase/os/stacktrace_stub.cpp", "$ark_root/libpandabase/os/time.cpp", "$ark_root/libpandabase/taskmanager/task.cpp", - "$ark_root/libpandabase/taskmanager/task_queue.cpp", + "$ark_root/libpandabase/taskmanager/task_queue_interface.cpp", "$ark_root/libpandabase/taskmanager/task_scheduler.cpp", "$ark_root/libpandabase/taskmanager/worker_thread.cpp", "$ark_root/libpandabase/utils/dfx.cpp", @@ -117,7 +117,7 @@ if (is_mingw) { "$ark_root/libpandabase/os/native_stack.cpp", "$ark_root/libpandabase/os/property.cpp", "$ark_root/libpandabase/taskmanager/task.cpp", - "$ark_root/libpandabase/taskmanager/task_queue.cpp", + "$ark_root/libpandabase/taskmanager/task_queue_interface.cpp", "$ark_root/libpandabase/taskmanager/task_scheduler.cpp", "$ark_root/libpandabase/taskmanager/worker_thread.cpp", diff --git a/libpandabase/CMakeLists.txt b/libpandabase/CMakeLists.txt index 8e31cdc51d2ae563663ae53c2efc20aa943f71c4..a316b9b7452e2844eb1b943c7175bc86cbdfe38f 100644 --- a/libpandabase/CMakeLists.txt +++ b/libpandabase/CMakeLists.txt @@ -44,8 +44,8 @@ set(SOURCES ${PANDA_ROOT}/libpandabase/os/property.cpp ${PANDA_ROOT}/libpandabase/os/dfx_option.cpp ${PANDA_ROOT}/libpandabase/taskmanager/task.cpp + ${PANDA_ROOT}/libpandabase/taskmanager/task_queue_interface.cpp ${PANDA_ROOT}/libpandabase/taskmanager/task_scheduler.cpp - ${PANDA_ROOT}/libpandabase/taskmanager/task_queue.cpp ${PANDA_ROOT}/libpandabase/taskmanager/worker_thread.cpp ${ARKBASE_LTO_SOURCES} ) diff --git a/libpandabase/taskmanager/task_queue.cpp b/libpandabase/taskmanager/task_queue-inl.h similarity index 36% rename from libpandabase/taskmanager/task_queue.cpp rename to libpandabase/taskmanager/task_queue-inl.h index 543ac5b29245bd5107637a0bdd6533543eed5c5b..187d41f52cae165fdfd2a277d526b77c7d67f9c2 100644 --- a/libpandabase/taskmanager/task_queue.cpp +++ b/libpandabase/taskmanager/task_queue-inl.h @@ -15,63 +15,21 @@ #include "libpandabase/taskmanager/task_queue.h" -namespace panda::taskmanager { +#ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_INL_H +#define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_INL_H -TaskQueue::TaskQueue(TaskType task_type, VMType vm_type, uint8_t priority) - : priority_(priority), task_type_(task_type), vm_type_(vm_type) -{ - ASSERT(priority >= MIN_PRIORITY); - ASSERT(priority <= MAX_PRIORITY); -} +namespace panda::taskmanager { -Task TaskQueue::PopTaskFromQueue(std::queue &queue) +template +Task TaskQueue::PopTaskFromQueue(InternalTaskQueue &queue) { auto task = std::move(queue.front()); queue.pop(); return task; } -void TaskQueue::PushTaskToInternalQueues(Task &&task) -{ - if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) { - foreground_task_queue_.push(std::move(task)); - } else { - background_task_queue_.push(std::move(task)); - } -} - -size_t TaskQueue::AddTask(Task &&task) -{ - ASSERT(task.GetTaskProperties().GetTaskType() == task_type_); - ASSERT(task.GetTaskProperties().GetVMType() == vm_type_); - auto properties = task.GetTaskProperties(); - size_t size = 0; - { - os::memory::LockHolder lock_holder(task_queue_lock_); - PushTaskToInternalQueues(std::move(task)); - task_queue_cond_var_.Signal(); - size = SumSizeOfInternalQueues(); - } - { - os::memory::LockHolder lock_holder(subscriber_lock_); - // Notify subscriber about new task - if (new_tasks_callback_ != nullptr) { - new_tasks_callback_(properties, 1); - } - } - return size; -} - -bool TaskQueue::HasTaskWithExecutionMode(TaskExecutionMode mode) const -{ - os::memory::LockHolder lock_holder(task_queue_lock_); - if (mode == TaskExecutionMode::FOREGROUND) { - return !foreground_task_queue_.empty(); - } - return !background_task_queue_.empty(); -} - -Task TaskQueue::PopTaskFromInternalQueues() +template +Task TaskQueue::PopTaskFromInternalQueues() { if (!foreground_task_queue_.empty()) { return PopTaskFromQueue(foreground_task_queue_); @@ -79,7 +37,8 @@ Task TaskQueue::PopTaskFromInternalQueues() return PopTaskFromQueue(background_task_queue_); } -std::optional TaskQueue::PopTask() +template +std::optional TaskQueue::PopTask() { os::memory::LockHolder lock_holder(task_queue_lock_); while (AreInternalQueuesEmpty()) { @@ -93,10 +52,11 @@ std::optional TaskQueue::PopTask() return std::make_optional(std::move(task)); } -std::optional TaskQueue::PopTask(TaskExecutionMode mode) +template +std::optional TaskQueue::PopTask(TaskExecutionMode mode) { os::memory::LockHolder lock_holder(task_queue_lock_); - std::queue *queue = &foreground_task_queue_; + auto *queue = &foreground_task_queue_; if (mode != TaskExecutionMode::FOREGROUND) { queue = &background_task_queue_; } @@ -111,88 +71,31 @@ std::optional TaskQueue::PopTask(TaskExecutionMode mode) return std::make_optional(std::move(task)); } -bool TaskQueue::AreInternalQueuesEmpty() const -{ - return foreground_task_queue_.empty() && background_task_queue_.empty(); -} - -bool TaskQueue::IsEmpty() const -{ - os::memory::LockHolder lock_holder(task_queue_lock_); - return AreInternalQueuesEmpty(); -} - -size_t TaskQueue::SumSizeOfInternalQueues() const -{ - return foreground_task_queue_.size() + background_task_queue_.size(); -} - -size_t TaskQueue::Size() const -{ - os::memory::LockHolder lock_holder(task_queue_lock_); - return SumSizeOfInternalQueues(); -} - -uint8_t TaskQueue::GetPriority() const -{ - // Atomic with acquire order reason: data race with priority_ with dependencies on reads after the - // load which should become visible - return priority_.load(std::memory_order_acquire); -} - -void TaskQueue::SetPriority(uint8_t priority) -{ - ASSERT(priority >= MIN_PRIORITY); - ASSERT(priority <= MAX_PRIORITY); - // Atomic with release order reason: data race with priority_ with no synchronization or ordering constraints - // imposed on other reads or writes - priority_.store(priority, std::memory_order_release); -} - -TaskType TaskQueue::GetTaskType() const -{ - return task_type_; -} - -VMType TaskQueue::GetVMType() const -{ - return vm_type_; -} - -void TaskQueue::SubscribeCallbackToAddTask(NewTasksCallback callback) +template +void TaskQueue::SubscribeCallbackToAddTask(NewTasksCallback callback) { os::memory::LockHolder subscriber_lock_holder(subscriber_lock_); new_tasks_callback_ = std::move(callback); { os::memory::LockHolder lock_holder(task_queue_lock_); if (!foreground_task_queue_.empty()) { - new_tasks_callback_({task_type_, vm_type_, TaskExecutionMode::FOREGROUND}, foreground_task_queue_.size()); + new_tasks_callback_({GetTaskType(), GetVMType(), TaskExecutionMode::FOREGROUND}, + foreground_task_queue_.size()); } if (!background_task_queue_.empty()) { - new_tasks_callback_({task_type_, vm_type_, TaskExecutionMode::BACKGROUND}, background_task_queue_.size()); + new_tasks_callback_({GetTaskType(), GetVMType(), TaskExecutionMode::BACKGROUND}, + background_task_queue_.size()); } } } -void TaskQueue::UnsubscribeCallback() +template +void TaskQueue::UnsubscribeCallback() { os::memory::LockHolder lock_holder(subscriber_lock_); new_tasks_callback_ = nullptr; } -void TaskQueue::WaitForQueueEmptyAndFinish() -{ - os::memory::LockHolder lock_holder(task_queue_lock_); - while (!AreInternalQueuesEmpty()) { - finish_var_.Wait(&task_queue_lock_); - } - finish_ = true; - task_queue_cond_var_.SignalAll(); -} - -TaskQueue::~TaskQueue() -{ - WaitForQueueEmptyAndFinish(); -} +} // namespace panda::taskmanager -} // namespace panda::taskmanager \ No newline at end of file +#endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H \ No newline at end of file diff --git a/libpandabase/taskmanager/task_queue.h b/libpandabase/taskmanager/task_queue.h index 5b0895f6ac8b655a5f6536c2b27ae5d7f7737dc3..450629aad5b4e94b0806dc998320981ebb7a814f 100644 --- a/libpandabase/taskmanager/task_queue.h +++ b/libpandabase/taskmanager/task_queue.h @@ -17,84 +17,85 @@ #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_H #include "libpandabase/os/mutex.h" -#include "libpandabase/taskmanager/task.h" +#include "libpandabase/taskmanager/task_queue_for_scheduler.h" +#include "taskmanager/task_queue_interface.h" #include +#include #include #include namespace panda::taskmanager { - -class TaskQueueId { -public: - constexpr TaskQueueId(TaskType tt, VMType vt) - : val_(static_cast(tt) | - static_cast(static_cast(vt) << (BITS_PER_BYTE * sizeof(TaskQueueId) / 2))) - { - static_assert(sizeof(TaskType) == sizeof(TaskQueueId) / 2); - static_assert(sizeof(VMType) == sizeof(TaskQueueId) / 2); - } - - friend constexpr bool operator==(const TaskQueueId &lv, const TaskQueueId &rv) - { - return lv.val_ == rv.val_; - } - - friend constexpr bool operator!=(const TaskQueueId &lv, const TaskQueueId &rv) - { - return lv.val_ != rv.val_; - } - - friend constexpr bool operator<(const TaskQueueId &lv, const TaskQueueId &rv) - { - return lv.val_ < rv.val_; - } - -private: - uint16_t val_; -}; - -constexpr TaskQueueId INVALID_TASKQUEUE_ID = TaskQueueId(TaskType::UNKNOWN, VMType::UNKNOWN); - /** * @brief TaskQueue is a thread-safe queue for tasks. Queues can be registered in TaskScheduler and used to execute * tasks on workers. Also, queues can notify other threads when a new task is pushed. */ -class TaskQueue { + +template > +class TaskQueue : TaskQueueForScheduler { + using TaskAllocatorType = typename AllocatorType::template rebind::other; + using TaskQueueAllocatorType = typename AllocatorType::template rebind>::other; + public: NO_COPY_SEMANTIC(TaskQueue); NO_MOVE_SEMANTIC(TaskQueue); - /** - * NewTasksCallback instance should be called after tasks adding. As argument you should input count of added - * tasks. - */ - using NewTasksCallback = std::function; - - static constexpr uint8_t MAX_PRIORITY = 10; - static constexpr uint8_t MIN_PRIORITY = 1; - static constexpr uint8_t DEFAULT_PRIORITY = 5; - /** * @brief When you construct a queue, you should write @arg task_type and @arg vm_type of Tasks that will be * contained in it. Also, you should write @arg priority, which will be used in Task Manager for task selection. * It's a number from 1 to 10 and determines the weight of the queue for the Task Manager. * MAX_PRIORITY = 10, MIN_PRIORITY = 1, DEFAULT_PRIORITY = 5; */ - PANDA_PUBLIC_API TaskQueue(TaskType task_type, VMType vm_type, uint8_t priority); - PANDA_PUBLIC_API ~TaskQueue(); + static TaskQueueInterface *Create(TaskType task_type, VMType vm_type, uint8_t priority) + { + TaskQueueAllocatorType allocator; + auto *mem = allocator.allocate(sizeof(TaskQueue)); + return new (mem) TaskQueue(task_type, vm_type, priority); + } + + static void Destroy(TaskQueueInterface *queue) + { + TaskQueueAllocatorType allocator; + std::allocator_traits::destroy(allocator, queue); + allocator.deallocate(static_cast *>(queue), sizeof(TaskQueue)); + } + + PANDA_PUBLIC_API ~TaskQueue() override + { + WaitForEmpty(); + } /** * @brief Adds task in task queue. Operation is thread-safe. * @param task - task that will be added * @return the size of queue after @arg task was added to it. */ - PANDA_PUBLIC_API size_t AddTask(Task &&task); + PANDA_PUBLIC_API size_t AddTask(Task &&task) override + { + ASSERT(task.GetTaskProperties().GetTaskType() == GetTaskType()); + ASSERT(task.GetTaskProperties().GetVMType() == GetVMType()); + auto properties = task.GetTaskProperties(); + size_t size = 0; + { + os::memory::LockHolder lock_holder(task_queue_lock_); + PushTaskToInternalQueues(std::move(task)); + task_queue_cond_var_.Signal(); + size = SumSizeOfInternalQueues(); + } + { + os::memory::LockHolder lock_holder(subscriber_lock_); + // Notify subscriber about new task + if (new_tasks_callback_ != nullptr) { + new_tasks_callback_(properties, 1); + } + } + return size; + } /** * @brief Pops task from task queue. Operation is thread-safe. The method will wait new task if queue is empty and * method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt. */ - [[nodiscard]] std::optional PopTask(); + [[nodiscard]] std::optional PopTask() override; /** * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait @@ -102,73 +103,104 @@ public: * executed. Otherwise it will return std::nullopt. * @param mode - execution mode of task that we want to pop. */ - [[nodiscard]] std::optional PopTask(TaskExecutionMode mode); + [[nodiscard]] std::optional PopTask(TaskExecutionMode mode) override; + + [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const override + { + os::memory::LockHolder lock_holder(task_queue_lock_); + return AreInternalQueuesEmpty(); + } - [[nodiscard]] PANDA_PUBLIC_API bool IsEmpty() const; - [[nodiscard]] PANDA_PUBLIC_API size_t Size() const; + [[nodiscard]] PANDA_PUBLIC_API size_t Size() const override + { + os::memory::LockHolder lock_holder(task_queue_lock_); + return SumSizeOfInternalQueues(); + } /** * @brief Method @returns true if queue does not have queue with specified execution mode * @param mode - execution mode of tasks */ - [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const; - - PANDA_PUBLIC_API TaskType GetTaskType() const; - PANDA_PUBLIC_API VMType GetVMType() const; - - PANDA_PUBLIC_API uint8_t GetPriority() const; - PANDA_PUBLIC_API void SetPriority(uint8_t priority); + [[nodiscard]] PANDA_PUBLIC_API bool HasTaskWithExecutionMode(TaskExecutionMode mode) const override + { + os::memory::LockHolder lock_holder(task_queue_lock_); + if (mode == TaskExecutionMode::FOREGROUND) { + return !foreground_task_queue_.empty(); + } + return !background_task_queue_.empty(); + } /** * @brief This method saves the @arg callback. It will be called after adding new task in AddTask method. * @param callback - function that get count of inputted tasks. */ - void SubscribeCallbackToAddTask(NewTasksCallback callback); + void SubscribeCallbackToAddTask(NewTasksCallback callback) override; /// @brief Removes callback function. - void UnsubscribeCallback(); + void UnsubscribeCallback() override; /** * @brief Method waits until internal queue will be empty and finalize using of TaskQueue * After this method PopTask will not wait for new tasks. */ - void WaitForQueueEmptyAndFinish(); + void WaitForQueueEmptyAndFinish() override + { + WaitForEmpty(); + } private: - bool AreInternalQueuesEmpty() const REQUIRES(task_queue_lock_); + void WaitForEmpty() + { + os::memory::LockHolder lock_holder(task_queue_lock_); + while (!AreInternalQueuesEmpty()) { + finish_var_.Wait(&task_queue_lock_); + } + finish_ = true; + task_queue_cond_var_.SignalAll(); + } - size_t SumSizeOfInternalQueues() const REQUIRES(task_queue_lock_); + using InternalTaskQueue = std::queue>; - void PushTaskToInternalQueues(Task &&task) REQUIRES(task_queue_lock_); - Task PopTaskFromInternalQueues() REQUIRES(task_queue_lock_); + PANDA_PUBLIC_API TaskQueue(TaskType task_type, VMType vm_type, uint8_t priority) + : TaskQueueForScheduler(task_type, vm_type, priority), + foreground_task_queue_(TaskAllocatorType()), + background_task_queue_(TaskAllocatorType()) + { + } - Task PopTaskFromQueue(std::queue &queue) REQUIRES(task_queue_lock_); + bool AreInternalQueuesEmpty() const REQUIRES(task_queue_lock_) + { + return foreground_task_queue_.empty() && background_task_queue_.empty(); + } - std::atomic_uint8_t priority_; - TaskType task_type_; - VMType vm_type_; + size_t SumSizeOfInternalQueues() const REQUIRES(task_queue_lock_) + { + return foreground_task_queue_.size() + background_task_queue_.size(); + } + + void PushTaskToInternalQueues(Task &&task) REQUIRES(task_queue_lock_) + { + if (task.GetTaskProperties().GetTaskExecutionMode() == TaskExecutionMode::FOREGROUND) { + foreground_task_queue_.push(std::move(task)); + } else { + background_task_queue_.push(std::move(task)); + } + } + + Task PopTaskFromInternalQueues() REQUIRES(task_queue_lock_); + + Task PopTaskFromQueue(InternalTaskQueue &queue) REQUIRES(task_queue_lock_); /** * foreground_task_queue_ is queue that contains task with ExecutionMode::FOREGROUND. If method PopTask() is used, * foreground_task_queue_ will be checked first and if it's not empty, Task will be gotten from it. */ - std::queue foreground_task_queue_ GUARDED_BY(task_queue_lock_); + InternalTaskQueue foreground_task_queue_ GUARDED_BY(task_queue_lock_); /** * background_task_queue_ is queue that contains task with ExecutionMode::BACKGROUND. If method PopTask() is used, * background_task_queue_ will be popped only if foreground_task_queue_ is empty. */ - std::queue background_task_queue_ GUARDED_BY(task_queue_lock_); - - /// task_queue_lock_ is used in case of interaction with internal queues - mutable os::memory::Mutex task_queue_lock_; - os::memory::ConditionVariable task_queue_cond_var_ GUARDED_BY(task_queue_lock_); - os::memory::ConditionVariable finish_var_ GUARDED_BY(task_queue_lock_); - - /// subscriber_lock_ is used in case of calling new_tasks_callback_ - os::memory::Mutex subscriber_lock_; - NewTasksCallback new_tasks_callback_ GUARDED_BY(subscriber_lock_); - - bool finish_ {false}; + InternalTaskQueue background_task_queue_ GUARDED_BY(task_queue_lock_); }; } // namespace panda::taskmanager diff --git a/libpandabase/taskmanager/task_queue_for_scheduler.h b/libpandabase/taskmanager/task_queue_for_scheduler.h new file mode 100644 index 0000000000000000000000000000000000000000..034ab4cc676f208685963caedd9d6e6b9823c9d0 --- /dev/null +++ b/libpandabase/taskmanager/task_queue_for_scheduler.h @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_FOR_SCHEDULER_INTERFACE_H +#define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_FOR_SCHEDULER_INTERFACE_H + +#include "libpandabase/taskmanager/task_queue_interface.h" + +namespace panda::taskmanager { + +class TaskQueueForScheduler : public TaskQueueInterface { +public: + NO_COPY_SEMANTIC(TaskQueueForScheduler); + NO_MOVE_SEMANTIC(TaskQueueForScheduler); + + TaskQueueForScheduler(TaskType task_type, VMType vm_type, uint8_t priority) + : TaskQueueInterface(task_type, vm_type, priority) + { + } + ~TaskQueueForScheduler() override = default; + /** + * @brief Pops task from task queue. Operation is thread-safe. The method will wait new task if queue is empty and + * method WaitForQueueEmptyAndFinish has not been executed. Otherwise it will return std::nullopt. + */ + [[nodiscard]] virtual std::optional PopTask() = 0; + + /** + * @brief Pops task from task queue with specified execution mode. Operation is thread-safe. The method will wait + * new task if queue with specified execution mode is empty and method WaitForQueueEmptyAndFinish has not been + * executed. Otherwise it will return std::nullopt. + * @param mode - execution mode of task that we want to pop. + */ + [[nodiscard]] virtual std::optional PopTask(TaskExecutionMode mode) = 0; + + /** + * @brief This method saves the @arg callback. It will be called after adding new task in AddTask method. + * @param callback - function that get count of inputted tasks. + */ + void virtual SubscribeCallbackToAddTask(NewTasksCallback callback) = 0; + + /// @brief Removes callback function. + void virtual UnsubscribeCallback() = 0; +}; + +} // namespace panda::taskmanager + +#endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_FOR_SCHEDULER_INTERFACE_H \ No newline at end of file diff --git a/libpandabase/taskmanager/task_queue_interface.cpp b/libpandabase/taskmanager/task_queue_interface.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e5b05cb4aeaf5cd7a77de5615df720b477f3fa74 --- /dev/null +++ b/libpandabase/taskmanager/task_queue_interface.cpp @@ -0,0 +1,53 @@ +/** + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "libpandabase/taskmanager/task_queue_interface.h" + +namespace panda::taskmanager { + +TaskQueueInterface::TaskQueueInterface(TaskType task_type, VMType vm_type, uint8_t priority) + : priority_(priority), task_type_(task_type), vm_type_(vm_type) +{ + ASSERT(priority >= MIN_PRIORITY); + ASSERT(priority <= MAX_PRIORITY); +} + +uint8_t TaskQueueInterface::GetPriority() const +{ + // Atomic with acquire order reason: data race with priority_ with dependencies on reads after the + // load which should become visible + return priority_.load(std::memory_order_acquire); +} + +void TaskQueueInterface::SetPriority(uint8_t priority) +{ + ASSERT(priority >= MIN_PRIORITY); + ASSERT(priority <= MAX_PRIORITY); + // Atomic with release order reason: data race with priority_ with no synchronization or ordering constraints + // imposed on other reads or writes + priority_.store(priority, std::memory_order_release); +} + +TaskType TaskQueueInterface::GetTaskType() const +{ + return task_type_; +} + +VMType TaskQueueInterface::GetVMType() const +{ + return vm_type_; +} + +} // namespace panda::taskmanager \ No newline at end of file diff --git a/libpandabase/taskmanager/task_queue_interface.h b/libpandabase/taskmanager/task_queue_interface.h new file mode 100644 index 0000000000000000000000000000000000000000..33b3f881a7349d0b2d53639cce88524f4ccee5a6 --- /dev/null +++ b/libpandabase/taskmanager/task_queue_interface.h @@ -0,0 +1,136 @@ +/** + * Copyright (c) 2023 Huawei Device Co., Ltd. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_INTERFACE_H +#define PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_INTERFACE_H + +#include "libpandabase/taskmanager/task.h" +#include "libpandabase/os/mutex.h" +#include +#include +#include + +namespace panda::taskmanager { + +class TaskQueueId { +public: + constexpr TaskQueueId(TaskType tt, VMType vt) + : val_(static_cast(tt) | + static_cast(static_cast(vt) << (BITS_PER_BYTE * sizeof(TaskQueueId) / 2))) + { + static_assert(sizeof(TaskType) == sizeof(TaskQueueId) / 2); + static_assert(sizeof(VMType) == sizeof(TaskQueueId) / 2); + } + + friend constexpr bool operator==(const TaskQueueId &lv, const TaskQueueId &rv) + { + return lv.val_ == rv.val_; + } + + friend constexpr bool operator!=(const TaskQueueId &lv, const TaskQueueId &rv) + { + return lv.val_ != rv.val_; + } + + friend constexpr bool operator<(const TaskQueueId &lv, const TaskQueueId &rv) + { + return lv.val_ < rv.val_; + } + +private: + uint16_t val_; +}; + +constexpr TaskQueueId INVALID_TASKQUEUE_ID = TaskQueueId(TaskType::UNKNOWN, VMType::UNKNOWN); + +/** + * @brief TaskQueueInteface is an interface of thread-safe queue for tasks. Queues can be registered in TaskScheduler + * and used to execute tasks on workers. Also, queues can notify other threads when a new task is pushed. + */ +class TaskQueueInterface { +public: + NO_COPY_SEMANTIC(TaskQueueInterface); + NO_MOVE_SEMANTIC(TaskQueueInterface); + + /** + * NewTasksCallback instance should be called after tasks adding. As argument you should input count of added + * tasks. + */ + using NewTasksCallback = std::function; + + static constexpr uint8_t MAX_PRIORITY = 10; + static constexpr uint8_t MIN_PRIORITY = 1; + static constexpr uint8_t DEFAULT_PRIORITY = 5; + + /** + * @brief When you construct a queue, you should write @arg task_type and @arg vm_type of Tasks that will be + * contained in it. Also, you should write @arg priority, which will be used in Task Manager for task selection. + * It's a number from 1 to 10 and determines the weight of the queue for the Task Manager. + * MAX_PRIORITY = 10, MIN_PRIORITY = 1, DEFAULT_PRIORITY = 5; + */ + PANDA_PUBLIC_API TaskQueueInterface(TaskType task_type, VMType vm_type, uint8_t priority); + PANDA_PUBLIC_API virtual ~TaskQueueInterface() = default; + + /** + * @brief Adds task in task queue. Operation is thread-safe. + * @param task - task that will be added + * @return the size of queue after @arg task was added to it. + */ + PANDA_PUBLIC_API virtual size_t AddTask(Task &&task) = 0; + + [[nodiscard]] PANDA_PUBLIC_API virtual bool IsEmpty() const = 0; + [[nodiscard]] PANDA_PUBLIC_API virtual size_t Size() const = 0; + + /** + * @brief Method @returns true if queue does not have queue with specified execution mode + * @param mode - execution mode of tasks + */ + [[nodiscard]] PANDA_PUBLIC_API virtual bool HasTaskWithExecutionMode(TaskExecutionMode mode) const = 0; + + PANDA_PUBLIC_API TaskType GetTaskType() const; + PANDA_PUBLIC_API VMType GetVMType() const; + + PANDA_PUBLIC_API uint8_t GetPriority() const; + PANDA_PUBLIC_API void SetPriority(uint8_t priority); + + /** + * @brief Method waits until internal queue will be empty and finalize using of TaskQueue + * After this method PopTask will not wait for new tasks. + */ + void virtual WaitForQueueEmptyAndFinish() = 0; + +protected: + // NOLINTBEGIN(misc-non-private-member-variables-in-classes) + /// task_queue_lock_ is used in case of interaction with internal queues + mutable os::memory::Mutex task_queue_lock_; + os::memory::ConditionVariable task_queue_cond_var_ GUARDED_BY(task_queue_lock_); + os::memory::ConditionVariable finish_var_ GUARDED_BY(task_queue_lock_); + + /// subscriber_lock_ is used in case of calling new_tasks_callback_ + os::memory::Mutex subscriber_lock_; + NewTasksCallback new_tasks_callback_ GUARDED_BY(subscriber_lock_); + + bool finish_ {false}; + + // NOLINTEND(misc-non-private-member-variables-in-classes) +private: + std::atomic_uint8_t priority_; + TaskType task_type_; + VMType vm_type_; +}; + +} // namespace panda::taskmanager + +#endif // PANDA_LIBPANDABASE_TASKMANAGER_TASK_QUEUE_INTERFACE_H \ No newline at end of file diff --git a/libpandabase/taskmanager/task_scheduler.cpp b/libpandabase/taskmanager/task_scheduler.cpp index 9c6ed72e267186195b82130a42367a794bdf8623..a97da5758acd13053080e9133ae36e623bfbc3e5 100644 --- a/libpandabase/taskmanager/task_scheduler.cpp +++ b/libpandabase/taskmanager/task_scheduler.cpp @@ -15,6 +15,8 @@ #include "libpandabase/taskmanager/task_scheduler.h" #include "libpandabase/utils/logger.h" +#include "taskmanager/task_queue_for_scheduler.h" +#include "taskmanager/task_queue_interface.h" namespace panda::taskmanager { @@ -45,7 +47,7 @@ void TaskScheduler::Destroy() instance_ = nullptr; } -TaskQueueId TaskScheduler::RegisterQueue(TaskQueue *queue) +TaskQueueId TaskScheduler::RegisterQueue(TaskQueueInterface *queue) { os::memory::LockHolder lock_holder(task_manager_lock_); ASSERT(!start_); @@ -53,8 +55,9 @@ TaskQueueId TaskScheduler::RegisterQueue(TaskQueue *queue) if (task_queues_.find(id) != task_queues_.end()) { return INVALID_TASKQUEUE_ID; } - task_queues_[id] = queue; - queue->SubscribeCallbackToAddTask( + auto *queue_for_scheduler = static_cast(queue); + task_queues_[id] = queue_for_scheduler; + queue_for_scheduler->SubscribeCallbackToAddTask( [this](TaskProperties properties, size_t count) { this->IncrementNewTaskCounter(properties, count); }); return id; } @@ -116,18 +119,18 @@ Task TaskScheduler::GetNextTask() // NOLINTNEXTLINE(cert-msc50-cpp) size_t choice = std::rand() % kinetic_max; // Get random number in range [0, kinetic_max) - TaskQueue *queue = nullptr; + TaskQueueForScheduler *queue = nullptr; std::tie(std::ignore, queue) = *kinetic_priorities.upper_bound(choice); // Get queue of chosen element return queue->PopTask().value(); } -std::map TaskScheduler::GetKineticPriorities() const +std::map TaskScheduler::GetKineticPriorities() const { ASSERT(!task_queues_.empty()); // no TaskQueues size_t kinetic_sum = 0; - std::map kinetic_priorities; - TaskQueue *queue = nullptr; + std::map kinetic_priorities; + TaskQueueForScheduler *queue = nullptr; for (auto &traits_queue_pair : task_queues_) { std::tie(std::ignore, queue) = traits_queue_pair; if (queue->IsEmpty()) { @@ -146,7 +149,7 @@ void TaskScheduler::PutTaskInWorker(WorkerThread *worker, Task &&task) bool TaskScheduler::AreQueuesEmpty() const { - TaskQueue *queue = nullptr; + TaskQueueForScheduler *queue = nullptr; for (const auto &traits_queue_pair : task_queues_) { std::tie(std::ignore, queue) = traits_queue_pair; if (!queue->IsEmpty()) { @@ -172,7 +175,7 @@ std::optional TaskScheduler::GetTaskFromQueue(TaskQueueId id, TaskExecutio } LOG(FATAL, COMMON) << "Attempt to take a task from a non-existent queue"; } - TaskQueue *queue = nullptr; + TaskQueueForScheduler *queue = nullptr; std::tie(std::ignore, queue) = *task_queues_iterator; if (!queue->HasTaskWithExecutionMode(mode)) { return std::nullopt; @@ -211,7 +214,7 @@ void TaskScheduler::Finalize() worker->Join(); delete worker; } - TaskQueue *queue = nullptr; + TaskQueueForScheduler *queue = nullptr; for (auto &traits_queue_pair : task_queues_) { std::tie(std::ignore, queue) = traits_queue_pair; queue->UnsubscribeCallback(); diff --git a/libpandabase/taskmanager/task_scheduler.h b/libpandabase/taskmanager/task_scheduler.h index 722a322b2f9318d7565aec2fa35d19c65cffd788..2fb347bcc7d2db163204963305c8f5aa038dc305 100644 --- a/libpandabase/taskmanager/task_scheduler.h +++ b/libpandabase/taskmanager/task_scheduler.h @@ -17,7 +17,8 @@ #define PANDA_LIBPANDABASE_TASKMANAGER_TASK_MANAGER_H #include "libpandabase/taskmanager/worker_thread.h" -#include "libpandabase/taskmanager/task_queue.h" +#include "libpandabase/taskmanager/task_queue-inl.h" +#include "taskmanager/task_queue_interface.h" #include #include @@ -56,7 +57,7 @@ public: * @return TaskQueueId of queue that was added. If queue with same TaskType and VMType is already added, method * returns INVALID_TASKQUEUE_ID */ - PANDA_PUBLIC_API TaskQueueId RegisterQueue(TaskQueue *queue); + PANDA_PUBLIC_API TaskQueueId RegisterQueue(TaskQueueInterface *queue); /// @brief Creates and starts workers with registered queues. After this method, you can not register new queues. PANDA_PUBLIC_API void Initialize(); @@ -121,7 +122,7 @@ private: * @brief This method @returns map from kinetic sum of non-empty queues to queues pointer * in the same order as they place in task_queues_. Use this method to choose next thread */ - std::map GetKineticPriorities() const REQUIRES(task_manager_lock_); + std::map GetKineticPriorities() const REQUIRES(task_manager_lock_); /// @brief Checks if task queues are empty bool AreQueuesEmpty() const REQUIRES(task_manager_lock_); @@ -158,7 +159,7 @@ private: * Since we can change the map only before creating the workers, we do not need to synchronize access after * Initialize method */ - std::map task_queues_; + std::map task_queues_; /** * task_manager_lock_ is used in case of access to shared resources operated by the task manager: diff --git a/libpandabase/tests/taskmanager/task_scheduler_test.cpp b/libpandabase/tests/taskmanager/task_scheduler_test.cpp index 16a28e7058534cbaeaf0760d9d9528c2b69ac85f..4d8fa9e340b3625199770d8e2ccf4225cb801316 100644 --- a/libpandabase/tests/taskmanager/task_scheduler_test.cpp +++ b/libpandabase/tests/taskmanager/task_scheduler_test.cpp @@ -49,15 +49,16 @@ public: { return new std::thread( [task_type, vm_type, priority, mode](TaskSchedulerTest *test) { - TaskQueue queue(task_type, vm_type, priority); - TaskScheduler::GetTaskScheduler()->RegisterQueue(&queue); + TaskQueueInterface *queue = TaskQueue<>::Create(task_type, vm_type, priority); + TaskScheduler::GetTaskScheduler()->RegisterQueue(queue); test->RegisteredOneQueue(); for (size_t i = 0; i < THREADED_TASKS_COUNT; i++) { - queue.AddTask( + queue->AddTask( Task::Create({task_type, vm_type, mode}, [test]() { test->IncrementGlobalCounter(); })); } test->AddedSetOfTasks(); test->WaitFinish(); + TaskQueue<>::Destroy(queue); }, this); } @@ -160,11 +161,12 @@ TEST_F(TaskSchedulerTest, TaskQueueRegistration) { constexpr size_t THREADS_COUNT = 1; auto *tm = TaskScheduler::Create(THREADS_COUNT); - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - EXPECT_EQ(tm->RegisterQueue(&queue), TaskQueueId(TaskType::GC, VMType::STATIC_VM)); - EXPECT_EQ(tm->RegisterQueue(&queue), INVALID_TASKQUEUE_ID); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + EXPECT_EQ(tm->RegisterQueue(queue), TaskQueueId(TaskType::GC, VMType::STATIC_VM)); + EXPECT_EQ(tm->RegisterQueue(queue), INVALID_TASKQUEUE_ID); TaskScheduler::Destroy(); + TaskQueue<>::Destroy(queue); } TEST_F(TaskSchedulerTest, TaskQueuesFillingFromOwner) @@ -174,24 +176,24 @@ TEST_F(TaskSchedulerTest, TaskQueuesFillingFromOwner) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); - tm->RegisterQueue(&jit_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Initialize tm workers tm->Initialize(); // Fill queues with tasks that increment counter with its type. constexpr size_t COUNT_OF_TASK = 10; std::array counters = {0, 0}; for (size_t i = 0; i < COUNT_OF_TASK; i++) { - gc_queue.AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { + gc_queue->AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { constexpr size_t GC_COUNTER = 0; // Atomic with relaxed order reason: data race with counters[GC_COUNTER] with no synchronization or ordering // constraints counters[GC_COUNTER].fetch_add(1, std::memory_order_relaxed); })); - jit_queue.AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { + jit_queue->AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { constexpr size_t JIT_COUNTER = 1; // Atomic with relaxed order reason: data race with counters[JIT_COUNTER] with no synchronization or // ordering constraints @@ -203,6 +205,8 @@ TEST_F(TaskSchedulerTest, TaskQueuesFillingFromOwner) ASSERT_EQ(counter, COUNT_OF_TASK) << "seed:" << GetSeed(); } TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } TEST_F(TaskSchedulerTest, TaskQueuesFillingFromTaskScheduler) @@ -212,11 +216,11 @@ TEST_F(TaskSchedulerTest, TaskQueuesFillingFromTaskScheduler) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); - tm->RegisterQueue(&jit_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Initialize tm workers tm->Initialize(); // Fill queues with tasks that increment counter with its type. @@ -241,6 +245,8 @@ TEST_F(TaskSchedulerTest, TaskQueuesFillingFromTaskScheduler) ASSERT_EQ(counter, COUNT_OF_TASK) << "seed:" << GetSeed(); } TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } TEST_F(TaskSchedulerTest, ForegroundQueueTest) @@ -250,11 +256,11 @@ TEST_F(TaskSchedulerTest, ForegroundQueueTest) constexpr size_t THREADS_COUNT = 1; // IMPORTANT: only one worker to see effect of using foreground execution mode auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); - tm->RegisterQueue(&jit_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Fill queues with tasks that push their TaskType to global queue. std::queue global_queue; @@ -281,6 +287,8 @@ TEST_F(TaskSchedulerTest, ForegroundQueueTest) global_queue.pop(); ASSERT_TRUE(global_queue.empty()); TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } TEST_F(TaskSchedulerTest, TaskCreateTask) @@ -290,11 +298,11 @@ TEST_F(TaskSchedulerTest, TaskCreateTask) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); - tm->RegisterQueue(&jit_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Initialize tm workers tm->Initialize(); @@ -320,6 +328,8 @@ TEST_F(TaskSchedulerTest, TaskCreateTask) ASSERT_EQ(counter, COUNT_OF_TASK) << "seed:" << GetSeed(); } TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } TEST_F(TaskSchedulerTest, MultithreadingUsage) @@ -332,7 +342,7 @@ TEST_F(TaskSchedulerTest, MultithreadingUsage) constexpr size_t PRODUCER_THREADS_COUNT = 4; SetQueueCount(PRODUCER_THREADS_COUNT); SetTasksSetCount(PRODUCER_THREADS_COUNT); - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; auto jit_static_thread = CreateTaskProducerThread(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY, TaskExecutionMode::BACKGROUND); auto jit_dynamic_thread = @@ -370,9 +380,9 @@ TEST_F(TaskSchedulerTest, TaskSchedulerGetTask) // Create TaskScheduler constexpr size_t THREADS_COUNT = 1; // Worker will not be used in this test auto *tm = TaskScheduler::Create(THREADS_COUNT); - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - auto queue = TaskQueue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + auto queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(queue); std::queue global_queue; constexpr size_t COUNT_OF_TASKS = 100; for (size_t i = 0; i < COUNT_OF_TASKS; i++) { @@ -390,6 +400,7 @@ TEST_F(TaskSchedulerTest, TaskSchedulerGetTask) tm->Finalize(); ASSERT_FALSE(tm->GetTaskFromQueue(GC_STATIC_VM_BACKGROUND_PROPERTIES).has_value()) << "seed:" << GetSeed(); tm->Destroy(); + TaskQueue<>::Destroy(queue); } TEST_F(TaskSchedulerTest, TasksWithMutex) @@ -399,11 +410,11 @@ TEST_F(TaskSchedulerTest, TasksWithMutex) constexpr size_t THREADS_COUNT = 10; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_task_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_task_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_task_queue); - tm->RegisterQueue(&jit_task_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Initialize tm workers tm->Initialize(); // Fill queues with tasks that increment counter with its type. @@ -411,12 +422,12 @@ TEST_F(TaskSchedulerTest, TasksWithMutex) std::array counters = {0, 0}; os::memory::Mutex main_mutex; for (size_t i = 0; i < COUNT_OF_TASK; i++) { - gc_task_queue.AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&main_mutex, &counters]() { + gc_queue->AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&main_mutex, &counters]() { constexpr size_t GC_COUNTER = 0; os::memory::LockHolder lock_holder(main_mutex); counters[GC_COUNTER]++; })); - jit_task_queue.AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&main_mutex, &counters]() { + jit_queue->AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&main_mutex, &counters]() { constexpr size_t JIT_COUNTER = 1; os::memory::LockHolder lock_holder(main_mutex); counters[JIT_COUNTER]++; @@ -427,6 +438,8 @@ TEST_F(TaskSchedulerTest, TasksWithMutex) ASSERT_EQ(counter, COUNT_OF_TASK) << "seed:" << GetSeed(); } TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } TEST_F(TaskSchedulerTest, TaskCreateTaskRecursively) @@ -436,9 +449,9 @@ TEST_F(TaskSchedulerTest, TaskCreateTaskRecursively) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); // Initialize tm workers tm->Initialize(); @@ -463,8 +476,9 @@ TEST_F(TaskSchedulerTest, TaskCreateTaskRecursively) tm->AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [runner]() { runner(0); })); } tm->Finalize(); - ASSERT_TRUE(gc_queue.IsEmpty()); + ASSERT_TRUE(gc_queue->IsEmpty()); TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); } TEST_F(TaskSchedulerTest, TaskSchedulerTaskGetTask) @@ -474,9 +488,9 @@ TEST_F(TaskSchedulerTest, TaskSchedulerTaskGetTask) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); // Initialize tm workers tm->Initialize(); @@ -506,8 +520,9 @@ TEST_F(TaskSchedulerTest, TaskSchedulerTaskGetTask) })); } tm->Finalize(); - ASSERT_TRUE(gc_queue.IsEmpty()); + ASSERT_TRUE(gc_queue->IsEmpty()); TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); } TEST_F(TaskSchedulerTest, TaskSchedulerWaitForFinishAllTaskFromQueue) @@ -517,28 +532,28 @@ TEST_F(TaskSchedulerTest, TaskSchedulerWaitForFinishAllTaskFromQueue) constexpr size_t THREADS_COUNT = 5; auto *tm = TaskScheduler::Create(THREADS_COUNT); // Create and register 2 queues - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::DEFAULT_PRIORITY; - TaskQueue gc_queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); - TaskQueue jit_queue(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); - tm->RegisterQueue(&gc_queue); - tm->RegisterQueue(&jit_queue); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::DEFAULT_PRIORITY; + TaskQueueInterface *gc_queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + TaskQueueInterface *jit_queue = TaskQueue<>::Create(TaskType::JIT, VMType::STATIC_VM, QUEUE_PRIORITY); + tm->RegisterQueue(gc_queue); + tm->RegisterQueue(jit_queue); // Fill queues with tasks that increment counter with its type. constexpr size_t COUNT_OF_TASK = 10'000; std::array counters = {0, 0, 0}; for (size_t i = 0; i < COUNT_OF_TASK; i++) { - gc_queue.AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { + gc_queue->AddTask(Task::Create(GC_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { constexpr size_t GC_BACKGROUND_COUNTER = 0; // Atomic with relaxed order reason: data race with counters[GC_BACKGROUND_COUNTER] with no synchronization // or ordering constraints counters[GC_BACKGROUND_COUNTER].fetch_add(1, std::memory_order_relaxed); })); - gc_queue.AddTask(Task::Create(GC_STATIC_VM_FOREGROUND_PROPERTIES, [&counters]() { + gc_queue->AddTask(Task::Create(GC_STATIC_VM_FOREGROUND_PROPERTIES, [&counters]() { constexpr size_t GC_FOREGROUND_COUNTER = 1; // Atomic with relaxed order reason: data race with counters[GC_FOREGROUND_COUNTER] with no synchronization // or ordering constraints counters[GC_FOREGROUND_COUNTER].fetch_add(1, std::memory_order_relaxed); })); - jit_queue.AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { + jit_queue->AddTask(Task::Create(JIT_STATIC_VM_BACKGROUND_PROPERTIES, [&counters]() { constexpr size_t JIT_COUNTER = 2; // Atomic with relaxed order reason: data race with counters[JIT_COUNTER] with no synchronization or // ordering constraints @@ -548,16 +563,18 @@ TEST_F(TaskSchedulerTest, TaskSchedulerWaitForFinishAllTaskFromQueue) // Initialize tm workers tm->Initialize(); tm->WaitForFinishAllTasksWithProperties(GC_STATIC_VM_FOREGROUND_PROPERTIES); - ASSERT_FALSE(gc_queue.HasTaskWithExecutionMode(TaskExecutionMode::FOREGROUND)); + ASSERT_FALSE(gc_queue->HasTaskWithExecutionMode(TaskExecutionMode::FOREGROUND)); tm->WaitForFinishAllTasksWithProperties(GC_STATIC_VM_BACKGROUND_PROPERTIES); - ASSERT_TRUE(gc_queue.IsEmpty()); + ASSERT_TRUE(gc_queue->IsEmpty()); tm->WaitForFinishAllTasksWithProperties(JIT_STATIC_VM_BACKGROUND_PROPERTIES); - ASSERT_TRUE(jit_queue.IsEmpty()); + ASSERT_TRUE(jit_queue->IsEmpty()); tm->Finalize(); for (auto &counter : counters) { ASSERT_EQ(counter, COUNT_OF_TASK) << "seed:" << GetSeed(); } TaskScheduler::Destroy(); + TaskQueue<>::Destroy(gc_queue); + TaskQueue<>::Destroy(jit_queue); } } // namespace panda::taskmanager \ No newline at end of file diff --git a/libpandabase/tests/taskmanager/task_test.cpp b/libpandabase/tests/taskmanager/task_test.cpp index 94697b69a2310f07b756e50826d1a2c0c3a5297a..8af95f124e6de696a30e9c9646cbfd5ce624d005 100644 --- a/libpandabase/tests/taskmanager/task_test.cpp +++ b/libpandabase/tests/taskmanager/task_test.cpp @@ -1,19 +1,20 @@ /** - * Copyright (c) 2023 Huawei Device Co., Ltd. - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. + * Copyright (c) 2023 Huawei Device Co->, Ltd. + * Licensed under the Apache License, Version 2->0 (the "License"); + * you may not use this file except in compliance with the License-> * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www->apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied-> * See the License for the specific language governing permissions and - * limitations under the License. + * limitations under the License-> */ -#include "libpandabase/taskmanager/task_queue.h" +#include "libpandabase/taskmanager/task_queue-inl.h" +#include "taskmanager/task_queue.h" #include #include @@ -52,79 +53,80 @@ TEST_F(TaskTest, TaskQueueSimpleTest) { size_t counter = 0; // Creation of TaskQueue - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::DYNAMIC_VM, QUEUE_PRIORITY); - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_TRUE(queue.IsEmpty()); - EXPECT_EQ(queue.Size(), 0); - EXPECT_EQ(queue.GetPriority(), QUEUE_PRIORITY); - // Add COUNT_OF_TASKS tasks in queue. Each task increment counter. + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::DYNAMIC_VM, QUEUE_PRIORITY); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_TRUE(queue->IsEmpty()); + EXPECT_EQ(queue->Size(), 0); + EXPECT_EQ(queue->GetPriority(), QUEUE_PRIORITY); + // Add COUNT_OF_TASKS tasks in queue-> Each task increment counter. constexpr size_t COUNT_OF_TASKS = 10; for (size_t i = 0; i < COUNT_OF_TASKS; i++) { - queue.AddTask(Task::Create({TaskType::GC, VMType::DYNAMIC_VM, TaskExecutionMode::BACKGROUND}, - [&counter]() { counter++; })); + queue->AddTask(Task::Create({TaskType::GC, VMType::DYNAMIC_VM, TaskExecutionMode::BACKGROUND}, + [&counter]() { counter++; })); } - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_FALSE(queue.IsEmpty()); - EXPECT_EQ(queue.Size(), COUNT_OF_TASKS); - EXPECT_EQ(queue.GetPriority(), QUEUE_PRIORITY); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_FALSE(queue->IsEmpty()); + EXPECT_EQ(queue->Size(), COUNT_OF_TASKS); + EXPECT_EQ(queue->GetPriority(), QUEUE_PRIORITY); // Pop count_of_done_task tasks from queue and execute them. constexpr size_t COUNT_OF_DONE_TASKS = 6; ASSERT(COUNT_OF_DONE_TASKS < COUNT_OF_TASKS); for (size_t i = 0; i < COUNT_OF_DONE_TASKS; i++) { - auto pop_task = queue.PopTask(); + auto pop_task = static_cast(queue)->PopTask(); EXPECT_EQ(pop_task.value().GetTaskProperties().GetTaskType(), TaskType::GC); pop_task.value().RunTask(); EXPECT_EQ(counter, i + 1); } // Now in queue counter_of_tasks - COUNT_OF_DONE_TASKS objects. - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_EQ(queue.IsEmpty(), COUNT_OF_TASKS == COUNT_OF_DONE_TASKS); - EXPECT_EQ(queue.Size(), COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); - EXPECT_EQ(queue.GetPriority(), QUEUE_PRIORITY); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_EQ(queue->IsEmpty(), COUNT_OF_TASKS == COUNT_OF_DONE_TASKS); + EXPECT_EQ(queue->Size(), COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); + EXPECT_EQ(queue->GetPriority(), QUEUE_PRIORITY); // Change priority of this queue - constexpr size_t NEW_QUEUE_PRIORITY = TaskQueue::MIN_PRIORITY; - queue.SetPriority(NEW_QUEUE_PRIORITY); - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_EQ(queue.IsEmpty(), COUNT_OF_TASKS == COUNT_OF_DONE_TASKS); - EXPECT_EQ(queue.Size(), COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); - EXPECT_EQ(queue.GetPriority(), NEW_QUEUE_PRIORITY); - // Add in queue counter_of_tasks new tasks. Each add 2 to counter + constexpr size_t NEW_QUEUE_PRIORITY = TaskQueueInterface::MIN_PRIORITY; + queue->SetPriority(NEW_QUEUE_PRIORITY); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_EQ(queue->IsEmpty(), COUNT_OF_TASKS == COUNT_OF_DONE_TASKS); + EXPECT_EQ(queue->Size(), COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); + EXPECT_EQ(queue->GetPriority(), NEW_QUEUE_PRIORITY); + // Add in queue counter_of_tasks new tasks-> Each add 2 to counter for (size_t i = 0; i < COUNT_OF_TASKS; i++) { - queue.AddTask(Task::Create({TaskType::GC, VMType::DYNAMIC_VM, TaskExecutionMode::BACKGROUND}, - [&counter]() { counter += 2; })); + queue->AddTask(Task::Create({TaskType::GC, VMType::DYNAMIC_VM, TaskExecutionMode::BACKGROUND}, + [&counter]() { counter += 2; })); } // After we have 2 * counter_of_tasks - counter_of_done_tasks objects in queue - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_FALSE(queue.IsEmpty()); - EXPECT_EQ(queue.Size(), 2 * COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); - EXPECT_EQ(queue.GetPriority(), NEW_QUEUE_PRIORITY); - // Pop and execute all tasks in queue. - while (!queue.IsEmpty()) { - auto next_task = queue.PopTask(); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_FALSE(queue->IsEmpty()); + EXPECT_EQ(queue->Size(), 2 * COUNT_OF_TASKS - COUNT_OF_DONE_TASKS); + EXPECT_EQ(queue->GetPriority(), NEW_QUEUE_PRIORITY); + // Pop and execute all tasks in queue-> + while (!queue->IsEmpty()) { + auto next_task = static_cast(queue)->PopTask(); next_task.value().RunTask(); } // After all task is done, counter = 3 * COUNT_OF_TASKS EXPECT_EQ(counter, 3 * COUNT_OF_TASKS); - EXPECT_EQ(queue.GetTaskType(), TaskType::GC); - EXPECT_EQ(queue.Size(), 0); - EXPECT_EQ(queue.GetPriority(), NEW_QUEUE_PRIORITY); + EXPECT_EQ(queue->GetTaskType(), TaskType::GC); + EXPECT_EQ(queue->Size(), 0); + EXPECT_EQ(queue->GetPriority(), NEW_QUEUE_PRIORITY); + TaskQueue<>::Destroy(queue); } TEST_F(TaskTest, TaskQueueMultithreadingOnePushOnePop) { - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); std::atomic_size_t counter = 0; constexpr size_t RESULT_COUNT = 10'000; auto pusher = [&queue, &counter]() { for (size_t i = 0; i < RESULT_COUNT; i++) { - queue.AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); + queue->AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); } }; auto popper = [&queue]() { for (size_t i = 0; i < RESULT_COUNT; i++) { - auto task = queue.PopTask(); + auto task = static_cast(queue)->PopTask(); task->RunTask(); } }; @@ -135,22 +137,23 @@ TEST_F(TaskTest, TaskQueueMultithreadingOnePushOnePop) delete worker_1; delete worker_2; EXPECT_EQ(counter, RESULT_COUNT); + TaskQueue<>::Destroy(queue); } TEST_F(TaskTest, TaskQueueMultithreadingNPushNPop) { - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); std::atomic_size_t counter = 0; constexpr size_t RESULT_COUNT = 100'000; auto pusher = [&queue, &counter]() { for (size_t i = 0; i < RESULT_COUNT; i++) { - queue.AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); + queue->AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); } }; auto popper = [&queue]() { for (size_t i = 0; i < RESULT_COUNT; i++) { - auto task = queue.PopTask(); + auto task = static_cast(queue)->PopTask(); task->RunTask(); } }; @@ -168,16 +171,17 @@ TEST_F(TaskTest, TaskQueueMultithreadingNPushNPop) delete poppers[i]; } EXPECT_EQ(counter, RESULT_COUNT * COUNT_OF_WORKERS); + TaskQueue<>::Destroy(queue); } TEST_F(TaskTest, TaskQueueWaitForQueueEmptyAndFinish) { - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); std::atomic_size_t counter = 0; constexpr size_t TASK_COUNT = 100'000; for (size_t i = 0; i < TASK_COUNT; i++) { - queue.AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); + queue->AddTask(Task::Create(TASK_PROPERTIES, [&counter]() { counter++; })); } constexpr size_t THREAD_COUNTER = 10; @@ -185,7 +189,7 @@ TEST_F(TaskTest, TaskQueueWaitForQueueEmptyAndFinish) for (size_t i = 0; i < THREAD_COUNTER; i++) { poppers.emplace_back([&queue]() { while (true) { - auto task = queue.PopTask(); + auto task = static_cast(queue)->PopTask(); if (!task.has_value()) { break; } @@ -194,33 +198,34 @@ TEST_F(TaskTest, TaskQueueWaitForQueueEmptyAndFinish) }); } - queue.WaitForQueueEmptyAndFinish(); + queue->WaitForQueueEmptyAndFinish(); for (auto &popper : poppers) { popper.join(); } EXPECT_EQ(counter, TASK_COUNT); + TaskQueue<>::Destroy(queue); } TEST_F(TaskTest, TaskQueueForegroundAndBackgroundTasks) { - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); std::queue mode_queue; constexpr TaskProperties FOREGROUND_PROPERTIES(TaskType::GC, VMType::STATIC_VM, TaskExecutionMode::FOREGROUND); constexpr TaskProperties BACKGROUND_PROPERTIES(TaskType::GC, VMType::STATIC_VM, TaskExecutionMode::BACKGROUND); constexpr size_t TASKS_COUNT = 100; for (size_t i = 0; i < TASKS_COUNT; i++) { - queue.AddTask( + queue->AddTask( Task::Create(BACKGROUND_PROPERTIES, [&mode_queue]() { mode_queue.push(TaskExecutionMode::BACKGROUND); })); } for (size_t i = 0; i < TASKS_COUNT; i++) { - queue.AddTask( + queue->AddTask( Task::Create(FOREGROUND_PROPERTIES, [&mode_queue]() { mode_queue.push(TaskExecutionMode::FOREGROUND); })); } for (size_t i = 0; i < 2 * TASKS_COUNT; i++) { - auto task = queue.PopTask(); + auto task = static_cast(queue)->PopTask(); ASSERT_TRUE(task.has_value()); task.value().RunTask(); } @@ -236,31 +241,32 @@ TEST_F(TaskTest, TaskQueueForegroundAndBackgroundTasks) EXPECT_EQ(mode, TaskExecutionMode::BACKGROUND); } EXPECT_TRUE(mode_queue.empty()); + TaskQueue<>::Destroy(queue); } TEST_F(TaskTest, PopTaskWithExecutionMode) { - constexpr uint8_t QUEUE_PRIORITY = TaskQueue::MAX_PRIORITY; - TaskQueue queue(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); + constexpr uint8_t QUEUE_PRIORITY = TaskQueueInterface::MAX_PRIORITY; + TaskQueueInterface *queue = TaskQueue<>::Create(TaskType::GC, VMType::STATIC_VM, QUEUE_PRIORITY); std::queue mode_queue; constexpr TaskProperties FOREGROUND_PROPERTIES(TaskType::GC, VMType::STATIC_VM, TaskExecutionMode::FOREGROUND); constexpr TaskProperties BACKGROUND_PROPERTIES(TaskType::GC, VMType::STATIC_VM, TaskExecutionMode::BACKGROUND); constexpr size_t TASKS_COUNT = 100; for (size_t i = 0; i < TASKS_COUNT; i++) { - queue.AddTask( + queue->AddTask( Task::Create(BACKGROUND_PROPERTIES, [&mode_queue]() { mode_queue.push(TaskExecutionMode::BACKGROUND); })); } for (size_t i = 0; i < TASKS_COUNT; i++) { - queue.AddTask( + queue->AddTask( Task::Create(FOREGROUND_PROPERTIES, [&mode_queue]() { mode_queue.push(TaskExecutionMode::FOREGROUND); })); } for (size_t i = 0; i < TASKS_COUNT; i++) { - auto task = queue.PopTask(TaskExecutionMode::FOREGROUND); + auto task = static_cast(queue)->PopTask(TaskExecutionMode::FOREGROUND); ASSERT_TRUE(task.has_value()); task.value().RunTask(); - task = queue.PopTask(TaskExecutionMode::BACKGROUND); + task = static_cast(queue)->PopTask(TaskExecutionMode::BACKGROUND); ASSERT_TRUE(task.has_value()); task.value().RunTask(); } @@ -274,6 +280,7 @@ TEST_F(TaskTest, PopTaskWithExecutionMode) EXPECT_EQ(mode, TaskExecutionMode::BACKGROUND); } EXPECT_TRUE(mode_queue.empty()); + TaskQueue<>::Destroy(queue); } } // namespace panda::taskmanager \ No newline at end of file diff --git a/runtime/compiler_task_manager_worker.cpp b/runtime/compiler_task_manager_worker.cpp index aa3561edc73b23b77447ddb4e849193d24f4fadc..cdf48b3836541ae788e87aeff4b382029fe81c8a 100644 --- a/runtime/compiler_task_manager_worker.cpp +++ b/runtime/compiler_task_manager_worker.cpp @@ -15,14 +15,15 @@ #include "runtime/compiler.h" #include "runtime/compiler_task_manager_worker.h" +#include "taskmanager/task_queue_interface.h" namespace panda { CompilerTaskManagerWorker::CompilerTaskManagerWorker(mem::InternalAllocatorPtr internal_allocator, Compiler *compiler) : CompilerWorker(internal_allocator, compiler) { - compiler_task_manager_queue_ = internal_allocator_->New( - taskmanager::TaskType::JIT, taskmanager::VMType::STATIC_VM, taskmanager::TaskQueue::DEFAULT_PRIORITY); + compiler_task_manager_queue_ = taskmanager::TaskQueueAdapter())>::Create( + taskmanager::TaskType::JIT, taskmanager::VMType::STATIC_VM, taskmanager::TaskQueueInterface::DEFAULT_PRIORITY); ASSERT(compiler_task_manager_queue_ != nullptr); } diff --git a/runtime/compiler_task_manager_worker.h b/runtime/compiler_task_manager_worker.h index add06e5eddbaa9bc2b46d8ae6f1a6f42301a20fa..dcadc617db762abb1bd2170ac14a04421fe37709 100644 --- a/runtime/compiler_task_manager_worker.h +++ b/runtime/compiler_task_manager_worker.h @@ -21,6 +21,7 @@ #include "libpandabase/taskmanager/task.h" #include "libpandabase/taskmanager/task_queue.h" #include "libpandabase/taskmanager/task_scheduler.h" +#include "taskmanager/task_queue_interface.h" namespace panda { @@ -60,14 +61,14 @@ public: ~CompilerTaskManagerWorker() override { - internal_allocator_->Delete(compiler_task_manager_queue_); + taskmanager::TaskQueueAdapter())>::Destroy(compiler_task_manager_queue_); } private: void AddTaskInTaskManager(CompilerTask &&ctx); void CompileNextMethod() REQUIRES(task_queue_lock_); - taskmanager::TaskQueue *compiler_task_manager_queue_ {nullptr}; + taskmanager::TaskQueueInterface *compiler_task_manager_queue_ {nullptr}; os::memory::Mutex task_queue_lock_; // This queue is used for methods need to be compiled inside TaskScheduler without compilation_lock_. PandaDeque compiler_task_deque_ GUARDED_BY(task_queue_lock_); diff --git a/runtime/mem/gc/gc.cpp b/runtime/mem/gc/gc.cpp index 51600852b977197560ee94a9b5477c4d3254eea9..aab3bf9a2d31c394db684c77385261bf20f4b505 100644 --- a/runtime/mem/gc/gc.cpp +++ b/runtime/mem/gc/gc.cpp @@ -43,6 +43,7 @@ #include "runtime/include/object_accessor-inl.h" #include "runtime/include/coretypes/class.h" #include "runtime/thread_manager.h" +#include "taskmanager/task_queue_interface.h" namespace panda::mem { using TaggedValue = coretypes::TaggedValue; @@ -56,7 +57,7 @@ GC::GC(ObjectAllocatorBase *object_allocator, const GCSettings &settings) { if (gc_settings_.UseTaskManagerForGC()) { // Create gc task queue for task manager - gc_workers_task_queue_ = internal_allocator_->New( + gc_workers_task_queue_ = taskmanager::TaskQueueAdapter())>::Create( taskmanager::TaskType::GC, taskmanager::VMType::STATIC_VM, GC_TASK_QUEUE_PRIORITY); ASSERT(gc_workers_task_queue_ != nullptr); // Register created gc task queue in task manager @@ -88,7 +89,7 @@ GC::~GC() allocator->Delete(workers_task_pool_); } if (gc_workers_task_queue_ != nullptr) { - allocator->Delete(gc_workers_task_queue_); + taskmanager::TaskQueueAdapter())>::Destroy(gc_workers_task_queue_); } } diff --git a/runtime/mem/gc/gc.h b/runtime/mem/gc/gc.h index 64a1e47f217662f58abe58392eb2b863d6f75c96..bcafa536c51a9728876342fcf9dc44408fe5224b 100644 --- a/runtime/mem/gc/gc.h +++ b/runtime/mem/gc/gc.h @@ -712,7 +712,7 @@ private: // TODO(ipetrov): choose suitable priority static constexpr size_t GC_TASK_QUEUE_PRIORITY = 6U; - taskmanager::TaskQueue *gc_workers_task_queue_ = nullptr; + taskmanager::TaskQueueInterface *gc_workers_task_queue_ = nullptr; /* GC worker specific variables */ GCWorker *gc_worker_ = nullptr;