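The previous post covered how hashicorp[1] raft performs leader election and how its transport layer works. This post looks at log replication.
Let's start with the two structs used for log synchronization, AppendEntriesRequest and AppendEntriesResponse.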
type AppendEntriesRequest struct {
// Provide the current term and leader
Term uint64
Leader []byte
// Provide the previous entries for integrity checking
PrevLogEntry uint64
PrevLogTerm uint64
// New entries to commit
Entries []*Log
// Commit index on the leader
LeaderCommitIndex uint64
}
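Term: the current leader's term.
Leader: identifies the leader. It is the encoded peer address, which appendEntries later decodes with r.trans.DecodePeer.
PrevLogEntry and PrevLogTerm: the index and term of the log entry that, as far as the leader knows, the follower received last, i.e. the entry immediately before the ones in this request. They are used for the log consistency check.
Entries: all log entries sent in this RPC. It is a slice because entries may be sent in batches.
LeaderCommitIndex: the index up to which the leader has committed. Once the follower learns it, the follower can apply every entry it already holds up to that index.
type AppendEntriesResponse struct {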
// Newer term if leader is out of date
Term uint64
// Last Log is a hint to help accelerate rebuilding slow nodes
LastLog uint64
// We may not succeed if we have a conflicting entry
Success bool
// There are scenarios where this request didn't succeed
// but there's no need to wait/back-off the next attempt.
NoRetryBackoff bool
}
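Term: the term the follower currently knows. If it is greater than the leader's term, the leader is stale and has to step down, after which a new election takes place.
LastLog: the index of the follower's latest log entry. It lets the leader speed up retransmission; stepping back by nextIndex-1 on every rejection would be too slow.
Success: whether the append succeeded.
NoRetryBackoff: set when the request failed but the leader does not need to wait or back off before the next attempt.
Let's first look, from the application's point of view, at how a write enters the consensus module.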
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
return r.ApplyLog(Log{Data: cmd}, timeout)
}
The user calls Apply directly. It wraps cmd in a Log and calls ApplyLog to submit it, passing along the timeout.
// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
metrics.IncrCounter([]string{"raft", "apply"}, 1)
var timer <-chan time.Time
if timeout > 0 {
timer = time.After(timeout)
}
// Create a log future, no index or term yet
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: log.Data,
Extensions: log.Extensions,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
}
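ApplyLog wraps the Log in a future and writes that future to raft's applyCh channel, where it waits to be processed asynchronously. The caller then simply calls logFuture.Error() and blocks until the result is available. Next, let's see how the raft module handles the request.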
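The enqueue-then-wait pattern described above can be tried out in isolation. What follows is only a minimal sketch, not the library's code: future, newFuture, enqueue, and errEnqueueTimeout are hypothetical stand-ins for logFuture, ApplyLog, and ErrEnqueueTimeout, and the consumer goroutine stands in for the whole leader and FSM pipeline.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errEnqueueTimeout is a hypothetical stand-in for raft's ErrEnqueueTimeout.
var errEnqueueTimeout = errors.New("timed out enqueuing operation")

// future is a hypothetical, simplified analogue of logFuture: the producer
// blocks in Error() until the consumer calls respond().
type future struct {
	data  []byte
	errCh chan error
}

func newFuture(data []byte) *future {
	// Capacity 1 lets respond() return even if nobody is waiting yet.
	return &future{data: data, errCh: make(chan error, 1)}
}

func (f *future) respond(err error) { f.errCh <- err }
func (f *future) Error() error      { return <-f.errCh }

// enqueue tries to hand a new future to the consumer channel. A nil timer
// channel (timeout <= 0) never fires, so the send then blocks indefinitely.
func enqueue(ch chan<- *future, data []byte, timeout time.Duration) (*future, error) {
	var timer <-chan time.Time
	if timeout > 0 {
		timer = time.After(timeout)
	}
	f := newFuture(data)
	select {
	case <-timer:
		return nil, errEnqueueTimeout
	case ch <- f:
		return f, nil
	}
}

func main() {
	applyCh := make(chan *future)
	// Consumer goroutine: stands in for the leader loop plus the FSM.
	go func() {
		for f := range applyCh {
			f.respond(nil)
		}
	}()

	f, err := enqueue(applyCh, []byte("set x=1"), time.Second)
	if err != nil {
		fmt.Println("enqueue failed:", err)
		return
	}
	// Blocks until the consumer has responded.
	fmt.Println("apply result:", f.Error())
}
```

The timeout only bounds the wait for the channel send. Once the future has been accepted, Error() blocks until someone responds, which matches the way ApplyLog uses its timer above.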
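After NewRaft starts, it runs, among others, two goroutines: r.run and r.runFSM. r.run runs a different function depending on the node's current role. r.runFSM receives the logs that raft has reached consensus on and applies them to the user's fsm; after that, logFuture.Error() unblocks and the caller gets the result. Let's look at runFSM first.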
func (r *Raft) runFSM() {
var lastIndex, lastTerm uint64
batchingFSM, batchingEnabled := r.fsm.(BatchingFSM)
configStore, configStoreEnabled := r.fsm.(ConfigurationStore)
commitSingle := func(req *commitTuple) {
// Apply the log if a command or config change
var resp interface{}
// Make sure we send a response
defer func() {
// Invoke the future if given
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}()
switch req.log.Type {
case LogCommand:
start := time.Now()
resp = r.fsm.Apply(req.log)
metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start)
case LogConfiguration:
if !configStoreEnabled {
// Return early to avoid incrementing the index and term for
// an unimplemented operation.
return
}
start := time.Now()
configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data))
metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start)
}
// Update the indexes
lastIndex = req.log.Index
lastTerm = req.log.Term
}
commitBatch := func(reqs []*commitTuple) {
if !batchingEnabled {
for _, ct := range reqs {
commitSingle(ct)
}
return
}
// Only send LogCommand and LogConfiguration log types. LogBarrier types
// will not be sent to the FSM.
shouldSend := func(l *Log) bool {
switch l.Type {
case LogCommand, LogConfiguration:
return true
}
return false
}
var lastBatchIndex, lastBatchTerm uint64
sendLogs := make([]*Log, 0, len(reqs))
for _, req := range reqs {
if shouldSend(req.log) {
sendLogs = append(sendLogs, req.log)
}
lastBatchIndex = req.log.Index
lastBatchTerm = req.log.Term
}
var responses []interface{}
if len(sendLogs) > 0 {
start := time.Now()
responses = batchingFSM.ApplyBatch(sendLogs)
metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start)
metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs)))
// Ensure we get the expected responses
if len(sendLogs) != len(responses) {
panic("invalid number of responses")
}
}
// Update the indexes
lastIndex = lastBatchIndex
lastTerm = lastBatchTerm
var i int
for _, req := range reqs {
var resp interface{}
// If the log was sent to the FSM, retrieve the response.
if shouldSend(req.log) {
resp = responses[i]
i++
}
if req.future != nil {
req.future.response = resp
req.future.respond(nil)
}
}
}
restore := func(req *restoreFuture) {
......
}
snapshot := func(req *reqSnapshotFuture) {
......
}
for {
select {
case ptr := <-r.fsmMutateCh:
switch req := ptr.(type) {
case []*commitTuple:
commitBatch(req)
case *restoreFuture:
restore(req)
default:
panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr))
}
case req := <-r.fsmSnapshotCh:
snapshot(req)
case <-r.shutdownCh:
return
}
}
}
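Start with the for loop at the bottom of this function. It receives events from two channels, r.fsmMutateCh and r.fsmSnapshotCh. Ignore the snapshot-related parts for now and look only at commitSingle and commitBatch, which apply logs to the user's fsm.
commitSingle is simple: it applies a single Log. If the log type is LogCommand, it calls the user's fsm.Apply to apply the entry. If the type is LogConfiguration, configStore stores the configuration change.
commitBatch: if the user's fsm does not support batching, it calls commitSingle for each Log in turn. Otherwise it calls fsm.ApplyBatch to apply the logs as one batch, which improves throughput.
Both paths finish by updating lastIndex and lastTerm and calling req.future.respond to complete the future, which unblocks the client. That covers the basic interaction between raft and the client. Remember: r.applyCh carries incoming user requests, and r.fsmMutateCh carries committed logs to the FSM goroutine, which returns the results to the application layer.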
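The way commitBatch picks between the single and the batched path comes down to a runtime type assertion on an optional interface. Here is a minimal sketch of that idea under simplified assumptions: entry, fsm, batchFSM, applyAll, counter, and batchCounter are hypothetical names for illustration and are not the library's types.

```go
package main

import "fmt"

// entry is a hypothetical, stripped-down log entry.
type entry struct {
	Index uint64
	Data  string
}

// fsm and batchFSM are hypothetical analogues of the FSM and BatchingFSM
// interfaces: batchFSM is an optional extension detected at runtime.
type fsm interface {
	Apply(e entry) interface{}
}

type batchFSM interface {
	fsm
	ApplyBatch(es []entry) []interface{}
}

// applyAll uses ApplyBatch when the FSM supports it and otherwise falls
// back to applying the entries one at a time.
func applyAll(f fsm, es []entry) []interface{} {
	if b, ok := f.(batchFSM); ok {
		resps := b.ApplyBatch(es)
		if len(resps) != len(es) {
			panic("invalid number of responses")
		}
		return resps
	}
	resps := make([]interface{}, 0, len(es))
	for _, e := range es {
		resps = append(resps, f.Apply(e))
	}
	return resps
}

// counter is a toy FSM that only implements Apply.
type counter struct{ applied int }

func (c *counter) Apply(e entry) interface{} {
	c.applied++
	return e.Index
}

// batchCounter additionally implements ApplyBatch.
type batchCounter struct {
	counter
	batches int
}

func (b *batchCounter) ApplyBatch(es []entry) []interface{} {
	b.batches++
	resps := make([]interface{}, len(es))
	for i, e := range es {
		resps[i] = b.Apply(e)
	}
	return resps
}

func main() {
	es := []entry{{1, "a"}, {2, "b"}, {3, "c"}}
	single := &counter{}
	batched := &batchCounter{}
	fmt.Println(len(applyAll(single, es)), single.applied)
	fmt.Println(len(applyAll(batched, es)), batched.batches, batched.applied)
}
```

Either way the caller gets exactly one response per entry, which is why the real code panics when the number of responses does not match the number of logs it sent.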
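r.run calls a different function depending on the node's role. The one that matters here is runLeader; ignoring everything else, we only look at how leaderLoop handles user requests.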
// leaderLoop is the hot loop for a leader. It is invoked
// after all the various leader setup is done.
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
......
case newLog := <-r.applyCh:
if r.getLeadershipTransferInProgress() {
r.logger.Debug(ErrLeadershipTransferInProgress.Error())
newLog.respond(ErrLeadershipTransferInProgress)
continue
}
// Group commit, gather all the ready commits
ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
for i := 0; i < r.conf.MaxAppendEntries; i++ {
select {
case newLog := <-r.applyCh:
ready = append(ready, newLog)
default:
break GROUP_COMMIT_LOOP
}
}
// Dispatch the logs
if stepDown {
// we're in the process of stepping down as leader, don't process anything new
for i := range ready {
ready[i].respond(ErrNotLeader)
}
} else {
r.dispatchLogs(ready)
}
......
}
}
}
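The select has many case branches; for now we only look at the <-r.applyCh branch.
r.applyCh: after receiving the first logFuture, the loop pulls up to r.conf.MaxAppendEntries more from the channel so they can be handled as one batch, which improves performance.
r.dispatchLogs: sends all the logs collected in ready.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {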
now := time.Now()
defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)
term := r.getCurrentTerm()
lastIndex := r.getLastIndex()
n := len(applyLogs)
logs := make([]*Log, n)
metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))
for idx, applyLog := range applyLogs {
applyLog.dispatch = now
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
// Write the log entry locally
if err := r.logs.StoreLogs(logs); err != nil {
r.logger.Error("failed to commit logs", "error", err)
for _, applyLog := range applyLogs {
applyLog.respond(err)
}
r.setState(Follower)
return
}
r.leaderState.commitment.match(r.localID, lastIndex)
// Update the last log since it's on disk now
r.setLastLog(lastIndex, term)
// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
}
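StoreLogs: persists all the logs. If it fails, every future is answered with the error and the node falls back to the follower state.
match: updates commitment, which tracks the index that a majority of nodes have stored. Once a majority holds an entry, it is committed and can be applied.
setLastLog: updates the index and term of the leader's latest log.
asyncNotifyCh: asynchronously notifies each follower's replication goroutine that new logs need to be sent.
So where does f.triggerCh come from?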
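Before answering that, it helps to see why notifying through a channel like this never blocks the leader loop. The sketch below assumes a signal channel with capacity 1 and a non-blocking send; notify and drain are hypothetical helper names used only for this illustration.

```go
package main

import "fmt"

// notify performs a non-blocking send. With a channel of capacity 1, any
// number of notifications sent before the receiver wakes up collapse into
// a single pending wake-up.
func notify(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
		// A wake-up is already pending; dropping this one loses nothing.
	}
}

// drain counts how many wake-ups are pending, without blocking.
func drain(ch chan struct{}) int {
	n := 0
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

func main() {
	triggerCh := make(chan struct{}, 1)
	for i := 0; i < 5; i++ {
		notify(triggerCh) // never blocks the caller
	}
	fmt.Println("pending wake-ups:", drain(triggerCh))
}
```

Coalescing is safe here because the signal carries no payload: the receiver reads the latest log index itself when it wakes up, so one wake-up is enough to cover every entry written so far.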
The runLeader function mentioned above calls startStopReplication, which sets up the channels used to send messages to every follower. Let's take a look.
func (r *Raft) startStopReplication() {
inConfig := make(map[ServerID]bool, len(r.configurations.latest.Servers))
lastIdx := r.getLastIndex()
// Start replication goroutines that need starting
for _, server := range r.configurations.latest.Servers {
if server.ID == r.localID {
continue
}
inConfig[server.ID] = true
if _, ok := r.leaderState.replState[server.ID]; !ok {
r.logger.Info("added peer, starting replication", "peer", server.ID)
s := &followerReplication{
peer: server,
commitment: r.leaderState.commitment,
stopCh: make(chan uint64, 1),
triggerCh: make(chan struct{}, 1),
triggerDeferErrorCh: make(chan *deferError, 1),
currentTerm: r.getCurrentTerm(),
nextIndex: lastIdx + 1,
lastContact: time.Now(),
notify: make(map[*verifyFuture]struct{}),
notifyCh: make(chan struct{}, 1),
stepDown: r.leaderState.stepDown,
}
r.leaderState.replState[server.ID] = s
r.goFunc(func() { r.replicate(s) })
asyncNotifyCh(s.triggerCh)
r.observe(PeerObservation{Peer: server, Removed: false})
}
}
// Stop replication goroutines that need stopping
for serverID, repl := range r.leaderState.replState {
if inConfig[serverID] {
continue
}
// Replicate up to lastIdx and stop
r.logger.Info("removed peer, stopping replication", "peer", serverID, "last-index", lastIdx)
repl.stopCh <- lastIdx
close(repl.stopCh)
delete(r.leaderState.replState, serverID)
r.observe(PeerObservation{Peer: repl.peer, Removed: true})
}
}
followerReplication tracks the replication state of each follower. Each follower gets its own replicate goroutine. Let's look at the replicate function.
// replicate is a long running routine that replicates log entries to a single
// follower.
func (r *Raft) replicate(s *followerReplication) {
// Start an async heartbeating routing
stopHeartbeat := make(chan struct{})
defer close(stopHeartbeat)
r.goFunc(func() { r.heartbeat(s, stopHeartbeat) })
RPC:
shouldStop := false
for !shouldStop {
select {
......
case <-s.triggerCh:
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
// This is _not_ our heartbeat mechanism but is to ensure
// followers quickly learn the leader's commit index when
// raft commits stop flowing naturally. The actual heartbeats
// can't do this to keep them unblocked by disk IO on the
// follower. See https://github.com/hashicorp/raft/issues/282.
case <-randomTimeout(r.conf.CommitTimeout):
lastLogIdx, _ := r.getLastLog()
shouldStop = r.replicateTo(s, lastLogIdx)
}
// If things looks healthy, switch to pipeline mode
if !shouldStop && s.allowPipeline {
goto PIPELINE
}
}
return
PIPELINE:
// Disable until re-enabled
s.allowPipeline = false
// Replicates using a pipeline for high performance. This method
// is not able to gracefully recover from errors, and so we fall back
// to standard mode on failure.
if err := r.pipelineReplicate(s); err != nil {
if err != ErrPipelineReplicationNotSupported {
r.logger.Error("failed to start pipeline replication to", "peer", s.peer, "error", err)
}
}
goto RPC
}
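s.triggerCh: when it fires, there is data waiting to be sent to the follower. lastLogIdx is the index of the leader's latest log, and replicateTo is called with the follower's state s to send the entries.
pipelineReplicate: switches to pipelined sending for better throughput.
// replicateTo is a helper to replicate(), used to replicate the logs up to a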
// given last index.
// If the follower log is behind, we take care to bring them up to date.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
// Create the base request
var req AppendEntriesRequest
var resp AppendEntriesResponse
var start time.Time
START:
// Prevent an excessive retry rate on errors
if s.failures > 0 {
select {
case <-time.After(backoff(failureWait, s.failures, maxFailureScale)):
case <-r.shutdownCh:
}
}
// Setup the request
if err := r.setupAppendEntries(s, &req, atomic.LoadUint64(&s.nextIndex), lastIndex); err == ErrLogNotFound {
goto SEND_SNAP
} else if err != nil {
return
}
// Make the RPC call
start = time.Now()
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
r.logger.Error("failed to appendEntries to", "peer", s.peer, "error", err)
s.failures++
return
}
appendStats(string(s.peer.ID), start, float32(len(req.Entries)))
// Check for a newer term, stop running
if resp.Term > req.Term {
r.handleStaleTerm(s)
return true
}
// Update the last contact
s.setLastContact()
// Update s based on success
if resp.Success {
// Update our replication state
updateLastAppended(s, &req)
// Clear any failures, allow pipelining
s.failures = 0
s.allowPipeline = true
} else {
atomic.StoreUint64(&s.nextIndex, max(min(s.nextIndex-1, resp.LastLog+1), 1))
if resp.NoRetryBackoff {
s.failures = 0
} else {
s.failures++
}
r.logger.Warn("appendEntries rejected, sending older logs", "peer", s.peer, "next", atomic.LoadUint64(&s.nextIndex))
}
CHECK_MORE:
// Poll the stop channel here in case we are looping and have been asked
// to stop, or have stepped down as leader. Even for the best effort case
// where we are asked to replicate to a given index and then shutdown,
// it's better to not loop in here to send lots of entries to a straggler
// that's leaving the cluster anyways.
select {
case <-s.stopCh:
return true
default:
}
// Check if there are more logs to replicate
if atomic.LoadUint64(&s.nextIndex) <= lastIndex {
goto START
}
return
// SEND_SNAP is used when we fail to get a log, usually because the follower
// is too far behind, and we must ship a snapshot down instead
SEND_SNAP:
if stop, err := r.sendLatestSnapshot(s); stop {
return true
} else if err != nil {
r.logger.Error("failed to send snapshot to", "peer", s.peer, "error", err)
return
}
// Check if there is more to replicate
goto CHECK_MORE
}
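setupAppendEntries: builds the RPC request req, which contains all the logs waiting to be sent. If it returns ErrLogNotFound, control jumps to SEND_SNAP and sendLatestSnapshot ships a snapshot instead.
AppendEntries: sends the request over the network.
setLastContact: updates the time of the latest contact with this follower.
updateLastAppended: makes commitment run its match step.
After replicateTo has sent the data successfully, the leader calls updateLastAppended to trigger the match operation.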
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
// Mark any inflight logs as committed
if logs := req.Entries; len(logs) > 0 {
last := logs[len(logs)-1]
atomic.StoreUint64(&s.nextIndex, last.Index+1)
s.commitment.match(s.peer.ID, last.Index)
}
// Notify still leader
s.notifyAll(true)
}
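It updates s.nextIndex, the index of the next log to send to this follower, and calls match to trigger the matching step. notifyAll is used to confirm that the node is still the leader; we will look at it another time.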
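The nextIndex bookkeeping has two directions: it moves forward here on success, and it moves backward in replicateTo when the follower rejects a request. The sketch below works through both with plain numbers. The function names nextAfterSuccess and nextAfterReject are hypothetical; the arithmetic follows the max(min(s.nextIndex-1, resp.LastLog+1), 1) expression shown in replicateTo and assumes nextIndex is at least 1.

```go
package main

import "fmt"

// nextAfterSuccess returns the next index to send once the follower has
// accepted entries up to and including lastSent.
func nextAfterSuccess(lastSent uint64) uint64 {
	return lastSent + 1
}

// nextAfterReject computes max(min(nextIndex-1, followerLastLog+1), 1):
// step back by one, but jump straight past the follower's last log when
// the follower is far behind, and never go below index 1.
func nextAfterReject(nextIndex, followerLastLog uint64) uint64 {
	next := nextIndex - 1
	if followerLastLog+1 < next {
		next = followerLastLog + 1
	}
	if next < 1 {
		next = 1
	}
	return next
}

func main() {
	// Follower is far behind: the LastLog hint skips 59 round trips.
	fmt.Println(nextAfterReject(101, 40))
	// Follower has a longer but conflicting log: step back one at a time.
	fmt.Println(nextAfterReject(101, 500))
	// Never below 1.
	fmt.Println(nextAfterReject(1, 0))
	// After a successful append of entries up to index 41.
	fmt.Println(nextAfterSuccess(41))
}
```

This is the reason the LastLog field exists in AppendEntriesResponse: without it, a follower that is thousands of entries behind would need one rejected RPC per entry before the leader found the right starting point.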
func (c *commitment) match(server ServerID, matchIndex uint64) {
c.Lock()
defer c.Unlock()
if prev, hasVote := c.matchIndexes[server]; hasVote && matchIndex > prev {
c.matchIndexes[server] = matchIndex
c.recalculate()
}
}
// Internal helper to calculate new commitIndex from matchIndexes.
// Must be called with lock held.
func (c *commitment) recalculate() {
if len(c.matchIndexes) == 0 {
return
}
matched := make([]uint64, 0, len(c.matchIndexes))
for _, idx := range c.matchIndexes {
matched = append(matched, idx)
}
sort.Sort(uint64Slice(matched))
quorumMatchIndex := matched[(len(matched)-1)/2]
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
c.commitIndex = quorumMatchIndex
asyncNotifyCh(c.commitCh)
}
}
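recalculate sorts the matchIndex values of all voting servers, the leader included. Whatever index a quorum (more than half) has received can be committed, so c.commitIndex is updated and c.commitCh is notified that the following entries can be applied.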
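The index arithmetic in recalculate is easy to get wrong by one, so here is a small standalone sketch of it. quorumIndex and the string server IDs are hypothetical; the selection rule, an ascending sort followed by picking position (n-1)/2, is the one in the code above.

```go
package main

import (
	"fmt"
	"sort"
)

// quorumIndex returns the highest index that a majority of servers have
// stored. After an ascending sort, the element at position (n-1)/2 is
// matched or exceeded by every element at or after it, which is always
// more than half of the n values.
func quorumIndex(matchIndexes map[string]uint64) uint64 {
	if len(matchIndexes) == 0 {
		return 0
	}
	matched := make([]uint64, 0, len(matchIndexes))
	for _, idx := range matchIndexes {
		matched = append(matched, idx)
	}
	sort.Slice(matched, func(i, j int) bool { return matched[i] < matched[j] })
	return matched[(len(matched)-1)/2]
}

func main() {
	// 3 servers: two of them hold index 7 or higher.
	fmt.Println(quorumIndex(map[string]uint64{"a": 9, "b": 7, "c": 5}))
	// 4 servers: three of them hold index 6 or higher.
	fmt.Println(quorumIndex(map[string]uint64{"a": 10, "b": 8, "c": 6, "d": 2}))
	// 5 servers: three of them hold index 4 or higher.
	fmt.Println(quorumIndex(map[string]uint64{"a": 9, "b": 9, "c": 4, "d": 3, "e": 3}))
}
```

With an even number of servers the rule picks the lower of the two middle values, so a 4-node cluster needs 3 matching nodes rather than 2.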
func (r *Raft) leaderLoop() {
// stepDown is used to track if there is an inflight log that
// would cause us to lose leadership (specifically a RemovePeer of
// ourselves). If this is the case, we must not allow any logs to
// be processed in parallel, otherwise we are basing commit on
// only a single peer (ourself) and replicating to an undefined set
// of peers.
stepDown := false
lease := time.After(r.conf.LeaderLeaseTimeout)
for r.getState() == Leader {
select {
......
case <-r.leaderState.commitCh:
// Process the newly committed entries
oldCommitIndex := r.getCommitIndex()
commitIndex := r.leaderState.commitment.getCommitIndex()
r.setCommitIndex(commitIndex)
......
start := time.Now()
var groupReady []*list.Element
var groupFutures = make(map[uint64]*logFuture)
var lastIdxInGroup uint64
// Pull all inflight logs that are committed off the queue.
for e := r.leaderState.inflight.Front(); e != nil; e = e.Next() {
commitLog := e.Value.(*logFuture)
idx := commitLog.log.Index
if idx > commitIndex {
// Don't go past the committed index
break
}
// Measure the commit time
metrics.MeasureSince([]string{"raft", "commitTime"}, commitLog.dispatch)
groupReady = append(groupReady, e)
groupFutures[idx] = commitLog
lastIdxInGroup = idx
}
// Process the group
if len(groupReady) != 0 {
r.processLogs(lastIdxInGroup, groupFutures)
for _, e := range groupReady {
r.leaderState.inflight.Remove(e)
}
}
......
}
}
}
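Ignore the other branches of leaderLoop and look only at the r.leaderState.commitCh message.
getCommitIndex: reads the latest commitIndex from commitment. The entries between oldCommitIndex and this value are the ones that can now be committed and applied.
processLogs: processes those logs, after which they are removed from the inflight queue.
func (r *Raft) processLogs(index uint64, futures map[uint64]*logFuture) {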
// Reject logs we've applied already
lastApplied := r.getLastApplied()
if index <= lastApplied {
r.logger.Warn("skipping application of old log", "index", index)
return
}
applyBatch := func(batch []*commitTuple) {
select {
case r.fsmMutateCh <- batch:
case <-r.shutdownCh:
for _, cl := range batch {
if cl.future != nil {
cl.future.respond(ErrRaftShutdown)
}
}
}
}
batch := make([]*commitTuple, 0, r.conf.MaxAppendEntries)
// Apply all the preceding logs
for idx := lastApplied + 1; idx <= index; idx++ {
var preparedLog *commitTuple
// Get the log, either from the future or from our log store
future, futureOk := futures[idx]
if futureOk {
preparedLog = r.prepareLog(&future.log, future)
} else {
l := new(Log)
if err := r.logs.GetLog(idx, l); err != nil {
r.logger.Error("failed to get log", "index", idx, "error", err)
panic(err)
}
preparedLog = r.prepareLog(l, nil)
}
switch {
case preparedLog != nil:
// If we have a log ready to send to the FSM add it to the batch.
// The FSM thread will respond to the future.
batch = append(batch, preparedLog)
// If we have filled up a batch, send it to the FSM
if len(batch) >= r.conf.MaxAppendEntries {
applyBatch(batch)
batch = make([]*commitTuple, 0, r.conf.MaxAppendEntries)
}
case futureOk:
// Invoke the future if given.
future.respond(nil)
}
}
// If there are any remaining logs in the batch apply them
if len(batch) != 0 {
applyBatch(batch)
}
// Update the lastApplied index and term
r.setLastApplied(index)
}
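The batching loop inside processLogs follows a common shape: fill a slice, flush it when it is full, and flush the remainder at the end. A minimal sketch of just that shape, with plain indexes instead of commitTuple values; forEachBatch and its parameters are hypothetical names, and size is assumed to be greater than zero.

```go
package main

import "fmt"

// forEachBatch walks the indexes from first to last (inclusive) and hands
// them to flush in chunks of at most size elements.
func forEachBatch(first, last uint64, size int, flush func(batch []uint64)) {
	batch := make([]uint64, 0, size)
	for idx := first; idx <= last; idx++ {
		batch = append(batch, idx)
		if len(batch) >= size {
			flush(batch)
			// Allocate a fresh slice: the receiver may still hold the old one.
			batch = make([]uint64, 0, size)
		}
	}
	// Flush whatever is left over.
	if len(batch) != 0 {
		flush(batch)
	}
}

func main() {
	// lastApplied = 10, commit index = 17, batches of 3.
	forEachBatch(11, 17, 3, func(batch []uint64) {
		fmt.Println(batch)
	})
}
```

Allocating a new slice after each flush matters in the real code because the batch is sent over fsmMutateCh to another goroutine; reusing the backing array would let the next iteration overwrite entries the FSM goroutine has not read yet.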
runFollower calls processRPC to handle incoming requests.
// processRPC is called to handle an incoming RPC request. This must only be
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
if err := r.checkRPCHeader(rpc); err != nil {
rpc.Respond(nil, err)
return
}
switch cmd := rpc.Command.(type) {
case *AppendEntriesRequest:
r.appendEntries(rpc, cmd)
case *RequestVoteRequest:
r.requestVote(rpc, cmd)
case *InstallSnapshotRequest:
r.installSnapshot(rpc, cmd)
case *TimeoutNowRequest:
r.timeoutNow(rpc, cmd)
default:
r.logger.Error("got unexpected command",
"command", hclog.Fmt("%#v", rpc.Command))
rpc.Respond(nil, fmt.Errorf("unexpected command"))
}
}
We only need to look at appendEntries.
// appendEntries is invoked when we get an append entries RPC call. This must
// only be called from the main thread.
func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
// Setup a response
resp := &AppendEntriesResponse{
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
LastLog: r.getLastIndex(),
Success: false,
NoRetryBackoff: false,
}
var rpcErr error
defer func() {
rpc.Respond(resp, rpcErr)
}()
// Ignore an older term
if a.Term < r.getCurrentTerm() {
return
}
// Increase the term if we see a newer one, also transition to follower
// if we ever get an appendEntries call
if a.Term > r.getCurrentTerm() || r.getState() != Follower {
// Ensure transition to follower
r.setState(Follower)
r.setCurrentTerm(a.Term)
resp.Term = a.Term
}
// Save the current leader
r.setLeader(ServerAddress(r.trans.DecodePeer(a.Leader)))
// Verify the last log entry
if a.PrevLogEntry > 0 {
lastIdx, lastTerm := r.getLastEntry()
var prevLogTerm uint64
if a.PrevLogEntry == lastIdx {
prevLogTerm = lastTerm
} else {
var prevLog Log
if err := r.logs.GetLog(a.PrevLogEntry, &prevLog); err != nil {
r.logger.Warn("failed to get previous log",
"previous-index", a.PrevLogEntry,
"last-index", lastIdx,
"error", err)
resp.NoRetryBackoff = true
return
}
prevLogTerm = prevLog.Term
}
if a.PrevLogTerm != prevLogTerm {
r.logger.Warn("previous log term mis-match",
"ours", prevLogTerm,
"remote", a.PrevLogTerm)
resp.NoRetryBackoff = true
return
}
}
// Process any new entries
if len(a.Entries) > 0 {
start := time.Now()
// Delete any conflicting entries, skip any duplicates
lastLogIdx, _ := r.getLastLog()
var newEntries []*Log
for i, entry := range a.Entries {
if entry.Index > lastLogIdx {
newEntries = a.Entries[i:]
break
}
var storeEntry Log
if err := r.logs.GetLog(entry.Index, &storeEntry); err != nil {
r.logger.Warn("failed to get log entry",
"index", entry.Index,
"error", err)
return
}
if entry.Term != storeEntry.Term {
r.logger.Warn("clearing log suffix",
"from", entry.Index,
"to", lastLogIdx)
if err := r.logs.DeleteRange(entry.Index, lastLogIdx); err != nil {
r.logger.Error("failed to clear log suffix", "error", err)
return
}
if entry.Index <= r.configurations.latestIndex {
r.setLatestConfiguration(r.configurations.committed, r.configurations.committedIndex)
}
newEntries = a.Entries[i:]
break
}
}
if n := len(newEntries); n > 0 {
// Append the new entries
if err := r.logs.StoreLogs(newEntries); err != nil {
r.logger.Error("failed to append to logs", "error", err)
// TODO: leaving r.getLastLog() in the wrong
// state if there was a truncation above
return
}
// Handle any new configuration changes
for _, newEntry := range newEntries {
r.processConfigurationLogEntry(newEntry)
}
// Update the lastLog
last := newEntries[n-1]
r.setLastLog(last.Index, last.Term)
}
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start)
}
// Update the commit index
if a.LeaderCommitIndex > 0 && a.LeaderCommitIndex > r.getCommitIndex() {
start := time.Now()
idx := min(a.LeaderCommitIndex, r.getLastIndex())
r.setCommitIndex(idx)
if r.configurations.latestIndex <= idx {
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
}
r.processLogs(idx, nil)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "processLogs"}, start)
}
// Everything went well, set success
resp.Success = true
r.setLastContact()
return
}
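AppendEntriesResponse: the response is prepared up front. If the term sent by the leader is smaller than the follower's own, the function returns immediately with Success still false.
StoreLogs: persists the new entries. Any configuration changes among them are processed as well.
min(a.LeaderCommitIndex, r.getLastIndex()): the commit index is advanced to this position and the logs up to it are applied.
That completes the overall flow. The implementation also includes the two optimizations mentioned in the paper.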
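To make the follower-side rules concrete, here is a compact in-memory sketch of the three steps that matter in appendEntries: the consistency check on the previous entry, skipping duplicates while truncating at the first conflict, and advancing the commit index. It is a simplification under stated assumptions, not the library's code: entry and follower are hypothetical types, the log is a slice in which position i holds index i+1, and the incoming entries are assumed to be contiguous with indexes of at least 1.

```go
package main

import "fmt"

// entry is a hypothetical log entry with only the fields the check needs.
type entry struct {
	Index uint64
	Term  uint64
}

// follower keeps its log in memory; log[i] holds the entry with Index i+1.
type follower struct {
	log         []entry
	commitIndex uint64
}

func (f *follower) lastIndex() uint64 { return uint64(len(f.log)) }

// appendEntries returns false when the consistency check fails.
func (f *follower) appendEntries(prevIndex, prevTerm uint64, entries []entry, leaderCommit uint64) bool {
	// 1. The entry before the new ones must exist and carry the same term.
	if prevIndex > 0 {
		if prevIndex > f.lastIndex() || f.log[prevIndex-1].Term != prevTerm {
			return false
		}
	}
	// 2. Skip duplicates; at the first conflict drop the local suffix.
	for i, e := range entries {
		if e.Index > f.lastIndex() {
			f.log = append(f.log, entries[i:]...)
			break
		}
		if f.log[e.Index-1].Term != e.Term {
			f.log = append(f.log[:e.Index-1], entries[i:]...)
			break
		}
	}
	// 3. Never commit past what is actually stored locally.
	if leaderCommit > f.commitIndex {
		f.commitIndex = leaderCommit
		if last := f.lastIndex(); last < f.commitIndex {
			f.commitIndex = last
		}
	}
	return true
}

func main() {
	f := &follower{log: []entry{{1, 1}, {2, 1}, {3, 2}}}
	// The leader's entry 3 has term 3, so the local entry 3 is replaced.
	ok := f.appendEntries(2, 1, []entry{{3, 3}, {4, 3}}, 3)
	fmt.Println(ok, f.lastIndex(), f.commitIndex)
	// A request whose previous entry is missing is rejected.
	fmt.Println(f.appendEntries(10, 3, nil, 3))
}
```

The real function does the same three things against a persistent LogStore and additionally handles terms, leadership changes, and configuration entries, all of which are left out here.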
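There is still room for improvement. For example, when a follower runs processLogs, the logs have to be fetched from the log store. Raft often propagates logs quickly, so keeping recent logs in an LRU list or a cache would avoid needless IO on every call.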
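One possible shape for such a cache is a fixed-size ring in front of the store, indexed by the log index modulo the ring size. The following is only a sketch of the idea suggested above, with hypothetical names throughout (entry, store, mapStore, ringCache); it is write-through, assumes a size greater than zero, and does not handle deletions, so a real version would also have to invalidate slots when the log is truncated.

```go
package main

import (
	"errors"
	"fmt"
)

// entry is a hypothetical log entry.
type entry struct {
	Index uint64
	Data  string
}

// store is a hypothetical minimal log store interface.
type store interface {
	Get(index uint64) (entry, error)
	Put(e entry) error
}

// mapStore plays the role of the slow store and counts its reads.
type mapStore struct {
	m     map[uint64]entry
	reads int
}

func (s *mapStore) Get(i uint64) (entry, error) {
	s.reads++
	e, ok := s.m[i]
	if !ok {
		return entry{}, errors.New("log not found")
	}
	return e, nil
}

func (s *mapStore) Put(e entry) error {
	s.m[e.Index] = e
	return nil
}

// ringCache keeps the most recently written entries in a fixed-size ring;
// the slot for an entry is Index % len(ring).
type ringCache struct {
	backing store
	ring    []entry
}

func newRingCache(size int, backing store) *ringCache {
	return &ringCache{backing: backing, ring: make([]entry, size)}
}

// Put writes through to the backing store first, then updates the ring.
func (c *ringCache) Put(e entry) error {
	if err := c.backing.Put(e); err != nil {
		return err
	}
	c.ring[e.Index%uint64(len(c.ring))] = e
	return nil
}

// Get serves from the ring when the slot holds the requested index and
// falls back to the backing store otherwise. Index 0 is never cached.
func (c *ringCache) Get(i uint64) (entry, error) {
	if e := c.ring[i%uint64(len(c.ring))]; i != 0 && e.Index == i {
		return e, nil
	}
	return c.backing.Get(i)
}

func main() {
	backing := &mapStore{m: map[uint64]entry{}}
	cache := newRingCache(2, backing)
	for i := uint64(1); i <= 3; i++ {
		if err := cache.Put(entry{Index: i, Data: fmt.Sprint("cmd-", i)}); err != nil {
			panic(err)
		}
	}
	for _, i := range []uint64{3, 2, 1} {
		e, err := cache.Get(i)
		fmt.Println(e.Index, err, "backing reads so far:", backing.reads)
	}
}
```

A ring fits this workload because a follower usually applies entries shortly after storing them and in index order, so the newest entries are exactly the ones about to be read.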
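Also, in runLeader the entries taken from r.applyCh are persisted and then applied serially. Does that really have to be serial?
That's all for this post. Later posts will cover more about etcd and raft.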
[1] hashicorp raft: https://github.com/hashicorp/raft
Source: https://mp.weixin.qq.com/s/n7619RNMd3NISoDlJxvjSw