本文是MIT课程6.824分布式系统学习笔记的一部分:

相关的代码都放在了GitHub下了:MIT-6.824-2022

上次的MapReduce感觉虽然一开始很混乱,但是到最后理清了思路确实感觉还好… 只要别自己把自己劝退了就行

之前过了一轮GFS、VM-FT的论文,连带上课视频,感觉好带感的,果然MIT个个都是聪明人,问的问题也很刁钻… 这次的Raft论文内容更多了,强烈建议上课前读完论文材料(不过我寻思不读材料,课应该也听不懂多少),论上课前预习的重要性,不先看论文就等着懵吧…

先贴一份跑完Lab 2全部测试的结果,高达10MB,自证清白:

Test Results - go_test_6_824_raft.html

准备

对应课程

这次的作业是Lab 2: Raft,Lab分为2A, 2B, 2C和2D四个部分。

其中A和B属于Lecture 5也就是Raft(1)的范围,并且你需要阅读Raft论文Raft (extended) (2014)前5章节,B站课程参考P5;C和D属于Lecture 7,也就是Raft(2)的范围,你需要阅读Raft论文从第7章至结束,B站课程参考P7。

另外在Raft的介绍内容和上次的MapReduce之间还夹了GFS和VM-FT的Lecture,也就是Lecture3&4,对应P3和P4,强烈建议学习下。

系统环境

之前开的Arch Linux他满了… 大概也许是6GB的根目录真的很脆吧,加上之前CS:APP和MIT 6.S081的Lab没往里少装东西(其实是清了半天没什么效果)… 所以重装了一个新的Arch Linux虚拟机,这次分大点了,假装无事发生…

Go环境也配置好了,版本为1.19.3

使用准备

从仓库中clone下来我们的Lab即可,还是上次那个,貌似也没有别的branch:

$ git clone git://g.csail.mit.edu/6.824-golabs-2022 6.824
$ cd 6.824

本次Lab的实验说明书详见:Lab 2: Raft

最终我们在src/raft下完成评分工作,使用如下命令可以完成对不同部分的测试:

$ go test -race
$ go test -race -run 2A
$ go test -race -run 2B
$ go test -race -run 2C
$ go test -race -run 2D

调试技巧

分布式系统调试起来真的是让人头皮发麻,不同peer节点的行为顺序互相交错,稍稍差一点都可能有很大的变化… 日志即使能打印出来,各个节点之间互相交错,大量日志也迟早会让你感觉生不如死,为此TA直接附上了他的小技巧,详情见Debugging by Pretty Printing

简单来说就是规范化日志输出,然后使用python脚本根据日志的类型和不同的节点编号,进行着色和分割。最终效果那只能说是相当令人心动,堪比艺术品,优雅就完事了。当然如果你想使用这样的调试方法,那得全盘接受TA的日志打印方法,我是个懒狗,我就直接把TA的下面这些代码放在util.go下了,反正原来自带的那个Dprintf也看起来不太聪明的样子,我就直接盖掉了。

在文中TA通过系统环境变量VERBOSE来控制运行过程中是否产生日志,如果需要开启日志,只需要将VERBOSE设置为1之类的非零值即可。

// Retrieve the verbosity level from an environment variable
func getVerbosity() int {
v := os.Getenv("VERBOSE")
level := 0
if v != "" {
var err error
level, err = strconv.Atoi(v)
if err != nil {
log.Fatalf("Invalid verbosity %v", v)
}
}
return level
}

TA在文中为Raft选举列举了如下相关日志事件类别:

type logTopic string
const (
dClient logTopic = "CLNT"
dCommit logTopic = "CMIT"
dDrop logTopic = "DROP"
dError logTopic = "ERRO"
dInfo logTopic = "INFO"
dLeader logTopic = "LEAD"
dLog logTopic = "LOG1"
dLog2 logTopic = "LOG2"
dPersist logTopic = "PERS"
dSnap logTopic = "SNAP"
dTerm logTopic = "TERM"
dTest logTopic = "TEST"
dTimer logTopic = "TIMR"
dTrace logTopic = "TRCE"
dVote logTopic = "VOTE"
dWarn logTopic = "WARN"
)

并借用fmt.Sprintf()实现了日志的格式化打印:

var debugStart time.Time
var debugVerbosity int

func init() {
debugVerbosity = getVerbosity()
debugStart = time.Now()

log.SetFlags(log.Flags() &^ (log.Ldate | log.Ltime))
}

func Debug(topic logTopic, format string, a ...interface{}) {
if debugVerbosity >= 1 {
time := time.Since(debugStart).Microseconds()
time /= 100
prefix := fmt.Sprintf("%06d %v ", time, string(topic))
format = prefix + format
log.Printf(format, a...)
}
}

并给出了打印示例:

Debug(dTimer, "S%d Leader, checking heartbeats", rf.me)

我们通过指定临时环境变量即可达到相应效果,例如运行命令VERBOSE=1 go test -run TestBackup2B,就会产生如下输出:

...
008258 LOG2 S2 Saved Log (-1, 0) [{<nil> 0}]
008256 VOTE S3 Granting Vote to S1 at T1
008258 VOTE S1 <- S0 Got vote
008258 VOTE S4 Granting Vote to S1 at T1
008259 PERS S4 Saved State T:1 VF:1
008258 PERS S3 Saved State T:1 VF:1
008259 LOG2 S3 Saved Log (-1, 0) [{<nil> 0}]
008259 VOTE S1 <- S2 Got vote
008260 LEAD S1 Achieved Majority for T1 (3), converting to Leader
008260 TIMR S1 Broadcast, reseting HBT
008260 LOG1 S1 -> S0 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008260 LOG1 S1 -> S2 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008259 LOG2 S4 Saved Log (-1, 0) [{<nil> 0}]
008261 LOG1 S1 -> S3 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008262 LOG1 S1 -> S4 Sending PLI: 0 PLT: 0 N: 0 LC: 0 - []
008262 TIMR S1 Leader, checking heartbeats
008263 TIMR S0 Resetting ELT, received AppEnt T1
008263 TIMR S3 Resetting ELT, received AppEnt T1
008264 TIMR S2 Resetting ELT, received AppEnt T1
008264 LOG2 S2 LOG: (-1, 0) [{<nil> 0}]
...

每行日志依次包含:事件发生的时间、事件的类别、对应哪个peer节点服务器,剩下的则是自由格式的内容。

最终TA根据上述设置的不同种类事件,通过python脚本处理,完成了染色和分列处理,脚本的地址在Script for pretty printing logs of MIT 6.824 labs

至于python的相关环境和模块,请额外安装一下,以Arch Linux为例:

$ sudo pacman -S python python-pip
$ pip install typer rich

相关的处理命令参数示例如下,更多细节可以去参考python脚本:

# You can just pipe the go test output into the script
$ VERBOSE=1 go test -run InitialElection | python dslogs
# ... colored output will be printed

# We can ignore verbose topics like timers or log changes
$ VERBOSE=1 go test -run Backup | python dslogs -c 5 -i TIMR,DROP,LOG2
# ... colored output in 5 columns

# Dumping output to a file can be handy to iteratively
# filter topics and when failures are hard to reproduce
$ VERBOSE=1 go test -run Figure8Unreliable > output.log
# Print from a file, selecting just two topics
$ python dslogs output.log -j CMIT,PERS
# ... colored output

最终效果还是蛮震撼的,如果你看久了对颜色熟悉了,那调试起来还是相当高效的:

未处理原日志

处理后日志

批量测试

分布式系统调试起来,容易出玄学bug,因为每次运行程序的时候,各个peer的超时时间是随机设置的,运行时的行为互相交错,投票结果也完全不确定,每重新运行一次打印出来的日志都大不相同,这就很难受。go test跑一遍PASS并不能证明代码没有bug,可能跑上个几千次才能让自己心安一些…

我们还是回到上面TA写的那篇博客,他还给出了一个python测试脚本,链接在Utility for running MIT 6.824 lab test in parallel and saving failed logs

通过这个脚本,我们可以给指定的测试项目指定测试次数,并将每次偶发的失败日志存储下来方便排查问题。例如我们可以在raft目录下运行命令:

$ python dstest.py 2A -n 10 -v 1

这里 2A 可以换成其他的测试用例,-n 10 表示测试的次数(默认是10),-v 1 表示设置环境变量 VERBOSE,这样就能让Go程序打印日志。

如下是TA给出的示例截图,可以看出对每次发生错误的日志都进行了保存,最终还给出了运行的次数,Fail的次数以及平均运行时间:

题目

简介

概念介绍

我们将在这一系列Lab中开始逐步搭建具有容错能力的key/value键值对存储系统。在这个Lab中,我们主要考虑实现Raft,即一种复制状态机协议。在下一个Lab中,我们将以Raft协议为基础创建key/value键值对服务。最后为了获得更高的性能,我们会将服务分片shard到多组复制状态机上。

我们的服务通过将自身状态(即数据)的完整拷贝存储在多个服务器上来实现容错。这样的复制允许在一些服务发生故障(崩溃或者网络问题)时依然能提供服务。我们需要处理的问题在于,故障可能会导致副本中包含不同的数据拷贝。

Raft将客户端请求组织成一个序列,我们将其称为log日志,并且确保所有的副本服务器都能看见相同的日志。每个副本服务器都会按照日志的顺序执行客户端的请求,并将它们应用于本地的服务状态拷贝中。由于所有在线的副本服务器看见的是相同的日志内容,它们都会按照相同的顺序执行相同的请求,因此会继续具有相同的状态。如果一个服务器发生了短暂的故障,Raft会负责将它的日志更新到最新状态。只要大多数服务器都处于在线的状态并且可以相互通信,Raft就能继续运行。如果大多数的服务器都无法工作,那么Raft就不会继续运行,但是只要多数服务器能够重新通信,Raft就会从停下的地方继续开始。

在这次的Lab中,我们需要使用Go来实现Raft,使其成为一个对象模块,拥有相应的方法API,并可以作为模块在大型服务中使用。一组Raft节点实例通过RPC互相通信来维护日志。Raft接口需要接受数量不定的一系列命令,也称为log entry日志条目。这些日志条目会按照索引号进行编号,并最终会被提交。在提交时,Raft才会将日志条目发送给对应的服务进行执行。

请遵循论文中的相关设计,尤其需要注意图2中的细节。你将实现论文中的大部分功能,包括保存持久化状态、在节点发生故障后重启并加载数据。我们不需要实现集群成员变更功能(见第6章)。

可以简单的参考一下这份指南guide,以及并发下锁locking和数据结构structure相关的建议。还可以看一下其他实现来拓展一下思路,例如Paxos, Chubby, Paxos Made Live, Spanner, Zookeeper, Harp, Viewstamped Replication, 和Bolosky et al.(注意:学生指南是几年前写的,尤其是2D部分后来有了变化。请确保在盲目遵循某个特定的实施策略之前,先了解它为什么要这样做)

Lab中最有挑战的部分很可能不是实现方案本身,而是调试。为了少走弯路,我们需要花时间考虑如何让我们的实现代码更加方便调试。这方面可以参考指南Guidance和这篇博客中关于高效地进行日志打印的相关内容Debugging by Pretty Printing

此外这里还提供了Raft的交互逻辑表diagram of Raft interactions来帮助我们确定Raft代码是怎样和顶层的应用交互的。

开始运行

src/raft/raft.go中,Lab提供了一套代码骨架。此外在src/raft/test_test.go中,Lab还提供了一套测试代码,方便我们对自己的代码进行检验和评分。

请确保你的代码不存在并发竞争问题,因为竞争的情况可能会导致相关测试失败。所以强烈建议使用-race标志来进行测试。

如果要启动并运行,请在src/raft下执行命令。如果暂时没有对代码做出任何改动,失败的样例输出如下:

$ cd src/raft
$ go test -race
Test (2A): initial election ...
--- FAIL: TestInitialElection2A (5.04s)
config.go:326: expected one leader, got none
Test (2A): election after network failure ...
--- FAIL: TestReElection2A (5.03s)
config.go:326: expected one leader, got none
...
$

代码介绍

我们需要在raft/raft.go中补全相关代码来实现Raft。在文件中,我们会看到一些代码骨架,以及如何发送和接受RPC的示例代码。

我们的实现中必须支持以下接口,测试程序和后续运行的key/value键值对服务器都会使用到这些接口。更多的细节相关问题参考raft.go中的注释部分。

// create a new Raft server instance:
rf := Make(peers, me, persister, applyCh)

// start agreement on a new log entry:
rf.Start(command interface{}) (index, term, isleader)

// ask a Raft for its current term, and whether it thinks it is leader
rf.GetState() (term, isLeader)

// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester).
type ApplyMsg

服务会调用Make(peers, me, ...)来创建一个Raft节点。其中peers参数是所有Raft节点的网络标识符数组(包含当前节点),用于RPC通信。me参数是当前节点在该数组中的下标。Start(command)要求Raft开始处理,将命令追加到各副本的日志中。Start()应当立即返回,无需等待日志追加完成。服务希望我们为每一个新提交的日志条目发送一个ApplyMsg到Make()的applyCh这个channel参数中。
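
为了更直观地理解这几个接口怎么配合,下面补充一个示意性的使用片段(纯属笔者的草图,runService这个函数名是假设的;真实的调用方其实是测试程序config.go和后续Lab里的key/value服务):

// 示意:服务端创建Raft实例,把命令交给Start(),并从applyCh中消费已提交的日志
func runService(peers []*labrpc.ClientEnd, me int, persister *Persister) {
    applyCh := make(chan ApplyMsg)
    rf := Make(peers, me, persister, applyCh)

    // 后台goroutine持续取出已提交的日志条目,按顺序应用到状态机
    go func() {
        for msg := range applyCh {
            if msg.CommandValid {
                _ = msg.Command      // 在这里把命令应用到状态机
                _ = msg.CommandIndex // 命令对应的日志下标
            }
        }
    }()

    // 收到客户端命令时交给Raft;只有Leader会真正接受该命令
    if index, term, isLeader := rf.Start("some command"); isLeader {
        _, _ = index, term // 若该命令最终被提交,它会出现在index这个下标上
    }
}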

raft.go中包含了发送RPC(sendRequestVote())和处理传入RPC(RequestVote())的样例代码。我们的Raft节点需要使用labrpc的Go包(源代码在src/labrpc中)交换RPC。测试时会设置labrpc来模拟延迟RPC请求、重新排列以及丢弃它们的情况,来充分模拟各种网络故障。请务必确保labrpc相关代码在测试时没有被修改,尽管你可以在调试时暂时的修改其代码。我们实现的Raft实例必须仅与RPC进行交互;例如,他们不允许使用共享的Go变量或者文件进行通信。

后续的Lab是基于当前Lab构建的,请务必确保在本次Lab中实现的代码足够可靠。

实现leader选举(2A)

要求和提示

本部分中,我们需要实现Raft的leader选举和心跳检测(通过发送AppendEntriesRPC请求但是不携带日志条目)。在2A部分中,我们的目标是选举一个leader,并且在没有发生故障的情况下使其继续保持leader,如果发生故障或者老leader发送/接受的数据包丢失则让新的leader接替。最终运行go test -run 2A来测试代码。

相关提示:

  • 这次的Lab中并没有简单的方法直接让我们的Raft实现运行起来;我们需要通过测试代码来运行,即go test -run 2A
  • 严格按照论文中的图2完成Lab。在这个部分中我们只需要关心发送和接收Request Vote相关的RPC、与选举相关的服务器规则以及与leader选举相关的状态。
  • 将论文图2中用于leader选举的状态加入到raft.go中的Raft结构体中。我们可能还需要定义一个结构体来保存有关每个日志条目的信息。
  • 填充RequestVoteArgsRequestVoteReply结构体,尝试修改Make()来创建一个后台的goroutine,使用这个协程在一段时间没有从另一个节点收到消息时,发送RequestVoteRPC请求来定期启动leader选举。通过这种方式,节点就会了解到如果有leader那么当前的leader是谁,要么就会自己成为leader。之后实现RequestVote()RPC处理函数,以便服务器投票给别人。
  • 为了实现心跳机制,我们需要定义一个名为AppendEntries的RPC结构体(尽管你可能暂时还不需要所有的参数),并让leader定期发送他们。我们需要写一个AppendEntries的RPC处理函数,通过函数来重置选举超时时间,以便其他服务器当选时,其他服务器不会继续竞选leader。
  • 请确保不同的节点不会总在同一时刻发生选举超时,否则有可能所有节点都仅投票给自己,导致没有节点被竞选为leader。
  • 测试要求leader发送心跳RPC的速度不得超过10次每秒。
  • 测试要求我们的Raft能够在旧leader发生故障后(前提是大多数节点仍然能相互通信)5秒内选举出一个新leader。但是需要注意的是,如果发生分裂投票split vote(当发送的所有数据包都丢失了,或者候选人不巧选择了相同的随机超时时间时,就有可能发生),leader选举可能需要多轮投票。因此我们必须选择足够短的超时时间(心跳间隔也是如此),确保即使选举需要多轮,也能够在5秒内完成。
  • 在论文5.2节中提到了选举的超时时间应当在150到300毫秒之间,前提是leader发送心跳的频率远高于每150毫秒一次。由于测试程序限制我们一秒最多10次心跳,因此我们需要使用比论文中150到300毫秒更大的选举超时时间,但也不要设置得太大,否则很可能导致无法在5秒内选举出新的leader。
  • 学会使用Go中的rand,会在Lab中很有用。
  • 我们需要完成代码,实现周期性地或者延时执行某些操作。最简单的办法是创建一个goroutine,在协程的循环中调用time.Sleep()(可以参考在Make()中创建的goroutine ticker())。不要使用time.Timer和time.Ticker,这两个并不好用,容易出错。
  • 指南页面Guidance page中有一些如何开发和调试的小建议。
  • 如果你的代码不能正常通过测试,请再次阅读论文的图2;leader选举的完整逻辑在图中多个部分被提及。
  • 不要忘记实现GetState()
  • 测试程序会在永久关闭一个实例时调用Raft的rf.Kill()。我们可以使用rf.killed()来查看Kill() 是否被调用过了。我们可能需要在所有的循环中都这样做,以避免死亡的Raft实例打印出混乱的信息。
  • Go RPC 仅发送以大写字母为首的结构体字段。子结构体中也必须具有大写字段名称(例如数组中的日志记录字段)。labgob包会警告这一点,不要忽略警告。

成功通过2A测试的输出样例如下:

$ go test -run 2A
Test (2A): initial election ...
... Passed -- 3.5 3 58 16840 0
Test (2A): election after network failure ...
... Passed -- 5.4 3 118 25269 0
Test (2A): multiple elections ...
... Passed -- 7.3 7 624 138014 0
PASS
ok 6.824/raft 16.265s
$

每一个“通过”的测试用例会输出五个数字,分别是测试所用的时间、Raft 节点的数量、测试期间发送 RPC 的次数、RPC 消息中的字节总数和Raft 确定并提交的日志条目数。

测试时运行出的数字可能会有所差异,这些数字可以帮助我们检查发送的 RPC 数量。对于实验 2、3 和 4,如果评分脚本(go test)运行总共超过 600 秒,或者任何单个测试超过 120 秒,则会无法通过测试。

可能的坑

很头痛… 虽然说这是2A部分,但是写着写着真的很容易一不小心思路就扩出去了…

这里先贴一张万恶之源,论文图2… 接下来我们大量的实现都要严格遵照这张图:

简单的来说,在这部分中,我们需要实现的有选举和心跳两部分,也就是需要我们实现RequestVoteAppendEntries RPC的主体部分,并且还需要我们设置循环的定时器确定何时触发这些RPC调用。

需要注意的是,我们不应当把心跳Heartbeat当作一种特殊的AppendEntries RPC来特殊处理、收到后直接返回成功。毕竟AppendEntries RPC的功能有两部分:一方面是重置Follower节点的超时定时器,另一方面是检测Follower节点上的日志条目是否同步(也就是查看RPC中携带的新条目位置之前的日志是否一致),如果不同步就要返回失败,否则将RPC中携带的日志更新到当前的日志条目中并返回成功。失败情况下,Leader需要回退,对nextIndex减1,然后重试AppendEntries RPC。因此心跳也承担着检测日志是否同步的功能,如果我们一味地返回成功,会误导Leader节点以为日志同步成功,可能错误地进行提交操作。这一点在TA的guide中有所提及:Students’ Guide to Raft

在指南中TA还贴心地指出了另一种错误:添加日志条目时不要直接截断日志。论文图2中的原话是:

If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it.

只有在现有的日志条目和添加的日志条目有冲突(即相同下标,不相同任期),才将当前日志条目以及之后的都删除。如果传来的是来自Leader的过时的AppendEntries RPC,错误的直接截断可能会导致后续的若干条目被删除,这些条目如果已经告知过Leader节点同步完成了那可就糟糕了。

正如上图所示的,我们假设Leader传来的AppendEntries RPC内容是过时的。根据比对RPC中的PrevLogIndex和PrevLogTerm,我们确定Follower在插入位置之前的日志(也就是下标为1, 2和3的日志)都是同步的,至此我们确定从此位置开始插入是合法的,那么就可以准备返回成功;否则立即返回失败,要求Leader将nextIndex回退重试。

接下来的操作比较关键,先讲错误示范:直接把从下标4(包含)的位置开始向后的位置全部截断扔掉,然后接头霸王,把RPC中的日志条目直接接上。这样一通操作猛如虎,直接把最后的下标7位置的日志给丢掉了,这个操作就很危险,因为有可能这个日志的应用已经告知了Leader,现在再将其撤回显然是不合常理的。正确的做法是向后依次比对各个日志是否一致,如果遇到不一致,才将之后的部分全部截断,将RPC日志中当前位置之后部分接上。我们要理解的是AppendEntries RPC的目的不是要求Follower的日志内容完全与发送者Leader的日志条目一致(因为Leader发出的RPC可能有所过时),我们只是检查是否发生了可能的冲突并纠正而已。
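
按照这个思路,Follower端追加日志的核心循环大致是下面这个样子(示意性片段,appendNewEntries这个函数名是假设的,假设prevLogIndex及之前的日志已经检查过一致,getEntry和getSlice是后文会实现的辅助方法,完整逻辑见后面的AppendEntries实现):

// 示意:只有发现冲突(同下标不同任期)时才截断,否则保留已有日志
// (appendNewEntries这个函数名是假设的,仅为说明逻辑)
func (rf *Raft) appendNewEntries(prevLogIndex int, entries []LogEntry) {
    for i, entry := range entries {
        idx := prevLogIndex + 1 + i
        if rf.log.getEntry(idx).Term != entry.Term {
            // 发现冲突:从冲突位置开始截断,并接上RPC中剩余的新条目
            rf.log = append(rf.log.getSlice(1, idx), entries[i:]...)
            break
        }
        // 任期一致说明该位置已有相同条目,继续向后比对,不要直接截断
    }
}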

在Raft的实现过程中TA还指出,我们需要严格逐字阅读图2中的描述并实现,例如关于选举超时定时器重置的相关问题有如下描述:

If election timeout elapses without receiving AppendEntries RPC from current leader or granting vote to candidate: convert to candidate.

并不是说收到任何一个Leader的AppendEntries RPC请求Follower就需要重置超时时间,也并不是说只要收到了请求投票的Candidate发来的RequestVote RPC就要重置时间。很明显,如果请求的Leader的RPC中标记的任期Term明显过时,那就完全没有必要;如果当前Follower节点最终决定不给Candidate投票,那么重置时间也完全不必要。一味的重置定时器可能在功能上看来没有很大的影响,但是确实会严重的影响Raft的性能,超时时间不断重置,下一次触发超时可能需要更久的时间。

另外我们还需要处理收到过时的RPC回复的情况,也就是说如果我们发现我们刚才发出去的RPC请求里面携带的任期信息和我们目前的任期信息不一致(通常是由于RPC返回的太慢导致返回顺序错序):

Term confusion refers to servers getting confused by RPCs that come from old terms. In general, this is not a problem when receiving an RPC, since the rules in Figure 2 say exactly what you should do when you see an old term. However, Figure 2 generally doesn’t discuss what you should do when you get old RPC replies. From experience, we have found that by far the simplest thing to do is to first record the term in the reply (it may be higher than your current term), and then to compare the current term with the term you sent in your original RPC. If the two are different, drop the reply and return. Only if the two terms are the same should you continue processing the reply. There may be further optimizations you can do here with some clever protocol reasoning, but this approach seems to work well. And not doing it leads down a long, winding path of blood, sweat, tears and despair.

TA给出的解决办法是遇到这种情况直接丢弃。
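
体现在代码里,就是在每个RPC回复的处理函数里先做类似下面的检查(示意片段,handleReply这个函数名是假设的,checkTerm是后文实现的任期检查辅助函数;后文的candidateRequestVote和leaderSendEntries中会有完整体现):

// 示意:处理RPC回复前先丢弃过期回复
func (rf *Raft) handleReply(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.checkTerm(reply.Term) // 回复中的任期可能比自己高,先按规则更新自己的任期
    if rf.currentTerm != args.Term {
        // 发送请求时的任期和当前任期不一致,这是一个过期回复,直接丢弃
        return
    }
    // 两个任期一致时才继续处理回复内容...
}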

Raft结构体实现

那么我们接下来进入到喜闻乐见的写代码阶段,首先我们根据论文图2给出的提示,我们为表示每个peer节点状态的Raft对象补充相应的属性:

type RaftState int
type LogEntries []LogEntry

const (
FollowerState RaftState = iota
CandidateState
LeaderState
)

// A Go object implementing raft log entry
type LogEntry struct {
Command interface{} // Command to be executed
Term int // Term number when created
}

// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

state RaftState
heartbeatTime time.Time
electionTime time.Time

// Persistent state on all servers

currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log LogEntries // log entries; each entry contains command for state machine, and term when entry was received by leader

// Volatile state on all servers

commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine

// Volatile state on leaders (Reinitialized after election)

nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server
}

注释里面把图2里面的信息都带上了,这里还是简单的翻译一下。

需要在所有服务器上持久化的信息(需要在回复RPC之前更新存储):

  • currentTerm,表示服务器认为的最新任期(初始化为0,单调增加)

  • votedFor,表示在当前任期下该服务器选择投票投给的服务器Candidate的ID(如果没有投,论文中记为null;在这里由于是用int表示的,记为-1)

    P.S. 值得一提的是,服务器只有一票的机会,因此服务器必须清楚自己有没有投票,投给了谁。如果突发宕机,服务器恢复时必须清楚地了解自己有没有投过票,为了避免多次投票产生的错误,我们需要将该变量进行持久化。

  • log[],表示每个节点中存储的日志条目;每个日志条目包含了状态机的一个命令,以及当日志条目被Leader接收时的任期号(我们认定第一个日志条目的下标是1)

在所有服务器上保存的易失性状态:

  • commitIndex,表示日志条目中已提交条目的最高下标(初始化为0,单调递增)
  • lastApplied,表示应用于状态机的最高日志条目下标(初始化为0,单调递增)

对于Leader节点需要保存的易失性状态(每次选举后重新初始化):

  • nextIndex[],表示对于每台服务器,将要发送的下一个日志条目的下标(初始化为Leader节点的最后一个日志条目下标 +1)
  • matchIndex[],表示对于每台服务器,已知的在服务器上复制的日志条目的最大下标(初始化为0,单调递增)

其中,我们干脆根据论文中的示例,认为日志条目下标从1开始,任期号也从1开始,0则表示服务器上暂时没有日志条目,这样更加方便我们进行操作,另外越界的下标我们一律将任期表示为-1。为此我们额外写上几个方法,方便后续使用,这几个方法分别实现的功能有根据下标获取某个日志条目,获取最后一个日志条目,根据范围获取日志切片:

// index and term start from 1
func (logEntries LogEntries) getEntry(index int) *LogEntry {
if index < 0 {
log.Panic("LogEntries.getEntry: index < 0.\n")
}
if index == 0 {
return &LogEntry{
Command: nil,
Term: 0,
}
}
if index > len(logEntries) {
return &LogEntry{
Command: nil,
Term: -1,
}
}
return &logEntries[index-1]
}

func (logEntries LogEntries) lastLogInfo() (index, term int) {
index = len(logEntries)
logEntry := logEntries.getEntry(index)
return index, logEntry.Term
}

func (logEntries LogEntries) getSlice(startIndex, endIndex int) LogEntries {
if startIndex <= 0 {
Debug(dError, "LogEntries.getSlice: startIndex out of range. startIndex: %d, len: %d.",
startIndex, len(logEntries))
log.Panic("LogEntries.getSlice: startIndex out of range. \n")
}
if endIndex > len(logEntries)+1 {
Debug(dError, "LogEntries.getSlice: endIndex out of range. endIndex: %d, len: %d.",
endIndex, len(logEntries))
log.Panic("LogEntries.getSlice: endIndex out of range.\n")
}
if startIndex > endIndex {
Debug(dError, "LogEntries.getSlice: startIndex > endIndex. (%d > %d)", startIndex, endIndex)
log.Panic("LogEntries.getSlice: startIndex > endIndex.\n")
}
return logEntries[startIndex-1 : endIndex-1]
}

另外状态中还保存了一个mu,用于保护状态,防止可能的多线程读写竞争。state用于表示当前节点状态,例如Follower、Leader、Candidate状态。heartbeatTime和electionTime表示了下次触发心跳广播(仅对Leader节点有效)和下次触发选举(一般是由心跳超时或者选举超时导致的)的时机。

根据如上信息,我们在Make函数中完成peer节点的初始化。至于相关的ticker()方法,我们后面再提:

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).
rf.currentTerm = 0
rf.votedFor = -1
rf.state = FollowerState
rf.setElectionTimeout(randHeartbeatTimeout())
rf.log = make([]LogEntry, 0)
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
Debug(dClient, "S%d Started at T%d. LLI: %d, LLT: %d.", rf.me, rf.currentTerm, lastLogIndex, lastLogTerm)

// start ticker goroutine to start elections
go rf.ticker()

return rf
}

另外我们需要补充GetState()方法来确保测试程序能够获得当前节点的状态信息:

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

// Your code here (2A).
rf.mu.Lock()
defer rf.mu.Unlock()
return rf.currentTerm, rf.state == LeaderState
}

AppendEntries RPC结构体实现

此外,我们还需要填充两个RPC调用中的相关参数,首先看AppendEntries RPC内容。这个RPC主要负责在Leader和Follower之间更新同步日志条目:

type AppendEntriesArgs struct {
Term int // leader’s term
LeaderId int // with leaderId follower can redirect clients
PrevLogIndex int // index of log entry immediately preceding new ones
PrevLogTerm int // term of prevLogIndex entry
Entries []LogEntry // log entries to store (empty for heartbeat; may send more than one for efficiency)
LeaderCommit int // leader's commitIndex
}

type AppendEntriesReply struct {
Term int // currentTerm, for leader to update itself
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}

简单的介绍一下,在Leader发送的RPC请求内容中有以下几个参数:

  • term,表示发送时Leader的任期号
  • leaderId,表示Leader节点自身的节点编号,给出该参数可以后续方便在接受客户端请求时重定向到Leader节点
  • prevLogIndex,预期要插入的日志条目位置紧接着前面的一个日志的下标,插入操作前要校对日志下标和任期来确保之前的日志全部同步
  • prevLogTerm,预期要插入的日志条目位置紧接着前面的一个日志的任期,插入操作前要校对日志下标和任期来确保之前的日志全部同步
  • entries[],发送的更新日志条目(如果为空则表示心跳,一次性可以发送多条日志条目来提高效率)
  • leaderCommit,表示Leader节点当前的commitIndex,相关含义见Raft结构体定义

Follower节点在接收到来自Leader的请求后会返回如下的RPC回复参数:

  • term,表示Follower节点当前的任期编号。如果Leader的任期太老了,Follower有义务提醒Leader节点转变为Follower身份并更新任期
  • success,表示prevLogIndexprevLogTerm的检查是否通过,如果没有则需要Leader节点回退更多的条目重试

RequestVote RPC结构体实现

另外我们还需要实现RequestVote RPC的结构,这个RPC主要用于投票阶段,由Candidate发送RPC请求,其余节点回复:

// example RequestVote RPC arguments structure.
// field names must start with capital letters!
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // candidate's term
CandidateId int // candidate requesting vote
LastLogIndex int // index of candidate’s last log entry (§5.4)
LastLogTerm int // term of candidate’s last log entry (§5.4)
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
// Your data here (2A).
Term int // currentTerm, for candidate to update itself
VoteGranted bool // true means candidate received vote
}

同样简单的翻译下,Candidate发送的请求中有如下参数:

  • term,表示Candidate节点当前所在的任期号
  • candidateId,表示Candidate节点的peer节点编号
  • lastLogIndex,表示Candidate节点的最后一个日志条目的下标。投票节点需要确定Candidate的日志条目比自己的“更新”才有可能会选择投票,因此需要此参数。
  • lastLogTerm,表示Candidate节点的最后一个日志条目的任期,同上原则

其余节点返回的结果中有如下参数:

  • term,表示节点当前的任期。如果Candidate的任期太老了,节点有义务提醒Candidate节点转变为Follower身份并更新任期
  • voteGranted,表示当前节点是否同意给Candidate投票

ticker超时检查实现

Make方法中,我们调起了一个ticker方法,这个方法主要用于检查相关的超时设置,例如我们在Raft结构体中提到了heartbeatTimeelectionTime,分别表示了Leader下次触发心跳的时机以及普通节点下次发起选举的时机。简单来看,代码如下:

// These constants are all in milliseconds.
const TickInterval int64 = 30 // Loop interval for ticker and applyLogsLoop. Mostly for checking timeouts.

// The ticker go routine starts a new election if this peer hasn't received
// heartbeats recently.
// The routine also broadcasts heartbeats and applies logs periodically.
func (rf *Raft) ticker() {
for !rf.killed() {

// Your code here to check if a leader election should
// be started and to randomize sleeping time using
// time.Sleep().
rf.mu.Lock()
// leader repeat heartbeat during idle periods to prevent election timeouts (§5.2)
if rf.state == LeaderState && time.Now().After(rf.heartbeatTime) {
Debug(dTimer, "S%d HBT elapsed. Broadcast heartbeats.", rf.me)
rf.sendEntries(true)
}
// If election timeout elapses: start new election
if time.Now().After(rf.electionTime) {
Debug(dTimer, "S%d ELT elapsed. Converting to Candidate, calling election.", rf.me)
rf.raiseElection()
}
rf.mu.Unlock()
time.Sleep(time.Duration(TickInterval) * time.Millisecond)
}
}

我们以TickInterval为周期不断刷新检测超时时间,如果超时则执行对应的行为。例如如果当前节点是Leader,且时间超过了heartbeatTime,我们就需要让Leader节点进行全体心跳广播。如果时间超过了electionTime,那么我们就要让节点发起一次选举。

超时时间函数实现

让我们重新来梳理下这里面存在的各种超时时间关系,Raft协议中最重要的两个超时时间分别是心跳超时和选举超时,一个是由于节点长时间收不到来自Leader的心跳导致的,一个是在选举过程中迟迟无法选举出Leader等原因导致的,这两个超时都会导致节点发起一轮选举。

在网课中教授提到,为了尽可能的防止split vote分裂投票的发生,我们最好将这些超时时间尽可能的随机化,为此我们产生了以下代码:

// These constants are all in milliseconds.
const BaseHeartbeatTimeout int64 = 300 // Lower bound of heartbeat timeout. Election is raised when timeout as a follower.
const BaseElectionTimeout int64 = 1000 // Lower bound of election timeout. Another election is raised when timeout as a candidate.

const RandomFactor float64 = 0.8 // Factor to control upper bound of heartbeat timeouts and election timeouts.

func randElectionTimeout() time.Duration {
extraTime := int64(float64(rand.Int63()%BaseElectionTimeout) * RandomFactor)
return time.Duration(extraTime+BaseElectionTimeout) * time.Millisecond
}

func randHeartbeatTimeout() time.Duration {
extraTime := int64(float64(rand.Int63()%BaseHeartbeatTimeout) * RandomFactor)
return time.Duration(extraTime+BaseHeartbeatTimeout) * time.Millisecond
}

其中BaseHeartbeatTimeout和BaseElectionTimeout是等待心跳超时和选举超时的基准时间,具体的超时时间受到随机因子RandomFactor的影响,对于一个基准超时时间BaseTimeout而言,其最终产生的随机超时时间会落在区间[BaseTimeout, (1+RandomFactor)*BaseTimeout)中。

然后我们需要将这个超时时间应用在选举超时时间electionTime上:

func (rf *Raft) setElectionTimeout(timeout time.Duration) {
t := time.Now()
t = t.Add(timeout)
rf.electionTime = t
}

相比之下,我们还有一个超时时间,那就是对于Leader而言的心跳间隔HeartbeatInterval,Leader节点每隔一段时间就需要广播一批心跳。和刚才的选举时间相比,我们的心跳间隔设置为固定的即可,因此:

// These constants are all in milliseconds.
const HeartbeatInterval int64 = 100 // Interval between heartbeats. Broadcast heartbeats when timeout as a leader.

func (rf *Raft) setHeartbeatTimeout(timeout time.Duration) {
t := time.Now()
t = t.Add(timeout)
rf.heartbeatTime = t
}

RequestVote RPC核心函数实现

我们先来看简单点的请求投票里面的RPC实现。

首先在图2中的提示里的Rules for Servers (All Servers) 里面有一段话:

If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)

对于任何服务器,当其接收到来自其他服务器的信息时,不管是RPC请求方发来的,还是RPC回复方发来的,我们都要检查当前服务器的任期和对方任期的关系,一旦发现当前服务器的任期较小,就转变为Follower状态并更新任期。在这里,我使用checkTerm函数来实现:

// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
// Check if current term is out of date when hearing from other peers,
// update term, revert to follower state and return true if necessary
func (rf *Raft) checkTerm(term int) bool {
if rf.currentTerm < term {
Debug(dTerm, "S%d Term is higher, updating term to T%d, setting state to follower. (%d > %d)",
rf.me, term, term, rf.currentTerm)
rf.state = FollowerState
rf.currentTerm = term
rf.votedFor = -1
return true
}
return false
}

接下来我们就完全按照图2 RequestVote RPC部分给出的提示来:

  1. Reply false if term < currentTerm (§5.1)
  2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)

如果Candidate的任期还不如我们的当前任期,我们就需要拒绝这次投票。

此外只有在Candidate的日志条目比我们当前节点相比的至少“一样新”时我们才可以选择进行投票。这里的“较新”指的是在任期号较新或者是任期号一致的情况下日志条目的数量较多。原文是这样描述的:

Raft uses the voting process to prevent a candidate from winning an election unless its log contains all committed entries. A candidate must contact a majority of the cluster in order to be elected, which means that every committed entry must be present in at least one of those servers. If the candidate’s log is at least as up-to-date as any other log in that majority (where “up-to-date” is defined precisely below), then it will hold all the committed entries. The RequestVote RPC implements this restriction: the RPC includes information about the candidate’s log, and the voter denies its vote if its own log is more up-to-date than that of the candidate.

Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs. If the logs have last entries with different terms, then the log with the later term is more up-to-date. If the logs end with the same term, then whichever log is longer is more up-to-date.

此外如果当前节点已经选择投票给其他人了那我们也只能拒绝投票。

因此,我们有代码:

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dVote, "S%d <- S%d Received vote request at T%d.", rf.me, args.CandidateId, rf.currentTerm)
reply.VoteGranted = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dVote, "S%d Term is lower, rejecting the vote. (%d < %d)", rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

// If votedFor is null or candidateId,
// and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
lastLogIndex, lastLogTerm := rf.log.lastLogInfo() // 取出自己最后一条日志的信息,用于比较谁的日志更新
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
if lastLogTerm < args.LastLogTerm ||
(lastLogTerm == args.LastLogTerm && lastLogIndex <= args.LastLogIndex) {
Debug(dVote, "S%d Granting vote to S%d at T%d.", rf.me, args.CandidateId, args.Term)
reply.VoteGranted = true
rf.votedFor = args.CandidateId
Debug(dTimer, "S%d Resetting ELT, wait for next potential election timeout.", rf.me)
rf.setElectionTimeout(randElectionTimeout())
} else {
Debug(dVote, "S%d Candidate's log not up-to-date, rejecting the vote. LLI: %d, %d. LLT: %d, %d.",
rf.me, lastLogIndex, args.LastLogIndex, lastLogTerm, args.LastLogTerm)
}
} else {
Debug(dVote, "S%d Already voted for S%d, rejecting the vote.", rf.me, rf.votedFor)
}
}

发起选举流程实现

实现了基本的RPC之后,我们开始考虑整个发起选举的流程:当ticker()函数发现选举超时后,会调用我们的raiseElection函数发起选举。

图2中在Rules for Servers部分给出了如下提示:

On conversion to candidate, start election:

  • Increment currentTerm
  • Vote for self
  • Reset election timer
  • Send RequestVote RPCs to all other servers

If votes received from majority of servers: become leader

If AppendEntries RPC received from new leader: convert to follower <= 这点暂时无关

If election timeout elapses: start new election

开始选举时,我们首先更新我们的任期编号,并将自己的票投给自己,同时使用随机的选举超时时间重置选举计时器,并向其他所有节点发送RPC:

func (rf *Raft) raiseElection() {
rf.state = CandidateState
// Increment currentTerm
rf.currentTerm++
Debug(dTerm, "S%d Starting a new term. Now at T%d.", rf.me, rf.currentTerm)
// Vote for self
rf.votedFor = rf.me
// Reset election timer
rf.setElectionTimeout(randElectionTimeout())
Debug(dTimer, "S%d Resetting ELT because of election, wait for next potential election timeout.", rf.me)
lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
voteCount := 1
var once sync.Once
// Send RequestVote RPCs to all other servers
for peer := range rf.peers {
if peer == rf.me {
continue
}
Debug(dVote, "S%d -> S%d Sending request vote at T%d.", rf.me, peer, rf.currentTerm)
go rf.candidateRequestVote(&voteCount, args, &once, peer)
}
}

具体的发起选举行为,我们使用一个goroutine在candidateRequestVote方法下实现,在这里我们维护了一个voteCount表示接收到的投票数量(其中有一票已经投给了自己),一旦超过半数我们就成功当选Leader(这个方法只执行一次,可以用sync.Once来实现)。一旦当选,我们就会调用sendEntries(true)广播心跳,这个方法我们稍后在AppendEntries部分实现。

代码如下:

func (rf *Raft) candidateRequestVote(voteCount *int, args *RequestVoteArgs, once *sync.Once, server int) {
reply := &RequestVoteReply{}
ok := rf.sendRequestVote(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dVote, "S%d <- S%d Received request vote reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dVote, "S%d Term is lower, invalid vote reply. (%d < %d)", rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the vote request, vote reply discarded. "+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
// If AppendEntries RPC received from new leader: convert to follower
rf.checkTerm(reply.Term)
if reply.VoteGranted {
*voteCount++
Debug(dVote, "S%d <- S%d Get a yes vote at T%d.", rf.me, server, rf.currentTerm)
// If votes received from majority of servers: become leader
if *voteCount > len(rf.peers)/2 {
once.Do(func() {
Debug(dLeader, "S%d Received majority votes at T%d. Become leader.", rf.me, rf.currentTerm)
rf.state = LeaderState
lastLogIndex, _ := rf.log.lastLogInfo()
for peer := range rf.peers {
rf.nextIndex[peer] = lastLogIndex + 1
rf.matchIndex[peer] = 0
}
// Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server
rf.sendEntries(true)
})
}
} else {
Debug(dVote, "S%d <- S%d Get a no vote at T%d.", rf.me, server, rf.currentTerm)
}
}
}

AppendEntries RPC核心函数实现

接下来我们来看AppendEntriesRPC的核心实现部分,图2中给出了提示:

  1. Reply false if term < currentTerm (§5.1)
  2. Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
  3. If an existing entry conflicts with a new one (same index but different terms), delete the existing entry and all that follow it (§5.3)
  4. Append any new entries not already in the log

这里就提取主要的来说,我们需要根据prevLogIndexprevLogTerm来检查插入位置之前的日志是否全部同步,如果不同步我们要通知Leader回退重试。如果已经同步,我们再尝试插入RPC中携带的日志(这一块请参考之前写的可能的坑)。

另外如果当前节点还是Candidate… 那极有可能是在同一个任期下多个节点开始了竞选,如果Candidate收到了当前任期下的Leader的RPC请求,应该自动变为Follower。

代码如下:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
Debug(dLog2, "S%d Prev log entries do not match. Ask leader to retry.", rf.me)
return
}
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
for i, entry := range args.Entries {
if rf.log.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
rf.log = append(rf.log.getSlice(1, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}
Debug(dLog2, "S%d <- S%d Append entries success. Saved logs: %v.", rf.me, args.LeaderId, args.Entries)
reply.Success = true
}

广播心跳流程实现

根据我们在ticker函数中的设置,Leader节点每过一段固定的时间,就会尝试开始发送心跳广播,使用sendEntries(true)来实现。其中true表示是心跳包,是不带有日志条目的AppendEntries RPC,false则表示是带有日志条目的普通AppendEntries RPC。

图2中提示如下:

  • Upon election: send initial empty AppendEntries RPCs (heartbeat) to each server; repeat during idle periods to prevent election timeouts (§5.2)
  • If command received from client: append entry to local log, respond after entry applied to state machine (§5.3)
  • If last log index ≥ nextIndex for a follower: send AppendEntries RPC with log entries starting at nextIndex

我们的prevLogIndex表示的是即将要插入位置的前面的下标,可以直接用nextIndex减一获得。如果我们需要发送普通AppendEntries RPC,我们只需要根据下标范围从nextIndex到最后做一个切片即可,这段日志条目我们放入RPC中即可。最后我们开启一个goroutine开始发起RPC请求。

func (rf *Raft) sendEntries(isHeartbeat bool) {
Debug(dTimer, "S%d Resetting HBT, wait for next heartbeat broadcast.", rf.me)
rf.setHeartbeatTimeout(time.Duration(HeartbeatInterval) * time.Millisecond)
Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())
lastLogIndex, _ := rf.log.lastLogInfo()
for peer := range rf.peers {
if peer == rf.me {
continue
}
nextIndex := rf.nextIndex[peer]
args := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log.getEntry(nextIndex - 1).Term,
LeaderCommit: rf.commitIndex,
}
if lastLogIndex >= nextIndex {
// If last log index ≥ nextIndex for a follower:
// send AppendEntries RPC with log entries starting at nextIndex
entries := make([]LogEntry, lastLogIndex-nextIndex+1)
copy(entries, rf.log.getSlice(nextIndex, lastLogIndex+1))
args.Entries = entries
Debug(dLog, "S%d -> S%d Sending append entries at T%d. PLI: %d, PLT: %d, LC: %d. Entries: %v.",
rf.me, peer, rf.currentTerm, args.PrevLogIndex,
args.PrevLogTerm, args.LeaderCommit, args.Entries,
)
go rf.leaderSendEntries(args, peer)
} else if isHeartbeat {
args.Entries = make([]LogEntry, 0)
Debug(dLog, "S%d -> S%d Sending heartbeat at T%d. PLI: %d, PLT: %d, LC: %d.",
rf.me, peer, rf.currentTerm, args.PrevLogIndex, args.PrevLogTerm, args.LeaderCommit)
go rf.leaderSendEntries(args, peer)
}
}
}

在请求的过程中,如果返回表示prevLogIndex不一致,那就需要我们进行回退,随后重新发起RPC。否则我们就可以成功更新nextIndexmatchIndex数组信息。图2中提示为:

  • If successful: update nextIndex and matchIndex for follower (§5.3)
  • If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dLog, "S%d <- S%d Received send entry reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dLog, "S%d Term lower, invalid send entry reply. (%d < %d)",
rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the append request, send entry reply discarded. "+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
if rf.checkTerm(reply.Term) {
return
}
// If successful: update nextIndex and matchIndex for follower (§5.3)
if reply.Success {
Debug(dLog, "S%d <- S%d Log entries in sync at T%d.", rf.me, server, rf.currentTerm)
newNext := args.PrevLogIndex + 1 + len(args.Entries)
newMatch := args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])
} else {
// If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
if rf.nextIndex[server] > 1 {
rf.nextIndex[server]--
}
lastLogIndex, _ := rf.log.lastLogInfo()
nextIndex := rf.nextIndex[server]
if lastLogIndex >= nextIndex {
Debug(dLog, "S%d <- S%d Inconsistent logs, retrying.", rf.me, server)
entries := make([]LogEntry, lastLogIndex-nextIndex+1)
copy(entries, rf.log.getSlice(nextIndex, lastLogIndex+1))
newArg := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log.getEntry(nextIndex - 1).Term,
Entries: entries,
}
go rf.leaderSendEntries(newArg, server)
}
}
}
}

测试

至此,我们的2A部分就基本实现了,可能稍稍有点越界… 像是AppendEntries RPC部分感觉可能已经涉及到了部分2B内容,但是不管了…

我们先看看使用TA给的调试工具出来的效果,我们测试样例选择2A中第一个最简单的测试TestInitialElection2A,即测试单次选举,我们在src/raft目录下使用命令VERBOSE=1 go test -run TestInitialElection2A -race | python dslog.py -c 3

看起来就很不错啦,很明显看出来是S2在第一任期当选Leader,然后开始发送心跳,要的就是这效果。

接下来我们运行2A全部测试:

是通过的,如果还是害怕,可以用TA给的脚本测试5000次结果,运行命令python dstest.py -n 5000 -p 10 -v -r 2A

看到是正常通过了~

实现日志功能(2B)

要求和提示

在本部分中,我们需要完善Leader和Follower节点的相关代码,来实现AppendEntries推送日志条目的相关功能,最终我们运行go test -run 2B -race来进行测试。

相关提示:

  • 我们的第一个目标是通过测试程序TestBasicAgree2B()。我们首先需要实现Start(),然后按照论文图2实现AppendEntries RPC函数来收发新的日志条目。将每个新提交的日志条目发送在各个peer节点的applyCh中。
  • 我们需要满足选举限制的相关要求(见论文5.4.1)
  • 2B前期测试中一种可能的失败情形是无法达成一致:即使Leader仍然存活,节点也在反复发起选举。此时请检查选举计时器相关的bug,或者检查是否在赢得选举后立即广播了心跳。
  • 代码中可能具有重复检查某些事件的循环。不要让这些循环不停顿地连续执行,因为这会使实现变慢,以至于测试失败。使用Go的条件变量condition variables,或者在每次循环中插入10ms的睡眠。
  • 为了方便后续的Lab,还是尽量将我们的代码写的干净整洁。我们可以参考Guidance page 查看开发和调试的相关建议。
  • 如果有测试暂未通过,可以研究下config.gotest_test.go中的测试程序,来更好的理解测试程序在测试什么情形。config.go文件还很好的阐述了测试程序是如何使用Raft中的API的。

如果我们的Lab程序运行得太慢,也有可能导致测试无法通过。可以使用time指令来检查我们花费了多少真实时间和CPU时间,如下是典型的输出示例:

$ time go test -run 2B
Test (2B): basic agreement ...
... Passed -- 0.9 3 16 4572 3
Test (2B): RPC byte count ...
... Passed -- 1.7 3 48 114536 11
Test (2B): agreement after follower reconnects ...
... Passed -- 3.6 3 78 22131 7
Test (2B): no agreement if too many followers disconnect ...
... Passed -- 3.8 5 172 40935 3
Test (2B): concurrent Start()s ...
... Passed -- 1.1 3 24 7379 6
Test (2B): rejoin of partitioned leader ...
... Passed -- 5.1 3 152 37021 4
Test (2B): leader backs up quickly over incorrect follower logs ...
... Passed -- 17.2 5 2080 1587388 102
Test (2B): RPC counts aren't too high ...
... Passed -- 2.2 3 60 20119 12
PASS
ok 6.824/raft 35.557s

real 0m35.899s
user 0m2.556s
sys 0m1.458s
$

“ok 6.824/raft 35.557s” 表示Go进行2B测试花费的实际时间是35.557秒,"user 0m2.556s"表示代码消耗了2.556秒的CPU时间,即实际执行指令(而不是等待或睡眠)的时间。如果我们的代码在2B测试中使用的实际时间远远超过1分钟,或者CPU时间远远超过5秒,可能会遇到麻烦。为此我们需要检查:是否有大量时间花在睡眠或等待RPC超时上,是否存在不睡眠、也不等待条件变量或channel消息就持续空转的循环,或者是否发送了数量过多的RPC。

实现Start()函数

根据提示,在2B中,我们需要实现Start()函数。该函数在当前节点是Leader时接收新的命令并将其追加为日志条目LogEntry;而在当前节点不是Leader时直接返回false,后续的key/value服务可以借助节点记录的leaderId将请求重定向到Leader节点。

为此,我们需要在Raft结构体中新增leaderId字段,来表示当前节点认为节点中的Leader是谁:

// A Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

leaderId int // the leader server that current server recognizes
state RaftState
heartbeatTime time.Time
electionTime time.Time

// Persistent state on all servers

currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log LogEntries // log entries; each entry contains command for state machine, and term when entry was received by leader

// Volatile state on all servers

commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine

// Volatile state on leaders (Reinitialized after election)

nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server
}

在新增的Start()方法中,我们加入如下实现:

// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true

// Your code here (2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != LeaderState {
return -1, -1, false
}
logEntry := LogEntry{
Command: command,
Term: rf.currentTerm,
}
rf.log = append(rf.log, logEntry)
index = len(rf.log)
term = rf.currentTerm
Debug(dLog, "S%d Add command at T%d. LI: %d, Command: %v\n", rf.me, term, index, command)
rf.sendEntries(false)

return index, term, isLeader
}

可以看到功能较为简单:当当前节点认为自己不是Leader时直接返回false,否则Leader节点会接受请求,新增相应的日志条目,并立即广播同步。

Raft结构体新增leaderId字段

对于Raft结构体中新增的leaderId字段,我们需要在相关受影响的代码位置也更新一下,例如:

初始化Raft结构体时,将leaderId初始化为-1

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).
rf.leaderId = -1
rf.currentTerm = 0
rf.votedFor = -1
rf.state = FollowerState
rf.setElectionTimeout(randHeartbeatTimeout())
rf.log = make([]LogEntry, 0)
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
Debug(dClient, "S%d Started at T%d. LLI: %d, LLT: %d.", rf.me, rf.currentTerm, lastLogIndex, lastLogTerm)

// start ticker goroutine to start elections
go rf.ticker()

// Apply logs periodically until the last committed index to make sure state machine is up to date.
go rf.applyLogsLoop(applyCh)

return rf
}

checkTerm()方法中,当发现当前任期小于RPC调用者的任期,我们需要重置自己的leaderId

// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
// Check if current term is out of date when hearing from other peers,
// update term, revert to follower state and return true if necessary
func (rf *Raft) checkTerm(term int) bool {
if rf.currentTerm < term {
Debug(dTerm, "S%d Term is higher, updating term to T%d, setting state to follower. (%d > %d)",
rf.me, term, term, rf.currentTerm)
rf.state = FollowerState
rf.currentTerm = term
rf.votedFor = -1
rf.leaderId = -1
return true
}
return false
}

当节点接受AppendEntries RPC调用时,需要在确认Term等信息并选择接受之后,更新一次自己的leaderId信息:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

rf.leaderId = args.LeaderId

//.....
//.....
//.....
}

定期将已提交日志同步状态机

对于日志中已经提交但尚未应用的条目,我们需要额外开一个协程,定期将这些已提交的变更应用到状态机中,这里只需要将对应的日志发送到applyCh就可以了。至于当前节点的已提交日志水位在哪个位置,我们只需要参考Raft结构体中的commitIndex即可,而状态机的应用日志水位见lastApplied,因此我们创建一个如下的协程:

func (rf *Raft) applyLogsLoop(applyCh chan ApplyMsg) {
for !rf.killed() {
// Apply logs periodically until the last committed index.
rf.mu.Lock()
// To avoid the apply operation getting blocked with the lock held,
// use a slice to store all committed msgs to apply, and apply them only after unlocked
appliedMsgs := []ApplyMsg{}
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
appliedMsgs = append(appliedMsgs, ApplyMsg{
CommandValid: true,
Command: rf.log.getEntry(rf.lastApplied).Command,
CommandIndex: rf.lastApplied,
})
Debug(dLog2, "S%d Applying log at T%d. LA: %d, CI: %d.", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
}
rf.mu.Unlock()
for _, msg := range appliedMsgs {
applyCh <- msg
}
time.Sleep(time.Duration(TickInterval) * time.Millisecond)
}
}

注意到为了防止在应用水位追赶提交水位的过程中由于发送channel导致阻塞,我们使用一个切片数组,一次性打包要提交的日志,随后解锁,在解锁后再将切片中的日志应用到状态机中。

随后我们在Make()启动函数中加入协程的调用:

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).
rf.leaderId = -1
rf.currentTerm = 0
rf.votedFor = -1
rf.state = FollowerState
rf.setElectionTimeout(randHeartbeatTimeout())
rf.log = make([]LogEntry, 0)
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
Debug(dClient, "S%d Started at T%d. LLI: %d, LLT: %d.", rf.me, rf.currentTerm, lastLogIndex, lastLogTerm)

// start ticker goroutine to start elections
go rf.ticker()

// Apply logs periodically until the last committed index to make sure state machine is up to date.
go rf.applyLogsLoop(applyCh)

return rf
}

配置AppendEntries RPC

在本部分中,我们只需要在之前2A的基础上加入一小部分代码。我们根据论文图2中给出的提示来,其中有一点说明了AppendEntries RPC被调用端是如何根据Leader的已提交水位来更新自己的已提交水位的:

  1. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)

这里主要需要好好理解什么叫做index of last new entry,意思并不是当前所有日志条目的最后一个日志下标,而是这次AppendEntries提交的日志应用后,在这批日志中对应的最后一个日志下标。

如果Leader节点发送过来的日志发生冲突了我们做了删除替换操作,那取当前所有日志条目的最后一条自然没有问题。但是如果是不产生冲突的过时日志条目,那显然就不对了。这里我们还是拿之前的那张老图当例子说明下,这个得细品一下:
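
这里用一个假设的数字例子来体会一下:Follower当前日志长度为7,Leader发来一条过时的AppendEntries,其中PrevLogIndex为3,携带了下标4和5两个条目,LeaderCommit为6。此时“index of last new entry”是3+2=5而不是7,所以commitIndex只能更新到min(6, 5)=5;如果错误地取了当前日志的最后下标7,commitIndex就会被更新到min(6, 7)=6,把这次RPC根本没有确认过的下标6的条目也标记为已提交。对应到代码里就是commitIndex = min(leaderCommit, prevLogIndex + len(entries)),也就是下面实现中的写法。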

所以在RPC被调用端,我们的AppendEntries方法这样写:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

rf.leaderId = args.LeaderId

// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
Debug(dLog2, "S%d Prev log entries do not match. Ask leader to retry.", rf.me)
return
}
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
for i, entry := range args.Entries {
if rf.log.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
rf.log = append(rf.log.getSlice(1, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}
Debug(dLog2, "S%d <- S%d Append entries success. Saved logs: %v.", rf.me, args.LeaderId, args.Entries)
if args.LeaderCommit > rf.commitIndex {
Debug(dCommit, "S%d Get higher LC at T%d, updating commitIndex. (%d < %d)",
rf.me, rf.currentTerm, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
Debug(dCommit, "S%d Updated commitIndex at T%d. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
}
reply.Success = true
}

而在RPC调用端,也就是Leader节点发送的这一端,在论文图2中这样提醒我们处理结果来决定Leader如何更新自己的已提交水位:

If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N, and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).

只有当大多数节点都已经接受并将该日志加入到自己的日志条目中,Leader才能认为该条目已提交了,这点符合我们对Raft的预期。在论文中不过是用一种更加具体的方式描述了一下这个意思,即存在日志下标N,使得大多数节点的matchIndex都大于等于N,且该日志属于Leader当前任期日志,我们就将Leader的已提交水位设置为N。值得玩味的是最后一句“该日志属于Leader当前任期日志”很好的解决了论文图8中的问题,只有Leader节点将当前任期中的日志同步到了大多数节点中,我们才能够开始提交之前任期中没有提交的日志。因此我们加一段代码,严格按照上述逻辑执行即可:

func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dLog, "S%d <- S%d Received send entry reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dLog, "S%d Term lower, invalid send entry reply. (%d < %d)",
rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the append request, send entry reply discarded. "+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
if rf.checkTerm(reply.Term) {
return
}
// If successful: update nextIndex and matchIndex for follower (§5.3)
if reply.Success {
Debug(dLog, "S%d <- S%d Log entries in sync at T%d.", rf.me, server, rf.currentTerm)
newNext := args.PrevLogIndex + 1 + len(args.Entries)
newMatch := args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])
// If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
// and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
for N := len(rf.log); N > rf.commitIndex && rf.log.getEntry(N).Term == rf.currentTerm; N-- {
count := 1
for peer, matchIndex := range rf.matchIndex {
if peer == rf.me {
continue
}
if matchIndex >= N {
count++
}
}
if count > len(rf.peers)/2 {
rf.commitIndex = N
Debug(dCommit, "S%d Updated commitIndex at T%d for majority consensus. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
break
}
}
} else {
// If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
if rf.nextIndex[server] > 1 {
rf.nextIndex[server]--
}
lastLogIndex, _ := rf.log.lastLogInfo()
nextIndex := rf.nextIndex[server]
if lastLogIndex >= nextIndex {
Debug(dLog, "S%d <- S%d Inconsistent logs, retrying.", rf.me, server)
entries := make([]LogEntry, lastLogIndex-nextIndex+1)
copy(entries, rf.log.getSlice(nextIndex, lastLogIndex+1))
newArg := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log.getEntry(nextIndex - 1).Term,
Entries: entries,
}
go rf.leaderSendEntries(newArg, server)
}
}
}
}

测试

2B中这次代码更变不多,简单测试一下2B相关的情况,使用命令go test -run 2B -race

然后顺带连带2A和2B跑了5000次测试,看起来也没啥问题:

实现持久化(2C)

要求和提示

如果一台Raft服务器重启了,那么它就应该在刚才停下的地方继续提供服务。这就要求Raft协议将一部分状态进行持久化,使得重启后相关状态可以保留。论文的图2中描述了哪些状态需要进行持久化保存。

一种正确的实现方式是,每当Raft的需持久化状态发生改变时,将其写入硬盘,并在重启之后从硬盘中读取出相关的状态信息。在本次Lab的实现中,我们不会使用到硬盘,而是从一个叫做Persister的对象(详见persister.go)中保存和恢复需持久化的状态。Raft.Make()方法的调用者会提供一个Persister对象,如果有的话该对象中会存储Raft最近持久化的相关状态信息。Raft需要从这个Persister对象中初始化自己的状态,并且应当在每次相关持久化状态改变时使用该对象进行保存,我们使用Persister对象中的ReadRaftState()和Save()方法进行持久化状态的恢复和保存。

在本部分中,我们需要完成raft.go中的persist()readPersist()方法来实现持久化状态的保存和恢复。我们会需要序列化编码相关状态,生成byte类型的数组来将其传递给Persister。使用labgob编码器即可,相关内容详见persist()readPersist()中的注释。labgob编码器类似于Go中提供的gob编码器,但是当你尝试序列化的数据结构中有小写名称的字段时就会打印错误信息。在本部分中,我们暂时将persister.Save()的第二个参数传为nil,并在我们尝试修改持久化状态时触发对persist()的调用。如果能够正确的完成该部分,2C的相关测试就可以通过。
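
在动手之前,这里先给出persist()和readPersist()的一个示意写法(笔者补充的草图,假设按图2持久化currentTerm、votedFor和log三项,使用文中提到的labgob编码器和persister.Save()接口,并需要引入bytes和labgob包;如果你的骨架是旧版的,保存接口可能叫SaveRaftState(),请以实际代码为准):

// 示意:将需要持久化的字段编码成字节数组后交给persister保存
func (rf *Raft) persist() {
    w := new(bytes.Buffer)
    e := labgob.NewEncoder(w)
    e.Encode(rf.currentTerm)
    e.Encode(rf.votedFor)
    e.Encode(rf.log)
    raftstate := w.Bytes()
    rf.persister.Save(raftstate, nil) // 2D之前暂时没有快照,第二个参数传nil
}

// 示意:重启时从持久化数据中恢复状态
func (rf *Raft) readPersist(data []byte) {
    if data == nil || len(data) < 1 { // 没有任何持久化状态,直接返回
        return
    }
    r := bytes.NewBuffer(data)
    d := labgob.NewDecoder(r)
    var currentTerm, votedFor int
    var logEntries LogEntries
    if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&logEntries) != nil {
        Debug(dError, "S%d readPersist: failed to decode persisted state.", rf.me)
        return
    }
    rf.currentTerm = currentTerm
    rf.votedFor = votedFor
    rf.log = logEntries
}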

这里有一处可能用到的优化:当nextIndex需要一次回退超过一个日志条目时,可以适当加速回退。相关内容可以阅读Raft论文第7页末尾到第8页顶部被灰线标记的部分。

If desired, the protocol can be optimized to reduce the number of rejected AppendEntries RPCs. For example, when rejecting an AppendEntries request, the follower can include the term of the conflicting entry and the first index it stores for that term. With this information, the leader can decrement nextIndex to bypass all of the conflicting entries in that term; one AppendEntries RPC will be required for each term with conflicting entries, rather than one RPC per entry. In practice, we doubt this optimization is necessary, since failures happen infrequently and it is unlikely that there will be many inconsistent entries.

简单来说,就是我们可以一次性回退到位:Follower在拒绝请求时把冲突位置的相关信息带回去,Leader据此直接跳过整段冲突的日志,这样在出现大量不一致时就可以避免一个条目一个条目地慢慢回退重发,有效降低回退成本。考虑到发生拒绝的场景并不常见,论文作者认为并没有必要做这个优化,论文中也没有给出相关细节,需要我们自己设计处理逻辑。一种可能的解法是在拒绝的回复中包含以下字段:

XTerm:  term in the conflicting entry (if any)
XIndex: index of first entry with that term (if any)
XLen: log length

Leader节点的逻辑可以处理成:

Case 1: leader doesn't have XTerm:
nextIndex = XIndex
Case 2: leader has XTerm:
nextIndex = leader's last entry for XTerm
Case 3: follower's log is too short:
nextIndex = XLen

相关提示:

  • 及时使用git pull更新Lab中的代码
  • 2C测试相比2A和2B会更加苛刻,相关的错误问题可能是由2A和2B中的错误导致的

最终我们应当通过所有测试,相关的测试结果样例如下:

$ go test -run 2C
Test (2C): basic persistence ...
... Passed -- 5.0 3 86 22849 6
Test (2C): more persistence ...
... Passed -- 17.6 5 952 218854 16
Test (2C): partitioned leader and one follower crash, leader restarts ...
... Passed -- 2.0 3 34 8937 4
Test (2C): Figure 8 ...
... Passed -- 31.2 5 580 130675 32
Test (2C): unreliable agreement ...
... Passed -- 1.7 5 1044 366392 246
Test (2C): Figure 8 (unreliable) ...
... Passed -- 33.6 5 10700 33695245 308
Test (2C): churn ...
... Passed -- 16.1 5 8864 44771259 1544
Test (2C): unreliable churn ...
... Passed -- 16.5 5 4220 6414632 906
PASS
ok 6.5840/raft 123.564s
$

最好还是多次运行测试确保没有例外情况:

$ for i in {0..10}; do go test; done

实现日志后退优化

正如提示中所说的方法,我们可以适当优化Raft处理回退时的逻辑。简单来说,如果发生冲突,Leader节点可以选择一次性回退到一个合适的位置,重新开始同步。

如果被同步节点冲突位置的Term任期在Leader节点日志中恰好存在,那Leader就回退到自己日志中该任期的最后一条日志开始同步;反之如果不存在,那就从被同步节点在该任期的第一条日志的位置开始同步;而如果是因为同步的Index位置在被同步节点上根本不存在(也就是对方日志太短导致的),那就回退到被同步节点日志的尾部开始同步。

为了处理“给定某个Term任期,获取该任期中日志条目的最小和最大下标”这类问题,我们先贴心地写一个辅助方法:

// Get the index of first entry and last entry with the given term.
// Return (-1,-1) if no such term is found
func (logEntries LogEntries) getBoundsWithTerm(term int) (minIndex int, maxIndex int) {
if term == 0 {
return 0, 0
}
minIndex, maxIndex = math.MaxInt, -1
for i := 1; i <= len(logEntries); i++ {
if logEntries.getEntry(i).Term == term {
minIndex = min(minIndex, i)
maxIndex = max(maxIndex, i)
}
}
if maxIndex == -1 {
return -1, -1
}
return
}

为此,我们在AppendEntries RPC请求回复中加入三个字段,如提示中所示,其中XTerm为被同步节点发生冲突位置日志的Term任期,XIndex为该任期下被同步节点的第一个日志的下标,XLen则表示当前被同步节点日志的总长度:

type AppendEntriesReply struct {
Term int // currentTerm, for leader to update itself
XTerm int // term in the conflicting entry (if any)
XIndex int // index of first entry with that term (if any)
XLen int // log length
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
}

在被同步节点尝试对比Leader节点发来的PrevLogTermPrevLogIndex时,如果发生不一致,我们顺带把冲突信息带上:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

rf.leaderId = args.LeaderId

// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
Debug(dLog2, "S%d Prev log entries do not match. Ask leader to retry.", rf.me)
reply.XLen = len(rf.log)
reply.XTerm = rf.log.getEntry(args.PrevLogIndex).Term
reply.XIndex, _ = rf.log.getBoundsWithTerm(reply.XTerm)
return
}
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
for i, entry := range args.Entries {
if rf.log.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
Debug(dLog2, "S%d Running into conflict with existing entries at T%d. conflictLogs: %v, startIndex: %d.",
rf.me, rf.currentTerm, args.Entries[i:], i+1+args.PrevLogIndex)
rf.log = append(rf.log.getSlice(1, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}
Debug(dLog2, "S%d <- S%d Append entries success. Saved logs: %v.", rf.me, args.LeaderId, args.Entries)
if len(args.Entries) > 0 {
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
Debug(dCommit, "S%d Get higher LC at T%d, updating commitIndex. (%d < %d)",
rf.me, rf.currentTerm, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
Debug(dCommit, "S%d Updated commitIndex at T%d. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
}
reply.Success = true
}

自然地,当Leader节点接收到对应的回复时,处理逻辑同样遵循上面的描述:

func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dLog, "S%d <- S%d Received send entry reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dLog, "S%d Term lower, invalid send entry reply. (%d < %d)",
rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the append request, send entry reply discarded."+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
if rf.checkTerm(reply.Term) {
return
}
// If successful: update nextIndex and matchIndex for follower (§5.3)
if reply.Success {
Debug(dLog, "S%d <- S%d Log entries in sync at T%d.", rf.me, server, rf.currentTerm)
newNext := args.PrevLogIndex + 1 + len(args.Entries)
newMatch := args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])
// If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
// and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
for N := len(rf.log); N > rf.commitIndex && rf.log.getEntry(N).Term == rf.currentTerm; N-- {
count := 1
for peer, matchIndex := range rf.matchIndex {
if peer == rf.me {
continue
}
if matchIndex >= N {
count++
}
}
if count > len(rf.peers)/2 {
rf.commitIndex = N
Debug(dCommit, "S%d Updated commitIndex at T%d for majority consensus. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
break
}
}
} else {
// If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
// the optimization that backs up nextIndex by more than one entry at a time
if reply.XTerm == -1 {
// follower's log is too short
rf.nextIndex[server] = reply.XLen + 1
} else {
_, maxIndex := rf.log.getBoundsWithTerm(reply.XTerm)
if maxIndex != -1 {
// leader has XTerm
rf.nextIndex[server] = maxIndex
} else {
// leader doesn't have XTerm
rf.nextIndex[server] = reply.XIndex
}
}
lastLogIndex, _ := rf.log.lastLogInfo()
nextIndex := rf.nextIndex[server]
if lastLogIndex >= nextIndex {
Debug(dLog, "S%d <- S%d Inconsistent logs, retrying.", rf.me, server)
entries := make([]LogEntry, lastLogIndex-nextIndex+1)
copy(entries, rf.log.getSlice(nextIndex, lastLogIndex+1))
newArg := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.log.getEntry(nextIndex - 1).Term,
Entries: entries,
}
go rf.leaderSendEntries(newArg, server)
}
}
}
}

至此我们就简单处理好回退的优化了。

持久化序列化和反序列化实现

接下来,我们来到2C的重头戏内容,也就是将信息进行持久化的操作。我们还是先处理一下涉及到持久化的一些序列化和反序列化的方法,不是很难,按照给出的注释自己实现一下即可。根据论文图2中的描述,需要被持久化的变量主要有日志条目log,节点当前任期currentTerm,以及节点目前投给了谁votedFor

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
Debug(dPersist, "S%d Saving persistent state to stable storage at T%d.", rf.me, rf.currentTerm)
// Your code here (2C).
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if err := e.Encode(rf.currentTerm); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.currentTerm\". err: %v, data: %v", err, rf.currentTerm)
}
if err := e.Encode(rf.votedFor); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.votedFor\". err: %v, data: %v", err, rf.votedFor)
}
if err := e.Encode(rf.log); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.log\". err: %v, data: %v", err, rf.log)
}
data := w.Bytes()
rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
Debug(dPersist, "S%d Restoring previously persisted state at T%d.", rf.me, rf.currentTerm)
// Your code here (2C).
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
rf.mu.Lock()
defer rf.mu.Unlock()
if err := d.Decode(&rf.currentTerm); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.currentTerm\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.votedFor); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.votedFor\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.log); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.log\". err: %v, data: %s", err, data)
}
}

持久化时机选择

根据刚才的信息不难推断出,只要log、currentTerm和votedFor中任意一个变量的内容发生变化,我们就需要执行持久化。根据我们之前实现的内容,有多处需要调用rf.persist(),在此简单罗列一下。

在检查RPC的请求/回复中的任期信息时,如果当前节点需要变更任期currentTerm并重置votedFor,则需要持久化:

// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
// Check if current term is out of date when hearing from other peers,
// update term, revert to follower state and return true if necessary
func (rf *Raft) checkTerm(term int) bool {
if rf.currentTerm < term {
Debug(dTerm, "S%d Term is higher, updating term to T%d, setting state to follower. (%d > %d)",
rf.me, term, term, rf.currentTerm)
rf.state = FollowerState
rf.currentTerm = term
rf.votedFor = -1
rf.leaderId = -1
rf.persist()
return true
}
return false
}

Leader节点在接收到来自客户端新增的Command时,会需要将新日志条目增加到自己的log中,因此需要立即持久化:

// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
isLeader := true

// Your code here (2B).
rf.mu.Lock()
defer rf.mu.Unlock()
if rf.state != LeaderState {
return -1, -1, false
}
logEntry := LogEntry{
Command: command,
Term: rf.currentTerm,
}
rf.log = append(rf.log, logEntry)
index = len(rf.log)
term = rf.currentTerm
Debug(dLog, "S%d Add command at T%d. LI: %d, Command: %v\n", rf.me, term, index, command)
rf.persist()
rf.sendEntries(false)

return index, term, isLeader
}

在AppendEntries的被同步端,日志同步完成一般都伴随着log信息的修改,需要进行持久化:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

rf.leaderId = args.LeaderId

// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.log.getEntry(args.PrevLogIndex).Term {
Debug(dLog2, "S%d Prev log entries do not match. Ask leader to retry.", rf.me)
reply.XLen = len(rf.log)
reply.XTerm = rf.log.getEntry(args.PrevLogIndex).Term
reply.XIndex, _ = rf.log.getBoundsWithTerm(reply.XTerm)
return
}
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
for i, entry := range args.Entries {
if rf.log.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
Debug(dLog2, "S%d Running into conflict with existing entries at T%d. conflictLogs: %v, startIndex: %d.",
rf.me, rf.currentTerm, args.Entries[i:], i+1+args.PrevLogIndex)
rf.log = append(rf.log.getSlice(1, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}
Debug(dLog2, "S%d <- S%d Append entries success. Saved logs: %v.", rf.me, args.LeaderId, args.Entries)
if len(args.Entries) > 0 {
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
Debug(dCommit, "S%d Get higher LC at T%d, updating commitIndex. (%d < %d)",
rf.me, rf.currentTerm, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
Debug(dCommit, "S%d Updated commitIndex at T%d. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
}
reply.Success = true
}

节点发起选举时,需要将票投给自己,这时会修改votedFor,因此需要持久化:

func (rf *Raft) raiseElection() {
rf.state = CandidateState
// Increment currentTerm
rf.currentTerm++
Debug(dTerm, "S%d Starting a new term. Now at T%d.", rf.me, rf.currentTerm)
// Vote for self
rf.votedFor = rf.me
rf.persist()
// Reset election timer
rf.setElectionTimeout(randElectionTimeout())
Debug(dTimer, "S%d Resetting ELT because of election, wait for next potential election timeout.", rf.me)
lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
args := &RequestVoteArgs{
Term: rf.currentTerm,
CandidateId: rf.me,
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
voteCount := 1
var once sync.Once
// Send RequestVote RPCs to all other servers
for peer := range rf.peers {
if peer == rf.me {
continue
}
Debug(dVote, "S%d -> S%d Sending request vote at T%d.", rf.me, peer, rf.currentTerm)
go rf.candidateRequestVote(&voteCount, args, &once, peer)
}
}

投票时,如果节点将选票投给某个候选人,会修改votedFor,因此需要持久化来记住自己已经投给过谁,防止重复投票:

// example RequestVote RPC handler.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// Your code here (2A, 2B).
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dVote, "S%d <- S%d Received vote request at T%d.", rf.me, args.CandidateId, rf.currentTerm)
reply.VoteGranted = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dVote, "S%d Term is lower, rejecting the vote. (%d < %d)", rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

// If votedFor is null or candidateId,
// and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
if rf.votedFor == -1 || rf.votedFor == args.CandidateId {
if lastLogTerm < args.LastLogTerm ||
(lastLogTerm == args.LastLogTerm && lastLogIndex <= args.LastLogIndex) {
Debug(dVote, "S%d Granting vote to S%d at T%d.", rf.me, args.CandidateId, args.Term)
reply.VoteGranted = true
rf.votedFor = args.CandidateId
rf.persist()
Debug(dTimer, "S%d Resetting ELT, wait for next potential election timeout.", rf.me)
rf.setElectionTimeout(randElectionTimeout())
} else {
Debug(dVote, "S%d Candidate's log not up-to-date, rejecting the vote. LLI: %d, %d. LLT: %d, %d.",
rf.me, lastLogIndex, args.LastLogIndex, lastLogTerm, args.LastLogTerm)
}
} else {
Debug(dVote, "S%d Already voted for S%d, rejecting the vote.", rf.me, rf.votedFor)
}
}

持久化恢复

在节点重启时,由于持久化功能的加入,我们需要读取出相应的内容并应用到Raft结构体中,因此需要修改我们的Make函数:

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {

rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me

// Your initialization code here (2A, 2B, 2C).
rf.leaderId = -1
rf.currentTerm = 0
rf.votedFor = -1
rf.state = FollowerState
rf.setElectionTimeout(randHeartbeatTimeout())
rf.log = make([]LogEntry, 0)
rf.commitIndex = 0
rf.lastApplied = 0
rf.nextIndex = make([]int, len(rf.peers))
rf.matchIndex = make([]int, len(rf.peers))

// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())

for peer := range rf.peers {
rf.nextIndex[peer] = len(rf.log) + 1
}

lastLogIndex, lastLogTerm := rf.log.lastLogInfo()
Debug(dClient, "S%d Started at T%d. LLI: %d, LLT: %d.", rf.me, rf.currentTerm, lastLogIndex, lastLogTerm)

// start ticker goroutine to start elections
go rf.ticker()

// Apply logs periodically until the last committed index to make sure state machine is up to date.
go rf.applyLogsLoop(applyCh)

return rf
}

测试

简单测试一下我们的代码,使用命令go test -run 2C -race

鉴于我的MacBook Pro平时要拿去上班,这种大批量的测试没有个七八个小时跑不完,目前暂时没有很好的办法跑完整的批量测试,不过两千次还是没有问题的🥲

实现日志压缩(2D)

要求和提示

到目前为止,一台服务器节点在重启时,会重放完整的Raft日志来恢复自身的状态。然而,让一个长时间运行的服务永久保留完整的Raft日志是不现实的。因此,我们需要修改Raft相关代码,使其可以不时地与状态机沟通,持久化存储状态机的状态快照,此时Raft会丢弃快照之前的日志条目。这就带来了更小的持久化数据量和更快的重启速度。但是,一个Follower节点落后过多,以至于Leader节点已经丢弃了它所需要跟上的日志条目,这样的情况是有可能发生的;此时Leader节点必须发送快照,以及快照之后的全部日志。论文的第7节概述了相关方案,我们需要自己实现其中的细节。

关于状态机和Raft之间是如何沟通的可以详见这张图:diagram of Raft interactions

我们的Raft代码中需要包含如下方法,来确保我们可以使用序列化后的快照调用服务:

Snapshot(index int, snapshot []byte)

在Lab 2D中,测试程序会周期性地调用Snapshot()方法。在Lab 3中,我们还会实现一个可以调用Snapshot()的KV服务器;其中入参snapshot包含了完整的键值对信息。服务层会在每个节点上都调用Snapshot()(不仅仅是Leader节点)。

index入参表示了快照中所包含的最高日志条目的索引。Raft实现需要将该点之前保存的日志条目全部丢弃。我们需要重新检查自己的Raft代码,确保只保留日志尾部的条目。

我们还需要实现InstallSnapshot这个RPC方法,该方法已经在论文中讨论过了,其功能是让Leader节点告知一个落后的Raft节点去用快照信息替换其自身的状态。我们需要思考一下,该RPC方法应该如何与论文图2中的状态和规则进行交互。

当Follower节点的Raft代码接收到一个InstallSnapshot的RPC请求时,应当使用applyCh将快照信息通过ApplyMsg发送给状态机,ApplyMsg结构体的定义中已经包含了我们需要的字段。另外需要小心的是,这些快照只应当推进状态机的状态,而不能导致回滚。
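
作为参考,实验骨架中的ApplyMsg结构体大致包含如下字段(这里按照本文代码中实际用到的字段整理,具体以仓库中raft.go的定义为准)。可以看到日志提交与快照安装共用applyCh,但使用的是不同的字段:

type ApplyMsg struct {
	CommandValid bool        // true表示本消息携带一条新提交的日志
	Command      interface{} // 提交给状态机执行的命令
	CommandIndex int         // 该命令对应的日志索引

	// For 2D:
	SnapshotValid bool   // true表示本消息携带一个快照
	Snapshot      []byte // 快照的原始数据
	SnapshotTerm  int    // 快照所包含的最后一条日志的任期
	SnapshotIndex int    // 快照所包含的最后一条日志的索引
}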

一旦服务发生崩溃,我们需要从持久化的数据中重启。我们的Raft代码需要同时持久化Raft的状态和对应的快照数据。我们可以使用persister.Save()中的第二个参数来保存快照数据,如果没有快照数据则传nil即可。

当服务重启时,应用层会读取持久化的快照数据并恢复之前保存过的状态。

在2D部分中,我们需要实现Snapshot()方法和InstallSnapshot的RPC调用,并对Raft代码做一些相应的修改。完成之后我们应该可以通过Lab 2中的全部测试。

相关提示:

  • 一开始我们可以考虑修改Raft代码,使其能够只存储从某个索引X开始的日志部分,一开始我们将索引位置X设置为0,并尝试运行2B/2C测试。如果没有问题,再尝试让Snapshot(index)丢弃索引index之前的日志,并将X设置为等于索引index。如果一切顺利,我们现在应该可以通过第一个2D测试。
  • 我们没法再把Go切片的下标和Raft日志索引直接互换使用;在索引切片的时候,需要考虑到已经被丢弃的那部分日志。
  • 接下来:如果Leader节点没有Follower节点更新所需的日志条目,则让Leader节点发送InstallSnapshot RPC请求。
  • 请在单独一次InstallSnapshot RPC中发送整个快照数据。不要去实现在论文图13中的偏移机制来分割快照。
  • Raft代码中需要丢弃旧的日志条目,使得Go的垃圾收集器可以释放并重新使用内存空间;这就要求内存可达性检测的时候不能有相关的指针指向被丢弃的日志条目。
  • 即使我们的日志被裁剪,我们的实现中仍然需要正确地发送日志条目对应的任期和索引;这可能需要我们保存和引用最新快照的lastIncludedTermlastIncludedIndex(考虑一下这些信息是否需要被持久化)。
  • 在不带-race参数的情况下,Lab 2的全套测试(2A+2B+2C+2D)的合理耗时是6分钟的真实时间和1分钟的CPU时间。当用-race运行时,大约是10分钟的真实时间和2分钟的CPU时间。

如下为2D通过测试之后的典型输出结果:

$ go test -run 2D
Test (2D): snapshots basic ...
... Passed -- 11.6 3 176 61716 192
Test (2D): install snapshots (disconnect) ...
... Passed -- 64.2 3 878 320610 336
Test (2D): install snapshots (disconnect+unreliable) ...
... Passed -- 81.1 3 1059 375850 341
Test (2D): install snapshots (crash) ...
... Passed -- 53.5 3 601 256638 339
Test (2D): install snapshots (unreliable+crash) ...
... Passed -- 63.5 3 687 288294 336
Test (2D): crash and restart all servers ...
... Passed -- 19.5 3 268 81352 58
PASS
ok 6.5840/raft 293.456s

实现新的日志索引

由于我们现在涉及到快照的存储,在生成快照之后,我们会把日志中已经被快照覆盖的部分丢弃,那么原来在rf.log中下标为0的那个条目,它的日志索引就不再是1了。现在计算所有索引都得带上一个偏移量,这个偏移量就是最后一条被快照的日志的索引。
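
在动手改代码之前,先用一个极简的换算示意理清关系(下面的函数名是为了说明临时取的,实际实现里这些换算直接内联在getEntry等方法中):

// 假设 lastIncludedIndex = 5,即索引1~5的日志已被快照丢弃,
// 那么Raft日志索引6对应rf.log[0],索引8对应rf.log[2]。

// Raft日志索引 -> rf.log切片下标
func toSliceIndex(raftIndex, lastIncludedIndex int) int {
	return raftIndex - lastIncludedIndex - 1
}

// rf.log切片下标 -> Raft日志索引
func toRaftIndex(sliceIndex, lastIncludedIndex int) int {
	return sliceIndex + lastIncludedIndex + 1
}

// 当前最后一条日志的Raft索引
func lastIndex(lastIncludedIndex int, log []LogEntry) int {
	return lastIncludedIndex + len(log)
}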

论文图13中简单描述了快照实现的相关细节,但是在这里我们不打算完全照搬实现。

那首先,我们还是解决一些比较基础的问题,由于这次引入了日志的偏移量,导致我们的代码可能有被整体“翻新”的风险… 只能怪我之前设计的不好,没有考虑到这一天

首先,参照论文图13,我们在Raft对象的基本属性中先加入两个字段:

// Raft is a Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

leaderId int // the leader server that current server recognizes
state State
heartbeatTime time.Time
electionTime time.Time

// Persistent state on all servers

currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log []LogEntry // log entries; each entry contains command for state machine, and term when entry was received by leader

// Volatile state on all servers

commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine
applyCh chan ApplyMsg // the channel on which the tester or service expects Raft to send ApplyMsg messages

// Volatile state on leaders (Reinitialized after election)

nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server

// Snapshot state on all servers
lastIncludedIndex int // the snapshot replaces all entries up through and including this index, entire log up to the index discarded
lastIncludedTerm int // term of lastIncludedIndex

}

其中lastIncludedIndexlastIncludedTerm表示的是最后被快照的日志条目的下标索引和任期,lastIncludedIndex也可以作为我们rf.log的截断位置的参照。

为了适应新的日志偏移截断的特性,我们不得不重构了日志的索引和查询方法:

// LogEntry is a Go object implementing raft log entry
type LogEntry struct {
Command interface{} // Command to be executed
Term int // Term number when created
}

// Get the entry with the given index.
// Index and term of *valid* entries start from 1.
// If no *valid* entry is found, return an empty entry with term equal to -1.
// If log is too short, also return an empty entry with term equal to -1.
// Panic if the index of the log entry is already in the snapshot and unable to get from memory.
func (rf *Raft) getEntry(index int) *LogEntry {
logEntries := rf.log
logIndex := index - rf.lastIncludedIndex
if logIndex < 0 {
log.Panicf("LogEntries.getEntry: index too small. (%d < %d)", index, rf.lastIncludedIndex)
}
if logIndex == 0 {
return &LogEntry{
Command: nil,
Term: rf.lastIncludedTerm,
}
}
if logIndex > len(logEntries) {
return &LogEntry{
Command: nil,
Term: -1,
}
}
return &logEntries[logIndex-1]
}

// Get the index and term of the last entry.
// Return (0, 0) if the log is empty.
func (rf *Raft) lastLogInfo() (index, term int) {
logEntries := rf.log
index = len(logEntries) + rf.lastIncludedIndex
logEntry := rf.getEntry(index)
return index, logEntry.Term
}

// Get the slice of the log with index from startIndex to endIndex.
// startIndex included and endIndex excluded, therefore startIndex should be no greater than endIndex.
func (rf *Raft) getSlice(startIndex, endIndex int) []LogEntry {
logEntries := rf.log
logStartIndex := startIndex - rf.lastIncludedIndex
logEndIndex := endIndex - rf.lastIncludedIndex
if logStartIndex <= 0 {
Debug(dError, "LogEntries.getSlice: startIndex out of range. startIndex: %d, len: %d.",
startIndex, len(logEntries))
log.Panicf("LogEntries.getSlice: startIndex out of range. (%d < %d)", startIndex, rf.lastIncludedIndex)
}
if logEndIndex > len(logEntries)+1 {
Debug(dError, "LogEntries.getSlice: endIndex out of range. endIndex: %d, len: %d.",
endIndex, len(logEntries))
log.Panicf("LogEntries.getSlice: endIndex out of range. (%d > %d)", endIndex, len(logEntries)+1+rf.lastIncludedIndex)
}
if logStartIndex > logEndIndex {
Debug(dError, "LogEntries.getSlice: startIndex > endIndex. (%d > %d)", startIndex, endIndex)
log.Panicf("LogEntries.getSlice: startIndex > endIndex. (%d > %d)", startIndex, endIndex)
}
return append([]LogEntry(nil), logEntries[logStartIndex-1:logEndIndex-1]...)
}

// Get the index of first entry and last entry with the given term.
// Return (-1,-1) if no such term is found
func (rf *Raft) getBoundsWithTerm(term int) (minIndex int, maxIndex int) {
logEntries := rf.log
if term == 0 {
return 0, 0
}
minIndex, maxIndex = math.MaxInt, -1
for i := rf.lastIncludedIndex + 1; i <= rf.lastIncludedIndex+len(logEntries); i++ {
if rf.getEntry(i).Term == term {
minIndex = min(minIndex, i)
maxIndex = max(maxIndex, i)
}
}
if maxIndex == -1 {
return -1, -1
}
return
}

值得注意的是,如果我们要丢弃旧的日志条目,让Go的垃圾收集器可以释放并重新使用内存空间,那就需要把要保留的那段日志从原来的rf.log中拷贝到新的切片里,随后消除对原切片的引用。对切片处理不当是非常有可能导致内存泄漏的。
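
下面给一个对比示意(并非仓库中的代码,仅用于说明):直接对切片做re-slice,新切片仍然引用原来的底层数组,被丢弃的条目无法被回收;而上面getSlice中append([]LogEntry(nil), ...)的写法会分配新的底层数组,效果等价于下面的拷贝方式:

// 有泄漏风险:返回的切片与原切片共享底层数组,
// 被“丢弃”的前缀条目(包括其中的Command)依然不可回收
func trimPrefixLeaky(logEntries []LogEntry, discard int) []LogEntry {
	return logEntries[discard:]
}

// 更稳妥:拷贝到新分配的切片,切断对原底层数组的引用,
// 让被丢弃的条目可以被Go的垃圾收集器回收
func trimPrefixCopy(logEntries []LogEntry, discard int) []LogEntry {
	newLog := make([]LogEntry, len(logEntries)-discard)
	copy(newLog, logEntries[discard:])
	return newLog
}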

接下来就是痛苦的修改下标的过程了,由于涉及的地方实在太多,我这里就不展开描述了,可以看看我提交的git记录,记得不要漏,不然应该会挂2D的第一个测试。
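
这里举一个有代表性的改动作为示意(函数是为了说明临时抽出来的,具体改动以git提交记录为准):凡是原来把len(rf.log)当作“最后一条日志的索引”来用的地方,现在都要加上偏移量,例如Start()中计算新日志的索引:

// 2C及之前:index = len(rf.log)
// 2D之后:切片长度只反映快照之后保留的条目数,真实索引必须加上偏移量
func (rf *Raft) appendNewEntry(command interface{}) (index int) {
	rf.log = append(rf.log, LogEntry{Command: command, Term: rf.currentTerm})
	return rf.lastIncludedIndex + len(rf.log)
}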

另外新增的两个字段属于需要被持久化的状态信息,当Raft代码崩溃重启,我们需要日志的截断位置,配合状态机中恢复的快照来进行一个快速的恢复,因此需要修改一下持久化的相关代码:

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
Debug(dPersist, "S%d Saving persistent state to stable storage at T%d.", rf.me, rf.currentTerm)
// Your code here (2C).
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if err := e.Encode(rf.currentTerm); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.currentTerm\". err: %v, data: %v", err, rf.currentTerm)
}
if err := e.Encode(rf.votedFor); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.votedFor\". err: %v, data: %v", err, rf.votedFor)
}
if err := e.Encode(rf.log); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.log\". err: %v, data: %v", err, rf.log)
}
if err := e.Encode(rf.lastIncludedIndex); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.lastIncludedIndex\". err: %v, data: %v", err, rf.lastIncludedIndex)
}
if err := e.Encode(rf.lastIncludedTerm); err != nil {
Debug(dError, "Raft.persist: failed to encode \"rf.lastIncludedTerm\". err: %v, data: %v", err, rf.lastIncludedTerm)
}
data := w.Bytes()
rf.persister.SaveRaftState(data)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
Debug(dPersist, "S%d Restoring previously persisted state at T%d.", rf.me, rf.currentTerm)
// Your code here (2C).
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
rf.mu.Lock()
defer rf.mu.Unlock()
if err := d.Decode(&rf.currentTerm); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.currentTerm\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.votedFor); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.votedFor\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.log); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.log\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.lastIncludedIndex); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.lastIncludedIndex\". err: %v, data: %s", err, data)
}
if err := d.Decode(&rf.lastIncludedTerm); err != nil {
Debug(dError, "Raft.readPersist: failed to decode \"rf.lastIncludedTerm\". err: %v, data: %s", err, data)
}
}

实现Snapshot快照函数

Snapshot是由状态机周期性调用的一个方法,用来告诉Raft代码:状态机目前已经应用了多少日志,打算压缩一下、做一轮快照了。因此方法的入参里带上了预期截断的下标,以及压缩后的快照数据。

这个被压缩的快照信息是有必要存储在Raft代码的内存中的,后续Leader发送InstallSnapshot等场景会使用到,这里我们先单独开个字段snapshot用来存储状态机传来的快照数据:

// Raft is a Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

leaderId int // the leader server that current server recognizes
state State
heartbeatTime time.Time
electionTime time.Time

// Persistent state on all servers

currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log []LogEntry // log entries; each entry contains command for state machine, and term when entry was received by leader

// Volatile state on all servers

commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine
applyCh chan ApplyMsg // the channel on which the tester or service expects Raft to send ApplyMsg messages

// Volatile state on leaders (Reinitialized after election)

nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server

// Snapshot state on all servers
lastIncludedIndex int // the snapshot replaces all entries up through and including this index, entire log up to the index discarded
lastIncludedTerm int // term of lastIncludedIndex
snapshot []byte // snapshot stored in memory

}

接下来我们实现一下Snapshot这个方法。由于担心调用可能存在时延等情况,保险起见,我们还是需要先对入参做一定的校验。

例如如果我们当前的截断下标都已经比请求中给出的高了,那说明相关快照信息已经存储过了;而如果请求中给出的下标都比我们提交的水位还高(不应该发生),那就拒绝这次请求。

相关代码如下:

// Snapshot is the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
// Your code here (2D).
Debug(dSnap, "S%d Snapshotting through index %d.", rf.me, index)
rf.mu.Lock()
defer rf.mu.Unlock()
lastLogIndex, _ := rf.lastLogInfo()
if rf.lastIncludedIndex >= index {
Debug(dSnap, "S%d Snapshot already applied to persistent storage. (%d >= %d)", rf.me, rf.lastIncludedIndex, index)
return
}
if rf.commitIndex < index {
Debug(dWarn, "S%d Cannot snapshot uncommitted log entries, discard the call. (%d < %d)", rf.me, rf.commitIndex, index)
return
}
newLog := rf.getSlice(index+1, lastLogIndex+1)
newLastIncludeTerm := rf.getEntry(index).Term

rf.lastIncludedTerm = newLastIncludeTerm
rf.log = newLog
rf.lastIncludedIndex = index
rf.snapshot = snapshot
rf.persistAndSnapshot(snapshot)
}

可以看到在最后我们的Raft代码还负责了快照信息的持久化,这份被持久化的快照信息在状态机重启时会用于恢复状态机的快照数据。持久化代码类似如下:

// save Raft's persistent state and service snapshot to stable storage,
// where they can later be retrieved after a crash and restart.
func (rf *Raft) persistAndSnapshot(snapshot []byte) {
Debug(dSnap, "S%d Saving persistent state and service snapshot to stable storage at T%d.", rf.me, rf.currentTerm)
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
if err := e.Encode(rf.currentTerm); err != nil {
Debug(dError, "Raft.persistAndSnapshot: failed to encode \"rf.currentTerm\". err: %v, data: %v", err, rf.currentTerm)
}
if err := e.Encode(rf.votedFor); err != nil {
Debug(dError, "Raft.persistAndSnapshot: failed to encode \"rf.votedFor\". err: %v, data: %v", err, rf.votedFor)
}
if err := e.Encode(rf.log); err != nil {
Debug(dError, "Raft.persistAndSnapshot: failed to encode \"rf.log\". err: %v, data: %v", err, rf.log)
}
if err := e.Encode(rf.lastIncludedIndex); err != nil {
Debug(dError, "Raft.persistAndSnapshot: failed to encode \"rf.lastIncludedIndex\". err: %v, data: %v", err, rf.lastIncludedIndex)
}
if err := e.Encode(rf.lastIncludedTerm); err != nil {
Debug(dError, "Raft.persistAndSnapshot: failed to encode \"rf.lastIncludedTerm\". err: %v, data: %v", err, rf.lastIncludedTerm)
}
data := w.Bytes()
rf.persister.SaveStateAndSnapshot(data, snapshot)
}

InstallSnapshot RPC结构体实现

InstallSnapshot这个RPC调用主要用在这样的场景:当Leader尝试给一个非常落后的Follower节点发送AppendEntries日志时,可能会发现自己已经捞不出对应的那段日志了(因为这段日志已经被截断做成快照,当前的截断位置已经比这个落后节点的matchIndex来得高了),这时Leader节点就转为向这个节点发送一个InstallSnapshot RPC,要求这个落后的Follower节点通过直接安装快照的方式来更新自己的状态。

根据刚才论文图13中给出的图片,我们不难得出以下的结构:

// InstallSnapshotArgs is the InstallSnapshot RPC arguments structure.
// field names must start with capital letters!
type InstallSnapshotArgs struct {
Term int // leader's term
LeaderId int // so follower can redirect clients
LastIncludedIndex int // the snapshot replaces all entries up through and including this index
LastIncludedTerm int // term of lastIncludedIndex
Data []byte // raw bytes of the snapshot chunk, starting at offset
}

// InstallSnapshotReply is the InstallSnapshot RPC reply structure.
// field names must start with capital letters!
type InstallSnapshotReply struct {
Term int // currentTerm, for leader to update itself
}

其中请求信息中,Term为当前Leader节点的任期,LeaderId为当前Leader节点的编号,LastIncludedIndexLastIncludedTerm则表示了Leader当前存储的快照的截断位置和任期信息,最后Data中存储了重要的快照信息。在返回信息中,Term表示了Follower节点的任期信息。

某种程度上来讲,InstallSnapshot RPC和AppendEntries RPC是共通的,它们都是由Leader节点发起、用于更新Follower节点状态的。

InstallSnapshot RPC核心函数实现

当Follower节点接收到InstallSnapshot RPC请求时,正如提示中所说,我们不能直接将其应用到内存中的snapshot字段上,也不能立刻应用快照:需要保证Raft和状态机两边都成功应用快照,不然可能会导致状态机和Raft之间出现不一致。

在这里,根据给出的图片提示,我们将快照信息先通过applyCh传给状态机,状态机接受到了快照之后,再通过调用Raft代码上的CondInstallSnapshot来协商,Raft代码在返回结果中会告知状态机是否需要安装快照,同时视情况更新自己的状态信息。

由于applyCh之前已经用于日志的提交,这段代码我们已经实现在applyLogsLoop中了,先前的代码如下:

func (rf *Raft) applyLogsLoop() {
for !rf.killed() {
// Apply logs periodically until the last committed index.
rf.mu.Lock()
// To avoid the apply operation getting blocked with the lock held,
// use a slice to store all committed messages to apply, and apply them only after unlocked
var appliedMsgs []ApplyMsg
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
appliedMsgs = append(appliedMsgs, ApplyMsg{
CommandValid: true,
Command: rf.getEntry(rf.lastApplied).Command,
CommandIndex: rf.lastApplied,
})
Debug(dLog2, "S%d Applying log at T%d. LA: %d, CI: %d.", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
}
rf.mu.Unlock()
for _, msg := range appliedMsgs {
rf.applyCh <- msg
}
time.Sleep(time.Duration(TickInterval) * time.Millisecond)
}
}

可以看到出于性能的考虑,我们在应用日志的时候采用的是批量更新的方式,一次性将lastApplied更新到commitIndex的水位,并且在锁外对applyCh进行写入。这里就能看到危险的地方:如果我们在别处(例如在InstallSnapshot方法中)将一个新的快照传进来,极有可能会插入当前的应用循环中,而状态机在安装完快照之后,面对接下来还没有应用结束的日志就会产生应用错序。

P.S. 例如循环中正在执行Index 1, 2, 3, 4, 5的应用,但是在2执行之后突然插入了一个快照。安装快照之后,假设状态机目前已经拥有了Index 1, 2, 3, 4, 5, 6, 7, 8压缩之后的快照,理论上接下来状态机应该收到的是Index 9的日志,此时却收到了循环还没执行结束的3,这就导致了应用错序。

解决这个问题的方案有多种,我们可以将应用的循环纳入到锁中,然后在InstallSnapshot方法持有锁的时候发送快照,但是这样实在是太低效了。

我最终还是将全部的应用操作交给applyLogsLoop,让它负责日志以及快照的提交,这样就不会产生乱序的困扰。为此我们在Raft中额外开辟了几个字段,用于暂存要交给applyCh的快照数据,见下方代码中的waitingIndex, waitingTerm, waitingSnapshot。applyLogsLoop每次循环都会检查暂存区有没有待提交的快照,如果有就单独进行一次快照提交操作,并清空暂存区。

// Raft is a Go object implementing a single Raft peer.
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()

// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.

leaderId int // the leader server that current server recognizes
state State
heartbeatTime time.Time
electionTime time.Time

// Persistent state on all servers

currentTerm int // latest term server has seen
votedFor int // candidateId that received vote in current term
log []LogEntry // log entries; each entry contains command for state machine, and term when entry was received by leader

// Volatile state on all servers

commitIndex int // index of highest log entry known to be committed
lastApplied int // index of highest log entry applied to state machine
applyCh chan ApplyMsg // the channel on which the tester or service expects Raft to send ApplyMsg messages

// Volatile state on leaders (Reinitialized after election)

nextIndex []int // for each server, index of the next log entry to send to that server
matchIndex []int // for each server, index of highest log entry known to be replicated on server

// Snapshot state on all servers
lastIncludedIndex int // the snapshot replaces all entries up through and including this index, entire log up to the index discarded
lastIncludedTerm int // term of lastIncludedIndex
snapshot []byte // snapshot stored in memory

// Temporary location to give the service snapshot to the apply thread
// All apply messages should be sent in one go routine, we need the temporary space for applyLogsLoop to handle the snapshot apply
waitingIndex int // lastIncludedIndex to be sent to applyCh
waitingTerm int // lastIncludedTerm to be sent to applyCh
waitingSnapshot []byte // snapshot to be sent to applyCh
}

func (rf *Raft) applyLogsLoop() {
for !rf.killed() {
// Apply logs periodically until the last committed index.
rf.mu.Lock()
// To avoid the apply operation getting blocked with the lock held,
// use a slice to store all committed messages to apply, and apply them only after unlocked
var appliedMsgs []ApplyMsg

rf.lastApplied = max(rf.lastApplied, rf.lastIncludedIndex)

if rf.waitingSnapshot != nil {
appliedMsgs = append(appliedMsgs, ApplyMsg{
SnapshotValid: true,
Snapshot: rf.waitingSnapshot,
SnapshotTerm: rf.waitingTerm,
SnapshotIndex: rf.waitingIndex,
})
rf.waitingSnapshot = nil
} else {
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
appliedMsgs = append(appliedMsgs, ApplyMsg{
CommandValid: true,
Command: rf.getEntry(rf.lastApplied).Command,
CommandIndex: rf.lastApplied,
})
Debug(dLog2, "S%d Applying log at T%d. LA: %d, CI: %d.", rf.me, rf.currentTerm, rf.lastApplied, rf.commitIndex)
}
}
rf.mu.Unlock()
for _, msg := range appliedMsgs {
rf.applyCh <- msg
}
time.Sleep(time.Duration(TickInterval) * time.Millisecond)
}
}

这样当Leader的InstallSnapshot RPC进来时,除去正常的校验逻辑,我们只需要将快照信息写入暂存区即可。当然,对于快照请求乱序到达的情况我们也要处理一下,代码如下:

// InstallSnapshot sends an apply message with the snapshot to applyCh.
// The state machine should cooperate with raft code later to decide whether to install the snapshot using CondInstallSnapshot function.
// No snapshot related status in raft code should be changed right now,
// which could result in inconsistency between the status machine and raft code, as the snapshot is not applied immediately.
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dSnap, "S%d <- S%d Received install snapshot request at T%d.", rf.me, args.LeaderId, rf.currentTerm)

if args.Term < rf.currentTerm {
Debug(dSnap, "S%d Term is lower, rejecting install snapshot request. (%d < %d)", rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}
rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

// All apply messages should be sent in one go routine (applyLogsLoop),
// otherwise the apply action could be out of order, where the snapshot apply could cut in line when the command apply is running.
if rf.waitingIndex >= args.LastIncludedIndex {
Debug(dSnap, "S%d A newer snapshot already exists, rejecting install snapshot request. (%d <= %d)",
rf.me, args.LastIncludedIndex, rf.waitingIndex)
return
}
rf.leaderId = args.LeaderId
rf.waitingSnapshot = args.Data
rf.waitingIndex = args.LastIncludedIndex
rf.waitingTerm = args.LastIncludedTerm
}

发送快照流程实现

和之前的两个RPC类似,我这里就描述得简单一点。Leader节点在发送请求并收到回复之后,就默认快照已经被应用了,原因比较简单:如果Follower节点拒绝快照,则证明其状态比传入的快照更新;如果接受,则说明其已经跟上了快照的位置。所以我们直接更新matchIndex和nextIndex即可。

值得说明的是,快照只可能将Follower节点的进度向上更新到与Leader的截断位置同水位,而该位置不大于Leader节点的commitIndex,所以并没有必要去更新Leader的commitIndex。

代码如下:

func (rf *Raft) sendSnapshot(server int) {
Debug(dSnap, "S%d -> S%d Sending installing snapshot request at T%d.", rf.me, server, rf.currentTerm)
args := &InstallSnapshotArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
LastIncludedIndex: rf.lastIncludedIndex,
LastIncludedTerm: rf.lastIncludedTerm,
Data: rf.snapshot,
}
go rf.leaderSendSnapshot(args, server)
}

func (rf *Raft) leaderSendSnapshot(args *InstallSnapshotArgs, server int) {
reply := &InstallSnapshotReply{}
ok := rf.sendInstallSnapshot(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dSnap, "S%d <- S%d Received install snapshot reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dLog, "S%d Term lower, invalid install snapshot reply. (%d < %d)",
rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the install snapshot request, install snapshot reply discarded. "+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
rf.checkTerm(reply.Term)
newNext := args.LastIncludedIndex + 1
newMatch := args.LastIncludedIndex
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
}
}

CondInstallSnapshot协商是否需要安装

正如我们刚才提到的,这个CondInstallSnapshot方法是状态机一侧调用的,用于和Raft协商确定快照是否需要进行安装的。

如果当前节点的commitIndex还小于快照中给出的截断位置,那我们就可以开始安装了;否则就返回false,告诉状态机不要应用这个快照,因为它还不够新。

安装的时候我们也采取类似的策略,日志在快照给出的位置进行截断即可,如果我们的日志非常短还没有到截断位置,那就清空日志即可。

代码如下:

// CondInstallSnapshot returns if the service wants to switch to snapshot.
// Only do so if Raft hasn't had more recent info since it communicate the snapshot on applyCh.
func (rf *Raft) CondInstallSnapshot(lastIncludedTerm int, lastIncludedIndex int, snapshot []byte) bool {
// Your code here (2D).
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dSnap, "S%d Installing the snapshot. LLI: %d, LLT: %d", rf.me, lastIncludedIndex, lastIncludedTerm)
lastLogIndex, _ := rf.lastLogInfo()
if rf.commitIndex >= lastIncludedIndex {
Debug(dSnap, "S%d Log entries is already up-to-date with the snapshot. (%d >= %d)", rf.me, rf.commitIndex, lastIncludedIndex)
return false
}
if lastLogIndex >= lastIncludedIndex {
rf.log = rf.getSlice(lastIncludedIndex+1, lastLogIndex+1)
} else {
rf.log = []LogEntry{}
}
rf.lastIncludedIndex = lastIncludedIndex
rf.lastIncludedTerm = lastIncludedTerm
rf.lastApplied = lastIncludedIndex
rf.commitIndex = lastIncludedIndex
rf.snapshot = snapshot
rf.persistAndSnapshot(snapshot)
return true
}

AppendEntries 日志发送流程微调

回到正题,我们之前描述过,Leader在发送日志的过程中,由于Follower节点太落后,可能捞不出对应的日志,也就是当Leader节点发现自己的lastIncludedIndex不低于Follower的nextIndex时,这种时候我们需要发送InstallSnapshot RPC。同理,当AppendEntries请求发生回退的时候,极有可能我们会回退到某个Leader捞不到的古早日志位置,这个时候我们也需要发送快照。

发送的时候我们对代码做简单微调,代码如下:

func (rf *Raft) leaderSendEntries(args *AppendEntriesArgs, server int) {
reply := &AppendEntriesReply{}
ok := rf.sendAppendEntries(server, args, reply)
if ok {
rf.mu.Lock()
defer rf.mu.Unlock()
Debug(dLog, "S%d <- S%d Received send entry reply at T%d.", rf.me, server, rf.currentTerm)
if reply.Term < rf.currentTerm {
Debug(dLog, "S%d Term lower, invalid send entry reply. (%d < %d)",
rf.me, reply.Term, rf.currentTerm)
return
}
if rf.currentTerm != args.Term {
Debug(dWarn, "S%d Term has changed after the append request, send entry reply discarded. "+
"requestTerm: %d, currentTerm: %d.", rf.me, args.Term, rf.currentTerm)
return
}
if rf.checkTerm(reply.Term) {
return
}
// If successful: update nextIndex and matchIndex for follower (§5.3)
if reply.Success {
Debug(dLog, "S%d <- S%d Log entries in sync at T%d.", rf.me, server, rf.currentTerm)
newNext := args.PrevLogIndex + 1 + len(args.Entries)
newMatch := args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = max(newNext, rf.nextIndex[server])
rf.matchIndex[server] = max(newMatch, rf.matchIndex[server])
// If there exists an N such that N > commitIndex, a majority of matchIndex[i] ≥ N,
// and log[N].term == currentTerm: set commitIndex = N (§5.3, §5.4).
for N := rf.lastIncludedIndex + len(rf.log); N > rf.commitIndex && rf.getEntry(N).Term == rf.currentTerm; N-- {
count := 1
for peer, matchIndex := range rf.matchIndex {
if peer == rf.me {
continue
}
if matchIndex >= N {
count++
}
}
if count > len(rf.peers)/2 {
rf.commitIndex = N
Debug(dCommit, "S%d Updated commitIndex at T%d for majority consensus. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
break
}
}
} else {
// If AppendEntries fails because of log inconsistency: decrement nextIndex and retry (§5.3)
// the optimization that backs up nextIndex by more than one entry at a time
if reply.XTerm == -1 {
// follower's log is too short
rf.nextIndex[server] = reply.XLen + 1
} else {
_, maxIndex := rf.getBoundsWithTerm(reply.XTerm)
if maxIndex != -1 {
// leader has XTerm
rf.nextIndex[server] = maxIndex
} else {
// leader doesn't have XTerm
rf.nextIndex[server] = reply.XIndex
}
}
lastLogIndex, _ := rf.lastLogInfo()
nextIndex := rf.nextIndex[server]
if nextIndex <= rf.lastIncludedIndex {
// current leader does not have enough log to sync the outdated peer,
// because logs were cleared after the snapshot, then send an InstallSnapshot RPC instead
rf.sendSnapshot(server)
} else if lastLogIndex >= nextIndex {
Debug(dLog, "S%d <- S%d Inconsistent logs, retrying.", rf.me, server)
newArg := &AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: nextIndex - 1,
PrevLogTerm: rf.getEntry(nextIndex - 1).Term,
Entries: rf.getSlice(nextIndex, lastLogIndex+1),
}
go rf.leaderSendEntries(newArg, server)
}
}
}
}

AppendEntries RPC核心函数微调

当然了,Leader有可能捞不到古早日志,那么Follower会不会也有这样的情况呢?

显然也是有的,例如一个Follower刚进行了一轮快照,截断了若干日志。接着接收到了Leader发来的日志,在AppendEntries中我们需要做的第一步就是比对PrevLogIndex位置上的日志是否对应,如果不对应则回退,但是有可能这个位置已经被截断丢弃了。

由于日志没法从快照中重新恢复出来,我们只能默认这些已被快照的日志是一致的:它们既然已经被提交并纳入快照,自然与Leader一致。因此我们将Leader发来的日志条目截掉已被快照覆盖的前缀,从Follower能接受的位置重新开始比对。

代码如下:

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
if len(args.Entries) == 0 {
Debug(dLog2, "S%d <- S%d Received heartbeat at T%d.", rf.me, args.LeaderId, rf.currentTerm)
} else {
Debug(dLog2, "S%d <- S%d Received append entries at T%d.", rf.me, args.LeaderId, rf.currentTerm)
}
reply.Success = false

// Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
Debug(dLog2, "S%d Term is lower, rejecting append request. (%d < %d)",
rf.me, args.Term, rf.currentTerm)
reply.Term = rf.currentTerm
return
}

// For Candidates. If AppendEntries RPC received from new leader: convert to follower
if rf.state == CandidateState && rf.currentTerm == args.Term {
rf.state = FollowerState
Debug(dLog2, "S%d Convert from candidate to follower at T%d.", rf.me, rf.currentTerm)
}

rf.checkTerm(args.Term)
reply.Term = rf.currentTerm

Debug(dTimer, "S%d Resetting ELT, wait for next potential heartbeat timeout.", rf.me)
rf.setElectionTimeout(randHeartbeatTimeout())

rf.leaderId = args.LeaderId

if args.PrevLogIndex < rf.lastIncludedIndex {
alreadySnapshotLogLen := rf.lastIncludedIndex - args.PrevLogIndex
if alreadySnapshotLogLen <= len(args.Entries) {
newArgs := &AppendEntriesArgs{
Term: args.Term,
LeaderId: args.LeaderId,
PrevLogTerm: rf.lastIncludedTerm,
PrevLogIndex: rf.lastIncludedIndex,
Entries: args.Entries[alreadySnapshotLogLen:],
LeaderCommit: args.LeaderCommit,
}
args = newArgs
Debug(dWarn, "S%d Log entry at PLI already discarded by snapshot, readjusting. PLI: %d, PLT:%d, Entries: %v.",
rf.me, args.PrevLogIndex, args.Entries)
} else {
Debug(dWarn, "S%d Log entry at PLI already discarded by snapshot, assume as a match. PLI: %d.", rf.me, args.PrevLogIndex)
reply.Success = true
return
}
}
// Reply false if log doesn’t contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if args.PrevLogTerm == -1 || args.PrevLogTerm != rf.getEntry(args.PrevLogIndex).Term {
Debug(dDrop, "S%d Prev log entries do not match. Ask leader to retry.", rf.me)
reply.XLen = len(rf.log) + rf.lastIncludedIndex
reply.XTerm = rf.getEntry(args.PrevLogIndex).Term
reply.XIndex, _ = rf.getBoundsWithTerm(reply.XTerm)
return
}
// If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
for i, entry := range args.Entries {
if rf.getEntry(i+1+args.PrevLogIndex).Term != entry.Term {
rf.log = append(rf.getSlice(1+rf.lastIncludedIndex, i+1+args.PrevLogIndex), args.Entries[i:]...)
break
}
}
Debug(dLog2, "S%d <- S%d Append entries success. Saved logs: %v.", rf.me, args.LeaderId, args.Entries)
if len(args.Entries) > 0 {
rf.persist()
}
if args.LeaderCommit > rf.commitIndex {
Debug(dCommit, "S%d Get higher LC at T%d, updating commitIndex. (%d < %d)",
rf.me, rf.currentTerm, rf.commitIndex, args.LeaderCommit)
rf.commitIndex = min(args.LeaderCommit, args.PrevLogIndex+len(args.Entries))
Debug(dCommit, "S%d Updated commitIndex at T%d. CI: %d.", rf.me, rf.currentTerm, rf.commitIndex)
}
reply.Success = true
}

测试

简单测试一下我们的代码,使用命令go test -run 2D -race

最近还是要上班,批量测试那真的只能随缘跑了… 未来有时间会跑完的

总结

这一坨代码也不知道拖了多久才写完,反正我开始写的时候我还刚被虾皮裁,我写完的时候已经在PDD干了小半年了…

日常猝死做自我提升,现在是2023.04.22 02:37AM,睡了睡了,狗命要紧…