Golang 设计模式之观察者模式

文摘   科技   2023-06-16 19:28   北京  

1 原理介绍

本期基于 go 语言和大家探讨设计模式中的观察者模式. 观察者模式适用于多对一的订阅/发布场景.

  • • ”多“:指的是有多名观察者

  • • ”一“:指的是有一个被观察事物

  • • ”订阅“:指的是观察者时刻关注着事物的动态

  • • ”发布“:指的是事物状态发生变化时是透明公开的,能够正常进入到观察者的视线

在上述场景中,我们了解到核心对象有两类,一类是“观察者”,一类是“被观察的事物”,且两者间在数量上存在多对一的映射关系.

在具体作编程实现时,上述场景的实现思路可以是百花齐放的,而观察者模式只是为我们提供了一种相对规范的设计实现思路,其遵循的核心宗旨是实现“观察者”与“被观察对象”之间的解耦,并将其设计为通用的模块,便于后续的扩展和复用.

学习设计模式时,我们脑海中需要中需要明白,教条是相对刻板的,而场景和问题则是灵活多变的,在工程实践中,我们避免生搬硬套,要做到因地制宜,随机应变.

 

2 代码实践

2.1 核心角色

在观察者模式中,核心的角色包含三类:

  • • Observer:观察者. 指的是关注事物动态的角色

  • • Event:事物的变更事件. 其中 Topic 标识了事物的身份以及变更的类型,Val 是变更详情

  • • EventBus:事件总线. 位于观察者与事物之间承上启下的代理层. 负责维护管理观察者,并且在事物发生变更时,将情况同步给每个观察者.

 

观察者模式的核心就在于建立了 EventBus 的角色. 由于 EventBus 模块的诞生,实现了观察者与具体被观察事物之间的解耦:

  • • 针对于观察者而言,需要向 EventBus 完成注册操作,注册时需要声明自己关心的变更事件类型(调用 EventBus 的 Subscribe 方法),不再需要直接和事物打交道

  • • 针对于事物而言,在其发生变更时,只需要将变更情况向 EventBus 统一汇报即可(调用 EventBus 的 Publish 方法),不再需要和每个观察者直接交互

  • • 对于 EventBus,需要提前维护好每个观察者和被关注事物之间的映射关系,保证在变更事件到达时,能找到所有的观察者逐一进行通知(调用 Observer 的 OnChange 方法)

 

三类角色组织生成的 UML 类图如下所示:

 

对应的代码实现示例展示如下:

type Event struct {
    Topic string
    Val   interface{}
}


type Observer interface {
    OnChange(ctx context.Context, e *Event) error
}


type EventBus interface {
    Subscribe(topic string, o Observer)
    Unsubscribe(topic string, o Observer)
    Publish(ctx context.Context, e *Event)
}

 

观察者 Observer 需要实现 OnChange 方法,用于向 EventBus 暴露出通知自己的“联系方式”,并且在方法内部实现好当关注对象发生变更时,自己需要采取的处理逻辑.

下面给出一个简单的观察者实现示例 BaseObserver:

type BaseObserver struct {
    name string
}


func NewBaseObserver(name string) *BaseObserver {
    return &BaseObserver{
        name: name,
    }
}


func (b *BaseObserver) OnChange(ctx context.Context, e *Event) error {
    fmt.Printf("observer: %s, event key: %s, event val: %v", b.name, e.Topic, e.Val)
    // ...
    return nil
}

 

事件总线 EventBus 需要实现 Subscribe 和 Unsubscribe 方法暴露给观察者,用于新增或删除订阅关系,其实现示例如下:

type BaseEventBus struct {
    mux       sync.RWMutex
    observers map[string]map[Observer]struct{}
}


func NewBaseEventBus() BaseEventBus {
    return BaseEventBus{
        observers: make(map[string]map[Observer]struct{}),
    }
}


func (b *BaseEventBus) Subscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    _, ok := b.observers[topic]
    if !ok {
        b.observers[topic] = make(map[Observer]struct{})
    }
    b.observers[topic][o] = struct{}{}
}


