etcd watch 机制源码解析——服务端篇

文摘   科技   2023-05-01 11:00   北京  

0 前言

最近在和大家探讨有关 etcd watch 机制底层实现原理的话题. 前两天刚和大家聊完 etcd 客户端部分的内容,本篇正式步入 etcd 服务端的领域,带大家一起剖析其底层细节和实现源码. 由于本文和前一篇 "etcd watch 机制源码解析——客户端"的内容关联性较强,建议大家优先完成前文的阅读,再连贯性地开启本篇的学习.

本文的话题围绕分布式存储组件 etcd 展开:

etcd 官方文档:https://etcd.io/

etcd 开源地址:https://github.com/etcd-io/etcd

本文走读的 etcd 源码版本:v3.5.8

 

本文的目录树结构如下:

 

1 服务端架构

1.1 整体架构

整体架构:

在前文”客户端篇“中有介绍到,etcd 客户端会和某个特定的 etcd 服务端节点建立 grpc 长连接,这是一条底层基于 HTTP2.0 实现的全双工通信管道,一方面用于推送从 etcd 客户端发往 etcd 服务端的请求. 另一方面用于推送从 etcd 服务端发往 etcd 客户端的响应或者 watch 回调事件.

在 etcd 服务端部分,watch 模块自上而下可以拆分为 serverWatchStream 和 watchableStore 两个层级,前者是承上启下的应用层中枢,后者是实现了 watcher 监听器存储管理的底层介质.

每当和 etcd 客户端建立起一笔 watch 长连接,etcd 服务端中的 serverWatchStream 会异步启动两个守护协程 recvLoop 和 sendLoop,其中 recvLoop 是读协程,用于持续接收处理来自 etcd 客户端的请求,而 sendLoop 是写协程,用于向 etcd 客户端推送响应以及 watch 回调事件.

而关于 watcher 监听器的存储管理以及 watch 回调事件的产生起源,则是位于更底层的 watchableStore 模块. watchableStore 中会完全基于内存的方式进行 watcher 的存储,其中 watcher 根据是否需要回溯历史变更记录,会被分为 synced group、unsynced group 两部分. 此外还有一个 victims 部分用于暂存因容量不足而发送阻塞的回调事件.

etcd 客户端和 etcd 服务端的 serverWatchStream 之间通过 grpc 长连接进行双向通信关系,而同为 etcd 服务端侧的 serverWatchStream 和 watchableStore 之间则是通过抽象出来的一条 watchStream 完成上下层之间的信息交互.

 

watch 回调事件起源:

etcd 在协议层基于分布式共识算法 raft 实现,通过预写日志(WAL)+ 写状态机(State Machine) 两步操作保证了集群的强一致性和高可用性. 对于每个 etcd 服务端节点而言,一系列被集群多数派所认可的数据变更操作会按照相同的顺序被写入状态机,只是不同节点的完成时间可能略微存在差异. 于是,在每个 etcd 服务端节点将数据写入状态机的位置就呈现出一个公共的切面,在这个切面上 etcd 执行了 notify 操作,基于变更的数据记录和当前节点中存在的 watchers 进行 join,生成一个批次的 watch 回调事件,然后向上传递,完成通知回调.

(对于这部分内容感兴趣的小伙伴,可以阅读一下我之前发表的文章 ”两万字长文解析raft算法原理“ 和 ”raft 工程化案例之 etcd 源码实现“)

 

1.2 创建 watch 链路

 

  • • 首先,etcd 客户端通过 grpc 长连接向 etcd 服务端发送创建 watch 请求

  • • serverWatchStream 中持续运行的读协程 recvLoop 通过 grpc 长连接接收到创建 watch 请求

  • • recvLoop 通过 watchStream,将创建 watch 的请求传递至底层的 watchableStore 模块

  • • watchableStore 根据新增的 watcher 是否需要回溯历史变更记录,会将 watcher 添加到 synced group 或者 unsynced group 当中存储

  • • revLoop 接收到来自 watchableStore 的响应后,会通过 ctrlStream 将响应数据发往写协程 sendLoop

  • • sendLoop 通过 ctrlStream 接收到响应后,通过 grpc 长连接将其发往 etcd 客户端

 

1.3 watch 回调链路

  • • 首先需要明确的是,不管是 etcd 客户端和 etcd 服务端节点建立的 grpc 长连接还是创建的 watcher 监听器,其生效范围都局限于某个特定的 etcd 服务端节点当中.

  • • 在一个 etcd 服务端节点发生写状态机数据的动作时,会执行 notify 动作,将变更的数据和 watchableStore synced group 中的 watcher 监听器进行 join,产生出一个批次的 watch 回调事件

  • • watchableStore 通过 watchStream 将 watch 回调事件发往上层的 serverWatchStream

  • • serverWatchStream 的 sendLoop 协程接收到 watch 回调事件,通过 grpc 长连接将其发送到 etcd 客户端

 

1.4 watchableStore 刷新链路

watchableStore 模块的存储介质分为三部分,都是完全基于内存实现的存储,并且不同节点之间这部分数据内容是相互独立的:

  • • synced:用于存储 watcher 监听器. 存放着的 watcher 是监听数据无需回溯历史变更记录、倘若有新数据变更事件发生即可立即发起 watch 回调的监听器

  • • unsynced:同样用于存储 watcher 监听器. 存放着的 watch 是监听数据仍存在历史变更记录需要回溯,因此新数据变更事件发生时也无法立即发起回调的这部分监听器

  • • victims:用于临时存放一部分 watch 回调事件. 这部分回调事件是由于在通过 watchStream 发往上层途中发现 channel 容量不足,为避免 notify 协程陷入阻塞,而选择先将这部分变更事件追加到一个容量无上限的 victims 列表当中

 

2 核心数据结构

