SpringBoot 实现静态、动态定时任务,本地动态定时任务调度

科技   2025-01-06 11:55   上海  

👉 这是一个或许对你有用的社群

🐱 一对一交流/面试小册/简历优化/求职解惑,欢迎加入芋道快速开发平台知识星球。下面是星球提供的部分资料: 

👉这是一个或许对你有用的开源项目

国产 Star 破 10w+ 的开源项目,前端包括管理后台 + 微信小程序,后端支持单体和微服务架构。

功能涵盖 RBAC 权限、SaaS 多租户、数据权限、商城、支付、工作流、大屏报表、微信公众号等等功能:

  • Boot 仓库:https://gitee.com/zhijiantianya/ruoyi-vue-pro
  • Cloud 仓库:https://gitee.com/zhijiantianya/yudao-cloud
  • 视频教程:https://doc.iocoder.cn
【国内首批】支持 JDK 21 + SpringBoot 3.2.2、JDK 8 + Spring Boot 2.7.18 双版本 

来源:juejin.cn/post/
7393190128547610662


前言

在日常的业务需求中,很多时候会遇到需要写定时任务的场景,如:每天刷新缓存、定时刷新业务数据状态等等。

基本上遇到的都是编写固定执行时间的任务,最近认识Spring中自带的轻量级定时任务调度包,想着正好来使用一下,便写了一个基于spring framework scheduling的本地任务调度的demo,以此作为契机带来介绍静态、动态任务的文章。

demo仓库点此跳转:https://github.com/IsNott/springboot-scheduler

基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/ruoyi-vue-pro
  • 视频教程:https://doc.iocoder.cn/video/

依赖版本

demo环境搭建选择我常用的老三样:mysql-connectorjmybatis-plusspringboot3

基于 Spring Cloud Alibaba + Gateway + Nacos + RocketMQ + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能

  • 项目地址:https://github.com/YunaiV/yudao-cloud
  • 视频教程:https://doc.iocoder.cn/video/

固定任务

实现固定的定时任务很简单,关注org.springframework.scheduling包下的其中两个注解。

@EnableScheduling

启用Spring的计划任务执行功能,类似于Spring的<task:*> XML命名空间中的功能。顾名思义,就是开启spring项目的定时任务功能,通常跟随@Configuration注解使用。

在Spring文档中底部这样备注:Note: @EnableScheduling applies to its local application context only, allowing for selective scheduling of beans at different levels.

意思是它仅应用于其本地应用程序上下文,允许在不同级别对bean进行选择性调度。

@Scheduled

用于标记需要定时执行的方法,对于这些周期执行方法@Scheduled注解必须接收cron()fixedDelay()fixedRate()其中之一作为参数。

使用

@Component
@EnableScheduling
@Slf4j
public class TaskComponent {

    // 接收cron表达式作为@Scheduled参数
    // 下列表达式标识每五秒执行一次
    @Scheduled(cron = "0/5 * * * * ? ")
    public void originTask(){
      log.info("原始定时任务执行");
    }

    // 接收fixedDelay作为@Scheduled参数
    // 下列方法每延时五秒执行一次
    // 根据上一个方法结束开始计时
    @Scheduled(fixedDelay = 5000)
    public void fixedDelay(){
        log.info("原始定时任务执行");
    }

    // 接收fixedRate作为@Scheduled参数
    // 下列方法每间隔五秒执行一次
    // 根据上次任务开始时计时,假如任务中间花费了2.5秒,即+2.5秒开始执行
    // 假如间隔5秒,在单线程执行的情况下,A1任务执行7秒,A1还没执行完,A2会开始执行,此时A2会出现阻塞
    @Scheduled(fixedRate = 5000)
    public void fixedRate(){
        log.info("原始定时任务执行");
    }
}

以上是Spring自带的定时任务调度,有很多好用的第三方框架,例如:QuartZ、xxl-job等。

动态任务

假设你是第一次接触springboot中的定时任务,你会发现前文解释的任务,都是开发者提前知道每个任务需要在什么时候执行。

新问题出现了:假设我的项目中,有任务可能要定时执行,但我目前还没有认识到需要执行什么东西,如何实现?

下面有两种方法,加载数据库中的记录并注册到Spring定时任务上下文中。

建表

