线程池原理详解及如何用C语言实现线程池( 三 )


线程池原理详解及如何用C语言实现线程池

文章插图
 
 
/*向线程池的任务队列中添加一个任务*/
int
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
pthread_mutex_lock(&(pool->lock));
/*如果队列满了,调用wait阻塞*/
while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));
/*如果线程池处于关闭状态*/
if (pool->shutdown)
{
pthread_mutex_unlock(&(pool->lock));
return -1;
}
/*清空工作线程的回调函数的参数arg*/
if (pool->task_queue[pool->queue_rear].arg != NULL)
{
free(pool->task_queue[pool->queue_rear].arg);
pool->task_queue[pool->queue_rear].arg = NULL;
}
/*添加任务到任务队列*/
pool->task_queue[pool->queue_rear].function = function;
pool->task_queue[pool->queue_rear].arg = arg;
pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size; /* 逻辑环 */
pool->queue_size++;
/*添加完任务后,队列就不为空了,唤醒线程池中的一个线程*/
pthread_cond_signal(&(pool->queue_not_empty));
pthread_mutex_unlock(&(pool->lock));
return 0;
}
四、管理者线程作为线程池的管理者 , 该线程的主要功能包括:检查线程池内线程的存活状态 , 工作状态;负责根据服务器当前的请求状态去动态的增加或删除线程 , 保证线程池中的线程数量维持在一个合理高效的平衡上;
说到底 , 它就是一个单独的线程 , 定时的去检查 , 根据我们的一个维持平衡算法去增删线程;
/*管理线程*/
void *
admin_thread(void *threadpool)
{
int i;
threadpool_t *pool = (threadpool_t *)threadpool;
while (!pool->shutdown)
{
printf("admin -----------------n");
sleep(DEFAULT_TIME); /*隔一段时间再管理*/
pthread_mutex_lock(&(pool->lock)); /*加锁*/
int queue_size = pool->queue_size; /*任务数*/
int live_thr_num = pool->live_thr_num; /*存活的线程数*/
pthread_mutex_unlock(&(pool->lock)); /*解锁*/
pthread_mutex_lock(&(pool->thread_counter));
int busy_thr_num = pool->busy_thr_num; /*忙线程数*/
pthread_mutex_unlock(&(pool->thread_counter));
printf("admin busy live -%d--%d-n", busy_thr_num, live_thr_num);
/*创建新线程 实际任务数量大于 最小正在等待的任务数量 , 存活线程数小于最大线程数*/
if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
{
printf("admin add-----------n");
pthread_mutex_lock(&(pool->lock));
int add=0;
/*一次增加 DEFAULT_THREAD_NUM 个线程*/
for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM
&& pool->live_thr_num < pool->max_thr_num; i++)
{
if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
{
pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
add++;
pool->live_thr_num++;
printf("new thread -----------------------n");
}
}
pthread_mutex_unlock(&(pool->lock));
}
/*销毁多余的线程 忙线程x2 都小于 存活线程 , 并且存活的大于最小线程数*/
if ((busy_thr_num*2) < live_thr_num && live_thr_num > pool->min_thr_num)
{
// printf("admin busy --%d--%d----n", busy_thr_num, live_thr_num);
/*一次销毁DEFAULT_THREAD_NUM个线程*/
pthread_mutex_lock(&(pool->lock));
pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
pthread_mutex_unlock(&(pool->lock));
for (i=0; i<DEFAULT_THREAD_NUM; i++)
{
//通知正在处于空闲的线程 , 自杀
pthread_cond_signal(&(pool->queue_not_empty));
printf("admin cler --n");
}
}
}
return NULL;
/*线程是否存活*/
int
is_thread_alive(pthread_t tid)
{
int kill_rc = pthread_kill(tid, 0); //发送0号信号 , 测试是否存活
if (kill_rc == ESRCH) //线程不存在
{
return false;
}
return true;
}
五、释放
/*释放线程池*/
int
threadpool_free(threadpool_t *pool)
{
if (pool == NULL)
return -1;
if (pool->task_queue)
free(pool->task_queue);


推荐阅读