从零到一搭建 TCC 分布式事务框架

文摘   科技   2023-08-18 20:21   北京  

0 前言

上周和分享了一篇文章——万字长文漫谈分布式事务实现原理. 本周延续分布式事务的话题,为大家带来一期分布式事务实战篇的技术分享.

本着“理论先行,实践紧随”的理念,这里强烈建议大家先完成上期理论篇内容的学习,再跟随我的思路一起进入本期实战篇的学习.

在写这篇文章的过程中,我在另一边并行完成了一个开源项目的搭建. 这个项目是基于 golang 从零到一实现的 TCC 分布式事务框架,当前于 github 的开源地址为:https://github.com/xiaoxuxiansheng/gotcc

本期分享内容将会紧密围绕着这个开源项目展开. 受限于个人水平,在项目实现以及文章讲解中有着诸多不当之处,权当在此抛砖引玉,欢迎大家多多批评指正.

 

1 架构设计

1.1 整体架构

首先我们简单回顾一下有关于分布式事务以及 TCC 的概念.

所谓事务,对应的语义是“要么什么都不做,要么全都做到位”,需要针对多个执行动作,建立一套一气呵成、不可拆解的运行机制.

在事务中包括的一些执行动作,倘若涉及到跨数据库、跨组件、跨服务等分布式操作,那我们就称这样的事务是分布式事务.

分布式事务在实现上存在很多技术难点,是一个颇具挑战的有趣话题. 目前业界也形成了一套相对成熟且普遍认同的解决方案,就是——TCC:Try-Confirm/Cancel.

TCC 本质上是一种 2PC(two phase commitment protocal 两阶段提交)的实现:

  • • 把分布式事务中,负责维护状态数据变更的模块,封装成一个 TCC 组件

  • • 把数据的变更状态拆分为对应 Try 操作的【冻结】、对应 Confirm 操作的【成功】以及对应 Cancel 操作的【失败回滚】

  • • 抽出一个统筹全局的事务协调者角色 TXManager. 在执行分布式事务时,分为两个阶段:

  • • 阶段 I:先对所有组件执行 Try 操作

  • • 阶段 II:根据上阶段 Try 操作的执行结果,决定本轮执行 Confirm 还是 Cancel 操作

 

在我们实现 TCC 框架的实战环节中,首先需要明确的事情是:

  • • 哪部分内容在 TCC 架构中属于通用的流程,这部分内容可以抽取出来放在 sdk 中,以供后续复用

  • • 哪部分内容需要给使用方预留出足够的自由度,由使用方自行实现,然后和通用 sdk 进行接轨.

最终,这两部分内容明确如下:

  • • 在 TCC sdk 中实现的通用逻辑包含了和事务协调器 txManager 有关的核心流程

  • • 事务协调器 TXManager 开启事务以及 try-confirm/cancel 的 2PC 流程串联

  • • 事务协调器 TXManager 异步轮询任务,用于推进事务从中间态走向终态

  • • TCC 组件的注册流程

  • • 需要预定义事务日志存储模块 TXStore 的实现规范(声明 interface)

  • • 需要预定义 TCC 组件 TCCComponent 的实现规范(声明 interface)

  • • TCC 组件和 TXStore 两部分内容需要由使用方自行实现:

  • • 使用方自行实现 TCCComponent 类,包括其 Try、Confirm、Cancel 方法的执行逻辑

  • • 使用方自行实现具体的 TXStore 日志存储模块. 可以根据实际需要,选型合适的存储组件和存储方式

 

 

1.2 TCC Component

下面是关于 TCC 组件的定位:

  • • 这部分内容需要由用户自行实现,并在 TXManager 启动时将其注册到注册中心 RegistryCenter 当中.

  • • 当使用方调用 TXManager 开启事务时,会通过 RegistryCenter 获取这些组件,并对其进行使用

  • • TCC 组件需要具备的能力包括如下几项:

 

1.3 TX Manager

下面是关于事务协调器 TXManager 的定位.

  • • TXManager 是整个 TCC 架构中最核心的角色

  • • TXManager 作为 gotcc 的统一入口,供使用方执行启动事务和注册组件的操作

  • • TXManager 作为中枢系统分别和 RegisterCenter、TXStore 交互

  • • TXManager 需要串联起整个 Try-Confirm/Canel 的 2PC 调用流程

  • • TXManager 需要运行异步轮询任务,推进未完成的事务走向终态

 

1.4 TX Store

TXStore 是用于存储和管理事务日志明细记录的模块:

  • • 需要支持事务明细数据的 CRUD 能力

  • • 通常情况下,底层需要应用到实际的存储组件作为支持

  • • TXStore 在 gotcc 的 sdk 中体现为一个抽象的 interface. 需要由用户完成具体类的实现,并将其注入到 TXManager 当中.

 

1.5 RegistryCenter

最后是 TCC 组件的注册管理中心 RegistryCenter,负责给 txManager 提供出注册和查询 TCC 组件的能力.

 

2 TXManager 核心源码讲解

理完了基本的流程和概念,下面我们一起开启一线实战环节.

2.1 类图

首先捋一下,在 gotcc 核心 sdk 中,涉及到的几个核心类:

  • • TXManager:事务协调器,class

  • • TXStore:事务日志存储模块,interface

  • • registryCenter:TCC 组件注册管理中心,class

  • • TCCComponent:TCC 组件,interface