本章开始进入源码解析部分. 首先对 etcd 服务端侧涉及到的几个核心数据结构进行介绍:

  • • etcd 服务端启动时会和 etcd 客户端 grpc 长连接一一对应的创建一个 serverWatchStream,并通过异步启动其读写协程 recvLoop 和 sendLoop 来和客户端之间完成通信

  • • serverWatchStream 中存在一个 watchStream,用于与更底层的 watchableStore 之间建立通信关系.

  • • watchableStore 是基于内存存储 watcher 监听器的模块,会将 watcher 分为 synced group、unsynced group 两部分分别存储,并将一部分 pending 的回调事件存储在 victims 列表当中

  • • synced group 和 unsynced group 都是 watcherGroup 类型,由于 watcher 监听数据范围可能是单个 key,也可能是一个 range 范围,因此 watcherGroup 中分别用 map(watcherSetByKey)以及一棵红黑树(adt.IntervalTree) 实现了单 key 监听以及 range 范围监听的 watcher 的存储

  • • 每个 watcher 监听器中包含了在服务端节点内唯一的 watchID,包含了记录当前 watcher 通知进度的 minRev 版本号,监听数据键范围的 key 和 end 字段,还内置了一个 ch,用于将 watch 回调事件发往上层的 sendLoop

 

2.1 serverWatchStream

serverWatchStream 是 etcd 服务端 watch 模块上层的处理中枢,和来自 etcd 客户端的 grpc 长连接之间是一一对应关系,核心内容包括:

  • • watchable:位于 serverWatchStream 更下层的 watcher 存储模块,实现类是 watchableStore

  • • gRPCStream:etcd 服务端 serverWatchStream 和 etcd 客户端之间的 grpc 通信长连接

  • • watchStream:上层 serverWatchStream 和底层 watchableStore 通信交互的介质

  • • ctrlStream:读协程 recvLoop 和写协程 sendLoop 之间通信交互的 channel

// etcd server 侧 watch 模块上层的处理中枢.
type serverWatchStream struct {
    // ...


    // watcher 的存储介质,其实现是 watchableStore
    watchable mvcc.WatchableKV
    // ...
  
    // 和 etcd client 通信的 grpc 长连接入口
    gRPCStream  pb.Watch_WatchServer
    // 和底层 watchableStore 通信时使用的 stream.
    // 创建 watcher 时通过 stream 的 sdk 方法实现. 回调时会通过 stream 中的 channel 将事件由 watchable 推送到 serverWatchStream 的写协程 sendLoop
    watchStream mvcc.WatchStream
    // serverWatchStream 中读协程 recvLoop 通过该 channnel 将同步请求的响应结果推送给写协程 sendLoop,并由其完成对 etcd 客户端的响应
    ctrlStream  chan *pb.WatchResponse
    // ...
}

 

2.2 watchableStore

watchableStore 是 etcd 服务端底层负责存储 watcher 监听器的模块:

  • • store:etcd 的数据存储模块. 其中记录了状态机数据提交的事务版本号,该版本号是在整个 etcd 集群中具有一致性保证的

  • • victims:当 watchabelStore 通过 watcherStream channel 向上层 serverWatchStream 发送回调事件途中发现 channel 空间不足时,避免将当前的 notify 协程陷入阻塞,会将这部分 watch 回调事件追加到 victims 列表中

  • • unsynced:对于还存在历史变更记录需要回溯处理的 watcher,会被存放在 unsynced group 当中

  • • synced:对于无历史变更记录需要处理的 watcher,会被存放在 synced group 中. 一旦监听数据再有变更事件发生,即可和 synced group 中的 watcher 进行 join ,并向上层传递 watch 回调事件

type watchableStore struct {
    // etcd 存储模块
    *store
    // 当 watchableStore 通过 watcherStream 的 channel 向上层 serverWatchStream 发送回调事件而发现 channel 空间不足时,会将这部分 watcher 添加到 victims 当中
    victims []watcherBatch
    // ...


    // 假如某些 watcher 监听数据还有一些历史版本的变更事件需要同步时,会将其存放在此处
    unsynced watcherGroup


    // 当 watcher 监听数据的历史变更事件已经回调完成,下次发生变更时可以直接进行回调的话,会将其存放在此处
    synced watcherGroup


    // ...
}

 

type store struct {
    // ...
    // 记录了 etcd 存储模块的版本号. 这个版本号会存着每次写操作的提交而递增,并且在整个 etcd 集群全局会保持最终一致性和顺序一致性.
    currentRev int64
    // ...
}

 

2.3 watchStream

watchStream 是上层 serverWatchStream 和底层 watchableStore 之间的通信交互介质,其中核心字段为:

  • • watchable:即底层存储 watcher 的 watchableStore

  • • ch:当底层 watchableStore 产生新的 watch 回调事件时,会通过该 channel 传递事件至上层的 serverWatchStream

  • • nextID:新创建的下一个 watcher 的标识 id(watcherID),在一个特定的 etcd 服务端节点内具有唯一性且单调递增

type watchStream struct {
    // 指向底层的 watchableStore
    watchable watchable
    // 当底层 watchableStore 中存储的 watch 监听的数据发生变化,会通过该 channel 将通知回调事件发往上层的 serverWatchStream
    ch        chan WatchResponse


    // ...
    // 本节点新创建下一个 watcher 时的使用的 watcher id,每个 etcd server 节点内全局唯一且递增
    nextID   WatchID
    // ...
}

 

2.4 watcherGroup

watchableStore 中 synced group 和 unsynced group 的实现类型都是 watcherGroup. watcherGroup 是 watcher 组的概念,是完全基于内存实现的存储结构,核心字段包括:

  • • keyWatchers:监听单 key 的 watcher 会存放于此. keyWatchers 类型是 map,key 即是监听数据的键 key, val 是一个 watcher set,因为可能存在会多个不同的 watcher 监听相同的 key

  • • ranges:监听 range 范围数据的 watcher 会存放于此. 数据结构是基于红黑树实现的,内部根据监听数据的键 key 进行排序,保证在 join 时能够在 logN 级别的时间复杂度下查找到对应的 watcher.

  • • watchers:当前 etcd server 节点内所有创建的 watcher 都会在此处记录

