记录一下学习线程池的过程,代码用到的函数归结:
pthread_mutex_lock pthread_mutex_unlock pthread_cond_wait pthread_cond_signal pthread_cond_broadcast pthread_create pthread_join 程序中还用到了链表, 还有一个知识点:任何类型的数据都可以是void类型, 但void类型在使用之前必须进行强制类型转换。/**Author:Greens_Ren*Description:线程池*//*头文件*/#include#include /* begin : add */ #include #include #include #include /* end : add */ /*数据结构*/ typedef struct Thread_worker { void *(*worker)(void *arg); void * arg; struct Thread_worker *next; }CThread_worker; typedef struct Thread_pool { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; int max_thread_num; pthread_t *phead_threadid; int cur_queue_size; CThread_worker *phead; int shutdown; }CThread_pool; /*全局区*/ static CThread_pool *pool = NULL; /*函数*/ void pthread_init(int max_thread_num); void *thread_roution(void * arg); void pthread_add_worker(void *(*worker)(void *arg), void * arg); void pthread_destroy(void); void* my_process(void *arg); /*main*/ int main(void) { int max_thread_num = 3; int worker_num = 10; /*初始化*/ pthread_init(max_thread_num); /*投入任务*/ int *workernum = (int *)malloc(sizeof(int) * worker_num); int i; for(i = 0; i < worker_num; i++) { workernum[i] = i; pthread_add_worker(my_process, &workernum[i]); } /*等待处理任务*/ sleep(8); pthread_destroy(); return 0; } void pthread_init(int max_thread_num) { pool = (CThread_pool*)malloc(sizeof(CThread_pool)); /*free(pool)*/ /* begin : modified 初始化互斥锁和条件变量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /* end : modified */ /*初始化线程*/ pool->max_thread_num = max_thread_num; pool->phead_threadid = (pthread_t *)malloc(sizeof(pthread_t) * max_thread_num); /*free*/ int i; for (i = 0; i < max_thread_num; i ++) { pthread_create(&(pool->phead_threadid[i]), NULL, thread_roution, NULL); } /*初始化任务等待队列*/ pool->cur_queue_size = 0; //pool->phead = (CThread_worker *)malloc(sizeof(CThread_worker)); //pool->phead->next = NULL; /*罪过,这里考虑多了。在pool中就是存了一个指针,只不过它是指定了一个链表*/ pool->phead = NULL; /*线程池销毁标记*/ pool->shutdown = 0; return; } void *thread_roution(void * arg) { printf("starting thread 0x%x\n", pthread_self()); while(1) /*Added 线程要持续运行,这里考虑使用while(1)来实现*/ { pthread_mutex_lock(&(pool->queue_lock)); /*如果目前没有任务处理且线程池未销毁就睡眠等待添加任务*/ //if(pool->cur_queue_size == 0 && pool->shutdown != 1) while(pool->cur_queue_size == 0 && pool->shutdown != 1) /*modified by*/ { //pthread_cond_wait( &(pool->queue_lock), &(pool->queue_ready) ); printf("thread 0x%x is waiting\n", pthread_self()); pthread_cond_wait( &(pool->queue_ready), &(pool->queue_lock) ); } /*如果线程池已被标记销毁,那就退出线程*/ if(pool->shutdown == 1) { pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%x will exit\n", pthread_self()); pthread_exit(NULL); } printf("thread 0x%x is starting to work\n", pthread_self()); /*assert是调试用的好助手,assert如果为家,它就会通过stderr打印错误信息,并终止程序*/ assert(pool->cur_queue_size != 0); assert(pool->shutdown != 1); /*开始处理等待队列中的任务*/ /*回调函数*/ pool->cur_queue_size--; CThread_worker * worker_waiting = pool->phead; pool->phead = worker_waiting->next; /*这里加锁的目的就是为了处理任务链表,处理完后就 可以解锁,让其它线程再次去处理任务链表*/ pthread_mutex_unlock(&(pool->queue_lock)); /*modified by 开始的时候讲解锁放在了回调函数后面,这里做修正*/ /*调用回调函数,执行任务*/ (*(worker_waiting->worker))(worker_waiting->arg); /*删除链表中已经执行过的任务节点*/ free(worker_waiting); worker_waiting = NULL; } } void pthread_add_worker(void *(*worker)(void *arg), void * arg) { assert(worker != NULL); assert(pool->shutdown != 1); /*构建新任务插入到任务链表尾部*/ CThread_worker * worker_insert = (CThread_worker *)malloc(sizeof(CThread_worker));/*free*/ worker_insert->worker = worker; worker_insert->arg = arg; worker_insert->next = NULL; /*下面要处理任务链表了,这里要加锁,保护链表。第一次写的时候就忘记了加锁*/ pthread_mutex_lock(&(pool->queue_lock)); CThread_worker* phead_worker = pool->phead; /*链表的头指针一开始其实就是空指针,所以如果为NULL,就直接将构建的节点地址赋值给它就可以了*/ if(phead_worker != NULL) { while( phead_worker->next != NULL ) { phead_worker = phead_worker->next; } phead_worker->next = worker_insert; /*这里将新构建的任务节点添加到了等待任务链表的结尾*/ } else { pool->phead = worker_insert; } assert(pool->phead != NULL);/*这里检查一下等待链表不为空*/ pool->cur_queue_size++; /*同步修改等待队列的长度信息*/ pthread_mutex_unlock(&(pool->queue_lock)); /*等待队列中添加了新任务,这里就要唤醒线程去处理,如果无线程睡眠,这条语句就无效*/ pthread_cond_signal(&(pool->queue_ready)); return; } void pthread_destroy(void) { if(pool->shutdown) { return; /*这里防止多次调用*/ } /*这里先将销毁线程池标记置位*/ pool->shutdown = 1; /*唤醒所以等待线程,线程池要销毁了*/ pthread_cond_broadcast(&(pool->queue_ready)); /*阻塞等待线程线程退出,否则就成了僵尸了*/ int i; for (i = 0; i < pool->max_thread_num; i ++) { pthread_join(pool->phead_threadid[i], NULL); } /*释放线程号存储占用资源*/ free(pool->phead_threadid); pool->phead_threadid = NULL; /*释放任务等待队列*/ CThread_worker *pworker_del = NULL; while(pool->phead != NULL) { pworker_del = pool->phead; pool->phead = pool->phead->next; free(pworker_del); pworker_del = NULL; } /*销毁条件变量和互斥锁,刚开始也忘记了*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); /*释放线程池*/ free(pool); pool = NULL; return; } void *my_process(void * arg) { printf("thread is 0x%x, working ont task %d\n", pthread_self(), *(int *)arg); sleep(1); return; }