spring schedule配置多任务动态cron(增删启停)

网友投稿 293 2023-01-26

spring schedule配置多任务动态cron(增删启停)

一、背景

之前公司经常会遇到配置定时任务,简单的任务可以直接依赖spring。

简单任务直接使用 @scheduled 注解配合@EnableScheduling。

但是如何实现简单的动态cron呢?

开发原则:

尽可能在项目本身去实现,少依赖第三方框架,避免项目过于臃肿和复杂。

俩种任务调度方式:

二、本篇说明

springBoot 基础模块 spring-boot-starter-web 已经内置 schedule ,无需引入额外依赖。

先思考几个问题:

1、动态 cron 实现的原理

任务的 【 停止】是基于 future接口 的cancel() 方法。

任务的 【增加、删除、启动】是基于 注册到 类ScheduledTaskRegistrar 的 ScheduledFuture的数量。

涉及核心类:

ScheduledFuture

SchedulingConfigurer

ScheduledTaskRegistrar

2、多任务并行执行配置

spring默认机制对schedule是单线程,需要配置多线程并行执行。

3、如何配置多个任务

好多博文,都是配置一个cron,这让初学者很难受。

4、如何配置任务分组

根据自己业务背景,可根据步骤三,进行改造。

5、如何配置服务启动自启任务。

想要程序启动时首次去加我们设置的task,只需实现 CommandLineRunner 即可。

6、如何从数据库读取配置

这个其实很简单,在实现 ScheduledTaskRegisZWDASOOQtrar 时,先直接查询我们需要的数据即可。

7、如何优雅的实现我们的代码

这里为了我们多个task实现时,去除臃肿的if else ,使用策略模式去实现我们的task,这里代码里面会具体介绍。

参考类图:

8、如何去触发我们的schedule 【增删启停】

配置好 task任务类,注入到 controller ,通过接口直接调用即可。

三、代码实现

先贴出我的github 代码,下面代码描述不全。

普通多任务动态cron

分组多任务动态cron

1. 普通多任务动态cron 实现

1.1 对应数据库的实体类 TaskEntity

@Data

@AllArgsConstructor

@NoArgsConstructor

public class TaskEntity {

/**

* 任务id

*/

private int taskId;

/**

* 任务说明

*/

private String desc;

/**

* cron 表达式

*/

private String expression;

}

1.2 配置每个任务实现

配置任务接口 TaskService

public interface TaskService {

void HandlerJob();

Integer jobId();

}

配置任务接口实现 TaskServiceJob1Impl、TaskServiceJob2Impl …

@Service

public class TaskServiceJob1Impl implements TaskService {

@Override

public void HandlerJob() {

System.out.println("------job1 开始执行---------:"+new Date());

System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 任务一启动");

try {

Thread.sleep(10000);//任务耗时10秒

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " " + Thread.currentThread().getName() + " 结束");

}

@Override

public Integer jobId() {

return 1;

}

}

1.3 配置任务解析器 TaskSolverChooser

注:

这里引入策略模式

为啥要配置 任务解析器选择器:

因为我们实现多个任务时,一个任务对应一个 CronTask,需要在 MyScheduledTask 里面去实现我们每一个方法。

譬如,我们有100个任务就要自定义100个任务实现方法,代码会很臃肿,明显不符合,【开闭原则】,于是这里采用策略模式,解耦我们多个任务业务实现逻辑。

@Slf4j

@Component

public class TaskSolverChooser implements ApplicationContextAware {

private ApplicationContext applicationContext;

private Map chooseMap = new HashMap<>(16);

/**

* 拿到spring context 上下文

*/

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@PostConstruct

private void registerToTaskSolver(){

Map taskServiceMap = applicationContext.getBeansOfType(TaskService.class);

for (TaskService value : taskServiceMap.values()) {

chooseMap.put(value.jobId(), value);

log.info("task {} 处理器: {} 注册成功",new Object[]{value.jobId(),value});

}

}

/**

* 获取需要的job

*/

public TaskService getTask(Integer jobId){

return chooseMap.get(jobId);

}

}

1.4 配置MyScheduledTask (动态cron核心配置)

说明:

1、配置多线程执行任务

2、配置 刷新 task

3、配置 停止 task

4、配置 执行task 业务逻辑

@Component

