课程相关的资源、视频和教材见Lab util: Unix utilities开头。

全乱了全乱了,这Lab顺序… 不管了,先做了再说

准备

对应课程

本次的作业是Lab Lock,我们将重新设计代码,使得程序获得更高的并行性能,这里我们主要将关注内存分配器和块缓存。

网课和Lab顺序相互错开,按照课程进度,我们需要看掉课程的Lecture10,也就是B站课程的P9,同时阅读PDF中的第六章Locking。

另外在本次的Lab的第二部分磁盘缓冲加速中,可能需要我们简单了解部分文件系统的知识,请看Lecture 14,也就是B站课程的P13,并阅读PDF中的第八章File system中的8.1~8.3章节,了解磁盘缓存块的组织形式。

系统环境

使用Arch Linux虚拟机作为实验环境…

环境依赖配置,编译使用make qemu等,如遇到问题,见第一次的Lab util: Unix utilities

使用准备

参考本次的实验说明书:Lab lock: Parallelism/locking

从仓库clone下来课程文件:

1
git clone git://g.csail.mit.edu/xv6-labs-2021

切换到本次作业的lock分支即可:

1
2
cd xv6-labs-2021
git checkout lock

在作业完成后可以使用make grade对所有结果进行评分。

题目

内存分配器加速

要求和提示

user/kalloctest.c下我们实现了一个测试,在这个测试中针对xv6的内存分配器进行了重点测试,使用了三个进程大量的对地址空间进行增加和缩小,导致大量对kallockfree 的调用,其中kallockfree在执行时均需要获取锁kmem.lock。在这里我们使用kalloctest打印了由于另一进程已经持有了锁而在尝试自旋获取的次数,在这里使用#fetch-and-add打印进行表示。需要指出的是,自旋的次数只能作为锁冲突的一种粗略的衡量。如果我们还没有对当前的Lab做出修改,我们执行kalloctest会得到如下类似的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: bcache: #fetch-and-add 0 #acquire() 1260
--- top 5 contended locks:
lock: kmem: #fetch-and-add 83375 #acquire() 433015
lock: proc: #fetch-and-add 23737 #acquire() 130718
lock: virtio_disk: #fetch-and-add 11159 #acquire() 114
lock: proc: #fetch-and-add 5937 #acquire() 130786
lock: proc: #fetch-and-add 4080 #acquire() 130786
tot= 83375
test1 FAIL

