感觉上课视频、PDF里头章节的顺序和Lab展开的顺序有一点点不一致… 为啥接下来先碰到的Lab居然是第7章的,迷…

课程相关的资源、视频和教材见Lab util: Unix utilities开头。

在这次的Lab中,我们将进一步学习多线程相关知识,我们将实现用户界别下的进程间切换,同时使用多线程来加速程序,以及实现一个同步屏障。另外不由地说,xv6里面CPU进程间切换的机制讲的实在是太牛了,Morris大神讲的很好,当然最好还是先看下书再听课,但是不管怎么说,感觉打开了新世界,尤其是看到内核线程和CPU调度线程交叉加锁解锁p->lock的时候,真的很妙…

准备

对应课程

本次的作业是Lab Thread,我们需要分别完成三项作业内容,分别是实现用户级别的线程切换,使用多线程加速程序运行和实现同步屏障。

网课这次就很迷了,按照课程的进度,我们需要看掉Lecture 9, 10, 11, 13… 分别对应B站课程的P8, P9, P10, P12。仔细看了一下,Lecture 9和PDF第五章Interrupts and device drivers关系更大点,主要讲的是来自硬件的中断,以及驱动程序等相关知识;Lecture 10和PDF第六章Locking关系更大些,主要讲的是操作系统下的一些基本的加锁解锁等知识;Lecture 11&13和PDF第七章Scheduling关系更大些,主要介绍了CPU在线程间切换的实现细节,以及针对唤醒丢失问题的解决方案。

P.S. 另外Lecture 12是真的Lab Cow的问答课,感兴趣的可以听听… btw,MIT的同学思维真的好开放,能问出这么好的问题,我是傻子…

一圈看下来,感觉只看第七章Scheduling,只看Lecture 11&13貌似也不合适,还是按照顺序来吧…

系统环境

使用Arch Linux虚拟机作为实验环境…

环境依赖配置,编译使用make qemu等,如遇到问题,见第一次的Lab util: Unix utilities

使用准备

参考本次的实验说明书:Lab thread: Multithreading

从仓库clone下来课程文件:

1
git clone git://g.csail.mit.edu/xv6-labs-2021

切换到本次作业的thread分支即可:

1
2
cd xv6-labs-2021
git checkout thread

在作业完成后可以使用make grade对所有结果进行评分。

题目

用户线程切换

要求和提示

在本部分中我们将设计并实现用户级别下的线程上下文切换。开始前可以阅读user/uthread.cuthread_switch.S下的代码,这两个文件实现了线程切换的大致框架和简单的功能测试代码,我们需要补全剩余部分。

注意到我们需要想办法创造线程并在线程切换的过程中尝试保存/恢复寄存器,事后如果功能实现,可以在xv6下运行uthread,正确运行示例如下(三个线程的顺序可能会有出入):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ make qemu
...
$ uthread
thread_a started
thread_b started
thread_c started
thread_c 0
thread_a 0
thread_b 0
thread_c 1
thread_a 1
thread_b 1
...
thread_c 99
thread_a 99
thread_b 99
thread_c: exit after 100
thread_a: exit after 100
thread_b: exit after 100
thread_schedule: no runnable threads
$

终端上的输出来自三个测试线程,每个线程都会进入一个循环,在循环中打印一行随后让出CPU给其他线程。但是由于现在我们还没有加入实现上下文切换的代码,我们是看不到输出的。

我们需要向user/uthread.c下的thread_create()thread_schedule()中,以及user/uthread_switch.S下的thread_switch加入代码,从名字里看得出来,我们需要实现线程的创建和切换功能…

我们有两个目标需要实现,一个是我们需要确保当thread_schedule()第一次运行一个给定的线程时,该线程在自己的堆栈中执行thread_create()中给定的函数参数;另外一个是,确保thread_switch保存被切换走的线程寄存器,并恢复被切换到的线程寄存器,随后返回到切换过来的线程的指令中最后离开的位置。

我们需要决定在哪里存储/恢复寄存器,尝试修改结构体struct thread会是一个很好的主意。此外,我们需要在thread_switch中加入调用thread_schedule(这玩意儿只有汇编代码),可以在调用时传入我们想要的任何参数,只要能从线程t切换到next_thread

