代码拉取完成,页面将自动刷新
package goraft
import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)
/*
Server状态机实现逻辑
- 实现Server本身的业务处理逻辑,此示例不用实现
- 实现节点间的正常通讯和rpc调用
- 实现用于模拟选举场景模拟的各种函数:添加节点、节点连接及断开、退出选举模块等
属性
- 嵌入ConsensusModel模块
- 连接所有节点的peerID的服务器
方法
- 初始化
- 连接其他节点的Server
- 监听其他节点的连接请求
- 保存所有节点的连接
- 断开与某个节点连接,模拟网络不互通
- 暂停选举模块,模拟程序异常或选举包丢失
*/
// Server 实现状态机Server
type Server struct {
id int
mux sync.RWMutex
cm *ConsensusModel
peerClients map[int]*rpc.Client
peerList map[int]net.Addr
// rpc处理函数
rpcHandler *RPCHandler
rpcServer *rpc.Server
listener net.Listener
// 接收退出信号
quit chan struct{}
}
// NewServer 创建Server
func NewServer() *Server {
server := &Server{}
return server
}
func (s *Server) init() {
s.mux.Lock()
defer s.mux.Unlock()
s.cm = NewConsensusModel(s, s.id)
// 创建一个包含cm的rpc处理函数
s.rpcServer = rpc.NewServer()
s.rpcHandler = &RPCHandler{cm: s.cm}
s.rpcServer.RegisterName("ConsensusModule", s.rpcHandler)
}
// Serve 启动
func (s *Server) Serve() {
s.mux.Lock()
var err error
s.listener, err = net.Listen("tcp", ":0")
if err != nil {
log.Fatal(err)
}
s.log("listening at %s", s.id, s.listener.Addr())
s.mux.Unlock()
// 1. 启动Server的节点通讯地址监听,监听其他节点的接入
for {
conn, err := s.listener.Accept()
if err != nil {
// 连接错误
s.log("Server接收tcp请求错误:%v", err.Error())
break
}
go func() {
s.rpcServer.ServeConn(conn)
}()
}
// 2. 启动Server的客户端连接监听,监听客户端接入; 此处暂不用
s.Stop()
}
// Stop 停止服务
func (s *Server) Stop() {
s.cm.Stop()
s.listener.Close()
close(s.quit)
}
// 停止选举模块
// GetListenAddr 获取服务器地址
func (s *Server) GetListenAddr() net.Addr {
s.mux.Lock()
defer s.mux.Unlock()
return s.listener.Addr()
}
// AddPeer 添加节点信息
func (s *Server) AddPeer(peerID int, addr net.Addr) {
s.peerList[peerID] = addr
}
// ConnectToPeer 连接到节点
func (s *Server) ConnectToPeer(peerID int) error {
s.mux.Lock()
defer s.mux.Unlock()
addr, ok := s.peerList[peerID]
if !ok {
s.log("连接节点错误,未添加节点地址信息:%v", peerID)
}
client, err := rpc.Dial(addr.Network(), addr.String())
if err != nil {
return err
}
s.peerClients[peerID] = client
return nil
}
// DisconnectPeer 断开连接
func (s *Server) DisconnectPeer(peerID int) error {
s.mux.Lock()
defer s.mux.Unlock()
if s.peerClients[peerID] != nil {
err := s.peerClients[peerID].Close()
s.peerClients[peerID] = nil
return err
}
return nil
}
// DisconnectAll 断开所有连接
func (s *Server) DisconnectAll() {
s.mux.Lock()
defer s.mux.Unlock()
for id := range s.peerClients {
if s.peerClients[id] != nil {
s.peerClients[id].Close()
s.peerClients[id] = nil
}
}
}
// Call rpc服务调用
func (s *Server) Call(id int, serviceMethod string, args interface{}, reply interface{}) error {
s.mux.Lock()
peer := s.peerClients[id]
s.mux.Unlock()
//
if peer == nil {
return fmt.Errorf("call client %d after it's closed", id)
}
return peer.Call(serviceMethod, args, reply)
}
func (s *Server) log(format string, args ...interface{}) {
format = fmt.Sprintf("[%d] ", s.id) + format
log.Printf(format, args...)
}
// RPCHandler RPC处理函数
type RPCHandler struct {
cm *ConsensusModel
}
// RequestVote 选票请求
func (rpp *RPCHandler) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) error {
// time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
return rpp.cm.RequestVote(args, reply)
}
// AppendEntries 心跳请求
func (rpp *RPCHandler) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) error {
// time.Sleep(time.Duration(1+rand.Intn(5)) * time.Millisecond)
return rpp.cm.AppendEntries(args, reply)
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。