可以看到acquire针对每一个锁,打印出了当前锁的调用次数(#acquire()后),以及为了获取锁失败而产生的自旋次数(#fetch-and-add()后)。kalloctest通过调用系统调用,要求内核为kmembcache打印相关的计数信息,以及5个竞争最激烈的锁。如果发生了锁竞争,那么自旋的次数就会很大。由于在这部分实验中,我们比较关心kmembcache两个锁的性能,系统调用最后会返回这两个锁上发生的自旋次数的总和。

需要注意到的是,为了完成当前的Lab,我们需要使用一台多核心独立无负载的机器。如果使用的机器上面有其他的工作负载,kalloctest打印出的计数次数就会失去意义。

现在让我们来关注代码,在kalloctest中产生大量锁冲突的根本原因是kalloc()只有一个空闲列表。为了尽可能的规避锁冲突,我们需要重新设计内存分配器来避免一个唯一的锁和空闲列表。有一个很简单的想法,我们在每个CPU核心中都维护一个空闲列表,每个空闲列表有自己的锁。这样下来,在不同CPU下的内存分配和释放就可以并行运行,因为每个CPU都操作的是不同的空闲列表。这个想法实现下来最大的困难是,如何处理如下情形,一个CPU的空闲列表已经空了,但是另一个CPU仍然有空闲的内存。为了处理这样的情形,我们的CPU需要从别的CPU的空闲列表中“偷出”一部分空闲内存。这种“偷窃内存”也有可能引入锁冲突,但是希望这样会少频繁一些。

我们需要做的事情总结下来就是为每个CPU维护空闲列表,并当空闲列表空时从其他CPU处“偷取”空余内存。也就是说,我们需要为我们的每一个列表的锁都调用initlock,并将名字设置为kmem开头。我们需要运行kalloctest来查看我们的实现是否减少了锁的竞争,此外为了确保实现没有问题我们还能正确分配所有的内存,我们需要运行usertests sbrkmuch测试。我们需要通过test1test2测试,测试的相关输出示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 42843
lock: kmem: #fetch-and-add 0 #acquire() 198674
lock: kmem: #fetch-and-add 0 #acquire() 191534
lock: bcache: #fetch-and-add 0 #acquire() 1242
--- top 5 contended locks:
lock: proc: #fetch-and-add 43861 #acquire() 117281
lock: virtio_disk: #fetch-and-add 5347 #acquire() 114
lock: proc: #fetch-and-add 4856 #acquire() 117312
lock: proc: #fetch-and-add 4168 #acquire() 117316
lock: proc: #fetch-and-add 2797 #acquire() 117266
tot= 0
test1 OK
start test2
total free number of pages: 32499 (out of 32768)
.....
test2 OK
$ usertests sbrkmuch
usertests starting
test sbrkmuch: OK
ALL TESTS PASSED
$ usertests
...
ALL TESTS PASSED
$

相关提示:

  • 我们可以使用kernel/param.h下的NCPU常量表示CPU数量。
  • 使用freerange为当前运行freerange的CPU提供所有空余内存。
  • 函数cpuid可以返回当前的CPU核心编号,但是只有在中断处于关闭状态时调用才是安全的,我们需要使用push_off()pop_off()来控制中断的关和开。
  • 可以看看在kernel/sprintf.c下是怎么实现格式化字符串的。当然,我们将空闲列表锁的名称设置为kmem也是完全可以的。

思路分析

别的问题这里我都不想多提了,上面的提示给的很清楚。

但是在cpuid()这一块,我是这样理解的,我们需要使用push_off()pop_off()来屏蔽中断,防止由于定时器中断当前进程发生CPU切换,不然cpuid()返回的核心编号我们也不知道是何时的,那么我们也不知道当时调用kalloc或者kfree的究竟是哪个CPU核心。

但是一旦我们获取了对应的CPU核心编号,我们就不必将对该CPU所属的空闲列表的操作局限在当前CPU下。由于已经确定了编号和对应空闲列表,如果发生定时器中断交由其他CPU操作空闲列表也是完全可以接受的。因此我们只需要将cpuid()前后使用push_off()pop_off()包裹应该就可,并不需要包裹空闲列表的相关操作。

空闲列表kmem修改

由于现在我们引入了面向NCPU个CPU核心的空闲列表,因此我们有必要将空闲列表有单一的一个升级为数组:

1
2
3
4
5
6
7
8
struct run {
struct run *next;
};

struct {
struct spinlock lock;
struct run *freelist;
} kmem[NCPU];

现在我们就可以通过下标来访问对应CPU的空闲列表了。

内存初始化kinit配置

根据要求,我们需要为每一个CPU设置一个空闲列表,同时根据提示自行将锁的名称设置为kmem开头的名称。那其实我们也只需要稍作微调即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# define LOCK_NAME_LEN 16
char kmem_lock_name[NCPU][LOCK_NAME_LEN];

void
kinit()
{
for (int i = 0; i < NCPU; i++) {
// Call initlock for each of your locks, and pass a name that starts with "kmem".
char *name = kmem_lock_name[i];
snprintf(name, LOCK_NAME_LEN - 1, "kmem_cpu_%d", i);
initlock(&kmem[i].lock, name);
}
// Let freerange give all free memory to the CPU running freerange.
freerange(end, (void *)PHYSTOP);
}

这里使用到了kernel/sprintf.c下的snprintf()函数,其实就是一个字符串格式化函数,可以去了解一下。简单的来说就是为NCPU个CPU核心分别初始化了空闲列表的锁。

最后我们调用freerange,按照提示,我们直接将当前可用的所有物理内存划给调用freerange的CPU,相关代码我们在下部分展示。

内存初始化freerange配置

freerange中,根据提示,我们需要将所有的可用内存分配给当前调用freerange的CPU。

那么我们直接通过cpuid()获得调用的CPU核心编号即可,从pa_startpa_end位置逐页使用kfree函数将每个内存页加入到当前CPU的空闲列表中去:

1
2
3
4
5
6
7
8
9
10
11
void
freerange(void *pa_start, void *pa_end)
{
char *p;
p = (char*)PGROUNDUP((uint64)pa_start);
push_off();
int cpu = cpuid();
pop_off();
for(; p + PGSIZE <= (char*)pa_end; p += PGSIZE)
kfree_cpu(p, cpu);
}

注意到的是,在这里我没有使用kfree而是另外创建了一个名为kfree_cpu的函数,强行指定了CPU编号及需要操作的空闲列表(主要是考虑到在切换到其他CPU核心后仍然可以正确操作对应空闲列表),内部的函数功能则和kfree基本相似:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Free the page of physical memory pointed at by v,
// add the page to CPU's list with given cpuid.
void
kfree_cpu(void *pa, int cpuid)
{
struct run *r;

if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
panic("kfree");

// Fill with junk to catch dangling refs.
memset(pa, 1, PGSIZE);

r = (struct run*)pa;

acquire(&kmem[cpuid].lock);
r->next = kmem[cpuid].freelist;
kmem[cpuid].freelist = r;
release(&kmem[cpuid].lock);
}

由于我们加入了新的kfree_cpu,因此我们也需要在kernel/defs.h中声明一下:

1
2
3
4
5
// kalloc.c
void* kalloc(void);
void kfree(void *);
void kinit(void);
void kfree_cpu(void *, int);

内存释放kfree实现

按照要求,我们的kfree需要实现的就是将内存页加入到当前CPU所属的空闲列表中,考虑到我们刚才“魔改”的kfree_cpu函数,我们实现kfree并不需要太多代码:

1
2
3
4
5
6
7
8
// Free the page of physical memory pointed at by pa,
// add the page to current CPU's free list
void kfree(void *pa) {
push_off();
int cpu = cpuid();
pop_off();
kfree_cpu(pa, cpu);
}

内存分配kalloc实现

按照要求,我们先查看当前CPU所属的空闲列表,看看有没有有效节点可以直接从头上拿出来。如果没有,那就遍历着去看其他CPU所属的空闲列表,如果空闲列表中有节点那就直接“偷”出来。

需要注意的是,在查看各个CPU的空闲列表操作时需要及时上锁以防止可能的竞争问题,但是加锁也要避免交叉上锁不然有可能死锁。因此我们不能在没有释放当前CPU空闲列表锁的情况下就尝试获取其他CPU空闲列表锁,这是一种加锁顺序交错导致的经典死锁情形。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
struct run *r;

push_off();
int cpu = cpuid();
pop_off();
acquire(&kmem[cpu].lock);
r = kmem[cpu].freelist;
if(r) { // if current cpu freelist is not empty
kmem[cpu].freelist = r->next;
release(&kmem[cpu].lock);
} else { //else steal from other cpu's freelist
release(&kmem[cpu].lock);
for (int i = 0; i < NCPU; i++) {
// traverse each cpu's freelist except current one
if (i == cpu) {
continue;
}
acquire(&kmem[i].lock);
r = kmem[i].freelist;
if(r) {
// if node found, no need to search for next cpu's freelist, just break
kmem[i].freelist = r->next;
release(&kmem[i].lock);
break;
}
release(&kmem[i].lock);
}
}

if(r)
memset((char*)r, 5, PGSIZE); // fill with junk
return (void*)r;
}

