关于操作系统:xv6-源码窥探5锁

7次阅读

共计 17713 个字符,预计需要花费 45 分钟才能阅读完成。

前言

  • 本篇是对于 MIT 6.S081-2020-Lab8(Lock) 的实现;
  • 我并没有在找到“全局最优解”的状况下通过 Buffer cache 的测试;
  • 因而在 Exercise 2 我只提供了在找到“全局最优解”的前提下,我的各种代码尝试以及思路;
  • 如果内容上发现有什么问题请不要悭吝您的键盘。

筹备工作

要新看的内容不多,只有第八章 (File system) 的前三个大节。

文件系统的层次模型

这张图特地地像计算机网络档次服务模型:以后层调用上层所提供的服务为下层提供更优质的服务。但实际上到文件系统,这里的象征没有那么强,更像是对 Disk 的形象水平,越往下走越凑近外设磁盘,越往上走越凑近用户过程。很多时候文件系统的分层不是很严格,所以这张图的意义在于帮忙你了解文件系统的构造。


磁盘通常是 SSD 或 HDD,SSD 拜访一个 disk block 须要 100us 到 1ms 之间,而 HDD 则须要 10ms 左右。对于磁盘驱动程序而言,磁盘都是 Sector 的汇合,一个 sector 的大小为 512B;对于操作系统内核而言,磁盘则是一系列的 Block 数组,一个 Block 的大小为 1024B。明天咱们不须要去关注 Sector,因为磁盘驱动程序 kernel/virtio_disk.c 为内核做了很好的屏蔽,咱们只需应用它的对外提供的读写磁盘某个 Block 的接口(顶半部:virtio_disk_rw())即可,给定 Block number 返回 Block cache。

/* kernel/buf.h */

struct buf {
  int valid;   // has data been read from disk?
  int disk;    // does disk "own" buf?
  uint dev;
  uint blockno;
  struct sleeplock lock;
  uint refcnt;
  struct buf *prev; // LRU cache list
  struct buf *next;
  uchar data[BSIZE];
};


/* kernel/bio.c */

struct {
  struct spinlock lock;
  struct buf buf[NBUF];

  // Linked list of all buffers, through prev/next.
  // Sorted by how recently the buffer was used.
  // head.next is most recent, head.prev is least.
  struct buf head;
} bcache;

void
binit(void)
{
  struct buf *b;

  initlock(&bcache.lock, "bcache");

  // Create linked list of buffers
  bcache.head.prev = &bcache.head;
  bcache.head.next = &bcache.head;
  for(b = bcache.buf; b < bcache.buf+NBUF; b++){
    b->next = bcache.head.next;
    b->prev = &bcache.head;
    initsleeplock(&b->lock, "buffer");
    bcache.head.next->prev = b;
    bcache.head.next = b;
  }
}

为了放慢读写访问速度,缓存的设计能够说是必须的。整个文件系统就一个 Buffer cache(struct bcache),Buffer cache 存储有多个 Block cache(struct buf),这些 Block cache 和 bcache.head 通过 binit() 一起造成环状双链表。Buffer cache 用到的 Block cache 置换算法是 LRU(Least Recently Used, bget())。bcache.head.prev 一侧将会是最远被应用过的 Block cache,而 bcache.head.next 将会是最近被应用的 Block cache。同时为了保障并发线程环境下 Block 内容的一致性,Buffer cache 配有一个 Spin Lock,所有 Buffer cache 共用一个 sleep-lock。为什么不都是自旋锁,以及 sleep-lock 的作用下一个局部会介绍。


Logging 层这里就不多做介绍了,因为这是下一个 Lab 的内容。


/* kernel/fs.h */

// On-disk inode structure
struct dinode {
  short type;           // File type
  short major;          // Major device number (T_DEVICE only)
  short minor;          // Minor device number (T_DEVICE only)
  short nlink;          // Number of links to inode in file system
  uint size;            // Size of file (bytes)
  uint addrs[NDIRECT+1];   // Data block addresses
};

