SpringBoot中基于XXL-JOB实现大量数据灵活控制的分片处理方案

文摘   2024-09-25 20:33   广东  

场景

一个应用需要支持大量数据的批处理任务,要求:

  1. 并行处理能力:应用需能够同时处理多个数据块,即实现并行处理。
  2. 灵活的并发控制:可以灵活调整并行处理的任务数量,以确保资源利用最大化且不过载。
  3. 均衡负载分配:应将任务均匀分配到不同的服务器节点上,以平衡各节点的负载,避免单点压力过大。

解决思路

因为需要并行处理同一张数据表里的数据,所以比较自然地想到了分片查询数据,可以利用对 id 取模的方法进行分片,避免同一条数据被重复处理。同时XXL-JOB 的路由策略「分片广播 & 动态分片」就很贴合这种场景。

实现案例

SpringBoot环境下,我们集成xxl-job来实现上述方案。SpringBoot如何集成xxl-job查看官网即可,这里不再叙述,下面看下分片调度的代码:

1.xxl-job调度管理页面配置分片调度任务

路由策略选择: 分片广播

2. 编写task代码:

要获取分片总数和当前分片序号,作为参数传给sql语句:

 @Resource
    private OrderDataMapper orderDataMapper;
 
    @XxlJob("orderDataStatusTask")
    public void orderDataStatusTask() {
         // 计时器
        Stopwatch timer = Stopwatch.createStarted();
        
        // 获取xxl-job的localThread中的总的分片数和当前分片
        OrderDataParam param = new OrderDataParam();
        param.setShardIndex(XxlJobHelper.getShardIndex());
        param.setShardTotal(XxlJobHelper.getShardTotal());
        // 其他参数设置,略了....
        
        // 根据分片数拉取当前分片的数据
        List<OrderData> orderDataList = orderDataMapper.getInitStatusOrder(this.getJobParams());
        XxlJobHelper.log("获取待处理订单数据:分片号={},数据量={},总分片数={}", XxlJobHelper.getShardIndex(), orderDataList.size(), XxlJobHelper.getShardTotal());
        if (CollUtil.isEmpty(orderDataList)) {
            return;
        }
        // 处理逻辑,略了....
        
        XxlJobHelper.log("当前分片({})处理完成,耗时={}秒", XxlJobHelper.getShardIndex(), timer.stop().elapsed(TimeUnit.SECONDS));
    }

这里服务启动了4个实例,总分片数ShardTotal就是4,每个实例的ShardIndex分别是0,1,2,3

3. mybatis中编写sql语句

根据分片总数和当前分片数据对Id哈希取模,这里做了两次hash,主要作用是用id最后一位hash方便直接看出数据被哪个分片调度了。

 // 获取未处理的订单数据
 // 根据id末位数取hash后分片拉取
 <select id="getInitStatusOrder" parameterType="com.xxx.OrderDataParam"
            resultType="com.xxx.OrderData">
   select id,order_no,customer_code,
            from tt_order_data t
            where t.status = 0
      and t.fail_count <![CDATA[ < ]]> #{retryCount}
      and t.update_time <![CDATA[ >= ]]> #{lastUpdateTime}
      and mod(mod(t.id, 10) , #{shardTotal}) = #{shardIndex}
   limit 0,200 
 </select> 
4.最后看下调度日志

同一次调度任务,4个实例个调度一次,并且拉取到各自部分的数据进行处理:

  • 第3个实例调度日志:
     2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144
  ----------- xxl-job job execute start -----------
  ----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
  2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=2,数据量=100,总分片数=4
  2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(2)处理完成,耗时=1
  2024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144
  ----------- xxl-job job execute end(finish) -----------
  ----------- Result: handleCode
=200, handleMsg = null
  2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.
  • 第4个实例调度日志:
     2024-09-25 08:31:40 [com.xxl.job.core.thread.JobThread#run]-[130]-[Thread-144
  ----------- xxl-job job execute start -----------
  ----------- Param:{"lastHoursAgoModify":4,"rows":3000,"lastMonthAgoCreate":6,"retryCount":1}
  2024-09-25 08:31:40 [com.xxx.xxxx#orderDataStatusTask]-[47]-[Thread-144] 获取待处理订单数据:分片号=3,数据量=80,总分片数=4
  2024-09-25 08:31:41 [com.xxx.xxxx#orderDataStatusTask]-[53]-[Thread-144] 当前分片(3)处理完成,耗时=1
  2024-09-25 08:31:41 [com.xxl.job.core.thread.JobThread#run]-[176]-[Thread-144
  ----------- xxl-job job execute end(finish) -----------
  ----------- Result: handleCode
=200, handleMsg = null
  2024-09-25 08:31:41 [com.xxl.job.core.thread.TriggerCallbackThread#callbackLog]-[197]-[xxl-job, executor TriggerCallbackThread] 
        ----------- xxl-job job callback finish.


太强 ! SpringBoot中出入参增强的5种方法 : 加解密、脱敏、格式转换、时间时区处理

太强 ! SpringBoot中优化if-else语句的七种绝佳方法实战

SpringBoot使用EasyExcel并行导出多个excel文件并压缩zip下载
提升编程效率的利器: Google Guava库中双向映射BitMap
从MySQL行格式原理看:为什么开发规范中不推荐NULL?数据是如何在磁盘上存储的?
SpringBoot中使用Jackson实现自定义序列化和反序列化控制的5种方式总结
提升编程效率的利器: Google Guava库之RateLimiter优雅限流
深入JVM逃逸分析原理:且看其如何提高程序性能和内存利用率
必知必会!MySQL索引下推:原理与实战

深入解析JVM内存分配优化技术:TLAB

SpringBoot中基于JWT的双token(access_token+refresh_token)授权和续期方案
SpringBoot中基于JWT的单token授权和续期方案
SpringBoot中Token登录授权、续期和主动终止的方案(Redis+Token)
微服务中token鉴权设计的4种方式总结
提升编程效率的API利器:精通Google Guava库区间范围映射RangeMap
SpringBoot中Jackson控制序列化和反序列化的注解和扩展点总结【收藏版】

SpringBoot中大量数据导出方案:使用EasyExcel并行导出多个excel文件并压缩zip后下载

关注『 码到三十五 』,日有所获
                    点赞 和 在看 就是最大的支持

码到三十五
主要分享正经的开发技术(原理,架构,实践,源码等),以输出驱动输入;当然偶尔会穿插点生活琐碎,顺便吃个瓜,目的嘛,搞点精准流量,看能不能发发广告。
 最新文章