type watcherGroup struct {
    // keyWatchers has the watchers that watch on a single key
    keyWatchers watcherSetByKey
    // ranges has the watchers that watch a range; it is sorted by interval
    ranges adt.IntervalTree
    // watchers is the set of all watchers
    watchers watcherSet
}


type watcherSetByKey map[string]watcherSet

 

type watcherSet map[*watcher]struct{}

 

// IntervalTree represents a (mostly) textbook implementation of the
// "Introduction to Algorithms" (Cormen et al, 3rd ed.) chapter 13 red-black tree
// and chapter 14.3 interval tree with search supporting "stabbing queries".
type IntervalTree interface {
    // Insert adds a node with the given interval into the tree.
    Insert(ivl Interval, val interface{})
    // Delete removes the node with the given interval from the tree, returning
    // true if a node is in fact removed.
    Delete(ivl Interval) bool
    // Len gives the number of elements in the tree.
    Len() int
    // Height is the number of levels in the tree; one node has height 1.
    Height() int
    // MaxHeight is the expected maximum tree height given the number of nodes.
    MaxHeight() int
    // Visit calls a visitor function on every tree node intersecting the given interval.
    // It will visit each interval [x, y) in ascending order sorted on x.
    Visit(ivl Interval, ivv IntervalVisitor)
    // Find gets the IntervalValue for the node matching the given interval
    Find(ivl Interval) *IntervalValue
    // Intersects returns true if there is some tree node intersecting the given interval.
    Intersects(iv Interval) bool
    // Contains returns true if the interval tree's keys cover the entire given interval.
    Contains(ivl Interval) bool
    // Stab returns a slice with all elements in the tree intersecting the interval.
    Stab(iv Interval) []*IntervalValue
    // Union merges a given interval tree into the receiver.
    Union(inIvt IntervalTree, ivl Interval)
}

 

2.5 watcherBatch

watchableStore 中 victims 的元素类型. watcherBatch 的类型是一个 map,实现了由 watcher 到 watch 回调事件 eventBatch 之间的映射

type watcherBatch map[*watcher]*eventBatch

 

2.6 watcher

watcher 就是针对于指定范围数据进行监听、当其发生变更时对应用方进行通知回调的监听器.

watcher 被存储在 watchableStore 内的 watcher group (synced 和 unsynced)当中,其中核心字段包括:

  • • key:监听数据的键

  • • end:如果监听数据为 range 范围,则通过 end 存储 range 边界信息

  • • victim:标识当前 watcher 对应的 watch 回调事件是否被添加 victims 列表中

  • • minRev:标识出当前 watcher 的回调进度版本号,只有大于等于 minRev 的数据变更事件才会被通知回调,这一设定是为了避免发生重复的 watch 回调

  • • id:watcher 的唯一标识 id,在单个 etcd 服务端节点内唯一且递增

  • • ch:当产生 watch 回调事件时,通过此 channel 将事件发给 serverWatcherStream 的 sendLoop

type watcher struct {
    // watcher 监听数据的键
    key []byte
    // watcher 监听范围数据的键边界
    end []byte
    // 标识出当前 watcher 是否被添加 victims 列表中
    victim bool
    // ...
    // 标识出当前 watcher 监听数据的回调版本进度,只有大于当前版本的变更事件才会被通知回调,以避免重复回调
    minRev int64
    // watcher 的唯一标识 id,在整个 etcd server 内唯一且递增
    id     WatchID


    // ...
    // watcher 对应的数据范围发生变更时,会通过此 channel 将变更事件发给 serverWatcherStream 的 sendLoop
    ch chan<- WatchResponse
}

 

3 搭建通信架构

下面进入几个核心链路的源码走读.

  • • 在 etcd 客户端使用 watch 功能时,会和 etcd 服务端建立 grpc 长连接

  • • etcd 服务端会创建出一个 serverWatchStream 实例

  • • serverWatchStream 会异步启动一个读协程 recvLoop,通过 for 循环自旋 + recv 阻塞读长连接请求的方式,持续接收和处理来自 etcd 客户端创建/删除 watch 的请求

  • • serverWatchStream 会异步启动一个写协程 sendLoop,会通过 for 循环的方式进行自旋,通过 channel 持续接收处理来自 watchableStore 侧的操作 watch 响应结果或者因监听数据变更引起的 watch 回调事件

 

3.1 watchServer.Watch

首先回顾一下 etcd 客户端侧发起的创建 watch 请求,watchClient.Watch 方法向 etcd 服务端发起了一笔 targe 为 /etcdserverpb.Watch/Watch 的 grpc 长连接请求.

func (*watchClient) Watch(ctx context.Context, opts ...grpc.CallOption) (Watch_WatchClient, error) {
    stream, err := c.cc.NewStream(ctx, &_Watch_serviceDesc.Streams[0], "/etcdserverpb.Watch/Watch", opts...)
    // ...
    x := &watchWatchClient{stream}
    return x, nil
}

 

请求来到 etcd 服务端后,会走进 etcdserverpb.Watch 方法对应的 handler:_Watch_Watch_Handler 当中,然后进一步走进 watchServer.Watch 方法当中.

var _Watch_serviceDesc = grpc.ServiceDesc{
    ServiceName: "etcdserverpb.Watch",
    HandlerType: (*WatchServer)(nil),
    Methods:     []grpc.MethodDesc{},
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "Watch",
            Handler:       _Watch_Watch_Handler,
            ServerStreams: true,
            ClientStreams: true,
        },
    },
    Metadata: "rpc.proto",
}

 

func _Watch_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
    return srv.(WatchServer).Watch(&watchWatchServer{stream})
}

 