测试

现在我们在主目录下运行make clean; make qemu,然后在xv6下尝试运行kalloctestusertests sbrkmuch,看一下结果:

各部分测试都成功通过了,看起来没啥问题

磁盘缓存加速

要求和提示

在本部分中实验与上半部分互相独立,如果愿意也可以先做这部分,前部分结果并不会影响当前部分的完成。

当多个进程尝试大量的使用文件系统时,锁bcache.lock就会产生大量的冲突,这个锁的功能就是在kernel/bio.c中保护磁盘块缓存。我们的测试程序bcachetest会创建多个进程,并不断的读取不同的文件来触发锁bcache.lock上的冲突,在我们还没有完成Lab前,bcachetest输出类似如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 33035
lock: bcache: #fetch-and-add 16142 #acquire() 65978
--- top 5 contended locks:
lock: virtio_disk: #fetch-and-add 162870 #acquire() 1188
lock: proc: #fetch-and-add 51936 #acquire() 73732
lock: bcache: #fetch-and-add 16142 #acquire() 65978
lock: uart: #fetch-and-add 7505 #acquire() 117
lock: proc: #fetch-and-add 6937 #acquire() 73420
tot= 16142
test0: FAIL
start test1
test1 OK

具体输出的数字可能有所差异,但是肯定可以看到bcache锁由于不断acquire产生的自旋尝试次数会变得很高。在kernel/bio.c中,我们可以看到bcache.lock保护了缓存块列表,包括每个块中的引用数b->refcnt以及缓存块的信息(b->devb->blockno):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct {
struct spinlock lock;
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
} bcache;

struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};

我们的目标是修改缓存块使得在bcachetest测试中,由于对尝试锁acquire产生的自旋次数尽可能的少且接近零。理想状态下,块缓存中涉及所有锁的尝试自旋次数之和应该为0,但是总和小于500也是可以接受的。我们需要修改bgetbrelse,使得bcache中的涉及不同缓存块的查找和释放时更不容易在锁上发生冲突(例如,不需要所有操作都等待bcache.lock)。同时,我们需要确保每个磁盘块只会有一份缓存。

当完成时,我们的输出示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
$ bcachetest
start test0
test0 results:
--- lock kmem/bcache stats
lock: kmem: #fetch-and-add 0 #acquire() 32954
lock: kmem: #fetch-and-add 0 #acquire() 75
lock: kmem: #fetch-and-add 0 #acquire() 73
lock: bcache: #fetch-and-add 0 #acquire() 85
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4159
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2118
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4274
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4326
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6334
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6321
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6704
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6696
lock: bcache.bucket: #fetch-and-add 0 #acquire() 7757
lock: bcache.bucket: #fetch-and-add 0 #acquire() 6199
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4136
lock: bcache.bucket: #fetch-and-add 0 #acquire() 4136
lock: bcache.bucket: #fetch-and-add 0 #acquire() 2123
--- top 5 contended locks:
lock: virtio_disk: #fetch-and-add 158235 #acquire() 1193
lock: proc: #fetch-and-add 117563 #acquire() 3708493
lock: proc: #fetch-and-add 65921 #acquire() 3710254
lock: proc: #fetch-and-add 44090 #acquire() 3708607
lock: proc: #fetch-and-add 43252 #acquire() 3708521
tot= 128
test0: OK
start test1
test1 OK
$ usertests
...
ALL TESTS PASSED
$

initlock中,我们需要给我们创建的每个锁一个名字,以bcache开头,。

相比刚才的kalloc内存分配,这里处理减少对于缓存块的竞争问题更加棘手,因为bcache缓冲区是在进程、CPU之间共享的。对于kalloc来说,我们可以为每个CPU安排一个空闲列表来解决,在这里却是行不通的。我们需要使用一个哈希表来查询块编号,并给每一个bucket桶设置一把锁。

这些情况下,如果发生锁冲突我们认定是可以接受的:

  • 当两个进程同时操作相同的块编号时,在bcachetest中的test0我们不会这样测试。
  • 当两个进程同时没有命中缓存,并需要寻找一块没有被使用的缓存块进行替换时。我们在bcachetest中的test0下也不会这样测试。
  • 当两个进程操作的块编号在你用来划分区块和锁的方案中发生冲突时;例如,两个进程给定的块编号映射到了同一个哈希表的bucket桶中。这是完全有可能的,bcachetest中的test0有可能会这样做,这完全取决于我们的设计,但我们应该尝试调整方案的细节以避免冲突(例如,改变哈希表的大小)。

test0相比之下,test1中将测试操作更多不同的块编号,并大量使用文件系统的代码路径。