inode 层次要是为文件提供的一层形象。每个 inode 都有惟一标识的 inode 号,每个 inode 的大小都为 64 bytes。只有给定一个 inode 号,咱们就能够推算出这个 inode 位于磁盘中的哪一个 Block 的哪个字节开始的。依据这个地址拿到这个 inode 后,咱们能够推算出这个 inode 所对应的文件的大小以及该文件被存储在哪几个 Block 中。

dinode.addrs[] 的前 NDIRECT 个元素都是 Direct block number。这 NDIRECT 个 Direct blocks 中存储有文件的最多前 sizeof(buf)*NDIRECT 个字节。如果文件的大小比拟小,可能用不了 NDIRECT 个 Blocks 就能装的下来;如果文件的大小比拟大,NDIRECT 个 Blocks 都装不下的话就须要 dinode.addrs[NDIRECT] 这个 Indirect block number 派上用场了。Indirect Block 充当索引 Block,其内含有一系列的 Block numbers,而这些 Block numbers 所指代的 Blocks 就负责装有那些 Direct Blocks 装不下的残余的文件。所以,一个文件的大小最大为 sizeof(buf)*(NDIRECT+(sizeof(buf)/sizeof(buf.blockno))) 个字节。


/* kernel/fs.h */

struct dirent {
  ushort inum;
  char name[DIRSIZ];
};

如果用户看到的文件都是一些 inode 编号,而后通过这些 inode 编号来操作文件,那未免这个文件系统也太难用了,用户可能都不晓得本人在操作哪个文件。

于是到了目录层,为用户提供了层次化的命名空间 (hierarchical namespace) 的服务,能够给本人的文件取个好记的名字,并且以层级目录的形式将这些文件组织起来。为同时实现这两点,咱们引入了新的数据结构:struct dirent。并且为了反对拜访这些数据结构,咱们扩大了 inode 的类型,所以当初 dinode.type 的取值不仅能够是 T_FILE,还能够是 T_DIR,这些宏都在 kernel/stat.h 中被定义。sizeof(dirent) 为 16 个字节,目录名称为 14 个字节,同时 dinode.link 代表有多少文件名指向这个 inode。


路径名层所提供的性能很简略:给定一个路径名,路径名层可能解析它并查找该门路下的文件。xv6 实现的解析算法是线性扫描。即如果给定路径名 /usr/rtm/xv6/fs.c

  1. 文件系统会首先去找 root inode,root inode 的 inode number 固定为 1;
  2. 发现 root inode 的类型是 T_DIR,于是在 root inode 指向的 Blocks 中遍历搜寻 dirent.name 为 usr 的目录;
  3. 如果没找到就返回谬误,找到了就那到 usr 的 inode 编号,递归执行上述步骤;
  4. 最终拿到 dirent.name 为 fs.c 的目录,拿到 inode 编号后;
  5. 发现 fs.c 的 inode 的类型是 T_FILE,查找胜利。

文件描述符层提供了许多对 Unix 资源的形象,包含文件、设施、管道、Socket 等等,利用这个最上层的文件系统接口,可能大大简化程序员拜访这些资源的操作。起初文件描述符只是给文件用的,不过大家慢慢发现,对于设施、管道这类资源的操作和文件的操作都很像,不外乎就是关上、读写、敞开这些操作,因而就对立应用了文件系统提供的接口。

文件系统在 Disk 中的组织构造

文件系统会把磁盘看作是一整个 Block 数组,每个 Block 大小为 1KB:

  1. disk[0] 要么没用,要么被用所 Boot Sector 来启动操作系统;
  2. disk[1] 被称为 Super Block,外面存储有整个文件系统的 metadata,包含文件系统的大小、Data Blocks 的个数、inodes 的个数、Logging Blocks 的个数等等;
  3. disk[2]~disk[31] 都是 Logging Blocks;
  4. disk[32]~disk[44] 存储有 inode;
  5. disk[45] 是 bitmap,当该 Block 的第 n 个 bit 置位时,代表 Block number 为 n 的 Block 正被占用;
  6. disk[46]~ 都是 Data Blocks,被占用的 Block 都存储有文件的内容或目录的内容。

Buffer cache layer