在 watchServer.Watch 方法中完成了几步工作:

  • • 创建了一个 serverWatchStream 实例

  • • 创建了用于 serverWatchStream 和 watchableStore 通信的 watchStream

  • • 异步启动了读写协程 recvLoop

  • • 异步启动了写协程 sendLoop.

func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
    sws := serverWatchStream{
        // ...
        watchable: ws.watchable,
        // ...
        // 与客户端建立的长连接
        gRPCStream:  stream,
        watchStream: ws.watchable.NewWatchStream(),
        // ...
    }


    sws.wg.Add(1)
    go func() {
        sws.sendLoop()
        sws.wg.Done()
    }()


    // ...
    go func() {
        if rerr := sws.recvLoop(); rerr != nil {
            // ...
        }
    }()
    // ...
}

 

3.2 watchableStore.NewWatchStream

watchStream 是上层 serverWatchStream 和底层存储 watchableStore 交互的通道:

  • • 成员字段 watchable 即为底层的 watchableStore 模块

  • • ch 是由于将底层 watch 回调事件发往上层的通道

func (*watchableStore) NewWatchStream() WatchStream {
    // ...
    return &watchStream{
        watchable: s,
        ch:        make(chan WatchResponse, chanBufLen),
        cancels:   make(map[WatchID]cancelFunc),
        watchers:  make(map[WatchID]*watcher),
    }
}

 

3.3 serverWatchStream.recvLoop

  • • recvLoop 是 serverWatchStream 异步启动的读协程

  • • 其运行结构是一个自旋的 for 循环,每轮都会阻塞尝试从客户端长连接 grpc stream 中读取来自客户端的请求

  • • 如果接收到创建 watch 请求,会根据其监听 key 的数据范围,对 req 的 key 和 end 字段进行调整

  • • 同步调用 watchStream.Watch 方法将创建 watcher 的请求发往底层的 watchableStore

  • • 创建好 watcher 后,通过 ctrlStream channel 将响应结果发往 sendLoop 中,并由其将响应通过 grpc 长连接发往 etcd 客户端

func (sws *serverWatchStream) recvLoop() error {
    for {
        // 通过 stream 接收到来自客户端的请求
        req, err := sws.gRPCStream.Recv()
        // ...


        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest:
            // ...


            creq := uv.CreateRequest
            // ...
            if len(creq.RangeEnd) == 0 {
                // force nil since watchstream.Watch distinguishes
                // between nil and []byte{} for single key / >=
                creq.RangeEnd = nil
            }
            if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
                // support  >= key queries
                creq.RangeEnd = []byte{}
            }


            // ...
            wsrev := sws.watchStream.Rev()
            rev := creq.StartRevision
            if rev == 0 {
                rev = wsrev + 1
            }
            // 创建 watcher
            id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
            // ...


            wr := &pb.WatchResponse{
                Header:   sws.newResponseHeader(wsrev),
                WatchId:  int64(id),
                Created:  true,
                Canceled: err != nil,
            }
          
            // ...
            select {
            case sws.ctrlStream <- wr:
            // ...
            }
        // ...
        }
     }
 }   

 

3.4 serverWatchStream.sendLoop

sendLoop 方法 serverWatchStream 异步启动的写协程,通过一个 for 循环自旋运行,并持续监听两个 channel:

  • • watchStream 中的 ch,用于接收来自 watchableStore 模块的 watch 回调事件

  • • serverWatchStream 中的 ctrlStream,由于接收来自 recvLoop 发送的操作 watch 的响应结果

不论是以上的哪种事件,sendLoop 都会通过 grpcStream.Send 方法,将事件发往 etcd 客户端

func (sws *serverWatchStream) sendLoop() {
    // watch ids that are currently active
    ids := make(map[mvcc.WatchID]struct{})
    // watch responses pending on a watch id creation message
    pending := make(map[mvcc.WatchID][]*pb.WatchResponse)


    // ...
    // ...
    for {
        select {
        // 接受到 watch 回调事件
        case wresp, ok := <-sws.watchStream.Chan():
            // ...
            evs := wresp.Events
            events := make([]*mvccpb.Event, len(evs))
            sws.mu.RLock()
            needPrevKV := sws.prevKV[wresp.WatchID]
            sws.mu.RUnlock()
            for i := range evs {
                events[i] = &evs[i]
                // ...             
            }


            // ...
            wr := &pb.WatchResponse{
                Header:          sws.newResponseHeader(wresp.Revision),
                WatchId:         int64(wresp.WatchID),
                Events:          events,
                CompactRevision: wresp.CompactRevision,
                Canceled:        canceled,
            }


            // Progress notifications can have WatchID -1
            // if they announce on behalf of multiple watchers
            if wresp.WatchID != clientv3.InvalidWatchID {
                // 如果对应的 watch 还未创建好,则先添加到 pending
                if _, okID := ids[wresp.WatchID]; !okID {
                    // buffer if id not yet announced
                    wrs := append(pending[wresp.WatchID], wr)
                    pending[wresp.WatchID] = wrs
                    continue
                }
            }


            // ...          
            serr = sws.gRPCStream.Send(wr)
            // ...         


        // recvLoop 中对创建/删除 watch 请求处理完成,会通过 ctrlStream 接收到对应的 response 
        case c, ok := <-sws.ctrlStream:
            if !ok {
                return
            }


            // 将创建 watch 的响应通过 grpc 长连接发往 etcd 客户端
            if err := sws.gRPCStream.Send(c); err != nil {
                // ...
                return
            }




            // track id creation
            wid := mvcc.WatchID(c.WatchId)


            // 对于创建好的 watch,将其从 pendings 转移到 ids 当中
            if c.Created {
                // flush buffered events
                ids[wid] = struct{}{}
                // ...
                delete(pending, wid)
            }


        // ...
        }
    }
}       

 

4 创建 watch 链路

下面进入创建 watch 方法链路的源码走读.

