手动实现Quartz集群解决方案

概述

在企业项目应用中必然要集成定时任务,绝大部分JavaEE工程采用Quartz和Spring的相应配置即可完成,若要修改任务的执行时间、执行服务器、任务的启动暂停等操作需要改变配置代码甚至需要重启服务器,本文是集群化服务器下动态的处理定时任务的一个实现。

本文主要对以下问题分析:

  • Quartz集群原理
  • 集群化服务器下的任务解决方案
  • Quartz原生集群方案的弊端
  • 自定义实现集群化任务管理
  • 反射机制手动装载Bean,@Autowired注解注入失败问题

本文将实现以下功能:

  • 动态管理定时任务,启动、暂停、修改等
  • 实时切换任务执行服务器
  • 任务日志管理,运行监控、报警

希望读者具备:

  • 理解Spring Bean的装配方式与生命周期
  • 理解Java反射机制
  • JavaEE开发经验
  • Quartz集成经验
  • JavaWeb开发经验(Bootstrap)

开发环境

JDK 1.8 / Spring 3.2+ / Quartz 2.2.1+ / Gradle2.2+ / MySQL 5.7.9+ / Bootstrap 3.3.6+ / IntelliJ IDEA 2018.2.6

Gradle中必须引入quartz-jobs

'org.quartz-scheduler:quartz:2.2.1', 
'org.quartz-scheduler:quartz-jobs:2.2.1', 

Spring配置中引入注册bean

1
2
<bean id="schedulerFactoryBean"
class="org.springframework.scheduling.quartz.SchedulerFactoryBean" />

Quartz基本原理

核心元素

Quartz核心要素有Scheduler、Trigger、Job、JobDetail,其中trigger和job、jobDetail为元数据,而Scheduler为实际进行调度的控制器。

  • Trigger

Trigger用于定义调度任务的时间规则,在Quartz中主要有四种类型的Trigger:SimpleTrigger、CronTrigger、DataIntervalTrigger和NthIncludedTrigger。

  • Job&Jodetail

Quartz将任务分为Job、JobDetail两部分,其中Job用来定义任务的执行逻辑,而JobDetail用来描述Job的定义(例如Job接口的实现类以及其他相关的静态信息)。对Quartz而言,主要有两种类型的Job,StateLessJob、StateFulJob

  • Scheduler

实际执行调度逻辑的控制器,Quartz提供了DirectSchedulerFactory和StdSchedulerFactory等工厂类,用于支持Scheduler相关对象的产生。

核心元素间关系

主要线程

在Quartz中,有两类线程,也即执行线程和调度线程,其中执行任务的线程通常用一个线程池维护。线程间关系如下图所示。

在quartz中,Scheduler调度线程主要有两个:regular Scheduler Thread(执行常规调度)和Misfire Scheduler Thread(执行错失的任务)。其中Regular Thread 轮询Trigger,如果有将要触发的Trigger,则从任务线程池中获取一个空闲线程,然后执行与改Trigger关联的job;Misfire Thraed则是扫描所有的trigger,查看是否有错失的,如果有的话,根据一定的策略进行处理。

数据存储

Quartz中的trigger和job需要存储下来才能被使用。Quartz中有两种存储方式:RAMJobStore,JobStoreSupport,其中RAMJobStore是将trigger和job存储在内存中,而JobStoreSupport是基于jdbc将trigger和job存储到数据库中。RAMJobStore的存取速度非常快,但是由于其在系统被停止后所有的数据都会丢失,所以在集群应用中,必须使用JobStoreSupport。其中表结构如下表所示。

Table name Description
QRTZ_CALENDARS 存储Quartz的Calendar信息
QRTZ_CRON_TRIGGERS 存储CronTrigger,包括Cron表达式和时区信息
QRTZ_FIRED_TRIGGERS 存储与已触发的Trigger相关的状态信息,以及相联Job的执行信息
QRTZ_PAUSED_TRIGGER_GRPS 存储已暂停的Trigger组的信息
QRTZ_SCHEDULER_STATE 存储少量的有关Scheduler的状态信息,和别的Scheduler实例
QRTZ_LOCKS 存储程序的悲观锁的信息
QRTZ_JOB_DETAILS 存储每一个已配置的Job的详细信息
QRTZ_SIMPLE_TRIGGERS 存储简单的Trigger,包括重复次数、间隔、以及已触的次数
QRTZ_BLOG_TRIGGERS Trigger作为Blob类型存储
QRTZ_TRIGGERS 存储已配置的Trigger的信息
QRTZ_SIMPROP_TRIGGERS

Quartz原生集群支持

