pthread 相关内容。
个人觉得对于线程的把控,难度要小于进程,进程的通信方式多到能写一本书,并且每种方式还有其各自的适用范围以及注意事项,真正玩起来还是有点儿多。甚至对于内核而言,它都不区分进程和线程。
但是线程就不一样,只是Linux下面的话,线程当前的模型就一种:内核支持的NPTL
(Native POSIX Thread Library), 但是如果你读过《7周7并发模型》的话就会知道并发问题远不止这么点儿东西。
本文从实战的角度展开主要涉及4个方面:
- Thread management - creating, joining threads etc.
- Mutexes
- Condition variables
- Synchronization between threads using read/write locks and barriers
The POSIX semaphore API works with POSIX threads but is not part of threads standard.
但是实际上就两大类:
- 线程的控制原语(包括线程属性)
- 线程的同步原语(信号量(PV),互斥量,竞争/冒险条件,文件锁,屏障)
贴个图:
API偏多, 注意一下常用的使用场景以及一些注意事项即可, 深入理解线程
请参考专业书籍, 后续再写.
(本文是详细总结, 所以细节偏多)
引子
那么先问一个问题,一个Linux进程最多起多少个线程?
本文涉及哪些内容, 见下图
- 线程控制函数
pthread_create(), pthread_exit(), pthread_join() - 线程的同步(控制线程同时执行 && 访问代码临界区)
信号量: sem_()
互斥量: pthreadmutex()
条件变量: pthreadcond*()
(线程与信号配合的并不好,所以不要这样相互使用) - 线程的属性(比较多,但是常用的不多,大概6个左右)
pthreadattr*()
设置一些取消状态等 - 其他
正文
线程的控制
主要内容:(直接返回errno,而不是返回-1设置errno了)
- pthread_self()—–相当于getpid(); 但是注意pthread_t,虽然在linux下是%ul但是在别的系统下可能是结构体.(始终成功)
- pthread_create()—–相当于fork(),但是这个函数的参数比较多;一定注意,如果失败了,传入的pthread就不要再使用了
- pthread_exit()—–相当于exit(), 但是是退出单个线程, 并给出返回值
- pthread_join()—-相当于waitpid(), 阻塞等待某一个线程退出, 并拿到其返回值(注意指针的传参)
- pthread_detach()— 线程分离函数, 进程控制没有这个函数
- pthread_cancel()—-相当于进程的kill(), 表示杀死某个子线程
- pthread_equal()—-判断两个线程是否相同(主要是由于pthread_t定义在不同系统可能不一样, 所以用该函数判断)
(return, exit, pthread_exit(), 推荐最后一个)
下面给出一些demo:
创建一个线程
1 |
|
其实核心代码也就:
1 | pthread_t mythread; |
但是可以看到,使用ps -Lf <pid>
看到的LWP号码,要比这里pthread_self()拿到的要小的多。1
2
3ps -Lf
UID PID PPID LWP C NLWP STIME TTY TIME CMD
merlin 20185 19023 20185 0 1 17:53 pts/1 00:00:00 -bash
(pthread_self()拿到的数字非常大, 而LWP是linux内核分配cpu时间片的依据)
循环创建多个线程
1 | for(i =0; i< 10; ++i){ |
为什么传递值而不是传递地址, 因为当子线程真正解引用去拿值的时候main函数这个线程可能已经走到了别的位置,i的值已经在main中被改变了;所以这里只能用值传递。
加上pthread_join和pthread_exit之后, 完整的创建多个线程的例子如下:(既然创建了, 就要负责回收—如果它不是detach线程的话)
1 |
|
运行结果1
2pthread_create error:Resource temporarily unavailable
now thread sum number = 32756
(大致可以看到, 一个进程能够创建多少个线程和线程栈空间大小, 以及物理内存多少有关)
拿到线程的返回值
1 | typedef struct |
(最好把ret作为传入参数传递给pthread_create, 之后申请和释放都在main函数中了)
线程分离
好处是, 不需要再由其他线程等待回收, 在状态上可以避免僵尸线程(结束时自动把自己的pcb在内存中的残留资源清理掉), 但是进程没有该机制, 所以当进程死亡如果没有回收, 总会在内存中残留一些资源, 导致内核始终认为该进程还存在.1
int pthread_detach(pthread_t thread);
(也可以使用线程属性进行设置分离状态)
创建完毕即可以分离1
2pthread_create(&tid, NULL, func, NULL);
pthread_detach(tid); //退出后自动释放资源
设置分离之后, 再用join去回收, 则失败, 即会返回错误号22(无效的参数invalid argument).
(没有分离的时候, 是阻塞等待回收; 一旦设置线程的分离状态, 那么join直接反馈传递的tid无效)
杀死线程
kill发送信号, 杀死进程(相对实时); 而线程中, 不一定都能杀死, 要在一定的 “取消点” 才可以(非实时).
1 | pthread_cancel(tid); |
线程的取消并不是实时的,而有一定的延时。需要等待线程到达某个取消点(检查点).取消点:是线程检查是否被取消,并按请求进行动作的一个位置。通常是一些系统调用 creat, open, pause,close, read, write….. 执行命令 man 7 pthreads 可以查看具备这些取消点的系统调用列表。也可参阅 APUE.12.7 取消选项小节。
可以简单的认为, 陷入系统调用, 就进入了检查点; 如果没有检查点, 可以手动添加一个:1
pthread_testcancel();
被cancel杀死的线程, 如果在用join去回收, 则退出值是-1 (实际上是由 PTHREAD_CANCELED
宏决定的); 被取消的线程, 退出值定义在 Linux的 pthread 库中; 常数 PTHREAD_CANCELED 的值是-1. 可在头文件 pthread.h中找到它的定义: #define PTHREAD_CANCELED ((void *) -1)
. 因此当我们对一个已经被取消的线程使用 pthread_join回收时, 得到的返回值为-1.
补充: 清理函数(解决线程退出时资源释放问题, 特别是锁相关的资源)1
2
3
4
5//push and pop thread cancel-lation clean-up handlers
pthread_cleanup_pop()
pthread_cleanup_push()
pthread_cleanup_pop_restore_np()
pthread_cleanup_push_defer_np()
可以读读 man手册, 写的非常详细的三种情况:
- When a thread is canceled (某检查点处被终止, 默认的线程属性可以被取消)
- When a thread terminates by calling pthread_exit()
- When a thread calls pthread_cleanup_pop() with a nonzero execute argument
判断相等1
pthread_equal()
为了防止, 将来pthread_t类型可能变成结构体类型, 所以最好还是用该函数进行判断.
线程属性控制
进行线程属性设置, 也是 线程控制
的一部分.
修改线程属性的方法
1 | int pthread_attr_init(pthread_attr_t *attr); //成功: 0; 失败:错误号 |
线程属性主要包括如下属性:作用域(scope)、栈尺寸(stack size)、栈地址(stack address)、优先级(priority)、分离的状态(detached state)、调度策略和参数(scheduling policy and parameters)。默认的属性为非绑定、非分离、缺省的堆栈、与父进程同样级别的优先级.
- detached state (joinable? Default: PTHREAD_CREATE_JOINABLE. Other option: PTHREAD_CREATE_DETACHED)
- scheduling policy (real-time? PTHREAD_INHERIT_SCHED,PTHREAD_EXPLICIT_SCHED,SCHED_OTHER)
- scheduling parameter
- inheritsched attribute (Default: PTHREAD_EXPLICIT_SCHED Inherit from parent thread: PTHREAD_INHERIT_SCHED)
- scope (Kernel threads: PTHREAD_SCOPE_SYSTEM User threads: PTHREAD_SCOPE_PROCESS Pick one or the other not both.)
- guard size
- stack address (See unistd.h and bits/posix_opt.h _POSIX_THREAD_ATTR_STACKADDR)
- stack size (default minimum PTHREAD_STACK_SIZE set in pthread.h),
一般修改: (可以参考APUE 12.3)
- 线程的分离状态
- 能否被取消(默认PTHREAD_CANCEL_ENABLE)
- 是否立即取消(默认PTHREAD_CANCEL_DERERRED, 到达取消点才取消)
- 线程栈的大小
- 线程栈末尾警戒区的大小
- 调度策略(这个属于高级内容)
1 | int pthread_attr_setscope (pthread_attr_t* attr, int scope); |
创建优先级为10的线程:
1 | pthread_attr_t attr; |
设置取消
1 | pthread_attr_t attr; |
如果线程的rountine函数执行比较长, 最好在其中设置为不能被主线程取消(此时主线程的取消操作仍旧是return 0, 即成功的), 之后相关操作完成后, 再设置为可以被主线程取消.
(注意, 不管相关线程是否相应取消操作, 取消请求一直存在; 如果开始为不响应状态, 即未决, 那么即使之后设置为可以取消状态, 也不处理相关的取消请求)
设置分离状态
1 | pthread_attr_t attr; |
用完attr记得销毁;
并且如果要创建的线程在pthread_create返回之前就运行完毕了, 这个时候pthread_create返回错误代码, 此时最好应该让子线程的运行逻辑里加上等待逻辑, 例如:1
pthread_cond_timewait()
修改栈大小
查看栈大小(线程均分进程栈空间)1
2ulimit -a | grep stack
#8192字节, 即8M; 一个进程栈大小8M
可以从堆中手动申请, 然后设置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setstacksize(&attr, 1024);
/*
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstacksize(pthread_attr_t *attr, size_t *stacksize);
*/
//或者不让内核指定位置, 即位置, 大小都由我们手动指定
//可以把地址指定到堆空间上, 而不是栈空间上了
//char *buffer = (char*)malloc(1024*sizeof(char));
//pthread_attr_setstatck(&attr, buffer, 1024);
/*int pthread_attr_setstack(pthread_attr_t *attr,
void *stackaddr, size_t stacksize);
int pthread_attr_getstack(pthread_attr_t *attr,
void **stackaddr, size_t *stacksize); */
(堆空间相对大一些, 再把stack size设置小一点, 那么每个进程可以创建的线程数量就会变多)
设置警戒区
每个线程栈的末尾有一段警戒区, 用来防止线程和线程之间干扰或者栈溢出.
用的不是太多, 略.
其他属性
察看当前 pthread 库版本 getconf GNU_LIBPTHREAD_VERSION
.
(NPTL 实现机制(POSIX), Native POSIX Thread Library)
线程控制总结
- 主线程退出其他线程不退出, 主线程应调用 pthread_exit
- 避免僵尸线程
pthread_join
,pthread_detach
,pthread_create
指定分离属性 - 被 join 线程可能在 join 函数返回前就释放完自己的所有内存资源, 所以不应当返回被回收线程栈中的值(即临时变量);
- malloc 和 mmap 申请的内存可以被其他线程释放
- 应避免在多线程模型中调用 fork ; 进程中只有调用 fork 的线程存在(该线程独占进程了), 其他线程在子进程中均 pthread_exit 自动退出.
- 信号的复杂语义很难和多线程共存,应避免在多线程引入信号机制
线程的同步(重点)
(该部分内容非常多)
互斥是同步的工具
线程互斥和线程同步还有一定的距离
同步条件
访问某一个共享资源有先后的次序, 具体说, 某一个线程访问某一个资源, 得到结果之前, 该调用不返回, 并且其他线程不可访问.
(多个线程需要访问某一个共享资源时, 需要同步处理)
- 多个独立对象
- 竞争访问
- 统一资源(共享资源)
同步的手段(并发编程): 加锁(多线程); 任务队列(actor)
(锁是建议锁, 不具有强制性; 意思说, 你访问共享资源的时候, 线程不写拿锁逻辑, 直接访问也是可以的, 但这种方式就没有经过同步处理, 是不推荐的)
一把锁, 对应一个需要操作的共享数据.
互斥量
互斥锁, 也是一把建议锁&协同锁, 不具有强制性; 涉及到的相关函数如下:
(apue上面称之为 协同锁
)1
2
3
4
5
6pthread_mutex_init()
pthread_mutex_destroy()
pthread_mutex_lock() //相当于把mutex实例减1
pthread_trylock() //如果能加锁成功, 则加锁成功;否则就返回
pthread_mutex_unlock() //相当于把mutex实例加1
成功返回0, 失败返回错误码;
pthread_mutex_t
是结构体, 使用时忽略细节(不需要关注该结构体的细节)可以简单认为是整型, 有两种取值0,1; 相当于一个把锁.pthread_mutexattr_t
是指互斥锁的属性.
下面给予一个简单的实例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
pthread_mutex_t mutex;
void *func(void *arg)
{
srand(time(NULL));
while(1) {
pthread_mutex_lock(&mutex);
printf("small");
sleep(rand()%2);
printf("\n");
pthread_mutex_unlock(&mutex);
sleep(rand()%2);
}
return NULL;
}
int main(void)
{
pthread_t tid;
int ret;
srand(time(NULL));
if(pthread_mutex_init(&mutex, NULL)) {//mutex == 1
fprintf(stderr, "mutex init error");
exit(-1);
}
ret = pthread_create(&tid, NULL, func, NULL/*(void*)i*/);
if(ret) {
fprintf(stderr, "pthread_create error:%s\n", strerror(ret));
exit(-1);
}
while(1) {
pthread_mutex_lock(&mutex);
printf("BIG");
sleep(rand()%2);
printf("\n");
pthread_mutex_unlock(&mutex);
sleep(rand()%2);
}
pthread_mutex_destroy(&mutex);
return 0;
}
(注意保证另外一个线程也能抢到cpu, 所以保证锁的粒度越小越好, 共享的标准输出一旦使用完毕, 立即解锁)
读写锁
与互斥量类似, 但是读写锁运行 更高的并发性
; 写独占, 读共享; 共享独占锁, 适合读的情况比写的情况的场景. 拥有写锁的线程, 其他线程全部阻塞等待, 直到完成; 而读锁加锁成功时, 并且同时需要加锁的没有写锁(即之后的全是读锁), 那么其他线程仍可以加读锁, 但是一旦有写锁, 则大家一起等着(即读锁也不能加锁成功); 并且读写锁一起抢的时候, 写锁的优先级高
.
一把rwlock
锁有3个状态:
- 读模式下加锁(读锁)
- 写模式下加锁(写锁)
- 不加锁状态
主要函数总结:1
2
3
4
5
6
7
8
9
10pthread_rwlock_init()
pthread_rwlock_destroy()
pthread_rwlock_rdlock() //读加锁
pthread_rwlock_wrlock() //写加锁
pthread_rwlock_tryrdlock() //尝试读加锁
pthread_rwlock_trywrlock() //尝试写加锁
pthread_rwlock_unlock() //解锁
成功返回0, 失败返回错误码; 关于解锁, 特别的情况是, 如果多个读锁锁定某个资源, 那么直到最后一个读锁解锁该资源才算真正解锁了.
pthread_rwlock_t
定义了读写锁的类型; pthread_rwlockattr_t
读写锁的属性.
写一个demo, 演示读写锁1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
int counter = 0; //shared resource
pthread_rwlock_t rwlock;
void *thread_write(void *arg)
{
int t;
int i = (int)arg;
while(1) {
t = counter;
usleep(1000);
pthread_rwlock_wrlock(&rwlock); //write lock
printf("---write %d: %lu: counter=%d ++counter=%d\n", i, pthread_self(), t, ++counter);
pthread_rwlock_unlock(&rwlock);
usleep(2000);
}
return NULL;
}
void *thread_read(void *arg)
{
int i = (int)arg;
while(1) {
pthread_rwlock_rdlock(&rwlock);
printf("--------------------read %d: %lu: %d\n", i, pthread_self(), counter);
pthread_rwlock_unlock(&rwlock);
usleep(900);
}
return NULL;
}
int main(void)
{
int i;
pthread_t tid[8]; //3 write, 5 read
pthread_rwlock_init(&rwlock, NULL);
for(int i = 0; i<3; i++) {
pthread_create(&tid[i], NULL, thread_write, (void*)i);
}
for(int i =0; i<5; i++) {
pthread_create(&tid[i+3], NULL, thread_read, (void*)i);
}
for(i = 0; i<8; i++) {
pthread_join(tid[i], NULL);
}
pthread_rwlock_destroy(&rwlock);
return 0;
}
上面例子只是演示了最基本的用法. 总之对于加锁效率, 如果存在大量的读情况, 读写锁是非常不错的选择.
条件变量
(sudo apt-get install manpages-posix-dev
)
条件变量本身 不是锁
, 一般用于谓词判断(判断一下条件), 可造成线程阻塞.
通常要和互斥锁mutex配合在一起, 完成同步操作;
(满足某个条件才阻塞或者得到cpu)
主要函数有:1
2
3
4
5
6
7pthread_cond_init()
pthread_cond_destroy()
pthread_cond_wait() //核心
pthread_cond_timedwait()// 时间到, 自动返回(而非永久阻塞)
pthread_cond_signal() //唤醒其他线程, 至少一个(可以是具体的某个)
pthread_cond_broadcast() //唤醒其他所有阻塞线程(不指定具体的某个)
其中 pthread_cond_wait()
:1
2int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
- 该函数调用时, 阻塞等待条件变量, 并解锁互斥量(相当于unlock), 且是原子操作(放开锁让别的线程操作)
- 阻塞等待一个条件变量(条件变量不满足则一直等待)—或者等其他线程唤醒本线程
- 该函数返回时(其他线程唤醒该线程时), 重新申请获取互斥锁pthread_mutex_lock(&mutex);//或者trylock; 本次拿不到就等下次, 不一定能拿到, 但是一旦拿到锁则阻塞解除, 可以开始操作数据了
(什么时候条件变量满足? 别的线程调用signal, broadcast的时候自然会传入cond, 此时即表明条件变量已经满足, 有点儿像java的notify和notifyAll)
具体的例子, 可以参考一下下面的 生产者消费者模型
(生产队列中有没有产品, 就依靠条件变量做判断; 加锁依靠mutex)
(也就是大家伙都cond_wait着, 就依靠别人signal或者broadcast通知, 即当条件变量满足时通知等待的众人, 然后大家抢锁成功的操作共享资源, 没有成功的继续循环调用wait阻塞等待下一次通知)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
/*static init*/
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;
struct msg{
int num;
struct msg *next;
};
struct msg *head = NULL;
struct msg *tmp = NULL;
void *productor(void *arg)
{
for(;;) {
tmp = (struct msg*) malloc(sizeof(struct msg));
tmp->num = rand() % 100 + 1;
printf("---producer has produced %d \n", tmp->num);
pthread_mutex_lock(&mutex);
tmp->next = head;
head = tmp;
//在解锁之前通知(特别注意)
pthread_cond_signal(&has_product);
pthread_mutex_unlock(&mutex);
usleep(rand()%1000); //给消费者机会
}
return NULL;
}
void *consumer(void *arg)
{
for(;;) {
/*开始操作Msg队列*/
pthread_mutex_lock(&mutex);
while(head==NULL) { //队列为空, 放弃锁, 阻塞, 等待下一波唤醒
pthread_cond_wait(&has_product, &mutex);
}
//有产品, 取走一个队头
tmp = head;
head = head->next;
pthread_mutex_unlock(&mutex);
printf("consumer has consumed %d \n", tmp->num);
free(tmp);
tmp = NULL;
usleep(rand()%1000); //给生产者机会
}
return NULL;
}
int main(void)
{
pthread_t ptid, ctid; //processer and consumer
/*remember checking return code*/
pthread_create(&ptid, NULL, productor, NULL);
pthread_create(&ctid, NULL, consumer, NULL);
pthread_join(ptid, NULL);
pthread_join(ctid, NULL);
return 0;
}
至于 pthread_cond_timewait
注意一下时间的使用:
(指定阻塞时常, struct timespec具体定义可以参考 `man sem_timedwait)1
2
3
4
5
6
7
8
9
10/*
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
*/
time_t cur = time(NULL); //获取当前时间
struct timespec t = {0, 0}; //绝对时间(可以直接在这里设置定时时间)
t.tv_sec = cur + 10; //当前时间往后, 定时10秒(作为绝对时间)
pthread_cond_timewait(&cond, &mutex, &t);
该部分可以参考 APUE 11.6 章节.
总结: 条件变量在达到同步阻塞的目的是, 更多的减少了不必要的竞争(例如多个抢锁的进程, 原来是直接抢锁, 现在是等到条件满足的时候才去抢锁).
信号量
信号量和信号完全两码事儿.
(semaphore可以用于线程, 进程间同步; mutex的增强版本)
主要函数如下:1
2
3
4
5
6
7
8
9
10
11
12
/*
pshared控制信号量的类型,0表示这个信号量是当前进程的局部信号量,
否则,这个信号量就可以在多个进程之间共享
*/
int sem_init(sem_t *sem, int pshared, unsigned int value); //value为的1时, 相当于互斥量mutex
sem_destroy()
sem_wait() //相当于加锁, 但不一定阻塞 value--, value减到0才会阻塞
sem_trywait()
sem_timedwait()
sem_post() //相当于 value++ 并signal通知唤醒阻塞的线程
具体可以查看man手册 man sem_init
, 上述的加减操作对开发者隐藏; 并且可以看出, 上述初值value决定了同时占用的共享资源的个数.
具体案例, 见下面 消费者生产者模型
(不同于条件变量的实现, 这个思路比较简单, 实现多生产者, 多消费者)
(定义两个信号量, 其中一个表示实际生产出来的产品数量, 另外一个信号量表示总共的产品 (线性)队列
空余容量)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
int queue[NUM]={0}; //全局队列
sem_t blank_num, product_num; //空余容量, 已生产容量
/*本案例中, 队列用的数组代替,
所以队列的移动等操作忽略(可能出现消费者消费产品为0的情况)*/
void *produce(void *arg)
{
int i = 0;
while (1) {
sem_wait(&blank_num); //有空余才生产, 无空余则阻塞
queue[i] = rand() % 100 + 1; //产生一个产品
printf("----produced %d\n", queue[i]);
sem_post(&product_num);
i = (i+1) % NUM; //逻辑环
sleep(rand()%1); //给消费者机会
}
}
void *consume(void *arg){
int i = 0;
while (1) {
sem_wait(&product_num); //有产品才消费, 没有产品则等待
//队列首部出队
printf("consume %d\n", queue[i]);
queue[i] = 0;
sem_post(&blank_num);
i = (i+1) % NUM; //逻辑环
sleep(rand()%3); //给生产者机会多生产
}
}
int main(void)
{
/*忽略rerurn code的检查*/
pthread_t pid, cid;
sem_init(&blank_num, 0, NUM);
sem_init(&product_num, 0, 0);
pthread_create(&pid, NULL, produce, NULL);
pthread_create(&cid, NULL, consume, NULL);
pthread_join(pid, NULL);
pthread_join(cid, NULL);
sem_destroy(&blank_num);
sem_destroy(&product_num);
return 0;
}
死锁
使用互斥量或者其他锁机制中出现的一种现象.
一般我遇到的情况:
- 线程试图对互斥量A重复加锁
- 线程1拥有A锁, 线程2拥有B锁; 两者互相请求对方的锁.
解决方案, 一般是找到问题, 对症下药.
- 加锁之前, 一定先检查一下之前是否加过同样的锁(及时解锁)
- (得不到资源的时候, 把已经占有的先释放)已经有一把锁, 再加锁另一把的时候可以使用try_lock逻辑(try_lock失败,就主动放弃已有的锁,让对方线程先运行)
一个简单的demo, 看看下面这个死锁现象(注意执行步骤):
1 |
|
这种类型解法上面已经说了, “牺牲自己, 成全别人”: 使用trylock逻辑, 如果我拿不到别人的锁, 我先放弃自己已经占有的锁, 将来再去占锁.
总结起来也就是这样:1
2
3
4
5
6
7
8
9
10
11
12pthread_mutex_lock(&mutex_1);
while ( pthread_mutex_trylock(&mutex_2) ) /* Test if already locked */
{
pthread_mutex_unlock(&mutex_1); /* Free resource to avoid deadlock */
...
/* stall here */
...
pthread_mutex_lock(&mutex_1);
}
count++;
pthread_mutex_unlock(&mutex_2);
pthread_mutex_unlock(&mutex_1);
常见模型
- 生产者-消费者模型
这里使用的是, 采用互斥量+条件变量一起使用的案例, 具体代码如下:
1 |
|
上面用一个简单的链表, 条件变量绑定mutex就实现了一个 “产品空” 的 阻塞通知模型
.
上面的代码只是把意思表现出来了, 但是没有涵盖 “产品满” 的情况, 下面给出一个相对规范的代码写法:
(用具体的数组模拟环形队列, 从而控制产品数量)1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
//封装产品队列条件结构体
typedef struct cond_struct
{
pthread_cond_t notfull;
pthread_cond_t notempty;
pthread_mutex_t lock; //绑定条件变量的Mutex
int buffer[SIZE]; //产品队列(数组)
int readpos; //消费位置
int writepos; //生产位置
}cond_t;
void init(cond_t*); //初始化 cond_t 条件&产品队列结构体
void put(cond_t *, int); //"放入产品"函数的封装(注意检测满的情况)
int get(cond_t *); //"取出产品"函数的封装(注意检测空的情况)
void *produce(void*);
void *consume(void*);
cond_t queue; //全局队列
/*主函数(主线程负责回收)*/
int main(void)
{
pthread_t producer, consumer;
init(&queue);
pthread_create(&producer, NULL, produce, NULL);
pthread_create(&consumer, NULL, consume, NULL);
pthread_join(producer, NULL);
pthread_join(consumer, NULL);
return 0;
}
void init(cond_t *arg)
{
pthread_mutex_init(&(arg->lock), NULL);
pthread_cond_init(&(arg->notempty), NULL);
pthread_cond_init(&(arg->notfull), NULL);
arg->readpos = 0;
arg->writepos = 0;
}
/*放入和取出注意一下是存储结构是数组, 而不是链表*/
void put(cond_t *queue, int data)
{
pthread_mutex_lock(&(queue->lock));
//生产之前判断不为满
while( (queue->writepos+1)%SIZE == queue->readpos ) {
//说明队列满了
printf("producer watting for the queue not full\n");
pthread_cond_wait(&(queue->notfull), &(queue->lock));
}
//队列没有满的情况, 直接生产
queue->buffer[queue->writepos] = data;
queue->writepos = (queue->writepos + 1) % SIZE;
printf("producer produce data = %d\n", data);
pthread_cond_signal(&(queue->notempty));
pthread_mutex_unlock(&(queue->lock));
}
//出队一个产品
int get(cond_t *queue)
{
int data;
pthread_mutex_lock(&(queue->lock));
while( queue->readpos == queue->writepos ) {
printf("consumer waitting for queue not empty.\n");
pthread_cond_wait(&(queue->notempty), &(queue->lock));
}
//不为空, 直接出队
data = queue->buffer[queue->readpos];
queue->readpos = (queue->readpos + 1) % SIZE;
printf("---consumer consume data = %d\n", data);
pthread_cond_signal(&(queue->notfull));
pthread_mutex_unlock(&(queue->lock));
return data;
}
void *produce(void *arg)
{
srand(time(NULL));
/*可以控制生产速度, 用循环控制生产产品个数*/
//for(int i = 0; i < n; ++)
while(1) {
put(&queue, rand()%10);
sleep(1); //1秒生产一个(让生产者稍快)
}
return NULL;
}
void *consume(void *arg)
{
while(1) {
get(&queue);
sleep(2);
}
return NULL;
}
条件变量如果想在判断的同时, 控制产品的个数, 则比较困难(当然你多加变量控制链表或者数组当然没有问题), 最多也就是告知是否有产品, 队列是否已满. 同样的逻辑信号量实现起来更加简单, 并且还能控制产品数量.
(定义两个信号量, 其中一个表示实际生产出来的产品数量, 另外一个信号量表示总共的产品 (线性)队列
空余容量)
1 |
|
- 哲学家问题
这里有个很经典的分析过程, 具体略, 主要思想是: 要么能拿到2把锁, 开始就餐; 要么拿不到释放已经占有的锁(或者就餐完毕释放已经占有的两把锁)
下面可以采用 mutex数组
或者 信号量
去实现(思想类似)5个哲学家就餐问题:
1 | if (i == 4) { |
之后可以进行加锁操作:
左手加锁成功则尝试加右锁, 如果右锁加锁成功则就餐, 完毕释放资源(先释放右锁); 如果失败, 则释放左锁, 阻塞等待(再从加左锁开始).
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41/*
struct hands
{
int left;
int right
};
*/
//strut hands[5]; //假设已经初始化完毕
pthread_mutex_t m[5]; //假设已经初始化完毕
pthread_t t[5]; //假设已经初始化完毕并已经创建
void *run(void *arg)
{
//通过传入参数得知自己是几号线程(哲学家)
int pthread_num = (int) arg;
int left;
int right;
if (pthread_num == 4) {
left = pthread_num;
right = 0;
} else {
left = pthread_num;
right = pthread_num+1;
}
while(1) {
pthread_mutex_lock(&m[left]);
if (pthread_mutex_trylock(&m[right])) {
printf("thread[%d] lock right failed, relese left lock %d\n", pthread_num, left);
} else {
printf("thread[%d] start eating\n", pthread_num);
pthread_mutex_unlock(&m[right]);
}
pthread_mutex_unlock(&m[left]);
usleep(500); //休息500毫秒再抢
}
return NULL;
}
完整的代码可以是:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
struct hands
{
//按照顺序前面N-1个哲学家线程抢left, 最后一个抢right
int left;
int right;
};
pthread_t t[N]; //5个哲学家线程
pthread_mutex_t m[N]; //5把锁
struct hands h[N];//5个线程左右手对应锁的编号
void * func(void *arg)
{
int pthread_num = (int) arg;
int left = h[pthread_num].left;
int right = h[pthread_num].right;
while (1) {
pthread_mutex_lock(&m[left]);
if (pthread_mutex_trylock(&m[right])) {
printf("thread[%d] lock right failed, relese left lock %d\n", pthread_num, left);
} else {
printf("thread[%d] start eating\n", pthread_num);
pthread_mutex_unlock(&m[right]);
}
pthread_mutex_unlock(&m[left]);
//休息20纳秒再抢(不给别人机会很可能自己一直在loop)
//给别人机会越多, 大家不容易冲突; 睡的少, 容易不停放锁
usleep(20);
}
return NULL;
}
int main(void)
{
int i;
//初始化
for(i=0; i<N; i++) {
//初始化锁
if (pthread_mutex_init(&m[i], NULL) ) {
fprintf(stderr, "mutex[%d] init error\n", i);
exit(-1);
}
//初始化哲学家线程应该抢的锁
h[i].left = i;
h[i].right = (i+1)%N;
//初始化线程
if ( pthread_create(&t[i], NULL, func, (void*)i) ) {
fprintf(stderr, "thread[%d] init error\n", i);
exit(-1);
}
}//end of for init
//join and destroy
for(i=0; i<N; i++) {
pthread_join(t[i], NULL);
}
for(i=0; i<N; i++) {
pthread_mutex_destroy(&m[i]);
}
return 0;
}
关于锁的选择, 其实可以使用信号量semophore.
sem_t s[N]; //value 0或者1
然后使用sem_wait以及sem_trywait, sem_post进行主要逻辑, 其过程类似上面的mutex.
(注意, 此处用的是信号量 semaphore.h
不建议使用信号 sys/sem.h
, sys/shm.h
, 多线程和信号兼容不好, 多进程问题可以使用信号 )
总结: 让哲学家按一定顺序就餐, 得不到时即释放当前获得的锁.
另外一种高效的方式是:
引入一个对各个线程(进程)的调度者, 当哲学家饥饿时向该调度者申请用餐, 而调度者根据哲学家面前是否同时有两只筷子空闲来判断哲学家此时能否就餐, 能则占用这两只筷子并且让哲学家就餐, 不能则让哲学家继续等待. 引入一个调度者能很有效地管理进程资源的分配.
线程私有数据
很多人把它解释为 线程局部存储
(Thread Local Storage), 简称 TSL
, 不过Linux平台最好理解成 TSD
, 即 Thread Specific Data. 即使各个线程都有一样的名字的变量, 但是它们是不同的变量(有不同的存储地点), 各个线程独占; 你可以把它理解成, 线程局部变量
. (就像Java中ThreadLocal)
出现原因自然是, 多线程编程中, 全局变量被一个线程修改, 就会影响另外一个线程的使用; 所以最好有一个线程特有变量的机制, 或称线程局部变量, 或static memory local to a thread (线程局部静态变量).
线程局部存储在不同的平台有不同的实现, 可移植性不太好. 幸好要实现线程局部存储并不难. “最简单的办法就是建立一个全局表”, 通过当前线程ID去查询相应的数据, 因为各个线程的ID不同, 查到的数据自然也不同了. (如果当初由你涉及全局表, 估计今天看到的是另外一种使用方式; 当然你可以完全不必关心相关的实现)
并且Linux平台已经提供了相关的实现:
1 | int pthread_key_create(pthread_key_t *key, |
补充说明:
- 当线程被创建时, 会将所有的线程局部存储变量初始化为NULL
- destructor() 被自动调用时机: 线程终止时且key关联的值不为NULL.
- pthread_key_delete()并不检查当前是否有线程正在使用该线程局部数据变量,也不会调用清理函数destructor.
- pthread_key_t的key数量使有限的, linux中可以通过 PTHREAD_KEY_MAX(定义于limits.h文件中)或者系统调用sysconf(_SC_THREAD_KEYS_MAX)来确定当前系统最多支持多少个键(默认1024), key太多, 请进行封装.
- pthread_key_create对应的key的
const void *value
可以是普通类型强制类型转换过去的, 此时destructor可以传入NULL - pthread_setspecific()设置的value取出。在使用取出的值前最好是将void*转换成原始数据类型的指针, 再取值.
(注: linux平台的实现是, 两张全局表, 进程一张, 线程各一张; 全局表的某个key:pthread_keys[index] 对应 线程表tsd[index]; 所以即使大家在 进程领域
都叫一样的名字, 但是 线程表领域
值是不一样的)
我写个使用案例, 你就大概明白它的 二级表
实现了:
1 |
|
当然GCC扩展的TSD实现 __thread
也不错, 只是移植性不好.
在Linux中还有一种更为高效的线程局部存储方法,就是使用关键字__thread来定义变量。__thread是GCC内置的线程局部存储设施(Thread-Local Storage),它的实现非常高效,与pthread_key_t向比较更为快速,其存储性能可以与全局变量相媲美,而且使用方式也更为简单。创建线程局部变量只需简单的在全局或者静态变量的声明中加入__thread说明即可。凡是带有__thread的变量,每个线程都拥有该变量的一份拷贝,且互不干扰。线程局部存储中的变量将一直存在,直至线程终止,当线程终止时会自动释放这一存储。__thread并不是所有数据类型都可以使用的,因为其只支持POD(Plain old data structure) 类型,不支持class类型——其不能自动调用构造函数和析构函数。同时__thread可以用于修饰全局变量、函数内的静态变量,但是不能用于修饰函数的局部变量或者class的普通成员变量。另外,__thread变量的初始化只能用编译期常量.
1 | //如果变量声明中使用量关键字static或者extern,那么关键字__thread必须紧随其后 |
具体不展开, 有兴趣自己去查一下吧, 我还是建议使用 “不那么高效”的 pthread_key_t
线程的调试
详细信息可以参看 多线程-进程的调试 .
其他注意事项
线程的诸多好处, 貌似总是相对于进程的, 比如通信, 共享等都比进程方便; 但是线程也有不方便的:
- gdb调试困难
- 线程对于
信号
的支持不好(有些函数在不同平台的意义未定义) - 保证使用的pthread版本一致,
getconf GNU_LIBPTHREAD_VERSION
- detach后的进程,不要去join,会得到invalid argument的运行时报错(人家自己已经处理完毕资源了,在Join函数之前);
- 被cancel的线程, 也需要回收,但是得到的返回值是-1
- malloc, mmap的堆空间会被其他线程共享(小心其他线程给你free掉)—-可重入中也说过,如果想要可重入,不要调用malloc和free
- 多线程模型中最好不要调用fork, 除非立即exec*; 此时创建出来的子进程中只有调用fork调用的这个线程存在,其他线程全部pthread_exit()了—最好不要用fork.
总结
本文几乎通篇都在介绍pthread API及其含义&使用, 从线程的控制, 线程的属性, 以及线程的同步(mutex函数,条件变量函数,信号量,同步屏障函数,读写锁), 线程私有变量, 最后其他的一些uitl函数. 当然也还有没有涉及到了如 pthread_once
.
但是, 已经花费了很长时间, 几乎把pthread全部总结了一下.
可以看到,玩下来,基本上操作&弄懂API没有多大难度,难的是理解这么设计的理由是什么?以及设计并发流程,换句话说,到目前为了所有的练习全部是为了掌握并发编程的工具,真正的核心可能在于并发的模式或者算法上,而相关内容本文并没有深入设计,而这方面你只能参考业界相关领域的大牛,甚至是写相关库的本人的论述。
参考资料
- POSIX threads explained –IBM的技术贴, 关键看它开头部分
- 《Programming With POSIX Theads》 作者: David R.Butenhof (市面上已经绝版)—强烈推荐
- 《Unix Networking Programming Volume 1》 chapter 26–作者: W.Richard Stevens等
- 《APUE》
- 《linux高级程序设计》 作者:杨宗德等—人民邮电出版社
- 《pthread primer》 这个数绝版了, 结合linux内核讲解线程, 比较不错
- [《多线程服务器的常用编程模型》] 陈硕大佬的网文
- [《Linux/UNIX系统编程手册(上)》] 线程私有数据