前言

  • 本篇是对于 MIT 6.S081-2020-Lab8(Lock) 的实现;
  • 我并没有在找到“全局最优解”的状况下通过 Buffer cache 的测试;
  • 因而在 Exercise 2 我只提供了在找到“全局最优解”的前提下,我的各种代码尝试以及思路;
  • 如果内容上发现有什么问题请不要悭吝您的键盘。

筹备工作

要新看的内容不多,只有第八章(File system) 的前三个大节。

文件系统的层次模型

这张图特地地像计算机网络档次服务模型:以后层调用上层所提供的服务为下层提供更优质的服务。但实际上到文件系统,这里的象征没有那么强,更像是对 Disk 的形象水平,越往下走越凑近外设磁盘,越往上走越凑近用户过程。很多时候文件系统的分层不是很严格,所以这张图的意义在于帮忙你了解文件系统的构造。


磁盘通常是 SSD 或 HDD,SSD 拜访一个 disk block 须要 100us 到 1ms 之间,而 HDD 则须要 10ms 左右。对于磁盘驱动程序而言,磁盘都是 Sector 的汇合,一个 sector 的大小为 512B;对于操作系统内核而言,磁盘则是一系列的 Block 数组,一个 Block 的大小为 1024B。明天咱们不须要去关注 Sector,因为磁盘驱动程序 kernel/virtio_disk.c 为内核做了很好的屏蔽,咱们只需应用它的对外提供的读写磁盘某个 Block 的接口(顶半部:virtio_disk_rw())即可,给定 Block number 返回 Block cache。

