spring schedule配置多任務動態cron(增刪啟停)
之前公司經常會遇到配置定時任務,簡單的任務可以直接依賴spring。簡單任務直接使用 @scheduled 注解配合@EnableScheduling。但是如何實現簡單的動態cron呢?
開發原則:盡可能在項目本身去實現,少依賴第三方框架,避免項目過于臃腫和復雜。
倆種任務調度方式:
springBoot 基礎模塊 spring-boot-starter-web 已經內置 schedule ,無需引入額外依賴。先思考幾個問題:
1、動態 cron 實現的原理
任務的 【 停止】是基于 future接口 的cancel() 方法。任務的 【增加、刪除、啟動】是基于 注冊到 類ScheduledTaskRegistrar 的 ScheduledFuture的數量。涉及核心類:
ScheduledFuture SchedulingConfigurer ScheduledTaskRegistrar2、多任務并行執行配置spring默認機制對schedule是單線程,需要配置多線程并行執行。
3、如何配置多個任務好多博文,都是配置一個cron,這讓初學者很難受。
4、如何配置任務分組根據自己業務背景,可根據步驟三,進行改造。
5、如何配置服務啟動自啟任務。想要程序啟動時首次去加我們設置的task,只需實現 CommandLineRunner 即可。
6、如何從數據庫讀取配置這個其實很簡單,在實現 ScheduledTaskRegistrar 時,先直接查詢我們需要的數據即可。
7、如何優雅的實現我們的代碼這里為了我們多個task實現時,去除臃腫的if else ,使用策略模式去實現我們的task,這里代碼里面會具體介紹。
參考類圖:
8、如何去觸發我們的schedule 【增刪啟停】配置好 task任務類,注入到 controller ,通過接口直接調用即可。
三、代碼實現先貼出我的github 代碼,下面代碼描述不全。
普通多任務動態cron 分組多任務動態cron1. 普通多任務動態cron 實現1.1 對應數據庫的實體類 TaskEntity
@Data@AllArgsConstructor@NoArgsConstructorpublic class TaskEntity { /** * 任務id */ private int taskId; /** * 任務說明 */ private String desc; /** * cron 表達式 */ private String expression;}
1.2 配置每個任務實現
配置任務接口 TaskService
public interface TaskService { void HandlerJob(); Integer jobId();}
配置任務接口實現 TaskServiceJob1Impl、TaskServiceJob2Impl …
@Servicepublic class TaskServiceJob1Impl implements TaskService { @Override public void HandlerJob() { System.out.println('------job1 開始執行---------:'+new Date()); System.out.println(new SimpleDateFormat('yyyy-MM-dd HH:mm:ss').format(new Date()) + ' ' + Thread.currentThread().getName() + ' 任務一啟動'); try { Thread.sleep(10000);//任務耗時10秒 } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(new SimpleDateFormat('yyyy-MM-dd HH:mm:ss').format(new Date()) + ' ' + Thread.currentThread().getName() + ' 結束'); } @Override public Integer jobId() { return 1; }}
1.3 配置任務解析器 TaskSolverChooser
注:這里引入策略模式為啥要配置 任務解析器選擇器:因為我們實現多個任務時,一個任務對應一個 CronTask,需要在 MyScheduledTask 里面去實現我們每一個方法。譬如,我們有100個任務就要自定義100個任務實現方法,代碼會很臃腫,明顯不符合,【開閉原則】,于是這里采用策略模式,解耦我們多個任務業務實現邏輯。
@Slf4j@Componentpublic class TaskSolverChooser implements ApplicationContextAware { private ApplicationContext applicationContext; private Map<Integer, TaskService> chooseMap = new HashMap<>(16); /** * 拿到spring context 上下文 */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @PostConstruct private void registerToTaskSolver(){ Map<String, TaskService> taskServiceMap = applicationContext.getBeansOfType(TaskService.class); for (TaskService value : taskServiceMap.values()) { chooseMap.put(value.jobId(), value); log.info('task {} 處理器: {} 注冊成功',new Object[]{value.jobId(),value}); } } /** * 獲取需要的job */ public TaskService getTask(Integer jobId){ return chooseMap.get(jobId); }}
1.4 配置MyScheduledTask (動態cron核心配置)
說明:1、配置多線程執行任務2、配置 刷新 task3、配置 停止 task4、配置 執行task 業務邏輯
@Componentpublic class MyScheduledTask implements SchedulingConfigurer { private volatile ScheduledTaskRegistrar registrar; private final ConcurrentHashMap<Integer, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>(); private final ConcurrentHashMap<Integer, CronTask> cronTasks = new ConcurrentHashMap<>(); @Autowired private TaskSolverChooser taskSolverChooser; @Override public void configureTasks(ScheduledTaskRegistrar registrar) { //設置20個線程,默認單線程,如果不設置的話,不能同時并發執行任務 registrar.setScheduler(Executors.newScheduledThreadPool(10)); this.registrar = registrar; } /** * 修改 cron 需要 調用該方法 */ public void refresh(List<TaskEntity> tasks){ //取消已經刪除的策略任務 Set<Integer> sids = scheduledFutures.keySet(); for (Integer sid : sids) { if(!exists(tasks, sid)){scheduledFutures.get(sid).cancel(false); } } for (TaskEntity TaskEntity : tasks) { String expression = TaskEntity.getExpression(); //計劃任務表達式為空則跳過 if(!StringUtils.hasLength(expression)){continue; } //計劃任務已存在并且表達式未發生變化則跳過 if (scheduledFutures.containsKey(TaskEntity.getTaskId()) && cronTasks.get(TaskEntity.getTaskId()).getExpression().equals(expression)) {continue; } //如果策略執行時間發生了變化,則取消當前策略的任務 if(scheduledFutures.containsKey(TaskEntity.getTaskId())){scheduledFutures.get(TaskEntity.getTaskId()).cancel(false);scheduledFutures.remove(TaskEntity.getTaskId());cronTasks.remove(TaskEntity.getTaskId()); } //業務邏輯處理 CronTask task = cronTask(TaskEntity, expression); //執行業務 ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger()); cronTasks.put(TaskEntity.getTaskId(), task); scheduledFutures.put(TaskEntity.getTaskId(), future); } } /** * 停止 cron 運行 */ public void stop(List<TaskEntity> tasks){ tasks.forEach(item->{ if (scheduledFutures.containsKey(item.getTaskId())) {// mayInterruptIfRunning設成false話,不允許在線程運行時中斷,設成true的話就允許。scheduledFutures.get(item.getTaskId()).cancel(false);scheduledFutures.remove(item.getTaskId()); } }); } /** * 業務邏輯處理 */ public CronTask cronTask(TaskEntity TaskEntity, String expression) { return new CronTask(() -> { //每個計劃任務實際需要執行的具體業務邏輯 //采用策略,模式 ,執行我們的job taskSolverChooser.getTask(TaskEntity.getTaskId()).HandlerJob();}, expression); } private boolean exists(List<TaskEntity> tasks, Integer tid){ for(TaskEntity TaskEntity:tasks){ if(TaskEntity.getTaskId() == tid){return true; } } return false; } @PreDestroy public void destroy() { this.registrar.destroy(); }}
1.5 配置程序啟動時首次去加我們設置的task
@Componentpublic class StartInitTask implements CommandLineRunner { @Autowired private MyScheduledTask myScheduledTask; @Override public void run(String... args) throws Exception { List<TaskEntity> list = Arrays.asList(new TaskEntity(1, '測試1', '0/1 * * * * ?'),new TaskEntity(2, '測試2', '0/1 * * * * ?') ); myScheduledTask.refresh(list); }}
1.6 配置web接口去觸發,增刪啟停
@RestControllerpublic class StartController { @Autowired private MyScheduledTask scheduledTask; @PostMapping(value = '/startOrChangeCron') public String changeCron(@RequestBody List<TaskEntity> list){ if (CollectionUtils.isEmpty(list)) { // 這里模擬存在數據庫的數據 list = Arrays.asList( new TaskEntity(1, '測試1','0/1 * * * * ?') , new TaskEntity(2, '測試2','0/1 * * * * ?') ); } scheduledTask.refresh(list); return 'task任務:' + list.toString() + '已經開始運行'; } @PostMapping(value = '/stopCron') public String stopCron(@RequestBody List<TaskEntity> list){ if (CollectionUtils.isEmpty(list)) { // 這里模擬將要停止的cron可通過前端傳來 list = Arrays.asList( new TaskEntity(1, '測試1','0/1 * * * * ?') , new TaskEntity(2, '測試2','0/1 * * * * ?') ); } scheduledTask.stop(list); List<Integer> collect = list.stream().map(TaskEntity::getTaskId).collect(Collectors.toList()); return 'task任務:' + collect.toString() + '已經停止啟動'; }}2. 分組多任務動態cron 實現
實現原理:基于反射實現,根據方法全類名,去動態執行方法。多任務分組配置,根據任務類型進行分組。eg:定時任務人員的相關操作,有檢測人員離職狀態,人員業績達標,人員考勤…等,作用:對人員定時任務做一個分類,在同一個類里面去實現不同的task,比較《1. 普通多任務動態cron 實現》,是一個類可以實現一個task《2. 分組多任務動態cron 實現》,是一個類可以實現多個task詳細可參考: 分組多任務動態cron
3 測試記錄測試1 項目啟動自啟TaskServiceJob1Impl和TaskServiceJob1Impl … 設置 阻塞10s觀察日志時間可發現,已經同時并發執行倆個任務。
測試2 觸發 刷新【增、刪、啟】我們的task,。其實這里沒這么智能,如果需要觸發刷新接口,實際上是重新加載我們的task,就是對應觸發我們,增加任務任務,刪除任務,啟動任務。使用idea插件測試接口
觀察日志
測試3 觸發 停止接口,停止一個接口。這里測試略過…
四、總結其實實現簡單的動態配置,以上代碼可用,比較簡單。
到此這篇關于spring schedule配置多任務動態cron(增刪啟停)的文章就介紹到這了,更多相關spring schedule 多任務動態cron內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
