共计 11733 个字符,预计需要花费 30 分钟才能阅读完成。
Memory allocator (moderate)
要求
程序 user/kalloctest
强调 xv6 的内存分配器:三个过程增长和放大它们的地址空间,导致对 kalloc
和 kfree
的许多调用。kalloc
和 kfree
获取 kmem.lock
。Kalloctest 打印(作为“#test-and-set”)在acquire
中因为尝试获取另一个内核曾经持有的锁(用于 kmem
锁和其余一些锁)而循环迭代的次数。获取中的循环迭代次数是锁争用的粗略度量。kalloctest
的输入如下所示:
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 83375 #acquire() 433015
lock: bcache: #test-and-set 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #test-and-set 83375 #acquire() 433015
lock: proc: #test-and-set 23737 #acquire() 130718
lock: virtio_disk: #test-and-set 11159 #acquire() 114
lock: proc: #test-and-set 5937 #acquire() 130786
lock: proc: #test-and-set 4080 #acquire() 130786
tot= 83375
test1 FAIL
start test2
total free number of pages: 32497 (out of 32768)
.....
test2 OK
start test3
child done 1
child done 100000
test3 OK
start test2
total free number of pages: 32497 (out of 32768)
.....
test2 OK
start test3
child done 1
child done 100000
test3 OK
您可能会看到与此处显示的计数不同,并且前 5 个争用锁的程序也不同。
acquire
为每个锁保护要获取该锁的调用计数,以及获取中的循环尝试但未能设置锁的次数。Kalloctest 调用一个零碎调用,使内核打印 KMEM 和 bcache 锁(这是本练习的重点)以及 5 个最有争议的锁的计数。如果存在锁争用,则 acquire
循环迭代的次数将很大。零碎调用返回 kmem 和 bcache 锁的循环迭代次数之和。
对于此试验,必须应用具备多核计算机。如果您应用正在执行其余操作的机器,则 kalloctest 打印的计数将是无稽之谈。您能够应用专用的 Athena 工作站或您本人的笔记本电脑,但不要应用拨号机。
kalloctest 中锁争用的根本原因是 kalloc()
有一个受单个锁爱护的闲暇列表。要打消锁争用,您必须从新设计内存分配器以防止单个锁和列表。
- 根本思维是为每个 CPU 保护一个闲暇列表,每个列表都有本人的锁。不同 CPU 上的调配和开释能够并行运行,因为每个 CPU 将在不同的列表上运行。
- 次要挑战是解决一个 CPU 的可用列表为空,但另一个 CPU 的列表具备可用内存的状况; 在这种状况下,一个 CPU 必须“窃取”另一个 CPU 的可用列表的一部分。
-
窃取可能会引入锁争用,但心愿这种状况并不常见。
您的工作是实现每个 CPU 的闲暇列表,并在 CPU 的闲暇列表为空时窃取。您必须为所有以“kmem”结尾的锁命名。也就是说,您应该为每个锁调用
initlock
,并传递一个以“kmem”结尾的名称。运行 kalloctest 以查看您的实现是否缩小了锁争用。要查看它是否依然能够调配所有内存,请运行usertests sbrkmuch
。您的输入将相似于上面显示的输入,只管具体数字会有所不同,但 kmem 锁上的争用总数大大减少。确保usertests -q
中的所有测试都通过。$ kalloctest start test1 test1 results: --- lock kmem/bcache stats lock: kmem: #test-and-set 0 #acquire() 42843 lock: kmem: #test-and-set 0 #acquire() 198674 lock: kmem: #test-and-set 0 #acquire() 191534 lock: bcache: #test-and-set 0 #acquire() 1242 --- top 5 contended locks: lock: proc: #test-and-set 43861 #acquire() 117281 lock: virtio_disk: #test-and-set 5347 #acquire() 114 lock: proc: #test-and-set 4856 #acquire() 117312 lock: proc: #test-and-set 4168 #acquire() 117316 lock: proc: #test-and-set 2797 #acquire() 117266 tot= 0 test1 OK start test2 total free number of pages: 32499 (out of 32768) ..... test2 OK start test3 child done 1 child done 100000 test3 OK $ usertests sbrkmuch usertests starting test sbrkmuch: OK ALL TESTS PASSED $ usertests -q ... ALL TESTS PASSED $
提醒
- 您能够应用来自
kernel/param.h
的常量NCPU
- 让
freerange
将所有可用内存提供给运行freerange
的 CPU。 - 函数
cpuid
返回以后内核编号,但只有在中断敞开时调用它并应用其后果才是平安的。您应该应用push_off()
和pop_off()
来敞开和关上中断。 - 看看
kernel/sprintf.c
中的snprintf
函数,理解字符串格局的想法。不过,将所有锁命名为“kmem”是能够的。 -
(可选)应用 xv6 的竞争检测器运行您的解决方案
$ make clean $ make KCSAN=1 qemu $ kalloctest ..
kalloctest 可能会失败,但你不应该看到任何竞争。如果 xv6 的竞争检测器察看到竞争,它将打印两条堆栈轨迹,形容以下路线的竞争:
== race detected == backtrace for racing load 0x000000008000ab8a 0x000000008000ac8a 0x000000008000ae7e 0x0000000080000216 0x00000000800002e0 0x0000000080000f54 0x0000000080001d56 0x0000000080003704 0x0000000080003522 0x0000000080002fdc backtrace for watchpoint: 0x000000008000ad28 0x000000008000af22 0x000000008000023c 0x0000000080000292 0x0000000080000316 0x000000008000098c 0x0000000080000ad2 0x000000008000113a 0x0000000080001df2 0x000000008000364c 0x0000000080003522 0x0000000080002fdc ==========
在您的操作系统上,您能够通过将回溯跟踪剪切并粘贴到
addr2line
中,将其转换为带有行号的函数名称:$ riscv64-linux-gnu-addr2line -e kernel/kernel 0x000000008000ab8a 0x000000008000ac8a 0x000000008000ae7e 0x0000000080000216 0x00000000800002e0 0x0000000080000f54 0x0000000080001d56 0x0000000080003704 0x0000000080003522 0x0000000080002fdc ctrl-d kernel/kcsan.c:157 kernel/kcsan.c:241 kernel/kalloc.c:174 kernel/kalloc.c:211 kernel/vm.c:255 kernel/proc.c:295 kernel/sysproc.c:54 kernel/syscall.c:251
您不须要运行竞争检测器,但您可能会发现它很有帮忙。请留神,竞争检测器会显著减慢 xv6 的速度,因而您可能不想在运行用户测试时应用它。
实现
-
将治理闲暇 list 与锁的构造体按 CPU 离开 :将本来示意闲暇内存和其对应的锁的构造体变量
kmem
从一个变成多个,让每个 CPU 都领有一个该构造体。因而将其改为数组,大小为NCPU
个:struct { struct spinlock lock; struct run* freelist; } kmem[NCPU];
-
更新内存初始化
kinit()
:在内存初始化kinit()
中将本来对一个kmem
变量的初始化改为对kmem
数组的初始化:void kinit() {// initlock(&kmem.lock, "kmem"); int i; char* name = "kmem0"; // 内存锁名字 for (i = 0; i < NCPU; ++i) {initlock(&kmem[i].lock, name); ++name[4]; // 通过递增序号位扭转序号 } freerange(end, (void*)PHYSTOP); }
-
更新内存开释
kfree()
:依据提醒能够将开释的内存要退出到以后运行的 cpu 对应的闲暇 list 上。因而在原函数根底上,先获取 cpuid,在通过 cpuid 索引失去须要对应治理闲暇 list 和锁的构造体void kfree(void* pa) { ... // 在将要开释的内存填充完后 push_off(); // 获取 cpuid 要敞开中断 cid = cpuid(); // 获取 cpuid acquire(&kmem[cid].lock); r->next = kmem[cid].freelist; kmem[cid].freelist = r; release(&kmem[cid].lock); pop_off(); // 复原中断}
-
更新内存调配
kalloc()
:内存调配须要思考:如果以后 cpu 对应的内存治理构造体无闲暇 list,则须要应用其余 cpu 的闲暇内存。-
首先不思考借用其余 cpu 的状况,将
kmem
单变量更新为数组(与kfree()
相似)void* kalloc(void) { struct run* r; int cid, i; push_off(); cid = cpuid(); acquire(&kmem[cid].lock); r = kmem[cid].freelist; if(r) {kmem[cid].freelist = r->next; } release(&kmem[cid].lock); ... // TODO:须要增加借用其余 cpu 内存的逻辑代码 pop_off(); if (r) memset((char*)r, 5, PGSIZE); // fill with junk return (void*)r; }
-
在实现对本 cpu 闲暇 list 的读写操作后,无论是否有闲暇内存都不须要再读写这个 list 了因而,在开释了本 cpu 对应的锁之后,加上是否胜利获取闲暇内存的判断:若有闲暇内存则无需借用其余 cpu,否则须要借用其余 cpu 的内存:
void* kalloc(void) { ... release(&kmem[cid].lock); // 在开释本 cpu 锁后、复原中断前退出借用其余 cpu 的逻辑代码 if (!r) { // 若本 cpu 无闲暇内存 for (i = 0; i < NCPU - 1; ++i) {if (++cid == NCPU) cid = 0; // 下一个 cpu 的 id acquire(&kmem[cid].lock); // 锁上要借的 cpu 的内存锁 if (kmem[cid].freelist) { // 若该 cpu 有闲暇内存,则进行操作,而后跳出循环 r = kmem[cid].freelist; kmem[cid].freelist = r->next; release(&kmem[cid].lock); break; } else { // 否则就间接跳过,开释锁后换下一个 cpu release(&kmem[cid].lock); } } } pop_off(); ... }
-
后果
- 运行后果
- 测试后果
Buffer cache (hard)
要求
作业的这一半独立于前半部分; 无论您是否实现了前半部分,您都能够解决这一半(并通过测试)。
如果多个过程集中应用文件系统,它们可能会争用 bcache.lock
,它爱护 kernel/bio.c
中的磁盘块缓存。bcachetest
创立多个过程,这些过程反复读取不同的文件,以便在 bcache.lock
上产生争用; 其输入如下所示(在实现本试验之前):
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 0 #acquire() 33035
lock: bcache: #test-and-set 16142 #acquire() 65978
--- top 5 contended locks:
lock: virtio_disk: #test-and-set 162870 #acquire() 1188
lock: proc: #test-and-set 51936 #acquire() 73732
lock: bcache: #test-and-set 16142 #acquire() 65978
lock: uart: #test-and-set 7505 #acquire() 117
lock: proc: #test-and-set 6937 #acquire() 73420
tot= 16142
test0: FAIL
start test1
test1 OK
您可能会看到不同的输入,但 bcache 锁的 test-and-sets 数量会很高。如果你查看 kernel/bio.c
中的代码,你会发现 bcache.lock
爱护缓存块缓冲区的列表,每个块缓冲区中的援用计数(b->refcnt
)以及缓存块的身份(b->dev
和b->blockno
)。
批改块缓存,以便在运行
bcachetest
时,bcache 中所有锁的acquire
循环迭代次数接近于零。现实状况下,块缓存中波及的所有锁的计数总和应为零,但如果总和小于 500 也能够。批改bget
和brelse
,以便 bcache 中不同块的并发查找和开释不太可能在锁上发生冲突(例如,不用都期待bcache.lock
)。您必须放弃每个缓存块最多一个正本的不变性。实现后,输入应相似于上面显示的输入(只管不雷同)。确保usertests -q
依然通过。实现后,make grade
应通过所有测试。
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 0 #acquire() 32954
lock: kmem: #test-and-set 0 #acquire() 75
lock: kmem: #test-and-set 0 #acquire() 73
lock: bcache: #test-and-set 0 #acquire() 85
lock: bcache.bucket: #test-and-set 0 #acquire() 4159
lock: bcache.bucket: #test-and-set 0 #acquire() 2118
lock: bcache.bucket: #test-and-set 0 #acquire() 4274
lock: bcache.bucket: #test-and-set 0 #acquire() 4326
lock: bcache.bucket: #test-and-set 0 #acquire() 6334
lock: bcache.bucket: #test-and-set 0 #acquire() 6321
lock: bcache.bucket: #test-and-set 0 #acquire() 6704
lock: bcache.bucket: #test-and-set 0 #acquire() 6696
lock: bcache.bucket: #test-and-set 0 #acquire() 7757
lock: bcache.bucket: #test-and-set 0 #acquire() 6199
lock: bcache.bucket: #test-and-set 0 #acquire() 4136
lock: bcache.bucket: #test-and-set 0 #acquire() 4136
lock: bcache.bucket: #test-and-set 0 #acquire() 2123
--- top 5 contended locks:
lock: virtio_disk: #test-and-set 158235 #acquire() 1193
lock: proc: #test-and-set 117563 #acquire() 3708493
lock: proc: #test-and-set 65921 #acquire() 3710254
lock: proc: #test-and-set 44090 #acquire() 3708607
lock: proc: #test-and-set 43252 #acquire() 3708521
tot= 128
test0: OK
start test1
test1 OK
$ usertests -q
...
ALL TESTS PASSED
$
请给出所有以“bcache”结尾的锁名称。也就是说,您应该为每个锁调用 initlock,并传递一个以“bcache”结尾的名称。
缩小块缓存中的争用比 kalloc 更辣手,因为 bcache 缓冲区实际上在过程(以及 CPU)之间是共享的。对于 kalloc,能够通过为每个 CPU 提供本人的分配器来打消大多数争用; 这不适用于块缓存。咱们建议您应用每个哈希桶具备锁的哈希表在缓存中查找块号。
在某些状况下,如果解决方案存在锁定抵触,则能够:
- 当两个过程同时应用雷同的块号时。
bcachetest test0
从不这样做。 - 当两个过程同时未命中缓存,并且须要查找要替换的未应用块时。
bcachetest test0
从不这样做。 - 当两个过程同时应用在用于分区块和锁的任何计划中抵触的块时; 例如,如果两个过程应用的块号散列到哈希表中的同一槽。
bcachetest test0
可能会这样做,具体取决于您的设计,但您应该尝试调整计划的细节以防止抵触(例如,更改哈希表的大小)。
bcachetest
的 test1
应用比缓冲区更多的不同块,并执行大量文件系统代码门路。
提醒
- 浏览 xv6 书中块缓存的阐明(第 8.1-8.3 节)
- 能够应用固定数量的存储桶,而不是动静调整哈希表的大小。应用质数存储桶(例如 13)来升高哈希抵触的可能性。
- 在哈希表中搜寻缓冲区并在找不到缓冲区时为该缓冲区调配条目必须是原子的。
- 删除所有缓冲区的列表(
bcache.head
等),不要实现 LRU。通过此更改,brelse
不须要获取 bcache 锁。在bget
中,您能够抉择任何具备refcnt == 0
的块,而不是最近起码应用的块。 - 您可能无奈以原子形式查看已缓存的 buf 和(如果未缓存)找到未应用的 buf; 如果缓冲区不在缓存中,则可能必须删除所有锁并从头开始。能够在
bget
中序列化查找未应用的 buf(即,当查找未命中缓存时抉择要重用的缓冲区的bget
局部)。 - 在某些状况下,解决方案可能须要保留两个锁; 例如,在逐出期间,您可能须要持有 bcache 锁和每个存储桶的锁。确保防止死锁。
- 替换块时,您能够将
struct buf
从一个存储桶挪动到另一个存储桶,因为新块哈希到另一个存储桶。您可能遇到一个辣手的状况:新块可能会散列到与旧块雷同的存储桶。在这种状况下,请确保防止死锁。 - 一些调试技巧:实现存储桶锁,但将全局
bcache.lock
的获取 / 开释保留在bget
的结尾 / 结尾以序列化代码。确定它正确无误且没有争用条件后,请删除全局锁并解决并发问题。您还能够运行make CPUS=1 qemu
以应用一个内核进行测试。 - 应用 xv6 的竞争检测器来查找潜在的竞争(请参阅上文如何应用竞争检测器)。
实现
-
哈希表桶数与索引形式:依据提醒,须要把本来双向 list 的 bcache 改为哈希表。实现哈希表,首先须要决定桶数量和哈希索引形式,依据提醒可将桶数量设为 13,哈希索引个别采取取余的形式。
#define NUM_BUCKET 13 #define HASH(x) ((x) % NUM_BUCKET)
-
更改
bcache
构造体:更改管制 bcache 的构造体,将本来以双向 list 保护缓存的模式分桶,每个桶仍然以双向 list 保护,同时将本来一个锁分为每个桶一个锁struct {struct spinlock locks[NUM_BUCKET]; struct buf buf[NBUF]; // Linked list of all buffers, through prev/next. // Sorted by how recently the buffer was used. // head.next is most recent, head.prev is least. struct buf heads[NUM_BUCKET]; } bcache;
-
bcache
初始化binit()
:依据源码,初始化次要分为三步:- 初始化锁:从初始化一个锁到初始化每个桶的锁
- 初始化双向 list 头节点:初始化一个头节点到初始化每个桶的头节点
-
将所有 buf 放入 list(hash table):本来将 buf 放入 list,当初须要把 buf 放入哈希表中的 list
void binit(void) { struct buf* b; char* lockname = "bcache0"; uint i; // 初始化锁和头节点,放一块了 for (i = 0; i < NUM_BUCKET; ++i) {initlock(bcache.locks + i, lockname); ++lockname[6]; bcache.heads[i].prev = bcache.heads + i; bcache.heads[i].next = bcache.heads + i; } // 将 buf 放入哈希表中的 list for (b = bcache.buf; b < bcache.buf + NBUF; ++b) {i = HASH(b->blockno); // 能够不必 hash,最开始都是 0 b->next = bcache.heads[i].next; b->prev = bcache.heads + i; initsleeplock(&b->lock, "buffer"); bcache.heads[i].next->prev = b; bcache.heads[i].next = b; } }
-
bcache
开释brelse()
及其他:因为从本来一把🔒变为分桶🔒,因而再去拜访 or 读写 bcache 其中一个桶的时候就能够只获取该桶对应的锁了。受此影响须要扭转的三个函数:void brelse(struct buf* b) {uint idx = HASH(b->blockno); if (!holdingsleep(&b->lock)) panic("brelse"); releasesleep(&b->lock); acquire(bcache.locks + idx); b->refcnt--; if (b->refcnt == 0) { // 将 b 从列表中删去 b->next->prev = b->prev; b->prev->next = b->next; // 将 b 退出到本桶 list head 的右边 b->prev = bcache.heads[idx].prev; b->next = bcache.heads + idx; bcache.heads[idx].prev->next = b; bcache.heads[idx].prev = b; } release(bcache.locks + idx); } void bpin(struct buf* b) {uint idx = HASH(b->blockno); acquire(bcache.locks + idx); b->refcnt++; release(bcache.locks + idx); } void bunpin(struct buf* b) {uint idx = HASH(b->blockno); acquire(bcache.locks + idx); b->refcnt--; release(bcache.locks + idx); }
-
bcache
获取bget()
:bget()
函数不仅🔒受到影响,因为分桶,在本桶没有refcnt == 0
的缓存时还须要去其余桶中寻找。基本思路(不实现 LRU):- 先寻找是否曾经有缓存,若有间接返回
- 若没有缓存则须要替换
refcnt == 0
的缓存。能够先在本桶寻找是否有refcnt == 0
的缓存,若有则替换后返回。 -
若本桶没有
refcnt == 0
的缓存,就须要去其余桶中寻找。若找到,则须要将其从本来桶的 list 中删去,而后退出本桶的 list 中。static struct buf* bget(uint dev, uint blockno) { struct buf* b; uint jdx, idx = HASH(blockno); acquire(bcache.locks + idx); // 先获取本桶🔒 // Is the block already cached? for (b = bcache.heads[idx].next; b != bcache.heads + idx; b = b->next) {if (b->dev == dev && b->blockno == blockno) { b->refcnt++; release(bcache.locks + idx); acquiresleep(&b->lock); return b; // 若已缓存间接返回 } } // 若没有缓存先在本桶找 refcnt== 0 的缓存 for (b = bcache.heads[idx].prev; b != bcache.heads + idx; b = b->prev) {if (b->refcnt == 0) { b->dev = dev; b->blockno = blockno; b->valid = 0; b->refcnt = 1; release(bcache.locks + idx); acquiresleep(&b->lock); return b; // 找到、替换后间接返回 } } release(bcache.locks + idx); // 本桶没有 refcnt == 0 的桶,就要找别的桶,然而须要先开释本桶的锁 for (jdx = HASH(idx + 1); jdx != idx; jdx = (jdx + 1) % NUM_BUCKET) {acquire(bcache.locks + jdx); for (b = bcache.heads[jdx].prev; b != bcache.heads + jdx; b = b->prev) {if (b->refcnt == 0) { // 找到后替换内容 b->dev = dev; b->blockno = blockno; b->valid = 0; b->refcnt = 1; // 而后在 jdx 桶 list 中删去 b b->next->prev = b->prev; b->prev->next = b->next; release(bcache.locks + jdx); // 删去后就能够开释 jdx 桶的锁了 // 最初把 b 放入本桶 list acquire(bcache.locks + idx); // 之前开释了本桶的锁,当初须要从新获取 b->next = bcache.heads[idx].next; b->prev = bcache.heads + idx; bcache.heads[idx].next->prev = b; bcache.heads[idx].next = b; release(bcache.locks + idx); // 操作后开释本桶锁 acquiresleep(&b->lock); return b; } } release(bcache.locks + jdx); } panic("bget: no buffers"); }
后果
- 运行后果
- 测试后果
总结
make grade
集体播种
- 内存和缓存是内核中重要的临界区,存在许多并发拜访 or 读写。一方面能够通过装备一把锁对立治理的形式,这样编程时不易呈现死锁但效率较低;
- 另一方面能够离开治理,例如内存通过 CPU 离开、bcache 缓存通过哈希桶离开,这时候就要对每个分区装备独立的锁。独立的锁保障了不同分区之间能够并发 or 并行,从而进步了效率。
- 离开治理还须要思考分区之间的交互问题。例如某个分区的资源不够时须要去拜访其余分区。这时候就须要思考在分区交互期间如何放弃互斥和防止死锁的问题。