public class MyScheduledTask implements SchedulingConfigurer {

private volatile ScheduledTaskRegistrar registrar;

private final ConcurrentHashMap> scheduledFutures = new ConcurrentHashMap<>();

private final ConcurrentHashMap();

@Autowired

private TaskSolverChooser taskSolverChooser;

@Override

public void configureTasks(ScheduledTaskRegistrar registrar) {

//设置20个线程,默认单线程,如果不设置的话,不能同时并发执行任务

registrar.setScheduler(Executors.newScheduledThreadPool(10));

this.registrar = registrar;

}

/**

* 修改 cron 需要 调用该方法

*/

public void refresh(List tasks){

//取消已经删除的策略任务

Set sids = scheduledFutures.keySet();

for (Integer sid : sids) {

if(!exists(tasks, sid)){

scheduledFutures.get(sid).cancel(false);

}

}

for (TaskEntity TaskEntity : tasks) {

String expression = TaskEntity.getExpression();

//计划任务表达式为空则跳过

if(!StringUtils.hasLength(expression)){

continue;

}

//计划任务已存在并且表达式未发生变化则跳过

if (scheduledFutures.containsKey(TaskEntity.getTaskId())

&& cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {

continue;

}

//如果策略执行时间发生了变化,则取消当前策略的任务

if(scheduledFutures.containsKey(TaskEntity.getTaskId())){

scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);

scheduledFutures.remove(TaskEntity.getTaskId());

cronTasks.remove(TaskEntity.getTaskId());

}

//业务逻辑处理

CronTask task = cronTask(TaskEntity, expression);

//执行业务

ScheduledFuture> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());

cronTasks.put(TaskEntity.getTaskId(), task);

scheduledFutures.put(TaskEntity.getTaskId(), future);

}

}

/**

* 停止 cron 运行

*/

public void stop(List tasks){

tasks.forEach(item->{

if (scheduledFutures.containsKey(item.getTaskId())) {

// mayInterruptIfRunning设成false话,不允许在线程运行时中断,设成true的话就允许。

scheduledFutures.get(item.getTaskId()).cancel(false);

scheduledFutures.remove(item.getTaskId());

}

});

}

/**

* 业务逻辑处理

*/

public CronTask cronTask(TaskEntity TaskEntity, String expresshttp://ion) {

return new CronTask(() -> {

//每个计划任务实际需要执行的具体业务逻辑

http:// //采用策略,模式 ,执行我们的job

taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();

}, expression);

}

private boolean exists(List tasks, Integer tid){

for(TaskEntity TaskEntity:tasks){

if(TaskEntity.getTaskId() == tid){

return true;

}

}

return false;

}

@PreDestroy

public void destroy() {

this.registrar.destroy();

}

}

1.5 配置程序启动时首次去加我们设置的task

@Component

public class StartInitTask implements CommandLineRunner {

@Autowired

private MyScheduledTask myScheduledTask;

@Override

public void run(String... args) throws Exception {

List list = Arrays.asList(

new TaskEntity(1, "测试1", "0/1 * * * * ?"),

new TaskEntity(2, "测试2", "0/1 * * * * ?")

);

myScheduledTask.refresh(list);

}

}

1.6 配置web接口去触发,增删启停

@RestController

public class StartController {

@Autowired

private MyScheduledTask scheduledTask;

@PostMapping(value = "/startOrChangeCron")

public String changeCron(@RequestBody List list){

if (CollectionUtils.isEmpty(list)) {

// 这里模拟存在数据库的数据

list = Arrays.asList(

new TaskEntity(1, "测试1","0/1 * * * * ?") ,

new TaskEntity(2, "测试2","0/1 * * * * ?")

);

}

scheduledTask.refresh(list);

return "task任务:" + list.toString() + "已经开始运行";

}

@PostMapping(value = "/stopCron")

public String stopCron(@RequestBody List list){

if (CollectionUtils.isEmpty(list)) {

// 这里模拟将要停止的cron可通过前端传来

list = Arrays.asList(

new TaskEntity(1, "测试1","0/1 * * * * ?") ,

new TaskEntity(2, "测试2","0/1 * * * * ?")

);

}

scheduledTask.stop(list);

List collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList());

return "task任务:" + collect.toString() + "已经停止启动";

}

}

2. 分组多任务动态cron 实现

实现原理:

基于反射实现,根据方法全类名,去动态执行方法。多任务分组配置,根据任务类型进行分组。

eg:

定时任务人员的相关操作,有检测人员离职状态,人员业绩达标,人员考勤…等,

作用:

对人员定时任务做一个分类,在同一个类里面去实现不同的task,

比较

《1. 普通多任务动态cron 实现》,是一个类可以实现一个task

《2. 分组多任务动态cron 实现》,是一个类可以实现多个task

详细可参考: 分组多任务动态cron

3 测试记录

测试1 项目启动自启

TaskServiceJob1Impl和TaskServiceJob1Impl … 设置 阻塞10s

观察日志时间可发现,已经同时并发执行俩个任务。

测试2 触发 刷新【增、删、启】我们的task,。

其实这里没这么智能,如果需要触发刷新接口,实际上是重新加载我们的task,就是对应触发我们,增加任务任务,删除任务,启动任务。

使用idea插件测试接口

观察日志

测试3 触发 停止接口,停止一个接口。

这里测试略过…

四、总结

其实实现简单的动态配置,以上代码可用,比较简单。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:火遍全网的Hutool使用Builder模式创建线程池的方法
下一篇:Spring注解实现Bean自动装配示例详解
相关文章

 发表评论

暂时没有评论,来抢沙发吧~