Buffer cache layer 所提供的服务有以下几点:

  1. 确保 Buffer cache 里同一个 Disk block 的拷贝(Block cache)不超过一个;

    • 否则会呈现数据的不一致性
  2. 确保同时拜访一个 Block cache 的内核线程不超过一个;

    • 通过给每个 Block cache 设立一个 sleep-lock 来实现
  3. 确保那些常常要拜访的 Blocks 要在 Buffer cache 中有一份拷贝(Block cache)。
/* kernel/bio.c */

struct buf*
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  if(!b->valid) {virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  return b;
}

void
bwrite(struct buf *b)
{if(!holdingsleep(&b->lock))
    panic("bwrite");
  virtio_disk_rw(b, 1);
}

void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))
    panic("brelse");

  releasesleep(&b->lock);

  acquire(&bcache.lock);
  b->refcnt--;
  if (b->refcnt == 0) {
    // no one is waiting for it.
    b->next->prev = b->prev;
    b->prev->next = b->next;
    b->next = bcache.head.next;
    b->prev = &bcache.head;
    bcache.head.next->prev = b;
    bcache.head.next = b;
  }
  
  release(&bcache.lock);
}

Buffer cache layer 对外的接口只有两个,一个是 bread(),另一个是 bwrite(),都在 kernel/bio.c 中有定义。内核线程对某个 Block cache 实现操作后,都必须要开释它的锁 (brelse())。brelse() 不仅会开释 sleep-lock,还会将这个 Block cache 调整到 bcache.head.next 这个地位,代表它是被最近拜访过的 Block cache。

  • bread()

    • 给定一个 Block number,返回对应的上锁的 Block cache;
    • 如果命中了间接返回;
    • 如果没命中就依据 RLU 算法选中一个闲暇(refcnt 字段为 0 代表这个 Block cache 没有被任何一个内核线程正在占用)的 Block cache 后,抉择将其重用并将 valid 字段设为 0 代表这个 Block cache 没有对应 Disk block 的数据,提醒 break() 调用去从磁盘间接获取这个 Block 的数据。
    • 如果没有命中,并且没有一个闲暇的 Block cache,文件系统会间接 panic,是个比拟粗犷的解决。
  • bwrite()

    • 如果某个内核线程批改了某个 Block cache 的内容,它必须在开释这个 Block cache 的 sleep-lock 之前调用 bwrite,来将指定 Block number 的 Block cache 强制写回 Disk 中。

能够看到 bwrite() 的实现要更简略些,所以重点看 bread() 的实现。bread() 调用了一个 bget()

/* kernel/bio.c */

static struct buf*
bget(uint dev, uint blockno)
{
  struct buf *b;

  acquire(&bcache.lock);

  // Is the block already cached?
  for(b = bcache.head.next; b != &bcache.head; b = b->next){if(b->dev == dev && b->blockno == blockno){
      b->refcnt++;
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;
    }
  }

  // Not cached.
  // Recycle the least recently used (LRU) unused buffer.
  for(b = bcache.head.prev; b != &bcache.head; b = b->prev){if(b->refcnt == 0) {
      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;
      release(&bcache.lock);
      acquiresleep(&b->lock);
      return b;
    }
  }
  panic("bget: no buffers");
}

如果两个内核线程正在并发的执行 bget(),那么为了保障只有一个过程在拜访 Buffer cache,进入函数的一开始就获取 Buffer cache 的锁。最初想要的 Block cache 后,须要获取这个 Block cache 的 Sleep lock 再返回。

/* kernel/sleeplock.c */

void
acquiresleep(struct sleeplock *lk)
{acquire(&lk->lk);
  while (lk->locked) {sleep(lk, &lk->lk);
  }
  lk->locked = 1;
  lk->pid = myproc()->pid;
  release(&lk->lk);
}

void
releasesleep(struct sleeplock *lk)
{acquire(&lk->lk);
  lk->locked = 0;
  lk->pid = 0;
  wakeup(lk);
  release(&lk->lk);
}