通过下面的 UML 类图,展示一下几个核心类之间的关联性:

 

2.2 核心类定义

 

2.2.1 TXManager

下面是关于事务协调器 TXManager 的几个核心字段:

  • • txStore:内置的事务日志存储模块,需要由使用方实现并完成注入

  • • registryCenter:TCC 组件的注册管理中心

  • • opts:内聚了一些 TXManager 的配置项,可以由使用方自定义,并通过 option 注入

  • • ctx:用于反映 TXManager 运行生命周期的的 context,当 ctx 终止时,异步轮询任务也会随之退出

  • • stop:用于停止 txManager 的控制器. 当 stop 被调用后,异步轮询任务会被终止

type TXManager struct {
    ctx            context.Context
    stop           context.CancelFunc
    opts           *Options
    txStore        TXStore
    registryCenter *registryCenter
}


func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: newRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

 

2.2.2 RegistryCenter

注册中心 registryCenter 中的内容很简单,通过 map 存储所有注册进来的 TCC 组件,要求各组件都有独立的组件 ID;通过一把读写锁 rwMutex 保护 map 的并发安全性

type registryCenter struct {
    mux        sync.RWMutex
    components map[string]component.TCCComponent
}


func newRegistryCenter() *registryCenter {
    return &registryCenter{
        components: make(map[string]component.TCCComponent),
    }
}

 

2.2.3 TXStore

下面 gotcc sdk 中,对事务日志存储模块 TXStore interface 的定义,这个点很重要,要求后续使用方在实现具体的 TXStore 模块时,需要实现这里所罗列出来的所有方法,并且要保证实现方法满足预期的功能:

  • • CreateTX:创建一条事务明细记录,会在入参中传入本事务涉及的 TCC 组件列表,同时需要在出参中返回全局唯一的事务 id

  • • TXUpdate:更新一条事务明细记录. 这里指的更新,针对于,事务中某个 TCC 组件 Try 响应状态的更新

  • • TXSubmit:提交一条事务的执行结果. 要么置为成功,要么置为失败

  • • GetHangingTXs:获取所有未完成的事务明细记录

  • • GetTX:根据事务 id,获取指定的一条事务明细记录

  • • Lock:锁住整个事务日志存储模块(要求为分布式锁)

  • • Unlock:解锁整个事务日志存储模块

type TXStore interface {
    // 创建一条事务明细记录
    CreateTX(ctx context.Context, components ...component.TCCComponent) (txID string, err error)
    // 更新事务进度:
    // 规则为:倘若有一个 component try 操作执行失败,则整个事务失败;倘若所有 component try 操作执行成功,则事务成功
    TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error
    // 提交事务的最终状态
    TXSubmit(ctx context.Context, txID string, success bool) error
    // 获取到所有处于中间态的事务
    GetHangingTXs(ctx context.Context) ([]*Transaction, error)
    // 获取指定的一笔事务
    GetTX(ctx context.Context, txID string) (*Transaction, error)
    // 锁住事务日志表
    Lock(ctx context.Context, expireDuration time.Duration) error
    // 解锁事务日志表
    Unlock(ctx context.Context) error
}

 

2.3 注册组件

下面是注册 TCC 组件的处理流程:

首先,使用方通过 TXManager 对外暴露的公开方法 Register,开启注册流程,传入对应的 TCCComponent:

func (t *TXManager) Register(component component.TCCComponent) error {
    return t.registryCenter.register(component)
}

 

  • • TXManager 会调用注册中心 registeryCenter 的 register 方法,将对应 component 注入到 map 中. 这里有两个点值得一提:

  • • Register 方法可以并发使用,其内部会通过 rwMutex 维护 map 的并发安全性

  • • TCC 组件不能重复注册,即不能存在重复的 component id

func (r *registryCenter) register(component component.TCCComponent) error {
    r.mux.Lock()
    defer r.mux.Unlock()
    if _, ok := r.components[component.ID()]; ok {
        return errors.New("repeat component id")
    }
    r.components[component.ID()] = component
    return nil
}

 

上游 TXManager 可以通过 component id,进行 TCC 组件的查询. 倘若某个 component id 不存在,则会抛出错误:

func (r *registryCenter) getComponents(componentIDs ...string) ([]component.TCCComponent, error) {
    components := make([]component.TCCComponent, 0, len(componentIDs))


    r.mux.RLock()
    defer r.mux.RUnlock()


    for _, componentID := range componentIDs {
        component, ok := r.components[componentID]
        if !ok {
            return nil, fmt.Errorf("component id: %s not existed", componentID)
        }
        components = append(components, component)
    }


    return components, nil
}

 

2.4 事务主流程

下面进入最核心的部分,介绍一下整个分布式事务的运行流程.

2.4.1 主流程

用户可以通过 txManager.Transaction 方法,一键启动动一个分布式事务流程,其中包含的几个核心步骤展示如下图:

txManager.Transaction 方法是用户启动分布式事务的入口,需要在入参中声明本次事务涉及到的组件以及需要在 Try 流程中传递给对应组件的请求参数:

type RequestEntity struct {
    // 组件名称
    ComponentID string `json:"componentName"`
    // Try 请求时传递的参数
    Request map[string]interface{} `json:"request"`
}

 

