A Summary of Thread Pools in C
A thread pool is one kind of "pooling" technique. Pooling originated with object pools and now covers thread pools, connection pools, memory pools, and so on. A thread pool keeps the number of threads at a stable level; connection pools and memory pools work similarly (the STL allocators are a useful reference).
Introduction
As discussed in the earlier post on multithreading models: after the main thread returns from accept, it immediately spawns a new thread to handle that client's request and terminates the thread when the request is done; the next request spawns another thread, and so on. Frequently creating and destroying threads consumes a great deal of system resources, so the thread-pool model is introduced to reduce and smooth out that overhead.

(Having now written one, I can say that implementing a thread pool is not a trivial task.)
Main Text

Principle
In a traditional server architecture, a single listening thread waits for new client connections. Whenever a new client connects, the server spawns a new thread to handle that client's packets. The thread serves only that client, and when the client disconnects, the server destroys the thread. With a large number of clients, the system wastes a great deal of time and resources creating and destroying threads.

A thread pool resolves the tension between a large number of external clients and the server's limited resources. Unlike the traditional one-thread-per-client approach, its basic idea is to create a fixed number of threads in memory when the program starts. The threads form a self-contained unit that hides its internal operations; the server simply hands packets to the pool.

When a new client request arrives, instead of creating a new thread, the pool picks an idle thread from the "pool" to serve it; when the work is done, the thread returns to the idle pool. If no thread is idle, the request is queued until a thread becomes free. In other words, there is a task queue, and tasks are taken off it only when a thread becomes idle.

By reusing existing thread objects, the pool cuts the cost of creating and destroying them. Because a thread already exists when a request arrives, response time improves, and with it the overall performance of the service.
Implementation
Understanding the theory does not guarantee a good implementation: coding skill, attention to detail, and command of the idioms all differ from person to person.