为什么要是 sleep-lock,跟 Buffercache 一起用个别的自旋锁不行么? 其实这里至多有两种考量:

  1. 拜访一个 Block 是个慢速操作(毫秒级),读时如果没命中须要读磁盘,如果批改了 Block cache 还要写回磁盘,跟 CPU 执行指令的速度(纳秒级)相比太慢了。于其始终自旋占用 CPU 忙等,还不如 sleep 把 CPU 资源让进来。
  2. 别忘了调度器会开释掉过程锁,所以 sleep 时候不会敞开中断,也就能响应磁盘发送的中断,这是个别自旋锁做不到的。sleep-lock 算是设计的十分精妙的一种锁了。

总的来说,bcache.lock 爱护的信息是哪些 Block 被 cached 了,而 sleep-lock 则爱护的是 Block cache 的在读写时的内容。

试验局部

Memory allocator

多个 CPU hart 共用一个在内存中的 freelist 资源,势必会造成十分重大的竞争。

因而在这个 Exercise 中,咱们须要为每一个 CPU 独自调配一个 freelist 和一个配套的 lock。

所以咱们给 kernel/kalloc.c 中的 struct kmem 扩大成一个长度为 NCPUkmem 数组即可。

之后能够通过 cpuid() 函数来获取以后是哪个 CPU hart 在做相干调配和开释操作。

特地留神的是,当某个 CPU hart 下的 freelist 空掉了之后,在 kalloc() 时,它就要去看看其它 CPU hart 下的 freelist 是否无余量,有的话就间接拿过去返回即可。

初始化 kmems 的工作咱们交给 cpuid() == 0 的 CPU hart 去做,因而一开始所有的 free page 都在该 CPU hart 下的 freelist 中。

/* kernel/kalloc.c */

struct kmem {
  struct spinlock lock;
  struct run *freelist;
};

struct kmem kmems[NCPU];

void
kinit()
{for (int ncpu = 0; ncpu < NCPU; ++ncpu)
    initlock(&kmems[ncpu].lock, "kmem");

  freerange(end, (void*)PHYSTOP);
}

void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  r = (struct run*)pa;

  push_off();

  int cid = cpuid();

  acquire(&kmems[cid].lock);
  r->next = kmems[cid].freelist;
  kmems[cid].freelist = r;
  release(&kmems[cid].lock);

  pop_off();}

void *
kalloc(void)
{
  struct run *r;

  push_off();

  int cid = cpuid();

  acquire(&kmems[cid].lock);
  r = kmems[cid].freelist;
  if(r)
    kmems[cid].freelist = r->next;
  else {for (int _cid = 0; _cid < NCPU; ++_cid) {if (_cid == cid)
        continue;

      acquire(&kmems[_cid].lock);
      r = kmems[_cid].freelist;
      if (r) {kmems[_cid].freelist = r->next;
        release(&kmems[_cid].lock);
        break;
      }
      release(&kmems[_cid].lock);
    }
  }

  release(&kmems[cid].lock);

  pop_off();

  if(r)
    memset((char*)r, 5, PGSIZE); // fill with junk
  return (void*)r;
}

Buffer cache

在原先的实现中,咱们是以全局双链表的模式来保护 Block cache 的新旧拜访水平的,即最老的块在 head 的 prev 侧,最新的块在 head 的 next 侧。

在这个 Exercise 中,为了升高竞争水平,咱们须要摒弃这种双链表的设计,转而应用桶哈希表。为什么应用哈希表能升高竞争水平?因为给一个大链表一个锁和给每个小桶一个锁的粒度相比,很显著后者锁的粒度更小,直观地减小了竞争频率。

在只有 Buffer cache 的锁和 hash bucket 的锁的前提下,假如 evict 的计划是遵循原来的,即“从所有的 Block cache 中,找到最老的且援用数为 0 的那一个”,那么这将会与这个 Exercise 的指标产生矛盾。

矛盾的起因如下:咱们称原来的 evict 计划是寻找“全局最优解”。那么当一个过程 miss 时,它就要避免任何过程尝试去拜访更新 Buffer cache 里的任一一个 Block cache,否则就会出错。例如,当过程 A 找到这个“全局最优解时”,过程 B 命中了这个 Block cache 并批改了它的 ticks 和援用数。之后两个过程将会共用一个 Block cache,这几乎是个灾难性的谬误。

