hashicorp raft 源码不多,抛除 boltdb 存储相关的只有 1万多行代码,不用一周时间就能读透所有细节。来看一下论文提到的几个问题如何实现
新的 Leader 当选后不知道 follower 接收了哪些 logs, AppendEntries
时一定报错,resp 里携带 follower 的 getLastIndex 加速重传, 参考函数 replicateTo
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
......
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
......
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
......
如果 append 失败,那么根据 LastLog 来更新下一次待发送该 follower 的 nextIndex
当前 hashicorp raft 实现做不到 exactly once, 比如 client 发起了 apply 请求,当 fsm 应用完数据返回前,client 与 raft 断开了连接,client 此时收不到 resp, 重试的话就会造成请求重复,如果非幂等操作,一定会有问题
paper 5.4.2 提到不能 Committing entries from previous terms, 那么如何确保之前的数据己经提交 match 了呢?根据 log matching property, 只需新当选的 leader 使用最新的 term 提交一条 no-op 日志即可
func (r *Raft) runLeader() {
r.logger.Info("entering leader state", "leader", r)
metrics.IncrCounter([]string{"raft", "state", "leader"}, 1)
......
// Start a replication routine for each peer
r.startStopReplication()
// Dispatch a no-op log entry first. This gets this leader up to the latest
// possible commit index, even in the absence of client commands. This used
// to append a configuration entry instead of a noop. However, that permits
// an unbounded number of uncommitted configurations in the log. We now
// maintain that there exists at most one uncommitted configuration entry in
// any log, so we have to do proper no-ops here.
noop := &logFuture{
log: Log{
Type: LogNoop,
},
}
r.dispatchLogs([]*logFuture{noop})
// Sit in the leader loop until we step down
r.leaderLoop()
}
对应到代码里 runLeader
在开启 loop 前主动 dispatch 一个 noop logs
当网络分区时,某个 follower term 会一直增大,然后发起 election 到 raft 集群中,将更高的 term 传播到集群,leader 发现更高的 term 后退步成 follower, raft 是一个 CP 系统,频繁 leader 选举会造成集群无法做到稳定提供服务。
raft 作者 paper 提到了一种 Preventing disruptions when a server rejoins the cluster 就是 PreVote, 当节点无法收到 leader 心跳时,先做一次 PreVote, 并不递增 term, 如果收到大多数节点同意后,再真正的做 Vote. hashicorp raft 实现了 PreVote 嘛?并没有。解决了 high term disruptions 问题嘛?我们来看代码
// requestVote is invoked when we get an request vote RPC call.
func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
r.observe(*req)
// Setup a response
resp := &RequestVoteResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Granted: false,
}
var rpcErr error
defer func() {
rpc.Respond(resp, rpcErr)
}()
// Check if we have an existing leader [who's not the candidate] and also
// check the LeadershipTransfer flag is set. Usually votes are rejected if
// there is a known leader. But if the leader initiated a leadership transfer,
// vote!
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate && !req.LeadershipTransfer {
r.logger.Warn("rejecting vote request since we have a leader",
"from", candidate,
"leader", leader)
return
}
......
}
也就是说当不是强制 leader 转移时,如果 follower 的函数 requestVote
发现有 leader 了,那么会拒绝本次 candidate 的选举请求。此时 candidate 会退步成 follower, 如果此时真正的 leader 与 higher term follower 网络连通了,disruptions
现象一定会发生
什么是 Multi-Group
?拿 tikv 举例子,数据按照 key 的二进制排序是一个平坦的 (-∞,+∞) 区间,对这些 keys 空间按照 64MB 大小划分 shard, 每个 shard 三副本分部在不同物理机上,一组 shard 使用 raft 来做选举,也就是说一台机器上可能有成千上万个 raft group
hashicorp 是典型的 Single-Group
raft library, 如果直接用做 Multi-Group
场景,goroutine, 定时器满天飞,由于底层 transport 也不能复用,三台机器两两相连占用的 sockets 就会成千上万,性能相当的差
那么 Multi-Group
相对于 hashicorp 要做哪些优化呢?我们以后分析 dragonboat 和 tikv raft 后再看
关于什么是线性一致性网上有很多,大家可以搜一下,满足 linearizability 条件的分布式系统就像单个节点一样,永远不会读到过期 stale 数据(寄存器读),但是在 read 操作的 (invoke, resp) 时序区间如果有写入操作,此时可能读到不一样的数据,也符合线性一致性。
因为 raft 节点间可能有网络分区,产生多个 leader, 所以要用心跳来确认当前 leader 节点。此外在 read 发起时刻的 commitIndex 要确保 apply 到 fsm, 此时再从 fsm 中读数据才不是过期的,这个就叫做 readIndex
,当然 raft 论文还有优化版的 leaseRead, tikv 就采用这一方案,以后再讲这块。
先看 hashicorp 实现,consul[1] 有一段代码展示如何做到线性一致性读
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
future := s.raft.VerifyLeader()
if err := future.Error(); err != nil {
return err //fail fast if leader verification fails
}
......
}
略去无关代码,实际上就是调用 VerifyLeader
, 然后 future.Error()
阻塞这里等待返回结果,如果没有报错,说明当前节点是 leader
func (r *Raft) VerifyLeader() Future {
metrics.IncrCounter([]string{"raft", "verify_leader"}, 1)
verifyFuture := &verifyFuture{}
verifyFuture.init()
select {
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.verifyCh <- verifyFuture:
return verifyFuture
}
}
简单的发送一个 verifyFuture
请求到 r.verifyCh
中,我们来看一下实现。
func (r *Raft) leaderLoop() {
for r.getState() == Leader {
select {
......
case v := <-r.verifyCh:
if v.quorumSize == 0 {
// Just dispatched, start the verification
r.verifyLeader(v)
} else if v.votes < v.quorumSize {
// Early return, means there must be a new leader
r.logger.Warn("new leader elected, stepping down")
r.setState(Follower)
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(ErrNotLeader)
} else {
// Quorum of members agree, we are still leader
delete(r.leaderState.notify, v)
for _, repl := range r.leaderState.replState {
repl.cleanNotify(v)
}
v.respond(nil)
}
......
runFollower
和 runCandidate
不接受 r.verifyCh
数据,直接返回报错,也就是说 hashicorp 的实现不允许 follower 读数据。所以直接看 runLeader
实现即可。
quorumSize
为 0, 说明是第一次接收请求,还没有获取 majority 数据,应该设置成 (n+1)/2. 调用 verifyLeader
开始验证votes
小于 quorumSize
, 那一定不是 leader, 可以提前返回了votes
大于等于 quorumSize
,那也可以提前确认是 leaderfunc (r *Raft) verifyLeader(v *verifyFuture) {
// Current leader always votes for self
v.votes = 1
// Set the quorum size, hot-path for single node
v.quorumSize = r.quorumSize()
if v.quorumSize == 1 {
v.respond(nil)
return
}
// Track this request
v.notifyCh = r.verifyCh
r.leaderState.notify[v] = struct{}{}
// Trigger immediate heartbeats
for _, repl := range r.leaderState.replState {
repl.notifyLock.Lock()
repl.notify[v] = struct{}{}
repl.notifyLock.Unlock()
asyncNotifyCh(repl.notifyCh)
}
}
verifyLeader
首先给自己投票,所以 votes 默认设置 1每个 follower 的 replicate
函数会启动一个心跳 goroutine, 除了定时发送以外,还会监听 notifyCh, 立刻触发心跳
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
var failures uint64
req := AppendEntriesRequest{
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localID, r.localAddr),
}
var resp AppendEntriesResponse
for {
// Wait for the next heartbeat interval or forced notify
select {
case <-s.notifyCh:
case <-randomTimeout(r.conf.HeartbeatTimeout / 10):
case <-stopCh:
return
}
start := time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
......
} else {
s.setLastContact()
failures = 0
metrics.MeasureSince([]string{"raft", "replication", "heartbeat", string(s.peer.ID)}, start)
s.notifyAll(resp.Success)
}
}
}
// notifyAll is used to notify all the waiting verify futures
// if the follower believes we are still the leader.
func (s *followerReplication) notifyAll(leader bool) {
// Clear the waiting notifies minimizing lock time
s.notifyLock.Lock()
n := s.notify
s.notify = make(map[*verifyFuture]struct{})
s.notifyLock.Unlock()
// Submit our votes
for v := range n {
v.vote(leader)
}
}
AppendEntries
函数返回后,会调用 notifyAll
发起投票请求, s.notify
是一个 map, 所以一次心跳确认 leader, 会服务很多读请求。另外 follower 的 transport 会特殊处理心跳消息,参考函数 handleCommand
, 会直接调用 processHeartbeat
来验证心跳消息,不走 raft 状态机
func (v *verifyFuture) vote(leader bool) {
v.voteLock.Lock()
defer v.voteLock.Unlock()
// Guard against having notified already
if v.notifyCh == nil {
return
}
if leader {
v.votes++
if v.votes >= v.quorumSize {
v.notifyCh <- v
v.notifyCh = nil
}
} else {
v.notifyCh <- v
v.notifyCh = nil
}
}
获得的票数 votes++, 如果达到了 quorumSize, 把自己发送到 v.notifyCh 并置为 nil, 因为己经有结果了。另外如果被一个拒绝了,那么也可以提前返回证明不是 leader 了。最后触发 leaderLoop 的 r.verifyCh 的分支, 获得了 quorumSize 后 v.respond
解除客户端阻塞
看完了源码,心跳确认己经实现了,那么如何确保 readIndex 呢?issues-359[2] 有人问如何用 hashicorp 实现线性一致性,作者有回复
Another subtly: the leader has to get passed its write barrier: it has to have finished applying committed logs to it's FSM before it can proceed.
作者说还需要调用 Barrier()
来确保之前 committed logs apply 到 fsm 中,也就是发送类型 LogBarrier 的日志走一遍状态机流程。个人认为这是很慢的,因为只要确保发起请求时的 commitIndex apply 到 fsm 即满足线性一致性,如果再调用一次 Barrier()
虽然也保证了,但是会额外 apply 这中间堆积的的所有 uncommitted logs, 所以实现不高效
hashicorp 不允许 follower 读数据,如果允许的话,怎么实现 follower read Linearizability 呢?其实业界实现,还是得走 leader, commitIndex 也要从 leader 获取后才能在 follower 中读取。etcd 实现了 follower read, 以后分析 etcd 源码时再看
hashicorp raft 源码分析就到这了,总结一下吧,开箱即用,易懂。想学习 raft 的建议通读,如果想用到高并发工程上还有点距离,需要做下优化,至于 Multi-Group 那就更不适合了
[1]consul linearizability: https://github.com/hashicorp/consul/blob/89158c7a7665a48735ecf8541b72c83c860fe195/agent/consul/rpc.go#L524,
[2]issues-359: https://github.com/hashicorp/raft/issues/359,
本文由哈喽比特于3年以前收录,如有侵权请联系我们。
文章来源:https://mp.weixin.qq.com/s/xbxfdDpc9Jw5y76fWPHBdQ
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。