txManager.Transaction 对应源码如下,核心步骤均给出了注释. 核心的 try-confirm/cancel 流程,会在后续的 txManager.twoPhaseCommit 方法中展开.

// 启动事务
func (t *TXManager) Transaction(ctx context.Context, reqs ...*RequestEntity) (bool, error) {
    // 1 限制分布式事务执行时长
    tctx, cancel := context.WithTimeout(ctx, t.opts.Timeout)
    defer cancel()


    // 2 获得所有的涉及使用的 tcc 组件
    componentEntities, err := t.getComponents(tctx, reqs...)
    if err != nil {
        return false, err
    }


    // 3 调用 txStore 模块,创建新的事务明细记录,并取得全局唯一的事务 id
    txID, err := t.txStore.CreateTX(tctx, componentEntities.ToComponents()...)
    if err != nil {
        return false, err
    }


    // 4. 开启两阶段提交流程:try-confirm/cancel
    return t.twoPhaseCommit(ctx, txID, componentEntities)
}

 

2.4.2 2PC 串联

此处涉及 try-confirm/cancel 流程的串联,可以说是整个 gotcc 框架的精髓所在,请大家细品斟酌.

对应流程图展示如下,方法源码中也给出了相对详细的注释:

func (t *TXManager) twoPhaseCommit(ctx context.Context, txID string, componentEntities ComponentEntities) (bool, error) {
    // 1 创建子 context 用于管理子 goroutine 生命周期
    // 手握 cancel 终止器,能保证在需要的时候终止所有子 goroutine 生命周期
    cctx, cancel := context.WithCancel(ctx)
    defer cancel()


    // 2 创建一个 chan,用于接收子 goroutine 传递的错误
    errCh := make(chan error)
    // 3 并发启动,批量执行各 tcc 组件的 try 流程
    go func() {
        // 通过 waitGroup 进行多个子 goroutine 的汇总
        var wg sync.WaitGroup
        for _, componentEntity := range componentEntities {
            // shadow
            componentEntity := componentEntity
            wg.Add(1)
            // 并发执行各组件的 try 流程
            go func() {
                defer wg.Done()
                resp, err := componentEntity.Component.Try(cctx, &component.TCCReq{
                    ComponentID: componentEntity.Component.ID(),
                    TXID:        txID,
                    Data:        componentEntity.Request,
                })
                // 出现 tcc 组件执行 try 操作失败,则需要对事务明细记录进行更新,同时把错误通过 chan 抛给父 goroutine
                if err != nil || !resp.ACK {
                    // 对对应的事务进行更新
                    _ = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), false)
                    errCh <- fmt.Errorf("component: %s try failed", componentEntity.Component.ID())
                    return
                }
                // try 请求成功,则对事务明细记录进行更新. 倘若更新失败,也要视为错误,抛给父 goroutine
                if err = t.txStore.TXUpdate(cctx, txID, componentEntity.Component.ID(), true); err != nil {
                    errCh <- err
                }
            }()
        }


        // 等待所有子 goroutine 运行完成
        wg.Wait()
        // 关闭 errCh,告知父 goroutine 所有任务已运行完成的信息
        close(errCh)
    }()




    successful := true
    // 4 通过 chan,阻塞子 goroutine 执行完成
    // 4.1 但凡出现一个子 goroutine 遇到了错误,则会提前接收到错误,并调用 cancel 方法熔断其他所有子 goroutine 流程
    // 4.2 倘若所有子 goroutine 都执行成功,则会通过 chan 的关闭事件推进流程,对应 err 为 nil
    if err := <-errCh; err != nil {
        // 只要有一笔 try 请求出现问题,其他的都进行终止
        cancel()
        successful = false
    }


    // 5 异步执行第二阶段的 confirm/cancel 流程
    // 之所以是异步,是因为实际上在第一阶段 try 的响应结果尘埃落定时,对应事务的成败已经有了定论
    // 第二阶段能够容忍异步执行的原因在于,执行失败时,还有轮询任务进行兜底
    go t.advanceProgressByTXID(txID)
    
    // 6 响应结果
    // 6.1 倘若所有 try 请求都成功,则 successful 为 try,事务成功
    // 6.2 但凡有一个 try 请求处理出现问题,successful 为 false,事务失败
    return successful, nil
}

 

2.4.3 事务进度推进

当一笔事务在第一阶段中所有的 Try 请求都有了响应后,就需要根据第一阶段的结果,执行第二阶段的 Confirm 或者 Cancel 操作,并且将事务状态推进为成功或失败的终态:

  • • 倘若所有组件的 Try 响应都是成功,则需要批量调用组件的 Confirm 接口,并在这之后将事务状态更新为成功

  • • 倘若存在某个组件 Try 响应为失败,则需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败

  • • 倘若当前事务已执行超时,同样需要批量调用组件的 Cancel 接口,并在这之后将事务状态更新为失败

// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgressByTXID(txID string) error {
    // 获取事务日志明细
    tx, err := t.txStore.GetTX(t.ctx, txID)
    if err != nil {
        return err
    }
    // 推进进度
    return t.advanceProgress(tx)
}

 

