etcd-raft日志管理

说明

日志是实现一致性协议的最重要手段。客户对应用发起的状态更新请求首先都会被记录在日志中,待主节点将更新日志在集群多数节点之间完成同步以后,便将该日志项内容在状态机中进行应用,进而便完成了一次客户的更新请求。

从上面的过程描述来看,日志是一致性协议数据同步的核心。因此,如何管理节点上的日志也是一个非常关键的技术,本文,我们就来研究etcd-raft中的日志管理实现。

需要说明的是:由于实现的关系,etcd-raft的核心协议处理层不会处理日志追加的逻辑,以保持核心协议实现的简洁性,但是etcd提供了一个WAL(Write-Ahead-Log)的日志库,日志追加功能由应用调用该库完成。本文我们分析的重点在于:

  1. 应用如何调用WAL库完成日志追加
  2. WAL库如何管理日志
  3. WAL如何与协议核心相互配合完成日志内容的同步

数据结构

好的实现应该是有非常清晰的数据结构,并且每个结构之间职责明确。

WAL

WAL管理所有的更新日志,主要处理日志的追加、日志文件的切换、日志的回放等等。

type WAL struct {
dir string
dirFile *os.File
metadata []byte
state raftpb.HardState
start walpb.Snapshot
decoder *decoder
readClose func() error
mu sync.Mutex
enti uint64
encoder *encoder
locks []*fileutil.LockedFile
fp *filePipeline
}

raftLog

type raftLog struct {
storage Storage
unstable unstable
committed uint64
applied uint64
}

由于raft协议的核心工作是在集群节点之间进行日志复制,因此,在etcd-raft实现的核心协议处理层也必须了解当前日志复制情况,这个结构便是raftLog。该结构被集成在核心数据结构raft中。

raftLog中主要记录了当前日志的状态,包括:

  • committed:
  • applied:
  • unstable:日志项内存缓存,便于集群节点之间进行日志复制
  • storage:也是日志项内存缓存,但是不明白其具体作用是什么,与unstable有什么区别

unstable

type unstable struct {
snapshot *pb.Snapshot
entries []pb.Entry
offset uint64
}

unstable在使用内存数组维护所有的更新日志项。对于Leader节点来说,它维护了客户端的更新请求对应的日志项;对于Follower节点而言,它维护的是Leader节点复制的日志项。

无论是Leader还是Follower节点,日志项首先都会被存储在unstable结构,然后再由其内部状态机将unstable维护的日志项交给上层应用,由应用负责将这些日志项进行持久化并转发至系统其它节点。这也是为什么它被称为unstable的原因:在unstable中的日志项都是不安全的,尚未持久化存储,可能会因意外而丢失。

Storage && MemoryStorage

type Storage interface {
InitialState() (pb.HardState, pb.ConfState, error)
Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
Term(i uint64) (uint64, error)
LastIndex() (uint64, error)
FirstIndex() (uint64, error)
Snapshot() (pb.Snapshot, error)
}
type MemoryStorage struct {
sync.Mutex
hardState pb.HardState
snapshot pb.Snapshot
ents []pb.Entry
}

Storage和MemoryStorage是接口和实现的关系
与unstable一样,Storage也被嵌入在raftLog结构中。需要说明的一点是:将日志项追加到Storage的动作是由应用完成的,而不是raft协议核心处理层。目前尚不理解Storage存在的意义是什么,与unstable到底有什么区别?

物理存储

所有的日志项最终都被追加存储在WAL文件中。

日志项类型

日志项有多种类型,见下列表:

metadataType int64 = iota + 1
entryType
stateType
crcType
snapshotType
  • metadataType:这是一个特殊的日志项,被写在每个WAL文件的头部,具体数据好像是可以由应用自定义。
  • entryType:应用的更新数据,也是日志中存储的最关键数据;
  • stateType:代表日志项中存储的内容是Snapshot;
  • crcType:前一个WAL文件里面的数据的crc,也是WAL文件的第一个记录项
  • snapshotType:当前Snapshot的索引{term, index},即当前的Snapshot位于哪个日志记录,不同于stateType,这里只是记录Snapshot的索引,而非snapshot的数据。

WAL文件物理格式

Alt text

每个日志项有以下四个部分组成

  • type:日志项类型,在上面详细描述过;
  • crc:校验和
  • data:根据日志项类型存储的实际数据也不尽相同,如snapshotType类型的日志项存储的是snapshot的日志索引,crcType类型的日志项则无数据项,其crc字段便充当了数据项
  • padding:为了保持日志项8字节对其而填充的无意义内容