一个Quartz集群中的每个节点是一个独立的Quartz应用,它又管理着其他的节点。这就意味着你必须对每个节点分别启动或停止。Quartz集群中,独立的Quartz节点并不与另一其的节点或是管理节点通信,而是通过相同的数据库表来感知到另一Quartz应用的。

原生集群支持的问题

原生集群支持需要在数据库新增十二张表,且每个任务调度是随机分配服务器,加上部分任务执行时间过长可能超时报错等等原因,可定制程度不高,固未采用原生集群方案。下面开始手写方案。

自定义实现集群化任务管理

先展示成果

表结构设计

  • 创建任务信息表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
   CREATE TABLE `t_quartz_job` (
`job_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '任务ID',
`create_time` timestamp NULL DEFAULT NULL COMMENT '创建时间',
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',
`job_name` varchar(255) DEFAULT NULL COMMENT '任务名',
`job_group` varchar(255) DEFAULT NULL COMMENT '任务组',
`job_status` varchar(255) DEFAULT NULL COMMENT '任务状态',
`cron_expression` varchar(255) NOT NULL COMMENT '时间表达式',
`description` varchar(255) DEFAULT NULL COMMENT '描述',
`bean_class` varchar(255) NOT NULL COMMENT 'bean',
`is_concurrent` varchar(255) DEFAULT NULL COMMENT '1',
`spring_id` varchar(255) DEFAULT NULL COMMENT 'spring',
`method_name` varchar(255) NOT NULL COMMENT '方法名',
`allowed_ip` varchar(50) NOT NULL COMMENT '允许执行定时任务的服务器IP',
PRIMARY KEY (`job_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
  • 创建保存任务信息的实体类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
public class JobDetail implements java.io.Serializable {

private static final long serialVersionUID = 2L;
/**
* 正在运行
*/
public static final String STATUS_RUNNING = "1";

/**
* 停止运行
*/
public static final String STATUS_NOT_RUNNING = "0";

/**
* 同步
*/
public static final String CONCURRENT_IS = "1";

/**
* 非同步
*/
public static final String CONCURRENT_NOT = "0";
private Long jobId;

private String createTime;

private String updateTime;
/**
* 任务名称
*/
private String jobName;
/**
* 任务分组
*/
private String jobGroup;
/**
* 任务状态 是否启动任务
*/
private String jobStatus;
/**
* cron表达式
*/
private String cronExpression;
/**
* 描述
*/
private String description;
/**
* 任务执行时调用哪个类的方法 包名+类名
*/
private String beanClass;
/**
* 任务是否有状态
*/
private String isConcurrent;
/**
* spring bean
*/
private String springId;
/**
* 任务调用的方法名
*/
private String methodName;

/**
* IP
*/
private String ip;

public String getCreateTime() {
return createTime;
}

public void setCreateTime(String createTime) {
this.createTime = createTime;
}

public String getUpdateTime() {
return updateTime;
}

public void setUpdateTime(String updateTime) {
this.updateTime = updateTime;
}

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public Long getJobId() {
return jobId;
}

public void setJobId(Long jobId) {
this.jobId = jobId;
}


public String getJobName() {
return jobName;
}

public void setJobName(String jobName) {
this.jobName = jobName;
}

public String getJobGroup() {
return jobGroup;
}

public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}

public String getJobStatus() {
return jobStatus;
}

public void setJobStatus(String jobStatus) {
this.jobStatus = jobStatus;
}

public String getCronExpression() {
return cronExpression;
}

public void setCronExpression(String cronExpression) {
this.cronExpression = cronExpression;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getBeanClass() {
return beanClass;
}

public void setBeanClass(String beanClass) {
this.beanClass = beanClass;
}

public String getIsConcurrent() {
return isConcurrent;
}

public void setIsConcurrent(String isConcurrent) {
this.isConcurrent = isConcurrent;
}

public String getSpringId() {
return springId;
}

public void setSpringId(String springId) {
this.springId = springId;
}

public String getMethodName() {
return methodName;
}

public void setMethodName(String methodName) {
this.methodName = methodName;
}
}

该实体类对应数据库中的表,jobName 与 groupName的组合是唯一的,beanClass/springId至少一个不为空。

新增批次

  • 前端代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    function add() {
    //获取模态框数据
    var description = $('#addDescription').val();
    var cronExpression = $('#AddCronExpression').val();
    var jobName = $('#AddName').val();
    var jobGroup = $('#AddGroup').val();
    var jobStatus = $('#AddStatus').val();
    var beanClass = $('#AddClass').val();
    var methodName = $('#AddMethodName').val();
    var ip = $('#addIp').val();
    var id = $('#addId').val();
    var params = {
    jobName: jobName,
    jobGroup: jobGroup,
    jobStatus: jobStatus,
    beanClass: beanClass,
    methodName: methodName,
    description: description,
    cronExpression: cronExpression,
    ip: ip,
    jobId: id
    };
    $.ajax({
    type: "post",
    url: "job/add-task.do",
    data: JSON.stringify(params),
    dataType: 'json',
    contentType: 'application/json;charset=UTF-8',
    success: function (result) {
    consoleLog(result)
    if (result.status === 0) {
    modals.info('新增成功');
    userTable.reloadData();
    } else {
    modals.info('新增异常,请联系管理员');
    }
    $('#myModalAdd').modal(
    'hide'
    );
    }
    });
    }
  • MyBatis 插入映射

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    <insert id="insertJob" parameterType="JobDetail">
    insert into t_quartz_job
    <trim prefix="(" suffix=")" suffixOverrides=",">
    <if test="jobId != null">
    job_id,
    </if>
    <if test="createTime != null">
    create_time,
    </if>
    <if test="updateTime != null">
    update_time,
    </if>
    <if test="jobName != null">
    job_name,
    </if>
    <if test="jobGroup != null">
    job_group,
    </if>
    <if test="jobStatus != null">
    job_status,
    </if>
    <if test="cronExpression != null">
    cron_expression,
    </if>
    <if test="description != null">
    description,
    </if>
    <if test="beanClass != null">
    bean_class,
    </if>
    <if test="isConcurrent != null">
    is_concurrent,
    </if>
    <if test="springId != null">
    spring_id,
    </if>
    <if test="methodName != null">
    method_name,
    </if>
    <if test="ip != null">
    ip,
    </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
    <if test="jobId != null">
    #{jobId,jdbcType=BIGINT},
    </if>
    <if test="createTime != null">
    #{createTime,jdbcType=TIMESTAMP},
    </if>
    <if test="updateTime != null">
    #{updateTime,jdbcType=TIMESTAMP},
    </if>
    <if test="jobName != null">
    #{jobName,jdbcType=VARCHAR},
    </if>
    <if test="jobGroup != null">
    #{jobGroup,jdbcType=VARCHAR},
    </if>
    <if test="jobStatus != null">
    #{jobStatus,jdbcType=VARCHAR},
    </if>
    <if test="cronExpression != null">
    #{cronExpression,jdbcType=VARCHAR},
    </if>
    <if test="description != null">
    #{description,jdbcType=VARCHAR},
    </if>
    <if test="beanClass != null">
    #{beanClass,jdbcType=VARCHAR},
    </if>
    <if test="isConcurrent != null">
    #{isConcurrent,jdbcType=VARCHAR},
    </if>
    <if test="springId != null">
    #{springId,jdbcType=VARCHAR},
    </if>
    <if test="methodName != null">
    #{methodName,jdbcType=VARCHAR},
    </if>
    <if test="ip != null">
    #{ip,jdbcType=VARCHAR},
    </if>
    </trim>
    </insert>

立即运行一次

考虑到服务器不同配置。可自定义端口。

  • 后端需做端口字段判断

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public Boolean runJobOnce(String json) {
    JSONObject requestJson = JSON.parseObject(json);
    String port = requestJson.getString("runOncePort");
    String ip = requestJson.getString("runOnceIp");
    try {
    String result;
    if (StringUtils.isEmpty(port)) {
    result = HttpClientUtils.sendPostToMidServer("http://" + ip + "/" + "run-job-now.do", json);
    } else {
    result = HttpClientUtils.sendPostToMidServer("http://" + ip + ":" + port + "/" + "run-job-now.do", json);
    }
    if (StringUtils.isEmpty(result)) {
    return false;
    }

    JSONObject jsonObject = JSON.parseObject(result);
    return (jsonObject != null && jsonObject.containsKey("success") && jsonObject.getBooleanValue("success"));
    } catch (Exception e) {
    logger.info(">>>>>>>>>runJobOnce>>>>>>exception>>>>{}", e.toString());
    }
    return false;
    }
  • 后端反射运行批次任务,并对IP检验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void invokMethod(JobDetail scheduleJob) {
String ip = getLocalHostIP();
Object object = null;
Class clazz = null;
if (!ip.equals(scheduleJob.getIp())) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,不在执行IP范围!!!");
return;
}
if (StringUtils.isNotBlank(scheduleJob.getSpringId())) {
object = SpringUtils.getBean(scheduleJob.getSpringId());
} else if (StringUtils.isNotBlank(scheduleJob.getBeanClass())) {
try {
clazz = Class.forName(scheduleJob.getBeanClass());
object = clazz.newInstance();
} catch (Exception e) {
e.printStackTrace();
}

}
if (object == null) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,请检查配置参数!!!");
return;
}
clazz = object.getClass();
Method method = null;
try {
method = clazz.getDeclaredMethod(scheduleJob.getMethodName());
} catch (NoSuchMethodException e) {
log.error("任务名称 = [" + scheduleJob.getJobName() + "]---------------未启动成功,方法名设置错误!!!");
} catch (SecurityException e) {
e.printStackTrace();
}
if (method != null) {
try {
method.invoke(object);
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (IllegalArgumentException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
}
}
log.info("任务名称 = [" + scheduleJob.getJobName() + "]----------启动成功");
}