// 传入一个事务 id 推进其进度
func (t *TXManager) advanceProgress(tx *Transaction) error {
    // 1 推断出事务当前的状态
    // 1.1 倘若所有组件 try 都成功,则为 successful
    // 1.2 倘若存在组件 try 失败,则为 failure
    // 1.3 倘若事务超时了,则为 failure
    // 1.4 否则事务状态为 hanging
    txStatus := tx.getStatus(time.Now().Add(-t.opts.Timeout))
    // hanging 状态的事务暂时不处理
    if txStatus == TXHanging {
        return nil
    }


    // 2 根据事务是否成功,定制不同的处理函数
    success := txStatus == TXSuccessful
    var confirmOrCancel func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error)
    var txAdvanceProgress func(ctx context.Context) error
    if success {
        // 如果事务成功,则需要对组件进行 confirm
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {
            return component.Confirm(ctx, tx.TXID)
        }
        // 如果事务成功,则需要在最后更新事务日志记录的状态为成功
        txAdvanceProgress = func(ctx context.Context) error {
            return t.txStore.TXSubmit(ctx, tx.TXID, true)
        }




    } else {
        // 如果事务失败,则需要对组件进行 cancel
        confirmOrCancel = func(ctx context.Context, component component.TCCComponent) (*component.TCCResp, error) {           
            return component.Cancel(ctx, tx.TXID)
        }


        // 如果事务失败,则需要在最后更新事务日志记录的状态为失败
        txAdvanceProgress = func(ctx context.Context) error {           
            return t.txStore.TXSubmit(ctx, tx.TXID, false)
        }
    }


    // 3 批量调用组件,执行第二阶段的 confirm/cancel 操作
    for _, component := range tx.Components {
        // 获取对应的 tcc component
        components, err := t.registryCenter.getComponents(component.ComponentID)
        if err != nil || len(components) == 0 {
            return errors.New("get tcc component failed")
        }      
        resp, err := confirmOrCancel(t.ctx, components[0])
        if err != nil {
            return err
        }
        if !resp.ACK {
            return fmt.Errorf("component: %s ack failed", component.ComponentID)
        }
    }


    // 4 二阶段 confirm/cancel 操作都执行完成后,对事务状态进行提交
    return txAdvanceProgress(t.ctx)
}

 

2.5 异步轮询流程

接下来聊聊 txManager 的异步轮询流程. 这个流程同样非常重要,是支撑 txManager 鲁棒性的重要机制.

倘若存在事务已经完成第一阶段 Try 操作的执行,但是第二阶段没执行成功,则需要由异步轮询流程进行兜底处理,为事务补齐第二阶段的操作,并将事务状态更新为终态

2.5.1 启动时机

异步轮询任务是在 txManager 的初始化流程中启动的,通过异步 goroutine 持久运行:

func NewTXManager(txStore TXStore, opts ...Option) *TXManager {
    ctx, cancel := context.WithCancel(context.Background())
    txManager := TXManager{
        opts:           &Options{},
        txStore:        txStore,
        registryCenter: NewRegistryCenter(),
        ctx:            ctx,
        stop:           cancel,
    }


    for _, opt := range opts {
        opt(txManager.opts)
    }


    repair(txManager.opts)


    go txManager.run()
    return &txManager
}

 

2.5.2 轮询流程

异步轮询任务运行时,基于 for 循环 + select 多路复用的方式,实现定时任务的执行.

轮询的时间间隔会根据一轮任务处理过程中是否出现错误,而进行动态调整. 这里调整规则指的是:当一次处理流程中发生了错误,就需要调大当前节点轮询的时间间隔,让其他节点的异步轮询任务得到更大的执行机会.

func (t *TXManager) run() {
    var tick time.Duration
    var err error
    // 1 for 循环自旋式运行任务
    for {
        // 如果处理过程中出现了错误,需要增长轮询时间间隔
        if err == nil {
            tick = t.opts.MonitorTick
        } else {
            tick = t.backOffTick(tick)
        }
        
        // select 多路复用
        select {
        // 倘若 txManager.ctx 被终止,则异步轮询任务退出
        case <-t.ctx.Done():
            return


        // 2 等待 tick 对应时长后,开始执行任务
        case <-time.After(tick):
            // 对 txStore 加分布式锁,避免分布式服务下多个服务节点的轮询任务重复执行
            if err = t.txStore.Lock(t.ctx, t.opts.MonitorTick); err != nil {
                // 取锁失败时(大概率被其他节点占有),不需要增加 tick 时长
                err = nil
                continue
            }


            // 3 获取处于 hanging 状态的事务
            var txs []*Transaction
            if txs, err = t.txStore.GetHangingTXs(t.ctx); err != nil {
                _ = t.txStore.Unlock(t.ctx)
                continue
            }
   
            // 4 批量推进事务进度
            err = t.batchAdvanceProgress(txs)
            _ = t.txStore.Unlock(t.ctx)
        }
    }
}

 

有关于轮询时间间隔的退避谦让策略为:每次对时间间隔进行翻倍,封顶为初始时长的 8 倍:

func (t *TXManager) backOffTick(tick time.Duration) time.Duration {
    tick <<= 1
    if threshold := t.opts.MonitorTick << 3; tick > threshold {
        return threshold
    }
    return tick
}

 

2.5.3 批量推进事务进度

下面是异步轮询任务批量推进事务第二阶段执行的流程,核心是开启多个 goroutine 并发对多项事务进行处理:

func (t *TXManager) batchAdvanceProgress(txs []*Transaction) error {
    // 1 创建一个 chan,用于接收子 goroutine 传输的 err
    errCh := make(chan error)
    go func() {
        // 2 通过 waitGroup 聚合多个子 groutine
        var wg sync.WaitGroup
        for _, tx := range txs {
            // shadow
            tx := tx
            wg.Add(1)
            go func() {
                defer wg.Done()
                // 3 推进每笔事务的进度
                if err := t.advanceProgress(tx); err != nil {
                    // 遇到错误则投递到 errCh
                    errCh <- err
                }
            }()
        }
        
        // 4 收口等待所有子 goroutine 执行完成
        wg.Wait()
        // 5 所有子 goroutine 执行完成后关闭 chan,唤醒阻塞等待的父 goroutine
        close(errCh)
    }()


    // 记录遇到的第一个错误
    var firstErr error
    // 6 父 goroutine 通过 chan 阻塞在这里,直到所有 goroutine 执行完成,chan 被 close 才能往下
    for err := range errCh {
        // 记录遇到的第一个错误
        if firstErr != nil {
            continue
        }
        firstErr = err
    }


    // 7 返回错误,核心是标识执行过程中,是否发生过错误
    return firstErr
}

 

3 GOTCC 使用案例讲解

从第 3 章开始,我们从实际应用 gotcc 框架的使用方视角出发,对所需要实现的模块进行定义,然后给出应用 gotcc 框架的代码示例.

3.1 TCC 组件实现

首先,我们对 TCC 组件的具体实现类进行定义:

3.1.1 类定义

定义一个 MockComponent 类,其中内置了 redis 客户端,用于完成一些状态数据的存取.

// 实现的 tcc 组件
type MockComponent struct {
    // tcc 组件唯一标识 id,构造时由使用方传入
    id     string
    // redis 客户端
    client *redis_lock.Client
}


func NewMockComponent(id string, client *redis_lock.Client) *MockComponent {
    return &MockComponent{
        id:     id,
        client: client,
    }
}


// 返回 tcc 组件的唯一标识 id
func (m *MockComponent) ID() string {
    return m.id
}

 

3.1.2 Try 流程

下面实现一下 TCC 组件的 Try 方法,关键要点已于代码中通过注释的形式给出:

func (m *MockComponent) Try(ctx context.Context, req *component.TCCReq) (*component.TCCResp, error) {
    // 1 基于 txID 维度加 redis 分布式锁
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, req.TXID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 基于 txID 幂等性去, 需要对事务的状态进行检查
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, req.TXID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        req.TXID,
    }
    switch txStatus {
    case TXTried.String(), TXConfirmed.String(): // 重复的 try 请求,给予成功的响应
        res.ACK = true
        return &res, nil
    case TXCanceled.String(): // 此前该事务已 cancel,则拒绝本次 try 请求
        return &res, nil
    default:
    }


    // 3 建立 txID 与 bizID 的关联
    bizID := gocast.ToString(req.Data["biz_id"])
    if _, err = m.client.Set(ctx, pkg.BuildTXDetailKey(m.id, req.TXID), bizID); err != nil {
        return nil, err
    }


    // 4 把 bizID 对应的业务数据置为冻结态
    reply, err := m.client.SetNX(ctx, pkg.BuildDataKey(m.id, req.TXID, bizID), DataFrozen.String())
    if err != nil {
        return nil, err
    }
    // 倘若数据此前已冻结或已使用,则拒绝本次 try 请求
    if reply != 1 {
        return &res, nil
    }


    // 5 更新当前组件下的事务状态为 tried
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, req.TXID), TXTried.String())
    if err != nil {
        return nil, err
    }


    // 6 给予接收 try 请求的响应
    res.ACK = true
    return &res, nil
}

 

3.1.3 Confirm 流程

下面实现一下 TCC 组件的 Confirm 方法,关键要点已于代码中通过注释的形式给出:

func (m *MockComponent) Confirm(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基于 txID 维度加锁
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()




    // 2. 校验事务状态,要求对应组件下,事务此前的状态为 tried
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    res := component.TCCResp{
        ComponentID: m.id,
        TXID:        txID,
    }
    switch txStatus {
    case TXConfirmed.String(): // 事务状态已 confirm,直接幂等响应为成功
        res.ACK = true
        return &res, nil
    case TXTried.String(): // 只有事务状态为 try 才是合法的,会对程序放行
    default: // 其他情况直接拒绝,ack 为 false
        return &res, nil
    }


    // 3 获取事务对应的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4. 校验业务数据此前状态是否为冻结
    dataStatus, err := m.client.Get(ctx, pkg.BuildDataKey(m.id, txID, bizID))
    if err != nil {
        return nil, err
    }
    
    // 如果此前非冻结态,则拒绝本次请求
    if dataStatus != DataFrozen.String() {
        return &res, nil
    }




    // 5 把业务数据的更新操作置为 successful
    if _, err = m.client.Set(ctx, pkg.BuildDataKey(m.id, txID, bizID), DataSuccessful.String()); err != nil {
        return nil, err
    }


    // 6 把对应组件下的事务状态更新为成功,这一步哪怕失败了也不阻塞主流程
    _, _ = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXConfirmed.String())


    // 7 处理成功,给予成功的响应
    res.ACK = true
    return &res, nil
}

 

3.1.4 Cancel 流程

下面实现一下 TCC 组件的 Cancel 方法,关键要点已于代码中通过注释的形式给出:

func (m *MockComponent) Cancel(ctx context.Context, txID string) (*component.TCCResp, error) {
    // 1 基于 txID 维度加锁
    lock := redis_lock.NewRedisLock(pkg.BuildTXLockKey(m.id, txID), m.client)
    if err := lock.Lock(ctx); err != nil {
        return nil, err
    }
    defer func() {
        _ = lock.Unlock(ctx)
    }()


    // 2 校验事务状态,只要不是 confirmed,都允许被置为 canceld
    txStatus, err := m.client.Get(ctx, pkg.BuildTXKey(m.id, txID))
    if err != nil && !errors.Is(err, redis_lock.ErrNil) {
        return nil, err
    }
    // 倘若组件内事务此前的状态为 confirmed,则说明流程有异常.
    if txStatus == TXConfirmed.String() {
        return nil, fmt.Errorf("invalid tx status: %s, txid: %s", txStatus, txID)
    }


    // 3 根据事务获取对应的 bizID
    bizID, err := m.client.Get(ctx, pkg.BuildTXDetailKey(m.id, txID))
    if err != nil {
        return nil, err
    }


    // 4 删除对应的 frozen 冻结记录,代表对数据执行了回滚操作
    if err = m.client.Del(ctx, pkg.BuildDataKey(m.id, txID, bizID)); err != nil {
        return nil, err
    }


    // 5 把事务状态更新为 canceled
    _, err = m.client.Set(ctx, pkg.BuildTXKey(m.id, txID), TXCanceled.String())
    if err != nil {
        return nil, err
    }


    // 6 给予处理成功的 ack 
    return &component.TCCResp{
        ACK:         true,
        ComponentID: m.id,
        TXID:        txID,
    }, nil
}

 

3.2 TX Store 实现

接下来是关于事务日志存储模块 TXStore 的具体实现:

3.2.1 类定义

声明了一个 MockTXStore 类,里面通过 mysql 存储事务日志明细数据,通过 redis 实现 TXStore 模块的分布式锁.

其中和事务日志明细数据库直接交互的操作被封装在 TXRecordDAO 当中.

// TXStore 模块具体实现
type MockTXStore struct {
    // redis 客户端,用于实现分布式锁
    client *redis_lock.Client
    // 事务日志存储 DAO 层
    dao    *expdao.TXRecordDAO
}


func NewMockTXStore(dao *expdao.TXRecordDAO, client *redis_lock.Client) *MockTXStore {
    return &MockTXStore{
        dao:    dao,
        client: client,
    }
}

 

事务日志存储 DAO 层:

type TXRecordDAO struct {
    db *gorm.DB
}


func NewTXRecordDAO(db *gorm.DB) *TXRecordDAO {
    return &TXRecordDAO{
        db: db,
    }
}

 

接下来是关于事务日志明细记录的持久化对象(PO,Persistent Object)模型定义:

  • • 内置了 gorm.Model,包含了主键 ID、创建时间 CreatedAt、更新时间 UpdatedAt、删除时间 DeletedAt 几个字段

  • • 事务状态 Status,标识事务所处的状态,分为进行中 hanging、成功 successful、失败 failure

  • • 组件 Try 响应明细记录 ComponentTryStatuses: 记录了事务下各组件 Try 请求响应结果,会以一个 json 字符串的格式存储,其真实的类型为 map[string]*ComponentTryStatus

type TXRecordPO struct {
    gorm.Model
    Status               string `gorm:"status"`
    ComponentTryStatuses string `gorm:"component_try_statuses"`
}


func (t TXRecordPO) TableName() string {
    return "tx_record"
}


type ComponentTryStatus struct {
    ComponentID string `json:"componentID"`
    TryStatus   string `json:"tryStatus"`
}

 

下面是事务日志明细表的建表语句:

CREATE TABLE IF NOT EXISTS `tx_record`
(
    `id`                       bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `status`                   varchar(16) NOT NULL COMMENT '事务状态 hanging/successful/failure',
    `component_try_statuses`   json DEFAULT NULL COMMENT '各组件 try 接口请求状态 hanging/successful/failure',
    `deleted_at`        datetime     DEFAULT NULL COMMENT '删除时间',
    `created_at`        datetime     NOT NULL COMMENT '创建时间',
    `updated_at`        datetime     DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    PRIMARY KEY (`id`) USING BTREE COMMENT '主键索引',
    KEY `idx_status` (`status`) COMMENT '事务状态索引'
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COMMENT '事务日志记录';

 

3.2.2 创建事务记录

接下来是通过过 TXStore 模块创建一条事务明细记录的实现代码:

func (m *MockTXStore) CreateTX(ctx context.Context, components ...component.TCCComponent) (string, error) {
    // 创建一个记录组件 try 响应结果的 map,其中以组件 id 为 key
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus, len(components))
    for _, component := range components {
        componentTryStatuses[component.ID()] = &expdao.ComponentTryStatus{
            ComponentID: component.ID(),
            TryStatus:   txmanager.TryHanging.String(),
        }
    }


    statusesBody, _ := json.Marshal(componentTryStatuses)
    // 创建事务明细记录 po 示例,调用 dao 模块将记录落库
    txID, err := m.dao.CreateTXRecord(ctx, &expdao.TXRecordPO{
        Status:               txmanager.TXHanging.String(),
        ComponentTryStatuses: string(statusesBody),
    })
    if err != nil {
        return "", err
    }


    return gocast.ToString(txID), nil
}

 