4.1 serverWatchStream.recvLoop

  • • 首先,serverWatchStream 的读协程 recvLoop 会通过 grpc 长连接接收到来自 etcd 客户端侧的创建 watcher 的请求

  • • 根据监听数据的范围,对 req 的 key 和 end 字段进行设置

  • • 调用 watchStream.Rev 方法,获取到 store 存储模块状态数据变更的最新事务版本号,默认在此版本号的基础上加一,作为 watcher 监听数据变更事件的起始版本号

  • • 调用 watchStream.watch 方法,步入 watchableStore 模块,进行 watcher 的创建

  • • 完成 watch 创建后,会取得 watchID,将其封装到 response 当中,通过 serverWatchStream.ctrlStream 发送到 sendLoop 当中

func (sws *serverWatchStream) recvLoop() error {
    for {
        // 通过 stream 接收到来自客户端的请求
        req, err := sws.gRPCStream.Recv()
        // ...


        switch uv := req.RequestUnion.(type) {
        case *pb.WatchRequest_CreateRequest:
            // ...


            creq := uv.CreateRequest
            // 对 watch target 的右边界进行调整
            if len(creq.RangeEnd) == 0 {
                // force nil since watchstream.Watch distinguishes
                // between nil and []byte{} for single key / >=
                creq.RangeEnd = nil
            }
            if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
                // support  >= key queries
                creq.RangeEnd = []byte{}
            }


            // 获取当前 store 模块的版本号,默认会从下一个版本开始建立 watch 监听动作
            wsrev := sws.watchStream.Rev()
            rev := creq.StartRevision
            if rev == 0 {
                rev = wsrev + 1
            }
            // 创建 watcher
            id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
            // ...


            wr := &pb.WatchResponse{
                Header:   sws.newResponseHeader(wsrev),
                WatchId:  int64(id),
                Created:  true,
                Canceled: err != nil,
            }
          
            // ...
            select {
            case sws.ctrlStream <- wr:
            // ...
            }
        // ...
        }
    }
}

 

4.2 watchStream.Watch

watchStream 位于上层 serverWatchStream 和下层 watchableStore 之间,扮演中转层的角色.

在 watchStream.Watch 方法中,为 watcher 分配了当前 etcd 服务端节点单调递增的 watchID,然后调用 watchableStore.watch 方法进入底层开始 watcher 的创建

// Watch creates a new watcher in the stream and returns its WatchID.
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
    // ...
    ws.mu.Lock()
    defer ws.mu.Unlock()
    // ...


    // watch id 节点内递增
    if id == clientv3.AutoWatchID {
        for ws.watchers[ws.nextID] != nil {
            ws.nextID++
        }
        id = ws.nextID
        ws.nextID++
    } else if _, ok := ws.watchers[id]; ok {
        return -1, ErrWatcherDuplicateID
    }
    
    // 创建 watcher
    w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)


    // ...
    ws.watchers[id] = w
    return id, nil
}

 

4.3 watchableStore.watch

至此进入底层 watchableStore 模块.

watchableStore.watch 方法是创建 watcher 的核心方法,步骤包括:

  • • 创建出 watcher 实例,并初步设定出此 watcher 开始监听的版本号 minRev(由上层传入)

  • • 倘若创建的 watcher 未显式声明监听版本号,或者要监听的版本号大于当前 store 模块的最新数据版本,则一律按照从下一次变更事件开始监听的方式作处理. 此时,会将这个新的 watcher 直接添加到 synced group 当中

  • • 若创建的 watcher 有显式设定监听版本且该版本早于当前 store 模块的最新数据版本,这说明该 watcher 需要先对历史变更记录进行回溯才能发起新一轮的 watch 回调,因此会把 watcher 添加 unsynced group 当中

func (*watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
    wa := &watcher{
        key:    key,
        end:    end,
        minRev: startRev,
        id:     id,
        ch:     ch,
        fcs:    fcs,
    }


    s.mu.Lock()
    s.revMu.RLock()
    // 如果未设置 watch 数据的版本号,或者版本号大于当前 store 的事务版本号,则默认从下一个事务版本号开始进行 watch 监听,并且将 watch 添加到 synced
    // 否则说明需要对历史版本的变更事件也进行回溯,则会将 watch 添加到 unsynced
    synced := startRev > s.store.currentRev || startRev == 0
    if synced {
        wa.minRev = s.store.currentRev + 1
        if startRev > wa.minRev {
            wa.minRev = startRev
        }
        s.synced.add(wa)
    } else {
        // ...
        s.unsynced.add(wa)
    }
    s.revMu.RUnlock()
    s.mu.Unlock()


    // ...
    return wa, func() { s.cancelWatcher(wa) }
}

 

4.4 serverWatchStream.sendLoop

sendLoop 通过 serverWatchStream 的 ctrlStream 接收到创建 watcher 的响应结果后,会调用 grpcStream.Send 方法将其发往 etcd 客户端.

func (sws *serverWatchStream) sendLoop() {
    // watch ids that are currently active
    ids := make(map[mvcc.WatchID]struct{})
    // watch responses pending on a watch id creation message
    pending := make(map[mvcc.WatchID][]*pb.WatchResponse)


    // ...
    for {
        select {
        // ...    
        // recvLoop 中对创建/删除 watch 请求处理完成,会通过 ctrlStream 接收到对应的 response 
        case c, ok := <-sws.ctrlStream:
            if !ok {
                return
            }


            // 将创建 watch 的响应通过 grpc 长连接发往 etcd 客户端
            if err := sws.gRPCStream.Send(c); err != nil {
                // ...
                return
            }


            // track id creation
            wid := mvcc.WatchID(c.WatchId)


            // 对于创建好的 watch,将其从 pendings 转移到 ids 当中
            if c.Created {
                // flush buffered events
                ids[wid] = struct{}{}
                // ...
                delete(pending, wid)
            }


        // ...
        }
    }
}

 

5 watch 回调链路