这里的crc字段不是很明白:看实现好像crc并非是日志项内容的校验和,而是该日志文件中当前日志项以前的所有日志项的校验和。这样设计的好处是什么呢?

关键流程

WAL初始化

etcd的wal库提供了初始化方法,应用需要显示调用初始化方法来完成日志初始化功能,初始化方法主要有两个API:Create与Open:

func Create(dirpath string, metadata []byte) (*WAL, error) {
if Exist(dirpath) {
return nil, os.ErrExist
}
tmpdirpath := filepath.Clean(dirpath) + “.tmp”
if fileutil.Exist(tmpdirpath) {
if err := os.RemoveAll(tmpdirpath); err != nil {
return nil, err
}
}
if err := fileutil.CreateDirAll(tmpdirpath); err != nil {
return nil, err
}
p := filepath.Join(tmpdirpath, walName(0, 0))
f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_CREATE, fileutil.PrivateFileMode)
if err != nil {
return nil, err
}
if _, err = f.Seek(0, io.SeekEnd); err != nil {
return nil, err
}
if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
return nil, err
}
w := &WAL{
dir: dirpath,
metadata: metadata,
}
w.encoder, err = newFileEncoder(f.File, 0)
if err != nil {
return nil, err
}
w.locks = append(w.locks, f)
if err = w.saveCrc(0); err != nil {
return nil, err
}
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
return nil, err
}
if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
return nil, err
}
if w, err = w.renameWal(tmpdirpath); err != nil {
return nil, err
}
pdir, perr := fileutil.OpenDir(filepath.Dir(w.dir))
if perr != nil {
return nil, perr
}
if perr = fileutil.Fsync(pdir); perr != nil {
return nil, perr
}
if perr = pdir.Close(); err != nil {
return nil, perr
}
return w, nil
}

Create做的事情也比较简单:

  1. 创建WAL目录,用于存储WAL日志文件以及snapshot索引;
  2. 预分配第一个WAL日志文件,默认是64MB,使用预分配机制可以提高写入性能;
  3. 其他,包括使用临时目录并最终重命名为正式目录名等trick,可以忽略。

Open则是在Create完成以后被调用,主要是用于打开WAL目录下的日志文件,Open的主要作用是找到当前Snapshot以后的所有WAL日志,这是因为当前的Snapshot之前的日志我们不再关心了,因为日志的内容肯定都已经被更新至Snapshot了,这些日志也是在后面日志回收中可以被删除的部分。

func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
w, err := openAtIndex(dirpath, snap, true)
if err != nil {
return nil, err
}
if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
return nil, err
}
return w, nil
}

其中最重要的就是openAtIndex了,该函数用于寻找snap以后的日志文件并打开。

WAL追加日志项

日志项的追加是通过调用etcd的wal库的Save()方法实现,具体如下:

func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
w.mu.Lock()
defer w.mu.Unlock()
// short cut, do not call sync
if raft.IsEmptyHardState(st) && len(ents) == 0 {
return nil
}
mustSync := raft.MustSync(st, w.state, len(ents))
for i := range ents {
if err := w.saveEntry(&ents[i]); err != nil {
return err
}
}
if err := w.saveState(&st); err != nil {
return err
}
curOff, err := w.tail().Seek(0, io.SeekCurrent)
if err != nil {
return err
}
if curOff < SegmentSizeBytes {
if mustSync {
return w.sync()
}
return nil
}
return w.cut()
}

该函数的核心是:

  1. 调用saveEntry()将日志项存储到WAL文件中;
  2. 如果追加后日志文件超过既定的SegmentSizeBytes大小,需要调用w.cut()进行WAL文件切换,即:关闭当前WAL日志,创建新的WAL日志,继续用于日志追加。

saveEntry 日志项数据进行编码并追加至WAL文件,存储在WAL文件中的日志项有多种类型,对于普通的应用更新请求,类型为entryType。
而具体的编码写入方法则由专门的编码结构实现,称为struct encoder,该结构实现了日志项的编码和将日志项编码后的数据写入日志文件的功能。感兴趣的读者可以结合上面的日志项结构自行阅读。

cut 则实现了WAL文件切换的功能,每个WAL文件的预设大小是64MB,一旦超过该大小,便创建新的WAL文件,这样做的好处是便于WAL文件的回收,我们在后面会说明。