相关提示:

  • thread_switch中我们只需要保存和恢复callee-save寄存器。
  • 我们可以参考编译后根据uthread.c生成的user/uthread.asm汇编代码,以方便调试。
  • 为了测试代码,使用riscv64-linux-gnu-gdb单步浏览thread_switch可能会有帮助。相关的GDB调试技巧,已经在Lecture 5介绍过了,也可以再去熟悉下。

思路分析

用户级别线程切换?emmm,大概就是协程类似的东西吧,虽然是在用户空间下实现的,但是目测可以大量参考xv6在内核线程下的实现。这么看下来,切换模型更简单了:

thread_schedule中,我们先找到可以运行的目标用户线程,随后我们尝试将当前线程的相关寄存器信息保存,随后将目标线程的相关信息恢复出来即可。

如果是创建一个新的用户线程,我们只需要将新的线程信息填入all_thread数组中,设置状态,然后将返回地址ra和栈指针sp“伪造”好,这样当调度器切换当这个刚创建好的线程时,程序就能到设置好的对应程序地址从头开始“无缝启动”。

另外在xv6操作系统的设计下,一个进程下不可以有并行的线程,因此不存在竞争等问题。

定义上下文结构体context

首先,我们需要定义一个context结构体保存上下文,这块可以参考一下在kernel/proc.h下的struct cpu结构体。我们在user/uthread.c下创建这样的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
struct thread_context {
uint64 ra;
uint64 sp;

// callee-saved
uint64 s0;
uint64 s1;
uint64 s2;
uint64 s3;
uint64 s4;
uint64 s5;
uint64 s6;
uint64 s7;
uint64 s8;
uint64 s9;
uint64 s10;
uint64 s11;
};
struct thread {
char stack[STACK_SIZE]; /* the thread's stack */
int state; /* FREE, RUNNING, RUNNABLE */
struct thread_context context; /* thread context to save/restore reg */
};

可以看到现在每个用户线程都有自己的栈空间、状态和上下文信息。

实现保存/恢复寄存器thread_switch

这部分的代码是由user/uthread_switch.S来实现的,我们可以参考下kernel/swtch.S内核下的切换代码,其实本质都是相似的。

我们接受传入的两个参数,分别是切出的线程的上下文存储地址和切入线程的上下文存储地址,接下来我们要做的就是读取当前寄存器相关信息,存储到切出线程上下文结构体中,然后从切入线程上下文结构体中读出信息,覆盖在当前寄存器。

至此我们已经获得了可以在新线程中运行的所有条件,我们有正确设置的栈指针sp和返回地址ra,只要我们调用ret,我们就能继续从切入线程上次的停下的位置继续开始(也就是这个线程在某处调用thread_switch曾经切出位置)。

因此,我们在uthread_switch.S下的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
	.text

/*
* save the old thread's registers,
* restore the new thread's registers.
*/

.globl thread_switch
thread_switch:
/* YOUR CODE HERE */
sd ra, 0(a0)
sd sp, 8(a0)
sd s0, 16(a0)
sd s1, 24(a0)
sd s2, 32(a0)
sd s3, 40(a0)
sd s4, 48(a0)
sd s5, 56(a0)
sd s6, 64(a0)
sd s7, 72(a0)
sd s8, 80(a0)
sd s9, 88(a0)
sd s10, 96(a0)
sd s11, 104(a0)

ld ra, 0(a1)
ld sp, 8(a1)
ld s0, 16(a1)
ld s1, 24(a1)
ld s2, 32(a1)
ld s3, 40(a1)
ld s4, 48(a1)
ld s5, 56(a1)
ld s6, 64(a1)
ld s7, 72(a1)
ld s8, 80(a1)
ld s9, 88(a1)
ld s10, 96(a1)
ld s11, 104(a1)
ret /* return to ra */