CREATE TABLE `table_task_info` (
  `id` varchar(255NOT NULL,
  `class_name` varchar(1024DEFAULT NULL,
  `bean_name` varchar(255DEFAULT NULL,
  `period_unit` varchar(255DEFAULT NULL,
  `period` bigint(20DEFAULT NULL,
  `cron` varchar(255DEFAULT NULL,
  `execute_method` varchar(255DEFAULT NULL,
  `execute_mode` tinyint(4DEFAULT NULL,
  `param` text,
  `execute_time` datetime DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8

CREATE TABLE `table_task_execute_record` (
  `id` varchar(255NOT NULL,
  `task_id` varchar(255DEFAULT NULL,
  `execute_time` datetime DEFAULT NULL,
  `execute_status` tinyint(4DEFAULT NULL,
  `execute_msg` text,
  `error_msg` text,
  PRIMARY KEY (`id`)
ENGINE=InnoDB DEFAULT CHARSET=utf8

实现SchedulingConfigurer接口

这是org.springframework.scheduling包下的一个接口,通常用于注册定时任务,相当于用编程方式注册原本被@Scheduled注解注册的方法。接口只提供了一个方法:void configureTasks(ScheduledTaskRegistrar taskRegistrar);

ScheduledTaskRegistrar

作为configureTasks方法接收的唯一参数,单看名称能简单了解它是作为任务注册的对象。下图是它所提供的方法,看方法名称再联系上文的@cron注解,容易看出它是担任了@cron注解的任务。

使用

以下案例,可在项目启动时搜索数据库中需要执行的任务类并注册到Spring上下文中。

@Slf4j
@Configuration
public class ScheduledConfig implements SchedulingConfigurer {

    @Resource
    private TaskService taskService;

    @Bean(name = "myExecutor")
    public Executor taskExecutor() {
        return Executors.newScheduledThreadPool(10);
    }


    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(taskExecutor());
        List<TaskInfo> taskInfos = taskService.queryStableCronTask();
        for (TaskInfo info : taskInfos) {
            // addCronTask(Runnable task, String expression)
            // 接收一个Runnable对象,和执行的表达式
            taskRegistrar.addCronTask(taskService.getRunnableTask(info),
                    info.getCron());
            log.info("添加新定时任务id{},Cron-》{}",info.getId(),info.getCron());
        }
    }
}

使用ThreadPoolTaskScheduler

它是org.springframework.scheduling.concurrent下的一个类,实现了TaskScheduler接口,封装了一个线程池对象ScheduledThreadPoolExecutor,默认线程数为1,所以使用时需要配置线程数量。

需要关注下图的几个方法,第一个参数都是接收Runnable对象,可理解为需要执行的实际任务。后面的参数姑且理解为需要执行的时间。

注册Bean

@Configuration
public class TaskConfiguration {

    @Bean(name = "myTaskScheduler")
    public ThreadPoolTaskScheduler setThreadPoolTaskScheduler(){
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(5);
        taskScheduler.setThreadNamePrefix("task-pool-");
        // 执行shutdown时,等待前一个任务执行完
        taskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        // 执行shutdown时,等待的超时时间
        taskScheduler.setAwaitTerminationSeconds(30);
        // 自定义错误处理器,当某个线程执行时出现错误会跳过线程内部的Try/catch,进入ErrorHandler
        // 需要实现 ErrorHandler接口
//        taskScheduler.setErrorHandler(new MyTaskErrorHandler());
        return taskScheduler;
    }
}

使用

demo中的TaskController,编写RestController层用于接收需要调度的定时任务并记录入库,用于后续的执行状态查看、修改或取消。

@RestController
@RequestMapping("/task/")
public class TaskController {

    @Description("任务列表")
    @GetMapping("list")
    public Collection<TaskInfo> taskList(){
        return taskService.getTaskList();
    }

    @Description("取消任务调度")
    @RequestMapping("cancel/{id}")
    public boolean cancelTask(@PathVariable String id){
        return taskService.cancelTask(id);
    }

    @Description("新增任务记录")
    @PostMapping("add")
    public void addTask(@RequestBody TaskParam param){
        taskService.addTask(param);
    }
}

主要关注taskService.addTask(TaskParam param)方法内部。

@Transactional(rollbackFor = Exception.class)
public void addTask(TaskParam param
{
     // ..任务入库省略
    this.putTaskMap(info, null);
    if(param.isScheduleNow()){
        ScheduledFuture<?> future = this.scheduleTaskByMode(info);

        if(future == null){
            throw new RuntimeException("传入的任务参数调度失败");
        }
    }
}

private ScheduledFuture<?> scheduleTaskByMode(TaskInfo info) {
        Integer mode = info.getExecuteMode();
        Timestamp executeTime = info.getExecuteTime();
        Long period = info.getPeriod();
        String periodUnit = info.getPeriodUnit();
        // ScheduledFuture继承Future,带有cancel方法,遇到方法需要修改时可cancel后重新调度
        ScheduledFuture<?> future;
        switch (mode) {
            default -> future = null;
            case 0 -> {
                // 接收Runnable+Trigger对象(这里使用CronTrigger)
                // 根据cron表达式定时执行
                future = threadPoolTaskScheduler.schedule(getRunnableTask(info), new CronTrigger(info.getCron()));
                break;
            }
            // 接收Runnable+时间戳类对象
            // 根据时间戳定时执行
            case 1 -> {
                future = threadPoolTaskScheduler.schedule(getRunnableTask(info), executeTime.toInstant());
                break;
            }
            // 接收Runnable+Duration对象
            // 根据时间间隔执行(上次开始时计算)
            case 2 -> {
                future = threadPoolTaskScheduler.scheduleAtFixedRate(getRunnableTask(info), Duration.of(period, PeriodUnit.getByName(periodUnit)));
                break;
            }
            // 接收Runnable+Duration对象
            // 根据时间延迟执行(上次结束时计算)
            case 3 -> {
                future = threadPoolTaskScheduler.scheduleWithFixedDelay(getRunnableTask(info), executeTime.toInstant(), Duration.of(period, PeriodUnit.getByName(periodUnit)));
                break;
            }
        }
        log.info("调度一个任务:\n{}",info.toString());
        this.putTaskMap(info, future);
        return future;
    }
    
public Runnable getRunnableTask(TaskInfo info) {
        return () -> {
            long s = System.currentTimeMillis();
            log.info("----Task Execute----");
            // ..具体的执行内容,并在finally写入当前Task状态
            log.info("----Task End [{}ms]----", System.currentTimeMillis() - s);
        };
    }

结束

上文简单介绍了spring framework框架自带的定时任务包下的冰山一角,还有很多内容在遇到某些具体问题才可能挖掘出来,再加上定时任务还有其他的场景,例如:分布式调度、任务告警等需要考虑。

Spring文档:https://docs.spring.io/spring-framework/docs/current/javadoc-api/index.html


欢迎加入我的知识星球,全面提升技术能力。

👉 加入方式,长按”或“扫描”下方二维码噢

星球的内容包括:项目实战、面试招聘、源码解析、学习路线。

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

Java基基
一个苦练基本功的 Java 公众号,所以取名 Java 基基
 最新文章