共计 15126 个字符,预计需要花费 38 分钟才能阅读完成。
条件变量
之前咱们介绍了锁,然而锁并不是并发程序设计中所需的惟一原语。在很多状况下,线程须要查看某一条件(condition)满足之后,才会持续运行。例如,父线程须要查看子线程是否执行结束。这种期待如何实现呢?
注:并发程序有两大需要,一是互斥,二是期待。互斥是因为线程间存在共享数据,期待则是因为线程间存在依赖。
咱们能够尝试用一个共享变量,如图所示。这种解决方案个别能工作,然而效率低下,因为主线程会自旋查看,节约 CPU 工夫。咱们心愿有某种形式让父线程休眠,直到期待的条件满足(即子线程实现执行)。
1 volatile int done = 0; | |
2 | |
3 void *child(void *arg) {4 printf("child\n"); | |
5 done = 1; | |
6 return NULL; | |
7 } | |
8 | |
9 int main(int argc, char *argv[]) {10 printf("parent: begin\n"); | |
11 pthread_t c; | |
12 Pthread_create(&c, NULL, child, NULL); // create child | |
13 while (done == 0) | |
14 ; // spin | |
15 printf("parent: end\n"); | |
16 return 0; | |
17 } |
定义和应用
线程能够应用条件变量(condition variable),来期待一个条件变成真。条件变量是一个显式队列,当某些执行状态(即条件,condition)不满足时,线程能够把本人退出队列,期待该条件。当其余线程扭转了上述状态时,就能够通过在该条件上发送信号唤醒队列中的期待线程,让它们继续执行。
在 POSIX 库中,要申明一个条件变量,只有像这样写:pthread_cond_t c(留神:还须要适当的初始化)。条件变量有两种相干操作:wait()和 signal()。线程要睡眠的时候,调用 wait();当线程想唤醒期待在某个条件变量上的睡眠线程时,调用 signal()。上面是一个典型示例:
1 int done = 0; | |
2 pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER; | |
3 pthread_cond_t c = PTHREAD_COND_INITIALIZER; | |
4 | |
5 void thr_exit() {6 Pthread_mutex_lock(&m); | |
7 done = 1; | |
8 Pthread_cond_signal(&c); | |
9 Pthread_mutex_unlock(&m); | |
10 } | |
11 | |
12 void *child(void *arg) {13 printf("child\n"); | |
14 thr_exit(); | |
15 return NULL; | |
16 } | |
17 | |
18 void thr_join() {19 Pthread_mutex_lock(&m); | |
20 while (done == 0) | |
21 Pthread_cond_wait(&c, &m); | |
22 Pthread_mutex_unlock(&m); | |
23 } | |
24 | |
25 int main(int argc, char *argv[]) {26 printf("parent: begin\n"); | |
27 pthread_t p; | |
28 Pthread_create(&p, NULL, child, NULL); | |
29 thr_join(); | |
30 printf("parent: end\n"); | |
31 return 0; | |
32 } |
wait()调用除了条件变量外还有一个参数,它是一个互斥锁。它假设在 wait()调用时,这个互斥锁是已上锁状态。wait()的职责是原子地开释锁,并让调用线程休眠。当线程被唤醒时,它必须从新获取锁,再返回调用者。这样简单的步骤也是为了防止在线程陷入休眠时,产生一些竞态条件。
有两种状况须要思考。第一种状况是父线程创立出子线程,但本人持续运行,而后马上调用 thr_join()期待子线程。在这种状况下,它会先获取锁,查看子线程是否实现,而后调用 wait(),让本人休眠。子线程最终得以运行,打印出“child”,并调用 thr_exit()函数唤醒父线程,这段代码会在取得锁后设置状态变量 done,而后向父线程发信号唤醒它。最初,父线程会运行(从 wait()调用返回并持有锁),开释锁,打印出“parent:end”。
第二种状况是,子线程在创立后,立即运行,设置变量 done 为 1,调用 signal 函数唤醒其余线程(这里没有其余线程),而后完结。父线程运行后,调用 thr_join()时,发现 done 曾经是 1 了,就间接返回。
须要留神的是,在下面的代码中,状态变量 done 和互斥锁 c 都是必须的。如果咱们不应用状态变量,代码像上面这样,会呈现什么问题?
1 void thr_exit() {2 Pthread_mutex_lock(&m); | |
3 Pthread_cond_signal(&c); | |
4 Pthread_mutex_unlock(&m); | |
5 } | |
6 | |
7 void thr_join() {8 Pthread_mutex_lock(&m); | |
9 Pthread_cond_wait(&c, &m); | |
10 Pthread_mutex_unlock(&m); | |
11 } |
假如子线程立即运行,并且调用 thr_exit()。在这种状况下,子线程发送信号,但此时却没有在条件变量上睡眠期待的线程。父线程运行时,就会调用 wait 并卡在那里,没有其余线程会唤醒它。通过这个例子,你应该意识到变量 done 的重要性,它记录了线程感兴趣的值。睡眠、唤醒和锁都离不开它。
在上面的例子中,咱们假如线程在发信号和期待时都不加锁。又会产生什么问题?
1 void thr_exit() { | |
2 done = 1; | |
3 Pthread_cond_signal(&c); | |
4 } | |
5 | |
6 void thr_join() {7 if (done == 0) | |
8 Pthread_cond_wait(&c); | |
9 } |
这里的问题是一个奥妙的竞态条件。具体来说,如果父过程调用 thr_join(),查看完 done 的值为 0,而后试图睡眠。但在调用 wait 进入睡眠之前,父过程被中断。随后子线程批改变量 done 为 1,发出信号,此时同样没有期待线程。当父线程再次运行时,就会长眠不醒。
所以,咱们能够保持这样一条准则:在应用条件变量时,调用 signal 和 wait 时要持有锁。
生产者 / 消费者问题
假如有一个或多个生产者线程和一个或多个消费者线程。生产者把生成的数据项放入缓冲区,消费者从缓冲区取走数据项,以某种形式生产。很多理论的零碎中都会有这种场景。例如,在多线程的网络服务器中,一个生产者将 HTTP 申请放入工作队列,生产线程从队列中取走申请并解决。
因为有界缓冲区是共享资源,所以咱们必须通过同步机制来拜访它,免得产生竞态条件。为了更好地了解这个问题,咱们来尝试一些理论的代码。
首先须要一个共享缓冲区,让生产者放入数据,消费者取出数据。简略起见,咱们就拿一个整数来做缓冲区,两个外部函数将值放入缓冲区,从缓冲区取值。
1 int buffer; | |
2 int count = 0; // initially, empty | |
3 | |
4 void put(int value) {5 assert(count == 0); | |
6 count = 1; | |
7 buffer = value; | |
8 } | |
9 | |
10 int get() {11 assert(count == 1); | |
12 count = 0; | |
13 return buffer; | |
14 } |
put()函数会假如缓冲区是空的,把一个值存在缓冲区,而后把 count 设置为 1 示意缓冲区满了。get()函数刚好相同,把缓冲区清空后,并返回该值。
当初咱们须要编写一些函数,用于生产和生产数据。调用生产函数的咱们称之为生产者(producer)线程,调用生产函数的咱们称之为消费者(consumer)线程。上面展现了一对非线程平安的生产者和消费者的代码,生产者将一个整数放入共享缓冲区 loops 次,消费者继续从该共享缓冲区中获取数据,并打印出数据项。咱们的指标就是应用条件变量将其革新成线程平安的版本。
1 void *producer(void *arg) { | |
2 int i; | |
3 int loops = (int) arg; | |
4 for (i = 0; i < loops; i++) {5 put(i); | |
6 } | |
7 } | |
8 | |
9 void *consumer(void *arg) { | |
10 int i; | |
11 while (1) {12 int tmp = get(); | |
13 printf("%d\n", tmp); | |
14 } | |
15 } |
有问题的计划
显然,put()和 get()函数之中会有临界区,因为 put()更新缓冲区,get()读取缓冲区。咱们的首次尝试如下:
1 cond_t cond; | |
2 mutex_t mutex; | |
3 | |
4 void *producer(void *arg) { | |
5 int i; | |
6 for (i = 0; i < loops; i++) {7 Pthread_mutex_lock(&mutex); // p1 | |
8 if (count == 1) // p2 | |
9 Pthread_cond_wait(&cond, &mutex); // p3 | |
10 put(i); // p4 | |
11 Pthread_cond_signal(&cond); // p5 | |
12 Pthread_mutex_unlock(&mutex); // p6 | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 Pthread_mutex_lock(&mutex); // c1 | |
20 if (count == 0) // c2 | |
21 Pthread_cond_wait(&cond, &mutex); // c3 | |
22 int tmp = get(); // c4 | |
23 Pthread_cond_signal(&cond); // c5 | |
24 Pthread_mutex_unlock(&mutex); // c6 | |
25 printf("%d\n", tmp); | |
26 } | |
27 } |
当生产者想要填充缓冲区时,它期待缓冲区变空(p1~p3)。消费者具备完全相同的逻辑,但期待不同的条件——变满(c1~c3)。
当只有一个生产者和一个消费者时,上图的代码可能失常运行。但如果有超过一个线程,这个计划会有两个重大的问题。
先来看第一个问题,它与期待之前的 if 语句无关。假如有两个消费者(Tc1 和 Tc2),一个生产者(Tp)。首先,一个消费者(Tc1)先开始执行,它取得锁(c1),查看缓冲区是否能够生产(c2),而后期待(c3)。
接着生产者(Tp)运行。它获取锁(p1),查看缓冲区是否满(p2),发现没满就给缓冲区退出一个数字(p4)。而后生产者发出信号,说缓冲区已满(p5)。要害的是,这让第一个消费者(Tc1)不再睡在条件变量上,进入就绪队列。生产者继续执行,直到发现缓冲区满后睡眠(p6,p1-p3)。
这时问题产生了:另一个消费者(Tc2)领先执行,生产了缓冲区中的值。当初假如 Tc1 运行,在从 wait 返回之前,它获取了锁,而后返回。而后它调用了 get() (p4),但缓冲区已无奈生产。断言触发,代码不能像预期那样工作。
问题产生的起因很简略:在 Tc1 被生产者唤醒后,但在它运行之前,因为 Tc2 领先运行,缓冲区的状态扭转了。发信号给线程只是唤醒它们,暗示状态产生了变动,但并不会保障在它运行之前状态始终是冀望的状况。
仍有缺点的计划:应用 While 代替 If
修复这个问题很简略:把 if 语句改为 while。当消费者 Tc1 被唤醒后,立即再次查看共享变量(c2)。如果缓冲区此时为空,消费者就会回去持续睡眠(c3)。生产者中相应的 if 也改为 while(p2)。
1 cond_t cond; | |
2 mutex_t mutex; | |
3 | |
4 void *producer(void *arg) { | |
5 int i; | |
6 for (i = 0; i < loops; i++) {7 Pthread_mutex_lock(&mutex); // p1 | |
8 while (count == 1) // p2 | |
9 Pthread_cond_wait(&cond, &mutex); // p3 | |
10 put(i); // p4 | |
11 Pthread_cond_signal(&cond); // p5 | |
12 Pthread_mutex_unlock(&mutex); // p6 | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 Pthread_mutex_lock(&mutex); // c1 | |
20 while (count == 0) // c2 | |
21 Pthread_cond_wait(&cond, &mutex); // c3 | |
22 int tmp = get(); // c4 | |
23 Pthread_cond_signal(&cond); // c5 | |
24 Pthread_mutex_unlock(&mutex); // c6 | |
25 printf("%d\n", tmp); | |
26 } | |
27 } |
咱们要记住一条对于条件变量的简略规定:总是应用 while 循环。
然而,这段代码依然有一个问题,也是上文提到的两个问题之一,它和咱们只用了一个条件变量无关。
假如两个消费者(Tc1 和 Tc2)先运行,都睡眠了(c3)。生产者开始运行,在缓冲区放入一个值,唤醒了一个消费者(假设是 Tc1),并开始睡眠。当初是一个消费者马上要运行(Tc1),两个线程(Tc2 和 Tp)都期待在同一个条件变量上。
消费者 Tc1 醒过来并从 wait()调用返回(c3),从新查看条件(c2),发现缓冲区是满的,生产了这个值(c4)。这个消费者而后在该条件上发信号(c5),唤醒一个在睡眠的线程。然而,应该唤醒哪个线程呢?
因为消费者曾经清空了缓冲区,很显然,应该唤醒生产者。然而,如果它唤醒了 Tc2,问题就呈现了。消费者 Tc2 会醒过来,发现队列为空(c2),又持续回去睡眠(c3)。生产者 Tp 方才在缓冲区中放了一个值,当初在睡眠。消费者 Tc1 继续执行后也回去睡眠了。3 个线程都在睡眠,显然是一个大问题。
咱们能够看出:信号显然须要,但必须更有指向性。消费者不应该唤醒消费者,而应该只唤醒生产者,反之亦然。
单值缓冲区的正确计划
这个问题的解决方案也很简略:应用两个而不是一个条件变量,以便在零碎状态扭转时,能正确地发出信号唤醒哪类线程。上面展现了最终的代码。
1 cond_t empty, fill; | |
2 mutex_t mutex; | |
3 | |
4 void *producer(void *arg) { | |
5 int i; | |
6 for (i = 0; i < loops; i++) {7 Pthread_mutex_lock(&mutex); | |
8 while (count == 1) | |
9 Pthread_cond_wait(&empty, &mutex); | |
10 put(i); | |
11 Pthread_cond_signal(&fill); | |
12 Pthread_mutex_unlock(&mutex); | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 Pthread_mutex_lock(&mutex); | |
20 while (count == 0) | |
21 Pthread_cond_wait(&fill, &mutex); | |
22 int tmp = get(); | |
23 Pthread_cond_signal(&empty); | |
24 Pthread_mutex_unlock(&mutex); | |
25 printf("%d\n", tmp); | |
26 } | |
27 } |
最终计划
咱们当初有了可用的生产者 / 消费者计划,但不太通用,咱们最初所做的批改是为了进步并发和效率。具体来说就是 减少更多缓冲区槽位,这样在睡眠之前,生产者能够生产多个值;同样,消费者在睡眠之前能够生产多个值。
单个生产者和消费者时,这种计划因为上下文切换少,进步了效率。多个生产者和消费者时,它能够反对并发生产和生产。和现有计划相比,改变也很小。
第一处批改是缓冲区构造自身,以及对应的 put()和 get()办法:
1 int buffer[MAX]; | |
2 int fill = 0; | |
3 int use = 0; | |
4 int count = 0; | |
5 | |
6 void put(int value) {7 buffer[fill] = value; | |
8 fill = (fill + 1) % MAX; | |
9 count++; | |
10 } | |
11 | |
12 int get() {13 int tmp = buffer[use]; | |
14 use = (use + 1) % MAX; | |
15 count--; | |
16 return tmp; | |
17 } |
上面展现了最终的代码逻辑。至此,咱们解决了生产者 / 消费者问题。
1 cond_t empty, fill; | |
2 mutex_t mutex; | |
3 | |
4 void *producer(void *arg) { | |
5 int i; | |
6 for (i = 0; i < loops; i++) {7 Pthread_mutex_lock(&mutex); // p1 | |
8 while (count == MAX) // p2 | |
9 Pthread_cond_wait(&empty, &mutex); // p3 | |
10 put(i); // p4 | |
11 Pthread_cond_signal(&fill); // p5 | |
12 Pthread_mutex_unlock(&mutex); // p6 | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 Pthread_mutex_lock(&mutex); // c1 | |
20 while (count == 0) // c2 | |
21 Pthread_cond_wait(&fill, &mutex); // c3 | |
22 int tmp = get(); // c4 | |
23 Pthread_cond_signal(&empty); // c5 | |
24 Pthread_mutex_unlock(&mutex); // c6 | |
25 printf("%d\n", tmp); | |
26 } | |
27 } |
笼罩条件
当初再来看条件变量的一个例子。这段代码是一个简略的多线程内存调配库中的问题片段:
1 // how many bytes of the heap are free? | |
2 int bytesLeft = MAX_HEAP_SIZE; | |
3 | |
4 // need lock and condition too | |
5 cond_t c; | |
6 mutex_t m; | |
7 | |
8 void *allocate(int size) {9 Pthread_mutex_lock(&m); | |
10 while (bytesLeft < size) | |
11 Pthread_cond_wait(&c, &m); | |
12 void *ptr = ...; // get mem from heap | |
13 bytesLeft -= size; | |
14 Pthread_mutex_unlock(&m); | |
15 return ptr; | |
16 } | |
17 | |
18 void free(void *ptr, int size) {19 Pthread_mutex_lock(&m); | |
20 bytesLeft += size; | |
21 Pthread_cond_signal(&c); // whom to signal?? | |
22 Pthread_mutex_unlock(&m); | |
23 } |
从代码中能够看出,当线程调用进入内存调配代码时,它可能会因为内存不足而期待。相应的,线程开释内存时,会发信号说有更多内存闲暇。然而,代码中有一个问题:应该唤醒哪个期待线程(可能有多个线程)?
解决方案也很间接:用 pthread_cond_broadcast()代替上述代码中的 pthread_cond_signal(),唤醒所有的期待线程。这样做,确保了所有应该唤醒的线程都被唤醒。当然,不利的一面是可能会影响性能,因为不必要地唤醒了其余许多不该被唤醒的线程。这些线程被唤醒后,从新查看条件,马上再次睡眠。
这种条件变量叫作笼罩条件(covering condition),因为它能笼罩所有须要唤醒线程的场景(激进策略)。一般来说,如果你发现程序只有改成播送信号时能力工作,可能是程序有缺点。但在某些情景下,就像上述内存调配的例子中,播送可能是最间接无效的计划。
信号量
信号量是 Dijkstra 及其共事创造的,作为与同步无关的所有工作的惟一原语,能够应用信号量作为锁和条件变量。
定义
信号量是有一个整数值的对象,能够用两个函数来操作它。在 POSIX 规范中,是 sem_wait()和 sem_post()。因为信号量的初始值可能决定其行为,所以首先要初始化信号量,能力调用其余函数与之交互。
#include <semaphore.h> | |
sem_t s; | |
sem_init(&s, 0, 1); |
其中申明了一个信号量 s,通过第三个参数,将它的值初始化为 1。sem_init()的第二个参数,在咱们的所有例子中都被设置为 0,示意信号量是在同一过程的多个线程共享的。信号量初始化之后,咱们能够调用 sem_wait()或 sem_post()与之交互。
sem_wait()对信号量的值进行原子减一操作,当信号量的值大于等于 1 时立即返回,否则会将调用线程放入信号量关联的队列中期待被唤醒。sem_post()对信号量的值进行原子加一操作,它不必期待某些条件满足,间接减少信号量的值,如果有期待线程,就唤醒其中一个。当信号量的值为正数时,这个值就是期待线程的个数。
二值信号量(锁)
信号量的第一种用法是咱们曾经相熟的:用信号量作为锁。在上面的代码片段里,咱们间接把临界区用一对 sem_wait()/sem_post()盘绕。为了使这段代码失常工作,信号量 m 的初始值 X 是至关重要的。X 应该是多少呢?
sem_t m; | |
sem_init(&m, 0, X); // initialize semaphore to X; what should X be? | |
sem_wait(&m); | |
// critical section here | |
sem_post(&m); |
回顾 sem_wait()和 sem_post()函数的定义,咱们发现初值应该是 1。
咱们假如有两个线程的场景。第一个线程(线程 1)调用了 sem_wait(),它把信号量的值减为 0。因为值是 0,线程 1 从函数返回并进入临界区。如果没有其余线程尝试获取锁,当它调用 sem_post()时,会将信号量重置为 1(因为没有期待线程,不会唤醒其余线程)。
如果线程 1 持有锁,另一个线程(线程 2)调用 sem_wait()尝试进入临界区。这种状况下,线程 2 把信号量减为−1,而后期待。线程 1 再次运行,它最终调用 sem_post(),将信号量的值减少到 0,唤醒期待的线程,而后线程 2 就能够获取锁。线程 2 执行完结时,再次减少信号量的值,将它复原为 1。
因为锁只有两个状态(持有和没持有),所以这种用法有时也叫作二值信号量(binary semaphore)。
信号量用作条件变量
上面是一个简略的例子。假如一个线程创立另一个线程,并且期待它完结,那么信号量的初始值 X 应该是多少?
1 sem_t s; | |
2 | |
3 void * | |
4 child(void *arg) {5 printf("child\n"); | |
6 sem_post(&s); // signal here: child is done | |
7 return NULL; | |
8 } | |
9 | |
10 int | |
11 main(int argc, char *argv[]) {12 sem_init(&s, 0, X); // what should X be? | |
13 printf("parent: begin\n"); | |
14 pthread_t c; | |
15 Pthread_create(c, NULL, child, NULL); | |
16 sem_wait(&s); // wait here for child | |
17 printf("parent: end\n"); | |
18 return 0; | |
19 } |
有两种状况须要思考。第一种,父线程创立了子线程,然而子线程并没有运行。这种状况下,父线程调用 sem_wait()会先于子线程调用 sem_post()。咱们心愿父线程期待子线程运行,惟一的方法是让信号量的值不大于 0。因而,初值值为 0。父线程运行,将信号量减为−1,而后睡眠期待;子线程运行的时候,调用 sem_post(),信号量减少为 0,唤醒父线程,父线程而后从 sem_wait()返回,实现该程序。
第二种状况是子线程在父线程调用 sem_wait()之前就运行完结。在这种状况下,子线程会先调用 sem_post(),将信号量从 0 减少到 1。而后当父线程有机会运行时,会调用 sem_wait(),发现信号量的值为 1。于是父线程将信号量从 1 减为 0,没有期待,间接从 sem_wait()返回,也达到了预期成果。
生产者 / 消费者问题
在这里,咱们探讨如何应用信号量来解决下面提到的生产者 / 消费者,也即有界缓冲区问题。封装的 put()和 get()函数如下:
1 int buffer[MAX]; | |
2 int fill = 0; | |
3 int use = 0; | |
4 | |
5 void put(int value) {6 buffer[fill] = value; // line f1 | |
7 fill = (fill + 1) % MAX; // line f2 | |
8 } | |
9 | |
10 int get() {11 int tmp = buffer[use]; // line g1 | |
12 use = (use + 1) % MAX; // line g2 | |
13 return tmp; | |
14 } |
第一次尝试
咱们用两个信号量 empty 和 full 别离示意缓冲区空或者满,上面是咱们尝试解决生产者 / 消费者问题的代码。
1 sem_t empty; | |
2 sem_t full; | |
3 | |
4 void *producer(void *arg) { | |
5 int i; | |
6 for (i = 0; i < loops; i++) {7 sem_wait(&empty); // line P1 | |
8 put(i); // line P2 | |
9 sem_post(&full); // line P3 | |
10 } | |
11 } | |
12 | |
13 void *consumer(void *arg) { | |
14 int i, tmp = 0; | |
15 while (tmp != -1) {16 sem_wait(&full); // line C1 | |
17 tmp = get(); // line C2 | |
18 sem_post(&empty); // line C3 | |
19 printf("%d\n", tmp); | |
20 } | |
21 } | |
22 | |
23 int main(int argc, char *argv[]) { | |
24 // ... | |
25 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... | |
26 sem_init(&full, 0, 0); // ... and 0 are full | |
27 // ... | |
28 } |
咱们先假如 MAX=1,验证程序是否无效。假如有两个线程,一个生产者和一个消费者。咱们来看在一个 CPU 上的具体场景。消费者先运行,执行到 C1 行,调用 sem_wait(&full)。因为 full 初始值为 0,wait 调用会将 full 减为−1,导致消费者睡眠,期待另一个线程调用 sem_post(&full),合乎预期。
假如生产者而后运行。执行到 P1 行,调用 sem_wait(&empty)。生产者将继续执行,因为 empty 被初始化为 MAX(在这里是 1)。因而,empty 被减为 0,生产者向缓冲区中退出数据,而后执行 P3 行,调用 sem_post(&full),把 full 从−1 变成 0,唤醒消费者。
在这种状况下,可能会有两种状况。如果生产者继续执行,再次循环到 P1 行,因为 empty 值为 0,它会阻塞。如果生产者被中断,而消费者开始执行,调用 sem_wait(&full),发现缓冲区的确满了,生产它。这两种状况都是合乎预期的。
能够持续推导,在 MAX= 1 时,即使有多个生产者和消费者的状况下,本示例代码依然失常运行。
咱们当初假如 MAX 大于 1,同时假设有多个生产者,多个消费者。那么就有问题了:竞态条件。假如两个生产者(Pa 和 Pb)简直同时调用 put()。当 Pa 先运行,在 f1 行先退出第一条数据(fill=0),假如 Pa 在将 fill 计数器更新为 1 之前被中断,Pb 开始运行,也在 f1 行给缓冲区的 0 地位退出一条数据,这意味着那里的数据被笼罩,这也就意味着生产者的数据失落。
减少互斥
能够看到,向缓冲区退出元素和减少缓冲区的索引是临界区,须要小心爱护起来。所以,咱们应用二值信号量作为锁来进行互斥。上面是对应的代码。
1 sem_t empty; | |
2 sem_t full; | |
3 sem_t mutex; | |
4 | |
5 void *producer(void *arg) { | |
6 int i; | |
7 for (i = 0; i < loops; i++) {8 sem_wait(&mutex); // line p0 (NEW LINE) | |
9 sem_wait(&empty); // line p1 | |
10 put(i); // line p2 | |
11 sem_post(&full); // line p3 | |
12 sem_post(&mutex); // line p4 (NEW LINE) | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 sem_wait(&mutex); // line c0 (NEW LINE) | |
20 sem_wait(&full); // line c1 | |
21 int tmp = get(); // line c2 | |
22 sem_post(&empty); // line c3 | |
23 sem_post(&mutex); // line c4 (NEW LINE) | |
24 printf("%d\n", tmp); | |
25 } | |
26 } | |
27 | |
28 int main(int argc, char *argv[]) { | |
29 // ... | |
30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... | |
31 sem_init(&full, 0, 0); // ... and 0 are full | |
32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock (NEW LINE) | |
33 // ... | |
34 } |
当初咱们给整个 put()/get()局部都减少了锁,就是正文中有 NEW LINE 的几行。这仿佛是正确的思路,但依然有问题——死锁。
假如有两个线程,一个生产者和一个消费者。消费者首先运行,取得锁,而后对 full 信号量执行 sem_wait()。因为还没有数据,所以消费者阻塞,让出 CPU。然而,问题来了,此时消费者依然持有锁。而后生产者运行,它首先对二值互斥信号量调用 sem_wait()。锁曾经被消费者持有,因而生产者也被卡住。
这里呈现了一个循环期待。消费者持有互斥量,期待在 full 信号量上。生产者能够发送 full 信号,却在期待互斥量。因而,生产者和消费者相互期待对方——典型的死锁。
最终计划
要解决这个问题,只需缩小锁的作用域,上面是最终的可行计划。能够看到,咱们把获取和开释互斥量的操作调整为紧挨着临界区,把 full、empty 的唤醒和期待操作调整到锁里面。就失去了简略而无效的有界缓冲区,多线程程序的罕用模式。
1 sem_t empty; | |
2 sem_t full; | |
3 sem_t mutex; | |
4 | |
5 void *producer(void *arg) { | |
6 int i; | |
7 for (i = 0; i < loops; i++) {8 sem_wait(&empty); // line p1 | |
9 sem_wait(&mutex); // line p1.5 (MOVED MUTEX HERE...) | |
10 put(i); // line p2 | |
11 sem_post(&mutex); // line p2.5 (... AND HERE) | |
12 sem_post(&full); // line p3 | |
13 } | |
14 } | |
15 | |
16 void *consumer(void *arg) { | |
17 int i; | |
18 for (i = 0; i < loops; i++) {19 sem_wait(&full); // line c1 | |
20 sem_wait(&mutex); // line c1.5 (MOVED MUTEX HERE...) | |
21 int tmp = get(); // line c2 | |
22 sem_post(&mutex); // line c2.5 (... AND HERE) | |
23 sem_post(&empty); // line c3 | |
24 printf("%d\n", tmp); | |
25 } | |
26 } | |
27 | |
28 int main(int argc, char *argv[]) { | |
29 // ... | |
30 sem_init(&empty, 0, MAX); // MAX buffers are empty to begin with... | |
31 sem_init(&full, 0, 0); // ... and 0 are full | |
32 sem_init(&mutex, 0, 1); // mutex=1 because it is a lock | |
33 // ... | |
34 } |
读者—写者锁
另一个经典问题源于对更加灵便的锁定原语的渴望,它抵赖不同的数据结构拜访可能须要不同类型的锁。例如,一个并发链表有很多插入和查找操作。插入操作会批改链表的状态,而查找操作只是读取该构造,只有没有进行插入操作,咱们能够并发的执行多个查找操作。读者—写者锁(reader-writer lock)就是用来实现这种操作的。上面是这种锁的代码。
1 typedef struct _rwlock_t {2 sem_t lock; // binary semaphore (basic lock) | |
3 sem_t writelock; // used to allow ONE writer or MANY readers | |
4 int readers; // count of readers reading in critical section | |
5 } rwlock_t; | |
6 | |
7 void rwlock_init(rwlock_t *rw) { | |
8 rw->readers = 0; | |
9 sem_init(&rw->lock, 0, 1); | |
10 sem_init(&rw->writelock, 0, 1); | |
11 } | |
12 | |
13 void rwlock_acquire_readlock(rwlock_t *rw) {14 sem_wait(&rw->lock); | |
15 rw->readers++; | |
16 if (rw->readers == 1) | |
17 sem_wait(&rw->writelock); // first reader acquires writelock | |
18 sem_post(&rw->lock); | |
19 } | |
20 | |
21 void rwlock_release_readlock(rwlock_t *rw) {22 sem_wait(&rw->lock); | |
23 rw->readers--; | |
24 if (rw->readers == 0) | |
25 sem_post(&rw->writelock); // last reader releases writelock | |
26 sem_post(&rw->lock); | |
27 } | |
28 | |
29 void rwlock_acquire_writelock(rwlock_t *rw) {30 sem_wait(&rw->writelock); | |
31 } | |
32 | |
33 void rwlock_release_writelock(rwlock_t *rw) {34 sem_post(&rw->writelock); | |
35 } |
如果某个线程要更新数据结构,须要调用 rwlock_acquire_writelock()取得写锁,调用 rwlock_release_writelock()开释写锁。外部通过一个 writelock 的信号量保障只有一个写者能取得锁进入临界区,从而更新数据结构。
获取读锁时,读者首先要获取 lock,而后减少 reader 变量,追踪目前有多少个读者在拜访该数据结构。当第一个读者获取读锁时,同时也会获取写锁,即在 writelock 信号量上调用 sem_wait(),最初调用 sem_post()开释 lock。
一旦一个读者取得了读锁,其余的读者也能够获取这个读锁。然而,想要获取写锁的线程,就必须等到所有的读者都完结。最初一个退出的读者在 writelock 信号量上调用 sem_post(),从而让期待的写者可能获取该锁。
这一计划可行,但有一些缺点,尤其是公平性,读者很容易饿死写者。存在简单一些的解决方案,比方有写者期待时,防止更多的读者进入并持有锁。最初,读者 - 写者锁通常退出了更多锁操作,因而和其余一些简略疾速的锁相比,读者—写者锁在性能方面没有劣势。
如何实现信号量
最初,咱们用底层的同步原语锁和条件变量,来实现本人的信号量,名字叫作 Zemaphore。
1 typedef struct _Zem_t { | |
2 int value; | |
3 pthread_cond_t cond; | |
4 pthread_mutex_t lock; | |
5 } Zem_t; | |
6 | |
7 // only one thread can call this | |
8 void Zem_init(Zem_t *s, int value) { | |
9 s->value = value; | |
10 Cond_init(&s->cond); | |
11 Mutex_init(&s->lock); | |
12 } | |
13 | |
14 void Zem_wait(Zem_t *s) {15 Mutex_lock(&s->lock); | |
16 while (s->value <= 0) | |
17 Cond_wait(&s->cond, &s->lock); | |
18 s->value--; | |
19 Mutex_unlock(&s->lock); | |
20 } | |
21 | |
22 void Zem_post(Zem_t *s) {23 Mutex_lock(&s->lock); | |
24 s->value++; | |
25 Cond_signal(&s->cond); | |
26 Mutex_unlock(&s->lock); | |
27 } |
咱们实现的信号量和 Dijkstra 定义的信号量有一点轻微区别,就是咱们没有放弃当信号量的值为正数时,让它反映出期待的线程数。事实上,该值永远不会小于 0。这一行为更容易实现,并合乎现有的 Linux 实现。