于是我们就可以在thread_schedule()中加入这一行调用uthread_switch,来实现tnext_thread的上下文切换:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
void 
thread_schedule(void)
{
struct thread *t, *next_thread;

/* Find another runnable thread. */
next_thread = 0;
t = current_thread + 1;
for(int i = 0; i < MAX_THREAD; i++){
if(t >= all_thread + MAX_THREAD)
t = all_thread;
if(t->state == RUNNABLE) {
next_thread = t;
break;
}
t = t + 1;
}

if (next_thread == 0) {
printf("thread_schedule: no runnable threads\n");
exit(-1);
}

if (current_thread != next_thread) { /* switch threads? */
next_thread->state = RUNNING;
t = current_thread;
current_thread = next_thread;
/* YOUR CODE HERE
* Invoke thread_switch to switch from t to next_thread:
* thread_switch(??, ??);
*/
thread_switch((uint64)&t->context, (uint64)&next_thread->context);
} else
next_thread = 0;
}

实现创建线程thread_create

就像刚才想的一样,我们需要“伪造”一些数据,使得当切换到这个刚被创建的线程时,程序可以从我们想要的程序开始位置运行,栈空间也要从底部开始增长。

我们主要需要“伪造”的上下文信息有,返回地址ra,栈指针sp。这部分代码可以参考kernel/proc.c下的allocproc()最后,看看他们是怎么让程序跳转到forkret的。

所以,我们要做的就是将ra设置为func函数的地址,将sp设置为当前栈的底部,也就是地址最大处,即(uint64)&t->stack + STACK_SIZE。因此,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void 
thread_create(void (*func)())
{
struct thread *t;

for (t = all_thread; t < all_thread + MAX_THREAD; t++) {
if (t->state == FREE) break;
}
t->state = RUNNABLE;
// YOUR CODE HERE
t->context.ra = (uint64)func;
t->context.sp = (uint64)&t->stack + STACK_SIZE;
}

测试

在主目录下使用make clean; make qemu测试,然后运行uthread

目测运行良好,没啥问题

多线程编程

要求和提示

注意:本部分实验需要在有多核心处理器的Linux/macOS电脑中完成,而不是在xv6/QEMU下完成。

在本部分实验中,我们将使用多线程编程和锁来实现一个HashTable哈希表。在本次作业中我们将使用UNIX下的pthread库函数,关于pthreads的更多用法,可以查询man手册,终端上输入man pthreads即可,也可查询相关网页。

文件notxv6/ph.c中实现了一个仅在单线程下可行的哈希表,但是在多线程下运行就会产生错误。我们在主目录下运行make ph,随后向可执行文件中传入线程数量作为参数,例如单线程执行./ph 1即可。示例如下:

ph程序中主要尝试运行两项测试,首先它通过put()尝试向哈希表中加入大量的key,并打印出平均每秒钟执行put的数量,随后它尝试使用get()从哈希表中将key取出,并打印出哈希表中的key的数量。但是很显然目前由于没有对多线程去做额外的实现,部分的put操作丢失了,如下是程序未修改的情况下两个线程执行ph的结果,我们看到线程put()操作丢失了一些key:

虽然使用两个线程使得哈希表插入的平均速度翻倍,但是带来了严重的数据丢失问题,这就是个很严重的问题需要我们解决。请仔细阅读notxv6/ph.c,尤其是文件下的put()insert()函数。

两个线程同时执行put()导致key丢失的原因是什么?请将作业答案保存在主目录下answers-thread.txt下。

为了确保相关时间的有序执行,我们需要在put()get()中引入加锁和解锁机制。相关的pthread函数调用有:

1
2
3
4
pthread_mutex_t lock;            // declare a lock
pthread_mutex_init(&lock, NULL); // initialize the lock
pthread_mutex_lock(&lock); // acquire lock
pthread_mutex_unlock(&lock); // release lock

记得不要忘记调用pthread_mutex_init()。在这部分操作结束后,你应该可以顺利通过ph_safe测试,即不会有key丢失的状况存在。

接下来,我们尝试对哈希表的速度进行优化,在操作中有部分put()对内存的操作并无重叠部分,也许不需要一把很大的锁来保护哈希表(疯狂暗示减小锁的粒度),我们需要对ph.c做出额外的变动进而加速put()操作(提示:可以对每个哈希桶bucket各设置一把锁)。在这部分操作结束后,我们将通过ph_fast测试,该测试要求两个线程产生的put()速率是一个线程的1.25倍。

代码分析

