summary of thread pool base on c++
线程池是 池
技术的一种, 最初都是由 对象池
引出, 包括 线程池, 连接池, 内存池等.
线程池维护线程数量, 使之处在一个稳定的水平, 连接池, 内存池也类似(可以参考的就是STL).
多线程模型中讲过, 当主线程 accept
之后, 立马开新线程处理该客户端的请求, 处理完毕关闭(结束)该线程; 下次再来请求, 再开新线程…
然而频繁地开辟与销毁线程极大地占用了系统的资源, 为了减小和缓和相关的开销, 这里引入了线程池模型.
在传统服务器结构中, 经常有一个总的监听线程监听有没有新的用户连接服务器, 每当有一个新的用户进入, 服务器就开启一个新的线程用户处理这个用户的数据包.
这个线程只服务于这个用户, 当用户与服务器端关闭连接以后, 服务器端销毁这个线程.
大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.
线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序开始时就在内存中开辟一些线程, 线程的数目是固定的, 他们独自形成一个类, 屏蔽了对外的操作, 而服务器只需要将数据包交给线程池就可以了.
当有新的客户请求到达时, 不是新创建一个线程为其服务, 而是从“池子”中选择一个空闲的线程为新的客户请求服务, 服务完毕后, 线程进入空闲线程池中. 如果没有线程空闲的话, 就将数据包暂时积累, 等待线程池内有线程空闲以后再进行处理, 也就是说这里会有一个任务队列; 当有了新空闲线程才会拿走任务.
通过重用已经存在的线程对象, 降低了对线程对象创建和销毁的开销. 当客户请求时, 线程对象已经存在, 可以提高请求的响应时间, 从而整体地提高了系统服务的表现.
大致理论清楚了, 也不一定就能把握好该模型, 因为每个人的编码水平不一样, 对细节上的处理不一样, 使用技巧的能力不一样.
这个其实是个简单的模型, 很多细节都没有处理.(只看他的 线程池结构体
当然我也写了(见下面), 后续如果有高手的代码, 我也会详细参考和学习(后续博客再说).
| #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <pthread.h> #include <assert.h>
typedef struct worker { void *(*process) (void *arg); void *arg; struct worker *next; } CThread_worker;
typedef struct { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; CThread_worker *queue_head; int shutdown;
pthread_t *threadid; int max_thread_num; int cur_queue_size; } CThread_pool;
int pool_add_worker (void *(*process) (void *arg), void *arg);
void *thread_routine (void *arg);
static CThread_pool *pool = NULL;
void pool_init (int max_thread_num) { pool = (CThread_pool *) malloc (sizeof (CThread_pool));
pthread_mutex_init (&(pool->queue_lock), NULL); pthread_cond_init (&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_thread_num; pool->cur_queue_size = 0; pool->shutdown = 0; pool->threadid =(pthread_t *) malloc (max_thread_num * sizeof (pthread_t)); int i = 0;
for (i = 0; i < max_thread_num; i++) { pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL); } }
int pool_add_worker (void *(*process) (void *arg), void *arg) {
CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));
newworker->process = process; newworker->arg = arg; newworker->next = NULL;
pthread_mutex_lock (&(pool->queue_lock));
CThread_worker *member = pool->queue_head; if (member != NULL) { while (member->next != NULL) member = member->next; member->next = newworker; } else { pool->queue_head = newworker; }
assert (pool->queue_head != NULL); pool->cur_queue_size++; pthread_mutex_unlock (&(pool->queue_lock));
pthread_cond_signal (&(pool->queue_ready));
return 0; }
int pool_destroy () {
if (pool->shutdown) { return -1; } pool->shutdown = 1;
pthread_cond_broadcast (&(pool->queue_ready));
int i; for (i = 0; i < pool->max_thread_num; i++) { pthread_join (pool->threadid[i], NULL); }
free (pool->threadid);
CThread_worker *head = NULL;
while (pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free (head); }
pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready));
free (pool); pool=NULL;
return 0; }
void* thread_routine (void *arg) {
printf ("starting thread 0x%x\n", pthread_self ());
while (1) { pthread_mutex_lock (&(pool->queue_lock));
while (pool->cur_queue_size == 0 && !pool->shutdown) { printf ("thread 0x%x is waiting\n", pthread_self ()); pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock)); }
if (pool->shutdown) { pthread_mutex_unlock (&(pool->queue_lock)); printf ("thread 0x%x will exit\n", pthread_self ()); pthread_exit (NULL); }
printf ("thread 0x%x is starting to work\n", pthread_self ());
assert (pool->cur_queue_size != 0); assert (pool->queue_head != NULL);
pool->cur_queue_size--; CThread_worker *worker = pool->queue_head; pool->queue_head = worker->next;
pthread_mutex_unlock (&(pool->queue_lock));
(*(worker->process)) (worker->arg); free (worker);
worker = NULL; }
pthread_exit (NULL); }
void* myprocess (void *arg) { printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg); sleep (1); return NULL; }
int main (void) {
pool_init (3); int *workingnum = (int *) malloc (sizeof (int) * 10);
int i; for (i = 0; i < 10; i++) { workingnum[i] = i; pool_add_worker (myprocess, &workingnum[i]); }
sleep (5); pool_destroy (); free (workingnum); return 0; }
上面的实现, 有很多不足:
- 该实现, 没有去考虑按需扩充线程池大小, 删减线程等问题.
- 对于任务队列的操作, ready只考虑了为空, 没有考虑任务队列满的情况(特别是添加任务的时候).
- 线程取走任务应该还有一个队列为空的条件变量, 不仅仅只是加锁操作那么简单.
网上一个前辈写的(作为改进, 比我写的好), 总共 200 多行, 精炼:
头文件 tiny-threadpool.h
#include <pthread.h>
typedef struct _tthreadpool_s tthreadpool_t; typedef struct _tthread_s tthread_t; typedef struct _tjob_s tjob_t;
struct _tthreadpool_s { tthread_t *threads; tjob_t *jobs; int num_jobs; int num_threads;
pthread_mutex_t jobs_mutex; pthread_mutex_t num_jobs_mutex; pthread_cond_t jobs_not_empty_cond; pthread_cond_t jobs_not_full_cond; };
struct _tthread_s { pthread_t thread_id; tthreadpool_t *pool; tthread_t *prev; tthread_t *next; int killed; };
struct _tjob_s { void (*job_function)(tjob_t *job); void *user_data; tjob_t *prev; tjob_t *next; };
extern int tthreadpool_init(tthreadpool_t *pool, int numWorkers);
extern void tthreadpool_shutdown(tthreadpool_t *pool);
extern void tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job);
extern void tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job);
extern void tthreadpool_wait(tthreadpool_t *pool);
以及实现文件 tiny-threadpool.c
| #include <assert.h> #include <stdio.h> #include <stdlib.h> #include <string.h>
#include "tiny-threadpool.h"
#define ADD_THREAD(item, list) { \ item->prev = NULL; \ item->next = list; \ list = item; \ }
#define REMOVE_JOB(item, list) { \ if (item->prev != NULL) item->prev->next = item->next; \ if (item->next != NULL) item->next->prev = item->prev; \ if (list == item) list = item->next; \ item->prev = item->next = NULL; \ }
static void *thread_function(void *ptr) { tthread_t *thread = (tthread_t *)ptr; tjob_t *job;
while (1) { pthread_mutex_lock(&thread->pool->num_jobs_mutex); pthread_mutex_lock(&thread->pool->jobs_mutex); while (thread->pool->jobs == NULL) { pthread_cond_wait(&thread->pool->jobs_not_empty_cond, &thread->pool->jobs_mutex); } job = thread->pool->jobs; if (job != NULL) { REMOVE_JOB(job, thread->pool->jobs); thread->pool->num_jobs--; pthread_cond_signal(&thread->pool->jobs_not_full_cond);
} pthread_mutex_unlock(&thread->pool->jobs_mutex); pthread_mutex_unlock(&thread->pool->num_jobs_mutex); if (thread->killed) break; if (job == NULL) continue; job->job_function(job); } free(thread); pthread_exit(NULL); }
int tthreadpool_init(tthreadpool_t *pool, int num_threads) { int i = 0; tthread_t *thread; pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
if (num_threads < 1) num_threads = 1; memset(pool, 0, sizeof(*pool)); memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pool->jobs_mutex)); memcpy(&pool->num_jobs_mutex, &blank_mutex, sizeof(pool->num_jobs_mutex)); memcpy(&pool->jobs_not_empty_cond, &blank_cond, sizeof(pool->jobs_not_empty_cond)); memcpy(&pool->jobs_not_full_cond, &blank_cond, sizeof(pool->jobs_not_full_cond)); pool->num_threads = num_threads; pool->num_jobs = 0;
for (i = 0; i < num_threads; i++) { if ((thread = malloc(sizeof(tthread_t))) == NULL) { fprintf(stderr, "Failed to allocate threads"); return -1; } memset(thread, 0, sizeof(tthread_t)); thread->pool = pool; if (pthread_create(&thread->thread_id, NULL, thread_function, (void *)thread)) { fprintf(stderr, "Failed to start all threads"); free(thread); return -1; } ADD_THREAD(thread, thread->pool->threads); }
return 0; }
void tthreadpool_shutdown(tthreadpool_t *pool) { tthread_t *thread = NULL;
for (thread = pool->threads; thread != NULL; thread = thread->next) { thread->killed = 1; }
pthread_mutex_lock(&pool->jobs_mutex); pool->threads = NULL; pool->jobs = NULL; pthread_cond_broadcast(&pool->jobs_not_empty_cond); pthread_mutex_unlock(&pool->jobs_mutex); }
void tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job) { pthread_mutex_lock(&pool->jobs_mutex); ADD_THREAD(job, pool->jobs); pthread_cond_signal(&pool->jobs_not_empty_cond); pthread_mutex_unlock(&pool->jobs_mutex); }
void tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job) { pthread_mutex_lock(&pool->jobs_mutex);
while(pool->num_jobs == 2 * pool->num_threads) { pthread_cond_wait(&pool->jobs_not_full_cond, &pool->jobs_mutex); }
ADD_THREAD(job, pool->jobs); pool->num_jobs++; pthread_cond_signal(&pool->jobs_not_empty_cond); pthread_mutex_unlock(&pool->jobs_mutex); }
void tthreadpool_wait(tthreadpool_t *pool) { tthread_t *thread = NULL; for (thread = pool->threads; thread != NULL; thread = thread->next) { pthread_join(thread->thread_id, NULL); } }
该模型的执行就比我的严谨, 还通过宏函数, 把重复逻辑抽取出来了, 非常不错.
参考网址是: 如何在Linux下实现你的线程池
总的来说, 我写的还是不能用的, 要想写好, 至少考虑:
- 工作资源的操作除了加锁操作之外, 还要考虑条件变量(为空为满)
- 需要一个监管线程处理Live, Busy线程的动态调度问题(饱和因子)
就这样吧, 个人编码能力有限(工作年限, 经验也有限), 后续有高手的实现, 我们再说.