WAL日志回放

WAL的日志回放的主要流程也是由应用来完成,以etcd-raft自带的示例应用为例:

  1. 加载最新的Snapshot
  2. 打开WAL文件目录,根据上面的描述,这里主要目的是找到最新Snapshot以后的日志文件,这些是需要被回放的日志;
  3. 在2的基础上读出所有的日志项(会不会日志项太多?导致内存装不下?)
  4. 将3读出的日志项应用到内存中,这里的内存指的是上面我们说过的Storage,给raft协议核心处理层提供的内存日志存储中,这样,raft核心协议处理层就可以将日志同步给其他节点了。

所以,对于WAL日志回放功能,底层的WAL日志库只需要给上层应用提供一个读取所有日志项的功能即可,这由ReadAll()实现。ReadAll的实现感兴趣的同学可以自行阅读。

WAL日志回收

Leader日志追加

Leader节点的日志是由应用根据客户端请求生成,更进一步说,是应用根据客户端的更新请求而生成。需要说明的是:这里的客户端请求不仅仅是来自于客户端,还有可能是来自于集群其他Follower节点对客户端的请求转发,因为客户端可能无法正确获知当前集群的Leader是谁。

之前的文章“etcd-raft示例分析”(https://zhuanlan.zhihu.com/p/29180575)中我们已经具体描述过应用是如何一步步将客户端请求转变至raft日志项并交由协议处理层处理。

协议的核心处理层会将更新日志存储在内存数组之中(前面描述过的unstable)。当然,这里还不算结束。unstable中的日志最终还需要被写入WAL日志、复制到集群Follower等一些列处理。

为此,raft协议核心处理层抽象了一个Node结构。Node在启动时会创建后台协程。该后台协程需要处理很多任务,其中之一就是为上层应用准备好Ready:这是底层的协议处理层为上层应用层准备好的任务,保存了当前需要应用处理的日志项以及当前协议的状态信息:例如当前的commit点、snapshot信息等。上层应用通过Node.Ready()接口获取此类任务并作如下处理:

  1. 将Ready中的日志项写入WAL日志;
  2. 将Ready中已commit的日志项应用至状态机;
  3. 通过网络传输模块将消息传输至集群其他节点,如果是从节点,这里应该无需向其他节点发送消息了

Leader向Follower推送日志

前面说过,当应用层有更新请求(MsgProp)送给底层raft协议处理层时,协议处理层便会驱动自己的状态机前进并触发向Follower节点进行日志复制。Leader向Follower进行日志同步的函数是bcastAppend:

func (r *raft) bcastAppend() {
for id := range r.prs {
if id == r.id {
continue
}
r.sendAppend(id)
}
}

因此,日志复制是Leader节点在收到应用的更新请求后便会向Follower发起的,具体同步的内容是内存中unstable结构中维护的更新日志。

func (r *raft) sendAppend(to uint64) {
pr := r.prs[to]
if pr.IsPaused() {
return
}
m := pb.Message{}
m.To = to
term, errt := r.raftLog.term(pr.Next – 1)
ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
// send snapshot if we failed to get term or entries
if errt != nil || erre != nil {
……
} else {
m.Type = pb.MsgApp
m.Index = pr.Next – 1
m.LogTerm = term
m.Entries = ents
m.Commit = r.raftLog.committed
if n := len(m.Entries); n != 0 {
switch pr.State {
case …
}
}
r.send(m)
}
}

sendAppend 向特定的Follower(由传入参数的to代表)发送日志同步命令。该方法首先会找到该Follower上一次已同步的日志位置(pr.Next-1),然后从raftLog中获取该位置以后的日志项,当然每次同步的数量不宜太多,由maxMsgSize限制。当然,如果无法从raftLog获取到想要的日志项,此时需要考虑发送Snapshot,这是因为对应的日志项可能由于已经被commit而丢弃了(向新加入节点同步日志的时候可能会出现这种情况),这里我们暂不作细致讨论,后续讨论Snapshot设计的时候再去探究。

Leader收到Follower对于日志复制消息MsgApp的响应后:

func stepLeader(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgAppResp:
pr.RecentActive = true
// 如果Follower拒绝了同步消息
if m.Reject {
} else {
oldPaused := pr.IsPaused()
if pr.maybeUpdate(m.Index) {
switch {
case xxx:
}
if r.maybeCommit() {
r.bcastAppend()
} else if oldPaused {
}
}
}
}
}