What follows is a fairly simple model that leaves many details unhandled (a glance at its thread-pool struct alone makes that clear). I wrote it myself (see below); when I come across better implementations by more skilled people, I will study them in detail (in a later post).
```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>

/* One node in the task queue: a function pointer plus its argument. */
typedef struct worker {
    void *(*process)(void *arg);
    void *arg;
    struct worker *next;
} CThread_worker;

typedef struct {
    pthread_mutex_t queue_lock;
    pthread_cond_t queue_ready;
    CThread_worker *queue_head;
    int shutdown;
    pthread_t *threadid;
    int max_thread_num;
    int cur_queue_size;
} CThread_pool;

int pool_add_worker(void *(*process)(void *arg), void *arg);
void *thread_routine(void *arg);

static CThread_pool *pool = NULL;

void pool_init(int max_thread_num)
{
    pool = (CThread_pool *) malloc(sizeof(CThread_pool));
    pthread_mutex_init(&(pool->queue_lock), NULL);
    pthread_cond_init(&(pool->queue_ready), NULL);
    pool->queue_head = NULL;
    pool->max_thread_num = max_thread_num;
    pool->cur_queue_size = 0;
    pool->shutdown = 0;
    pool->threadid = (pthread_t *) malloc(max_thread_num * sizeof(pthread_t));
    int i;
    for (i = 0; i < max_thread_num; i++) {
        pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
    }
}

int pool_add_worker(void *(*process)(void *arg), void *arg)
{
    CThread_worker *newworker = (CThread_worker *) malloc(sizeof(CThread_worker));
    newworker->process = process;
    newworker->arg = arg;
    newworker->next = NULL;

    pthread_mutex_lock(&(pool->queue_lock));
    /* Append at the tail of the task queue. */
    CThread_worker *member = pool->queue_head;
    if (member != NULL) {
        while (member->next != NULL)
            member = member->next;
        member->next = newworker;
    } else {
        pool->queue_head = newworker;
    }
    assert(pool->queue_head != NULL);
    pool->cur_queue_size++;
    pthread_mutex_unlock(&(pool->queue_lock));

    pthread_cond_signal(&(pool->queue_ready));
    return 0;
}

int pool_destroy(void)
{
    if (pool->shutdown)
        return -1;                  /* already being destroyed */
    pool->shutdown = 1;

    /* Wake every worker so it can see the shutdown flag and exit. */
    pthread_cond_broadcast(&(pool->queue_ready));
    int i;
    for (i = 0; i < pool->max_thread_num; i++)
        pthread_join(pool->threadid[i], NULL);
    free(pool->threadid);

    /* Free any tasks still waiting in the queue. */
    CThread_worker *head = NULL;
    while (pool->queue_head != NULL) {
        head = pool->queue_head;
        pool->queue_head = pool->queue_head->next;
        free(head);
    }
    pthread_mutex_destroy(&(pool->queue_lock));
    pthread_cond_destroy(&(pool->queue_ready));
    free(pool);
    pool = NULL;
    return 0;
}

void *thread_routine(void *arg)
{
    printf("starting thread 0x%lx\n", (unsigned long) pthread_self());
    while (1) {
        pthread_mutex_lock(&(pool->queue_lock));
        while (pool->cur_queue_size == 0 && !pool->shutdown) {
            printf("thread 0x%lx is waiting\n", (unsigned long) pthread_self());
            pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
        }
        if (pool->shutdown) {
            pthread_mutex_unlock(&(pool->queue_lock));
            printf("thread 0x%lx will exit\n", (unsigned long) pthread_self());
            pthread_exit(NULL);
        }
        printf("thread 0x%lx is starting to work\n", (unsigned long) pthread_self());
        assert(pool->cur_queue_size != 0);
        assert(pool->queue_head != NULL);

        pool->cur_queue_size--;
        CThread_worker *worker = pool->queue_head;
        pool->queue_head = worker->next;
        pthread_mutex_unlock(&(pool->queue_lock));

        (*(worker->process))(worker->arg);   /* run the task unlocked */
        free(worker);
        worker = NULL;
    }
    pthread_exit(NULL);
}

void *myprocess(void *arg)
{
    printf("threadid is 0x%lx, working on task %d\n",
           (unsigned long) pthread_self(), *(int *) arg);
    sleep(1);
    return NULL;
}

int main(void)
{
    pool_init(3);
    int *workingnum = (int *) malloc(sizeof(int) * 10);
    int i;
    for (i = 0; i < 10; i++) {
        workingnum[i] = i;
        pool_add_worker(myprocess, &workingnum[i]);
    }
    sleep(5);
    pool_destroy();
    free(workingnum);
    return 0;
}
```
The implementation above has several shortcomings:
- It does not consider growing or shrinking the pool on demand.
- For the task queue, the queue_ready condition only handles the empty case; the queue-full case is never considered (especially when adding tasks).
- Taking a task off the queue should also be guarded by a queue-empty condition variable; it is not just a matter of taking a lock.
Here is a version by a more experienced author I found online (an improvement, and better than mine): just over 200 lines, and quite concise.

