技术: C++ 实现线程池模型

summary of thread pool base on c++

线程池是 技术的一种, 最初都是由 对象池引出, 包括 线程池, 连接池, 内存池等.
线程池维护线程数量, 使之处在一个稳定的水平, 连接池, 内存池也类似(可以参考的就是STL).

引子

多线程模型中讲过, 当主线程 accept 之后, 立马开新线程处理该客户端的请求, 处理完毕关闭(结束)该线程; 下次再来请求, 再开新线程…
然而频繁地开辟与销毁线程极大地占用了系统的资源, 为了减小和缓和相关的开销, 这里引入了线程池模型.

(写完之后发现实现一个线程池不是一件容易的事儿)

正文

原理

在传统服务器结构中, 经常有一个总的监听线程监听有没有新的用户连接服务器, 每当有一个新的用户进入, 服务器就开启一个新的线程用户处理这个用户的数据包.
这个线程只服务于这个用户, 当用户与服务器端关闭连接以后, 服务器端销毁这个线程.

大量用户的情况下, 系统为了开辟和销毁线程将浪费大量的时间和资源.

线程池提供了一个解决外部大量用户与服务器有限资源的矛盾, 线程池和传统的一个用户对应一个线程的处理方法不同, 它的基本思想就是在程序开始时就在内存中开辟一些线程, 线程的数目是固定的, 他们独自形成一个类, 屏蔽了对外的操作, 而服务器只需要将数据包交给线程池就可以了.

当有新的客户请求到达时, 不是新创建一个线程为其服务, 而是从“池子”中选择一个空闲的线程为新的客户请求服务, 服务完毕后, 线程进入空闲线程池中. 如果没有线程空闲的话, 就将数据包暂时积累, 等待线程池内有线程空闲以后再进行处理, 也就是说这里会有一个任务队列; 当有了新空闲线程才会拿走任务.

通过重用已经存在的线程对象, 降低了对线程对象创建和销毁的开销. 当客户请求时, 线程对象已经存在, 可以提高请求的响应时间, 从而整体地提高了系统服务的表现.

实现

大致理论清楚了, 也不一定就能把握好该模型, 因为每个人的编码水平不一样, 对细节上的处理不一样, 使用技巧的能力不一样.

这个其实是个简单的模型, 很多细节都没有处理.(只看他的 线程池结构体 就能看出来)


当然我也写了(见下面), 后续如果有高手的代码, 我也会详细参考和学习(后续博客再说).

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>


/* task/job list*/
typedef struct worker
{
/*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/
void *(*process) (void *arg);
void *arg;/*回调函数的参数*/
struct worker *next;
} CThread_worker;



/*thread pool struct*/
typedef struct
{
pthread_mutex_t queue_lock;
pthread_cond_t queue_ready;

/*链表结构,线程池中所有等待任务*/
CThread_worker *queue_head;

/*是否销毁线程池, 启用*/
int shutdown;

/*线程池中所有活着的线程*/
pthread_t *threadid;

/*线程池中允许的活动线程数目*/
int max_thread_num;

/*当前等待队列的任务数目*/
int cur_queue_size;
} CThread_pool;

/*往任务链表里添加任务*/
int pool_add_worker (void *(*process) (void *arg), void *arg);

/*线程回调函数*/
void *thread_routine (void *arg);

/*线程池对象*/
static CThread_pool *pool = NULL;

/*创建线程池*/
void pool_init (int max_thread_num)
{
pool = (CThread_pool *) malloc (sizeof (CThread_pool));

pthread_mutex_init (&(pool->queue_lock), NULL);
pthread_cond_init (&(pool->queue_ready), NULL);

pool->queue_head = NULL;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->shutdown = 0;
pool->threadid =(pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
int i = 0;

for (i = 0; i < max_thread_num; i++) {
pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);
}
}