这里的处理也比较简单:主要是调用r.maybeCommit(),看看是否可以推进commit点,如果可以的话,继续向Follower发送日志同步消息。commit点的推进也比较简单,只是简单地将raftLog中的commit位置置为新的值即可。

func (l *raftLog) commitTo(tocommit uint64) {
if l.committed < tocommit {
if l.lastIndex() < tocommit {
}
l.committed = tocommit
}
}

Follower日志追加

上面我们讨论了Leader的日志复制和同步响应处理流程,接下来我们考察下Follower节点在收到Leader的日志同步消息时的处理。

Follower节点的日志追加过程与Leader节点完全一致,不同的是日志来源:Leader节点的日志来自于应用的更新请求(MsgProp),而Follower的日志则是来自于Leader的日志复制消息(MsgApp)。

至于etcd-raft的Follower如何接受Leader节点的消息并将该消息进一步转发至底层核心的raft协议处理层,我们已经在前面的“etcd-raft网络传输组件实现分析”章节有了比较清晰的描述,不熟悉的朋友可以移步至 https://zhuanlan.zhihu.com/p/29207055

Leader节点的日志复制消息终进入Follower节点的raft核心协议处理层,更具体的说,是进入了函数 stepFollower:

func stepFollower(r *raft, m pb.Message) {
switch m.Type {
case pb.MsgProp:
case pb.MsgApp:
r.electionElapsed = 0
r.lead = m.From
r.handleAppendEntries(m)
}
}
func (r *raft) handleAppendEntries(m pb.Message) {
// 如果消息的序号在本地已经被提交了,拒绝消息
if m.Index < r.raftLog.committed {
r.send(pb.Message{To: m.From,
Type: pb.MsgAppResp,
Index: r.raftLog.committed})
return
}
// 调用raftLog.maybeAppend
if mlastIndex, ok := r.raftLog.maybeAppend(…); ok {
r.send(pb.Message{To: m.From,
Type: pb.MsgAppResp,
Index: mlastIndex})
} else {
r.send(pb.Message{To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: r.raftLog.lastIndex()})
}
}
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents …pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
…..
default:
offset := index + 1
l.append(ents[ci-offset:]…)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
func (l *raftLog) append(ents …pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index – 1; after < l.committed {
panic(…)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}

从上面的流程跟踪下来看,与Leader一样,Follower节点的数据最终也是被写入了日志模块的unstable日志中,其实就是被追加至内存中的日志项数组。

应用对unstable日志处理

前面说过,主从节点上的日志首先被存储在内存的unstable中,但这些unstable中的日志项最终需要被应用获取到并作进一步处理。

协议处理层的后台任务会将unstable中的日志项以及协议状态信息等打包成Ready结构塞进一个管道readyc:

func (n *node) run(r *raft) {
for {
if advancec != nil {
} else {
rd = newReady(r, prevSoftSt, prevHardSt)
if rd.containsUpdates() {
readyc = n.readyc
} else {
readyc = nil
}
}
select {
case readyc <- rd:
case xxx:
}
}
}

应用通过协议层抽象的Node.Ready()获取保存Ready的管道:

func (n *node) Ready() <-chan Ready { return n.readyc }

应用对Ready的标准处理方法如下:

case rd := <-rc.node.Ready():
rc.wal.Save(rd.HardState, rd.Entries)
if !raft.IsEmptySnap(rd.Snapshot) {
rc.saveSnap(rd.Snapshot)
rc.raftStorage.ApplySnapshot(rd.Snapshot)
rc.publishSnapshot(rd.Snapshot)
}
rc.raftStorage.Append(rd.Entries)
rc.transport.Send(rd.Messages)
if ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries)); !ok {
rc.stop()
return
}
rc.maybeTriggerSnapshot()
rc.node.Advance()

关键处理流程:

  1. 将日志项写入WAL;
  2. 处理Snapshot,如果有的话;
  3. 追加至Storage(这是干什么?);
  4. 消息发送至集群其他节点,这是对Leader而言的;
  5. 将已提交的日志应用至状态机(publishEntries)。

总结

从上面的etcd-raft日志模块实现分析来看,我们总结如下结论:

  • 日志项会被存储在三个地方,按照其出现的顺序分别为:unstable、WAL、storage
  • unstable维护协议层的日志项,这也是raft进行日志复制的数据源泉;
  • WAL负责日志项的持久化存储;
  • storage中存储日志项目的不明
  • 应用负责串联这些日志存储模块。

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>