反射机制手动装载Bean,@Autowired注解注入失败问题

Spring的三种装配Bean的方式

  • 在XML中显式配置
  • 在Java的接口和类中实现配置
  • 隐式Bean的发现机制和自动装配原则

在现实工作中,以上三种方式都会被用到,并且在工作总常常混合使用,所以需要知道三种方式的优先级,也就是应该选择合适的方式把Bean装配到Spring IoC容器中。

  • 基于约定优于配置原则,最优先的应该是通过隐式Bean的发现机制和自动装配的原则。这样的好处是减少程序开发则的决定权,简单又不失灵活。
  • 在没有办法使用自动装配原则的情况下,应该优先考虑Java接口和类中实现配置,这样的好处就是避免XML配置泛滥,也更为容易。
  • 在上述方法都无法使用的情况下,那么只能选择XML去显式配置Spring IoC容器。

反射机制装载Bean的问题

如上图所示,反射出来的东西,里面@Autowired注入的组件都是null。

究其原因是:通过反射创建实例时,是根据你调用的构造函数完成的实例化过程,Spring容器并不知晓,故不会自动化创建实例。因此需要自己对依赖对象进行注入。

处理方式有两种,还是基于上文:

  • 在XML中显式配置
1
<bean id="testUtils" class="这里写全路径名" init-method="init"></bean>
  • @PostConstruct注解方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component 