/* kernel/buf.h */struct buf {  int valid;   // has data been read from disk?  int disk;    // does disk "own" buf?  uint dev;  uint blockno;  struct sleeplock lock;  uint refcnt;  struct buf *prev; // LRU cache list  struct buf *next;  uchar data[BSIZE];};/* kernel/bio.c */struct {  struct spinlock lock;  struct buf buf[NBUF];  // Linked list of all buffers, through prev/next.  // Sorted by how recently the buffer was used.  // head.next is most recent, head.prev is least.  struct buf head;} bcache;voidbinit(void){  struct buf *b;  initlock(&bcache.lock, "bcache");  // Create linked list of buffers  bcache.head.prev = &bcache.head;  bcache.head.next = &bcache.head;  for(b = bcache.buf; b < bcache.buf+NBUF; b++){    b->next = bcache.head.next;    b->prev = &bcache.head;    initsleeplock(&b->lock, "buffer");    bcache.head.next->prev = b;    bcache.head.next = b;  }}

为了放慢读写访问速度,缓存的设计能够说是必须的。整个文件系统就一个 Buffer cache(struct bcache),Buffer cache 存储有多个 Block cache(struct buf),这些 Block cache 和 bcache.head 通过 binit() 一起造成环状双链表。Buffer cache 用到的 Block cache 置换算法是 LRU(Least Recently Used, bget())。bcache.head.prev 一侧将会是最远被应用过的 Block cache,而 bcache.head.next 将会是最近被应用的 Block cache。同时为了保障并发线程环境下 Block 内容的一致性,Buffer cache 配有一个 Spin Lock,所有 Buffer cache 共用一个 sleep-lock。为什么不都是自旋锁,以及 sleep-lock 的作用下一个局部会介绍。


Logging 层这里就不多做介绍了,因为这是下一个 Lab 的内容。


/* kernel/fs.h */// On-disk inode structurestruct dinode {  short type;           // File type  short major;          // Major device number (T_DEVICE only)  short minor;          // Minor device number (T_DEVICE only)  short nlink;          // Number of links to inode in file system  uint size;            // Size of file (bytes)  uint addrs[NDIRECT+1];   // Data block addresses};

inode 层次要是为文件提供的一层形象。每个 inode 都有惟一标识的 inode 号,每个 inode 的大小都为 64 bytes。只有给定一个 inode 号,咱们就能够推算出这个 inode 位于磁盘中的哪一个 Block 的哪个字节开始的。依据这个地址拿到这个 inode 后,咱们能够推算出这个 inode 所对应的文件的大小以及该文件被存储在哪几个 Block 中。

dinode.addrs[] 的前 NDIRECT 个元素都是 Direct block number。这 NDIRECT 个 Direct blocks 中存储有文件的最多前 sizeof(buf)*NDIRECT 个字节。如果文件的大小比拟小,可能用不了 NDIRECT 个 Blocks 就能装的下来;如果文件的大小比拟大,NDIRECT 个 Blocks 都装不下的话就须要 dinode.addrs[NDIRECT] 这个 Indirect block number 派上用场了。Indirect Block 充当索引 Block,其内含有一系列的 Block numbers,而这些 Block numbers 所指代的 Blocks 就负责装有那些 Direct Blocks 装不下的残余的文件。所以,一个文件的大小最大为 sizeof(buf)*(NDIRECT+(sizeof(buf)/sizeof(buf.blockno))) 个字节。


/* kernel/fs.h */struct dirent {  ushort inum;  char name[DIRSIZ];};

如果用户看到的文件都是一些 inode 编号,而后通过这些 inode 编号来操作文件,那未免这个文件系统也太难用了,用户可能都不晓得本人在操作哪个文件。

于是到了目录层,为用户提供了层次化的命名空间(hierarchical namespace) 的服务,能够给本人的文件取个好记的名字,并且以层级目录的形式将这些文件组织起来。为同时实现这两点,咱们引入了新的数据结构:struct dirent。并且为了反对拜访这些数据结构,咱们扩大了 inode 的类型,所以当初 dinode.type 的取值不仅能够是 T_FILE,还能够是 T_DIR,这些宏都在 kernel/stat.h 中被定义。sizeof(dirent) 为 16 个字节,目录名称为 14 个字节,同时 dinode.link 代表有多少文件名指向这个 inode。


路径名层所提供的性能很简略:给定一个路径名,路径名层可能解析它并查找该门路下的文件。xv6 实现的解析算法是线性扫描。即如果给定路径名 /usr/rtm/xv6/fs.c

  1. 文件系统会首先去找 root inode,root inode 的 inode number 固定为 1;
  2. 发现 root inode 的类型是 T_DIR,于是在 root inode 指向的 Blocks 中遍历搜寻 dirent.name 为 usr 的目录;
  3. 如果没找到就返回谬误,找到了就那到 usr 的 inode 编号,递归执行上述步骤;
  4. 最终拿到 dirent.name 为 fs.c 的目录,拿到 inode 编号后;
  5. 发现 fs.c 的 inode 的类型是 T_FILE,查找胜利。

文件描述符层提供了许多对 Unix 资源的形象,包含文件、设施、管道、Socket 等等,利用这个最上层的文件系统接口,可能大大简化程序员拜访这些资源的操作。起初文件描述符只是给文件用的,不过大家慢慢发现,对于设施、管道这类资源的操作和文件的操作都很像,不外乎就是关上、读写、敞开这些操作,因而就对立应用了文件系统提供的接口。

文件系统在 Disk 中的组织构造

文件系统会把磁盘看作是一整个 Block 数组,每个 Block 大小为 1KB:

  1. disk[0] 要么没用,要么被用所 Boot Sector 来启动操作系统;
  2. disk[1] 被称为 Super Block,外面存储有整个文件系统的 metadata,包含文件系统的大小、Data Blocks 的个数、inodes 的个数、Logging Blocks 的个数等等;
  3. disk[2]~disk[31] 都是 Logging Blocks;
  4. disk[32]~disk[44] 存储有 inode;
  5. disk[45] 是 bitmap,当该 Block 的第 n 个 bit 置位时,代表 Block number 为 n 的 Block 正被占用;
  6. disk[46]~ 都是 Data Blocks,被占用的 Block 都存储有文件的内容或目录的内容。

Buffer cache layer

Buffer cache layer 所提供的服务有以下几点:

  1. 确保 Buffer cache 里同一个 Disk block 的拷贝(Block cache)不超过一个;

    • 否则会呈现数据的不一致性
  2. 确保同时拜访一个 Block cache 的内核线程不超过一个;

    • 通过给每个 Block cache 设立一个 sleep-lock 来实现
  3. 确保那些常常要拜访的 Blocks 要在 Buffer cache 中有一份拷贝(Block cache)。
/* kernel/bio.c */struct buf*bread(uint dev, uint blockno){  struct buf *b;  b = bget(dev, blockno);  if(!b->valid) {    virtio_disk_rw(b, 0);    b->valid = 1;  }  return b;}voidbwrite(struct buf *b){  if(!holdingsleep(&b->lock))    panic("bwrite");  virtio_disk_rw(b, 1);}voidbrelse(struct buf *b){  if(!holdingsleep(&b->lock))    panic("brelse");  releasesleep(&b->lock);  acquire(&bcache.lock);  b->refcnt--;  if (b->refcnt == 0) {    // no one is waiting for it.    b->next->prev = b->prev;    b->prev->next = b->next;    b->next = bcache.head.next;    b->prev = &bcache.head;    bcache.head.next->prev = b;    bcache.head.next = b;  }    release(&bcache.lock);}

Buffer cache layer 对外的接口只有两个,一个是 bread(),另一个是 bwrite(),都在 kernel/bio.c 中有定义。内核线程对某个 Block cache 实现操作后,都必须要开释它的锁(brelse())。brelse() 不仅会开释 sleep-lock,还会将这个 Block cache 调整到 bcache.head.next 这个地位,代表它是被最近拜访过的 Block cache。

  • bread()

    • 给定一个 Block number,返回对应的上锁的 Block cache;
    • 如果命中了间接返回;
    • 如果没命中就依据 RLU 算法选中一个闲暇(refcnt 字段为 0 代表这个 Block cache 没有被任何一个内核线程正在占用)的 Block cache 后,抉择将其重用并将 valid 字段设为 0 代表这个 Block cache 没有对应 Disk block 的数据,提醒 break() 调用去从磁盘间接获取这个 Block 的数据。
    • 如果没有命中,并且没有一个闲暇的 Block cache,文件系统会间接 panic,是个比拟粗犷的解决。
  • bwrite()

    • 如果某个内核线程批改了某个 Block cache 的内容,它必须在开释这个 Block cache 的 sleep-lock 之前调用 bwrite,来将指定 Block number 的 Block cache 强制写回 Disk 中。

能够看到 bwrite() 的实现要更简略些,所以重点看 bread() 的实现。bread() 调用了一个 bget()

/* kernel/bio.c */static struct buf*bget(uint dev, uint blockno){  struct buf *b;  acquire(&bcache.lock);  // Is the block already cached?  for(b = bcache.head.next; b != &bcache.head; b = b->next){    if(b->dev == dev && b->blockno == blockno){      b->refcnt++;      release(&bcache.lock);      acquiresleep(&b->lock);      return b;    }  }  // Not cached.  // Recycle the least recently used (LRU) unused buffer.  for(b = bcache.head.prev; b != &bcache.head; b = b->prev){    if(b->refcnt == 0) {      b->dev = dev;      b->blockno = blockno;      b->valid = 0;      b->refcnt = 1;      release(&bcache.lock);      acquiresleep(&b->lock);      return b;    }  }  panic("bget: no buffers");}

如果两个内核线程正在并发的执行 bget(),那么为了保障只有一个过程在拜访 Buffer cache,进入函数的一开始就获取 Buffer cache 的锁。最初想要的 Block cache 后,须要获取这个 Block cache 的 Sleep lock 再返回。

/* kernel/sleeplock.c */voidacquiresleep(struct sleeplock *lk){  acquire(&lk->lk);  while (lk->locked) {    sleep(lk, &lk->lk);  }  lk->locked = 1;  lk->pid = myproc()->pid;  release(&lk->lk);}voidreleasesleep(struct sleeplock *lk){  acquire(&lk->lk);  lk->locked = 0;  lk->pid = 0;  wakeup(lk);  release(&lk->lk);}

为什么要是 sleep-lock,跟 Buffercache 一起用个别的自旋锁不行么?其实这里至多有两种考量:

  1. 拜访一个 Block 是个慢速操作(毫秒级),读时如果没命中须要读磁盘,如果批改了 Block cache 还要写回磁盘,跟 CPU 执行指令的速度(纳秒级)相比太慢了。于其始终自旋占用 CPU 忙等,还不如 sleep 把 CPU 资源让进来。
  2. 别忘了调度器会开释掉过程锁,所以 sleep 时候不会敞开中断,也就能响应磁盘发送的中断,这是个别自旋锁做不到的。sleep-lock 算是设计的十分精妙的一种锁了。

总的来说,bcache.lock 爱护的信息是哪些 Block 被 cached 了,而 sleep-lock 则爱护的是 Block cache 的在读写时的内容。

试验局部

Memory allocator

多个 CPU hart 共用一个在内存中的 freelist 资源,势必会造成十分重大的竞争。

因而在这个 Exercise 中,咱们须要为每一个 CPU 独自调配一个 freelist 和一个配套的 lock。

所以咱们给 kernel/kalloc.c 中的 struct kmem 扩大成一个长度为 NCPUkmem 数组即可。

之后能够通过 cpuid() 函数来获取以后是哪个 CPU hart 在做相干调配和开释操作。

特地留神的是,当某个 CPU hart 下的 freelist 空掉了之后,在 kalloc() 时,它就要去看看其它 CPU hart 下的 freelist 是否无余量,有的话就间接拿过去返回即可。

初始化 kmems 的工作咱们交给 cpuid() == 0 的 CPU hart 去做,因而一开始所有的 free page 都在该 CPU hart 下的 freelist 中。

/* kernel/kalloc.c */struct kmem {  struct spinlock lock;  struct run *freelist;};struct kmem kmems[NCPU];voidkinit(){  for (int ncpu = 0; ncpu < NCPU; ++ncpu)    initlock(&kmems[ncpu].lock, "kmem");  freerange(end, (void*)PHYSTOP);}voidkfree(void *pa){  struct run *r;  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)    panic("kfree");  // Fill with junk to catch dangling refs.  memset(pa, 1, PGSIZE);  r = (struct run*)pa;  push_off();  int cid = cpuid();  acquire(&kmems[cid].lock);  r->next = kmems[cid].freelist;  kmems[cid].freelist = r;  release(&kmems[cid].lock);  pop_off();}void *kalloc(void){  struct run *r;  push_off();  int cid = cpuid();  acquire(&kmems[cid].lock);  r = kmems[cid].freelist;  if(r)    kmems[cid].freelist = r->next;  else {    for (int _cid = 0; _cid < NCPU; ++_cid) {      if (_cid == cid)        continue;      acquire(&kmems[_cid].lock);      r = kmems[_cid].freelist;      if (r) {        kmems[_cid].freelist = r->next;        release(&kmems[_cid].lock);        break;      }      release(&kmems[_cid].lock);    }  }  release(&kmems[cid].lock);  pop_off();  if(r)    memset((char*)r, 5, PGSIZE); // fill with junk  return (void*)r;}

Buffer cache

在原先的实现中,咱们是以全局双链表的模式来保护 Block cache 的新旧拜访水平的,即最老的块在 head 的 prev 侧,最新的块在 head 的 next 侧。

在这个 Exercise 中,为了升高竞争水平,咱们须要摒弃这种双链表的设计,转而应用桶哈希表。为什么应用哈希表能升高竞争水平?因为给一个大链表一个锁和给每个小桶一个锁的粒度相比,很显著后者锁的粒度更小,直观地减小了竞争频率。

在只有 Buffer cache 的锁和 hash bucket 的锁的前提下,假如 evict 的计划是遵循原来的,即“从所有的 Block cache 中,找到最老的且援用数为 0 的那一个”,那么这将会与这个 Exercise 的指标产生矛盾。

矛盾的起因如下:咱们称原来的 evict 计划是寻找“全局最优解”。那么当一个过程 miss 时,它就要避免任何过程尝试去拜访更新 Buffer cache 里的任一一个 Block cache,否则就会出错。例如,当过程 A 找到这个“全局最优解时”,过程 B 命中了这个 Block cache 并批改了它的 ticks 和援用数。之后两个过程将会共用一个 Block cache,这几乎是个灾难性的谬误。

在设计过程间的同步时,把需要列出来会更加清晰:

  • 当不同过程在尝试 hit 时,只有不是同一个 bucket 就能够并行执行,否则串行;
  • 当有过程执行 evict 操作时,其它所有过程都要期待。

在去理解了锁的类型后,我发现读写锁最能满足这里的需要。所以打算来本人封装一个相似读写锁的锁,于是以下是这个 Exercise 的实现:

// Buffer cache.//// The buffer cache is a linked list of buf structures holding// cached copies of disk block contents.  Caching disk blocks// in memory reduces the number of disk reads and also provides// a synchronization point for disk blocks used by multiple processes.//// Interface:// * To get a buffer for a particular disk block, call bread.// * After changing buffer data, call bwrite to write it to disk.// * When done with the buffer, call brelse.// * Do not use the buffer after calling brelse.// * Only one process at a time can use a buffer,//     so do not keep them longer than necessary.#include "types.h"#include "param.h"#include "spinlock.h"#include "sleeplock.h"#include "riscv.h"#include "defs.h"#include "fs.h"#include "buf.h"#define NBUCKET 13struct entry {  struct buf *value;  struct entry *next;  uint ticks;};struct entry entries[NBUF];struct bucket {  struct entry *e;  struct spinlock lock;};struct bucket table[NBUCKET];int valid_cnt = 0;struct entry *remove(uint blockno){  int i = blockno % NBUCKET;  struct entry **p = &table[i].e;  struct entry *prev = 0;  struct entry *e = table[i].e;  for (; e; e = e->next) {    if (e->value->blockno == blockno) {      if (!prev)        *p = e->next;      else        prev->next = e->next;            return e;    }    prev = e;  }  return 0;}#define MAX_QUEUE_SIZE NBUFstruct entry *priority_queue[MAX_QUEUE_SIZE];intparent_index(int i){  return (i-1)/2;}intleft_child_index(int i){  return (2*i)+1;}intright_child_index(int i){  return (2*i)+2;}voidswap_element(int i1, int i2){  struct entry *temp = priority_queue[i1];  priority_queue[i1] = priority_queue[i2];  priority_queue[i2] = temp;}uint current_queue_size = 0;intqueue_push(struct entry *e){  if (current_queue_size == MAX_QUEUE_SIZE) return 1;  int current_idx = current_queue_size;  int parent_idx = parent_index(current_idx);  priority_queue[current_idx] = e;  while (current_idx != 0 && priority_queue[current_idx]->ticks < priority_queue[parent_idx]->ticks) {    swap_element(current_idx, parent_idx);    current_idx = parent_idx;    parent_idx = parent_index(current_idx);  }  ++current_queue_size;  return 0;}struct entry *queue_pop(){  if (current_queue_size == 0) return 0;  struct entry *top_element = priority_queue[0];  int current_idx = 0;  swap_element(--current_queue_size, 0);  int left_child_idx;  int right_child_idx;  int min_child_idx;  uint left_child_ticks;  uint right_child_ticks;  uint current_ticks;  while (1) {    left_child_idx = left_child_index(current_idx);    right_child_idx = right_child_index(current_idx);        if (left_child_idx >= current_queue_size)       break;    left_child_ticks = priority_queue[left_child_idx]->ticks;    current_ticks = priority_queue[current_idx]->ticks;    if (right_child_idx >= current_queue_size) {      if (left_child_ticks < current_ticks)        swap_element(left_child_idx, current_idx);      break;    }    right_child_ticks = priority_queue[right_child_idx]->ticks;    min_child_idx = left_child_ticks < right_child_ticks ? left_child_idx : right_child_idx;    min_child_idx = priority_queue[min_child_idx]->ticks < current_ticks ? min_child_idx : current_idx;    if (min_child_idx == current_idx)      break;    else {      swap_element(min_child_idx, current_idx);      current_idx = min_child_idx;    }  }  return top_element;}voidqueue_clear(){  current_queue_size = 0;}struct {  struct spinlock lock;  struct buf buf[NBUF];} bcache;struct rwlock {  uint evicting;  uint hitting;  struct spinlock *lock;};struct rwlock evict_lock;struct spinlock _evict_lock;voidinitrwlock(struct rwlock *rwlk, struct spinlock *splk){  rwlk->lock = splk;  rwlk->hitting = 0;  rwlk->evicting = 0;}voidacquirerwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);  if (hit) {    while (rwlk->evicting == 1) {      release(rwlk->lock);      acquire(rwlk->lock);    }    ++rwlk->hitting;  } else {    while (rwlk->evicting == 1 || rwlk->hitting != 0) {      release(rwlk->lock);      acquire(rwlk->lock);    }    rwlk->evicting = 1;  }  release(rwlk->lock);}voidreleaserwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);    if (hit) {    --rwlk->hitting;  } else {    rwlk->evicting = 0;  }  release(rwlk->lock);}voidbinit(void){  memset(entries, 0, sizeof(struct entry)*NBUF);  memset(table, 0, sizeof(struct bucket)*NBUCKET);  memset(bcache.buf, 0, sizeof(struct buf)*NBUF);  initlock(&bcache.lock, "bcache");  initlock(&_evict_lock, "bcache-evict");  initrwlock(&evict_lock, &_evict_lock);  struct buf *b;  for (b = bcache.buf; b < bcache.buf+NBUF; ++b)    initsleeplock(&b->lock, "buffer");  struct bucket *bkt;  for (bkt = table; bkt < table+NBUCKET; ++bkt)    initlock(&bkt->lock, "bcache-bucket");}// Look through buffer cache for block on device dev.// If not found, allocate a buffer.// In either case, return locked buffer.static struct buf*bget(uint dev, uint blockno){  // Is the block already cached?  struct buf *b;  int nbucket = blockno%NBUCKET;  struct entry *e;  acquirerwlock(&evict_lock, 1);  acquire(&table[nbucket].lock);      for (e = table[nbucket].e; e; e = e->next) {    b = e->value;    if (b->dev == dev && b->blockno == blockno) {      ++b->refcnt;      release(&table[nbucket].lock);      releaserwlock(&evict_lock, 1);      acquiresleep(&b->lock);      return b;    }  }  release(&table[nbucket].lock);  releaserwlock(&evict_lock, 1);  // ------------------------------------------------------------  // Not cached.  // Recycle the least recently used (LRU) unused buffer.  acquirerwlock(&evict_lock, 0);  if (valid_cnt < NBUF) {    b = &bcache.buf[valid_cnt];    b->dev = dev;    b->blockno = blockno;    b->valid = 0;    b->refcnt = 1;    e = entries + valid_cnt;    e->value = b;    e->next = table[nbucket].e;    table[nbucket].e = e;        ++valid_cnt;    releaserwlock(&evict_lock, 0);    acquiresleep(&b->lock);    return b;  }  // -------------------------------------------------------  for (e = entries; e < &entries[NBUF]; ++e)    queue_push(e);  while ((e = queue_pop()) != 0) {    b = e->value;    int _nbucket = b->blockno % NBUCKET;          if (b->refcnt == 0) {      queue_clear();      if (_nbucket != nbucket) {        remove(b->blockno);        e->next = table[nbucket].e;        table[nbucket].e = e;      }      b->dev = dev;      b->blockno = blockno;      b->valid = 0;      b->refcnt = 1;      releaserwlock(&evict_lock, 0);      acquiresleep(&b->lock);      return b;    }  }  panic("bget: no buffers");}// Return a locked buf with the contents of the indicated block.struct buf*bread(uint dev, uint blockno){  struct buf *b;  b = bget(dev, blockno);  if(!b->valid) {    virtio_disk_rw(b, 0);    b->valid = 1;  }  return b;}// Write b's contents to disk.  Must be locked.voidbwrite(struct buf *b){  if(!holdingsleep(&b->lock))    panic("bwrite");  virtio_disk_rw(b, 1);}// Release a locked buffer.// Move to the head of the most-recently-used list.voidbrelse(struct buf *b){  if(!holdingsleep(&b->lock))    panic("brelse");  acquirerwlock(&evict_lock, 0);  int nbucket = b->blockno % NBUCKET;    for (struct entry *e = table[nbucket].e; e; e = e->next)    if (e->value->blockno == b->blockno) {      e->ticks = ticks;      break;    }  --b->refcnt;  releaserwlock(&evict_lock, 0);  releasesleep(&b->lock);}voidbpin(struct buf *b) {  int nbucket = b->blockno % NBUCKET;  acquire(&table[nbucket].lock);  ++b->refcnt;  release(&table[nbucket].lock);}voidbunpin(struct buf *b) {  int nbucket = b->blockno % NBUCKET;  acquire(&table[nbucket].lock);  --b->refcnt;  release(&table[nbucket].lock);}

一开始天真地用 malloc 调配 entry 内存,起初忽然意识到内核内存布局里如同没有像用户过程一样的堆,内核只能一次性调配整整一页大小的内存块。但幸好 entry 数量和 Buffer cache 的数量是一一对应的,因而能够间接动态调配,于是又改了原来的实现,不然我还临时真想不到该怎么动态分配 entry。

对于找到最老的且援用数为 0 的 Block cache 的办法,无论是用哪种排序算法(冒泡、归并、快排)都能够。我这里用优先队列是因为实现起来比较简单,而且开销不算太大(但举荐最好是开一个全局的排序数组,把 entries 快排更简洁一些,不同写入队和出队的操作)。在其它高级语言中都有内置的库去提供优先队列的实现,但遗憾的是在 C 只能反复造轮子了,须要留神的是这个优先队列是须要小顶堆实现的,也算是顺带复习数据结构的常识了。

我选了 13 作为哈希桶的数量,因为提醒说桶的数量尽量是个质数。为什么最好是质数,我在参考链接附上了这个问题的答案。

初始化完结后,咱们的哈希表中没有任何一个 entry,所以一个 bget() 一个都命中不了。这时依照原来的逻辑,程序将会去剔除掉一个援用数为 0 的 Block cache,但因为哈希表是空的,程序会认为剔除失败,间接 panic。这显然不合理,因为咱们连一个 Block cache 都没有用,所以在剔除之前咱们须要查看是否所有的 Block cache 都是 valid 的。随着零碎运行,Buffer cache 很快就会被占满,所以这一步查看执行不了多少次。

重点来了!在运行 bcachetest 后,test0 和 test1 都能顺利执行,没有呈现任何 panic 和死锁,但 test0 不意外地挂掉了。test0 给出咱们的读写锁一共被竞争了 80多万次,这里 test0 的指标(小于 500次)可是差了将近 1000 倍。

咱们无妨计算一下从上电到 test0 完结的这段时间内的 Buffer cache 的命中率。我用了两个计数器变量去记录这里的后果,结果显示 bget() 一共被申请了将近 32000 次,而其中 evict 操作则进行了不到 90次,命中率高达 99.72%。均匀 357 次申请中才有一次 evict 申请。很显著,一次 evict 就会自旋很久,大部分的竞争次数都是从这儿来的。那好办,咱们就无妨让申请 evict 的过程 sleep 过来,等到没有其它 bget() 申请时再 wakeup 它不就行了么。所以上面这个第二版是改良后的读写锁的获取和开释操作:

voidacquirerwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);  if (hit) {    while (rwlk->evicting == 1)      sleep(rwlk, rwlk->lock);        ++rwlk->hitting;  } else {    while (rwlk->evicting == 1 || rwlk->hitting != 0)      sleep(rwlk, rwlk->lock);    rwlk->evicting = 1;  }  release(rwlk->lock);}voidreleaserwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);  if (hit) {    if (--rwlk->hitting == 0)      wakeup(rwlk->lock);  } else {    rwlk->evicting = 0;    wakeup(rwlk->lock);  }  release(rwlk->lock);}

