pthread 相关内容。 个人觉得对于线程的把控,难度要小于进程,进程的通信方式多到能写一本书,并且每种方式还有其各自的适用范围以及注意事项,真正玩起来还是有点儿多。甚至对于内核而言,它都不区分进程和线程。 但是线程就不一样,只是Linux下面的话,线程当前的模型就一种:内核支持的NPTL
(Native POSIX Thread Library), 但是如果你读过《7周7并发模型》 的话就会知道并发问题远不止这么点儿东西。 本文从实战的角度展开主要涉及4个方面:
Thread management - creating, joining threads etc.
Mutexes
Condition variables
Synchronization between threads using read/write locks and barriers
The POSIX semaphore API works with POSIX threads but is not part of threads standard.
但是实际上就两大类:
线程的控制原语(包括线程属性)
线程的同步原语(信号量(PV),互斥量,竞争/冒险条件,文件锁,屏障)
贴个图:
API偏多, 注意一下常用的使用场景以及一些注意事项即可, 深入理解线程
请参考专业书籍, 后续再写. (本文是详细总结, 所以细节偏多)
引子
那么先问一个问题,一个Linux进程最多起多少个线程?
本文涉及哪些内容, 见下图
线程控制函数 pthread_create(), pthread_exit(), pthread_join()
线程的同步(控制线程同时执行 && 访问代码临界区) 信号量: sem_() 互斥量: pthreadmutex () 条件变量: pthreadcond *() (线程与信号配合的并不好,所以不要这样相互使用)
线程的属性(比较多,但是常用的不多,大概6个左右) pthreadattr *() 设置一些取消状态等
其他
正文 线程的控制 主要内容:(直接返回errno,而不是返回-1设置errno了)
pthread_self()—–相当于getpid(); 但是注意pthread_t,虽然在linux下是%ul但是在别的系统下可能是结构体.(始终成功)
pthread_create()—–相当于fork(),但是这个函数的参数比较多;一定注意,如果失败了,传入的pthread就不要再使用了
pthread_exit()—–相当于exit(), 但是是退出单个线程, 并给出返回值
pthread_join()—-相当于waitpid(), 阻塞等待某一个线程退出, 并拿到其返回值(注意指针的传参)
pthread_detach()— 线程分离函数, 进程控制没有这个函数
pthread_cancel()—-相当于进程的kill(), 表示杀死某个子线程
pthread_equal()—-判断两个线程是否相同(主要是由于pthread_t定义在不同系统可能不一样, 所以用该函数判断)
(return, exit, pthread_exit(), 推荐最后一个)
下面给出一些demo:
创建一个线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 #include <stdio.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> #include <string.h> void *my_func (void *arg) { printf ("in thread id = %lu\n" , pthread_self()); return NULL ; } int main (void ) { pthread_t tid; int ret; printf ("in main id = %lu\n" , pthread_self()); ret = pthread_create(&tid, NULL , my_func, NULL ); if ( ret ){ printf ("create thread failed, %s\n" ,strerror(ret)); exit (-1 ); } sleep(1 ); return 0 ; }
其实核心代码也就:
1 2 3 4 5 6 7 8 9 10 11 pthread_t mythread; if ( pthread_create( &mythread, NULL , thread_function, NULL ) ) { printf ("error creating thread." ); abort (); } if ( pthread_join ( mythread, NULL ) ) { printf ("error joining thread." ); abort (); }
但是可以看到,使用ps -Lf <pid>
看到的LWP号码,要比这里pthread_self()拿到的要小的多。
1 2 3 ps -Lf UID PID PPID LWP C NLWP STIME TTY TIME CMD merlin 20185 19023 20185 0 1 17:53 pts/1 00:00:00 -bash
(pthread_self()拿到的数字非常大, 而LWP是linux内核分配cpu时间片的依据)
循环创建多个线程
1 2 3 4 5 6 7 for (i =0 ; i< 10 ; ++i){ ret = pthread_create(&tid[i], NULL , my_func, (void *)i); if (ret != 0 ){ fprintf (stderr , "pthread_create thread %d error: %s\n" , i+1 , strerror(ret)); exit (-1 ); } }
为什么传递值而不是传递地址, 因为当子线程真正解引用去拿值的时候main函数这个线程可能已经走到了别的位置,i的值已经在main中被改变了;所以这里只能用值传递。
加上pthread_join和pthread_exit之后, 完整的创建多个线程的例子如下:(既然创建了, 就要负责回收—如果它不是detach线程的话)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> void *func (void *arg) { while (1 ) { sleep(1 ); } return NULL ; } int main (void ) { pthread_t tid; int ret; unsigned long i=1 ; for (;;) { ret = pthread_create(&tid, NULL , func, NULL ); if (ret) { fprintf (stderr , "pthread_create error:%s\n" , strerror(ret)); printf ("now thread sum number = %lu\n" , i+1 ); exit (-1 ); } i++; } return 0 ; }
运行结果
1 2 pthread_create error:Resource temporarily unavailable now thread sum number = 32756
(大致可以看到, 一个进程能够创建多少个线程和线程栈空间大小, 以及物理内存多少有关)
拿到线程的返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 typedef struct { int a; int b; }exit_t ; void *func (void *arg) { exit_t *ret; ret = malloc (sizeof (exit_t )); ret->a = 1 ; ret->b = 2 ; pthread_exit((void *)ret); } int main (void ) { pthread_t tid; exit_t *ret; if ( pthread_create(&tid, NULL , func, NULL ) ){ fprintf (stderr , "pthread_create error" ); exit (-1 ); } pthread_join(tid, (void **) &ret); printf ("return value %d, %d" , ret->a, ret->b); free (ret); ret = NULL ; return 0 ; }
(最好把ret作为传入参数传递给pthread_create, 之后申请和释放都在main函数中了)
线程分离 好处是, 不需要再由其他线程等待回收, 在状态上可以避免僵尸线程(结束时自动把自己的pcb在内存中的残留资源清理掉), 但是进程没有该机制, 所以当进程死亡如果没有回收, 总会在内存中残留一些资源, 导致内核始终认为该进程还存在.
1 int pthread_detach (pthread_t thread) ;
(也可以使用线程属性进行设置分离状态) 创建完毕即可以分离
1 2 pthread_create(&tid, NULL , func, NULL ); pthread_detach(tid);
设置分离之后, 再用join去回收, 则失败, 即会返回错误号22(无效的参数invalid argument). (没有分离的时候, 是阻塞等待回收; 一旦设置线程的分离状态, 那么join直接反馈传递的tid无效)
杀死线程 kill发送信号, 杀死进程(相对实时); 而线程中, 不一定都能杀死, 要在一定的 “取消点” 才可以(非实时).
线程的取消并不是实时的,而有一定的延时。需要等待线程到达某个取消点(检查点).取消点:是线程检查是否被取消,并按请求进行动作的一个位置。通常是一些系统调用 creat, open, pause,close, read, write….. 执行命令 man 7 pthreads 可以查看具备这些取消点的系统调用列表。也可参阅 APUE.12.7 取消选项小节。
可以简单的认为, 陷入系统调用, 就进入了检查点; 如果没有检查点, 可以手动添加一个:
被cancel杀死的线程, 如果在用join去回收, 则退出值是-1 (实际上是由 PTHREAD_CANCELED
宏决定的); 被取消的线程, 退出值定义在 Linux的 pthread 库中; 常数 PTHREAD_CANCELED 的值是-1. 可在头文件 pthread.h中找到它的定义: #define PTHREAD_CANCELED ((void *) -1)
. 因此当我们对一个已经被取消的线程使用 pthread_join回收时, 得到的返回值为-1.
补充: 清理函数(解决线程退出时资源释放问题, 特别是锁相关的资源)
1 2 3 4 5 pthread_cleanup_pop() pthread_cleanup_push() pthread_cleanup_pop_restore_np() pthread_cleanup_push_defer_np()
可以读读 man手册 , 写的非常详细的三种情况:
When a thread is canceled (某检查点处被终止, 默认的线程属性可以被取消)
When a thread terminates by calling pthread_exit()
When a thread calls pthread_cleanup_pop() with a nonzero execute argument
判断相等
为了防止, 将来pthread_t类型可能变成结构体类型, 所以最好还是用该函数进行判断.
线程属性控制 进行线程属性设置, 也是 线程控制
的一部分.
修改线程属性的方法 1 2 int pthread_attr_init (pthread_attr_t *attr) ; int pthread_attr_destroy (pthread_attr_t *attr) ;
线程属性主要包括如下属性:作用域(scope)、栈尺寸(stack size)、栈地址(stack address)、优先级(priority)、分离的状态(detached state)、调度策略和参数(scheduling policy and parameters)。默认的属性为非绑定、非分离、缺省的堆栈、与父进程同样级别的优先级.
detached state (joinable? Default: PTHREAD_CREATE_JOINABLE. Other option: PTHREAD_CREATE_DETACHED)
scheduling policy (real-time? PTHREAD_INHERIT_SCHED,PTHREAD_EXPLICIT_SCHED,SCHED_OTHER)
scheduling parameter
inheritsched attribute (Default: PTHREAD_EXPLICIT_SCHED Inherit from parent thread: PTHREAD_INHERIT_SCHED)
scope (Kernel threads: PTHREAD_SCOPE_SYSTEM User threads: PTHREAD_SCOPE_PROCESS Pick one or the other not both.)
guard size
stack address (See unistd.h and bits/posix_opt.h _POSIX_THREAD_ATTR_STACKADDR)
stack size (default minimum PTHREAD_STACK_SIZE set in pthread.h),
一般修改: (可以参考APUE 12.3)
线程的分离状态
能否被取消(默认PTHREAD_CANCEL_ENABLE)
是否立即取消(默认PTHREAD_CANCEL_DERERRED, 到达取消点才取消)
线程栈的大小
线程栈末尾警戒区的大小
调度策略(这个属于高级内容)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 int pthread_attr_setscope (pthread_attr_t* attr, int scope); 功能:设置线程绑定属性。 attr:线程属性。 scope:PTHREAD_SCOPE_SYSTEM(绑定);PTHREAD_SCOPE_PROCESS(非绑定) 函数返回值:成功:0,失败:-1 int pthread_attr_setschedpolicy(pthread_attr_t* attr, int policy); 功能:设置创建线程的调度策略。 attr:线程属性; policy:线程调度策略:SCHED_FIFO、SCHED_RR和SCHED_OTHER。 函数返回值:成功:0,失败:-1 int pthread_attr_setschedparam (pthread_attr_t* attr, struct sched_param* param); 功能:设置线程优先级。 attr:线程属性。 param:线程优先级。 函数返回值:成功:0,失败:-1
创建优先级为10的线程:
1 2 3 4 5 6 7 8 9 10 pthread_attr_t attr;struct sched_param param ;pthread_attr_init(&attr); pthread_attr_setscope (&attr, PTHREAD_SCOPE_SYSTEM); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setschedpolicy(&attr, SCHED_RR); param.sched_priority = 10 ; pthread_attr_setschedparam(&attr, ¶m); pthread_create(xxx, &attr, xxx, xxx); pthread_attr_destroy(&attr);
设置取消 1 2 3 4 5 pthread_attr_t attr;pthread_attr_init(&attr); pthread_attr_setcancelstate(int new_state, int *old_state); pthread_attr_setcanceltype(int new , int *old);
如果线程的rountine函数执行比较长, 最好在其中设置为不能被主线程取消(此时主线程的取消操作仍旧是return 0, 即成功的), 之后相关操作完成后, 再设置为可以被主线程取消.
(注意, 不管相关线程是否相应取消操作, 取消请求一直存在; 如果开始为不响应状态, 即未决, 那么即使之后设置为可以取消状态, 也不处理相关的取消请求)
设置分离状态 1 2 3 4 5 pthread_attr_t attr;pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACH); pthread_create(&tid, &attr, func, NULL );
用完attr记得销毁; 并且如果要创建的线程在pthread_create返回之前就运行完毕了, 这个时候pthread_create返回错误代码, 此时最好应该让子线程的运行逻辑里加上等待逻辑, 例如:
修改栈大小 查看栈大小(线程均分进程栈空间)
可以从堆中手动申请, 然后设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 pthread_attr_t attr;pthread_attr_init(&attr); pthread_attr_setstacksize(&attr, 1024 );
(堆空间相对大一些, 再把stack size设置小一点, 那么每个进程可以创建的线程数量就会变多)
设置警戒区 每个线程栈的末尾有一段警戒区, 用来防止线程和线程之间干扰或者栈溢出.
用的不是太多, 略.
其他属性 察看当前 pthread 库版本 getconf GNU_LIBPTHREAD_VERSION
.
(NPTL 实现机制(POSIX), Native POSIX Thread Library)
线程控制总结
主线程退出其他线程不退出, 主线程应调用 pthread_exit
避免僵尸线程 pthread_join
, pthread_detach
, pthread_create
指定分离属性
被 join 线程可能在 join 函数返回前就释放完自己的所有内存资源, 所以不应当返回被回收线程栈中的值(即临时变量);
malloc 和 mmap 申请的内存可以被其他线程释放
应避免在多线程模型中调用 fork ; 进程中只有调用 fork 的线程存在(该线程独占进程了), 其他线程在子进程中均 pthread_exit 自动退出.
信号的复杂语义很难和多线程共存,应避免在多线程引入信号机制
线程的同步(重点) (该部分内容非常多) 互斥是同步的工具 线程互斥和线程同步还有一定的距离
同步条件 访问某一个共享资源有先后的次序, 具体说, 某一个线程访问某一个资源, 得到结果之前, 该调用不返回, 并且其他线程不可访问. (多个线程需要访问某一个共享资源时, 需要同步处理)
同步的手段(并发编程): 加锁(多线程); 任务队列(actor)
(锁是建议锁, 不具有强制性; 意思说, 你访问共享资源的时候, 线程不写拿锁逻辑, 直接访问也是可以的, 但这种方式就没有经过同步处理, 是不推荐的)
一把锁, 对应一个需要操作的共享数据.
互斥量 互斥锁, 也是一把建议锁&协同锁, 不具有强制性; 涉及到的相关函数如下: (apue上面称之为 协同锁
)
1 2 3 4 5 6 pthread_mutex_init() pthread_mutex_destroy() pthread_mutex_lock() pthread_trylock() pthread_mutex_unlock()
成功返回0, 失败返回错误码;
pthread_mutex_t
是结构体, 使用时忽略细节(不需要关注该结构体的细节)可以简单认为是整型, 有两种取值0,1; 相当于一个把锁.pthread_mutexattr_t
是指互斥锁的属性.
下面给予一个简单的实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 #include <stdio.h> #include <string.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> pthread_mutex_t mutex;void *func (void *arg) { srand(time(NULL )); while (1 ) { pthread_mutex_lock(&mutex); printf ("small" ); sleep(rand()%2 ); printf ("\n" ); pthread_mutex_unlock(&mutex); sleep(rand()%2 ); } return NULL ; } int main (void ) { pthread_t tid; int ret; srand(time(NULL )); if (pthread_mutex_init(&mutex, NULL )) { fprintf (stderr , "mutex init error" ); exit (-1 ); } ret = pthread_create(&tid, NULL , func, NULL ); if (ret) { fprintf (stderr , "pthread_create error:%s\n" , strerror(ret)); exit (-1 ); } while (1 ) { pthread_mutex_lock(&mutex); printf ("BIG" ); sleep(rand()%2 ); printf ("\n" ); pthread_mutex_unlock(&mutex); sleep(rand()%2 ); } pthread_mutex_destroy(&mutex); return 0 ; }
(注意保证另外一个线程也能抢到cpu, 所以保证锁的粒度越小越好, 共享的标准输出一旦使用完毕, 立即解锁)
读写锁 与互斥量类似, 但是读写锁运行 更高的并发性
; 写独占, 读共享; 共享独占锁, 适合读的情况比写的情况的场景. 拥有写锁的线程, 其他线程全部阻塞等待, 直到完成; 而读锁加锁成功时, 并且同时需要加锁的没有写锁(即之后的全是读锁), 那么其他线程仍可以加读锁, 但是一旦有写锁, 则大家一起等着(即读锁也不能加锁成功); 并且读写锁一起抢的时候, 写锁的优先级高
. 一把rwlock
锁有3个状态:
读模式下加锁(读锁)
写模式下加锁(写锁)
不加锁状态
主要函数总结:
1 2 3 4 5 6 7 8 9 10 pthread_rwlock_init() pthread_rwlock_destroy() pthread_rwlock_rdlock() pthread_rwlock_wrlock() pthread_rwlock_tryrdlock() pthread_rwlock_trywrlock() pthread_rwlock_unlock()
成功返回0, 失败返回错误码; 关于解锁, 特别的情况是, 如果多个读锁锁定某个资源, 那么直到最后一个读锁解锁该资源才算真正解锁了.
pthread_rwlock_t
定义了读写锁的类型; pthread_rwlockattr_t
读写锁的属性.
写一个demo, 演示读写锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 #include <stdio.h> #include <unistd.h> #include <pthread.h> int counter = 0 ; pthread_rwlock_t rwlock;void *thread_write (void *arg) { int t; int i = (int )arg; while (1 ) { t = counter; usleep(1000 ); pthread_rwlock_wrlock(&rwlock); printf ("---write %d: %lu: counter=%d ++counter=%d\n" , i, pthread_self(), t, ++counter); pthread_rwlock_unlock(&rwlock); usleep(2000 ); } return NULL ; } void *thread_read (void *arg) { int i = (int )arg; while (1 ) { pthread_rwlock_rdlock(&rwlock); printf ("--------------------read %d: %lu: %d\n" , i, pthread_self(), counter); pthread_rwlock_unlock(&rwlock); usleep(900 ); } return NULL ; } int main (void ) { int i; pthread_t tid[8 ]; pthread_rwlock_init(&rwlock, NULL ); for (int i = 0 ; i<3 ; i++) { pthread_create(&tid[i], NULL , thread_write, (void *)i); } for (int i =0 ; i<5 ; i++) { pthread_create(&tid[i+3 ], NULL , thread_read, (void *)i); } for (i = 0 ; i<8 ; i++) { pthread_join(tid[i], NULL ); } pthread_rwlock_destroy(&rwlock); return 0 ; }
上面例子只是演示了最基本的用法. 总之对于加锁效率, 如果存在大量的读情况, 读写锁是非常不错的选择.
条件变量 (sudo apt-get install manpages-posix-dev
) 条件变量本身 不是锁
, 一般用于谓词判断(判断一下条件), 可造成线程阻塞.
通常要和互斥锁mutex配合在一起, 完成同步操作;
(满足某个条件才阻塞或者得到cpu)
主要函数有:
1 2 3 4 5 6 7 pthread_cond_init() pthread_cond_destroy() pthread_cond_wait() pthread_cond_timedwait() pthread_cond_signal() pthread_cond_broadcast()
其中 pthread_cond_wait()
:
1 2 int pthread_cond_wait (pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex) ;
该函数调用时, 阻塞等待条件变量, 并解锁互斥量(相当于unlock), 且是原子操作(放开锁让别的线程操作)
阻塞等待一个条件变量(条件变量不满足则一直等待)—或者等其他线程唤醒本线程
该函数返回时(其他线程唤醒该线程时), 重新申请获取互斥锁pthread_mutex_lock(&mutex);//或者trylock; 本次拿不到就等下次, 不一定能拿到, 但是一旦拿到锁则阻塞解除, 可以开始操作数据了
(什么时候条件变量满足? 别的线程调用signal, broadcast的时候自然会传入cond, 此时即表明条件变量已经满足, 有点儿像java的notify和notifyAll)
具体的例子, 可以参考一下下面的 生产者消费者模型
(生产队列中有没有产品, 就依靠条件变量做判断; 加锁依靠mutex) (也就是大家伙都cond_wait着, 就依靠别人signal或者broadcast通知, 即当条件变量满足时通知等待的众人, 然后大家抢锁成功的操作共享资源, 没有成功的继续循环调用wait阻塞等待下一次通知)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;struct msg { int num; struct msg *next ; }; struct msg *head = NULL ;struct msg *tmp = NULL ;void *productor (void *arg) { for (;;) { tmp = (struct msg*) malloc (sizeof (struct msg)); tmp->num = rand() % 100 + 1 ; printf ("---producer has produced %d \n" , tmp->num); pthread_mutex_lock(&mutex); tmp->next = head; head = tmp; pthread_cond_signal(&has_product); pthread_mutex_unlock(&mutex); usleep(rand()%1000 ); } return NULL ; } void *consumer (void *arg) { for (;;) { pthread_mutex_lock(&mutex); while (head==NULL ) { pthread_cond_wait(&has_product, &mutex); } tmp = head; head = head->next; pthread_mutex_unlock(&mutex); printf ("consumer has consumed %d \n" , tmp->num); free (tmp); tmp = NULL ; usleep(rand()%1000 ); } return NULL ; } int main (void ) { pthread_t ptid, ctid; pthread_create(&ptid, NULL , productor, NULL ); pthread_create(&ctid, NULL , consumer, NULL ); pthread_join(ptid, NULL ); pthread_join(ctid, NULL ); return 0 ; }
至于 pthread_cond_timewait
注意一下时间的使用: (指定阻塞时常, struct timespec具体定义可以参考 `man sem_timedwait)
1 2 3 4 5 6 7 8 9 10 time_t cur = time(NULL ); struct timespec t = {0 , 0 }; t.tv_sec = cur + 10 ; pthread_cond_timewait(&cond, &mutex, &t);
该部分可以参考 APUE 11.6 章节.
总结: 条件变量在达到同步阻塞的目的是, 更多的减少了不必要的竞争(例如多个抢锁的进程, 原来是直接抢锁, 现在是等到条件满足的时候才去抢锁).
信号量 信号量和信号完全两码事儿. (semaphore可以用于线程, 进程间同步; mutex的增强版本)
主要函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 #include <semaphore.h> int sem_init (sem_t *sem, int pshared, unsigned int value) ; sem_destroy() sem_wait() sem_trywait() sem_timedwait() sem_post()
具体可以查看man手册 man sem_init
, 上述的加减操作对开发者隐藏; 并且可以看出, 上述初值value决定了同时占用的共享资源的个数.
具体案例, 见下面 消费者生产者模型
(不同于条件变量的实现, 这个思路比较简单, 实现多生产者, 多消费者) (定义两个信号量, 其中一个表示实际生产出来的产品数量, 另外一个信号量表示总共的产品 (线性)队列
空余容量)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define NUM 5 int queue [NUM]={0 }; sem_t blank_num, product_num; void *produce (void *arg) { int i = 0 ; while (1 ) { sem_wait(&blank_num); queue [i] = rand() % 100 + 1 ; printf ("----produced %d\n" , queue [i]); sem_post(&product_num); i = (i+1 ) % NUM; sleep(rand()%1 ); } } void *consume (void *arg) { int i = 0 ; while (1 ) { sem_wait(&product_num); printf ("consume %d\n" , queue [i]); queue [i] = 0 ; sem_post(&blank_num); i = (i+1 ) % NUM; sleep(rand()%3 ); } } int main (void ) { pthread_t pid, cid; sem_init(&blank_num, 0 , NUM); sem_init(&product_num, 0 , 0 ); pthread_create(&pid, NULL , produce, NULL ); pthread_create(&cid, NULL , consume, NULL ); pthread_join(pid, NULL ); pthread_join(cid, NULL ); sem_destroy(&blank_num); sem_destroy(&product_num); return 0 ; }
死锁 使用互斥量或者其他锁机制中出现的一种现象. 一般我遇到的情况:
线程试图对互斥量A重复加锁
线程1拥有A锁, 线程2拥有B锁; 两者互相请求对方的锁.
解决方案, 一般是找到问题, 对症下药.
加锁之前, 一定先检查一下之前是否加过同样的锁(及时解锁)
(得不到资源的时候, 把已经占有的先释放)已经有一把锁, 再加锁另一把的时候可以使用try_lock逻辑(try_lock失败,就主动放弃已有的锁,让对方线程先运行)
一个简单的demo, 看看下面这个死锁现象(注意执行步骤):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <string.h> pthread_mutex_t mutex_a;pthread_mutex_t mutex_b;void *func (void *) { pthread_mutex_lock(&mutex_a); printf ("sub thread, i have a lock, quest for b lock; once i get b lock, i will unlock a\n" ); pthread_mutex_lock(&mutex_b); pthread_mutex_unlock(&mutex_a); pthread_mutex_unlock(&mutex_b); return NULL ; } int main (void ) { pthread_t tid; pthread_mutex_init(&mutex_a, NULL ); pthread_mutex_init(&mutex_b, NULL ); pthread_create(&tid, NULL , func, NULL ); pthread_mutex_lock(&mutex_b); printf ("main thread, i have b lock, quest for a lock; once i get a lock, i will unlock b\n" ); sleep(1 ); pthread_mutex_lock(&mutex_a); pthread_mutex_unlock(&mutex_b); pthread_mutex_unlock(&mutex_a); pthread_mutex_destroy(&mutex_a); pthread_mutex_destroy(&mutex_b); }
这种类型解法上面已经说了, “牺牲自己, 成全别人”: 使用trylock逻辑, 如果我拿不到别人的锁, 我先放弃自己已经占有的锁, 将来再去占锁.
总结起来也就是这样:
1 2 3 4 5 6 7 8 9 10 11 12 pthread_mutex_lock(&mutex_1); while ( pthread_mutex_trylock(&mutex_2) ) { pthread_mutex_unlock(&mutex_1); ... ... pthread_mutex_lock(&mutex_1); } count++; pthread_mutex_unlock(&mutex_2); pthread_mutex_unlock(&mutex_1);
常见模型
这里使用的是, 采用互斥量+条件变量一起使用的案例, 具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 #include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;pthread_cond_t has_product = PTHREAD_COND_INITIALIZER;struct msg { int num; struct msg *next ; }; struct msg *head = NULL ;struct msg *tmp = NULL ;void *productor (void *arg) { for (;;) { tmp = (struct msg*) malloc (sizeof (struct msg)); tmp->num = rand() % 100 + 1 ; printf ("---producer has produced %d \n" , tmp->num); pthread_mutex_lock(&mutex); tmp->next = head; head = tmp; pthread_cond_signal(&has_product); pthread_mutex_unlock(&mutex); usleep(rand()%1000 ); } return NULL ; } void *consumer (void *arg) { for (;;) { pthread_mutex_lock(&mutex); while (head==NULL ) { pthread_cond_wait(&has_product, &mutex); } tmp = head; head = head->next; pthread_mutex_unlock(&mutex); printf ("consumer has consumed %d \n" , tmp->num); free (tmp); tmp = NULL ; usleep(rand()%1000 ); } return NULL ; } int main (void ) { pthread_t ptid, ctid; pthread_create(&ptid, NULL , productor, NULL ); pthread_create(&ctid, NULL , consumer, NULL ); pthread_join(ptid, NULL ); pthread_join(ctid, NULL ); return 0 ; }
上面用一个简单的链表, 条件变量绑定mutex就实现了一个 “产品空” 的 阻塞通知模型
.
上面的代码只是把意思表现出来了, 但是没有涵盖 “产品满” 的情况, 下面给出一个相对规范的代码写法: (用具体的数组模拟环形队列, 从而控制产品数量)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define SIZE 3 typedef struct cond_struct { pthread_cond_t notfull; pthread_cond_t notempty; pthread_mutex_t lock; int buffer[SIZE]; int readpos; int writepos; }cond_t ; void init (cond_t *) ; void put (cond_t *, int ) ; int get (cond_t *) ; void *produce (void *) ;void *consume (void *) ;cond_t queue ; int main (void ) { pthread_t producer, consumer; init(&queue ); pthread_create(&producer, NULL , produce, NULL ); pthread_create(&consumer, NULL , consume, NULL ); pthread_join(producer, NULL ); pthread_join(consumer, NULL ); return 0 ; } void init (cond_t *arg) { pthread_mutex_init(&(arg->lock), NULL ); pthread_cond_init(&(arg->notempty), NULL ); pthread_cond_init(&(arg->notfull), NULL ); arg->readpos = 0 ; arg->writepos = 0 ; } void put (cond_t *queue , int data) { pthread_mutex_lock(&(queue ->lock)); while ( (queue ->writepos+1 )%SIZE == queue ->readpos ) { printf ("producer watting for the queue not full\n" ); pthread_cond_wait(&(queue ->notfull), &(queue ->lock)); } queue ->buffer[queue ->writepos] = data; queue ->writepos = (queue ->writepos + 1 ) % SIZE; printf ("producer produce data = %d\n" , data); pthread_cond_signal(&(queue ->notempty)); pthread_mutex_unlock(&(queue ->lock)); } int get (cond_t *queue ) { int data; pthread_mutex_lock(&(queue ->lock)); while ( queue ->readpos == queue ->writepos ) { printf ("consumer waitting for queue not empty.\n" ); pthread_cond_wait(&(queue ->notempty), &(queue ->lock)); } data = queue ->buffer[queue ->readpos]; queue ->readpos = (queue ->readpos + 1 ) % SIZE; printf ("---consumer consume data = %d\n" , data); pthread_cond_signal(&(queue ->notfull)); pthread_mutex_unlock(&(queue ->lock)); return data; } void *produce (void *arg) { srand(time(NULL )); while (1 ) { put(&queue , rand()%10 ); sleep(1 ); } return NULL ; } void *consume (void *arg) { while (1 ) { get(&queue ); sleep(2 ); } return NULL ; }
条件变量如果想在判断的同时, 控制产品的个数, 则比较困难(当然你多加变量控制链表或者数组当然没有问题), 最多也就是告知是否有产品, 队列是否已满. 同样的逻辑信号量实现起来更加简单, 并且还能控制产品数量.
(定义两个信号量, 其中一个表示实际生产出来的产品数量, 另外一个信号量表示总共的产品 (线性)队列
空余容量)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #define NUM 5 int queue [NUM]={0 }; sem_t blank_num, product_num; void *produce (void *arg) { int i = 0 ; while (1 ) { sem_wait(&blank_num); queue [i] = rand() % 100 + 1 ; printf ("----produced %d\n" , queue [i]); sem_post(&product_num); i = (i+1 ) % NUM; sleep(rand()%1 ); } } void *consume (void *arg) { int i = 0 ; while (1 ) { sem_wait(&product_num); printf ("consume %d\n" , queue [i]); queue [i] = 0 ; sem_post(&blank_num); i = (i+1 ) % NUM; sleep(rand()%3 ); } } int main (void ) { pthread_t pid, cid; sem_init(&blank_num, 0 , NUM); sem_init(&product_num, 0 , 0 ); pthread_create(&pid, NULL , produce, NULL ); pthread_create(&cid, NULL , consume, NULL ); pthread_join(pid, NULL ); pthread_join(cid, NULL ); sem_destroy(&blank_num); sem_destroy(&product_num); return 0 ; }
这里有个很经典的分析过程, 具体略, 主要思想是: 要么能拿到2把锁, 开始就餐; 要么拿不到释放已经占有的锁(或者就餐完毕释放已经占有的两把锁)
下面可以采用 mutex数组
或者 信号量
去实现(思想类似)5个哲学家就餐问题:
1 2 3 4 5 6 7 8 9 10 if (i == 4 ) { left = i; right = 0 ; } else { left = i; right = i+1 ; } left = i; right = (i+1 )%5 ;
之后可以进行加锁操作: 左手加锁成功则尝试加右锁, 如果右锁加锁成功则就餐, 完毕释放资源(先释放右锁); 如果失败, 则释放左锁, 阻塞等待(再从加左锁开始).
代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 pthread_mutex_t m[5 ]; pthread_t t[5 ]; void *run (void *arg) { int pthread_num = (int ) arg; int left; int right; if (pthread_num == 4 ) { left = pthread_num; right = 0 ; } else { left = pthread_num; right = pthread_num+1 ; } while (1 ) { pthread_mutex_lock(&m[left]); if (pthread_mutex_trylock(&m[right])) { printf ("thread[%d] lock right failed, relese left lock %d\n" , pthread_num, left); } else { printf ("thread[%d] start eating\n" , pthread_num); pthread_mutex_unlock(&m[right]); } pthread_mutex_unlock(&m[left]); usleep(500 ); } return NULL ; }
完整的代码可以是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define N 5 struct hands { int left; int right; }; pthread_t t[N]; pthread_mutex_t m[N]; struct hands h [N ];void * func (void *arg) { int pthread_num = (int ) arg; int left = h[pthread_num].left; int right = h[pthread_num].right; while (1 ) { pthread_mutex_lock(&m[left]); if (pthread_mutex_trylock(&m[right])) { printf ("thread[%d] lock right failed, relese left lock %d\n" , pthread_num, left); } else { printf ("thread[%d] start eating\n" , pthread_num); pthread_mutex_unlock(&m[right]); } pthread_mutex_unlock(&m[left]); usleep(20 ); } return NULL ; } int main (void ) { int i; for (i=0 ; i<N; i++) { if (pthread_mutex_init(&m[i], NULL ) ) { fprintf (stderr , "mutex[%d] init error\n" , i); exit (-1 ); } h[i].left = i; h[i].right = (i+1 )%N; if ( pthread_create(&t[i], NULL , func, (void *)i) ) { fprintf (stderr , "thread[%d] init error\n" , i); exit (-1 ); } } for (i=0 ; i<N; i++) { pthread_join(t[i], NULL ); } for (i=0 ; i<N; i++) { pthread_mutex_destroy(&m[i]); } return 0 ; }
关于锁的选择, 其实可以使用信号量semophore.
sem_t s[N]; //value 0或者1 然后使用sem_wait以及sem_trywait, sem_post进行主要逻辑, 其过程类似上面的mutex.
(注意, 此处用的是信号量 semaphore.h
不建议使用信号 sys/sem.h
, sys/shm.h
, 多线程和信号兼容不好, 多进程问题可以使用信号 )
总结: 让哲学家按一定顺序就餐, 得不到时即释放当前获得的锁.
另外一种高效的方式是: 引入一个对各个线程(进程)的调度者, 当哲学家饥饿时向该调度者申请用餐, 而调度者根据哲学家面前是否同时有两只筷子空闲来判断哲学家此时能否就餐, 能则占用这两只筷子并且让哲学家就餐, 不能则让哲学家继续等待. 引入一个调度者能很有效地管理进程资源的分配.
线程私有数据 很多人把它解释为 线程局部存储
(Thread Local Storage), 简称 TSL
, 不过Linux平台最好理解成 TSD
, 即 Thread Specific Data. 即使各个线程都有一样的名字的变量, 但是它们是不同的变量(有不同的存储地点), 各个线程独占; 你可以把它理解成, 线程局部变量
. (就像Java中ThreadLocal)
出现原因自然是, 多线程编程中, 全局变量被一个线程修改, 就会影响另外一个线程的使用; 所以最好有一个线程特有变量的机制, 或称线程局部变量, 或static memory local to a thread (线程局部静态变量).
线程局部存储在不同的平台有不同的实现, 可移植性不太好. 幸好要实现线程局部存储并不难. “最简单的办法就是建立一个全局表”, 通过当前线程ID去查询相应的数据, 因为各个线程的ID不同, 查到的数据自然也不同了. (如果当初由你涉及全局表, 估计今天看到的是另外一种使用方式; 当然你可以完全不必关心相关的实现)
并且Linux平台已经提供了相关的实现:
1 2 3 4 5 6 7 8 int pthread_key_create (pthread_key_t *key, void (*destructor)(void*)); //线程终止时才调用destructor清理存储 int pthread_key_delete (pthread_key_t key) ; void *pthread_getspecific (pthread_key_t key) ; int pthread_setspecific (pthread_key_t key, const void *value) ;
补充说明:
当线程被创建时, 会将所有的线程局部存储变量初始化为NULL
destructor() 被自动调用时机: 线程终止时且key关联的值不为NULL.
pthread_key_delete()并不检查当前是否有线程正在使用该线程局部数据变量,也不会调用清理函数destructor.
pthread_key_t的key数量使有限的, linux中可以通过 PTHREAD_KEY_MAX(定义于limits.h文件中)或者系统调用sysconf(_SC_THREAD_KEYS_MAX)来确定当前系统最多支持多少个键(默认1024), key太多, 请进行封装.
pthread_key_create对应的key的 const void *value
可以是普通类型强制类型转换过去的, 此时destructor可以传入NULL
pthread_setspecific()设置的value取出。在使用取出的值前最好是将void*转换成原始数据类型的指针, 再取值.
(注: linux平台的实现是, 两张全局表, 进程一张, 线程各一张; 全局表的某个key:pthread_keys[index] 对应 线程表tsd[index]; 所以即使大家在 进程领域
都叫一样的名字, 但是 线程表领域
值是不一样的)
我写个使用案例, 你就大概明白它的 二级表
实现了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> pthread_key_t key; void destructor (void *t) { int tmp = (int )t; printf ("destructor executed out of thread %lu, value:%d\n" , pthread_self(), tmp); } void *func1 (void *arg) { int i = 10 ; pthread_setspecific(key, (void *)i); printf ("thread %lu has set key to %d, address are %x\n" , pthread_self(), i, &i); sleep(2 ); return NULL ; } void *func2 (void *arg) { int i = 20 ; sleep(1 ); pthread_setspecific(key, (void *)i); printf ("thread %lu has set key to %d, address are %x\n" , pthread_self(), i, &i); return NULL ; } int main (void ) { pthread_t tid1, tid2; pthread_key_create(&key, destructor); pthread_create(&tid1, NULL , func1, NULL ); pthread_create(&tid2, NULL , func2, NULL ); pthread_join(tid1, NULL ); pthread_join(tid2, NULL ); pthread_key_delete(key); return 0 ; }
当然GCC扩展的TSD实现 __thread
也不错, 只是移植性不好.
在Linux中还有一种更为高效的线程局部存储方法,就是使用关键字__thread来定义变量。__thread是GCC内置的线程局部存储设施(Thread-Local Storage),它的实现非常高效,与pthread_key_t向比较更为快速,其存储性能可以与全局变量相媲美,而且使用方式也更为简单。创建线程局部变量只需简单的在全局或者静态变量的声明中加入__thread说明即可。凡是带有__thread的变量,每个线程都拥有该变量的一份拷贝,且互不干扰。线程局部存储中的变量将一直存在,直至线程终止,当线程终止时会自动释放这一存储。__thread并不是所有数据类型都可以使用的,因为其只支持POD(Plain old data structure) 类型,不支持class类型——其不能自动调用构造函数和析构函数。同时__thread可以用于修饰全局变量、函数内的静态变量,但是不能用于修饰函数的局部变量或者class的普通成员变量。另外,__thread变量的初始化只能用编译期常量.
1 2 3 4 static __thread char t_buf[32 ] = {'\0' };extern __thread int t_val = 0 ;
具体不展开, 有兴趣自己去查一下吧, 我还是建议使用 “不那么高效”的 pthread_key_t
线程的调试 详细信息可以参看 多线程-进程的调试 .
其他注意事项 线程的诸多好处, 貌似总是相对于进程的, 比如通信, 共享等都比进程方便; 但是线程也有不方便的:
gdb调试困难
线程对于 信号
的支持不好(有些函数在不同平台的意义未定义)
保证使用的pthread版本一致, getconf GNU_LIBPTHREAD_VERSION
detach后的进程,不要去join,会得到invalid argument的运行时报错(人家自己已经处理完毕资源了,在Join函数之前);
被cancel的线程, 也需要回收,但是得到的返回值是-1
malloc, mmap的堆空间会被其他线程共享(小心其他线程给你free掉)—-可重入中也说过,如果想要可重入,不要调用malloc和free
多线程模型中最好不要调用fork, 除非立即exec*; 此时创建出来的子进程中只有调用fork调用的这个线程存在,其他线程全部pthread_exit()了—最好不要用fork.
总结 本文几乎通篇都在介绍pthread API及其含义&使用, 从线程的控制, 线程的属性, 以及线程的同步(mutex函数,条件变量函数,信号量,同步屏障函数,读写锁), 线程私有变量, 最后其他的一些uitl函数. 当然也还有没有涉及到了如 pthread_once
.
但是, 已经花费了很长时间, 几乎把pthread全部总结了一下.
可以看到,玩下来,基本上操作&弄懂API没有多大难度,难的是理解这么设计的理由是什么?以及设计并发流程,换句话说,到目前为了所有的练习全部是为了掌握并发编程的工具,真正的核心可能在于并发的模式或者算法上,而相关内容本文并没有深入设计,而这方面你只能参考业界相关领域的大牛,甚至是写相关库的本人的论述。
参考资料
POSIX threads explained –IBM的技术贴, 关键看它开头部分
《Programming With POSIX Theads》 作者: David R.Butenhof (市面上已经绝版)—强烈推荐
《Unix Networking Programming Volume 1》 chapter 26–作者: W.Richard Stevens等
《APUE》
《linux高级程序设计》 作者:杨宗德等—人民邮电出版社
《pthread primer》 这个数绝版了, 结合linux内核讲解线程, 比较不错
[《多线程服务器的常用编程模型》] 陈硕大佬的网文
[《Linux/UNIX系统编程手册(上)》] 线程私有数据