func (b *BaseEventBus) Unsubscribe(topic string, o Observer) {
    b.mux.Lock()
    defer b.mux.Unlock()
    delete(b.observers[topic], o)
}

针对 EventBus 将事物变更事件同步给每个观察者的 Publish 流程,可以分为同步模式和异步模式,分别在 2.2 小节和 2.3 小节中展开介绍.

 

2.2 同步模式

在同步模式的实现中,通过 SyncEventBus 实现了 EventBus 的同步通知版本,对应类图如下:

 

 

在同步模式下,EventBus 在接受到变更事件 Event 时,会根据事件类型 Topic 匹配到对应的观察者列表 observers,然后采用串行遍历的方式分别调用 Observer.OnChange 方法对每个观察者进行通知,并对处理流程中遇到的错误进行聚合,放到 handleErr 方法中进行统一的后处理.

type SyncEventBus struct {
    BaseEventBus
}


func NewSyncEventBus() *SyncEventBus {
    return &SyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
}


func (s *SyncEventBus) Publish(ctx context.Context, e *Event) {
    s.mux.RLock()
    subscribers := s.observers[e.Topic]
    s.mux.RUnlock()


    errs := make(map[Observer]error)
    for subscriber := range subscribers {
        if err := subscriber.OnChange(ctx, e); err != nil {
            errs[subscriber] = err
        }
    }


    s.handleErr(ctx, errs)
}

 

此处对 handleErr 方法的实现逻辑进行建立了简化,在真实的实践场景中,可以针对遇到的错误建立更完善的后处理流程,如采取重试或告知之类的操作.

func (s *SyncEventBus) handleErr(ctx context.Context, errs map[Observer]error) {
    for o, err := range errs {
        // 处理 publish 失败的 observer
        fmt.Printf("observer: %v, err: %v", o, err)
    }
}

 

2.3 异步模式

在异步模式的实现中,通过 AsyncEventBus 实现了 EventBus 的异步通知版本,对应类图如下:

 

 

在异步模式下,会在 EventBus 启动之初,异步启动一个守护协程,负责对接收到的错误进行后处理.

在事物发生变更时,EventBus 的 Publish 方法会被调用,此时 EventBus 会并发调用 Observer.OnChange 方法对每个观察者进行通知,在这个过程中遇到的错误会通过 channel 统一汇总到 handleErr 的守护协程中进行处理.

type observerWithErr struct {
    o   Observer
    err error
}




type AsyncEventBus struct {
    BaseEventBus
    errC chan *observerWithErr
    ctx  context.Context
    stop context.CancelFunc
}




func NewAsyncEventBus() *AsyncEventBus {
    aBus := AsyncEventBus{
        BaseEventBus: NewBaseEventBus(),
    }
    aBus.ctx, aBus.stop = context.WithCancel(context.Background())
    // 处理处理错误的异步守护协程
    go aBus.handleErr()
    return &aBus
}


func (a *AsyncEventBus) Stop() {
    a.stop()
}


func (a *AsyncEventBus) Publish(ctx context.Context, e *Event) {
    a.mux.RLock()
    subscribers := a.observers[e.Topic]
defer a.mux.RUnlock()
    for subscriber := range subscribers {
        // shadow
        subscriber := subscriber
        go func() {
            if err := subscriber.OnChange(ctx, e); err != nil {
                select {
                case <-a.ctx.Done():
                case a.errC <- &observerWithErr{
                    o:   subscriber,
                    err: err,
                }:
                }
            }
        }()
    }
}


func (a *AsyncEventBus) handleErr() {
    for {
        select {
        case <-a.ctx.Done():
            return
        case resp := <-a.errC:
            // 处理 publish 失败的 observer
            fmt.Printf("observer: %v, err: %v", resp.o, resp.err)
        }
    }

 

2.4 使用示例

下面分别给出同步和异步模式下观察者模式的使用示例:

func Test_syncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    sbus := NewSyncEventBus()
    topic := "order_finish"
    sbus.Subscribe(topic, observerA)
    sbus.Subscribe(topic, observerB)
    sbus.Subscribe(topic, observerC)
    sbus.Subscribe(topic, observerD)


    sbus.Publish(context.Background(), &Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })
}

 