/*向任务队列中加入任务*/
int pool_add_worker (void *(*process) (void *arg), void *arg)
{

/*构造一个新任务*/
CThread_worker *newworker =
(CThread_worker *) malloc (sizeof (CThread_worker));

newworker->process = process;
newworker->arg = arg;
newworker->next = NULL;/*别忘置空*/

pthread_mutex_lock (&(pool->queue_lock));

/*将任务加入到等待链表尾部*/
CThread_worker *member = pool->queue_head;
if (member != NULL) {
while (member->next != NULL)
member = member->next;
member->next = newworker;
} else {
pool->queue_head = newworker;
}

assert (pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock (&(pool->queue_lock));

/*好了,等待队列中有任务了,唤醒一个等待线程*/

pthread_cond_signal (&(pool->queue_ready));

return 0;
}

/*销毁线程池,等待队列中的任务不会再被执行,
但是正在运行的线程会一直把任务运行完后再退出*/
int pool_destroy ()
{

if (pool->shutdown) {
return -1;
}
pool->shutdown = 1;

/*唤醒所有等待线程,线程池要销毁了*/
pthread_cond_broadcast (&(pool->queue_ready));

/*阻塞等待线程退出,否则就成僵尸了*/
int i;
for (i = 0; i < pool->max_thread_num; i++) {
pthread_join (pool->threadid[i], NULL);
}

free (pool->threadid);

/*销毁等待队列*/
CThread_worker *head = NULL;

while (pool->queue_head != NULL) {
head = pool->queue_head;
pool->queue_head = pool->queue_head->next;
free (head);
}

pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));

free (pool);
pool=NULL;

return 0;
}


/*线程取走任务*/
void* thread_routine (void *arg)
{

printf ("starting thread 0x%x\n", pthread_self ());

while (1) {
pthread_mutex_lock (&(pool->queue_lock));
/*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/

while (pool->cur_queue_size == 0 && !pool->shutdown) {
printf ("thread 0x%x is waiting\n", pthread_self ());
pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
}

/*线程池要销毁了*/
if (pool->shutdown) {
/*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
pthread_mutex_unlock (&(pool->queue_lock));
printf ("thread 0x%x will exit\n", pthread_self ());
pthread_exit (NULL);
}

printf ("thread 0x%x is starting to work\n", pthread_self ());

/*assert是调试的好帮手*/
assert (pool->cur_queue_size != 0);
assert (pool->queue_head != NULL);

/*等待队列长度减去1,并取出链表中的头元素*/
pool->cur_queue_size--;

CThread_worker *worker = pool->queue_head;
pool->queue_head = worker->next;

pthread_mutex_unlock (&(pool->queue_lock));

/*调用回调函数,执行任务*/
(*(worker->process)) (worker->arg);
free (worker);

worker = NULL;
}

/*这一句应该是不可达的*/
pthread_exit (NULL);
}


/*模拟执行任务*/
void* myprocess (void *arg)
{
printf ("threadid is 0x%x, working on task %d\n",
pthread_self (),*(int *) arg);
sleep (1);/*休息一秒,延长任务的执行时间*/
return NULL;
}



int main (void)
{

pool_init (3);/*线程池中最多三个活动线程*/

/*连续向池中投入10个任务*/
int *workingnum = (int *) malloc (sizeof (int) * 10);

int i;
for (i = 0; i < 10; i++) {
workingnum[i] = i;
pool_add_worker (myprocess, &workingnum[i]);
}

/*等待所有任务完成*/
sleep (5);
/*销毁线程池*/
pool_destroy ();

free (workingnum);
return 0;
}

上面的实现, 有很多不足:

  • 该实现, 没有去考虑按需扩充线程池大小, 删减线程等问题.
  • 对于任务队列的操作, ready只考虑了为空, 没有考虑任务队列满的情况(特别是添加任务的时候).
  • 线程取走任务应该还有一个队列为空的条件变量, 不仅仅只是加锁操作那么简单.

网上一个前辈写的(作为改进, 比我写的好), 总共 200 多行, 精炼:
头文件 tiny-threadpool.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#ifndef TINY_THREADPOOL_H
#define TINY_THREADPOOL_H

#include <pthread.h>

typedef struct _tthreadpool_s tthreadpool_t;
typedef struct _tthread_s tthread_t;
typedef struct _tjob_s tjob_t;

struct _tthreadpool_s {
tthread_t *threads;
tjob_t *jobs;
int num_jobs;
int num_threads;

pthread_mutex_t jobs_mutex;
pthread_mutex_t num_jobs_mutex;
pthread_cond_t jobs_not_empty_cond;
pthread_cond_t jobs_not_full_cond;
};

struct _tthread_s {
pthread_t thread_id;
tthreadpool_t *pool;
tthread_t *prev;
tthread_t *next;
int killed;
};

struct _tjob_s {
void (*job_function)(tjob_t *job);
void *user_data;
tjob_t *prev;
tjob_t *next;
};

extern int
tthreadpool_init(tthreadpool_t *pool, int numWorkers);

extern void
tthreadpool_shutdown(tthreadpool_t *pool);

extern void
tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job);

extern void
tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job);

extern void
tthreadpool_wait(tthreadpool_t *pool);

#endif // TINY_THREADPOOL_H

以及实现文件 tiny-threadpool.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "tiny-threadpool.h"

