2021 年 11 月 ~ 2022 年 1 月 ,PingCAP 举办了第一届 Talent Plan KV 学习营,相关介绍可参考 推送。
在本次比赛中,由于我们小组的两位成员之前都刷过 MIT 6.824,已经对教学级别的 raft 有一定的了解,所以参加此次比赛的目的就是去感受一下生产级别分布式 KV 的代码实现,学习实践一下 lsm, etcd, raftstore 和 percolator 的理论知识和 codebase。
u1s1,刷 lab 的过程十分曲折,我们俩所在的实验室到年底的时候都非常忙,前几周基本每周都只能抽出顶多一两天的时间来写代码,而理解 lab2b/lab3b raftstore 的难度是非常大的,我们用了一周多的时间才勉强看懂 raftstore 的代码。这使得到还剩两周时间的时候,我们才刷到 lab2c。最后两周我们利用中午午休时间和晚上睡觉时间疯狂加班,在 lab 上花了更多的时间,最后才堪堪刷完。
在刷 lab 的过程中,由于时间有限,我们始终秉持着学习优先,成绩第二的原则。即以 了解 codebase,学习知识,做最容易做且最有用的优化 为主,并没有去卷很多功能点。在处理 bug 的态度上,对于 safety 的问题比如错误读写的 bug 等,我们对这类问题进行了重点关注和解决;对于 liveness 的问题比如 request timeout 等,我们则是在有限的时间内尽力做了优化,但并没有投入太多精力,因为这种工作没有上限,tikv 的 raftstore 也一定在持续做这些工作,时间不够的情况下去卷这些就没有太大意义了。

出人意料的是,我们得了第二名的好成绩,具体可参考 官宣。事后反省一下,在 safety 上我们遇到的问题都解决了;在 liveness 上我们没投入太多精力;在文档上,我们简单介绍了代码实现,但将重点放在了我们对相关知识的理解和思考上;在性能上,我们重点做了最容易做的 batching 优化,其本质上是使用 raft 的优化而不是 raft 自身的优化,但对性能的提升却异常关键,比如 tidb 对于一个事务打包的一堆写请求,到 tikv 的 region 之后,这些写请求同步成一条还是多条 raftlog 对于性能的影响是巨大的。
从结果来看,我们的策略是正确的,我们在很有限的时间内拿到了很高的收益。
最后,出于对课程的保护,也出于跟大家分享一些刷 lab 的经验,让大家少踩坑,在此处我仅将文档公开,希望能为大家提供一些思路,欢迎一起交流。
本部分是对底层 badger api 的包装,主要涉及修改的代码文件是 standalone_storage.go, 需要实现 Storage 接口的 Write 和 Reader 方法,来实现对底层 badger 数据库的读写。
Write 部分涉及到 Put 和 Delete 两种操作。
因为 write_batch.go 中已经实现了对 badger 中 entry 的 put 和 delete 操作,我们只需要判断 batch 中的每一个 Modify 的操作类型,然后直接调用 write_batch.go 中相对应的方法即可。
Reader 部分会涉及到 point read 和 scan read 两种不同读方式。
因为提示到应该使用 badger.Txn 来实现 Reader 函数,所以我们声明了一个 badgerReader 结构体来实现 StorageReader 接口,badgerReader 结构体内部包含对 badger.Txn 的引用。
针对 point read,
我们直接调用 util.go 中的 GetCF 等函数,对 cf 中指定 key 进行读取。
针对 scan read,
直接调用 cf_iterator.go 中的 NewCFIterator 函数,返回一个迭代器,供 part2 中调用。
本部分需要实现 RawGet/ RawScan/ RawPut/ RawDelete 四个 handlers,主要涉及修改的代码文件是 raw_api.go
针对 RawGet,
我们调用 storage 的 Reader 函数返回一个 Reader,然后调用其 GetCF 函数进行点读取即可,读取之后需要判断对应 key 是否存在。
针对 RawScan,
同样地调用 storage 的 Reader 函数返回一个 Reader,然后调用其 IterCF 函数返回一个迭代器,然后使用迭代器读取即可。
针对 RawPut 和 RawDelete,
声明对应的 Modify 后,调用 storage.Write 函数即可。
LSM 是一个伴随 NoSQL 运动一起流行的存储引擎,相比 B+ 树以牺牲读性能的代价在写入性能上获得了较大的提升。
近年来,工业界和学术界均对 LSM 树进行了一定的研究,具体可以阅读 VLDB2018 有关 LSM 的综述:LSM-based Storage Techniques: A Survey, 也可直接阅读针对该论文我认为还不错的一篇 中文概要总结。
介绍完了 LSM 综述,可以简单聊聊 badger,这是一个纯 go 实现的 LSM 存储引擎,参照了 FAST2016 有关 KV 分离 LSM 的设计: WiscKey 。有关其项目的动机和一些 benchmark 结果可以参照其创始人的 博客。
对于 Wisckey 这篇论文,除了阅读论文以外,也可以参考此 阅读笔记 和此 总结博客。这两篇资料较为系统地介绍了现在学术界和工业界对于 KV 分离 LSM 的一些设计和实现。
实际上对于目前的 NewSQL 数据库,其底层大多数都是一个分布式 KV 存储系统。对于 OLTP 业务,其往往采用行存的方式,即 key 对应的 value 便是一个 tuple。在这样的架构下,value 往往很大,因而采用 KV 分离的设计往往能够减少大量的写放大,从而提升性能。
之前和腾讯云的一个大佬聊过,他有说 TiKV 的社区版和商业版存储引擎性能差异很大。目前想一下,KV 分离可能便是 RocksDB 和 Titan 的最大区别吧。
本部分是对 raft 模块 leader 选举功能的实现,主要涉及修改的代码文件是 raft.go、log.go
raft 模块 leader 选举流程如下:

第一步,我们首先实现对 raft 的初始化。
实现 log.go 中的 newLog 方法,调用 storage 的 InitialState 等方法对 RaftLog 进行初始化,读取持久化在 storage 中 term、commit、vote 和 entries,为后面的 lab 做准备。完成 RaftLog 的初始化后,再填充 Raft 中的相应字段,即完成 Raft 对象的初始化。
第二步,我们实现 Raft 对象的 tick() 函数
上层应用会调用 tick() 函数,作为逻辑时钟控制 Raft 模块的选举功能和心跳功能。因此我们实现 tick() 函数,当 Raft 状态是 Follower 时,检查自上次接收心跳之后,间隔时间是否超过了 election timeout,如果超过了,将发送 MessageType_MsgHup;当 Raft 状态时 Leader 时,检查自上次发送心跳之后,间隔时间是否超过了 heartbeat timeout,如果超过了,将发送 MessageType_MsgBeat。
第三步,我们实现 raft.Raft.becomeXXX 等基本函数
实现了 becomeFollower(),becomeCandidate(),becomeLeader() 等 stub 函数,对不同状态下的属性进行赋值。
第四步,我们实现 Step() 函数对不同 Message 的处理
主要涉及到的 Message 有
MessageType_MsgHup
MessageType_MsgRequestVote
MessageType_MsgRequestVoteResponse
接下来分情况实现:
(1)MessageType_Msgup
当 Raft 状态为 Follower 和 Candidate 时,会先调用 becomeCandidate() 方法,将自己的状态转变为 Candidate,然后向所有 peer 发送 MessageType_MsgRequestVote 消息,请求他们的投票
(2)MessageType_MsgRequestVote
当 Raft 接收到此消息时,会在以下情况拒绝投票:
当 Candidate 的 term 小于当前 raft 的 term 时拒绝投票
如果当前 raft 的 term 与 candidate 的 term 相等,但是它之前已经投票给其他 Candidate 时,会拒绝投票
如果当前 raft 发现 candidate 的日志不如自己的日志更 up-to-date 时,也会拒绝投票
(3)MessageType_MsgRequestVoteResponse
Candidate 接收到此消息时,就会根据消息的 reject 属性来确定自己的得票,当自己的得票数大于一半以上,就会调用 becomeLeader() 函数,将状态转变为 Leader;当拒绝票数也大于一半以上时,就会转回到 Follower 状态。
本部分是对 raft 模块日志复制功能的实现,主要涉及修改的代码文件是 raft.go、log.go
日志复制的流程如下:

本部分主要实现不同状态的 raft 对以下 Message 的处理:
接下来分情况实现:
(1)MessageType_MsgBeat
当上层应用调用 tick() 函数时,Leader 需要检查是否到了该发送心跳的时候,如果到了,那么就发送 MessageType_MsgHeartbeat。
leader 会将自己的 commit 值赋给在 MsgHeartbeat 消息中响应值,以让 Follower 能够及时 commit 安全的 entries
(2)MessageType_MsgHeartbeat
当 Follower 接收到心跳时,会更新自己的 electionTimeout,并会将自己的 lastIndex 与 leader 的 commit 值比较,让自己能够及时 commit entry。
(3)MessageType_MsgHeartbeatResponse
当 Leader 接收到心跳回复时,会比较对应 Follower 的 Pr.Match, 如果发现 Follower 滞后,就会向其发送缺少的 entries
(4)MessageType_MsgPropose
当 Leader 要添加 data 到自己的 log entries 中时,会发送一个 local message—MsgPropose 来让自己向所有 follower 同步 log entries,发送 MessageType_MsgAppend
(5)MessageType_MsgAppend
当 Follower 接收到此消息时,会在以下情况拒绝 append:
其他情况,Follower 会接收新的 entries,并更新自己的相关属性。
(6)MessageType_MsgAppendResponse
当 Leader 发现 Follower 拒绝 append 后,会更新 raft.Prs 中对应 Follower 的进度信息,并根据新的进度,重新发送 entries。
本部分主要实现 raw node 的接口,涉及修改的代码文件为 rawnode.go
RawNode 对象中的属性除了 Raft 对象,还增加了 prevSoftState 和 preHardState 两个属性,用于在 HasReady() 函数中判断 node 是否 pending
此外还实现了 Advance() 函数,主要是对 Raft 内部属性进行更新。
本部分主要实现 peer_storage.go 中 SaveReadyState() 方法和 Append() 方法,涉及修改的代码文件为 peer_storage.go
peer storage 除了管理持久化 raft log 外,也会管理持久化其他元数据(RaftLocalState、RaftApplyState 和 RegionLocalState),因此我们需要实现 SaveReadyState() 方法,将 raft.Ready 中修改过的状态和数据保存到 badger 中。
首先我们通过实现 Append() 方法,保存需要持久化的 raft log。遍历 Ready 中 Entries,调用 SetMeta() 方法将他们保存到 raftWB,并删除可能未提交的 raft log,最后更新 raftState。
在处理完 raft log 后,我们还需要保存 Ready 中的 hardState,并在最后调用 WriteToDB() 方法保证之前的修改落盘。
本部分主要实现 peer_storage_handler.go 中的 proposeRaftCommand() 和 HandleRaftReady() 方法,涉及修改的代码文件为 peer_storage_handler.go
proposeRaftCommand() 方法使得系统有能力将接收到的 client 请求通过 raft 模块进行同步,以实现分布式环境下的一致性。在本方法中,我们直接调用 raft 模块的 Propose 方法,将 client 请求进行同步,并为该请求初始化对应的 proposal,以便该请求 committed 后将结果返回给 client
当 msg 被 raft 模块处理后,会导致 raft 模块的一些状态变化,这时候需要 HandleRaftReady() 方法进行一些操作来处理这些变化:
因为 raft entries 不可能一直无限增长下去,所以本部分我们需要实现 snapshot 功能,清理之前的 raft entries。
整个 lab2c 的执行流程如下:


当 leader 发现 follower 落后太多时,会主动向 follower 发送 snapshot,对其进行同步。在 Raft 模块内部,需要增加对 MessageType_MsgSnapshot 消息的处理,主要对以下两点进行处理:
在第二步中,follower 需要判断 leader 发送的 snapshot 是否会与自己的 entries 产生冲突,如果发送的 snapshot 是目前现有 entries 的子集,说明 snapshot 是 stale 的,那么要返回目前 follower 的进度,更新 leader 中相应的 Match 和 Next,以便再下一次发送正确的日志;如果没有发生冲突,那么 follower 就根据 snapshot 中的信息进行相应的更新,更新自身的 committed 等 index,如果 confstate 也产生变化,有新的 node 加入或者已有的 node 被移除,需要更新本节点的 confState,为 lab3 做准备。
在本部分中,当日志增长超过 RaftLogGcCountLimit 的限制时,会要求本节点整理和删除已经应用到状态机的旧日志。节点会接收到类似于 Get/Put/Delete/Snap 命令的 CompactLogRequest,因此我们需要在 lab2b 的基础上,当包含 CompactLogRequest 的 entry 提交后,增加 processAdminRequest() 方法来对这类 adminRequest 的处理。
在 processAdminRequest() 方法中,我们需要更新 RaftApplyState 中 RaftTruncatedState 中的相关元数据,记录最新截断的最后一个日志的 index 和 term,然后调用 ScheduleCompactLog() 方法,异步让 RaftLog-gc worker 能够进行旧日志删除的工作。
另外,因为 raft 模块在处理 snapshot 相关的 msg 时,也会对一些状态进行修改,所以在 peer_storage.go 方法中,我们需要在 SaveReadyState() 方法中,调用 ApplySnapshot() 方法中,对相应的元数据进行保存。
在 ApplySnapshot() 方法中,如果当前节点已经处理过的 entries 只是 snapshot 的一个子集,那么需要对 raftLocalState 中的 commit、lastIndex 以及 raftApplyState 中的 appliedIndex 等元数据进行更新,并调用 ClearData() 和 ClearMetaData() 方法,对现有的 stale 元数据以及日志进行清空整理。同时,也对 regionLocalState 进行相应更新。最后,我们需要通过 regionSched 这个 channel,将 snapshot 应用于对应的状态机
Raft 是 2015 年以来最受人瞩目的共识算法,有关其前世今生可以参考我们总结的 博客,此处不再赘述。
etcd 是一个生产级别的 Raft 实现,我们在实现 lab2a 的时候大量参考了 etcd 的代码。这个过程不仅帮助我们进一步了解了 etcd 的 codebase,也让我们进一步意识到一个工程级别的 raft 实现需要考虑多少 corner case。整个学习过程收获还是很大的,这里贴一些 etcd 的优质博客以供学习。
在 Raft 层完成后,下一步需要做的便是基于 Raft 层搭建一个高可用的 KV 层。这里依然参考了 etcd KV 层驱动 Raft 层的方式。
即总体的思路如下所示:
1 | |
做过 tinykv 的同学应该都能够感觉到 lab2b 的难度与之前有一个大 gap,我认为主要原因是需要看的代码实现是太多了。
如今回首,建议分三个步骤来做,这样效率可能会高一些:
在实现 lab2b 中,由于时间有限,我们重点关注了 batching 的优化和 apply 时的 safety,以下进行简单的介绍:
batching 优化:客户端发来的一条 command 可能包含多个读写请求,服务端可以将其打包成一条或多条 raft 日志。显然,打包成一条 Raft 日志的性能会更高,因为这样能够节省大量 IO 资源的消耗。当然这也需要在 apply 时对所有的 request 均做相应的业务和容错处理。
apply 时的 safety:要想实现基于 Raft 的 KV 服务,一大难点便是如何保证 applyIndex 和状态机数据的原子性。比如在 6.824 的框架中,Raft 层对于上层状态机的假设是易失的,即重启后状态机为空,那么 applyIndex 便可以不被持久化记录,因为一旦发生重启 Raft 实例可以从 0 开始重新 apply 日志,对于状态机来说这个过程保证不会重复。然而这样的实现虽然保证了 safety,但却不是一个生产可用的实现。对于 tinykv,其状态机为非易失的 LSM 引擎,一旦要记录 applyIndex 就可能出现与状态机数据不一致的原子性问题,即重启后可能会存在日志被重复 apply 到状态机的现象。为了解决这一问题,我们将每个 Index 下 entry 的应用和对应 applyIndex 的更新放到了一个事务中来保证他们之间的原子性,巧妙地解决了该过程的 safety 问题。
tinykv 的 Snapshot 几乎是一个纯异步的方案,在架构上有很多讲究,这里可以仔细阅读文档和一位社区同学分享的 Snapshot 流程 后再开始编码。
一旦了解了以下两个流程,代码便可以自然而然地写出来了。
本部分主要涉及 Raft 算法 leader transfer 和 conf change 功能的两个工作,主要涉及修改的代码文件是 raft.go
对于 leader transfer,注意以下几点即可:
对于 conf change,注意以下几点即可:
本部分主要是在 3a 的基础上,在 raft store 层面实现对 TransferLeader、ChangePeer 和 Split 三种 AdminRequest 的处理,涉及修改的文件主要是 peer_msg_handler.go 和 peer.go
对于 TransferLeader,比较简单:
TransferLeader request 因为不需要复制到 follower 节点,所以在 peer_msg_handler.go 的 pproposeRaftCommand() 方法中直接调用 raw_node.go 中的 TransferLeader() 方法即可
对于 ConfChange,分 addNode 和 removeNode 两种行为处理。
当 addNode 的命令 commit 之后,不需要我们手动调用 createPeer() 或者 maybeCreatePeer() 来显式创建 peer。我们只需要对 d.ctx 中的 storeMeta 进行修改即可,新 peer 会通过心跳机制进行创建。
当 removeNode 的命令 commit 之后,与 addNode 命令不同的是,我们需要显式调用 destroyPeer() 函数来停止相应的 raft 模块。这时需要注意的一个点时,当 Region 中只剩下两个节点,要从这两个节点中移除一个时,如果有一个节点挂了,会使整个集群不可用,特别是要移除的节点是 leader 本身。
在测试中会遇到这样的问题:当 Region 中只剩下节点 A(leader)和 节点 B(follower),当 removeNode A 的命令被 commit 之后,leader 就进行自我销毁,如果这个时候进入了 unreliable 的状态,那么 leader 就有可能无法在 destory 之前通过 heartbeat 去更新 follower 的 commitIndex。这样使得 follower B 不知道 leader A 已经被移除,就算发起选举也无法收到节点 A 的 vote,最终无法成功,导致 request timeout。
对于 split, 需要注意:
本部分主要涉及对收集到的心跳信息进行选择性维护和对 balance-region 策略的具体实现两个工作,主要涉及修改的代码文件是 cluster.go 和 balance_region.go
对于维护心跳信息,按照以下流程执行即可:
对于 balance-region 策略的实现,按照以下步骤执行即可:
Multi-Raft 是分布式 KV 可以 scale out 的基石。TiKV 对每个 region 的 conf change 和 transfer leader 功能能够将 region 动态的在所有 store 上进行负载均衡,对 region 的 split 和 merge 则是能够解决单 region 热点并无用工作损耗资源的问题。不得不说,后两者尽管道理上理解起来很简单,但工程实现上有太多细节要考虑了(据说贵司写了好几年才稳定),分析可能的异常情况实在是太痛苦了,为贵司能够啃下这块硬骨头点赞。
最近看到有一个基于 TiKV 的 hackathon 议题,其本质是想通过更改线程模型来优化 TiKV 的写入性能、性能稳定性和自适应能力。这里可以简单提提一些想法,其实就我们在时序数据库方向的一些经验来说,每个 TSM(TimeSeries Merge Tree)大概能够用满一个核的 CPU 资源。只要我们将 TSM 引擎额个数与 CPU 核数绑定,写入性能基本是能够随着核数增加而线性提升的。那么对于 KV 场景,是否开启 CPU 个数的 LSM 引擎能够更好的利用 CPU 资源呢?即对于 raftstore,是否启动 CPU 个数的 Rocksdb 实例能够更好的利用资源呢?感觉这里也可以做做测试尝试一下。
负载均衡是分布式系统中的一大难题,不同系统均有不同的策略实现,不同的策略可能在不同的 workload 中更有效。
相比 pd 的实现,我们在 lab3c 实现的策略实际上很 trivial,因此我们简单学习了 pd 调度 region 的 策略。尽管这些策略道理上理解起来都比较简单,但如何将所有统计信息准确的量化成一个动态模型却是一件很难尽善尽美的事,这中间的很多指标也只能是经验值,没有严谨的依据。
有关负载均衡我们对学术界的相关工作还不够了解,之后有时间会进行一些关注。
本 Lab 整体相对简单,在基本了解 MVCC, 2PC 和 Percolator 后便可动手了,面向测试用例编程即可。
本部分是对 mvcc 模块的实现,主要涉及修改的代码文件是 transaction.go。需要利用对 CFLock, CFDefault 和 CFWrite 三个 CF 的一些操作来实现 mvcc。
针对 Lock 相关的函数:
针对 Value 相关的函数:
针对 Write 相关的函数:
本部分是对 Percolator 算法 KVPreWrite, KVCommit 和 KVGet 三个方法的实现,主要涉及修改的代码文件是 server.go, query.go 和 nonquery.go。
本部分是对 Percolator 算法 KvCheckTxnStatus, KvBatchRollback, KvResolveLock 和 KvScan 四个方法的实现,主要涉及修改的代码文件是 server.go, query.go 和 nonquery.go。
为了使得 server.go 逻辑代码清晰,在分别完成三个 lab 后对代码进行了进一步整理,针对读写请求分别抽象出来了接口,这样可以使得逻辑更为清晰。
1 | |
有关分布式事务,我们之前有过简单的 学习,对 2PL, 2PC 均有简单的了解,因此此次在实现 Percolator 时只需要关注 2PC 与 MVCC 的结合即可,这里重点参考了以下博客:
实现完后,我们进一步被 Google 的聪明所折服,Percolator 基于单行事务实现了多行事务,基于 MVCC 实现了 SI 隔离级别。尽管其事务恢复流程相对复杂,但其本质上是在 CAP 定理中通过牺牲恢复时的 A 来优化了协调者正常写入时的 A,即协调者单点在 SQL 层不用高可用来保证最终执行 commit 或者 abort。因为一旦协调者节点挂掉,该事务在超过 TTL (TTL 的超时也是由 TSO 的时间戳来判断,对于各个 TiKV 节点来说均为逻辑时钟,这样的设计也避免了 Wall Clock 的同步难题)后会被其他事务 rollback,总体上来看 Percolator 比较优雅的解决了 2PC 的 safety 问题。
当然,分布式事务可以深究的地方还很多,并且很多思想都与 Lamport 那篇最著名的论文 Time, Clocks, and the Ordering of Events in a Distributed System 有关。除了 TiDB 外,Spanner,YugaByte,CockroachDB 等 NewSQL 数据库均有自己的大杀器,比如 TrueTime,HLC 等等。总之这块儿挺有意思的,虽然在这儿告一段落,但希望以后有机会能深入做一些相关工作。
实现一个稳定的分布式系统实在是太有挑战太有意思啦。
感谢 PingCAP 社区提供如此优秀的课程!
2021 年 11 月 ~ 2022 年 1 月 ,PingCAP 举办了第一届 Talent Plan KV 学习营,相关介绍可参考 推送。
在本次比赛中,由于我们小组的两位成员之前都刷过 MIT 6.824,已经对教学级别的 raft 有一定的了解,所以参加此次比赛的目的就是去感受一下生产级别分布式 KV 的代码实现,学习实践一下 lsm, etcd, raftstore 和 percolator 的理论知识和 codebase。
u1s1,刷 lab 的过程十分曲折,我们俩所在的实验室到年底的时候都非常忙,前几周基本每周都只能抽出顶多一两天的时间来写代码,而理解 lab2b/lab3b raftstore 的难度是非常大的,我们用了一周多的时间才勉强看懂 raftstore 的代码。这使得到还剩两周时间的时候,我们才刷到 lab2c。最后两周我们利用中午午休时间和晚上睡觉时间疯狂加班,在 lab 上花了更多的时间,最后才堪堪刷完。
在刷 lab 的过程中,由于时间有限,我们始终秉持着学习优先,成绩第二的原则。即以 了解 codebase,学习知识,做最容易做且最有用的优化 为主,并没有去卷很多功能点。在处理 bug 的态度上,对于 safety 的问题比如错误读写的 bug 等,我们对这类问题进行了重点关注和解决;对于 liveness 的问题比如 request timeout 等,我们则是在有限的时间内尽力做了优化,但并没有投入太多精力,因为这种工作没有上限,tikv 的 raftstore 也一定在持续做这些工作,时间不够的情况下去卷这些就没有太大意义了。