dao 层创建事务明细记录的实现代码:

func (t *TXRecordDAO) CreateTXRecord(ctx context.Context, record *TXRecordPO) (uint, error) {
    return record.ID, t.db.WithContext(ctx).Model(&TXRecordPO{}).Create(record).Error
}

 

3.2.3 事务明细更新

下面是更新一笔事务明细的方法,其处理流程是:

  • • 针对这笔事务记录加写锁

  • • 根据组件的 try 响应结果,对 json字符串进行更新

  • • 将事务记录写回表中.

func (m *MockTXStore) TXUpdate(ctx context.Context, txID string, componentID string, accept bool) error {
    // 后续需要闭包传入执行函数
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
        if accept {
            componentTryStatuses[componentID].TryStatus = txmanager.TrySucceesful.String()
        } else {
            componentTryStatuses[componentID].TryStatus = txmanager.TryFailure.String()
        }
        newBody, _ := json.Marshal(componentTryStatuses)
        record.ComponentTryStatuses = string(newBody)
        return dao.UpdateTXRecord(ctx, record)
    }


    _txID := gocast.ToUint(txID)
    return m.dao.LockAndDo(ctx, _txID, do)
}

 

// 通过 gorm 实现数据记录加写锁,并执行闭包函数的操作:
func (t *TXRecordDAO) LockAndDo(ctx context.Context, id uint, do func(ctx context.Context, dao *TXRecordDAO, record *TXRecordPO) error) error {
    // 开启事务
    return t.db.Transaction(func(tx *gorm.DB) error {
        defer func() {
            if err := recover(); err != nil {
                tx.Rollback()
            }
        }()


        // 加写锁
        var record TXRecordPO
        if err := tx.Set("gorm:query_option", "FOR UPDATE").WithContext(ctx).First(&record, id).Error; err != nil {
            return err
        }


        txDAO := NewTXRecordDAO(tx)
        // 执行闭包函数
        return do(ctx, txDAO, &record)
    })
}

 

// 更新一条事务日志数据记录
func (t *TXRecordDAO) UpdateTXRecord(ctx context.Context, record *TXRecordPO) error {
    return t.db.WithContext(ctx).Updates(record).Error
}

 

3.2.4 查询事务

接下来是查询事务的两个方法:

// 根据事务 id 查询指定的一笔事务明细记录:
func (m *MockTXStore) GetTX(ctx context.Context, txID string) (*txmanager.Transaction, error) {
    // 通过 option 在查询条件中注入事务 id
    records, err := m.dao.GetTXRecords(ctx, expdao.WithID(gocast.ToUint(txID)))
    if err != nil {
        return nil, err
    }
    if len(records) != 1 {
        return nil, errors.New("get tx failed")
    }
 
    // 对各组件 try 明细内容进行反序列化
    componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
    _ = json.Unmarshal([]byte(records[0].ComponentTryStatuses), &componentTryStatuses)


    components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
    for _, tryItem := range componentTryStatuses {
        components = append(components, &txmanager.ComponentTryEntity{
            ComponentID: tryItem.ComponentID,
            TryStatus:   txmanager.ComponentTryStatus(tryItem.TryStatus),
        })
    }
    return &txmanager.Transaction{
        TXID:       txID,
        Status:     txmanager.TXStatus(records[0].Status),
        Components: components,
        CreatedAt:  records[0].CreatedAt,
    }, nil
}

 

// 获取全量处于中间态的事务明细记录
func (m *MockTXStore) GetHangingTXs(ctx context.Context) ([]*txmanager.Transaction, error) {
    // 通过 option 在查询条件中指定事务状态为 hanging
    records, err := m.dao.GetTXRecords(ctx, expdao.WithStatus(txmanager.TryHanging))
    if err != nil {
        return nil, err
    }


    txs := make([]*txmanager.Transaction, 0, len(records))
    for _, record := range records {
        // 对各组件 try 响应结果进行反序列化
        componentTryStatuses := make(map[string]*expdao.ComponentTryStatus)
        _ = json.Unmarshal([]byte(record.ComponentTryStatuses), &componentTryStatuses)
        components := make([]*txmanager.ComponentTryEntity, 0, len(componentTryStatuses))
        for _, component := range componentTryStatuses {
            components = append(components, &txmanager.ComponentTryEntity{
                ComponentID: component.ComponentID,
                TryStatus:   txmanager.ComponentTryStatus(component.TryStatus),
            })
        }


        txs = append(txs, &txmanager.Transaction{
            TXID:       gocast.ToString(record.ID),
            Status:     txmanager.TXHanging,
            CreatedAt:  record.CreatedAt,
            Components: components,
        })
    }


    return txs, nil
}

 

在 dao 层实现了一个通用的事务日志查询方法,通过 option 模式实现查询条件的灵活组装:

func (t *TXRecordDAO) GetTXRecords(ctx context.Context, opts ...QueryOption) ([]*TXRecordPO, error) {
    db := t.db.WithContext(ctx).Model(&TXRecordPO{})
    for _, opt := range opts {
        db = opt(db)
    }


    var records []*TXRecordPO
    return records, db.Scan(&records).Error
}

 