当然,如果你感觉 hit 操作没必要睡你也能够让它自旋。但无论哪个版本,在运行 bcachetest 后,程序间接卡住不动了。hit 操作是很频繁的,这使得没有过程有机会去进行 evict 操作。就算以后没有进行 hit 操作的过程,唤醒而后调度执行的开销是很大的,这期间很有可能又有新的 hit 过程进来,等到 evict 过程醒过来就发现 rwlk->hittng != 0 就又睡了。这里我查看了很多遍没有发现死锁的可能性,很可能是最初大部分过程都没有命中本人想要 Block cache 就都睡掉了,之后也没有过程去唤醒它们。

咱们须要去更新策略,要让 evcit 操作的优先级比 hit 操作的优先级更高,即来了一个 evict 操作,如果以后有正在 hit 操作,等到 hit 操作执行结束后立刻响应这个 evict 操作,上面这是第三版改良后的读写锁的获取和开释操作,咱们为读写锁减少了一个 evicted 字段:

struct rwlock {  uint evicting;  uint hitting;  uint evicted;  struct spinlock *lock;};struct rwlock evict_lock;struct spinlock _evict_lock;voidinitrwlock(struct rwlock *rwlk, struct spinlock *splk){  rwlk->lock = splk;  rwlk->hitting = 0;  rwlk->evicting = 0;  rwlk->evicted = 1;}voidacquirerwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);  if (hit) {    while (rwlk->evicting != 0)      sleep(rwlk, rwlk->lock);    ++rwlk->hitting;  } else {    ++rwlk->evicting;    while (rwlk->evicted == 0 || rwlk->hitting != 0)       sleep(rwlk, rwlk->lock);    rwlk->evicted = 0;  }  release(rwlk->lock);}voidreleaserwlock(struct rwlock *rwlk, int hit){  acquire(rwlk->lock);  if (hit) {    if (--rwlk->hitting == 0 && rwlk->evicting == 0)      wakeup(rwlk);  } else {    --rwlk->evicting;    rwlk->evicted = 1;    wakeup(rwlk);  }  release(rwlk->lock);}