在设计过程间的同步时,把需要列出来会更加清晰:

  • 当不同过程在尝试 hit 时,只有不是同一个 bucket 就能够并行执行,否则串行;
  • 当有过程执行 evict 操作时,其它所有过程都要期待。

在去理解了锁的类型后,我发现读写锁最能满足这里的需要。所以打算来本人封装一个相似读写锁的锁,于是以下是这个 Exercise 的实现:

// Buffer cache.
//
// The buffer cache is a linked list of buf structures holding
// cached copies of disk block contents.  Caching disk blocks
// in memory reduces the number of disk reads and also provides
// a synchronization point for disk blocks used by multiple processes.
//
// Interface:
// * To get a buffer for a particular disk block, call bread.
// * After changing buffer data, call bwrite to write it to disk.
// * When done with the buffer, call brelse.
// * Do not use the buffer after calling brelse.
// * Only one process at a time can use a buffer,
//     so do not keep them longer than necessary.


#include "types.h"
#include "param.h"
#include "spinlock.h"
#include "sleeplock.h"
#include "riscv.h"
#include "defs.h"
#include "fs.h"
#include "buf.h"

#define NBUCKET 13

struct entry {
  struct buf *value;
  struct entry *next;
  uint ticks;
};

struct entry entries[NBUF];

struct bucket {
  struct entry *e;
  struct spinlock lock;
};

struct bucket table[NBUCKET];

int valid_cnt = 0;

struct entry *
remove(uint blockno)
{
  int i = blockno % NBUCKET;

  struct entry **p = &table[i].e;
  struct entry *prev = 0;
  struct entry *e = table[i].e;

  for (; e; e = e->next) {if (e->value->blockno == blockno) {if (!prev)
        *p = e->next;
      else
        prev->next = e->next;
      
      return e;
    }

    prev = e;
  }

  return 0;
}


#define MAX_QUEUE_SIZE NBUF

struct entry *priority_queue[MAX_QUEUE_SIZE];

int
parent_index(int i)
{return (i-1)/2;
}

int
left_child_index(int i)
{return (2*i)+1;
}

int
right_child_index(int i)
{return (2*i)+2;
}

void
swap_element(int i1, int i2)
{struct entry *temp = priority_queue[i1];
  priority_queue[i1] = priority_queue[i2];
  priority_queue[i2] = temp;
}

uint current_queue_size = 0;

int
queue_push(struct entry *e)
{if (current_queue_size == MAX_QUEUE_SIZE) return 1;

  int current_idx = current_queue_size;
  int parent_idx = parent_index(current_idx);
  priority_queue[current_idx] = e;

  while (current_idx != 0 && priority_queue[current_idx]->ticks < priority_queue[parent_idx]->ticks) {swap_element(current_idx, parent_idx);
    current_idx = parent_idx;
    parent_idx = parent_index(current_idx);
  }

  ++current_queue_size;

  return 0;
}

struct entry *
queue_pop()
{if (current_queue_size == 0) return 0;

  struct entry *top_element = priority_queue[0];
  int current_idx = 0;
  swap_element(--current_queue_size, 0);

  int left_child_idx;
  int right_child_idx;
  int min_child_idx;
  uint left_child_ticks;
  uint right_child_ticks;
  uint current_ticks;

  while (1) {left_child_idx = left_child_index(current_idx);
    right_child_idx = right_child_index(current_idx);
    
    if (left_child_idx >= current_queue_size) 
      break;

    left_child_ticks = priority_queue[left_child_idx]->ticks;
    current_ticks = priority_queue[current_idx]->ticks;

    if (right_child_idx >= current_queue_size) {if (left_child_ticks < current_ticks)
        swap_element(left_child_idx, current_idx);
      break;
    }

    right_child_ticks = priority_queue[right_child_idx]->ticks;

    min_child_idx = left_child_ticks < right_child_ticks ? left_child_idx : right_child_idx;
    min_child_idx = priority_queue[min_child_idx]->ticks < current_ticks ? min_child_idx : current_idx;

    if (min_child_idx == current_idx)
      break;
    else {swap_element(min_child_idx, current_idx);
      current_idx = min_child_idx;
    }
  }

  return top_element;
}