相关提示:

  • 请阅读xv6教材PDF中关于块缓存的描述(见8.1~8.3节)
  • 为哈希表设置固定数量的桶是可以的,可以不去动态调整哈希表的大小。另外可以使用质数数量的桶(例如13)来减少散列冲突。
  • 在哈希表中搜索块编号,如果没有命中我们再重新分配条目,这两个操作需要是原子的。
  • 我们需要删除所有缓存块列表(bcache.head等),改为使用缓冲区的最后时间(即使用kernel/trap.c下的ticks来记录时间戳)。这样修改后,brelse就不需要获取bcache锁,bget也可以根据时间戳选择最久没有使用的块。
  • bget中序列化地进行驱逐是可行的(即bget中选择缓存块的部分,当查找缓存未命中时可以选择一个缓存块重新使用)。至于为什么要将驱逐操作序列化,可以看我在bget中的分析。
  • 我们的答案在一些情形下需要使用两把锁;例如,在驱逐缓存块期间,我们可能需要持有bcache和哈希表bucket下的锁。请确保不会发生死锁。
  • 当我们尝试替换一个缓存块,我们可能需要将一个缓冲块从一个桶bucket转移到另一个桶bucket。我们可能会遇到棘手的问题,也就是新的块可能哈希到与旧的块相同的桶,务必注意不要发生死锁。
  • 一些调试小技巧:实现bucket锁的时候,可以先在bget的开始和结束位置留下全局的bcache.lock的加锁解锁。当我们尝试解决了竞争问题,尝试运行测试的时候再去掉全局锁,处理并发问题。我们也可以使用make CPUS=1 qemu来使用单独一个核心来进行测试。

思路分析

我们简单的浏览kernel/bio.c相关代码,不难看出bcache的组织形式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct {
struct spinlock lock;
struct buf buf[NBUF];

// Linked list of all buffers, through prev/next.
// Sorted by how recently the buffer was used.
// head.next is most recent, head.prev is least.
struct buf head;
} bcache;

struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *prev; // LRU cache list
struct buf *next;
uchar data[BSIZE];
};

其实就是一个在空间中划出了一片缓存块列表,但是我们平时操作时不是通过buf数组,而是通过LRU链表来完成的。在binit中,我们可以看到链表被初始化为了一个双向的循环链表,链表的尾部接在了其头部,同时其头部只是一个哨兵节点用于定位用的,遍历的时候不会去访问head

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void
binit(void)
{
struct buf *b;

initlock(&bcache.lock, "bcache");

// Create linked list of buffers
bcache.head.prev = &bcache.head;
bcache.head.next = &bcache.head;
for(b = bcache.buf; b < bcache.buf+NBUF; b++){
b->next = bcache.head.next;
b->prev = &bcache.head;
initsleeplock(&b->lock, "buffer");
bcache.head.next->prev = b;
bcache.head.next = b;
}
}

brelse中,如果当前缓存块引用数归零,我们就将其回收,将节点重新放置到head的后面:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Release a locked buffer.
// Move to the head of the most-recently-used list.
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

acquire(&bcache.lock);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
b->next->prev = b->prev;
b->prev->next = b->next;
b->next = bcache.head.next;
b->prev = &bcache.head;
bcache.head.next->prev = b;
bcache.head.next = b;
}

release(&bcache.lock);
}

而在bget中,我们首先遍历链表,查找有没有信息一致直接命中的缓存块。如果没有,再从尾部向头部遍历,这样也符合LRU的原则,越在后部的未使用缓存块越久没被使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b;

acquire(&bcache.lock);

