共计 16913 个字符,预计需要花费 43 分钟才能阅读完成。
前言
- 本篇是对于 MIT 6.S081-2020-Lab7(Multithreading) 的实现;
- 如果内容上发现有什么问题请不要悭吝您的键盘。
筹备工作
这次试验不能像做前三次试验一样吃老本莽过来了,须要提前浏览 xv6 book 的 Scheduling 一章。但还没看过 Interrupts and device drivers 和 Locking 这两章,所以就三章一起看了。筹备工作这里就摘抄了一些概念,次要是给本人了解所记录的,不便日后重温。“设施驱动”、“线程调度”、“Lost Wakeup”这几个局部的内容比拟重要。
Interrupts and device drivers
I/O 接口和中断控制器
因为 CPU 与像 console, keyboard 这样的外设(I/O 设施)的连贯不能像 memory 一样间接挂载到总线(DB, AB, CB)上,须要通过各自的专用的接口电路或者接口芯片来辅助实现,这种接口被称为 I/O 接口。
不像 memory,因为外设品种的繁多,I/O 接口也会比较复杂。但个别地,I/O 接口都具备以下构造:
-
数据缓冲区:
-
用来存放数据信息,数据信息也分三种类型:
- 数字量(如 ASCII 码)
- 模拟量(如温度、压力等间断变动的物理量,须要通过数模模数转换)
- 开关量(用一位二进制数示意)
-
-
状态寄存器:
-
反映 I/O 接口所连贯的 I/O 设施工作状态信息:
- 输出设施的数据是否筹备好,不便读?
- 输出设备是否闲暇,不便写?
-
-
管制寄存器:
-
使编程设施成为可能(设施驱动程序)
- 简略的比方管制 I/O 设施的启动或进行
-
-
中断管制逻辑:
- 向 CPU 发送中断信号
在 xv6 中,内部中断源次要有 UART 和 virtio_disk。产生中断时,零碎会通过返回的中断号来判断哪一个 I/O 接口产生了中断。特地地,在 x86 架构下,零碎会将中断号作为索引,在中断向量表中寻找相应的中断处理程序并执行。但在 xv6 下的中断设施很少,所以一个分支就能搞定(具体见 kernel/trap.c
文件中的 int devintr()
函数)。
当这些设施连贯到主板后,会通过 memory-mapped I/O 机制坐落在内存物理地址的不同的地位上,像是 UART 就在 0x10000000L
。因为 QEMU 只是模仿的 SiFive 主板,设施理论座落的地位会很 SiFive 说明书上的有些出入。另外,QEMU 所模仿的 UART 设施芯片的型号是 16550,能够间接查到相应的编程手册。明天的配角 UART 是负责与 console 和 keyboard 交互的设施,中文叫“异步收发传输器”。
中断控制器是专门用来解决中断的管制芯片。它用于在多个中断源的零碎中,帮助 CPU 实现对外部中断请求的治理。在 xv6 中,中断控制器是 PLIC,它能够最多接管 53 个不同设施的中断。这些中断达到 PLIC 后,PLIC 会将中断路由到一个没有在解决中断的 CPU hart 去解决它。如果所有的 CPU hart 都在解决中断,PLIC 会保留中断直到有一个 CPU hart 能够用来解决中断。具体的流程如下:
- PLIC 告诉所有的 CPU hart 以后有一个待处理的中断;
- 有且仅有一个 CPU hart 会 Claim 接管中断,之后 PLIC 不会把中断发给其它的 CPU hart 去解决;
- CPU hart 解决完中断后告诉 PLIC;
- PLIC 不在保留中断信息。
中断嵌套是指终端零碎正在执行一个中断服务时,有另一个优先级更高的中断提出中断请求,这时会临时进行以后正在执行的级别较低的中断源的服务程序,去解决级别更高的中断源,待处理完毕,再返回到被中断了的中断服务程序继续执行的过程。目前我还没有发现 xv6 中有不同优先级的中断,因而应该是没有中断嵌套的状况呈现的。
设施驱动
个别对 I/O 设施的操作都是初始化后再进行读写,这和创立文件、读写文件的操作非常相似,因而 xv6 就将这些 I/O 设施形象成文件,个别都对外提供对这个设施的 init
, read
和 write
操作。进一步地,对 I/O 接口的操作也是相似,不过多了一个中断解决的操作,因而不能用文件来形象了。而这些操作的具体的实现就被称为是设施驱动了。
以 UART Driver 举例,我找到了对于它的形容,文章比拟短,倡议间接看完。简略来说,为了传输字符,UART 外部有两个 FIFO 队列,别离是 receiver FIFO(简称 RFIFO)和 transmitter FIFO(简称 TFIFO)。UART 会触发一个中断,当且仅当 RFIFO 为满或 TFIFO 为空。
UART Driver 的充当底半部的 Interrupt Handler 会将 RFIFO 中的字符读到 Input Buffer 中去(通过读 RHR,先查看 LSR 第 0 位看看数据是否 ready 好),并且将 Output Buffer 中的字符写到 TFIFO 中去(通过写 THR,先查看 LSR 第 5 位看看是否闲暇),以上是底半部的工作。
顶半部则是用户程序去往 Input Buffer(在 kernel/console.c
中的 cons.buf
和 int consoleread(int, uint64, int)
)或 Output Buffer(在 kernel/uart.c
中的 uart_tx_buf
和 void uartputc(int)
)进行读或写。就这样,通过两组 Buffer-FIFO 对就实现了顶半部和底半部的解耦。
来看一下,shell 在调用 getcmd()
后的调用链为 fprint()
->vfprint()
->putc()
->write()
->sys_write()
->filewrite()
->consolewrite()
->uartputc()
。
/* kernel/console.c */
int
consolewrite(int user_src, uint64 src, int n)
{
int i;
acquire(&cons.lock);
for(i = 0; i < n; i++){
char c;
if(either_copyin(&c, user_src, src+i, 1) == -1)
break;
uartputc(c);
}
release(&cons.lock);
return i;
}
留神在执行 consolewrite()
时会先获取 cons.lock
,直到把一行字串全副输入完,免得有在其它 hart 上并行运行的过程去往 console 上交错输入。
/* kernel/uart.c */
void
uartputc(int c)
{acquire(&uart_tx_lock);
if(panicked){for(;;)
;
}
while(1){if(((uart_tx_w + 1) % UART_TX_BUF_SIZE) == uart_tx_r){
// buffer is full.
// wait for uartstart() to open up space in the buffer.
sleep(&uart_tx_r, &uart_tx_lock);
} else {uart_tx_buf[uart_tx_w] = c;
uart_tx_w = (uart_tx_w + 1) % UART_TX_BUF_SIZE;
uartstart();
release(&uart_tx_lock);
return;
}
}
}
void
uartstart()
{while(1){if(uart_tx_w == uart_tx_r){
// transmit buffer is empty.
return;
}
if((ReadReg(LSR) & LSR_TX_IDLE) == 0){
// the UART transmit holding register is full,
// so we cannot give it another byte.
// it will interrupt when it's ready for a new byte.
return;
}
int c = uart_tx_buf[uart_tx_r];
uart_tx_r = (uart_tx_r + 1) % UART_TX_BUF_SIZE;
// maybe uartputc() is waiting for space in the buffer.
wakeup(&uart_tx_r);
WriteReg(THR, c);
}
}
在 uartputc()
中会向 Output Buffer 写入一个字节,属于是 UART 顶半部的操作;之后在 uartstart()
中,将 Output Buffer 中的一个字节写到 UART 的 THR 寄存器中后间接返回,之后把该字节传给 Console 的工作就交给 UART 了。等到 $
和 \space
这两个字符都被 UART 发送到 Console 后,UART 发现自己的 TFIFO 空掉了,于是就触发一个中断。实际上,当某个 CPU hart 响应中断后,最终会调用 uartintr()
这个 Interrupt Handler:
/* kernel/uart.c */
void
uartintr(void)
{
// read and process incoming characters.
while(1){int c = uartgetc();
if(c == -1)
break;
consoleintr(c);
}
// send buffered characters.
acquire(&uart_tx_lock);
uartstart();
release(&uart_tx_lock);
}
因为没有任何 I/O 设施(键盘)给 UART 输出数据,上边的 while 循环会间接 break 掉,之后再一次地执行 uartstart()
。但这一次在 uartstart()
中,因为读写指针雷同,函数会间接返回,等于说这边中断解决什么工作也没有做,这个中断相当于只是在通知 CPU:“你交给我的那些字节我都曾经解决完了”。这边就是 UART 驱动的底半部所要做的工作。
以上是 shell 想要在 Console 画点什么货色的状况。当然,shell 的 getcmd()
前面还会通过 gets()
来获取用户从键盘上的输出,同样也要通过 UART 的中断解决,大体过程和下面也是相似的,不过 Console 还心愿把用户输出的键对应的字符再在 Console 上画进去,状况会比下面要多一点货色(具体地,uartintr()
会调用 consoleintr()
,这次底半部终于做了点实质性的工作)。
Locking
锁的应用
21 世纪以来,单核 CPU 的时钟频率晋升不显著,使得单线程效率达到一个瓶颈,于是呈现了多核零碎来躲避这种性能上的限度。但一个不言而喻的问题是,多个线程对共享数据进行读写而不加以限度时,通常会呈现数据不统一景象,特地地,这被称作“竞争条件(Race Condtion)”。xv6 引入了锁机制,使得这些线程在访问共享数据时串行地执行,从而防止 Race Condition。
一个无效的办法是,通过在“临界区域 (Critical Section)”高低 aquire(lock)
和 release(lock)
,来让临界区域内的代码原子性地执行,即 要么不做,要么做完 。但在这么做的同时,也要思考 锁的粒度 ,如果粒度大到整个内核只有一个锁(Big Kernel Lock),根本每次零碎调用都要 acquire 这把锁,执行完返回用户空间之前再开释掉,使得整个零碎毫无并行性可言,锁的毛病(就义性能)被放大到了最大。最初为了防止出现死锁景象,获取和开释锁的 机会 也很重要。所以具体怎么做齐全取决于程序员本人,即这个模块的某段指令需不需要原子操作,粒度要多大能力进行最大水平的并行(通常由粗到细一个个试)等等。
自旋锁的实现
/* kernel/spinlock.c */
void
acquire(struct spinlock *lk)
{push_off(); // disable interrupts to avoid deadlock.
if(holding(lk))
panic("acquire");
// On RISC-V, sync_lock_test_and_set turns into an atomic swap:
// a5 = 1
// s1 = &lk->locked
// amoswap.w.aq a5, a5, (s1)
while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
;
// Tell the C compiler and the processor to not move loads or stores
// past this point, to ensure that the critical section's memory
// references happen strictly after the lock is acquired.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Record info about lock acquisition for holding() and debugging.
lk->cpu = mycpu();}
void
release(struct spinlock *lk)
{if(!holding(lk))
panic("release");
lk->cpu = 0;
// Tell the C compiler and the CPU to not move loads or stores
// past this point, to ensure that all the stores in the critical
// section are visible to other CPUs before the lock is released,
// and that loads in the critical section occur strictly before
// the lock is released.
// On RISC-V, this emits a fence instruction.
__sync_synchronize();
// Release the lock, equivalent to lk->locked = 0.
// This code doesn't use a C assignment, since the C standard
// implies that an assignment might be implemented with
// multiple store instructions.
// On RISC-V, sync_lock_release turns into an atomic swap:
// s1 = &lk->locked
// amoswap.w zero, zero, (s1)
__sync_lock_release(&lk->locked);
pop_off();}
如果只是像 while(lk->locked == 1)
这样循环判断,锁仍不具备原子操作,因为如果同时有两个线程并行地执行到了这一句,而且发现 lk->locked == 0
,那么它们都会拿到这个锁,这显然是谬误的。
个别地,大部分软件锁的实现都须要依附硬件提供的原子性操作来实现(我不晓得是不是全副)。RISC-V 中的 amoswap
(atomic memory swap) 指令就是个原子性操作:amoswap
接管三个参数,别离是 address
, r1
, r2
。它会先把 address
锁定住,把 *address
保留再一个长期变量 tmp
中,之后 *address = r1
,而后 r2 = temp
,最初再对地址解锁。整个过程 r1
的值没有变,只是把 *address
和 r2
的值替换了。
相似的硬件原子性操作有很多,但根本与 amoswap
的过程别无二致。但不同处理器在这种原子操作的实现可能会很不一样,取决于其依赖的内存零碎是怎么工作的:
- 内存控制器能够给指定的某个地址加锁和解锁。并且因为所有的 hart 都须要通过这里的内存控制器实现读写,所以内存控制器能够对操作进行排序和加锁。
- 当内存位于一个共享的 Bus 时,则须要“总线控制器”(bus arbiter) 来反对以原子性的形式执行多个内存操作。
- 当处理器内有 Cache 时,处理器能够对某个 Cache Line 加锁,以确保只有一个写入者。
对于锁的最重要的局部介绍完了,但还有两个细节须要解决,否则谬误将是 fatal 的。首先要在 acquire()
的一开始调用 push_off()
关中断,再在 release()
的最初调用 pop_off()
开中断。这样做的起因是预防死锁。
锁与中断
锁要防止两种并发所带来的谬误,一种不同 CPU hart 之间线程的并发,还有一种是设施中断和失常在 CPU 上执行程序之间的并发。
回到 kernel/uart.c
中的 uartputc()
函数。shell 调用 consolewrite()
时,须要调用这个函数。执行 uartputc()
的第一件事就是 acquire(&uart_tx_lock)
,而如果这时 UART 发送一个中断给 CPU,最终会调用 uartintr()
这个 Interrupt Handler,而在这个 Interrupt Handler 里又会 acquire(&uart_tx_lock)
。试想如果 acquire()
时没无关中断,Interrupt Handler 会始终期待 uartputc()
去开释锁,但除非 Interrupt Handler 执行完,uartputc()
是不会开释锁的,因而这就产生了死锁。所以对于 acquire()
,关中断是个必须的动作。
锁与编译器优化
编译器有时为了程序执行效率的进步,会去做各种优化,包含但不限于调整执行执行的程序,这被称为是 memory ordering。因为锁与 critical section 之间齐全互相独立的,编译器可能会把在 critical section 中对某些共享数据批改的指令挪到开释锁之后,然而这对并发执行而言是个劫难。所以为了通知编译器不要做这样子的优化,咱们须要应用 memory fence 或者叫做 synchronize 指令来限度这样的优化。对于 synchronize 指令,任何在它之前的 load/store 指令都不能挪动到他之后。
Scheduling
线程调度
线程和过程的概念其实分地不是太开。据 Rober 传授集体的了解:每个过程有两个线程,一个用户空间线程,一个内核空间线程,并且存在限度使得一个过程要么运行在用户空间线程,要么为了执行零碎调用或者响应中断而运行在内核空间线程,然而永远也不会两者同时运行。
如果要用一个坐标来惟一形容一个运行着的线程的话,这个坐标至多要包含这些:地址空间(text, data), stack, pc, GPR。
不同线程可能存在地址空间共享的状况,比方不同用户线程内的代码和全局变量都是不共享的,但在内核线程大家都是共享同一个地址空间上的内核代码和全局数据结构。但能够确定的是,能够通过判断两个 stack 是否雷同来辨别两个不同的线程。即使在 lab3-pagetable 使所有用户过程都共用一个 kstack,但在内核线程切换到用户线程是 kstack 永远都是空的,所以能够看作所有内核线程都有本人独立的一份 kstack。特地地,咱们能够随时保留线程的状态并暂停线程的运行,并在之后通过复原状态来复原线程的运行。这个状态包含:
- PC:示意以后线程执行到了哪条指令;
- GPR(通用寄存器):保留有执行指令所需的长期变量的值;
- stack:保留有函数历史调用记录。
之所以没有 text 和 data 是因为这些数据始终都会在内存,不怕找不到。这三个状态将会被保留在 Trapframe 或 context 中。从一个用户线程调度到另一个用户线程的状态机图如下(没找什么适宜的工具画,尽管比拟潦草,但内容应该还算清晰,十分重要),这边成心疏忽了 yield()
和 sched()
,假设 yield()
就是 swtch()
:
须要明确一下 CPU hart 和线程的对应关系:每个 CPU hart 在任意时刻最多只会运行一个线程,它要么是运行用户过程的线程,要么是运行内核线程,要么是运行这个 CPU hart 对应的调度器线程。在任意时刻同一个线程永远不会运行在多个 CPU hart 上,要么线程运行在一个 CPU hart 上,要么就没有运行。
再补充些传授下面的话,一个用户过程在用户态运行时在 CPU 上运行的是用户过程的用户线程,用的是用户栈(ustack);这个用户过程在调用了一个零碎调用,陷入了内核态,此时在 CPU 上运行的就是用户过程的内核线程,用的是内核栈(kstack);当在进行过程调度时,在以后 CPU hart 上运行的就是 scheduler 线程,用的是最开始在 kernel/entry.S
时所调配的栈(sp = stack0 + (hartid * 4096)
)。每个 CPU hart 都有本人的 scheduler,scheduler 线程也是一种内核线程。
/* kernel/proc.c */
void
scheduler(void)
{
struct proc *p;
struct cpu *c = mycpu();
c->proc = 0;
for(;;){
// Avoid deadlock by ensuring that devices can interrupt.
intr_on();
int nproc = 0;
for(p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);
if(p->state != UNUSED) {nproc++;}
if(p->state == RUNNABLE) {
// Switch to chosen process. It is the process's job
// to release its lock and then reacquire it
// before jumping back to us.
p->state = RUNNING;
c->proc = p;
swtch(&c->context, &p->context);
// Process is done running for now.
// It should have changed its p->state before coming back.
c->proc = 0;
}
release(&p->lock);
}
if(nproc <= 2) { // only init and sh exist
intr_on();
asm volatile("wfi");
}
}
}
在遍历过程列表的每一次的迭代的最开始要首先获取这个过程的锁。否则,因为 if (p->state == RUNNABLE)
的那个分支下的操作不是个原子操作,别的 CPU hart 上的调度器线程不小心看到这个线程 p->state == RUNNABLE
后也想运行它。另一方面是的顾虑是因为中断,如果是其它设施的中断还好,如果在 swtch()
的中途产生了一个 timer interrupt,那将会再次进行调度,用这个不残缺 context 去参加调度可不是什么好事件。
值得一提的是,xv6 不容许内核线程在执行 swtch()
切换到调度器线程时持有其它资源的锁。因为如果 $P_i$ 持有了资源 $S$ 的锁后,调度器切换到了 $P_j$ 后,如果 $P_j$ 也正好想要应用 $S$,就会产生死锁,这时也不会有定时器中断来援救 $P_j$,因为 $P_j$ acquire(&S->lock)
会关中断。
/* kernel/swtch.S */
.globl swtch
swtch:
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)
ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret
swtch()
保留的这些寄存器都是 callee-saved 寄存器,而 swtch()
是从 C 代码调用的,所以 caller-saved 寄存器会被 C 编译器保留在以后的栈上。
当一个刚刚调配的过程是如何参加调度器的调度的?allocproc()
里会执行 p->context.ra = (uint64)forkret
。意味着调度器线程在执行 swtch(&c->context, &p->context)
时,函数会跳转到 forkret()
的开始处。而 forkret()
还会调用 usertrapret()
,使用户过程从内核线程执行变为用户线程执行。从代码中看,它的工作其实就是开释调度器之前获取的锁(在 forkret()
中会 release(&myproc()->lock)
)。usertrapret()
其实是一个假的函数,它会使得程序体现的看起来像是从 trap 中返回,然而对应的 trapframe 其实也是假的,这样能力跳到用户的第一个指令中。
Sleep&Wakeup
当线程须要做一些 I/O 事件时,通常须要将这个线程的状态设置为 SLEEPING,因为 I/O 事件的处理速度绝对于 CPU 的执行速度来说切实太慢太慢了(大略是等 3 天和等 1s 的感觉),而线程不从这个 I/O 事件获取所需或发送数据也就无奈实现上面的工作,因而为了充分利用这段 CPU 闲暇期,须要进行线程调度让其它线程在这个 hart 上跑从而进步 CPU 利用率。
当调用 wakeup()
时,咱们只想唤醒正待期待刚刚产生的特定 I/O 事件的线程,因而 sleep()
和 wakeup()
须要通过某种通信形式链接在一起,所以 sleep()
和 wakeup()
函数都带有一个叫 sleep channel 的参数,来表明咱们所期待的特定 I/O 事件。调用 wakeup()
时,须要传入与当初调用 sleep()
时雷同的 64bit 的 sleep channel。
Lost Wakeup
/* kernel/uart.c */
void
uartputc(int c)
{acquire(&uart_tx_lock);
if(panicked){for(;;)
;
}
while(1){if(((uart_tx_w + 1) % UART_TX_BUF_SIZE) == uart_tx_r){
// buffer is full.
// wait for uartstart() to open up space in the buffer.
sleep(&uart_tx_r, &uart_tx_lock);
} else {uart_tx_buf[uart_tx_w] = c;
uart_tx_w = (uart_tx_w + 1) % UART_TX_BUF_SIZE;
uartstart();
release(&uart_tx_lock);
return;
}
}
}
察看 uartputc()
的逻辑,首先须要 acquire(&uart_tx_lock)
,这是因为在多线程并发状况下,uartputc()
要对 Output Buffer 的读写指针这两个共享变量做爱护。咱们须要在线程在 sleep()
时调度走之前把 uart_tx_lock 开释掉,因为中断产生时 uartintr()
再次获取这个锁。这边假如 sleep()
只是简略的 release(&uart_tx_lock)
,并没有做其它额定操作,是 xv6 源码中的实在 sleep()
实现的进化版实现。
留神到,release(&uart_tx_lock)
之后,sched()
之前,有一小段的空窗期,并且在这段空窗期中,过程的状态可能是 SLEEPING,可能是 RUNNING,取决于有没有执行到 p->state = SLEEPING
这条语句。
如果在这段空窗期中,该 sleep channel 对应的 I/O 事件完结了,这将会触发一个中断,控制权将交给 uartintr()
去 wakeup()
对应的过程。但很显著,如果过程的状态为 RUNNING,它 wakeup()
就不会去唤醒这个过程。在之后如果没有雷同 sleep channel 的 I/O 事件中断去唤醒这个 sleep()
,这个过程会始终 SLEEPING 上来。这个景象被称作 Lost Wakeup。
想要打消这段空窗期,须要将 release(&uart_tx_lock)
与 p->state = SLEEPING
合并为一个原子操作。所以咱们须要再来一个锁,来保障操作执行的原子性。
/* kernel/proc.c */
void
wakeup(void *chan)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++) {acquire(&p->lock);
if(p->state == SLEEPING && p->chan == chan) {p->state = RUNNABLE;}
release(&p->lock);
}
}
察看到 wakeup()
在遍历过程列表时,每次迭代都要 acquire(&p->lock)
,因为它要拜访过程的共享变量 p->state
。于是,咱们能够在 sleep()
也获取以后过程的锁,来保障操作的原子性:
/* kernel/proc.c */
void
sleep(void *chan, struct spinlock *lk)
{struct proc *p = myproc();
// Must acquire p->lock in order to
// change p->state and then call sched.
// Once we hold p->lock, we can be
// guaranteed that we won't miss any wakeup
// (wakeup locks p->lock),
// so it's okay to release lk.
if(lk != &p->lock){ //DOC: sleeplock0
acquire(&p->lock); //DOC: sleeplock1
release(lk);
}
// Go to sleep.
p->chan = chan;
p->state = SLEEPING;
sched();
// Tidy up.
p->chan = 0;
// Reacquire original lock.
if(lk != &p->lock){release(&p->lock);
acquire(lk);
}
}
可能会有疑难,如果我在 sleep()
开释了锁(release(lk)
),它不会开中断么?实际上这里是个锁的嵌套,push_off()
和 pop_off()
是为嵌套应用锁的状况下,断定并开关中断的接口。因而即便 release(lk)
了也不会开中断。之后调度器线程会将这个过程锁开释掉。
因为两个过程在调用 sleep()
时,可能都传入同一个 sleep channel,比方有两个过程都想调用 uartputc()
。留神到 wakeup()
的实现,它会遍历过程列表找到所有过程的 sleep channel 与其相匹配的过程并将其唤醒。所以会有意外唤醒的状况呈现,即可能是别的过程所期待的 I/O 事件实现了,但却把本人也给叫起来了,所以须要持续 sleep()
上来。这种景象貌似还挺常见,因而简直所有调用 sleep()
的中央须要套上一个循环。
事实上,还有一些更高级的 Coordination 实现形式,例如“信号量”(Semaphore)。调用者不须要向 Semaphore 传入 condition lock,它本身外部有一个 up-down 计数器来解决 Lost Wakeup 的问题的。但据说 Semaphore 没有 sleep()
和 wakeup()
通用,我就不去考据为什么了,因为这跟咱们的试验无关。
exit&wait
打个比拟粗鄙的比喻,
exit()
像是自尽,wait()
就是父过程给子过程收尸。为什么要这么设计,因为没有人自我了结后还能顺带给本人收尸的,都是他人替他收尸。
xv6 的 exit()
只做了以下几件事:
- 敞开该过程所有文件;
- 将该过程的所有子过程的父过程设置为 initproc;
- 唤醒父过程和 initproc;
- 将该过程的状态设置为 ZOMBIE。
能够看到 exit()
并没有开释过程资源,并且一旦过程状态设置为 ZOMBIE 后,它就再也不会被调度器选中运行,一个再也不会运行的过程和 UNUSED 没什么区别。
xv6 的 wait()
做了以下几件事:
sleep()
直到子过程将本人唤醒;- 从过程列表中找到子过程是 ZOMIE 的,对它调用
freeproc()
从而开释过程资源。
kill
要想 kill()
一个在执行用户线程的过程还好,kill()
一个执行内核线程的过程就会有问题了,因为它可能在更新一些重要的内核数据,比方正在更新文件系统,创立一个文件,因而咱们心愿它把这个操作实现后再 kill()
它。
/* kernel/proc.c */
int
kill(int pid)
{
struct proc *p;
for(p = proc; p < &proc[NPROC]; p++){acquire(&p->lock);
if(p->pid == pid){
p->killed = 1;
if(p->state == SLEEPING){// Wake process from sleep().
p->state = RUNNABLE;
}
release(&p->lock);
return 0;
}
release(&p->lock);
}
return -1;
}
如果是在用户线程被 kill()
了,那么在它下一次的零碎调用或是设施中断时,usertrap()
会执行 exit(-1)
将其退出。
如果是在内核线程被 kill()
了,不巧的是正在过程正在 SLEEPING,比方调用 piperead()
从 pipe 中获取一串字符,但另一端迟迟不写数据到 pipe 中去,就这样一个被 killed 的过程始终没有开释资源,显然是不合理的。
因而你能够看到 xv6 的 kill()
的实现中,如果过程的状态是 SLEEPING 会被动将其设置为 RUNNING,使该过程被调度器从新运行。之后 piperead()
会在合适的时候判断该过程是否被 killed 了,平安地返回退出。这是所有 I/O 类零碎调用须要做的一件事。
试验局部
Uthread: switching between threads
第一个试验实现模仿的是一个单核多用户线程并发,它无奈做到并行,相当于是个小玩具程序了。这和 xv6 的线程调度过程很不一样,不同用户线程之间是能够间接切换上下文的,甚至连调度器线程都不须要,线程调度函数都是用户线程被动去调用的:
/* user/uthread_switch.S */
.text
/*
* save the old thread's registers,
* restore the new thread's registers.
*/
.globl thread_switch
thread_switch:
/* YOUR CODE HERE */
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)
ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret /* return to ra */
user/uthread_switch.S
的内容与 kernel/swtch.S
的内容别无二致,照抄即可。
/* user/uthread.c */
struct context {
uint64 ra;
uint64 sp;
// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};
struct thread {char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct context context;
};
void
thread_schedule(void)
{
struct thread *t, *next_thread;
/* Find another runnable thread. */
...
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->context, (uint64)¤t_thread->context);
} else
next_thread = 0;
}
void
thread_create(void (*func)())
{
struct thread *t;
for (t = all_thread; t < all_thread + MAX_THREAD; t++) {if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->context.ra = (uint64)func;
t->context.sp = (uint64)&t->stack[STACK_SIZE];
}
中等难度的个别没什么坑点,留神栈是向低地址增长的就好。
Using threads
有意思的是当我把锁的粒度弄得特地大,使得所有线程在 put()
时都要串行执行,尽管没有 put 操作谬误,但耗时反而要比单线程更多了,阐明加锁解锁是个比拟耗时的操作,也就解释了为什么要有 Lock-free Program 存在的必要了。
所以咱们必定要放大锁的粒度,missing key 的问题必定是进去批改桶内键值对单链表的头节点那块不是一个原子性的操作,咱们能够在其高低加锁和开释锁,甚至还能够调整一下 insert()
内执行指令的程序:
/* notxv6/ph.c */
pthread_mutex_t mutex;
static void
insert(int key, int value, struct entry **p, struct entry *n)
{struct entry *e = malloc(sizeof(struct entry));
*p = e;
pthread_mutex_unlock(&mutex);
e->key = key;
e->value = value;
e->next = n;
}
static
void put(int key, int value)
{
...
if(e){
// update the existing key.
e->value = value;
} else {
// the new is new.
pthread_mutex_lock(&mutex);
insert(key, value, &table[i], table[i]);
}
}
int
main(int argc, char *argv[])
{
...
pthread_mutex_init(&mutex, NULL);
//
// first the puts
//
...
//
// now the gets
//
...
printf("%d gets, %.3f seconds, %.0f gets/second\n",
NKEYS*nthread, t1 - t0, (NKEYS*nthread) / (t1 - t0));
pthread_mutex_destroy(&mutex);
}
只管这能通过 ph_safe 和 ph_fast,但这样子的代码仍是线程不平安的。具体来说,put()
进去的 key 是有可能反复的,因为可能 random()
出两个雷同的随机数。假如线程 0 调用 put(42, 0)
,线程 1 调用 put(42, 1)
,并且在这之前的 i = 42 % NBUCKET
这个桶中没有 $key=42$ 的键值对。在当 put(42, 0)
中,线程 1 在刚遍历某一个桶中的所有键值对时,线程 2 就实现了插入操作,那么最终这个桶中就会有两个 $key=42$ 的键值对,对于 ph.c
程序来说这是一种谬误。
Barrier
![make grade](make grade.png)/* notxv6/barrier.c */
static void
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.nthread == nthread) {
++bstate.round;
bstate.nthread = 0;
pthread_cond_broadcast(&bstate.barrier_cond);
} else {pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
}
pthread_mutex_unlock(&bstate.barrier_mutex);
}
尽管提醒也说了,然而还是要强调的一点是 pthread_cond_wait()
的实现和 xv6 sleep()
的实现很像,尤其是针对解决 Lost Wakeup 问题从而要传入一个 mutex 的这一点。
最初再贴一个 make grade 的截图:
后记(废话)
学校的安排课业常常做一点就开始无聊犯困,当脑子开始思考做这些到底有什么意义的时候,察觉电脑曾经不盲目地把虚拟机关上来了,于是乎算是在周末完结之前把多线程的试验做进去了吧,期间工夫大多都是花在概念的了解和整顿上。
下一个锁的试验尽管不晓得能不能在放假回家前做掉,但必定至多要在除夕之后了,因为学校这边再摸怕是要出事件,考试周温习感觉切实没意思可能会摸出工夫再做掉。
参考链接
- pthread_cond_wait
- MIT 6.S081 Book