void
queue_clear()
{current_queue_size = 0;}


struct {
  struct spinlock lock;
  struct buf buf[NBUF];
} bcache;


struct rwlock {
  uint evicting;
  uint hitting;
  struct spinlock *lock;
};

struct rwlock evict_lock;
struct spinlock _evict_lock;

void
initrwlock(struct rwlock *rwlk, struct spinlock *splk)
{
  rwlk->lock = splk;
  rwlk->hitting = 0;
  rwlk->evicting = 0;
}

void
acquirerwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);
  if (hit) {while (rwlk->evicting == 1) {release(rwlk->lock);
      acquire(rwlk->lock);
    }
    ++rwlk->hitting;
  } else {while (rwlk->evicting == 1 || rwlk->hitting != 0) {release(rwlk->lock);
      acquire(rwlk->lock);
    }
    rwlk->evicting = 1;
  }
  release(rwlk->lock);
}

void
releaserwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);
  
  if (hit) {--rwlk->hitting;} else {rwlk->evicting = 0;}

  release(rwlk->lock);
}

void
binit(void)
{memset(entries, 0, sizeof(struct entry)*NBUF);
  memset(table, 0, sizeof(struct bucket)*NBUCKET);
  memset(bcache.buf, 0, sizeof(struct buf)*NBUF);

  initlock(&bcache.lock, "bcache");
  initlock(&_evict_lock, "bcache-evict");
  initrwlock(&evict_lock, &_evict_lock);

  struct buf *b;
  for (b = bcache.buf; b < bcache.buf+NBUF; ++b)
    initsleeplock(&b->lock, "buffer");

  struct bucket *bkt;
  for (bkt = table; bkt < table+NBUCKET; ++bkt)
    initlock(&bkt->lock, "bcache-bucket");
}

// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{

  // Is the block already cached?
  struct buf *b;
  int nbucket = blockno%NBUCKET;
  struct entry *e;

  acquirerwlock(&evict_lock, 1);
  acquire(&table[nbucket].lock);
    
  for (e = table[nbucket].e; e; e = e->next) {
    b = e->value;
    if (b->dev == dev && b->blockno == blockno) {
      ++b->refcnt;

      release(&table[nbucket].lock);
      releaserwlock(&evict_lock, 1);
      acquiresleep(&b->lock);
      return b;
    }
  }


  release(&table[nbucket].lock);
  releaserwlock(&evict_lock, 1);


  // ------------------------------------------------------------

  // Not cached.
  // Recycle the least recently used (LRU) unused buffer.
  acquirerwlock(&evict_lock, 0);

  if (valid_cnt < NBUF) {b = &bcache.buf[valid_cnt];
    b->dev = dev;
    b->blockno = blockno;
    b->valid = 0;
    b->refcnt = 1;

    e = entries + valid_cnt;
    e->value = b;
    e->next = table[nbucket].e;
    table[nbucket].e = e;
    
    ++valid_cnt;

    releaserwlock(&evict_lock, 0);
    acquiresleep(&b->lock);
    return b;
  }


  // -------------------------------------------------------

  for (e = entries; e < &entries[NBUF]; ++e)
    queue_push(e);

  while ((e = queue_pop()) != 0) {
    b = e->value;
    int _nbucket = b->blockno % NBUCKET;      

    if (b->refcnt == 0) {queue_clear();

      if (_nbucket != nbucket) {remove(b->blockno);
        e->next = table[nbucket].e;
        table[nbucket].e = e;
      }

      b->dev = dev;
      b->blockno = blockno;
      b->valid = 0;
      b->refcnt = 1;

      releaserwlock(&evict_lock, 0);
      acquiresleep(&b->lock);

      return b;
    }
  }

  panic("bget: no buffers");
}