// Is the block already cached?
for(b = bcache.head.next; b != &bcache.head; b = b->next){
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
for(b = bcache.head.prev; b != &bcache.head; b = b->prev){
if(b->refcnt == 0) {
b->dev = dev;
b->blockno = blockno;
b->valid = 0;
b->refcnt = 1;
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}
panic("bget: no buffers");
}

根据上面的要求和提示信息,遍历链表花费的时间太长,而且每次扫描整个的链表以及修改的过程都会大量涉及加锁,性能开销很大。因此我们引入哈希表,通过数组链表的方式来减缓冲突,通过这样的办法我们就可以把加锁的粒度缩小到单个bucket桶,从而提升性能。

由于弃用了LRU双向链表,根据提示建议,在哈希表中我们需要为bucket下单向链表中的每一个节点设置时间戳信息,以方便在LRU过程中根据时间戳判断最久未使用的节点。

在哈希算法上直接使用取模函数即可,bucket数量我们也使用质数,例如13个。

bcache缓存结构修改

我们现在要放弃bcache中原先的双向链表结构,转而使用哈希表。因此,我们需要对bcache以及buf等数据结构进行微调。

哈希表中我们使用的是数组单向链表,所以buf中删去prev前节点,并添加新的时间戳标志timestamp

1
2
3
4
5
6
7
8
9
10
11
struct buf {
int valid; // has data been read from disk?
int disk; // does disk "own" buf?
uint dev;
uint blockno;
struct sleeplock lock;
uint refcnt;
struct buf *next;
uchar data[BSIZE];
uint timestamp;
};

bcache中,我们需要重新构建哈希表,因此我们定义bucket对象为:

1
2
3
4
struct bucket {
struct spinlock lock;
struct buf head;
};

其中包含bucket对应的锁,以及链表的头节点。

接下来我们尝试修改bcache,将建立的哈希表放进去:

1
2
3
4
5
6
#define NBUCKET 13
struct {
struct spinlock lock;
struct buf buf[NBUF];
struct bucket hashtable[NBUCKET];
} bcache;

binit初始化实现

首先我们要设置哈希的函数,将每一个块编号映射到对应的桶里面:

1
2
3
int hash(uint dev, uint blockno) {
return (dev + blockno) % NBUCKET;
}

接下来我们开始实现初始化,我们可以考虑根据在bcachebuf[]数组的下标将空闲块均匀的打乱在各个bucket中,每次使用头插法将节点插在头部:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#define NBUCKET 13
#define LOCK_NAME_LEN 20
char bucket_lock_name[NBUCKET][LOCK_NAME_LEN];

void
binit(void)
{
struct buf *b;
struct bucket *bucket;

initlock(&bcache.lock, "bcache_evict");
for (int i = 0; i < NBUCKET; i++) {
char *name = bucket_lock_name[i];
snprintf(name, LOCK_NAME_LEN - 1, "bcache_bucket_%d", i);
initlock(&bcache.hashtable[i].lock, name);
}
b = bcache.buf;
for (int i = 0; i < NBUF; i++) {
initsleeplock(&bcache.buf[i].lock, "buffer");
bucket = bcache.hashtable + (i % NBUCKET);
b->next = bucket->head.next;
bucket->head.next = b;
b++;
}
}

brelse释放缓存块实现

我们先处理简单的释放缓存块逻辑,我们只需要在刚才的基础上稍加修改即可,每次不必去锁bcache的大锁,而是看bucket上的小锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// Release a locked buffer and update the timestamp
void
brelse(struct buf *b)
{
if(!holdingsleep(&b->lock))
panic("brelse");

releasesleep(&b->lock);

int index = hash(b->dev, b->blockno);
acquire(&bcache.hashtable[index].lock);
b->refcnt--;
if (b->refcnt == 0) {
// no one is waiting for it.
b->timestamp = ticks;
}
release(&bcache.hashtable[index].lock);
}

bpin/bunpin计数加减实现

这块原理同上brelse,我们每次直接锁对应bucket即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void
bpin(struct buf *b) {
int index = hash(b->dev, b->blockno);
acquire(&bcache.hashtable[index].lock);
b->refcnt++;
release(&bcache.hashtable[index].lock);
}

void
bunpin(struct buf *b) {
int index = hash(b->dev, b->blockno);
acquire(&bcache.hashtable[index].lock);
b->refcnt--;
release(&bcache.hashtable[index].lock);
}

bget获取缓存块实现

接下来就是重点实现bget函数,首先我们查看在对应的bucket中当前的块是否被缓存,如果被缓存则直接返回;否则我们尝试查找一个块并将其驱逐,查找可用的块我们需要遍历各个bucket根据时间戳找到最久未使用也就是时间戳最小的空闲块。

这就给我一个年纪轻轻的强迫症带来了莫大的痛苦,脑子里疯狂脑补并发不安全和死锁情况,主要有一对矛盾很难解除。假设我们目前正在操作第i号bucket,由于没有命中缓存,我们需要根据LRU原则扫描第j号bucket:

  • 我们应当在给j号bucket加锁前,给i号bucket解锁,不然套娃上锁情况下,不正确的加锁顺序可能会导致死锁。例如另外一个进程目前持有j的锁,正在申请i的锁;当前进程持有i的锁,正在申请j的锁。
  • 另一方面,给i解锁又是万万不得的,因为当我们之后解锁出去在为i号bucket寻找空闲块时,如果i号bucket上恰好由于空闲块驱逐被相同blockno块编号的填入,我们找到了空闲块再填到链表头部,那就是两个相同的blockno了。

这就是一种矛盾… 困扰的人想自杀… 我们干脆直接将驱逐操作序列化… 刚好我们使用bcache.lock作为我们的驱逐锁,只有当前持有驱逐锁的进程才有权力对bucket进行驱逐插入操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
struct buf *b, *prev_replace_buf;
struct bucket *bucket, *prev_bucket, *cur_bucket;

int index = hash(dev, blockno);
bucket = bcache.hashtable + index;

// Is the block already cached?
acquire(&bucket->lock);
for (b = bucket->head.next; b; b = b->next) {
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bucket->lock);
acquiresleep(&b->lock);
return b;
}
}
release(&bucket->lock);