1.3 小节中有提到,etcd 基于 raft 算法,通过 wal + state machine 两步走的方式保证了全局数据的一致性. 对于一笔被多数派认可的写操作,理论上会在所有 etcd 节点完成一次提交并写入状态机的操作. 这个写入状态机的事务操作位于 watchableStoreTxnWrite.End 方法当中,这是一个全局统一的切面,正是在此处,etcd 服务端会针对每个节点发起一轮 watcher 的 join 并组装出 watch 回调事件向上层进行传递.

 

5.1 watchableStoreTxnWrite.End

当数据被写入状态机时,会步入到 watchableStoreTxnWrite.End 方法:

  • • 首先会通过 watchableStoreTxnWrite.Changes 方法获取到这次写操作涉及到的 kv 对范围

  • • 通过 watchableStoreTxnWrite.Rev 获取本次写操作的事务版本 id,这项数据在整个 etcd 集群是保持一致的. 将事务 id 加一,作为本轮 watch 回调事件的版本号 ModRevision

  • • 将这些变更的 kv 对组装成变更事件列表 evs,调用 watchableStore.notify 方法,进行事件回调

func (tw *watchableStoreTxnWrite) End() {
    changes := tw.Changes()
    if len(changes) == 0 {
        tw.TxnWrite.End()
        return
    }


    rev := tw.Rev() + 1
    evs := make([]mvccpb.Event, len(changes))
    for i, change := range changes {
        evs[i].Kv = &changes[i]
        if change.CreateRevision == 0 {
            evs[i].Type = mvccpb.DELETE
            evs[i].Kv.ModRevision = rev
        } else {
            evs[i].Type = mvccpb.PUT
        }
    }


    // end write txn under watchable store lock so the updates are visible
    // when asynchronous event posting checks the current store revision
    tw.s.mu.Lock()
    tw.s.notify(rev, evs)
    tw.TxnWrite.End()
    tw.s.mu.Unlock()
}

 

5.2 watchableStore.notify

该方法位于 watchableStore 模块,核心步骤包括:

  • • 调用了 newWatcherBatch 方法,将本次变更的数据和 synced group 中的 watchers 进行join,生成出真正需要 watch 回调的事件列表

  • • 依次调用 watcher.send 方法,将回调事件通过 watchStream 发往上层 serverWatchStream 模块的 sendLoop

  • • 倘若 watchStream 中的 channel 容量不足,此时会将回调事件暂存到 watchableStore.victims 列表中

  • • 不论 watcher.send 操作的成败都会对 当前 watcher 的 minRev 进行更新,表示对应版本的回调事件已经被处理过了,避免后续被重复执行(哪怕 send 失败,对应事件也已被 append 到 victim 当中,后续会被补偿执行)

func (*watchableStore) notify(rev int64, evs []mvccpb.Event) {
    victim := make(watcherBatch)
    for w, eb := range newWatcherBatch(&s.synced, evs) {
        // ...
        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
            // ...
        } else {
            // move slow watcher to victims
            w.victim = true
            victim[w] = eb
            s.synced.delete(w)
            // ...
        }
        // ...
        w.minRev = rev + 1
    }
    s.addVictim(victim)
}

 

5.3 newWatcherBatch

在 newWatcherBatch 方法中,会结合 synced group 中的 watchers 以及当前传入的 kv 变更事件 evs 进行 join,只有 key 相同,并数据变更版本号大于等于 watcher minRev 的事件,才会添加到出参的 watcherBatch 中,进行 watch 事件回调

func newWatcherBatch(wg *watcherGroup, evs []mvccpb.Event) watcherBatch {
    if len(wg.watchers) == 0 {
        return nil
    }


    wb := make(watcherBatch)
    for _, ev := range evs {
        for w := range wg.watcherSetByKey(string(ev.Kv.Key)) {
            if ev.Kv.ModRevision >= w.minRev {
                // don't double notify
                wb.add(w, ev)
            }
        }
    }
    return wb
}

 

5.4 watcher.send

在 watcher.send 方法中,会尝试执行一笔 watch 回调动作:

  • • 先过一轮过滤器,将未通过校验的事件过滤掉

  • • 尝试通过 watcher.ch(和 watchStream.ch 是同一个 channel)将 watch 回调事件发往上层 serverWatchStream 的 sendLoop

  • • 如果因 channel 容量不足而发送失败,这部分回调事件会在 watchableStore.notify 方法中被追加到 victims 当中.

func (*watcher) send(wr WatchResponse) bool {
    progressEvent := len(wr.Events) == 0


    if len(w.fcs) != 0 {
        ne := make([]mvccpb.Event, 0, len(wr.Events))
        for i := range wr.Events {
            filtered := false
            for _, filter := range w.fcs {
                if filter(wr.Events[i]) {
                    filtered = true
                    break
                }
            }
            if !filtered {
                ne = append(ne, wr.Events[i])
            }
        }
        wr.Events = ne
    }


    // if all events are filtered out, we should send nothing.
    if !progressEvent && len(wr.Events) == 0 {
        return true
    }
    select {
    case w.ch <- wr:
        return true
    default:
        return false
    }
}

 

5.5 serverWatchStream.sendLoop

serverWatchStream 的写协程 sendLoop 通过 watchStream 的 ch 接收到来自底层 watchableStore 传达的回调事件后,会将其封装到 pb.WatchResponse 当中,通过 grpc 长连接发往 etcd 客户端.