目测这个哈希表就是很常见的数组链表结构,数组长度在程序中暂定为5

1
2
3
4
5
6
7
8
#define NBUCKET 5

struct entry {
int key;
int value;
struct entry *next;
};
struct entry *table[NBUCKET];

图大概就是这样:

我们看到get()的实现,就是先通过取模hash得到对应的哈希桶bucket是数组的哪个位置,然后顺着链表一路搜下去:

1
2
3
4
5
6
7
8
9
10
11
12
13
static struct entry*
get(int key)
{
int i = key % NBUCKET;


struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key) break;
}

return e;
}

相比之下put()的操作稍稍复杂一点,给定了keyvalue,我们要首先查看当前的哈希表中key存不存在,因此我们需要走一个和刚才get()类似的操作。之后,如果找到了直接修改节点中的value信息,如果没找到再考虑插入新节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static 
void put(int key, int value)
{
int i = key % NBUCKET;

// is the key already present?
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if(e){
// update the existing key.
e->value = value;
} else {
// the new is new.
insert(key, value, &table[i], table[i]);
}

}

那么insert()是怎么实现的呢?哦,原来是头插法,将当前节点的尾巴指向bucket链表的头部,然后把数组元素修改成现在的entry

1
2
3
4
5
6
7
8
9
static void 
insert(int key, int value, struct entry **p, struct entry *n)
{
struct entry *e = malloc(sizeof(struct entry));
e->key = key;
e->value = value;
e->next = n;
*p = e;
}

那很显然的想到,头插法的时候如果不加锁可能会丢失节点,例如如下情况就是一种竞争情形,这种情形下CPU2设置的entry就永远无法被访问到:

这只是一种情形,假定两个线程put()操作key相同,如果一个线程刚将新建的entrynext指向当前bucket,还没有更新数组,另一个put()线程就开始扫描链表,那就有可能漏掉这个还没插进来的节点,那多插一个重复的也不是没可能… 所以即使我们在insert上加锁也是不够的… 对链表进行修改操作的时候就要锁上,读相关操作也不能进行。

另外多提一嘴,面向这次作业,存在的竞争只有put()操作之间的,并没有put()get()之间的,两批操作是分开执行的。所以也并没有必要为put()get()设置读写锁,当然啦你开心就好,想做下应该也没啥事。

为了性能考虑尽可能的细化粒度,设计锁的时候每个bucket安排一把,所以就是总共5把锁…

设置加锁/解锁

代码到此为止就很清楚了,我们为每个bucket设计一把锁:

1
pthread_mutex_t locks[NBUCKET];

然后在main()中加入初始化各个bucket锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
int
main(int argc, char *argv[])
{
pthread_t *tha;
void *value;
double t1, t0;


if (argc < 2) {
fprintf(stderr, "Usage: %s nthreads\n", argv[0]);
exit(-1);
}
nthread = atoi(argv[1]);
tha = malloc(sizeof(pthread_t) * nthread);
srandom(0);
assert(NKEYS % nthread == 0);
// init lock for each bucket
for (int i = 0; i < NBUCKET; i++) {
pthread_mutex_init(&locks[i], NULL);
}

for (int i = 0; i < NKEYS; i++) {
keys[i] = random();
}

//
// first the puts
//
t0 = now();
for(int i = 0; i < nthread; i++) {
assert(pthread_create(&tha[i], NULL, put_thread, (void *) (long) i) == 0);
}
for(int i = 0; i < nthread; i++) {
assert(pthread_join(tha[i], &value) == 0);
}
t1 = now();

printf("%d puts, %.3f seconds, %.0f puts/second\n",
NKEYS, t1 - t0, NKEYS / (t1 - t0));

//
// now the gets
//
t0 = now();
for(int i = 0; i < nthread; i++) {
assert(pthread_create(&tha[i], NULL, get_thread, (void *) (long) i) == 0);
}
for(int i = 0; i < nthread; i++) {
assert(pthread_join(tha[i], &value) == 0);
}
t1 = now();

printf("%d gets, %.3f seconds, %.0f gets/second\n",
NKEYS*nthread, t1 - t0, (NKEYS*nthread) / (t1 - t0));
}