下面是关于 option 的具体定义,更多有关于这种模式的设计实现思路,可以参见我之前发表的文章——Golang 设计模式之建造者模式

type QueryOption func(db *gorm.DB) *gorm.DB


// 通过事务主键 id 进行查询
func WithID(id uint) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("id = ?", id)
    }
}


// 通过事务状态进行查询
func WithStatus(status txmanager.ComponentTryStatus) QueryOption {
    return func(db *gorm.DB) *gorm.DB {
        return db.Where("status = ?", status.String())
    }
}

 

3.2.5 提交事务结果

接下来是在事务执行完成后,将执行结果更新到事务明细记录中的处理方法:

// 提交事务的最终状态
func (m *MockTXStore) TXSubmit(ctx context.Context, txID string, success bool) error {
    do := func(ctx context.Context, dao *expdao.TXRecordDAO, record *expdao.TXRecordPO) error {
        if success {
            record.Status = txmanager.TXSuccessful.String()
        } else {
            record.Status = txmanager.TXFailure.String()
        }
        return dao.UpdateTXRecord(ctx, record)
    }
    return m.dao.LockAndDo(ctx, gocast.ToUint(txID), do)
}

 

3.2.6 加/解全局锁

最后,是实现整个 txStore 模块加/解锁的处理方法,内部是基于 redis 实现的分布式锁:

func (m *MockTXStore) Lock(ctx context.Context, expireDuration time.Duration) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client, redis_lock.WithExpireSeconds(int64(expireDuration.Seconds())))
    return lock.Lock(ctx)
}


func (m *MockTXStore) Unlock(ctx context.Context) error {
    lock := redis_lock.NewRedisLock(pkg.BuildTXRecordLockKey(), m.client)
    return lock.Unlock(ctx)
}

到这里为止,所有前置准备工作都已经处理完成,接下来我们展示一个应用到 gotcc 框架的使用示例.

 

3.3 使用代码示例

由于我实现的 txStore 和 tccComponent 需要依赖到 mysql 和 redis 两个组件,因此在这里需要输入对应的信息.

单测代码相对比较简单,其中一些要点通过注释给出:

const (
    dsn      = "请输入你的 mysql dsn"
    network  = "tcp"
    address  = "请输入你的 redis ip"
    password = "请输入你的 redis 密码"
)


// 使用 tcc 单测代码
func Test_TCC(t *testing.T) {
    // 创建 redis 客户端
    redisClient := pkg.NewRedisClient(network, address, password)
    // 创建 mysql 客户端
    mysqlDB, err := pkg.NewDB(dsn)
    if err != nil {
        t.Error(err)
        return
    }


    // 构造三个 tcc 组件
    componentAID := "componentA"
    componentBID := "componentB"
    componentCID := "componentC"
    componentA := NewMockComponent(componentAID, redisClient)
    componentB := NewMockComponent(componentBID, redisClient)
    componentC := NewMockComponent(componentCID, redisClient)


    // 构造出事务日志存储模块
    txRecordDAO := dao.NewTXRecordDAO(mysqlDB)
    txStore := NewMockTXStore(txRecordDAO, redisClient)


    // 构造出 txManager 模块
    txManager := txmanager.NewTXManager(txStore, txmanager.WithMonitorTick(time.Second))
    defer txManager.Stop()


    // 完成三个组件的注册
    if err := txManager.Register(componentA); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentB); err != nil {
        t.Error(err)
        return
    }


    if err := txManager.Register(componentC); err != nil {
        t.Error(err)
        return
    }


    
    ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
    defer cancel()
    // 启动分布式事务
    success, err := txManager.Transaction(ctx, []*txmanager.RequestEntity{
        {ComponentID: componentAID,
            Request: map[string]interface{}{
                "biz_id": componentAID + "_biz",
            },
        },
        {ComponentID: componentBID,
            Request: map[string]interface{}{
                "biz_id": componentBID + "_biz",
            },
        },
        {ComponentID: componentCID,
            Request: map[string]interface{}{
                "biz_id": componentCID + "_biz",
            },
        },
    }...)
    if err != nil {
        t.Errorf("tx failed, err: %v", err)
        return
    }
    if !success {
        t.Error("tx failed")
        return
    }
    
    // 分布式事务处理成功
    t.Log("success")
}

 

4 总结

到这里,本文正文内容全部结束. 这里回头再对本期分享的内容做个总结:

  • • 本期我基于 golang 从零到一搭建了一个 TCC 分布式事务框架的开源项目 gotcc,并围绕着这个项目展开源码级别的讲解

  • • 在 gotcc 中,对事务协调器 TXManager 相关的核心处理逻辑,如 Try-Confirm/Cancel 两阶段流程串联、TCC 组件注册、异步轮询任务等内容进行实现,并将这部分核心内容抽出放在了 SDK 中,供应用方使用

  • • 在 gotcc 中还定义了 TCC 组件和事务日志存储模块的抽象 interface,这部分内容需要由应用方自行实现,并在使用 gotcc 时将其注入到 TXManager 当中

gotcc 的开源地址为 https://github.com/xiaoxuxiansheng/gotcc . 大家走过路过,帮忙留个 star,非常感谢.

至此,有关 TCC 分布式事务框架实战篇内容全部讲解完成. 个人水平有限,不当之处恳请大家不吝赐教.


小徐先生的编程世界
在学钢琴,主业码农
 最新文章