func (sws *serverWatchStream) sendLoop() {
    // watch ids that are currently active
    ids := make(map[mvcc.WatchID]struct{})
    // watch responses pending on a watch id creation message
    pending := make(map[mvcc.WatchID][]*pb.WatchResponse)


    interval := GetProgressReportInterval()
    progressTicker := time.NewTicker(interval)


    // ...
    for {
        select {
        case wresp, ok := <-sws.watchStream.Chan():
            // ...
            evs := wresp.Events
            events := make([]*mvccpb.Event, len(evs))
            // ...
            for i := range evs {
                events[i] = &evs[i]
                // ...
            }


            // ...
            wr := &pb.WatchResponse{
                Header:          sws.newResponseHeader(wresp.Revision),
                WatchId:         int64(wresp.WatchID),
                Events:          events,
                CompactRevision: wresp.CompactRevision,
                Canceled:        canceled,
            }


            // Progress notifications can have WatchID -1
            // if they announce on behalf of multiple watchers
            if wresp.WatchID != clientv3.InvalidWatchID {
                // 如果对应的 watch 还未创建好,则先添加到 pending
                if _, okID := ids[wresp.WatchID]; !okID {
                    // buffer if id not yet announced
                    wrs := append(pending[wresp.WatchID], wr)
                    pending[wresp.WatchID] = wrs
                    continue
                }
            }


            // ...
          
            serr = sws.gRPCStream.Send(wr)
            // ...


            // ...           
        // ...
        }
    }
 } 

 

6 watchableStore 数据刷新

1.4 小节中谈到,watchableStore 中的 watcher 会分别存放在 synced group 和 unsynced group 中. 此外,倘若回调流程中通过 watchStream 尝试发往上层的回调事件因 channel 空间不足而发生失败,则会被追加到 victims 列表当中.

下面我们就一起来看看,这部分 unsynced group 中的 watcher 和 victims 当中的 event 是何时被消化处理的.

6.1 newWatchableStore

在 watchableStore 被初始化时,会异步启动两个协程 syncWatchersLoop 和 syncVictimsLoop,分别负责定期清理 unsynced group 和 victims.

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
    // ...
    s := &watchableStore{
        store:    NewStore(lg, b, le, cfg),
        victimc:  make(chan struct{}, 1),
        unsynced: newWatcherGroup(),
        synced:   newWatcherGroup(),
        stopc:    make(chan struct{}),
    }
    s.store.ReadView = &readView{s}
    s.store.WriteView = &writeView{s}
    s.wg.Add(2)
    go s.syncWatchersLoop()
    go s.syncVictimsLoop()
    return s
}

 

6.2 syncWatchersLoop

在守护协程 watchableStore.syncWatchersLoop 方法中,每隔 100ms 会尝试处理一轮 unsyncd group 中的 watchers,处理方法的核心逻辑位于 watchableStore.syncWatchers 方法当中:

func (*watchableStore) syncWatchersLoop() {
    defer s.wg.Done()


    waitDuration := 100 * time.Millisecond
    delayTicker := time.NewTicker(waitDuration)
    defer delayTicker.Stop()


    for {
        s.mu.RLock()
        st := time.Now()
        lastUnsyncedWatchers := s.unsynced.size()
        s.mu.RUnlock()


        unsyncedWatchers := 0
        if lastUnsyncedWatchers > 0 {
            unsyncedWatchers = s.syncWatchers()
        }
        syncDuration := time.Since(st)


        delayTicker.Reset(waitDuration)
        // more work pending?
        if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
            // be fair to other store operations by yielding time taken
            delayTicker.Reset(syncDuration)
        }


        select {
        case <-delayTicker.C:
        case <-s.stopc:
            return
        }
    }
}

 

在 watchableStore.syncWatchers 方法中,会调用 baseReadTx.UnsafeRange 方法,获取到 watcher 关心的历史变更记录,然后依次遍历调用 send 方法将其发往上层,如果发送成功则添加到 synced group 中,发送失败则添加到 victims.

不论此处发送成功还是失败,都会对 watcher 监听数据的版本进行更新,因为进入 victims 中的回调事件会被补偿处理,无须继续关心.

func (*watchableStore) syncWatchers() int {
    s.mu.Lock()
    defer s.mu.Unlock()


    if s.unsynced.size() == 0 {
        return 0
    }


    s.store.revMu.RLock()
    defer s.store.revMu.RUnlock()


    // in order to find key-value pairs from unsynced watchers, we need to
    // find min revision index, and these revisions can be used to
    // query the backend store of key-value pairs
    curRev := s.store.currentRev
    // ...
    wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
    minBytes, maxBytes := newRevBytes(), newRevBytes()
    revToBytes(revision{main: minRev}, minBytes)
    revToBytes(revision{main: curRev + 1}, maxBytes)


    // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
    // values are actual key-value pairs in backend.
    tx := s.store.b.ReadTx()
    tx.RLock()
    revs, vs := tx.UnsafeRange(schema.Key, minBytes, maxBytes, 0)
    evs := kvsToEvents(s.store.lg, wg, revs, vs)
    // Must unlock after kvsToEvents, because vs (come from boltdb memory) is not deep copy.
    // We can only unlock after Unmarshal, which will do deep copy.
    // Otherwise we will trigger SIGSEGV during boltdb re-mmap.
    tx.RUnlock()


    victims := make(watcherBatch)
    wb := newWatcherBatch(wg, evs)
    for w := range wg.watchers {
        w.minRev = curRev + 1


        eb, ok := wb[w]
        if !ok {
            // bring un-notified watcher to synced
            s.synced.add(w)
            s.unsynced.delete(w)
            continue
        }


        if eb.moreRev != 0 {
            w.minRev = eb.moreRev
        }


        if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
            // ...
        } else {
            w.victim = true
        }


        if w.victim {
            victims[w] = eb
        } else {
            if eb.moreRev != 0 {
                // stay unsynced; more to read
                continue
            }
            s.synced.add(w)
        }
        s.unsynced.delete(w)
    }
    s.addVictim(victims)


    vsz := 0
    for _, v := range s.victims {
        vsz += len(v)
    }
    // ...
    return s.unsynced.size()
}

 

6.3 syncVictimsLoop

负责清理 victims 的 watchableStore.syncVictimsLoop 协程中,每隔 10 ms 会尝试对 victims 进行一轮处理,使用的方法是 watchableStore.moveVictims.