接下来我们直接在put()中,对要操作的bucket直接加上一把大锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static 
void put(int key, int value)
{
int i = key % NBUCKET;

// is the key already present?
pthread_mutex_lock(&locks[i]);
struct entry *e = 0;
for (e = table[i]; e != 0; e = e->next) {
if (e->key == key)
break;
}
if(e){
// update the existing key.
e->value = value;
} else {
// the new is new.
insert(key, value, &table[i], table[i]);
}
pthread_mutex_unlock(&locks[i]);

}

对的,这样应该就完事了…

测试

在主目录下执行make clean; make ph,接下来分别测试了1, 2, 4个线程下的put()表现,符合预期:

实现同步屏障

要求和提示

注意:本部分实验需要在有多核心处理器的Linux/macOS电脑中完成,而不是在xv6/QEMU下完成。

在这个部分中,我们需要实现一个同步屏障。也就是程序中的一个位置,所有参与的线程都需要等待直到所有其他线程都到达这个位置。我们将使用pthread提供的条件变量,他的机制类似于xv6中的sleepwakeup,我们将使用到下面这两个调用:

1
2
pthread_cond_wait(&cond, &mutex);  // go to sleep on cond, releasing lock mutex, acquiring upon wake up
pthread_cond_broadcast(&cond); // wake up every thread sleeping on cond

其中,pthread_cond_wait在被调用时释放了mutex,并在返回前重新获取了mutex

notxv6/barrier.c下存在一个没有实现好的同步屏障,如果在没有修改的情况下运行,运行就会报错:

其中我们给barrier传入的第二个参数表示的是需要在同步屏障上进行同步的线程数量。每个线程都在执行一个循环,在每个循环中线程会调用barrier()然后随机睡眠若干微秒。刚才未修改的代码导致断言被触发,是因为一个线程在另一个线程到达同步屏障前就离开了。我们希望每个线程都在barrier()位置阻塞,直到nthreads个线程都调用过了barrier()

notxv6/barrier.c中,barrier_init()已经被给出,结构体struct barrier已经定义,仅供参考。我们只需要实现barrier()相关功能,以此来确保不会报错。

注意两个较难实现的点:

  • 我们需要处理一连串的barrier()调用,我们把这个叫做一轮。bstate.round中记录了轮数,因此每当所有线程到达内存屏障的时候我们就需要将计数加一。
  • 小心处理bstate.nthread,尤其这种情形:一个线程离开了同步屏障,其他线程还没反应过来,就进入了下一轮循环,然后又遇到了同步屏障,直接对计数加一。上一轮和下一轮的计数要分清楚。

实现屏障

仔细一想倒也没啥… 维持一个计数,每次线程遇到屏障就加一,如果bstate.nthread计数还不够,说明线程数量还没凑齐,就睡过去,如果这次加一之后数量够了,那就直接把之前睡下去的线程全部叫起来(叫起来之前记得把计数清零,bstate.round轮数加一)

然后每次线程调用,对bstate.nthreads加一和判断的时候,由于操作不是原子的,需要加锁保证计数的正确。

所以程序可以写成这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static void 
barrier()
{
// YOUR CODE HERE
//
// Block until all threads have called barrier() and
// then increment bstate.round.
//

// protect nthread counter
pthread_mutex_lock(&bstate.barrier_mutex);
if (++bstate.nthread < nthread) {
// sleep if not all threads have reached the barrier
pthread_cond_wait(&bstate.barrier_cond, &bstate.barrier_mutex);
} else {
// clear bstate.nthread
bstate.nthread = 0;
// imcrement on round count
bstate.round++;
// wake up all other threads
pthread_cond_broadcast(&bstate.barrier_cond);
}
pthread_mutex_unlock(&bstate.barrier_mutex);
}

测试

我们看看现在运行结果是否正确,主目录下执行make clean; make barrier

分别测试1, 2, 4个线程的情形,看起来和预期相符

总结

接下来我们来到主目录上使用make grade进行评分,有几点注意一下:

  • 你需要在主目录下创建time.txt表示完成任务的时间。
  • 你需要把刚才问题答案放在主目录下的answers-thread.txt下。

最后我们看一下测试的结果:

这次的Lab难度不是很大,还是很轻松的度过了…