// Acquire evict-lock, serialize the evict action
acquire(&bcache.lock);

// Check on current bucket again.
// If another eviction happened between release of bucket-lock and acquire of evict-lock
// There is chance that block has cached in current bucket but the process is unaware.
// Without the check, blocks with same blockno could exist in the bucket.
acquire(&bucket->lock);
for (b = bucket->head.next; b; b = b->next) {
if(b->dev == dev && b->blockno == blockno){
b->refcnt++;
release(&bucket->lock);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}
}
release(&bucket->lock);

// Not cached.
// Recycle the least recently used (LRU) unused buffer.
// Traverse each bucket and each buffer, pick the least recently used buffer
prev_replace_buf = (void*)0; // the prev node of the buf to evict
prev_bucket = (void*)0; // prev locked bucket, null initially
for (cur_bucket = bcache.hashtable; cur_bucket < bcache.hashtable + NBUCKET; cur_bucket++) {
acquire(&cur_bucket->lock);
int found = 0;
for (b = &cur_bucket->head; b->next; b = b->next) {
if (b->next->refcnt == 0 &&
(!prev_replace_buf || b->next->timestamp < prev_replace_buf->next->timestamp)) {
found = 1;
prev_replace_buf = b;
}
}
if (!found) {
release(&cur_bucket->lock);
} else {
// if smaller time stamp found in the bucket, consider releasing prev holding bucket,
// which is no longer needed
if (prev_bucket) {
release(&prev_bucket->lock);
}
prev_bucket = cur_bucket;
}
}

if (!prev_replace_buf) {
panic("bget: no buffers");
}
b = prev_replace_buf->next;

if (prev_bucket != bucket) {
// Delete the buf to evict from its bucket,
prev_replace_buf->next = b->next;
release(&prev_bucket->lock);
// Add the buf to bucket
acquire(&bucket->lock);
b->next = bucket->head.next;
bucket->head.next = b;
}

b->dev = dev;
b->blockno = blockno;
b->refcnt = 1;
b->valid = 0;
release(&bucket->lock);
release(&bcache.lock);
acquiresleep(&b->lock);
return b;
}

这里看到我们在获得了bcache.lock驱逐锁后重新对当前bucket做了一遍扫描,其目的是为了防止在bucket锁释放和驱逐锁获取中间的窗口期发生了一些不好的事情。例如另外一个进程操作了这个bucket,不管他怎么做到的,他有可能成功在列表中添加了blockno编号的缓存块,如果我们不重新检查,可能会发生重复插入,这是不能接受的。(至于为啥不把bucket锁释放和驱逐锁获取顺序颠倒一下,看了下会有死锁…)

之后大致思路简单来说就是,遍历各个桶链表中的各个节点,对比时间戳,找到最久未使用的需要驱逐的缓存块,然后我们把他“偷到”我们当前的bucket里来就好了。

测试

主目录下运行make clean; make qemu,我们在xv6中运行bcachetest

看起来问题不大

总结

接下来我们来到主目录上使用make grade进行评分,有几点注意一下:

  • 你需要在主目录下创建time.txt表示完成任务的时间。

  • 测试过程中usertests耗时略长,电脑机能不行的小伙伴可以适当将测试的超时延长一点点,请把主目录下 grade-lab-lock 文件里面的 timeout 参数改大点,不然可能你的测试项目还没跑完他就跳出给你标个大大的 FAIL…

接下来,我们主目录下make grade一下,看下结果:

看着不错,完事~