代码拉取完成,页面将自动刷新
#include <iostream>
#include <queue>
#include <vector>
#include <string>
#include <unistd.h>
#include <functional>
//#include "Task.hpp"
#include "Thread.hpp"
#include "Log.hpp"
using namespace ThreadMoudle;
using namespace log_ns;
static const int gdefaultnum = 5; // 线程池中线程的数量
template <typename T>
class ThreadPool
{
public:
// 返回单例模式对象指针
// 存在问题:多线程获取单例模式的线程池,如果线程池对象没有被创建,此时多个线程可能会同时执行new创建线程池对象
// 这就不是单例模式的线程池了,所以必须对创建线程池对象的操作加锁!
static ThreadPool<T> *GetInstance()
{
if (_tp == nullptr)//只有当线程池对象没有被创建时才需要加锁,否则直接返回单例模式线程池的对象指针即可
{
pthread_mutex_lock(&_sig_mutex); // 加锁
if (_tp == nullptr) // 说明对象还没有开辟
{
LOG(INFO, "create threadpool\n");
_tp = new ThreadPool(); // 懒汉实现方式
_tp->Init();
_tp->Start();
}
else
{
LOG(INFO, "get threadpool\n");
}
pthread_mutex_unlock(&_sig_mutex); // 解锁
}
return _tp;
}
// 关闭线程池
void Stop()
{
LockQueue();
_isrunning = false;
WakeupAll(); // 唤醒所有线程,防止有线程在休眠不经过判断条件,无法退出
UnlockQueue();
LOG(INFO, "Thread Pool Stop Sucess!\n");
}
// 向线程池发送任务
void Equeue(const T &in)
{
LockQueue(); // 加锁
if (_isrunning) // 只有线程池处于运行状态时,才可以向任务队列添加任务
{
_task_queue.push(in); // 向任务队列中添加任务
if (_sleep_thread_num > 0) // 只要有线程在休眠就唤醒线程
{
Wakeup();
}
}
UnlockQueue(); // 解锁
}
// 析构函数
~ThreadPool()
{
pthread_mutex_destroy(&_mutex); // 销毁互斥锁
pthread_cond_destroy(&_cond); // 销毁条件变量
}
private:
ThreadPool(const ThreadPool<T> &) = delete; // 禁用拷贝构造函数
void operator=(const ThreadPool<T> &) = delete; // 禁用赋值运算符重载
// 构造函数
ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0)
{
pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁
pthread_cond_init(&_cond, nullptr); // 初始化条件变量
}
// 线程池初始化
void Init()
{
func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1); // 强关联,构建新对象,作为任务队列中对象的参数
for (int i = 0; i < _thread_num; ++i)
{
std::string threadname = "thread-" + std::to_string(i + 1);
_threads.emplace_back(threadname, func); // emplace_back会自动根据参数创建对象填入vector容器中
LOG(DEBUG, "construct thread %s done,init sucess\n", threadname.c_str());
}
}
// 启动线程池
void Start()
{
_isrunning = true;
// 创建并运行所有线程
for (auto &thread : _threads)
{
LOG(DEBUG, "start thread %s done.\n", thread.Name().c_str());
thread.Start();
}
}
// 任务队列加锁
void LockQueue()
{
pthread_mutex_lock(&_mutex);
}
// 任务队列解锁
void UnlockQueue()
{
pthread_mutex_unlock(&_mutex);
}
// 唤醒单个线程
void Wakeup()
{
pthread_cond_signal(&_cond);
}
// 唤醒所有线程
void WakeupAll()
{
pthread_cond_broadcast(&_cond);
}
// 判断任务队列是否为空
bool IsEmpty()
{
return _task_queue.empty();
}
// 线程处理任务队列中的任务
void HandlerTask(const std::string &name)
{
while (true)
{
LockQueue(); // 加锁
while (IsEmpty() && _isrunning) // 如果任务队列为空,线程休眠(使用while防止伪唤醒问题)
{ // 任务队列为空但是线程池还要运行时,线程才能去休眠
_sleep_thread_num++;
LOG(INFO, "%s thread sleep begin!\n", name.c_str());
Sleep(); // 线程休眠
LOG(INFO, "%s thread wake up!\n", name.c_str());
_sleep_thread_num--;
}
// 判定线程池是否要退出
if (IsEmpty() && !_isrunning) // 只有当任务队列为空,并且线程池也要退出时,再退出线程
{
// std::cout<<name<<" quit"<<std::endl;
LOG(INFO, "%s thread quit\n", name.c_str());
UnlockQueue();
break;
}
// 处理任务
T t = _task_queue.front(); // 取出任务
_task_queue.pop(); // 删除任务队列中的任务
UnlockQueue(); // 解锁
t(); // 处理任务,必须在解锁之后处理,因为任务被取出之后就属于线程了,不属于临界资源了
// std::cout<<name<<": "<<t.result()<<std::endl;
//LOG(DEBUG, "hander task done,task is: %s\n", t.result().c_str());
}
}
// 线程休眠
void Sleep()
{
pthread_cond_wait(&_cond, &_mutex);
}
private:
int _thread_num; // 线程池中线程的个数
std::vector<Thread> _threads; // 管理线程
std::queue<T> _task_queue; // 任务队列
bool _isrunning; // 判断线程池是否正在运行
pthread_mutex_t _mutex; // 互斥锁,保护临界资源:任务队列
pthread_cond_t _cond; // 条件变量,实现线程同步
int _sleep_thread_num; // 休眠线程的个数
// 单例模式
static ThreadPool<T> *_tp;
// 如果多个线程同时调用单例模式的线程池,会创建多个对象,这样就不是单例模式了,所以要在
static pthread_mutex_t _sig_mutex; // 单例的锁,
};
template <class T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr; // 静态成员类外初始化
template <class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex = PTHREAD_MUTEX_INITIALIZER; // 静态成员类外初始化
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。