TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。
作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。
TiKV 官方源码解析文档 详细地介绍了 TiKV 3.x 版本重要模块的设计要点,主要流程和相应代码片段,是学习 TiKV 源码必读的学习资料。当前 TiKV 已经迭代到了 6.x 版本,不仅引入了很多新的功能和优化,而且对源码也进行了多次重构,因而一些官方源码解析文档中的代码片段已经不复存在,这使得读者在阅读源码解析文档时无法对照最新源码加深理解;此外尽管 TiKV 官方源码解析文档系统地介绍了若干重要模块的工作,但并没有将读写流程全链路串起来去介绍经过的模块和对应的代码片段,实际上尽快地熟悉读写流程全链路会更利于新同学从全局角度理解代码。
基于以上存在的问题,笔者将基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:
希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
本文为第三篇博客,将主要介绍 TiKV 中一条写请求的全链路流程。
以下四篇博客由上到下分别介绍了 TiKV 3.x 版本 KVService,Storage 和 RaftStore 模块对于分布式事务请求的执行流程。
本小节将在 TiKV 6.1 版本的基础上,以一条 PreWrite 请求为例,介绍当前版本的写请求全链路执行流程。
在 KVService 层,通过 handle_request 和 txn_command_future 宏,PreWrite 接口的请求会直接被路由到 Storage::sched_txn_command 函数中。
1 | |
在 Storage 模块,其会将请求路由到 Scheduler::run_cmd 函数中,并进一步路由到 Scheduler::schedule_command 函数中。在 schedule_command 函数中,当前 command 连同 callback 等上下文会被保存到 task_slots 中,如果当前线程申请到了所有 latch 则会调用 execute 函数继续执行该 task,否则如前文所述,当前任务便会被阻塞在某些 latch 上等待其他线程去唤醒进而执行,当前线程会直接返回并执行其他的工作。
1 | |
在 execute 函数中,当前线程会生成一个异步任务 spawn 到另一个 worker 线程池中去,该任务主要包含以下两个步骤:
Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await 获取 snapshot。此步骤与上文读流程中获取 snapshot 的步骤相同,可能通过 ReadLocal 也可能通过 ReadIndex 来获取引擎的 snapshot,此小节不在赘述sched.process(snapshot, task).await 基于获取到的 snapshot 和对应 task 去调用 scheduler::process 函数,进而被路由到 scheduler::process_write 函数中1 | |
scheduler::process_write 函数是事务处理的关键函数,目前已经有近四百行,里面夹杂了很多新特性和新优化的复杂逻辑,其中最重要的逻辑有两个:
task.cmd.process_write(snapshot, context).map_err(StorageError::from) 根据 snapshot 和 task 执行事务对应的语义:可以从 Command::process_write 函数看到不同的请求都有不同的实现,每种请求都可能根据 snapshot 去底层获取一些数据并尝试写入一些数据。有关 PreWrite 和其他请求的具体操作可以参照 TiKV 源码解析系列文章(十二)分布式事务,此处不再赘述。需要注意的是,此时的写入仅仅缓存在了 WriteData 中,并没有对底层引擎进行实际修改。engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb) 将缓存的 WriteData 实际写入到 engine 层,对于 RaftKV 来说则是表示一次 propose,想要对这一批 WriteData commit 且 apply1 | |
进入 raftkv::async_write_ext 函数后,其进而通过 raftkv::exec_write_requests -> RaftStoreRouter::send_command 的调用栈将 task 连带 callback 发送给 RaftBatchSystem 交由 RaftStore 模块处理。
1 | |
直接定位到 RaftPoller 的 handle_normal 函数。
与处理 ReadIndex 请求相似, RaftPoller 会首先尝试获取 messages_per_tick 次路由到该状态机的消息,接着调用 PeerFsmDelegate::handle_msgs 函数进行处理,
这里依然只列出了我们需要关注的几种消息类型:
对于 PreWrite 请求,其会进入 PeerMsg::RaftCommand(cmd) 分支,进而以 PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal -> Peer::propose -> Peer::propose_normal 的调用链最终被 propose 到 raft-rs 的 RawNode 接口中,同时其 callback 会连带该请求的 logIndex 被 push 到该 Peer 的 proposals 中去。
1 | |
在调用完 PeerFsmDelegate::handle_msgs 处理完消息后,会再调用 PeerFsmDelegate::collect_ready() 函数,进而进入 Peer::handle_raft_ready_append 函数。在该函数中会收集 normal 状态机的一次 ready,接着对需要持久化的未提交日志进行持久化(延后攒批),需要发送的消息进行异步发送,需要应用的已提交日志发送给 ApplyBatchSystem。
在三副本情况下,该 PreWrite 请求会存在于本次 ready 需要持久化的日志和需要发往其他两个 peer 的 message 中,对于 message,一旦收到就会 spawn 给 Transport 让其异步发送,对于持久化,在不开启 async-io 的情况下,数据会被暂存到内存中在当前 loop 结尾的 end 函数中实际写入到底层引擎中去。
1 | |
等到任何一个 follower 返回确认后,该 response 会被路由到 RaftBatchSystem,PollHandler 在接下来的一次 loop 中对其进行处理,该请求会被路由到 PeerFsmDelegate::handle_msgs 函数的 PeerMsg::RaftMessage(msg) 分支中,进而调用 step 函数交给 raft-rs 状态机进行处理。
由于此时已经满足了 quorum 的写入,raft-rs 会将该 PreWrite 请求对应的 raftlog 进行提交并在下一次被获取 ready 时返回,在本轮 loop 的 PeerFsmDelegate::collect_ready() 函数及 Peer::handle_raft_ready_append 函数中,会调用 self.handle_raft_committed_entries(ctx, ready.take_committed_entries()) 函数。在该函数中,其会根据已提交日志从 Peer 的 proposals 中获取到对应的 callback,连带这一批所有的已提交日志构建一个 Apply Task 通过 apply_router 发送给 ApplyBatchSystem。
1 | |
此时直接定位到 ApplyPoller 的 handle_normal 函数,可以看到,ApplyPoller 也会首先尝试获取 messages_per_tick 次路由到该状态机的消息,接着调用 ApplyFSM::handle_tasks 函数进行处理。然后其会经历 ApplyFSM::handle_apply -> ApplyDelegate::handle_raft_committed_entries 的调用链来到 ApplyDelegate::handle_raft_entry_normal 函数中,在该函数中,会尝试将调用 ApplyDelegate::process_raft_cmd 函数来将本次写入缓存到 kv_write_batch 中,值得一提的是,在写入缓存之前会首先判断是否能够进行一次提交,如果可以则需要在写入缓存之前将这一批日志提交到底层引擎。
1 | |
那么为什么不像 RaftBatchSystem 一样在 end 函数中统一进行攒批提交呢?原因是此时只要攒够一定的大小不对底层引擎造成过大的负载就可以快速提交并返回客户端了,等到最后再去处理只会增加写入延时而没有太大的收益。
让我们阅读一下提交 batch 的逻辑,其会经由 ApplyContext::commit -> ApplyContext::commit_opt 的调用链来到 ApplyContext::write_to_db 函数,在该函数中,会调用 self.kv_wb_mut().write_opt(&write_opts) 函数将该 WriteBatch 提交到底层引擎,接着在最后调用 cb.invoke_with_response(resp) 来执行 callback 尽快返回客户端。
1 | |
在 ApplyPoller 一轮 loop 结尾的 end 函数中,其会调用 ApplyContext::flush 函数,进而通过 self.notifier.notify(apply_res) 将 ApplyRes 重新发送到 RaftBatchSystem 中去,进而更新某些内存结构,此处不再赘述。
1 | |
通过本小节,希望您能够了解 PreWrite 请求的完整流程,并进而具备分析其他写请求全链路的能力。
本篇博客介绍了 TiKV 中一条写请求的全链路流程。
希望本博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
感谢您的阅读~
TiKV 是一个支持事务的分布式 Key-Value 数据库,目前已经是 CNCF 基金会 的顶级项目。
作为一个新同学,需要一定的前期准备才能够有能力参与 TiKV 社区的代码开发,包括但不限于学习 Rust 语言,理解 TiKV 的原理和在前两者的基础上了解熟悉 TiKV 的源码。
TiKV 官方源码解析文档 详细地介绍了 TiKV 3.x 版本重要模块的设计要点,主要流程和相应代码片段,是学习 TiKV 源码必读的学习资料。当前 TiKV 已经迭代到了 6.x 版本,不仅引入了很多新的功能和优化,而且对源码也进行了多次重构,因而一些官方源码解析文档中的代码片段已经不复存在,这使得读者在阅读源码解析文档时无法对照最新源码加深理解;此外尽管 TiKV 官方源码解析文档系统地介绍了若干重要模块的工作,但并没有将读写流程全链路串起来去介绍经过的模块和对应的代码片段,实际上尽快地熟悉读写流程全链路会更利于新同学从全局角度理解代码。
基于以上存在的问题,笔者将基于 6.1 版本的源码撰写三篇博客,分别介绍以下三个方面:
希望此三篇博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
本文为第三篇博客,将主要介绍 TiKV 中一条写请求的全链路流程。
以下四篇博客由上到下分别介绍了 TiKV 3.x 版本 KVService,Storage 和 RaftStore 模块对于分布式事务请求的执行流程。
本小节将在 TiKV 6.1 版本的基础上,以一条 PreWrite 请求为例,介绍当前版本的写请求全链路执行流程。
在 KVService 层,通过 handle_request 和 txn_command_future 宏,PreWrite 接口的请求会直接被路由到 Storage::sched_txn_command 函数中。
1 | |
在 Storage 模块,其会将请求路由到 Scheduler::run_cmd 函数中,并进一步路由到 Scheduler::schedule_command 函数中。在 schedule_command 函数中,当前 command 连同 callback 等上下文会被保存到 task_slots 中,如果当前线程申请到了所有 latch 则会调用 execute 函数继续执行该 task,否则如前文所述,当前任务便会被阻塞在某些 latch 上等待其他线程去唤醒进而执行,当前线程会直接返回并执行其他的工作。
1 | |
在 execute 函数中,当前线程会生成一个异步任务 spawn 到另一个 worker 线程池中去,该任务主要包含以下两个步骤:
Self::with_tls_engine(|engine| Self::snapshot(engine, snap_ctx)).await 获取 snapshot。此步骤与上文读流程中获取 snapshot 的步骤相同,可能通过 ReadLocal 也可能通过 ReadIndex 来获取引擎的 snapshot,此小节不在赘述sched.process(snapshot, task).await 基于获取到的 snapshot 和对应 task 去调用 scheduler::process 函数,进而被路由到 scheduler::process_write 函数中1 | |
scheduler::process_write 函数是事务处理的关键函数,目前已经有近四百行,里面夹杂了很多新特性和新优化的复杂逻辑,其中最重要的逻辑有两个:
task.cmd.process_write(snapshot, context).map_err(StorageError::from) 根据 snapshot 和 task 执行事务对应的语义:可以从 Command::process_write 函数看到不同的请求都有不同的实现,每种请求都可能根据 snapshot 去底层获取一些数据并尝试写入一些数据。有关 PreWrite 和其他请求的具体操作可以参照 TiKV 源码解析系列文章(十二)分布式事务,此处不再赘述。需要注意的是,此时的写入仅仅缓存在了 WriteData 中,并没有对底层引擎进行实际修改。engine.async_write_ext(&ctx, to_be_write, engine_cb, proposed_cb, committed_cb) 将缓存的 WriteData 实际写入到 engine 层,对于 RaftKV 来说则是表示一次 propose,想要对这一批 WriteData commit 且 apply1 | |
进入 raftkv::async_write_ext 函数后,其进而通过 raftkv::exec_write_requests -> RaftStoreRouter::send_command 的调用栈将 task 连带 callback 发送给 RaftBatchSystem 交由 RaftStore 模块处理。
1 | |
直接定位到 RaftPoller 的 handle_normal 函数。
与处理 ReadIndex 请求相似, RaftPoller 会首先尝试获取 messages_per_tick 次路由到该状态机的消息,接着调用 PeerFsmDelegate::handle_msgs 函数进行处理,
这里依然只列出了我们需要关注的几种消息类型:
对于 PreWrite 请求,其会进入 PeerMsg::RaftCommand(cmd) 分支,进而以 PeerFsmDelegate::propose_raft_command -> PeerFsmDelegate::propose_raft_command_internal -> Peer::propose -> Peer::propose_normal 的调用链最终被 propose 到 raft-rs 的 RawNode 接口中,同时其 callback 会连带该请求的 logIndex 被 push 到该 Peer 的 proposals 中去。
1 | |
在调用完 PeerFsmDelegate::handle_msgs 处理完消息后,会再调用 PeerFsmDelegate::collect_ready() 函数,进而进入 Peer::handle_raft_ready_append 函数。在该函数中会收集 normal 状态机的一次 ready,接着对需要持久化的未提交日志进行持久化(延后攒批),需要发送的消息进行异步发送,需要应用的已提交日志发送给 ApplyBatchSystem。
在三副本情况下,该 PreWrite 请求会存在于本次 ready 需要持久化的日志和需要发往其他两个 peer 的 message 中,对于 message,一旦收到就会 spawn 给 Transport 让其异步发送,对于持久化,在不开启 async-io 的情况下,数据会被暂存到内存中在当前 loop 结尾的 end 函数中实际写入到底层引擎中去。
1 | |
等到任何一个 follower 返回确认后,该 response 会被路由到 RaftBatchSystem,PollHandler 在接下来的一次 loop 中对其进行处理,该请求会被路由到 PeerFsmDelegate::handle_msgs 函数的 PeerMsg::RaftMessage(msg) 分支中,进而调用 step 函数交给 raft-rs 状态机进行处理。
由于此时已经满足了 quorum 的写入,raft-rs 会将该 PreWrite 请求对应的 raftlog 进行提交并在下一次被获取 ready 时返回,在本轮 loop 的 PeerFsmDelegate::collect_ready() 函数及 Peer::handle_raft_ready_append 函数中,会调用 self.handle_raft_committed_entries(ctx, ready.take_committed_entries()) 函数。在该函数中,其会根据已提交日志从 Peer 的 proposals 中获取到对应的 callback,连带这一批所有的已提交日志构建一个 Apply Task 通过 apply_router 发送给 ApplyBatchSystem。
1 | |
此时直接定位到 ApplyPoller 的 handle_normal 函数,可以看到,ApplyPoller 也会首先尝试获取 messages_per_tick 次路由到该状态机的消息,接着调用 ApplyFSM::handle_tasks 函数进行处理。然后其会经历 ApplyFSM::handle_apply -> ApplyDelegate::handle_raft_committed_entries 的调用链来到 ApplyDelegate::handle_raft_entry_normal 函数中,在该函数中,会尝试将调用 ApplyDelegate::process_raft_cmd 函数来将本次写入缓存到 kv_write_batch 中,值得一提的是,在写入缓存之前会首先判断是否能够进行一次提交,如果可以则需要在写入缓存之前将这一批日志提交到底层引擎。
1 | |
那么为什么不像 RaftBatchSystem 一样在 end 函数中统一进行攒批提交呢?原因是此时只要攒够一定的大小不对底层引擎造成过大的负载就可以快速提交并返回客户端了,等到最后再去处理只会增加写入延时而没有太大的收益。
让我们阅读一下提交 batch 的逻辑,其会经由 ApplyContext::commit -> ApplyContext::commit_opt 的调用链来到 ApplyContext::write_to_db 函数,在该函数中,会调用 self.kv_wb_mut().write_opt(&write_opts) 函数将该 WriteBatch 提交到底层引擎,接着在最后调用 cb.invoke_with_response(resp) 来执行 callback 尽快返回客户端。
1 | |
在 ApplyPoller 一轮 loop 结尾的 end 函数中,其会调用 ApplyContext::flush 函数,进而通过 self.notifier.notify(apply_res) 将 ApplyRes 重新发送到 RaftBatchSystem 中去,进而更新某些内存结构,此处不再赘述。
1 | |
通过本小节,希望您能够了解 PreWrite 请求的完整流程,并进而具备分析其他写请求全链路的能力。
本篇博客介绍了 TiKV 中一条写请求的全链路流程。
希望本博客能够帮助对 TiKV 开发感兴趣的新同学尽快了解 TiKV 的 codebase。
感谢您的阅读~