// Return a locked buf with the contents of the indicated block.
struct buf*
bread(uint dev, uint blockno)
{
  struct buf *b;

  b = bget(dev, blockno);
  if(!b->valid) {virtio_disk_rw(b, 0);
    b->valid = 1;
  }
  return b;
}

// Write b's contents to disk.  Must be locked.
void
bwrite(struct buf *b)
{if(!holdingsleep(&b->lock))
    panic("bwrite");
  virtio_disk_rw(b, 1);
}

// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{if(!holdingsleep(&b->lock))
    panic("brelse");

  acquirerwlock(&evict_lock, 0);
  int nbucket = b->blockno % NBUCKET;
  
  for (struct entry *e = table[nbucket].e; e; e = e->next)
    if (e->value->blockno == b->blockno) {
      e->ticks = ticks;
      break;
    }

  --b->refcnt;
  releaserwlock(&evict_lock, 0);

  releasesleep(&b->lock);
}

void
bpin(struct buf *b) {
  int nbucket = b->blockno % NBUCKET;
  acquire(&table[nbucket].lock);
  ++b->refcnt;
  release(&table[nbucket].lock);
}

void
bunpin(struct buf *b) {
  int nbucket = b->blockno % NBUCKET;
  acquire(&table[nbucket].lock);
  --b->refcnt;
  release(&table[nbucket].lock);
}


一开始天真地用 malloc 调配 entry 内存,起初忽然意识到内核内存布局里如同没有像用户过程一样的堆,内核只能一次性调配整整一页大小的内存块。但幸好 entry 数量和 Buffer cache 的数量是一一对应的,因而能够间接动态调配,于是又改了原来的实现,不然我还临时真想不到该怎么动态分配 entry。

对于找到最老的且援用数为 0 的 Block cache 的办法,无论是用哪种排序算法(冒泡、归并、快排)都能够。我这里用优先队列是因为实现起来比较简单,而且开销不算太大(但举荐最好是开一个全局的排序数组,把 entries 快排更简洁一些,不同写入队和出队的操作)。在其它高级语言中都有内置的库去提供优先队列的实现,但遗憾的是在 C 只能反复造轮子了,须要留神的是这个优先队列是须要小顶堆实现的,也算是顺带复习数据结构的常识了。

我选了 13 作为哈希桶的数量,因为提醒说桶的数量尽量是个质数。为什么最好是质数,我在参考链接附上了这个问题的答案。

初始化完结后,咱们的哈希表中没有任何一个 entry,所以一个 bget() 一个都命中不了。这时依照原来的逻辑,程序将会去剔除掉一个援用数为 0 的 Block cache,但因为哈希表是空的,程序会认为剔除失败,间接 panic。这显然不合理,因为咱们连一个 Block cache 都没有用,所以在剔除之前咱们须要查看是否所有的 Block cache 都是 valid 的。随着零碎运行,Buffer cache 很快就会被占满,所以这一步查看执行不了多少次。

重点来了! 在运行 bcachetest 后,test0 和 test1 都能顺利执行,没有呈现任何 panic 和死锁,但 test0 不意外地挂掉了。test0 给出咱们的读写锁一共被竞争了 80 多万次,这里 test0 的指标(小于 500 次)可是差了将近 1000 倍。

咱们无妨计算一下从上电到 test0 完结的这段时间内的 Buffer cache 的命中率。我用了两个计数器变量去记录这里的后果,结果显示 bget() 一共被申请了将近 32000 次,而其中 evict 操作则进行了不到 90 次,命中率高达 99.72%。均匀 357 次申请中才有一次 evict 申请。很显著,一次 evict 就会自旋很久,大部分的竞争次数都是从这儿来的。那好办,咱们就无妨让申请 evict 的过程 sleep 过来,等到没有其它 bget() 申请时再 wakeup 它不就行了么。所以上面这个第二版是改良后的读写锁的获取和开释操作:

void
acquirerwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);
  if (hit) {while (rwlk->evicting == 1)
      sleep(rwlk, rwlk->lock);
    
    ++rwlk->hitting;
  } else {while (rwlk->evicting == 1 || rwlk->hitting != 0)
      sleep(rwlk, rwlk->lock);

    rwlk->evicting = 1;
  }

  release(rwlk->lock);
}