异步测试代码:

func Test_asyncEventBus(t *testing.T) {
    observerA := NewBaseObserver("a")
    observerB := NewBaseObserver("b")
    observerC := NewBaseObserver("c")
    observerD := NewBaseObserver("d")


    abus := NewAsyncEventBus()
    defer abus.Stop()


    topic := "order_finish"
    abus.Subscribe(topic, observerA)
    abus.Subscribe(topic, observerB)
    abus.Subscribe(topic, observerC)
    abus.Subscribe(topic, observerD)


    abus.Publish(context.Background(), &Event{
        Topic: topic,
        Val:   "order_id: xxx",
    })


    <-time.After(time.Second)
}

 

3 工程案例

本章和大家一起梳理一下在工程实践中对观察者模式的使用场景.

3.1 MQ 发布/订阅

 

大家耳熟能详的消息队列就是对观察者模式的一种实践,大家可以采用类比的方式在 MQ (Message Queue)架构中代入观察者模式中的每一类角色:

  • • EventBus:对应的是消息队列组件,为整个通信架构提供了分布式解耦、流量削峰等能力

  • • Event:对应的是消息队列中的一条消息,有明确的主题 topic,由生产者 producer 提供

  • • Observer:对应的是消费者 consumer,对指定事物的动态(topic)进行订阅,并在消费到对应的变更事件后执行对应的处理逻辑

 

3.2 ETCD 监听回调

另一个践行了观察者模式的工程案例是基于 golang 编写的分布式 kv 存储组件 etcd.

etcd 提供了作用于指定数据范围的监听回调功能,能在对应数据状态发生变更时,将变更通知传达到每个订阅者的手中,在这个过程中:

  • • EventBus:对应的是 etcd 服务端的 watchableStore 监听器存储模块,该模块会负责存储用户创建的一系列监听器 watcher,并建立由监听数据 key 到监听器集合 watcherGroup 之间的映射关系. 当任意存储数据发生变化时,etcd 的数据存储模块会在一个统一的切面中调用通知方法,将这一信息传达到 watchableStore 模块,watchableStore 则会将变更数据与监听数据 key 之间进行 join,最终得到一个需要执行回调操作的 watchers 组合,顺沿 watcher 中的路径,向订阅者发送通知消息

  • • Event:对应的是一条 etcd 状态机的数据变更事件,由 etcd 使用方在执行一条写数据操作时触发,在写操作真正生效后,变更事件会被传送到 watchableStore 模块执行回调处理

  • • Observer:使用 etcd watch 功能对指定范围数据建立监听回调机制的使用方,在 etcd 服务端 watchableStore 模块会建立监听器实体 watcher 作为自身的代理,当变更事件真的发生后,watchableStore 会以 watcher 作为起点,沿着返回路径一路将变更事件发送到使用方手中.

想了解有关 etcd watch 机制的更多内容,可以阅读我之前发表的文章:

etcd watch 机制源码解析——服务端篇 以及 etcd watch 机制源码解析——客户端篇.

 

4 总结

本文和大家一起探讨了设计模式中的观察者模式:

  • • 观察者模式适用于多对一的订阅/发布场景,其实现思路是在观察者与被观察对象之间添加收口了发布订阅功能的中间层,核心宗旨是实现“观察者”与“被观察对象”之间的解耦

  • • 通过 UML 类图结合具体代码示例,对观察者模式进行实践. 根据变更事件的通知模式,观察者模式可以分为同步和异步两种模型

  • • 本文给出两个践行了观察者模式的工程案例,一个是 Message Queue 的发布订阅模式,一个是 ETCD 服务端对 watch 功能的实现思路


小徐先生的编程世界
在学钢琴,主业码农
 最新文章