代码拉取完成,页面将自动刷新
package goraft
import (
"fmt"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
)
// 1. Raft 选举模块
// 2. Raft 中实现状态机业务模块
/*
1. ConsensusModel 共识选举模块
属性
- 选举任期
- 节点角色
- 节点的所有节点列表
- 选票数量
日志复制相关
- 所有节点的下一个日志索引
- 最后提交索引
- 最后提交任期
- 本地日志列表
方法
- 初始化
- 开始选举任期
- 角色切换:Follewer,Candidate,Leader
*/
/*
添加成员变更的方法,Raft两阶段成员变更过程如下:
1. Leader收到成员变更请求从Cold切成Cold,new;
2. Leader在本地生成一个新的log entry,其内容是Cold∪Cnew,代表当前时刻新旧成员配置共存,写入本地日志,同时将该log entry复制至Cold∪Cnew中的所有副本。在此之后新的日志同步需要保证得到Cold和Cnew两个多数派的确认;
3. Follower收到Cold∪Cnew的log entry后更新本地日志,并且此时就以该配置作为自己的成员配置;
4. 如果Cold和Cnew中的两个多数派确认了Cold U Cnew这条日志,Leader就提交这条log entry并切换到Cnew;
5. 接下来Leader生成一条新的log entry,其内容是新成员配置Cnew,同样将该log entry写入本地日志,同时复制到Follower上;
6. Follower收到新成员配置Cnew后,将其写入日志,并且从此刻起,就以该配置作为自己的成员配置,并且如果发现自己不在Cnew这个成员配置中会自动退出;
7. Leader收到Cnew的多数派确认后,表示成员变更成功,后续的日志只要得到Cnew多数派确认即可。Leader给客户端回复成员变更执行成功
*/
// CRole 角色
type CRole int
const (
// Follower 追随者角色
Follower CRole = iota + 1
// Candidate 候选者
Candidate
// Leader 领导者
Leader
// Dead 退出状态
Dead
)
// RequestVoteArgs 选票请求
type RequestVoteArgs struct {
Term int32
CandidateID int
LastLogIndex int
LastLogTerm int
}
// RequestVoteReply 请求选票响应
type RequestVoteReply struct {
Term int32
VoteGranted bool
}
// AppendEntriesArgs 心跳相关
type AppendEntriesArgs struct {
Term int32
LeaderID int
PrevLogIndex int
PrevLogTerm int
Entries []LogEntry
LeaderCommit int
}
// AppendEntriesReply 心跳响应包
type AppendEntriesReply struct {
Term int32
Success bool
}
// LogEntry 日志实体
type LogEntry struct {
Command interface{}
Term int
}
// NewConsensusModel 创建模块
func NewConsensusModel(server *Server, id int) *ConsensusModel {
cm := &ConsensusModel{
server: server,
id: id,
quit: make(chan struct{}, 1),
}
return cm
}
// ConsensusModel 共识模块
type ConsensusModel struct {
// 状态锁
mux sync.RWMutex
// 任期超时时间
electionTimeOut time.Duration
// 最后的任期更新时间
lastElectionTime time.Time
// 角色
role CRole
// 服务器ID
id int
// 服务器任期
currentTerm int32
voteFor int
// 当前的选票
voteNum int32
// 其他节点的列表
peerIDList []int
server *Server
quit chan struct{}
state int
// 日志存储
logs []LogEntry
// 所有peer节点的下一个日志索引
peerNextLogIndex map[int]int
// 提交的索引ID
commitIndex int
// 记录未被Commit log的复制节点数量
logCommit map[int]int
}
// init 初始化
func (c *ConsensusModel) init() {
c.mux.Lock()
defer c.mux.Unlock()
// 1. 初始当前任期、任期超时时间、节点角色
c.currentTerm = 0
c.role = Follower
c.voteFor = -1
c.voteNum = 0
// 2. 开启任期的观测
c.log("初始化完成,所有节点:%v", c.id, c.peerIDList)
}
// AddPeerID 添加节点ID
func (c *ConsensusModel) AddPeerID(peerID int) {
c.mux.Lock()
defer c.mux.Unlock()
c.peerIDList = append(c.peerIDList, peerID)
}
// Server 启动开始选举
func (c *ConsensusModel) Server() {
// 1. 初始化
c.init()
// 2. 启动任期监测
go c.watchElection()
}
// Stop 退出
func (c *ConsensusModel) Stop() {
c.mux.Lock()
defer c.mux.Unlock()
close(c.quit)
c.log("quit consensusModel")
}
// Report 显示节点信息
func (c *ConsensusModel) Report() (id int, term int, isLeader bool) {
c.mux.Lock()
defer c.mux.Unlock()
return c.id, int(c.currentTerm), c.role == Leader
}
// SubmitLogEntity 接收客户端的日志提交
func (c *ConsensusModel) SubmitLogEntity() {
// 1.
}
// 开始选举,判断自身角色进行选举判断,任期的监测
func (c *ConsensusModel) watchElection() {
/*
校验1:最后任期更新时间,已经超出
校验2:任期ID更新
*/
// 1. 定时的触发任期校验
// 2. 创建一个定时器不断监测
ticker := time.NewTimer(15 * time.Microsecond)
for {
select {
// 1> 间隔15秒时间监测一次自己的状态
case <-ticker.C:
c.mux.Lock()
// 如果是Leader角色,不必再校验任期时间
if c.role == Leader {
c.log("角色为Leader,不校验选举任期")
c.mux.Unlock()
break
}
// 2. 如果最后任期更新时间,与当前时间已超出任期
if efficeTime := time.Since(c.lastElectionTime); efficeTime > c.electionTimeOut {
// 角色切换为Candidate
c.beCandidate()
c.mux.Unlock()
break
}
// 3. 任期未到更新最后更新时间
c.mux.Unlock()
break
// 2> 预留其他可打断定时器的事情:例如主动停止进程等
case <-c.quit:
c.log("退出")
break
}
}
}
// RequestVote 接收选票求情
func (c *ConsensusModel) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
// 接收到选票请求
c.mux.Lock()
// 1. 如果接收到任期比自己高的选票,就进行投票切换为Follower
if args.Term > c.currentTerm {
// 赞成票
reply.Term = args.Term
reply.VoteGranted = true
// 成为追随者
c.beFollower(args.Term)
c.mux.Unlock()
return nil
}
// 2. 任期相同查看自己是否已经投票
if args.Term == c.currentTerm {
reply.Term = c.currentTerm
// 2.0 未投票,即投赞成
if c.voteFor == -1 {
reply.VoteGranted = true
} else {
// 已偷完票
if c.voteFor == args.CandidateID {
// 2.1 已投此节点
reply.VoteGranted = true
} else {
// 2.2 已投其他节点
reply.VoteGranted = false
}
}
}
// 3. 更新最后任期时间
c.lastElectionTime = time.Now()
c.mux.Unlock()
return nil
}
// AppendEntries 发送心跳
func (c *ConsensusModel) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
// 接收心跳包
c.mux.Lock()
// 1. 任期判断: 大于或等于当前任期时,已经有其他成为Leader,则此节点需切换为Follower
if args.Term >= c.currentTerm && c.role != Follower {
reply.Success = true
reply.Term = args.Term
c.beFollower(args.Term)
c.mux.Unlock()
return nil
}
// 1.1 如果任期=当前任期,且是Follower 正常返回
// 1.2 如果任期<当前任期,不管是否Follower正常正确的任期(A<-C[leader] A<-B[new_leader] = A(C_Term)—>B)
reply.Success = true
reply.Term = c.currentTerm
// 2. 正常接收心跳,更新最后任期时间
c.lastElectionTime = time.Now()
c.mux.Unlock()
// 3. TODO: 接收心跳包中的日志内容,并更新
return nil
}
// 获取随机的任期时间
func (c *ConsensusModel) getElectionTimeout() time.Duration {
return time.Duration(150+rand.Intn(150)) * time.Millisecond
}
// 成为Follower角色
func (c *ConsensusModel) beFollower(currenTerm int32) {
/*
主要包含以下步骤
1. 更新任期
2. 更新角色
3. 重置选票信息
*/
// 1. 任期变更
c.currentTerm = currenTerm
// 2. 角色变更
c.role = Follower
c.lastElectionTime = time.Now()
// 3. 重置选票信息
c.voteFor = -1
c.voteNum = 0
}
// 成为Candidate角色: 已获取锁状态
func (c *ConsensusModel) beCandidate() {
// 1. 改变角色
c.role = Candidate
// 2. 任期加1
currentTerm := atomic.AddInt32(&c.currentTerm, 1)
// 3. 更新当前的选举时间
c.lastElectionTime = time.Now()
// 4. 开启向其他阶段的选票请求&给自己加一票
c.voteFor = c.id
c.voteNum = 1
// 发送选票请求
for _, peerID := range c.peerIDList {
go c.sendRequestVoteToPeer(peerID, currentTerm)
}
// 重新进入任期超时监测,如果再任期内本节点无法选主成功(或其他节点选主成功,本节点切换为Follower节点)
}
// 成为Leader角色
func (c *ConsensusModel) beLeader(currenTerm int32) {
/*
1. 任期变更
2. 角色变更
3. 启动心跳进程
4. 退出任期监测
*/
// 1. 任期变更
c.currentTerm = currenTerm
// 2. 角色变更
c.role = Leader
// 3. 选票重置
c.voteFor = -1
c.voteNum = 0
c.lastElectionTime = time.Now()
// 4. 启动心跳进程
go c.sendAppendEntity()
}
// sendAppendEntity 启动节点的心跳发送
func (c *ConsensusModel) sendAppendEntity() {
/*
1. 创建定时器触发心跳发送
2. 并发发送心跳给所有节点
3. 组合包含的日志
*/
ticker := time.NewTicker(15 * time.Millisecond)
for {
// 1. 定时触发发送心跳
<-ticker.C
// 2. 发送包含日志内容的心跳包
// var args AppendEntriesArgs
for _, peerID := range c.peerIDList {
c.mux.RLock()
// 2.1 获取节点的最后日志索引
peerNext, ok := c.peerNextLogIndex[peerID]
if !ok {
break
}
// 返回给Follower它上次从Leader正确同步的内容
preLogIndex := peerNext - 1
preLogTerm := -1
if preLogIndex > 0 {
preLogTerm = c.logs[preLogIndex].Term
}
args := AppendEntriesArgs{
Term: c.currentTerm,
LeaderID: c.id,
PrevLogIndex: preLogIndex,
PrevLogTerm: preLogTerm,
Entries: c.logs[peerNext:],
LeaderCommit: c.commitIndex,
}
c.mux.RUnlock()
// 3. 发送心跳及日志包到其他节点
go c.sendAppendEntityToPeer(peerID, args)
}
// 校验Leader状态
c.mux.Lock()
if c.role != Leader {
c.log("节点非Leader角色,退出心跳机制", c.id)
break
}
c.mux.Unlock()
}
}
// sendAppendEntityToPeer 发送心跳给节点
func (c *ConsensusModel) sendAppendEntityToPeer(peerID int, entrity AppendEntriesArgs) {
c.log("发送 AppendEntries 请求到服务器 %v: ni=%d, args=%+v", peerID, 0, entrity)
var reply AppendEntriesReply
if err := c.server.Call(peerID, "ConsensusModule.AppendEntries", entrity, &reply); err == nil {
c.mux.Lock()
defer c.mux.Unlock()
// 1. 返回的响应,存在更高任期,本节点转换为追随者
if reply.Term > c.currentTerm {
c.log("当前任期term早于heartbeat应答中的任期")
// 获取状态锁后进行角色切换
c.beFollower(reply.Term)
return
}
// 2. 任期相同:Follower正常返回,需加上对Follower对日志的处理结果
if reply.Term == c.currentTerm {
// 2.1 同意:表示日志操作成功,需更新Follower的nextLogIndex,
// 以及判断哪些SubmitLog被大多数Follower复制
if reply.Success {
currentCommitIndex := c.commitIndex
// 2.1.2 更新peer节点的下一个logIndex
c.peerNextLogIndex[peerID] = c.peerNextLogIndex[peerID] + len(entrity.Entries)
// 2.1.3 检查可提交的log; 最后提交LastCommit ~ 复制同步的最新logIndex
checkCommitLogLen := c.peerNextLogIndex[peerID] - c.commitIndex
// 复制的logIndex已经被提交过了: checkCommitLogLen =< 0
if checkCommitLogLen > 0 {
for id := c.commitIndex; id < checkCommitLogLen; id++ {
commitLogIndex := c.commitIndex + id
c.logCommit[commitLogIndex] = +1
if c.logCommit[commitLogIndex] >= len(c.peerIDList)/2 {
// 可以提交
c.commitIndex = commitLogIndex
// 从未提交的map中移除
delete(c.logCommit, commitLogIndex)
}
// 不可提交:无需处理
}
}
// 校验是否有新的可提交日志
if currentCommitIndex != c.commitIndex {
// 通知客户端(Server)及其他CM节点,让其他CM节点再通知到对应客户端(Server)
c.log("最后可提交的日志 commitIndex", c.commitIndex)
}
} else {
// 2.2 不同意: 暂不处理
c.log("日志复制应答,复制失败:%v", peerID, c.peerNextLogIndex[peerID])
}
}
}
}
// 发送选票请求
func (c *ConsensusModel) sendRequestVoteToPeer(peerID int, currenTerm int32) {
c.log("发送选票请求给其他节点[%v]", c.id, peerID)
args := RequestVoteArgs{
Term: currenTerm,
CandidateID: c.id,
}
var reply RequestVoteReply
// 1. 发送RPC请求
c.log("向服务器 %d 发送RequestVote请求 : %+v", peerID, args)
err := c.server.Call(peerID, "ConsensusModule.RequestVote", args, &reply)
// 2. 接收返回响应: 校验返回后的响应数据
if err == nil {
c.mux.Lock()
defer c.mux.Unlock()
c.log("收到RequestVote应答 : %+v", reply)
// 状态不是候选人,退出选举(可能退化为追随者,也可能已经胜选成为领导者)
if c.role != Candidate {
c.log("等待RequestVote回复时, 状态变为 %v", c.role)
return
}
// 存在更高任期(新领导者),转换为追随者
if reply.Term > currenTerm {
c.log("当前任期term早于RequestVote应答中的任期")
// 已获取锁状态
c.beFollower(reply.Term)
return
}
// 更新最后任期时间
c.lastElectionTime = time.Now()
// 选票的任期一致: 判断选票情况
if reply.Term == currenTerm {
if reply.VoteGranted {
votes := int(atomic.AddInt32(&c.voteNum, 1))
if votes*2 > len(c.peerIDList)+1 {
// 获得票数超过一半,选举获胜,成为最新的领导者
c.log("以 %d 票数胜选,成为Leader", votes)
// 已获取锁状态
c.beLeader(currenTerm)
return
}
}
// 投了不同意票
}
// 无效任期内的票,过期任期内的选票
}
}
func (c *ConsensusModel) log(format string, args ...interface{}) {
format = fmt.Sprintf("[%d] ", c.id) + format
log.Printf(format, args...)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。