bcachetest 运行顺利,而咱们的竞争次数也翻新高,达到了 tot= 50236847 这个前所未有的数字。我人都傻了,因为后面剖析的和理论不符。把第三版改成全副自旋,竞争次数还有 tot= 487775 这个多;改成 hit 自旋、evict 互斥时,竞争次数为 tot= 49768743;改成 evict 自旋,hit 互斥时,竞争次数为 tot= 22981816

到这里我真的是万策尽了,终于切实忍不住看了看其他人写的对于这个试验的文章,反正至多我目前看到的当中,都是采纳了“部分最优解”的办法通过测试的,尽管我没有找到在找到“全局最优解”的状况下通过测试,但我很难认同这么做就是正确的。因而到这里我只能提供一个思路,本人一个人做这种试验做到这里曾经是极限了。

后记

这个锁试验是真的难,不晓得是不是我做难了还是有更简略的办法。之前的 Hard 的 Exercise 都是理论编码量不多,但比拟难想进去。但这次试验的 Exercise 2 写了三百行左右,还特地难调试,想了好久好久(断断续续大略有三天工夫)。期间我两头尝试过用各种计划,先是自旋锁、再是互斥锁、而后读写锁(自旋锁实现),最初在是读写锁(互斥锁实现),还有其它的一些 Bug。这期间竞争频数有过 80多万次、2000多万次、1万屡次、5000多万次等,切实是考验人的急躁。我没把这些所有都记录下来是因为太多了……

参考链接

  • MIT 6.S081 Book
  • 算法剖析:哈希表的大小为何是素数