SpringBoot整合XXL-JOB- 任务分片

科技   2024-09-21 10:24   河北  


前言

在实际业务场景中,当面临需要短时间内批量执行的定时任务时,若未采取适当措施,简单直接地执行可能会导致任务间的冲突或效率问题,特别是当上一批任务尚未完成时,新的定时任务触发,这无疑会对业务系统的稳定性和性能产生不良影响。因此,对这类批量任务进行分片处理显得尤为重要。接下来,我们将探讨Xxl-job(一种流行的分布式任务调度平台)如何实现任务的分片处理,以确保任务能够高效、有序地执行。

一、代码示例

一、环境准备

1.mock数据

首先在数据库里建一个测试的表,并mock一些数据进去,模拟业务中的“大表”。因为本身就是在模拟,也不讲究性能什么的,大家自己操作下就行。

2.添加依赖

<!--MyBatis驱动-->
<dependency>
    <groupId>org.mybatis.spring.boot</groupId>
    <artifactId>mybatis-spring-boot-starter</artifactId>
    <version>1.2.0</version>
</dependency>
<!--mysql驱动-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
</dependency>
<!--lombok依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <scope>provided</scope>
</dependency>
<!--连接池-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

3.添加配置

配置数据库的连接信息,如下:

spring.datasource.url=jdbc:mysql://localhost:3306/xxl_job_demo?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=UTF-8
spring.datasource.driverClassName=com.mysql.jdbc.Driver
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.username=root
spring.datasource.password=123456

4.声明实体类和查询类

根据各自实际模拟的数据集,声明相应的实体类以及查询类,确保设计能够成功检索到数据即可。这一步的具体实现将依据每个人的数据模型选择以及所采用的ORM(对象关系映射)框架的不同而有所差异,因此不详细展开每个细节。

二、分片与不分片

1.不使用分片

话不多说,先上代码:

定义了一个名为 sendMsgHandler 的方法,它使用 @XxlJob 注解标记,表明这是一个 Xxl-Job 定时任务,

@XxlJob("sendMsgHandler")
public void sendMsgHandler() throws Exception{
    List<UserMobilePlan> userMobilePlans = userMobilePlanMapper.selectAll();
    System.out.println("任务开始时间:"+new Date()+",处理任务数量:"+userMobilePlans.size());
    Long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item->{
        try {
            //模拟发送短信动作
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
    System.out.println("任务结束时间:"+new Date());
    System.out.println("任务耗时:"+(System.currentTimeMillis()-startTime)+"毫秒");
}

mapper

@Mapper
public interface UserMobilePlanMapper {

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}

通过查询用户表模拟发短信的场景,我们启动项目执行一下这个任务看看需要多久完成

可以看到,在本地环境中,我使用模拟的两千条数据进行测试,结果显示任务执行耗时约为30秒。然而,如果这一任务在实际业务场景中需要每隔20秒就执行一次,那么显然会出现任务执行时间重叠的问题,即新任务的触发可能会与尚未完成的前一任务冲突。为了应对这种情况,我们就需要对批量任务进行切片处理,通过将其分割成多个较小的任务组,使它们能够并行执行,从而有效缩短整体执行耗时,确保任务能够按时并高效地完成。

2.使用分片

与上述案例不同,如果采用分片广播的调度策略,那么当前任务的执行将会充分利用集群内的所有机器(即各个执行器)来并行处理,从而显著加快整体执行速度。为了实现这一点,我们需要在原有的业务逻辑中融入分片参数的概念。幸运的是,我们无需手动编写分片逻辑,因为XXL-JOB框架已经为我们提供了便捷的工具类,通过这些工具类,我们可以轻松地获取到分片参数,进而实现任务的分片广播与高效执行。

int shardTotal = XxlJobHelper.getShardTotal();
int shardIndex = XxlJobHelper.getShardIndex();

这里的shardTotal和shardIndex是分片处理中的两个关键参数,它们分别代表了分片总数和当前分片任务的索引。如果这样的描述仍让部分同学感到困惑,我们可以尝试用另一种方式来解释:可以将shardTotal理解为参与任务执行的执行器(也就是我们的机器)的总数,而shardIndex则可以看作是分配给每个执行器处理的具体任务标识,它在某种程度上可以与我们要查询的数据的ID相关联。基于这样的理解,我们改造原查询逻辑的思路就变成了利用shardTotal和shardIndex这两个参数,通过求模运算来定位到每个执行器应该处理的数据范围。具体的实现方式可能类似于根据数据的ID对shardTotal进行取模,然后根据shardIndex来确定由哪个执行器处理哪些ID范围内的数据

@Mapper
public interface UserMobilePlanMapper {

    @Select("select * from t_user_mobile_plan where mod(id, #{shardingTotal})=#{shardingIndex}")
    List<UserMobilePlan> selectByMod(@Param("shardingIndex") Integer shardingIndex,@Param("shardingTotal")Integer shardingTotal);

    @Select("select * from t_user_mobile_plan")
    List<UserMobilePlan> selectAll();
}

对接口进行小小的改造,根据执行器的个数来决定是否采用分片的方式查询,如下

@XxlJob("sendMsgHandler1")
public void sendMsgHandler1() throws Exception{
    System.out.println("任务开始时间" + new Date());
    int shardTotal = XxlJobHelper.getShardTotal();
    int shardIndex = XxlJobHelper.getShardIndex();
    List<UserMobilePlan> userMobilePlans = null;
    if (shardTotal == 1) {
        userMobilePlans= userMobilePlanMapper.selectAll();
    } else {
        userMobilePlans = userMobilePlanMapper.selectByMod(shardIndex, shardTotal);
    }
    System.out.println("处理任务数量" + userMobilePlans.size());
    long startTime = System.currentTimeMillis();
    userMobilePlans.forEach(item -> {
        try {
            TimeUnit.MILLISECONDS.sleep(10);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });
    System.out.println("任务结束时间" + new Date());
    System.out.println("任务耗时:" + (System.currentTimeMillis() - startTime) + "毫秒");
}

在调度中心里修改这个任务的路由策略

3.分片测试

执行一次,观察控制台的输出

可以观察到,原先需要处理的两千条数据查询任务,在通过分片策略被均匀地分配至两台机器上并行执行后,两台机器几乎同时启动了处理流程。这一改变直接导致了任务执行效率的大幅提升,原本需要耗时30秒才能完成的任务,现在仅用了15秒就圆满结束,实现了性能上的一倍提升。

Java技术前沿
专注分享Java技术,包括但不限于 SpringBoot,SpringCloud,Docker,消息中间件等。
 最新文章