1 Star 1 Fork 0

lixinyu7/threadpool

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
threadPooldemo.c 10.78 KB
一键复制 编辑 原始数据 按行查看 历史
lixinyu7 提交于 2021-10-20 13:01 . add file
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <signal.h>
#define DEFAULT_TIME 60
#define DEFAULT_THREAD_VERY 5 //每次新增5个线程
#define MIN_WAIT_TASK_NUM 10 //
typedef struct
{
void *(*func)(void*); //函数指针
void *arg; //参数通用指针
}ThreadPoolTask_T; //各子线程工作结构体
typedef struct
{
pthread_mutex_t lock; //互斥锁-->用于锁住本结构体
pthread_mutex_t thread_counter; //忙线程个数锁
pthread_cond_t queue_not_full; //当队列任务满时,添加任务的线程阻塞,等待此条件
pthread_cond_t queue_not_empty; //任务队列不为空时,通知等待任务的线程
pthread_t *workds_id; //存放线程池中每个线程的id,数组
pthread_t manager_tid; //存放管理线程id
ThreadPoolTask_T *task_queue; //任务队列
int min_thread_num; //线程池最小线程数
int max_thread_num; //线程池最大线程数
int live_thread_num; //当前存活线程个数
int busy_thread_num; //忙线程个数
int wait_exit_thread_num; //待销毁的线程个数
int queue_front; //task_queue 队头下标
int queue_rear; //task_queue 队尾下标
int queue_size; //task_queue 队列中实际任务数
int queue_max_size; //task_queue 队列可容纳任务上限
int shutdown; //标志位,线程池使用状态,true or false
}ThreadPool_T;
int is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); /*发0号信号,测试线程是否存活*/
if (kill_rc == ESRCH)
{
return 0;
}
return 1;
}
/*
* @brief 工作线程
*/
void* workers_thread(void *arg)
{
ThreadPool_T *pool = (ThreadPool_T*)arg;
ThreadPoolTask_T task;
while(1)
{
/*
* 等待任务队列里面有任务,否则阻塞等待任务队列里有任务在唤醒
*/
pthread_mutex_lock(&pool->lock);
//queue_size == 0说明任务队列为空
while(pool->queue_size == 0 && (!pool->shutdown))
{
printf("works thread Id 0x%x is waiting\n",(unsigned int)pthread_self());
pthread_cond_wait(&pool->queue_not_empty,&pool->lock);
if(pool->wait_exit_thread_num > 0)
{
//如果线程池里的线程数大于最小值时可以结束当前线程
if(pool->live_thread_num > pool->min_thread_num)
{
printf("works thread Id 0x%x is exiting\n",(unsigned int)pthread_self());
pool->live_thread_num--;
pool->wait_exit_thread_num--;
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
}
} //end pool->queue_size==0
if(pool->shutdown == 1)
{
printf("works thread 0x%x is exiting\n",(unsigned int)pthread_self());
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
//从任务队列中获取任务,是一个出队列操作
task.func = pool->task_queue[pool->queue_front].func;
task.arg = pool->task_queue[pool->queue_front].arg;
//出队,模拟环形队列
pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;
pool->queue_size--;
//通知可以有新的任务添加进来
pthread_cond_broadcast(&pool->queue_not_full);
//取出任务后,立即将线程池锁释放
pthread_mutex_unlock(&pool->lock);
//设置当前线程忙状态
pthread_mutex_lock(&pool->thread_counter);
pool->busy_thread_num++;
pthread_mutex_unlock(&pool->thread_counter);
//执行任务
(*task.func)(task.arg);
//由忙状态切换为空闲状态
pthread_mutex_lock(&pool->thread_counter);
pool->busy_thread_num--;
pthread_mutex_unlock(&pool->thread_counter);
}
return NULL;
}
/*
*@brief 管理线程函数
*/
void* manager_thread(void *arg)
{
ThreadPool_T *pool = (ThreadPool_T *)arg;
int queue_size = 0;
int live_thread_num = 0;
int busy_thread_num = 0;
int add = 0;
int i = 0;
while(!pool->shutdown)
{
sleep(DEFAULT_TIME); //定时对线程池管理
pthread_mutex_lock(&pool->lock);
queue_size = pool->queue_size; //任务队列中实际任务数
live_thread_num = pool->live_thread_num; //线程池中存在的线程数量
pthread_mutex_unlock(&pool->lock);
pthread_mutex_lock(&pool->thread_counter); //忙线程锁
busy_thread_num = pool->busy_thread_num;
pthread_mutex_unlock(&pool->thread_counter); //
/*创建新线程算法:任务数量大于最新线程池个数,且存活的线程数小于最大线程数*/
if(queue_size > pool->min_thread_num && live_thread_num < pool->max_thread_num)
{
pthread_mutex_lock(&pool->lock);
for(i = 0; i <pool->max_thread_num && add < DEFAULT_THREAD_VERY && pool->live_thread_num < pool->max_thread_num;i++)
{
if(pool->workds_id[i] == 0 || !is_thread_alive(pool->workds_id[i]))
{
pthread_create(&pool->workds_id[i],NULL, workers_thread,(void*)pool);
add++;
pool->live_thread_num++;
}
}
pthread_mutex_unlock(&pool->lock);
}
//销毁多余的空闲线程算法,忙线程 * 2 小于存活的线程数且存活的线程数大于最小线程数时
if(busy_thread_num * 2 < live_thread_num && live_thread_num > pool->min_thread_num)
{
//一次销毁DEFAULT_THREAD_VERY个线程
pthread_mutex_lock(&pool->lock);
pool->wait_exit_thread_num = DEFAULT_THREAD_VERY;
pthread_mutex_unlock(&pool->lock);
for(i = 0; i < DEFAULT_THREAD_VERY;i++)
{
//通知空闲的线程,他们会自行终止线程自杀
pthread_cond_signal(&pool->queue_not_empty);
}
}
}
return NULL;
}
/*
* @brief 创建一个指定大小的线程池
*
*/
ThreadPool_T *threadpool_create(int min_thread_num, int max_thread_num, int queue_max_size)
{
int i = 0;
ThreadPool_T *pool = NULL;
do
{
pool = (ThreadPool_T *)malloc(sizeof(ThreadPool_T));
if(NULL == pool)
{
printf("err:malloc threadpool fail\n");
goto err1;
}
pool->min_thread_num = min_thread_num;
pool->max_thread_num = max_thread_num;
pool->busy_thread_num = 0;
pool->live_thread_num = min_thread_num;
pool->queue_size = 0;
pool->queue_max_size = queue_max_size;
pool->queue_front = 0;
pool->queue_rear = 0;
pool->shutdown = 0;
//根据最大线程上限数 给工作线程数据开辟空间 并清0
pool->workds_id = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num);
if(NULL === pool->workds_id)
{
printf("malloc workd_id fail\n");
goto err2;
}
memset(pool->workds_id, 0, sizeof(pthread_t) * max_thread_num);
//根据最大任务队列数 给工作队列开辟空间 并清0
pool->task_queue = (ThreadPoolTask_T *)malloc(sizeof(ThreadPoolTask_T) * queue_max_size);
if(pool->task_queue)
{
printf("malloc task_queue fail\n");
goto err3;
}
if(pthread_mutex_init(&pool->lock, NULL) != 0 ||
pthread_mutex_init(&pool->thread_counter, NULL) != 0 ||
pthread_cond_init(&pool->queue_not_full, NULL) != 0 ||
pthread_cond_init(&pool->queue_not_empty, NULL) != 0 )
{
printf("init the lock or cond fail\n");
goto err4;
}
/*启动min_thread_num个 work thread*/
for(i = 0; i < min_thread_num; i++)
{
pthread_create(&pool->workds_id[i],NULL, workers_thread,(void*)pool); /*pool 指向当前的线程池*/
printf("---------------->1.start workds_thread:%d\n",pool->workds_id[i]);
}
//创建管理者线程
pthread_create(&pool->manager_tid, NULL, manager_thread,(void*)pool);
}while(0);
return pool;
err4:
if(pool->task_queue)
{
free(pool->task_queue);
}
err3:
if(pool->workds_id)
{
free(pool->workds_id);
}
err2:
if(pool)
{
free(pool);
}
err1:
return NULL;
}
/*
*@brief 向线程池添加任务
*/
int threadpool_add(ThreadPool_T* pool, void* func(void *arg),void *arg)
{
pthread_mutex_lock(&pool->lock);
//为真说明队列已满,等待
while(pool->queue_size == pool->queue_max_size && !pool->shutdown)
{
pthread_cond_wait(&pool->queue_not_empty, &pool->lock);
}
if(pool->shutdown)
{
pthread_mutex_unlock(&pool->lock);
return 0;
}
//清空工作线程,调用回调函数的参数arg
if(pool->task_queue[pool->queue_rear].arg != NULL)
{
free(pool->task_queue[pool->queue_rear].arg);
pool->task_queue[pool->queue_rear].arg = NULL;
}
//添加任务到任务队列里面
pool->task_queue[pool->queue_rear].func = func;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;
pool->queue_size++;
//添加完任务后,队列不为空唤醒线程池中等待处理任务的线程
pthread_cond_signal(&pool->queue_not_empty);
pthread_mutex_unlock(&pool->lock);
return 0;
}
//销毁线程池
int threadpool_destroy(ThreadPool_T* pool)
{
int i = 0;
if(pool == NULL)
{
return -1;
}
pool->shutdown = 1;
pthread_join(pool->manager_tid, NULL);
for(i = 0;i < pool->live_thread_num; i++)
{
pthread_cond_broadcast(&pool->queue_not_empty);
}
for (i = 0; i < pool->live_thread_num; i++)
{/*回收所有管理者线程资源*/
pthread_join(pool->workds_id[i], NULL);
}
threadpool_free(pool);
}
int threadpool_free(ThreadPool_T *pool)
{
if (pool == NULL) {
printf("thread pool is already free\n");
return -1;
}
if (pool->task_queue) {
free(pool->task_queue);
}
if (pool->workds_id) {
free(pool->workds_id);
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_mutex_lock(&(pool->thread_counter));
pthread_mutex_destroy(&(pool->thread_counter));
pthread_cond_destroy(&(pool->queue_not_empty));
pthread_cond_destroy(&(pool->queue_not_full));
}
free(pool);
pool = NULL;
return 0;
}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
C
1
https://gitee.com/lixinyu7/threadpool.git
git@gitee.com:lixinyu7/threadpool.git
lixinyu7
threadpool
threadpool
master

搜索帮助