etcd-raft节点变更

说明

从etcd-raft的架构来看,节点变更功能的实现需要应用和底层核心协议处理层互相配合。客户端发起节点增加或移除的命令,应用获得该请求,并将其转换为一个节点变更指令交给底层的raft协议核心处理层。

数据结构

ConfChange

type ConfChange struct {
ID uint64
Type ConfChangeType
NodeID uint64
}

该结构表示节点变更的信息,也是上层应用传递给底层的节点变更消息的结构,其中:

  • ID: 表示节点变更的消息id,这个意义不大
  • Type: ConfChangeAddNode或者ConfChangeRemoveNode
  • NodeID: 变更节点的id

关键流程

节点变更请求应用处理

以etcd-raft自带的示例应用来说明应用对此类请求的处理流程:

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
key := r.RequestURI
switch {
case …:
case r.Method == “POST”:
……
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: nodeId,
Context: url,
}
h.confChangeC <- cc
case r.Method == “DELETE”:
……
cc := raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: nodeId,
}
h.confChangeC <- cc
}
}

应用的前端接入层实现较为简单:将命令封装成ConfChange消息并通过管道通知其他后台任务即可:

func (rc *raftNode) serveChannels() {
go func() {
for rc.proposeC != nil && rc.confChangeC != nil {
select {
case prop, ok := <-rc.proposeC:
……
case cc, ok := <-rc.confChangeC:
if !ok {
rc.confChangeC = nil
} else {
confChangeCount += 1
cc.ID = confChangeCount
rc.node.ProposeConfChange(context.TODO(), cc)
}
}
}
}()

该ConfChange消息最终被该协程捕获,并最终调用底层的ProposeConfChange来处理该消息。

底层处理入口
应用的ConfChange消息最终通过ProposeConfChange进入底层协议处理层:

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
data, err := cc.Marshal()
if err != nil {
return err
}
return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
}
func (n *node) Step(ctx context.Context, m pb.Message) error {
……
return n.step(ctx, m)
}

可以发现,对于节点变更消息ConfChange,它与正常的更新命令处理在命令的类型(EntryConfChange)上不同外,对命令的处理流程则是完全一致的。关于这个,我们就不再赘述,有兴趣的可以参考之前的文章。

当消息在多数Follower节点上被持久化存储后,该消息便会在Leader上进行Commit,接下来该Commit也会被同步至集群的Follower节点上。此时,应用会执行日志项中的命令,对于ConfChange命令:

func (rc *raftNode) publishEntries(ents []raftpb.Entry) bool {
for i := range ents {
switch ents[i].Type {
case raftpb.EntryNormal:
case raftpb.EntryConfChange:
cc.Unmarshal(ents[i].Data)
rc.confState = *rc.node.ApplyConfChange(cc)
switch cc.Type {
case raftpb.ConfChangeAddNode:
if len(cc.Context) > 0 {
rc.transport.AddPeer(types.ID(cc.NodeID), []string{string(cc.Context)})
}
case raftpb.ConfChangeRemoveNode:
if cc.NodeID == uint64(rc.id) {
return false
}
rc.transport.RemovePeer(types.ID(cc.NodeID))
}
}
}
}

对于EntryConfChange的日志项,有以下工作要做:

  1. 通知raft协议核心处理层节点变更消息;
  2. 通知网络传输模块节点变更消息;

1的通知是调用函数ApplyConfChange完成,而2则是根据节点变更的种类调用相应的函数:AddPeer或者RemovePeer。

需要说明的一点是:在etcd-raft的实现中,节点变更日志项详细被应用的时机和普通的更新日志项一样,都是在日志项消息被复制到集群多数节点上,消息被Commit之后执行的。

增加节点

应用通知raft协议核心处理层的日志项消息最终会被协议核心层的后台协程接受到并处理:

func (n *node) run(r *raft) {
……
for {
select {
case cc := <-n.confc:
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeRemoveNode:
r.removeNode(cc.NodeID)
default:
panic(“unexpected conf type”)
}
select {
case n.confstatec <- pb.ConfState{Nodes: r.nodes()}:
case <-n.done:
}
case …:
}
}
}

真正的增加节点处理如下:

func (r *raft) addNode(id uint64) {
r.pendingConf = false
if _, ok := r.prs[id]; ok {
return
}
r.setProgress(id, 0, r.raftLog.lastIndex()+1)
r.prs[id].RecentActive = true
}

在raft协议核心处理层,增加节点便是为其分配一个Progress结构,通过该结构追踪对端节点的运行状态。

删除节点

func (r *raft) removeNode(id uint64) {
r.delProgress(id)
r.pendingConf = false
if len(r.prs) == 0 {
return
}
if r.maybeCommit() {
r.bcastAppend()
}
if r.state == StateLeader && r.leadTransferee == id {
r.abortLeaderTransfer()
}
}

removeNode相比addNode复杂一点,除了移除该节点的Progress结构外,还需要考虑到删除节点带来的集群节点数量减少导致的影响,如由于多数派变少而导致的某些日志项可以被Commit等。

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>