public class TestUtils {
@Autowired
private ItemService itemService;

@Autowired
private ItemMapper itemMapper;

public static TestUtils testUtils;

@PostConstruct
public void init() {
testUtils = this;
}

//utils工具类中使用service和mapper接口的方法例子,用"testUtils.xxx.方法" 就可以了
public static void test(Item record){
testUtils.itemMapper.insert(record);
testUtils.itemService.queryAll();
}
}

结论:依赖Spring容器实例化与自己用反射实例化是两种各自独立的方式,互不干涉。

日志管理

  • 日志详情实体类,关联任务详情表和日志表,仅供参考。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public class JobLogDetail {

private Long jobId;

/**
* 任务名称
*/
private String jobName;

/**
* 描述
*/
private String description;

/**
* 任务分组
*/
private String jobGroup;

/**
* 执行ip
*/
private String execIp;

/**
* 执行时间
*/
private String execTime;


/**
* 执行结果
*/
private String execResult;

/**
* 失败原因
*/
private String failReason;

public Long getJobId() {
return jobId;
}

public void setJobId(Long jobId) {
this.jobId = jobId;
}

public String getJobName() {
return jobName;
}

public void setJobName(String jobName) {
this.jobName = jobName;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getJobGroup() {
return jobGroup;
}

public void setJobGroup(String jobGroup) {
this.jobGroup = jobGroup;
}

public String getExecIp() {
return execIp;
}

public void setExecIp(String execIp) {
this.execIp = execIp;
}

public String getExecTime() {
return execTime;
}

public void setExecTime(String execTime) {
this.execTime = execTime;
}

public String getExecResult() {
return execResult;
}

public void setExecResult(String execResult) {
this.execResult = execResult;
}

public String getFailReason() {
return failReason;
}

public void setFailReason(String failReason) {
this.failReason = failReason;
}
}

Bootstrap与jQuery的datepicker冲突问题

种种原因,前端工程是Bootstrap搭的,单独导了jQuery UI库,使用datepicker控件时,日期的格式如何设置都不正确。

后面发现Bootstrap与jQuery的datepicker写法有细微差别。

  • Bootstrap的datepicker写法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    $("#from").datepicker({
    language: "zh-CN",
    format: 'yyyy-mm-dd',
    defaultDate: "+1w",
    changeMonth: true,
    autoclose: true,
    onClose: function (selectedDate) {
    $("#to").datepicker("option", "minDate", selectedDate);
    }
    });
  • jQuery的datepicker写法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    $.datepicker.setDefaults($.datepicker.regional["zh-CN"]);

    $("#from").datepicker({
    dateFormat:'yy-mm-dd'
    defaultDate: "+1w",
    changeMonth: true,
    autoclose: true,
    onClose: function (selectedDate) {
    $("#to").datepicker("option", "minDate", selectedDate);
    }
    });

终于正常了

待填坑