func (*watchableStore) syncVictimsLoop() {
    defer s.wg.Done()


    for {
        for s.moveVictims() != 0 {
            // try to update all victim watchers
        }
        s.mu.RLock()
        isEmpty := len(s.victims) == 0
        s.mu.RUnlock()


        var tickc <-chan time.Time
        if !isEmpty {
            tickc = time.After(10 * time.Millisecond)
        }


        select {
        case <-tickc:
        case <-s.victimc:
        case <-s.stopc:
            return
        }
    }
}

 

watchableStore.moveVictims 方法中,会遍历一轮 victims 中的事件列表,尝试调用 send 方法将其发送到上层,如果发送失败的话,则将其继续保留在 victims 当中.

此外,对于 victims 当中的每个事件的 watcher 都会进行处理,如果其版本小于当前 store 的版本,则会添加到 unsynced group ,否则加到 synced group当中(两个 group 本身实现了去重).

// moveVictims tries to update watches with already pending event data
func (*watchableStore) moveVictims() (moved int) {
    s.mu.Lock()
    victims := s.victims
    s.victims = nil
    s.mu.Unlock()


    var newVictim watcherBatch
    for _, wb := range victims {
        // try to send responses again
        for w, eb := range wb {
            // watcher has observed the store up to, but not including, w.minRev
            rev := w.minRev - 1
            if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
                // ...
            } else {
                if newVictim == nil {
                    newVictim = make(watcherBatch)
                }
                newVictim[w] = eb
                continue
            }
            moved++
        }


        // assign completed victim watchers to unsync/sync
        s.mu.Lock()
        s.store.revMu.RLock()
        curRev := s.store.currentRev
        for w, eb := range wb {
            if newVictim != nil && newVictim[w] != nil {
                // couldn't send watch response; stays victim
                continue
            }
            w.victim = false
            if eb.moreRev != 0 {
                w.minRev = eb.moreRev
            }
            if w.minRev <= curRev {
                s.unsynced.add(w)
            } else {
                // ...
                s.synced.add(w)
            }
        }
        s.store.revMu.RUnlock()
        s.mu.Unlock()
    }


    if len(newVictim) > 0 {
        s.mu.Lock()
        s.victims = append(s.victims, newVictim)
        s.mu.Unlock()
    }


    return moved
}

 

7 两个注意问题

7.1 watcher 易失性存储?

前文谈到 watchableStore 时,我多次提到 etcd 服务端节点中创建的 watcher 是存储在 watchableStore 模块的内存中的(synced group 和 unsynced group). 这部分数据属于易失性存储,且每个 etcd 服务端节点的数据各不相同,因此不存在数据备份.

看到此大家难免在心中产生这样一个疑问:倘若 etcd 节点宕机,或者 etcd 进程挂了,岂不是会导致创建的 watcher 永久丢失不可恢复了吗?

关于这个问题我的个人理解是(不一定对哈,欢迎指正),由于 etcd watch 功能是基于 grpc 长连接实现的,这依赖于 etcd 客户端和某个特定的 etcd 服务端节点之间保持健康良好的长连接通信关系,并且后续的 watch 回调事件也强依赖于这笔 grpc 长连接才能完成. 倘若 etcd 节点宕机或者进程挂了,那么这笔 grpc 长连接也就断开了,因此这次 watch 动作就注定是失败的,哪怕通过一些特殊机制保证创建的 watcher 不丢失,本身的意义也不大.

 

7.2 watch 数据更新停滞?

etcd 通过 raft 算法实现,基于多数派准则保证了整个集群的健壮性,然而这并不能保证所有节点都处于健康状态,比如 etcd 集群中可以容忍少数几个节点和多数派之间存在网络分区,这部分少数派的状态数据是有可能长时间得不到更新同步的.

在 etcd watch 功能中,etcd 客户端会和某个特定的 etcd 服务端节点建立长连接并开启 watch 监听,倘若选中的这个服务端节点正好和 etcd 集群中的 leader 产生了网络分区,导致其中的状态机数据长期停滞得不到更新,这样对应的 watch 功能也会随之失效.

这个问题是客观存在的,但也并非无解. 在使用 etcd watch 时,可以强制指定与 etcd 客户端建立 watch 长连接的 etcd 服务端节点必须处在和 leader 保持通信的多数派当中,这样就能规避因网络分区而导致的状态数据更新停滞问题.

具体的操作方式是,在创建 watcher 的时候,显式添加一个 requireLeader 的 option:

    ctx := etcdClient.WithRequireLeader(ctx)
    ch,:= etcdManager.NewWatchChannel(ctx)

 

8 总结

至此,etcd watch 机制源码解析系列完结.

本文聚焦于 etcd 服务端模块的底层实现细节,核心点包括:

  • • watch 功能通过 etcd 客户端与 etcd 服务端特定节点间建立的 grpc 长连接实现

  • • etcd 服务端模块分为上层的 serverWatchStream 和底层的 watchableStore 模块两部分

  • • serverWatchStream 承上启下,一方面和 etcd 客户端通过长连接通信,一方面和 watchableStore 通过 watchStream 通信交互

  • • 在 serverWatchStream 中会异步启动读协程 recvLoop 和写协程 sendLoop,分别负责读写客户端的请求/响应数据

  • • watchableStore 中会完全基于内存存储 watcher 的存储,根据是否需要回溯历史变更记录,watcher 会被分为 syncd、uysynced 两部分. 此外还有一个 victims 部分用于暂存因容量不足而发送阻塞的回调事件

  • • 在每个 etcd 服务端节点将数据写入状态机的位置存在一个公共的切面,在这个切面上 etcd 执行了 notify 操作,基于变更的数据记录和当前节点中存在的 watchers 进行 join,生成一个批次的 watch 回调事件,然后向上传递,完成通知回调


小徐先生的编程世界
在学钢琴,主业码农
 最新文章