任务调度Quartz 2框架 页面配置调度任务
spring整合Quart 任务信息持久化到数据库,实现可以在页面配置任务 新增 暂定 删除 恢复 任务
1.定时任务配置类
/** * 定时任务配置 * */ @Configuration public class ScheduleConfig { @Bean public SchedulerFactoryBean schedulerFactoryBean(@Qualifier("dataSource")DataSource dataSource) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setDataSource(dataSource); //quartz参数 Properties prop = new Properties(); prop.put("org.quartz.scheduler.instanceName", "KedaScheduler"); prop.put("org.quartz.scheduler.instanceId", "AUTO"); //线程池配置 prop.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool"); prop.put("org.quartz.threadPool.threadCount", "20"); prop.put("org.quartz.threadPool.threadPriority", "5"); //JobStore配置 prop.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX"); //集群配置 prop.put("org.quartz.jobStore.isClustered", "false"); prop.put("org.quartz.jobStore.clusterCheckinInterval", "15000"); prop.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1"); prop.put("org.quartz.jobStore.misfireThreshold", "12000"); prop.put("org.quartz.jobStore.tablePrefix", "QRTZ_"); prop.put("org.quartz.jobStore.selectWithLockSQL", "SELECT * FROM {0}LOCKS UPDLOCK WHERE LOCK_NAME = ?"); //PostgreSQL数据库,需要打开此注释 //prop.put("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate"); factory.setQuartzProperties(prop); factory.setSchedulerName("kedaScheduler"); //延时启动 factory.setStartupDelay(30); factory.setApplicationContextSchedulerContextKey("applicationContextKey"); //可选,QuartzScheduler 启动时更新己存在的Job,这样就不用每次修改targetObject后删除qrtz_job_details表对应记录了 factory.setOverwriteExistingJobs(true); //设置自动启动,默认为true factory.setAutoStartup(true); return factory; } }
2.建表语句
/* Navicat MySQL Data Transfer Source Server : 153 Source Server Type : MySQL Source Server Version : 50721 Source Host : 192.168.8.153:3306 Source Schema : nongji_0.0.2 Target Server Type : MySQL Target Server Version : 50721 File Encoding : 65001 Date: 08/04/2020 08:49:09 */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for schedule_job -- ---------------------------- DROP TABLE IF EXISTS `schedule_job`; CREATE TABLE `schedule_job` ( `job_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 任务id, `bean_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT spring bean名称, `params` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 参数, `cron_expression` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT cron表达式, `status` tinyint(4) NULL DEFAULT NULL COMMENT 任务状态 0:正常 1:暂停, `remark` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 备注, `create_time` datetime(0) NULL DEFAULT NULL COMMENT 创建时间, PRIMARY KEY (`job_id`) USING BTREE, INDEX `beanName`(`bean_name`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 27 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 定时任务 ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1; /* Navicat MySQL Data Transfer Source Server : 153 Source Server Type : MySQL Source Server Version : 50721 Source Host : 192.168.8.153:3306 Source Schema : nongji_0.0.2 Target Server Type : MySQL Target Server Version : 50721 File Encoding : 65001 Date: 08/04/2020 08:49:22 */ SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for schedule_job_log -- ---------------------------- DROP TABLE IF EXISTS `schedule_job_log`; CREATE TABLE `schedule_job_log` ( `log_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 任务日志id, `job_id` bigint(20) NOT NULL COMMENT 任务id, `bean_name` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT spring bean名称, `params` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 参数, `status` int(4) NOT NULL COMMENT 任务状态 0:成功 1:失败, `error` varchar(2000) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT 失败信息, `times` int(11) NOT NULL COMMENT 耗时(单位:毫秒), `create_time` datetime(6) NULL DEFAULT NULL COMMENT 创建时间, PRIMARY KEY (`log_id`) USING BTREE, INDEX `job_id`(`job_id`) USING BTREE, INDEX `create`(`create_time`) USING BTREE ) ENGINE = InnoDB AUTO_INCREMENT = 5324 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = 定时任务日志 ROW_FORMAT = Dynamic; SET FOREIGN_KEY_CHECKS = 1;
3.实体类
/** * 定时任务 * */ public class ScheduleJobEntity implements Serializable { private static final long serialVersionUID = 1L; /** * 任务调度参数key */ public static final String JOB_PARAM_KEY = "JOB_PARAM_KEY"; /** * 任务id */ private Long jobId; /** * spring bean名称 */ private String beanName; /** * 参数 */ private String params; /** * cron表达式 */ private String cronExpression; /** * 任务状态 */ private Integer status; /** * 备注 */ private String remark; /** * 创建时间 */ private Date createTime; public static long getSerialVersionUID() { return serialVersionUID; } public static String getJobParamKey() { return JOB_PARAM_KEY; } public Long getJobId() { return jobId; } public void setJobId(Long jobId) { this.jobId = jobId; } public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getCronExpression() { return cronExpression; } public void setCronExpression(String cronExpression) { this.cronExpression = cronExpression; } public Integer getStatus() { return status; } public void setStatus(Integer status) { this.status = status; } public String getRemark() { return remark; } public void setRemark(String remark) { this.remark = remark; } public Date getCreateTime() { return createTime; } public void setCreateTime(Date createTime) { this.createTime = createTime; } } /** * 定时任务日志 * */ public class ScheduleJobLogEntity implements Serializable { private static final long serialVersionUID = 1L; /** * 日志id */ private Long logId; /** * 任务id */ private Long jobId; /** * spring bean名称 */ private String beanName; /** * 参数 */ private String params; /** * 任务状态 0:成功 1:失败 */ private Integer status; /** * 失败信息 */ private String error; /** * 耗时(单位:毫秒) */ private Integer times; /** * 创建时间 */ private String createTime; private String createDateStart; private String createDateEnd; public static long getSerialVersionUID() { return serialVersionUID; } public Long getLogId() { return logId; } public void setLogId(Long logId) { this.logId = logId; } public Long getJobId() { return jobId; } public void setJobId(Long jobId) { this.jobId = jobId; } public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public Integer getStatus() { return status; } public void setStatus(Integer status) { this.status = status; } public String getError() { return error; } public void setError(String error) { this.error = error; } public Integer getTimes() { return times; } public void setTimes(Integer times) { this.times = times; } public String getCreateTime() { return createTime; } public void setCreateTime(String createTime) { this.createTime = createTime; } public String getCreateDateStart() { return createDateStart; } public void setCreateDateStart(String createDateStart) { this.createDateStart = createDateStart; } public String getCreateDateEnd() { return createDateEnd; } public void setCreateDateEnd(String createDateEnd) { this.createDateEnd = createDateEnd; } }
4.项目启动时初始化定时器
/** * 项目启动时,初始化定时器 */ @PostConstruct public void init(){ // TODO 查询全部job Map<String,Object> map = new HashMap<>(); List<ScheduleJobEntity> scheduleJobList = dalClient.queryForList("scheduleJob.getJobPage",map,ScheduleJobEntity.class); for(ScheduleJobEntity scheduleJob : scheduleJobList){ CronTrigger cronTrigger = ScheduleUtils.getCronTrigger(scheduler, scheduleJob.getJobId()); //如果不存在,则创建 if(cronTrigger == null) { ScheduleUtils.createScheduleJob(scheduler, scheduleJob); }else { ScheduleUtils.updateScheduleJob(scheduler, scheduleJob); } } }
5.主要类,任务调度类
** * 定时任务 * */ public class ScheduleJob extends QuartzJobBean { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void executeInternal(JobExecutionContext context) throws JobExecutionException { ScheduleJobLogService scheduleJobLogService = (ScheduleJobLogService)SpringContextUtils.getBean("scheduleJobLogService"); SendmaillEventPublisher sendMail = (SendmaillEventPublisher)SpringContextUtils.getBean("sendmaillEventPublisher"); ScheduleJobEntity scheduleJob = (ScheduleJobEntity) context.getMergedJobDataMap() .get(ScheduleJobEntity.JOB_PARAM_KEY); //数据库保存执行记录 ScheduleJobLogEntity log = new ScheduleJobLogEntity(); log.setJobId(scheduleJob.getJobId()); log.setBeanName(scheduleJob.getBeanName()); log.setParams(scheduleJob.getParams()); log.setCreateTime(DateUtils.getCurrentSecond()); //任务开始时间 long startTime = System.currentTimeMillis(); try { //执行任务 logger.debug("任务准备执行,任务ID:" + scheduleJob.getJobId()); //获取定时任务处理类 ITask target = SpringContextUtils.getBean(scheduleJob.getBeanName(), ITask.class); target.run( scheduleJob.getParams()); //通过反射执行任务 Object target = SpringContextUtils.getBean(scheduleJob.getBeanName()); Method method = target.getClass().getDeclaredMethod("run", String.class); method.invoke(target,scheduleJob.getParams()); //任务执行总时长 long times = System.currentTimeMillis() - startTime; log.setTimes((int)times); //任务状态 0:成功 1:失败 log.setStatus(0); logger.debug("任务执行完毕,任务ID:" + scheduleJob.getJobId() + " 总共耗时:" + times + "毫秒"); }catch (Exception e) { logger.error("任务执行失败,任务ID:" + scheduleJob.getJobId(), e); //任务执行总时长 long times = System.currentTimeMillis() - startTime; log.setTimes((int)times); //任务状态 0:成功 1:失败 log.setStatus(1); log.setError(StringUtils.substring(e.toString(), 0, 2000)); if(e instanceof InvocationTargetException){ //具体异常 Throwable t = ((InvocationTargetException) e).getTargetException();// 获取目标异常 log.setError(StringUtils.substring(t.toString(), 0, 2000)); } sendMail.sendmailTaskLog(log); }finally { scheduleJobLogService.save(log); } } }
6.任务调度工具类 重要
/** * 定时任务工具类 * */ public class ScheduleUtils { private final static String JOB_NAME = "TASK_"; /** * 获取触发器key */ public static TriggerKey getTriggerKey(Long jobId) { return TriggerKey.triggerKey(JOB_NAME + jobId); } /** * 获取jobKey */ public static JobKey getJobKey(Long jobId) { return JobKey.jobKey(JOB_NAME + jobId); } /** * 获取表达式触发器 */ public static CronTrigger getCronTrigger(Scheduler scheduler, Long jobId) { try { return (CronTrigger) scheduler.getTrigger(getTriggerKey(jobId)); } catch (SchedulerException e) { throw new DiyException("获取定时任务CronTrigger出现异常", e); } } /** * 创建定时任务 */ public static void createScheduleJob(Scheduler scheduler, ScheduleJobEntity scheduleJob) { try { //构建job信息 JobDetail jobDetail = JobBuilder.newJob(ScheduleJob.class).withIdentity(getJobKey(scheduleJob.getJobId())).build(); //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()) .withMisfireHandlingInstructionDoNothing(); //按新的cronExpression表达式构建一个新的trigger CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(getTriggerKey(scheduleJob.getJobId())).withSchedule(scheduleBuilder).build(); //放入参数,运行时的方法可以获取 jobDetail.getJobDataMap().put(ScheduleJobEntity.JOB_PARAM_KEY, scheduleJob); scheduler.scheduleJob(jobDetail, trigger); //暂停任务 if(scheduleJob.getStatus() == TaskConstant.ScheduleStatus.PAUSE.getValue()){ pauseJob(scheduler, scheduleJob.getJobId()); } } catch (SchedulerException e) { throw new DiyException("创建定时任务失败", e); } } /** * 更新定时任务 */ public static void updateScheduleJob(Scheduler scheduler, ScheduleJobEntity scheduleJob) { try { TriggerKey triggerKey = getTriggerKey(scheduleJob.getJobId()); //表达式调度构建器 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(scheduleJob.getCronExpression()) .withMisfireHandlingInstructionDoNothing(); CronTrigger trigger = getCronTrigger(scheduler, scheduleJob.getJobId()); //按新的cronExpression表达式重新构建trigger trigger = trigger.getTriggerBuilder().withIdentity(triggerKey).withSchedule(scheduleBuilder).build(); //参数 trigger.getJobDataMap().put(ScheduleJobEntity.JOB_PARAM_KEY, scheduleJob); scheduler.rescheduleJob(triggerKey, trigger); //暂停任务 if(scheduleJob.getStatus() == TaskConstant.ScheduleStatus.PAUSE.getValue()){ pauseJob(scheduler, scheduleJob.getJobId()); } } catch (SchedulerException e) { throw new DiyException("更新定时任务失败", e); } } /** * 立即执行任务 */ public static void run(Scheduler scheduler, ScheduleJobEntity scheduleJob) { try { //参数 JobDataMap dataMap = new JobDataMap(); dataMap.put(ScheduleJobEntity.JOB_PARAM_KEY, scheduleJob); scheduler.triggerJob(getJobKey(scheduleJob.getJobId()), dataMap); } catch (SchedulerException e) { throw new DiyException("立即执行定时任务失败", e); } } /** * 暂停任务 */ public static void pauseJob(Scheduler scheduler, Long jobId) { try { scheduler.pauseJob(getJobKey(jobId)); } catch (SchedulerException e) { throw new DiyException("暂停定时任务失败", e); } } /** * 恢复任务 */ public static void resumeJob(Scheduler scheduler, Long jobId) { try { scheduler.resumeJob(getJobKey(jobId)); } catch (SchedulerException e) { throw new DiyException("恢复定时任务失败", e); } } /** * 删除定时任务 */ public static void deleteScheduleJob(Scheduler scheduler, Long jobId) { try { scheduler.deleteJob(getJobKey(jobId)); } catch (SchedulerException e) { throw new DiyException("删除定时任务失败", e); } } }
7.定时任务接口
/** * 定时任务接口,所有定时任务都要实现该接口 */ public interface ITask { /** * 执行定时任务接口 * * @param params 参数,多参数使用JSON数据 */ void run(String params); }
8.Quart持久化任务 在Quart中有自建的 11个表
上一篇:
多线程四大经典案例