#define ADD_THREAD(item, list) { \
item->prev = NULL; \
item->next = list; \
list = item; \
}

#define REMOVE_JOB(item, list) { \
if (item->prev != NULL) item->prev->next = item->next; \
if (item->next != NULL) item->next->prev = item->prev; \
if (list == item) list = item->next; \
item->prev = item->next = NULL; \
}

static void *thread_function(void *ptr) {
tthread_t *thread = (tthread_t *)ptr;
tjob_t *job;

while (1) {
pthread_mutex_lock(&thread->pool->num_jobs_mutex);
pthread_mutex_lock(&thread->pool->jobs_mutex);
while (thread->pool->jobs == NULL) {
pthread_cond_wait(&thread->pool->jobs_not_empty_cond,
&thread->pool->jobs_mutex);
}
job = thread->pool->jobs;
if (job != NULL) {
REMOVE_JOB(job, thread->pool->jobs);
thread->pool->num_jobs--;
pthread_cond_signal(&thread->pool->jobs_not_full_cond);

}
pthread_mutex_unlock(&thread->pool->jobs_mutex);
pthread_mutex_unlock(&thread->pool->num_jobs_mutex);
if (thread->killed) break;
if (job == NULL) continue;
job->job_function(job);
}
free(thread);
pthread_exit(NULL);
}

int tthreadpool_init(tthreadpool_t *pool, int num_threads) {
int i = 0;
tthread_t *thread;
pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;

if (num_threads < 1) num_threads = 1;
memset(pool, 0, sizeof(*pool));
memcpy(&pool->jobs_mutex, &blank_mutex, sizeof(pool->jobs_mutex));
memcpy(&pool->num_jobs_mutex, &blank_mutex, sizeof(pool->num_jobs_mutex));
memcpy(&pool->jobs_not_empty_cond, &blank_cond, sizeof(pool->jobs_not_empty_cond));
memcpy(&pool->jobs_not_full_cond, &blank_cond, sizeof(pool->jobs_not_full_cond));
pool->num_threads = num_threads;
pool->num_jobs = 0;

for (i = 0; i < num_threads; i++) {
if ((thread = malloc(sizeof(tthread_t))) == NULL) {
fprintf(stderr, "Failed to allocate threads");
return -1;
}
memset(thread, 0, sizeof(tthread_t));
thread->pool = pool;
if (pthread_create(&thread->thread_id, NULL,
thread_function, (void *)thread)) {
fprintf(stderr, "Failed to start all threads");
free(thread);
return -1;
}
ADD_THREAD(thread, thread->pool->threads);
}

return 0;
}

void tthreadpool_shutdown(tthreadpool_t *pool) {
tthread_t *thread = NULL;

for (thread = pool->threads; thread != NULL; thread = thread->next) {
thread->killed = 1;
}

pthread_mutex_lock(&pool->jobs_mutex);
pool->threads = NULL;
pool->jobs = NULL;
pthread_cond_broadcast(&pool->jobs_not_empty_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_add_job(tthreadpool_t *pool, tjob_t *job) {
pthread_mutex_lock(&pool->jobs_mutex);
ADD_THREAD(job, pool->jobs);
pthread_cond_signal(&pool->jobs_not_empty_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_add_job_ex(tthreadpool_t *pool, tjob_t *job) {
pthread_mutex_lock(&pool->jobs_mutex);

while(pool->num_jobs == 2 * pool->num_threads) {
pthread_cond_wait(&pool->jobs_not_full_cond, &pool->jobs_mutex);
}

ADD_THREAD(job, pool->jobs);
pool->num_jobs++;
pthread_cond_signal(&pool->jobs_not_empty_cond);
pthread_mutex_unlock(&pool->jobs_mutex);
}

void tthreadpool_wait(tthreadpool_t *pool) {
tthread_t *thread = NULL;
for (thread = pool->threads; thread != NULL; thread = thread->next) {
pthread_join(thread->thread_id, NULL);
}
}

该模型的执行就比我的严谨, 还通过宏函数, 把重复逻辑抽取出来了, 非常不错.
参考网址是: 如何在Linux下实现你的线程池

尾巴

总的来说, 我写的还是不能用的, 要想写好, 至少考虑:

  • 工作资源的操作除了加锁操作之外, 还要考虑条件变量(为空为满)
  • 需要一个监管线程处理Live, Busy线程的动态调度问题(饱和因子)

就这样吧, 个人编码能力有限(工作年限, 经验也有限), 后续有高手的实现, 我们再说.

文章目录
  1. 1. 引子
  2. 2. 正文
    1. 2.1. 原理
    2. 2.2. 实现
  3. 3. 尾巴
|