出人意料的是,我们得了第二名的好成绩,具体可参考 官宣。事后反省一下,在 safety 上我们遇到的问题都解决了;在 liveness 上我们没投入太多精力;在文档上,我们简单介绍了代码实现,但将重点放在了我们对相关知识的理解和思考上;在性能上,我们重点做了最容易做的 batching 优化,其本质上是使用 raft 的优化而不是 raft 自身的优化,但对性能的提升却异常关键,比如 tidb 对于一个事务打包的一堆写请求,到 tikv 的 region 之后,这些写请求同步成一条还是多条 raftlog 对于性能的影响是巨大的。
从结果来看,我们的策略是正确的,我们在很有限的时间内拿到了很高的收益。
最后,出于对课程的保护,也出于跟大家分享一些刷 lab 的经验,让大家少踩坑,在此处我仅将文档公开,希望能为大家提供一些思路,欢迎一起交流。
本部分是对底层 badger api 的包装,主要涉及修改的代码文件是 standalone_storage.go, 需要实现 Storage 接口的 Write 和 Reader 方法,来实现对底层 badger 数据库的读写。
Write 部分涉及到 Put 和 Delete 两种操作。
因为 write_batch.go 中已经实现了对 badger 中 entry 的 put 和 delete 操作,我们只需要判断 batch 中的每一个 Modify 的操作类型,然后直接调用 write_batch.go 中相对应的方法即可。
Reader 部分会涉及到 point read 和 scan read 两种不同读方式。
因为提示到应该使用 badger.Txn 来实现 Reader 函数,所以我们声明了一个 badgerReader 结构体来实现 StorageReader 接口,badgerReader 结构体内部包含对 badger.Txn 的引用。
针对 point read,
我们直接调用 util.go 中的 GetCF 等函数,对 cf 中指定 key 进行读取。
针对 scan read,
直接调用 cf_iterator.go 中的 NewCFIterator 函数,返回一个迭代器,供 part2 中调用。
本部分需要实现 RawGet/ RawScan/ RawPut/ RawDelete 四个 handlers,主要涉及修改的代码文件是 raw_api.go
针对 RawGet,
我们调用 storage 的 Reader 函数返回一个 Reader,然后调用其 GetCF 函数进行点读取即可,读取之后需要判断对应 key 是否存在。
针对 RawScan,
同样地调用 storage 的 Reader 函数返回一个 Reader,然后调用其 IterCF 函数返回一个迭代器,然后使用迭代器读取即可。
针对 RawPut 和 RawDelete,
声明对应的 Modify 后,调用 storage.Write 函数即可。
LSM 是一个伴随 NoSQL 运动一起流行的存储引擎,相比 B+ 树以牺牲读性能的代价在写入性能上获得了较大的提升。
近年来,工业界和学术界均对 LSM 树进行了一定的研究,具体可以阅读 VLDB2018 有关 LSM 的综述:LSM-based Storage Techniques: A Survey, 也可直接阅读针对该论文我认为还不错的一篇 中文概要总结。
介绍完了 LSM 综述,可以简单聊聊 badger,这是一个纯 go 实现的 LSM 存储引擎,参照了 FAST2016 有关 KV 分离 LSM 的设计: WiscKey 。有关其项目的动机和一些 benchmark 结果可以参照其创始人的 博客。
对于 Wisckey 这篇论文,除了阅读论文以外,也可以参考此 阅读笔记 和此 总结博客。这两篇资料较为系统地介绍了现在学术界和工业界对于 KV 分离 LSM 的一些设计和实现。
实际上对于目前的 NewSQL 数据库,其底层大多数都是一个分布式 KV 存储系统。对于 OLTP 业务,其往往采用行存的方式,即 key 对应的 value 便是一个 tuple。在这样的架构下,value 往往很大,因而采用 KV 分离的设计往往能够减少大量的写放大,从而提升性能。
之前和腾讯云的一个大佬聊过,他有说 TiKV 的社区版和商业版存储引擎性能差异很大。目前想一下,KV 分离可能便是 RocksDB 和 Titan 的最大区别吧。
本部分是对 raft 模块 leader 选举功能的实现,主要涉及修改的代码文件是 raft.go、log.go
raft 模块 leader 选举流程如下:

第一步,我们首先实现对 raft 的初始化。
实现 log.go 中的 newLog 方法,调用 storage 的 InitialState 等方法对 RaftLog 进行初始化,读取持久化在 storage 中 term、commit、vote 和 entries,为后面的 lab 做准备。完成 RaftLog 的初始化后,再填充 Raft 中的相应字段,即完成 Raft 对象的初始化。
第二步,我们实现 Raft 对象的 tick() 函数
上层应用会调用 tick() 函数,作为逻辑时钟控制 Raft 模块的选举功能和心跳功能。因此我们实现 tick() 函数,当 Raft 状态是 Follower 时,检查自上次接收心跳之后,间隔时间是否超过了 election timeout,如果超过了,将发送 MessageType_MsgHup;当 Raft 状态时 Leader 时,检查自上次发送心跳之后,间隔时间是否超过了 heartbeat timeout,如果超过了,将发送 MessageType_MsgBeat。
第三步,我们实现 raft.Raft.becomeXXX 等基本函数
实现了 becomeFollower(),becomeCandidate(),becomeLeader() 等 stub 函数,对不同状态下的属性进行赋值。
第四步,我们实现 Step() 函数对不同 Message 的处理
主要涉及到的 Message 有
MessageType_MsgHup
MessageType_MsgRequestVote
MessageType_MsgRequestVoteResponse
接下来分情况实现:
(1)MessageType_Msgup
当 Raft 状态为 Follower 和 Candidate 时,会先调用 becomeCandidate() 方法,将自己的状态转变为 Candidate,然后向所有 peer 发送 MessageType_MsgRequestVote 消息,请求他们的投票
(2)MessageType_MsgRequestVote
当 Raft 接收到此消息时,会在以下情况拒绝投票:
当 Candidate 的 term 小于当前 raft 的 term 时拒绝投票
如果当前 raft 的 term 与 candidate 的 term 相等,但是它之前已经投票给其他 Candidate 时,会拒绝投票
如果当前 raft 发现 candidate 的日志不如自己的日志更 up-to-date 时,也会拒绝投票
(3)MessageType_MsgRequestVoteResponse
Candidate 接收到此消息时,就会根据消息的 reject 属性来确定自己的得票,当自己的得票数大于一半以上,就会调用 becomeLeader() 函数,将状态转变为 Leader;当拒绝票数也大于一半以上时,就会转回到 Follower 状态。
本部分是对 raft 模块日志复制功能的实现,主要涉及修改的代码文件是 raft.go、log.go
日志复制的流程如下:

本部分主要实现不同状态的 raft 对以下 Message 的处理:
接下来分情况实现:
(1)MessageType_MsgBeat
当上层应用调用 tick() 函数时,Leader 需要检查是否到了该发送心跳的时候,如果到了,那么就发送 MessageType_MsgHeartbeat。
leader 会将自己的 commit 值赋给在 MsgHeartbeat 消息中响应值,以让 Follower 能够及时 commit 安全的 entries
(2)MessageType_MsgHeartbeat
当 Follower 接收到心跳时,会更新自己的 electionTimeout,并会将自己的 lastIndex 与 leader 的 commit 值比较,让自己能够及时 commit entry。
(3)MessageType_MsgHeartbeatResponse
当 Leader 接收到心跳回复时,会比较对应 Follower 的 Pr.Match, 如果发现 Follower 滞后,就会向其发送缺少的 entries
(4)MessageType_MsgPropose
当 Leader 要添加 data 到自己的 log entries 中时,会发送一个 local message—MsgPropose 来让自己向所有 follower 同步 log entries,发送 MessageType_MsgAppend
(5)MessageType_MsgAppend
当 Follower 接收到此消息时,会在以下情况拒绝 append:
其他情况,Follower 会接收新的 entries,并更新自己的相关属性。
(6)MessageType_MsgAppendResponse
当 Leader 发现 Follower 拒绝 append 后,会更新 raft.Prs 中对应 Follower 的进度信息,并根据新的进度,重新发送 entries。
本部分主要实现 raw node 的接口,涉及修改的代码文件为 rawnode.go
RawNode 对象中的属性除了 Raft 对象,还增加了 prevSoftState 和 preHardState 两个属性,用于在 HasReady() 函数中判断 node 是否 pending
此外还实现了 Advance() 函数,主要是对 Raft 内部属性进行更新。
本部分主要实现 peer_storage.go 中 SaveReadyState() 方法和 Append() 方法,涉及修改的代码文件为 peer_storage.go
peer storage 除了管理持久化 raft log 外,也会管理持久化其他元数据(RaftLocalState、RaftApplyState 和 RegionLocalState),因此我们需要实现 SaveReadyState() 方法,将 raft.Ready 中修改过的状态和数据保存到 badger 中。
首先我们通过实现 Append() 方法,保存需要持久化的 raft log。遍历 Ready 中 Entries,调用 SetMeta() 方法将他们保存到 raftWB,并删除可能未提交的 raft log,最后更新 raftState。
在处理完 raft log 后,我们还需要保存 Ready 中的 hardState,并在最后调用 WriteToDB() 方法保证之前的修改落盘。
本部分主要实现 peer_storage_handler.go 中的 proposeRaftCommand() 和 HandleRaftReady() 方法,涉及修改的代码文件为 peer_storage_handler.go
proposeRaftCommand() 方法使得系统有能力将接收到的 client 请求通过 raft 模块进行同步,以实现分布式环境下的一致性。在本方法中,我们直接调用 raft 模块的 Propose 方法,将 client 请求进行同步,并为该请求初始化对应的 proposal,以便该请求 committed 后将结果返回给 client
当 msg 被 raft 模块处理后,会导致 raft 模块的一些状态变化,这时候需要 HandleRaftReady() 方法进行一些操作来处理这些变化:
因为 raft entries 不可能一直无限增长下去,所以本部分我们需要实现 snapshot 功能,清理之前的 raft entries。
整个 lab2c 的执行流程如下:


当 leader 发现 follower 落后太多时,会主动向 follower 发送 snapshot,对其进行同步。在 Raft 模块内部,需要增加对 MessageType_MsgSnapshot 消息的处理,主要对以下两点进行处理:
在第二步中,follower 需要判断 leader 发送的 snapshot 是否会与自己的 entries 产生冲突,如果发送的 snapshot 是目前现有 entries 的子集,说明 snapshot 是 stale 的,那么要返回目前 follower 的进度,更新 leader 中相应的 Match 和 Next,以便再下一次发送正确的日志;如果没有发生冲突,那么 follower 就根据 snapshot 中的信息进行相应的更新,更新自身的 committed 等 index,如果 confstate 也产生变化,有新的 node 加入或者已有的 node 被移除,需要更新本节点的 confState,为 lab3 做准备。
在本部分中,当日志增长超过 RaftLogGcCountLimit 的限制时,会要求本节点整理和删除已经应用到状态机的旧日志。节点会接收到类似于 Get/Put/Delete/Snap 命令的 CompactLogRequest,因此我们需要在 lab2b 的基础上,当包含 CompactLogRequest 的 entry 提交后,增加 processAdminRequest() 方法来对这类 adminRequest 的处理。
在 processAdminRequest() 方法中,我们需要更新 RaftApplyState 中 RaftTruncatedState 中的相关元数据,记录最新截断的最后一个日志的 index 和 term,然后调用 ScheduleCompactLog() 方法,异步让 RaftLog-gc worker 能够进行旧日志删除的工作。
另外,因为 raft 模块在处理 snapshot 相关的 msg 时,也会对一些状态进行修改,所以在 peer_storage.go 方法中,我们需要在 SaveReadyState() 方法中,调用 ApplySnapshot() 方法中,对相应的元数据进行保存。
在 ApplySnapshot() 方法中,如果当前节点已经处理过的 entries 只是 snapshot 的一个子集,那么需要对 raftLocalState 中的 commit、lastIndex 以及 raftApplyState 中的 appliedIndex 等元数据进行更新,并调用 ClearData() 和 ClearMetaData() 方法,对现有的 stale 元数据以及日志进行清空整理。同时,也对 regionLocalState 进行相应更新。最后,我们需要通过 regionSched 这个 channel,将 snapshot 应用于对应的状态机
Raft 是 2015 年以来最受人瞩目的共识算法,有关其前世今生可以参考我们总结的 博客,此处不再赘述。
etcd 是一个生产级别的 Raft 实现,我们在实现 lab2a 的时候大量参考了 etcd 的代码。这个过程不仅帮助我们进一步了解了 etcd 的 codebase,也让我们进一步意识到一个工程级别的 raft 实现需要考虑多少 corner case。整个学习过程收获还是很大的,这里贴一些 etcd 的优质博客以供学习。
在 Raft 层完成后,下一步需要做的便是基于 Raft 层搭建一个高可用的 KV 层。这里依然参考了 etcd KV 层驱动 Raft 层的方式。
即总体的思路如下所示:
1 | |
做过 tinykv 的同学应该都能够感觉到 lab2b 的难度与之前有一个大 gap,我认为主要原因是需要看的代码实现是太多了。
如今回首,建议分三个步骤来做,这样效率可能会高一些:
在实现 lab2b 中,由于时间有限,我们重点关注了 batching 的优化和 apply 时的 safety,以下进行简单的介绍:
batching 优化:客户端发来的一条 command 可能包含多个读写请求,服务端可以将其打包成一条或多条 raft 日志。显然,打包成一条 Raft 日志的性能会更高,因为这样能够节省大量 IO 资源的消耗。当然这也需要在 apply 时对所有的 request 均做相应的业务和容错处理。
apply 时的 safety:要想实现基于 Raft 的 KV 服务,一大难点便是如何保证 applyIndex 和状态机数据的原子性。比如在 6.824 的框架中,Raft 层对于上层状态机的假设是易失的,即重启后状态机为空,那么 applyIndex 便可以不被持久化记录,因为一旦发生重启 Raft 实例可以从 0 开始重新 apply 日志,对于状态机来说这个过程保证不会重复。然而这样的实现虽然保证了 safety,但却不是一个生产可用的实现。对于 tinykv,其状态机为非易失的 LSM 引擎,一旦要记录 applyIndex 就可能出现与状态机数据不一致的原子性问题,即重启后可能会存在日志被重复 apply 到状态机的现象。为了解决这一问题,我们将每个 Index 下 entry 的应用和对应 applyIndex 的更新放到了一个事务中来保证他们之间的原子性,巧妙地解决了该过程的 safety 问题。
tinykv 的 Snapshot 几乎是一个纯异步的方案,在架构上有很多讲究,这里可以仔细阅读文档和一位社区同学分享的 Snapshot 流程 后再开始编码。
一旦了解了以下两个流程,代码便可以自然而然地写出来了。
本部分主要涉及 Raft 算法 leader transfer 和 conf change 功能的两个工作,主要涉及修改的代码文件是 raft.go
对于 leader transfer,注意以下几点即可:
对于 conf change,注意以下几点即可:
本部分主要是在 3a 的基础上,在 raft store 层面实现对 TransferLeader、ChangePeer 和 Split 三种 AdminRequest 的处理,涉及修改的文件主要是 peer_msg_handler.go 和 peer.go
对于 TransferLeader,比较简单:
TransferLeader request 因为不需要复制到 follower 节点,所以在 peer_msg_handler.go 的 pproposeRaftCommand() 方法中直接调用 raw_node.go 中的 TransferLeader() 方法即可
对于 ConfChange,分 addNode 和 removeNode 两种行为处理。
当 addNode 的命令 commit 之后,不需要我们手动调用 createPeer() 或者 maybeCreatePeer() 来显式创建 peer。我们只需要对 d.ctx 中的 storeMeta 进行修改即可,新 peer 会通过心跳机制进行创建。
当 removeNode 的命令 commit 之后,与 addNode 命令不同的是,我们需要显式调用 destroyPeer() 函数来停止相应的 raft 模块。这时需要注意的一个点时,当 Region 中只剩下两个节点,要从这两个节点中移除一个时,如果有一个节点挂了,会使整个集群不可用,特别是要移除的节点是 leader 本身。
在测试中会遇到这样的问题:当 Region 中只剩下节点 A(leader)和 节点 B(follower),当 removeNode A 的命令被 commit 之后,leader 就进行自我销毁,如果这个时候进入了 unreliable 的状态,那么 leader 就有可能无法在 destory 之前通过 heartbeat 去更新 follower 的 commitIndex。这样使得 follower B 不知道 leader A 已经被移除,就算发起选举也无法收到节点 A 的 vote,最终无法成功,导致 request timeout。
对于 split, 需要注意:
本部分主要涉及对收集到的心跳信息进行选择性维护和对 balance-region 策略的具体实现两个工作,主要涉及修改的代码文件是 cluster.go 和 balance_region.go
对于维护心跳信息,按照以下流程执行即可:
对于 balance-region 策略的实现,按照以下步骤执行即可:
Multi-Raft 是分布式 KV 可以 scale out 的基石。TiKV 对每个 region 的 conf change 和 transfer leader 功能能够将 region 动态的在所有 store 上进行负载均衡,对 region 的 split 和 merge 则是能够解决单 region 热点并无用工作损耗资源的问题。不得不说,后两者尽管道理上理解起来很简单,但工程实现上有太多细节要考虑了(据说贵司写了好几年才稳定),分析可能的异常情况实在是太痛苦了,为贵司能够啃下这块硬骨头点赞。
最近看到有一个基于 TiKV 的 hackathon 议题,其本质是想通过更改线程模型来优化 TiKV 的写入性能、性能稳定性和自适应能力。这里可以简单提提一些想法,其实就我们在时序数据库方向的一些经验来说,每个 TSM(TimeSeries Merge Tree)大概能够用满一个核的 CPU 资源。只要我们将 TSM 引擎额个数与 CPU 核数绑定,写入性能基本是能够随着核数增加而线性提升的。那么对于 KV 场景,是否开启 CPU 个数的 LSM 引擎能够更好的利用 CPU 资源呢?即对于 raftstore,是否启动 CPU 个数的 Rocksdb 实例能够更好的利用资源呢?感觉这里也可以做做测试尝试一下。
负载均衡是分布式系统中的一大难题,不同系统均有不同的策略实现,不同的策略可能在不同的 workload 中更有效。
相比 pd 的实现,我们在 lab3c 实现的策略实际上很 trivial,因此我们简单学习了 pd 调度 region 的 策略。尽管这些策略道理上理解起来都比较简单,但如何将所有统计信息准确的量化成一个动态模型却是一件很难尽善尽美的事,这中间的很多指标也只能是经验值,没有严谨的依据。
有关负载均衡我们对学术界的相关工作还不够了解,之后有时间会进行一些关注。
本 Lab 整体相对简单,在基本了解 MVCC, 2PC 和 Percolator 后便可动手了,面向测试用例编程即可。
本部分是对 mvcc 模块的实现,主要涉及修改的代码文件是 transaction.go。需要利用对 CFLock, CFDefault 和 CFWrite 三个 CF 的一些操作来实现 mvcc。
针对 Lock 相关的函数:
针对 Value 相关的函数:
针对 Write 相关的函数:
本部分是对 Percolator 算法 KVPreWrite, KVCommit 和 KVGet 三个方法的实现,主要涉及修改的代码文件是 server.go, query.go 和 nonquery.go。
本部分是对 Percolator 算法 KvCheckTxnStatus, KvBatchRollback, KvResolveLock 和 KvScan 四个方法的实现,主要涉及修改的代码文件是 server.go, query.go 和 nonquery.go。
为了使得 server.go 逻辑代码清晰,在分别完成三个 lab 后对代码进行了进一步整理,针对读写请求分别抽象出来了接口,这样可以使得逻辑更为清晰。
1 | |
有关分布式事务,我们之前有过简单的 学习,对 2PL, 2PC 均有简单的了解,因此此次在实现 Percolator 时只需要关注 2PC 与 MVCC 的结合即可,这里重点参考了以下博客:
实现完后,我们进一步被 Google 的聪明所折服,Percolator 基于单行事务实现了多行事务,基于 MVCC 实现了 SI 隔离级别。尽管其事务恢复流程相对复杂,但其本质上是在 CAP 定理中通过牺牲恢复时的 A 来优化了协调者正常写入时的 A,即协调者单点在 SQL 层不用高可用来保证最终执行 commit 或者 abort。因为一旦协调者节点挂掉,该事务在超过 TTL (TTL 的超时也是由 TSO 的时间戳来判断,对于各个 TiKV 节点来说均为逻辑时钟,这样的设计也避免了 Wall Clock 的同步难题)后会被其他事务 rollback,总体上来看 Percolator 比较优雅的解决了 2PC 的 safety 问题。
当然,分布式事务可以深究的地方还很多,并且很多思想都与 Lamport 那篇最著名的论文 Time, Clocks, and the Ordering of Events in a Distributed System 有关。除了 TiDB 外,Spanner,YugaByte,CockroachDB 等 NewSQL 数据库均有自己的大杀器,比如 TrueTime,HLC 等等。总之这块儿挺有意思的,虽然在这儿告一段落,但希望以后有机会能深入做一些相关工作。
实现一个稳定的分布式系统实在是太有挑战太有意思啦。
感谢 PingCAP 社区提供如此优秀的课程!