void
releaserwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);
  if (hit) {if (--rwlk->hitting == 0)
      wakeup(rwlk->lock);
  } else {
    rwlk->evicting = 0;
    wakeup(rwlk->lock);
  }

  release(rwlk->lock);
}

当然,如果你感觉 hit 操作没必要睡你也能够让它自旋。但无论哪个版本,在运行 bcachetest 后,程序间接卡住不动了。hit 操作是很频繁的,这使得没有过程有机会去进行 evict 操作。就算以后没有进行 hit 操作的过程,唤醒而后调度执行的开销是很大的,这期间很有可能又有新的 hit 过程进来,等到 evict 过程醒过来就发现 rwlk->hittng != 0 就又睡了。这里我查看了很多遍没有发现死锁的可能性,很可能是最初大部分过程都没有命中本人想要 Block cache 就都睡掉了,之后也没有过程去唤醒它们。

咱们须要去更新策略,要让 evcit 操作的优先级比 hit 操作的优先级更高,即来了一个 evict 操作,如果以后有正在 hit 操作,等到 hit 操作执行结束后立刻响应这个 evict 操作,上面这是第三版改良后的读写锁的获取和开释操作,咱们为读写锁减少了一个 evicted 字段:

struct rwlock {
  uint evicting;
  uint hitting;
  uint evicted;
  struct spinlock *lock;
};

struct rwlock evict_lock;
struct spinlock _evict_lock;

void
initrwlock(struct rwlock *rwlk, struct spinlock *splk)
{
  rwlk->lock = splk;
  rwlk->hitting = 0;
  rwlk->evicting = 0;
  rwlk->evicted = 1;
}

void
acquirerwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);
  if (hit) {while (rwlk->evicting != 0)
      sleep(rwlk, rwlk->lock);
    ++rwlk->hitting;

  } else {
    ++rwlk->evicting;
    while (rwlk->evicted == 0 || rwlk->hitting != 0) 
      sleep(rwlk, rwlk->lock);
    rwlk->evicted = 0;
  }
  release(rwlk->lock);
}

void
releaserwlock(struct rwlock *rwlk, int hit)
{acquire(rwlk->lock);

  if (hit) {if (--rwlk->hitting == 0 && rwlk->evicting == 0)
      wakeup(rwlk);
  } else {
    --rwlk->evicting;
    rwlk->evicted = 1;
    wakeup(rwlk);
  }

  release(rwlk->lock);
}

bcachetest 运行顺利,而咱们的竞争次数也翻新高,达到了 tot= 50236847 这个前所未有的数字。我人都傻了,因为后面剖析的和理论不符。把第三版改成全副自旋,竞争次数还有 tot= 487775 这个多;改成 hit 自旋、evict 互斥时,竞争次数为 tot= 49768743;改成 evict 自旋,hit 互斥时,竞争次数为 tot= 22981816

到这里我真的是万策尽了,终于切实忍不住看了看其他人写的对于这个试验的文章,反正至多我目前看到的当中,都是采纳了“部分最优解”的办法通过测试的,尽管我没有找到在找到“全局最优解”的状况下通过测试,但我很难认同这么做就是正确的。因而到这里我只能提供一个思路,本人一个人做这种试验做到这里曾经是极限了。

后记

这个锁试验是真的难,不晓得是不是我做难了还是有更简略的办法。之前的 Hard 的 Exercise 都是理论编码量不多,但比拟难想进去。但这次试验的 Exercise 2 写了三百行左右,还特地难调试,想了好久好久(断断续续大略有三天工夫)。期间我两头尝试过用各种计划,先是自旋锁、再是互斥锁、而后读写锁(自旋锁实现),最初在是读写锁(互斥锁实现),还有其它的一些 Bug。这期间竞争频数有过 80 多万次、2000 多万次、1 万屡次、5000 多万次等,切实是考验人的急躁。我没把这些所有都记录下来是因为太多了……

参考链接

  • MIT 6.S081 Book
  • 算法剖析:哈希表的大小为何是素数
正文完
 0