Header file tiny-threadpool.h:
```c
#ifndef TINY_THREADPOOL_H
#define TINY_THREADPOOL_H

#include <pthread.h>

typedef struct _tthreadpool_s tthreadpool_t;
typedef struct _tthread_s tthread_t;
typedef struct _tjob_s tjob_t;

struct _tthreadpool_s {
    tthread_t *threads;
    tjob_t *jobs;
    int num_jobs;
    int num_threads;
    pthread_mutex_t jobs_mutex;
    pthread_mutex_t num_jobs_mutex;
    pthread_cond_t jobs_not_empty_cond;
    pthread_cond_t jobs_not_full_cond;
};

struct _tthread_s {
    pthread_t thread_id;
    tthreadpool_t *pool;
    tthread_t *prev;
    tthread_t *next;
    int killed;
};

struct _tjob_s {
    void (*job_function)(tjob_t *job);
    void *user_data;
    tjob_t *prev;
    tjob_t *next;
};

extern int tthreadpool_init(tthreadpool_t *pool, int num_threads);
extern void tthreadpool_shutdown(tthreadpool_t *pool);
extern void tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job);
extern void tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job);
extern void tthreadpool_wait(tthreadpool_t *pool);

#endif
```
And the implementation file tiny-threadpool.c:
```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "tiny-threadpool.h"

#define ADD_THREAD(item, list) { \
    item->prev = NULL;           \
    item->next = list;           \
    list = item;                 \
}

#define REMOVE_JOB(item, list) {                            \
    if (item->prev != NULL) item->prev->next = item->next;  \
    if (item->next != NULL) item->next->prev = item->prev;  \
    if (list == item) list = item->next;                    \
    item->prev = item->next = NULL;                         \
}

static void *thread_function(void *ptr)
{
    tthread_t *thread = (tthread_t *)ptr;
    tjob_t *job;

    while (1) {
        pthread_mutex_lock(&thread->pool->num_jobs_mutex);
        pthread_mutex_lock(&thread->pool->jobs_mutex);
        while (thread->pool->jobs == NULL) {
            pthread_cond_wait(&thread->pool->jobs_not_empty_cond,
                              &thread->pool->jobs_mutex);
        }
        job = thread->pool->jobs;
        if (job != NULL) {
            REMOVE_JOB(job, thread->pool->jobs);
            thread->pool->num_jobs--;
            pthread_cond_signal(&thread->pool->jobs_not_full_cond);
        }
        pthread_mutex_unlock(&thread->pool->jobs_mutex);
        pthread_mutex_unlock(&thread->pool->num_jobs_mutex);

        if (thread->killed) break;
        if (job == NULL) continue;
        job->job_function(job);
    }
    free(thread);
    pthread_exit(NULL);
}

int tthreadpool_init(tthreadpool_t *pool, int num_threads)
{
    int i = 0;
    tthread_t *thread;
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;

    if (num_threads < 1) num_threads = 1;
    memset(pool, 0, sizeof(*pool));
    memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pool->jobs_mutex));
    memcpy(&pool->num_jobs_mutex, &blank_mutex, sizeof(pool->num_jobs_mutex));
    memcpy(&pool->jobs_not_empty_cond, &blank_cond, sizeof(pool->jobs_not_empty_cond));
    memcpy(&pool->jobs_not_full_cond, &blank_cond, sizeof(pool->jobs_not_full_cond));
    pool->num_threads = num_threads;
    pool->num_jobs = 0;

    for (i = 0; i < num_threads; i++) {
        if ((thread = malloc(sizeof(tthread_t))) == NULL) {
            fprintf(stderr, "Failed to allocate threads");
            return -1;
        }
        memset(thread, 0, sizeof(tthread_t));
        thread->pool = pool;
        if (pthread_create(&thread->thread_id, NULL, thread_function, (void *)thread)) {
            fprintf(stderr, "Failed to start all threads");
            free(thread);
            return -1;
        }
        ADD_THREAD(thread, thread->pool->threads);
    }
    return 0;
}

void tthreadpool_shutdown(tthreadpool_t *pool)
{
    tthread_t *thread = NULL;

    for (thread = pool->threads; thread != NULL; thread = thread->next) {
        thread->killed = 1;
    }
    pthread_mutex_lock(&pool->jobs_mutex);
    pool->threads = NULL;
    pool->jobs = NULL;
    pthread_cond_broadcast(&pool->jobs_not_empty_cond);
    pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job)
{
    pthread_mutex_lock(&pool->jobs_mutex);
    ADD_THREAD(job, pool->jobs);
    pthread_cond_signal(&pool->jobs_not_empty_cond);
    pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job)
{
    pthread_mutex_lock(&pool->jobs_mutex);
    /* Bounded variant: block while the queue holds 2 jobs per thread. */
    while (pool->num_jobs == 2 * pool->num_threads) {
        pthread_cond_wait(&pool->jobs_not_full_cond, &pool->jobs_mutex);
    }
    ADD_THREAD(job, pool->jobs);
    pool->num_jobs++;
    pthread_cond_signal(&pool->jobs_not_empty_cond);
    pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_wait(tthreadpool_t *pool)
{
    tthread_t *thread = NULL;
    for (thread = pool->threads; thread != NULL; thread = thread->next) {
        pthread_join(thread->thread_id, NULL);
    }
}
```
This version is more rigorous than mine, and it extracts the repeated list manipulation into macros, which is a nice touch.

Reference: 如何在Linux下实现你的线程池 (How to implement your thread pool on Linux).
Closing
All in all, my version is not usable as-is. A proper implementation needs at least:
- Condition variables for the shared work queue (both the empty and the full case), not just plain locking.
- A supervisor thread that dynamically schedules live and busy threads (a saturation factor).
That's it for now. My coding ability (and my years of experience) are limited; when I find better implementations, I will revisit this.