Commit 033bee1ee78c0ca0fceb1edd039f94de82992995

Authored by Jiang
1 parent c77e2600

解决dtms消息执行失败后消息记录无法解锁的问题

dtms-client/pom.xml
... ... @@ -6,6 +6,7 @@
6 6 <artifactId>dtms-parent</artifactId>
7 7 <version>0.0.1-SNAPSHOT</version>
8 8 </parent>
  9 + <version>0.0.2-SNAPSHOT</version>
9 10 <artifactId>dtms-client</artifactId>
10 11  
11 12 <dependencies>
... ...
dtms-client/src/main/java/com/b2c/dtms/client/domain/dto/DtmsCallbackMessage.java 0 → 100644
  1 +/*
  2 + * Copyright (c) 2015 www.diligrp.com All rights reserved.
  3 + * 本软件源代码版权归----所有,未经许可不得任意复制与传播.
  4 + */
  5 +package com.b2c.dtms.client.domain.dto;
  6 +
  7 +import java.io.Serializable;
  8 +
  9 +/**
  10 + * DtmsMessage
  11 + *
  12 + * @author dev-center
  13 + * @since 2015-03-30
  14 + */
  15 +public class DtmsCallbackMessage implements Serializable {
  16 +
  17 + /**
  18 + *
  19 + */
  20 + private static final long serialVersionUID = 5636732184716529500L;
  21 +
  22 + /**
  23 + * 消息业务ID,比如订单id
  24 + */
  25 + private String bizId;
  26 +
  27 + /**
  28 + * 消息主题
  29 + */
  30 + private String topic;
  31 +
  32 + /**
  33 + * 消息内容
  34 + */
  35 + private String content;
  36 +
  37 + /**
  38 + * 消息备注
  39 + */
  40 + private String memo;
  41 +
  42 + public DtmsCallbackMessage() {
  43 + // 默认无参构造方法
  44 + }
  45 +
  46 + public String getBizId() {
  47 + return bizId;
  48 + }
  49 +
  50 + public void setBizId(String bizId) {
  51 + this.bizId = bizId;
  52 + }
  53 +
  54 + public String getTopic() {
  55 + return topic;
  56 + }
  57 +
  58 + public void setTopic(String topic) {
  59 + this.topic = topic;
  60 + }
  61 +
  62 + public String getContent() {
  63 + return content;
  64 + }
  65 +
  66 + public void setContent(String content) {
  67 + this.content = content;
  68 + }
  69 +
  70 + public String getMemo() {
  71 + return memo;
  72 + }
  73 +
  74 + public void setMemo(String memo) {
  75 + this.memo = memo;
  76 + }
  77 +
  78 +}
0 79 \ No newline at end of file
... ...
dtms-dao/src/main/java/com/b2c/dtms/dao/dtms/DtmsMessageDao.java
... ... @@ -4,81 +4,89 @@
4 4 */
5 5 package com.b2c.dtms.dao.dtms;
6 6  
7   -import com.b2c.dtms.dao.base.BaseDao;
8 7 import com.b2c.dtms.domain.DtmsMessage;
  8 +import com.b2c.dtms.dao.base.BaseDao;
9 9  
10 10 import java.util.List;
11 11  
12   -
13 12 /**
14 13 * DtmsMessageDao 接口
  14 + *
15 15 * @author dev-center
16 16 * @since 2015-03-30
17 17 */
18   -public interface DtmsMessageDao extends BaseDao<DtmsMessage,Long>{
  18 +public interface DtmsMessageDao extends BaseDao<DtmsMessage, Long> {
  19 +
  20 + // 自定义扩展
  21 + List<DtmsMessage> getListUnLockedBeforeSepcTime(DtmsMessage condition);
  22 +
  23 + /**
  24 + * 获取锁定超时的消息
  25 + *
  26 + * @param condition
  27 + * @return
  28 + */
  29 + List<DtmsMessage> getLockTimeOutList(DtmsMessage condition);
19 30  
20   - //自定义扩展
21   - List<DtmsMessage> getListUnLockedBeforeSepcTime(DtmsMessage condition);
22   -
23   - /**
24   - * 获取锁定超时的消息
25   - * @param condition
26   - * @return
27   - */
28   - List<DtmsMessage> getLockTimeOutList(DtmsMessage condition);
  31 + int updateByCondition(DtmsMessage dtmsMessage);
29 32  
30   - int updateByCondition(DtmsMessage dtmsMessage);
  33 + int deleteByCondition(DtmsMessage condition);
31 34  
32   - int deleteByCondition(DtmsMessage condition);
  35 + /**
  36 + * 尝试对消息进行加锁
  37 + *
  38 + * @param message
  39 + * @return
  40 + */
  41 + int tryAddLock(DtmsMessage message);
33 42  
34   - /**
35   - * 尝试对消息进行加锁
36   - * @param message
37   - * @return
38   - */
39   - int tryAddLock(DtmsMessage message);
  43 + /**
  44 + * 获取被加锁消息的个数
  45 + *
  46 + * @param dtmsMessage
  47 + * @return
  48 + */
  49 + int selectLockedCount(DtmsMessage dtmsMessage);
40 50  
41   - /**
42   - * 获取被加锁消息的个数
43   - * @param dtmsMessage
44   - * @return
45   - */
46   - int selectLockedCount(DtmsMessage dtmsMessage);
  51 + /**
  52 + * 解锁被加锁超时的消息
  53 + *
  54 + * @param updateMessage
  55 + * @return
  56 + */
  57 + int tryReleaseLock(DtmsMessage updateMessage);
47 58  
48   - /**
49   - * 解锁被加锁超时的消息
50   - * @param updateMessage
51   - * @return
52   - */
53   - int tryReleaseLock(DtmsMessage updateMessage);
  59 + /**
  60 + * 解锁被加锁超时的消息
  61 + *
  62 + * @param updateMessage
  63 + * @return
  64 + */
  65 + int tryReleaseLock(List<DtmsMessage> list);
54 66  
55   - /**
56   - * 解锁被加锁超时的消息
57   - * @param updateMessage
58   - * @return
59   - */
60   - int tryReleaseLock(List<DtmsMessage> list);
  67 + /**
  68 + * 删除消息,根据消息id和版本号.
  69 + *
  70 + * @param message
  71 + * the message。必须参数值id和version
  72 + * @return the int
  73 + */
  74 + int deleteByKeyAndVersion(DtmsMessage message);
61 75  
62   - /**
63   - * 删除消息,根据消息id和版本号.
64   - *
65   - * @param message the message。必须参数值id和version
66   - * @return the int
67   - */
68   - int deleteByKeyAndVersion(DtmsMessage message);
  76 + /**
  77 + * 减少消息重试次数
  78 + *
  79 + * @param message
  80 + * @return
  81 + */
  82 + int reduceRetryCount(DtmsMessage message);
69 83  
70   - /**
71   - * 减少消息重试次数
72   - * @param message
73   - * @return
74   - */
75   - int reduceRetryCount(DtmsMessage message);
  84 + /**
  85 + * 减少消息重试次数及下次运行时间
  86 + *
  87 + * @param toUpdateMessage
  88 + * @return
  89 + */
  90 + int reduceRetryCountAndSetNextRuntime(DtmsMessage message);
76 91  
77   - /**
78   - * 减少消息重试次数及下次运行时间
79   - * @param toUpdateMessage
80   - * @return
81   - */
82   - int reduceRetryCountAndSetNextRuntime(DtmsMessage message);
83   -
84 92 }
85 93 \ No newline at end of file
... ...
dtms-dao/src/main/java/com/b2c/dtms/dao/impl/DtmsMessageDaoImpl.java
... ... @@ -4,9 +4,9 @@
4 4 */
5 5 package com.b2c.dtms.dao.impl;
6 6  
  7 +import com.b2c.dtms.domain.DtmsMessage;
7 8 import com.b2c.dtms.dao.base.BaseDaoImpl;
8 9 import com.b2c.dtms.dao.dtms.DtmsMessageDao;
9   -import com.b2c.dtms.domain.DtmsMessage;
10 10  
11 11 import org.springframework.stereotype.Repository;
12 12  
... ...
dtms-dao/src/main/resources/sqlmap-config.xml
... ... @@ -11,7 +11,7 @@
11 11 <setting name="useColumnLabel" value="true" />
12 12 <!-- 数据库超过25000秒仍未响应则超时 -->
13 13 <setting name="defaultStatementTimeout" value="25000" />
14   - <!-- <setting name="logImpl" value="STDOUT_LOGGING" /> -->
  14 + <setting name="logImpl" value="STDOUT_LOGGING" />
15 15 </settings>
16 16  
17 17 <!-- 全局别名设置,在映射文件中只需写别名,而不必写出整个类路径 -->
... ...
dtms-domain/pom.xml
... ... @@ -43,7 +43,6 @@
43 43 <dependency>
44 44 <groupId>${project.groupId}</groupId>
45 45 <artifactId>dtms-client</artifactId>
46   - <version>${project.version}</version>
47 46 </dependency>
48 47 </dependencies>
49 48 </project>
50 49 \ No newline at end of file
... ...
dtms-service/pom.xml
... ... @@ -11,11 +11,6 @@
11 11 <dependencies>
12 12 <dependency>
13 13 <groupId>${project.groupId}</groupId>
14   - <artifactId>dtms-client</artifactId>
15   - <version>${project.version}</version>
16   - </dependency>
17   - <dependency>
18   - <groupId>${project.groupId}</groupId>
19 14 <artifactId>dtms-manager</artifactId>
20 15 <version>${project.version}</version>
21 16 </dependency>
... ...
dtms-service/src/main/java/com/b2c/dtms/handler/CallUrlHandlerImpl.java
1 1 package com.b2c.dtms.handler;
2 2  
3   -import com.b2c.dtms.common.enums.dtms.DtmsMessageTopic;
4 3 import com.b2c.dtms.domain.DtmsMessage;
  4 +import com.b2c.dtms.common.enums.dtms.DtmsMessageTopic;
5 5  
6 6 import org.apache.commons.lang3.StringUtils;
7 7 import org.springframework.stereotype.Service;
8 8  
9 9 @Service
10   -public class CallUrlHandlerImpl extends DtmsHandler {
11   -
12   - @Override
13   - public boolean isSupported(DtmsMessage messsage) {
14   - if(messsage==null){
15   - return false;
16   - }
17   - return StringUtils.equals(DtmsMessageTopic.CallUrl.code(),messsage.getTopic());
18   - }
19   -
20   - @Override
21   - public boolean doExecute(DtmsHandleContext ctx) {
22   - return true;
23   - }
  10 +public class CallUrlHandlerImpl extends DtmsHandler {
24 11  
25   -}
  12 + @Override
  13 + public boolean isSupported(DtmsMessage messsage) {
  14 + if (messsage == null) {
  15 + return false;
  16 + }
  17 + return StringUtils.equals(DtmsMessageTopic.CallUrl.code(), messsage.getTopic());
  18 + }
26 19  
  20 + @Override
  21 + public boolean doExecute(DtmsHandleContext ctx) {
  22 + return true;
  23 + }
  24 +
  25 +}
... ...
dtms-service/src/main/java/com/b2c/dtms/handler/DtmsHandleContext.java
... ... @@ -2,97 +2,90 @@ package com.b2c.dtms.handler;
2 2  
3 3 import com.b2c.dtms.domain.DtmsMessage;
4 4  
5   -
6 5 public class DtmsHandleContext {
7 6  
8   -
9   - /**待处理的消息*/
10   - private DtmsMessage waitHandleDtmsMessage;
11   -
12   - /** 返回信息*/
13   - private RetrunInfo returnInfo;
14   -
15   -
16   - public String getReturnCode(){
17   - return returnInfo.getCode();
18   - }
19   -
20   - public String getReturnMessage(){
21   - return returnInfo.getMessage();
22   - }
23   -
24   - public String getReturnData(){
25   - return returnInfo.getData();
26   - }
27   -
28   - public void setReturnCode(String returnCode){
29   - returnInfo.setCode(returnCode);
30   - }
31   -
32   - public void setReturnMessage(String returnMessage){
33   - returnInfo.setMessage(returnMessage);
34   - }
35   -
36   - public void setReturnData(String returnData){
37   - returnInfo.setData(returnData);
38   - }
39   -
40   - public static DtmsHandleContext create(DtmsMessage waitHandleDtmsMessage){
41   - return new DtmsHandleContext(waitHandleDtmsMessage);
42   - }
43   -
44   - public DtmsHandleContext(DtmsMessage waitHandleDtmsMessage){
45   - this.waitHandleDtmsMessage=waitHandleDtmsMessage;
46   - this.returnInfo=new RetrunInfo();
47   - }
48   -
49   - public DtmsMessage getWaitHandleDtmsMessage() {
50   - return waitHandleDtmsMessage;
51   - }
52   -
53   -
54   - public void setWaitHandleDtmsMessage(DtmsMessage waitHandleDtmsMessage) {
55   - this.waitHandleDtmsMessage = waitHandleDtmsMessage;
56   - }
57   -
58   - public class RetrunInfo{
59   - public String code;
60   - public String data;
61   - public String message;
62   -
63   - public RetrunInfo (){
64   - }
65   -
66   - public RetrunInfo (String code,String message){
67   - this.code=code;
68   - this.message=message;
69   - }
70   -
71   - public String getCode() {
72   - return code;
73   - }
74   -
75   -
76   - public void setCode(String code) {
77   - this.code = code;
78   - }
79   -
80   -
81   - public String getMessage() {
82   - return message;
83   - }
84   -
85   -
86   - public void setMessage(String message) {
87   - this.message = message;
88   - }
89   -
90   - public String getData() {
91   - return data;
92   - }
93   -
94   - public void setData(String data) {
95   - this.data = data;
96   - }
97   - }
  7 + /** 待处理的消息 */
  8 + private DtmsMessage waitHandleDtmsMessage;
  9 +
  10 + /** 返回信息 */
  11 + private RetrunInfo returnInfo;
  12 +
  13 + public String getReturnCode() {
  14 + return returnInfo.getCode();
  15 + }
  16 +
  17 + public String getReturnMessage() {
  18 + return returnInfo.getMessage();
  19 + }
  20 +
  21 + public String getReturnData() {
  22 + return returnInfo.getData();
  23 + }
  24 +
  25 + public void setReturnCode(String returnCode) {
  26 + returnInfo.setCode(returnCode);
  27 + }
  28 +
  29 + public void setReturnMessage(String returnMessage) {
  30 + returnInfo.setMessage(returnMessage);
  31 + }
  32 +
  33 + public void setReturnData(String returnData) {
  34 + returnInfo.setData(returnData);
  35 + }
  36 +
  37 + public static DtmsHandleContext create(DtmsMessage waitHandleDtmsMessage) {
  38 + return new DtmsHandleContext(waitHandleDtmsMessage);
  39 + }
  40 +
  41 + public DtmsHandleContext(DtmsMessage waitHandleDtmsMessage) {
  42 + this.waitHandleDtmsMessage = waitHandleDtmsMessage;
  43 + this.returnInfo = new RetrunInfo();
  44 + }
  45 +
  46 + public DtmsMessage getWaitHandleDtmsMessage() {
  47 + return waitHandleDtmsMessage;
  48 + }
  49 +
  50 + public void setWaitHandleDtmsMessage(DtmsMessage waitHandleDtmsMessage) {
  51 + this.waitHandleDtmsMessage = waitHandleDtmsMessage;
  52 + }
  53 +
  54 + public class RetrunInfo {
  55 + public String code;
  56 + public String data;
  57 + public String message;
  58 +
  59 + public RetrunInfo() {
  60 + }
  61 +
  62 + public RetrunInfo(String code, String message) {
  63 + this.code = code;
  64 + this.message = message;
  65 + }
  66 +
  67 + public String getCode() {
  68 + return code;
  69 + }
  70 +
  71 + public void setCode(String code) {
  72 + this.code = code;
  73 + }
  74 +
  75 + public String getMessage() {
  76 + return message;
  77 + }
  78 +
  79 + public void setMessage(String message) {
  80 + this.message = message;
  81 + }
  82 +
  83 + public String getData() {
  84 + return data;
  85 + }
  86 +
  87 + public void setData(String data) {
  88 + this.data = data;
  89 + }
  90 + }
98 91 }
... ...
dtms-service/src/main/java/com/b2c/dtms/handler/DtmsHandler.java
1 1 package com.b2c.dtms.handler;
2 2  
  3 +import javax.annotation.Resource;
  4 +
  5 +import org.apache.commons.lang.StringUtils;
  6 +import org.slf4j.Logger;
  7 +import org.slf4j.LoggerFactory;
  8 +
3 9 import com.alibaba.fastjson.JSON;
4 10 import com.alibaba.fastjson.JSONObject;
  11 +import com.b2c.dtms.domain.DtmsMessage;
5 12 import com.b2c.dtms.common.enums.dtms.ConfirmCode;
6 13 import com.b2c.dtms.common.enums.dtms.DtmsMessageStatus;
7 14 import com.b2c.dtms.common.enums.dtms.DtmsMessageType;
... ... @@ -10,13 +17,6 @@ import com.b2c.dtms.common.http.HttpRequester;
10 17 import com.b2c.dtms.common.http.HttpResponse;
11 18 import com.b2c.dtms.common.tools.WebUtils;
12 19 import com.b2c.dtms.domain.DtmsCallBackReturn;
13   -import com.b2c.dtms.domain.DtmsMessage;
14   -
15   -import org.apache.commons.lang.StringUtils;
16   -import org.slf4j.Logger;
17   -import org.slf4j.LoggerFactory;
18   -
19   -import javax.annotation.Resource;
20 20  
21 21 /**
22 22 * DTMS业务消息处理类
... ... @@ -110,8 +110,8 @@ public abstract class DtmsHandler {
110 110 }
111 111  
112 112 try {
113   - HttpResponse httpResponse = HttpRequester.sendPost(webUtils.getContextPath() + dtmsMessage.getConfirmUrl(),
114   - null, JSON.toJSONString(dtmsMessage));
  113 + HttpResponse httpResponse = HttpRequester.sendPost(dtmsMessage.getConfirmUrl(), null,
  114 + JSON.toJSONString(dtmsMessage));
115 115 if (httpResponse == null) {
116 116 ctx.setReturnCode(HandleCode.CONFIRM_FAILED.code());
117 117 logMsg = String.format("[DTMS]消息ID%s:确认失败,无返回值,需等待再次确认", dtmsMessage.getId());
... ... @@ -180,8 +180,8 @@ public abstract class DtmsHandler {
180 180 String logMsg = null;
181 181 try {
182 182  
183   - HttpResponse httpResponse = HttpRequester.sendPost(webUtils.getContextPath() + dtmsMessage.getCallUrl(),
184   - null, JSON.toJSONString(dtmsMessage));
  183 + HttpResponse httpResponse = HttpRequester.sendPost(dtmsMessage.getCallUrl(), null,
  184 + JSON.toJSONString(dtmsMessage));
185 185 if (httpResponse == null) {
186 186 ctx.setReturnCode(HandleCode.FAILED.code());
187 187 logMsg = String.format("[DTMS]消息ID%:处理失败,调用CallUrl之后无返回值,需等待再次执行", dtmsMessage.getId());
... ...
dtms-service/src/main/java/com/b2c/dtms/schedule/DtmsMessageMonitorExecutor.java
1 1 package com.b2c.dtms.schedule;
2 2  
  3 +import com.b2c.dtms.domain.DtmsMessage;
3 4 import com.b2c.dtms.common.enums.dtms.LockStatus;
4 5 import com.b2c.dtms.common.tools.DateUtils;
5   -import com.b2c.dtms.domain.DtmsMessage;
6 6 import com.b2c.dtms.service.DtmsMessageService;
7 7  
8 8 import org.apache.commons.lang.StringUtils;
... ... @@ -20,144 +20,146 @@ import java.util.List;
20 20 *
21 21 */
22 22 public class DtmsMessageMonitorExecutor implements DtmsMessageScheduler {
23   -
24   - private static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageMonitorExecutor.class);
25   - private final static int IDLE = 0;
26   - private final static int RUNNING = 1;
27   - private volatile int state = IDLE;
28   -
29   - private DtmsMessageService dtmsMessageService;
30   -
31   - /**取消息从哪行开始*/
32   - private Integer startRow;
33   -
34   - /**取消息从哪行结束*/
35   - private Integer pageSize;
36   - /**运行dtms机器名*/
37   - private String serverName;
38   -
39   - public void init(){
40   -
41   - try{
42   - InetAddress inetAddress=Inet4Address.getLocalHost();
43   - if( StringUtils.isNotBlank(inetAddress.getHostAddress())){
44   - serverName="Server["+inetAddress.getHostAddress()+"]";
45   - }
46   - }catch(Exception e){
47   - LOGGER.warn("[DTMS]获取运行机器名失败,发生异常:"+e.getMessage());
48   - serverName="";
49   - }
50   - }
51   -
52   - private boolean isRunning()
53   - {
54   - final int _state = state;
55   - return _state == RUNNING;
56   - }
57   - private void setState(int state)
58   - {
59   - this.state = state;
60   - }
61   -
62   - @Override
63   - public void execute() {
64   - if(isRunning()){
65   - return;
66   - }
67   - try{
68   - synchronized (this) {
69   - if(isRunning()){
70   - return;
71   - }
72   - setState(RUNNING);
73   - DtmsMessage condition=new DtmsMessage();
74   - condition.setLockStatus(LockStatus.LOCKING.getIndex());
75   - condition.addQueryData("currentTime",DateUtils.getCurrentDate());
76   - condition.setStartIndex(startRow);
77   - condition.setEndIndex(pageSize);
78   - LOGGER.debug(serverName+"[DTMS]扫描锁定超时消息...");
79   - final List<DtmsMessage> list=dtmsMessageService.getLockTimeOutList(condition);
80   -
81   - //重新设置查询起点
82   - if(list==null || list.size() < pageSize){//对任务进行循环查询,保证有很多任务时都能被执行到
83   - startRow=0;//如已查询到表结束,重置查询起始号
84   - }else{
85   - startRow += pageSize;//调整查询起始号
86   - }
87   - LOGGER.debug(serverName+"[DTMS]锁定超时消息数:"+(list==null?0:list.size())+"条");
88   - if(list==null || list.isEmpty()){
89   - return;
90   - }
91   -
92   - //处理
93   - List<DtmsMessage> releaseList=new ArrayList<>();
94   - for(DtmsMessage msg:list){
95   - if(msg==null || LockStatus.UNLOCK.getIndex()==msg.getLockStatus()){
96   - continue;
97   - }
98   - releaseList.add(msg);
99   - //10个为一批,然后批量解锁
100   - if(releaseList.size()%10==0){
101   - tryReleaseLock(releaseList);
102   - releaseList=new ArrayList<>();
103   - }
104   - }
105   - //剩下的再批量解锁
106   - if(releaseList.size()!=0){
107   - tryReleaseLock(releaseList);
108   - releaseList=null;
109   - }
110   -
111   - setState(IDLE);
112   - }
113   - }catch(Exception e){
114   - LOGGER.error(serverName+"[DTMS]死锁监控任务执行异常",e);
115   - }finally{
116   - setState(IDLE);
117   - }
118   - }
119   -
120   - public int tryReleaseLock(List<DtmsMessage> list){
121   - int ret=0;
122   - try{
123   - ret =dtmsMessageService.tryReleaseLock(list);
124   - }catch(Exception e){
125   - List<String> msgIdList=new ArrayList<>();
126   - for(DtmsMessage msg:list){
127   - msgIdList.add(msg.getId().toString());
128   - }
129   - String loginfo=String.format(serverName+"[DTMS]消息解锁:解锁消息[%s]失败,发生异常[s]", StringUtils.join(msgIdList, ","), e.getMessage());
130   - LOGGER.error(loginfo, e);
131   - try{
132   - ret=dtmsMessageService.tryReleaseLock(list);
133   - }catch(Exception e1){
134   - loginfo=String.format(serverName+"[DTMS]消息解锁:第二次尝试解锁消息[%s]失败,发生异常[s]",StringUtils.join(msgIdList, ","), e.getMessage());
135   - LOGGER.error(loginfo, e);
136   - }
137   - }
138   - return ret;
139   - }
140   -
141   - public Integer getStartRow() {
142   - return startRow;
143   - }
144   -
145   - public void setStartRow(Integer startRow) {
146   - this.startRow = startRow;
147   - }
148   -
149   - public Integer getPageSize() {
150   - return pageSize;
151   - }
152   -
153   - public void setPageSize(Integer pageSize) {
154   - this.pageSize = pageSize;
155   - }
156   - public DtmsMessageService getDtmsMessageService() {
157   - return dtmsMessageService;
158   - }
159   -
160   - public void setDtmsMessageService(DtmsMessageService dtmsMessageService) {
161   - this.dtmsMessageService = dtmsMessageService;
162   - }
  23 +
  24 + private static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageMonitorExecutor.class);
  25 + private final static int IDLE = 0;
  26 + private final static int RUNNING = 1;
  27 + private volatile int state = IDLE;
  28 +
  29 + private DtmsMessageService dtmsMessageService;
  30 +
  31 + /** 取消息从哪行开始 */
  32 + private Integer startRow;
  33 +
  34 + /** 取消息从哪行结束 */
  35 + private Integer pageSize;
  36 + /** 运行dtms机器名 */
  37 + private String serverName;
  38 +
  39 + public void init() {
  40 +
  41 + try {
  42 + InetAddress inetAddress = Inet4Address.getLocalHost();
  43 + if (StringUtils.isNotBlank(inetAddress.getHostAddress())) {
  44 + serverName = "Server[" + inetAddress.getHostAddress() + "]";
  45 + }
  46 + } catch (Exception e) {
  47 + LOGGER.warn("[DTMS]获取运行机器名失败,发生异常:" + e.getMessage());
  48 + serverName = "";
  49 + }
  50 + }
  51 +
  52 + private boolean isRunning() {
  53 + final int _state = state;
  54 + return _state == RUNNING;
  55 + }
  56 +
  57 + private void setState(int state) {
  58 + this.state = state;
  59 + }
  60 +
  61 + @Override
  62 + public void execute() {
  63 + if (isRunning()) {
  64 + return;
  65 + }
  66 + try {
  67 + synchronized (this) {
  68 + if (isRunning()) {
  69 + return;
  70 + }
  71 + setState(RUNNING);
  72 + DtmsMessage condition = new DtmsMessage();
  73 + condition.setLockStatus(LockStatus.LOCKING.getIndex());
  74 + condition.addQueryData("currentTime", DateUtils.getCurrentDate());
  75 + condition.setStartIndex(startRow);
  76 + condition.setEndIndex(pageSize);
  77 + LOGGER.debug(serverName + "[DTMS]扫描锁定超时消息...");
  78 + final List<DtmsMessage> list = dtmsMessageService.getLockTimeOutList(condition);
  79 +
  80 + // 重新设置查询起点
  81 + if (list == null || list.size() < pageSize) {// 对任务进行循环查询,保证有很多任务时都能被执行到
  82 + startRow = 0;// 如已查询到表结束,重置查询起始号
  83 + } else {
  84 + startRow += pageSize;// 调整查询起始号
  85 + }
  86 + LOGGER.debug(serverName + "[DTMS]锁定超时消息数:" + (list == null ? 0 : list.size()) + "条");
  87 + if (list == null || list.isEmpty()) {
  88 + return;
  89 + }
  90 +
  91 + // 处理
  92 + List<DtmsMessage> releaseList = new ArrayList<>();
  93 + for (DtmsMessage msg : list) {
  94 + if (msg == null || LockStatus.UNLOCK.getIndex() == msg.getLockStatus()) {
  95 + continue;
  96 + }
  97 + releaseList.add(msg);
  98 + // 10个为一批,然后批量解锁
  99 + if (releaseList.size() % 10 == 0) {
  100 + tryReleaseLock(releaseList);
  101 + releaseList = new ArrayList<>();
  102 + }
  103 + }
  104 + // 剩下的再批量解锁
  105 + if (releaseList.size() != 0) {
  106 + tryReleaseLock(releaseList);
  107 + releaseList = null;
  108 + }
  109 +
  110 + setState(IDLE);
  111 + }
  112 + } catch (Exception e) {
  113 + LOGGER.error(serverName + "[DTMS]死锁监控任务执行异常", e);
  114 + } finally {
  115 + setState(IDLE);
  116 + }
  117 + }
  118 +
  119 + public int tryReleaseLock(List<DtmsMessage> list) {
  120 + int ret = 0;
  121 + try {
  122 + ret = dtmsMessageService.tryReleaseLock(list);
  123 + } catch (Exception e) {
  124 + List<String> msgIdList = new ArrayList<>();
  125 + for (DtmsMessage msg : list) {
  126 + msgIdList.add(msg.getId().toString());
  127 + }
  128 + String loginfo = String.format(serverName + "[DTMS]消息解锁:解锁消息[%s]失败,发生异常[s]",
  129 + StringUtils.join(msgIdList, ","), e.getMessage());
  130 + LOGGER.error(loginfo, e);
  131 + try {
  132 + ret = dtmsMessageService.tryReleaseLock(list);
  133 + } catch (Exception e1) {
  134 + loginfo = String.format(serverName + "[DTMS]消息解锁:第二次尝试解锁消息[%s]失败,发生异常[s]",
  135 + StringUtils.join(msgIdList, ","), e.getMessage());
  136 + LOGGER.error(loginfo, e);
  137 + }
  138 + }
  139 + return ret;
  140 + }
  141 +
  142 + public Integer getStartRow() {
  143 + return startRow;
  144 + }
  145 +
  146 + public void setStartRow(Integer startRow) {
  147 + this.startRow = startRow;
  148 + }
  149 +
  150 + public Integer getPageSize() {
  151 + return pageSize;
  152 + }
  153 +
  154 + public void setPageSize(Integer pageSize) {
  155 + this.pageSize = pageSize;
  156 + }
  157 +
  158 + public DtmsMessageService getDtmsMessageService() {
  159 + return dtmsMessageService;
  160 + }
  161 +
  162 + public void setDtmsMessageService(DtmsMessageService dtmsMessageService) {
  163 + this.dtmsMessageService = dtmsMessageService;
  164 + }
163 165 }
... ...
dtms-service/src/main/java/com/b2c/dtms/schedule/DtmsMessageScheduleExecutor.java
1 1 package com.b2c.dtms.schedule;
2 2  
3   -import com.b2c.dtms.common.tools.DateUtils;
4 3 import com.b2c.dtms.domain.DtmsMessage;
  4 +import com.b2c.dtms.common.tools.DateUtils;
5 5 import com.b2c.dtms.service.DtmsMessageService;
6 6 import com.b2c.dtms.task.ScheduleTask;
7 7 import com.b2c.dtms.task.ScheduleTaskFactory;
... ... @@ -18,159 +18,160 @@ import java.util.concurrent.SynchronousQueue;
18 18 import java.util.concurrent.ThreadPoolExecutor;
19 19 import java.util.concurrent.TimeUnit;
20 20  
21   -public class DtmsMessageScheduleExecutor implements DtmsMessageScheduler{
22   -
23   - private int corePoolSize;
24   - private int maximumPoolSize;
25   - private int keepAliveTime;
26   - private static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageScheduleExecutor.class);
27   - private ThreadPoolExecutor executor;
28   - private ScheduleTaskFactory scheduleTaskFactory;
29   - private DtmsMessageService dtmsMessageService;
30   - /**取消息从哪行开始*/
31   - private Integer startRow;
32   - private Integer pageSize;
33   - private final static int IDLE = 0;
34   - private final static int RUNNING = 1;
35   - private volatile int state = IDLE;
36   -
37   - /**运行dtms机器地址*/
38   - private String serverName;
39   -
40   - public void init(){
41   - try{
42   - InetAddress inetAddress=Inet4Address.getLocalHost();
43   - if(StringUtils.isNotBlank(inetAddress.getHostAddress())){
44   - serverName="Server["+inetAddress.getHostAddress()+"]";
45   - }
46   - }catch(Exception e){
47   - LOGGER.warn("[DTMS]获取运行机器名失败,发生异常:"+e.getMessage());
48   - serverName="";
49   - }
50   -
51   - executor= new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
52   - executor.allowCoreThreadTimeOut(true);
53   - executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
54   - @Override
55   - public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
56   - LOGGER.error(serverName+"[DTMS]消息处理线程池:线程池已满,无法接收处理-新任务:"+r.toString());
57   - LOGGER.info(serverName+"[DTMS]消息处理线程池:使用非线程池线程处理");
58   - if (!executor.isShutdown()) {
59   - Thread th=new Thread(r);
60   - th.start();
61   - }
62   - }
63   - });
64   - }
65   -
66   - @Override
67   - public void execute() {
68   - if(isRunning()){
69   - return;
70   - }
71   - try {
72   - synchronized (this) {
73   - if(isRunning()){
74   - return;
75   - }
76   - setState(RUNNING);
77   - LOGGER.debug(serverName+"[DTMS]扫描待执行消息...");
78   - final List<DtmsMessage> list = dtmsMessageService.getListUnLockedBeforeSepcTime(startRow, pageSize, DateUtils.getCurrentDate());
79   -
80   - //设置下次查询记录起点
81   - if(list==null || list.size() < pageSize){//对任务进行循环查询,保证有很多任务时都能被执行到
82   - startRow=0;//如已查询到表结束,重置查询起始号
83   - }else{
84   - startRow += pageSize;//调整查询起始号
85   - }
86   -
87   - LOGGER.debug(serverName+"[DTMS]待执行消息数:"+(list==null?0:list.size())+"条");
88   - if(list==null || list.isEmpty()){
89   - return;
90   - }
91   - for (DtmsMessage dtmsMessage : list) {
92   - ScheduleTask scheduleTask=scheduleTaskFactory.create(dtmsMessage,serverName);
93   - if(scheduleTask==null){
94   - continue;
95   - }
96   - executor.execute(scheduleTask);
97   - }
98   - setState(IDLE);
99   - }
100   - } catch (Exception e) {
101   - LOGGER.error(serverName+"[DTMS]定时任务执行异常",e);
102   - } finally{
103   - setState(IDLE);
104   - }
105   - }
106   -
107   - private boolean isRunning()
108   - {
109   - final int _state = state;
110   - return _state == RUNNING;
111   - }
112   - private void setState(int state)
113   - {
114   - this.state = state;
115   - }
116   -
117   - public Integer getStartRow() {
118   - return startRow;
119   - }
120   -
121   - public void setStartRow(Integer startRow) {
122   - this.startRow = startRow;
123   - }
124   -
125   - public int getState() {
126   - return state;
127   - }
128   -
129   - public ScheduleTaskFactory getScheduleTaskFactory() {
130   - return scheduleTaskFactory;
131   - }
132   -
133   - public void setScheduleTaskFactory(ScheduleTaskFactory scheduleTaskFactory) {
134   - this.scheduleTaskFactory = scheduleTaskFactory;
135   - }
136   -
137   - public DtmsMessageService getDtmsMessageService() {
138   - return dtmsMessageService;
139   - }
140   -
141   - public void setDtmsMessageService(DtmsMessageService dtmsMessageService) {
142   - this.dtmsMessageService = dtmsMessageService;
143   - }
144   -
145   - public Integer getPageSize() {
146   - return pageSize;
147   - }
148   -
149   - public void setPageSize(Integer pageSize) {
150   - this.pageSize = pageSize;
151   - }
152   -
153   - public int getCorePoolSize() {
154   - return corePoolSize;
155   - }
156   -
157   - public void setCorePoolSize(int corePoolSize) {
158   - this.corePoolSize = corePoolSize;
159   - }
160   -
161   - public int getMaximumPoolSize() {
162   - return maximumPoolSize;
163   - }
164   -
165   - public void setMaximumPoolSize(int maximumPoolSize) {
166   - this.maximumPoolSize = maximumPoolSize;
167   - }
168   -
169   - public int getKeepAliveTime() {
170   - return keepAliveTime;
171   - }
172   -
173   - public void setKeepAliveTime(int keepAliveTime) {
174   - this.keepAliveTime = keepAliveTime;
175   - }
  21 +public class DtmsMessageScheduleExecutor implements DtmsMessageScheduler {
  22 +
  23 + private int corePoolSize;
  24 + private int maximumPoolSize;
  25 + private int keepAliveTime;
  26 + private static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageScheduleExecutor.class);
  27 + private ThreadPoolExecutor executor;
  28 + private ScheduleTaskFactory scheduleTaskFactory;
  29 + private DtmsMessageService dtmsMessageService;
  30 + /** 取消息从哪行开始 */
  31 + private Integer startRow;
  32 + private Integer pageSize;
  33 + private final static int IDLE = 0;
  34 + private final static int RUNNING = 1;
  35 + private volatile int state = IDLE;
  36 +
  37 + /** 运行dtms机器地址 */
  38 + private String serverName;
  39 +
  40 + public void init() {
  41 + try {
  42 + InetAddress inetAddress = Inet4Address.getLocalHost();
  43 + if (StringUtils.isNotBlank(inetAddress.getHostAddress())) {
  44 + serverName = "Server[" + inetAddress.getHostAddress() + "]";
  45 + }
  46 + } catch (Exception e) {
  47 + LOGGER.warn("[DTMS]获取运行机器名失败,发生异常:" + e.getMessage());
  48 + serverName = "";
  49 + }
  50 +
  51 + executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
  52 + new SynchronousQueue<Runnable>());
  53 + executor.allowCoreThreadTimeOut(true);
  54 + executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
  55 + @Override
  56 + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  57 + LOGGER.error(serverName + "[DTMS]消息处理线程池:线程池已满,无法接收处理-新任务:" + r.toString());
  58 + LOGGER.info(serverName + "[DTMS]消息处理线程池:使用非线程池线程处理");
  59 + if (!executor.isShutdown()) {
  60 + Thread th = new Thread(r);
  61 + th.start();
  62 + }
  63 + }
  64 + });
  65 + }
  66 +
  67 + @Override
  68 + public void execute() {
  69 + if (isRunning()) {
  70 + return;
  71 + }
  72 + try {
  73 + synchronized (this) {
  74 + if (isRunning()) {
  75 + return;
  76 + }
  77 + setState(RUNNING);
  78 + LOGGER.debug(serverName + "[DTMS]扫描待执行消息...");
  79 + final List<DtmsMessage> list = dtmsMessageService.getListUnLockedBeforeSepcTime(startRow, pageSize,
  80 + DateUtils.getCurrentDate());
  81 +
  82 + // 设置下次查询记录起点
  83 + if (list == null || list.size() < pageSize) {// 对任务进行循环查询,保证有很多任务时都能被执行到
  84 + startRow = 0;// 如已查询到表结束,重置查询起始号
  85 + } else {
  86 + startRow += pageSize;// 调整查询起始号
  87 + }
  88 +
  89 + LOGGER.debug(serverName + "[DTMS]待执行消息数:" + (list == null ? 0 : list.size()) + "条");
  90 + if (list == null || list.isEmpty()) {
  91 + return;
  92 + }
  93 + for (DtmsMessage dtmsMessage : list) {
  94 + ScheduleTask scheduleTask = scheduleTaskFactory.create(dtmsMessage, serverName);
  95 + if (scheduleTask == null) {
  96 + continue;
  97 + }
  98 + executor.execute(scheduleTask);
  99 + }
  100 + setState(IDLE);
  101 + }
  102 + } catch (Exception e) {
  103 + LOGGER.error(serverName + "[DTMS]定时任务执行异常", e);
  104 + } finally {
  105 + setState(IDLE);
  106 + }
  107 + }
  108 +
  109 + private boolean isRunning() {
  110 + final int _state = state;
  111 + return _state == RUNNING;
  112 + }
  113 +
  114 + private void setState(int state) {
  115 + this.state = state;
  116 + }
  117 +
  118 + public Integer getStartRow() {
  119 + return startRow;
  120 + }
  121 +
  122 + public void setStartRow(Integer startRow) {
  123 + this.startRow = startRow;
  124 + }
  125 +
  126 + public int getState() {
  127 + return state;
  128 + }
  129 +
  130 + public ScheduleTaskFactory getScheduleTaskFactory() {
  131 + return scheduleTaskFactory;
  132 + }
  133 +
  134 + public void setScheduleTaskFactory(ScheduleTaskFactory scheduleTaskFactory) {
  135 + this.scheduleTaskFactory = scheduleTaskFactory;
  136 + }
  137 +
  138 + public DtmsMessageService getDtmsMessageService() {
  139 + return dtmsMessageService;
  140 + }
  141 +
  142 + public void setDtmsMessageService(DtmsMessageService dtmsMessageService) {
  143 + this.dtmsMessageService = dtmsMessageService;
  144 + }
  145 +
  146 + public Integer getPageSize() {
  147 + return pageSize;
  148 + }
  149 +
  150 + public void setPageSize(Integer pageSize) {
  151 + this.pageSize = pageSize;
  152 + }
  153 +
  154 + public int getCorePoolSize() {
  155 + return corePoolSize;
  156 + }
  157 +
  158 + public void setCorePoolSize(int corePoolSize) {
  159 + this.corePoolSize = corePoolSize;
  160 + }
  161 +
  162 + public int getMaximumPoolSize() {
  163 + return maximumPoolSize;
  164 + }
  165 +
  166 + public void setMaximumPoolSize(int maximumPoolSize) {
  167 + this.maximumPoolSize = maximumPoolSize;
  168 + }
  169 +
  170 + public int getKeepAliveTime() {
  171 + return keepAliveTime;
  172 + }
  173 +
  174 + public void setKeepAliveTime(int keepAliveTime) {
  175 + this.keepAliveTime = keepAliveTime;
  176 + }
176 177 }
... ...
dtms-service/src/main/java/com/b2c/dtms/service/DtmsMessageProducer.java
... ... @@ -4,56 +4,60 @@ import java.util.List;
4 4  
5 5 import com.b2c.dtms.domain.DtmsMessage;
6 6  
7   -
8 7 public interface DtmsMessageProducer {
9   - /**
10   - * 产生一个消息
11   - * @param message
12   - * @return
13   - */
14   - public Long produce(DtmsMessage message);
15   -
16   - /**
17   - * 产生多个消息
18   - * @param messageList
19   - * @return
20   - */
21   - public List<Long> produce(List<DtmsMessage> messageList);
22   -
23   - /**
24   - * 移除一个消息
25   - * @param messageId
26   - * @return
27   - */
28   - public int remove(Long messageId);
29   -
30   - /**
31   - * 移除多个消息
32   - * @param msgIdList
33   - * @return
34   - */
35   - public int removeById(List<Long> msgIdList);
36   -
37   -// /**
38   -// * 根据一个或者多个tag移除一个或者多个消息
39   -// * @param tag
40   -// * @return
41   -// */
42   -// public int removeByTag(List<String> tagList);
43   -//
44   -// /**
45   -// * 根据多个tag确认提交(这里的提交是指修改消息runtime和状态,让其马上能够被执行)一个或者多个消息
46   -// * @param tag
47   -// * @return
48   -// */
49   -// public int confirm(List<String> tagList);
50   -
51   - /**
52   - * 根据id确认
53   - * @param idList
54   - * @return
55   - * @createTime 2015年12月28日 下午6:47:55
56   - * @author zhangshirui
57   - */
  8 + /**
  9 + * 产生一个消息
  10 + *
  11 + * @param message
  12 + * @return
  13 + */
  14 + public Long produce(DtmsMessage message);
  15 +
  16 + /**
  17 + * 产生多个消息
  18 + *
  19 + * @param messageList
  20 + * @return
  21 + */
  22 + public List<Long> produce(List<DtmsMessage> messageList);
  23 +
  24 + /**
  25 + * 移除一个消息
  26 + *
  27 + * @param messageId
  28 + * @return
  29 + */
  30 + public int remove(Long messageId);
  31 +
  32 + /**
  33 + * 移除多个消息
  34 + *
  35 + * @param msgIdList
  36 + * @return
  37 + */
  38 + public int removeById(List<Long> msgIdList);
  39 +
  40 + // /**
  41 + // * 根据一个或者多个tag移除一个或者多个消息
  42 + // * @param tag
  43 + // * @return
  44 + // */
  45 + // public int removeByTag(List<String> tagList);
  46 + //
  47 + // /**
  48 + // * 根据多个tag确认提交(这里的提交是指修改消息runtime和状态,让其马上能够被执行)一个或者多个消息
  49 + // * @param tag
  50 + // * @return
  51 + // */
  52 + // public int confirm(List<String> tagList);
  53 +
  54 + /**
  55 + * 根据id确认
  56 + *
  57 + * @param idList
  58 + * @return
  59 + * @createTime 2015年12月28日 下午6:47:55
  60 + * @author zhangshirui
  61 + */
58 62 int confirmIds(List<Long> idList);
59 63 }
... ...
dtms-service/src/main/java/com/b2c/dtms/service/DtmsMessageService.java
... ... @@ -12,94 +12,112 @@ import java.util.List;
12 12  
13 13 /**
14 14 * DtmsMessageService接口
  15 + *
15 16 * @author dev-center
16 17 * @since 2015-03-30
17 18 */
18   -public interface DtmsMessageService extends BaseService<DtmsMessage,Long> {
19   -
20   - /**
21   - * 获取指定数量的且运行时间小于等于指定时间的未被锁定的消息.
22   - *
23   - * @param startRow the start row
24   - * @param endRow the end row
25   - * @param specTime the spec time
26   - * @return the list before sepc time
27   - */
28   - List<DtmsMessage> getListUnLockedBeforeSepcTime(int startRow, int endRow,Date specTime);
29   -
30   - /**
31   - * 移动待重试次数为0的消息到异常列表.
32   - *
33   - * @param dtmsMessage the dtms message
34   - * @param moveMemo 移动原因
35   - * @return the int
36   - */
37   - int move2Exception(DtmsMessage dtmsMessage,String moveMemo);
38   -
39   -
40   - /**
41   - * 移动消息到历史列表.
42   - *
43   - * @param message the message
44   - * @param moveMemo 移动原因
45   - * @return 返回受影响的消息数
46   - */
47   - public int move2History(DtmsMessage message,String moveMemo);
48   -
49   - /**
50   - * 减小消息尝试重试次数.
51   - *
52   - * @param message 消息
53   - * @param count 减小次数
54   - * @param consumeLog the consume log
55   - * @return 返回受影响的消息数
56   - */
57   - int reduceRetryCount(DtmsMessage message,int count,String consumeLog);
  19 +public interface DtmsMessageService extends BaseService<DtmsMessage, Long> {
58 20  
59   - /**
60   - * 减小消息尝试重试次数和设置下次运行时间.
61   - *
62   - * @param message 消息
63   - * @param count 减小次数
64   - * @param nextRuntime 下次运行时间
65   - * @param consumeLog the consume log
66   - * @return 返回受影响的消息数
67   - */
68   - int reduceRetryCountAndSetNextRuntime(DtmsMessage message,int count,Date nextRuntime,String consumeLog);
69   -
70   - /**
71   - * 尝试为消息加锁,加锁成功返回加锁之后的消息,加锁失败,返回null
72   - * @param message
73   - * @return
74   - */
75   - public DtmsMessage tryAddLock(DtmsMessage message);
76   -
77   -
78   - /**
79   - * 尝试为消息解锁,加锁成功返回解锁成功的消息数量,解锁 失败,返回0
80   - * @param message
81   - * @return
82   - */
83   - public int tryReleaseLock(DtmsMessage message);
84   -
85   - /**
86   - * 获取锁定超时的消息
87   - * @param condition
88   - * @return
89   - */
90   - List<DtmsMessage> getLockTimeOutList(DtmsMessage condition);
  21 + /**
  22 + * 获取指定数量的且运行时间小于等于指定时间的未被锁定的消息.
  23 + *
  24 + * @param startRow
  25 + * the start row
  26 + * @param endRow
  27 + * the end row
  28 + * @param specTime
  29 + * the spec time
  30 + * @return the list before sepc time
  31 + */
  32 + List<DtmsMessage> getListUnLockedBeforeSepcTime(int startRow, int endRow, Date specTime);
91 33  
92   - /**
93   - * 检查消息是否存在
94   - * @param dtmsMessage
95   - * @return
96   - */
97   - boolean isExist(DtmsMessage dtmsMessage);
98   -
99   - /**
100   - * 尝试为消息解锁,加锁成功返回解锁成功的消息数量,解锁 失败,返回0
101   - * @param message
102   - * @return
103   - */
104   - public int tryReleaseLock(List<DtmsMessage> releaseList);
  34 + /**
  35 + * 移动待重试次数为0的消息到异常列表.
  36 + *
  37 + * @param dtmsMessage
  38 + * the dtms message
  39 + * @param moveMemo
  40 + * 移动原因
  41 + * @return the int
  42 + */
  43 + int move2Exception(DtmsMessage dtmsMessage, String moveMemo);
  44 +
  45 + /**
  46 + * 移动消息到历史列表.
  47 + *
  48 + * @param message
  49 + * the message
  50 + * @param moveMemo
  51 + * 移动原因
  52 + * @return 返回受影响的消息数
  53 + */
  54 + public int move2History(DtmsMessage message, String moveMemo);
  55 +
  56 + /**
  57 + * 减小消息尝试重试次数.
  58 + *
  59 + * @param message
  60 + * 消息
  61 + * @param count
  62 + * 减小次数
  63 + * @param consumeLog
  64 + * the consume log
  65 + * @return 返回受影响的消息数
  66 + */
  67 + int reduceRetryCount(DtmsMessage message, int count, String consumeLog);
  68 +
  69 + /**
  70 + * 减小消息尝试重试次数和设置下次运行时间.
  71 + *
  72 + * @param message
  73 + * 消息
  74 + * @param count
  75 + * 减小次数
  76 + * @param nextRuntime
  77 + * 下次运行时间
  78 + * @param consumeLog
  79 + * the consume log
  80 + * @return 返回受影响的消息数
  81 + */
  82 + int reduceRetryCountAndSetNextRuntime(DtmsMessage message, int count, Date nextRuntime, String consumeLog);
  83 +
  84 + /**
  85 + * 尝试为消息加锁,加锁成功返回加锁之后的消息,加锁失败,返回null
  86 + *
  87 + * @param message
  88 + * @return
  89 + */
  90 + public DtmsMessage tryAddLock(DtmsMessage message);
  91 +
  92 + /**
  93 + * 尝试为消息解锁,加锁成功返回解锁成功的消息数量,解锁 失败,返回0
  94 + *
  95 + * @param message
  96 + * @return
  97 + */
  98 + public int tryReleaseLock(DtmsMessage message);
  99 +
  100 + /**
  101 + * 获取锁定超时的消息
  102 + *
  103 + * @param condition
  104 + * @return
  105 + */
  106 + List<DtmsMessage> getLockTimeOutList(DtmsMessage condition);
  107 +
  108 + /**
  109 + * 检查消息是否存在
  110 + *
  111 + * @param dtmsMessage
  112 + * @return
  113 + */
  114 + boolean isExist(DtmsMessage dtmsMessage);
  115 +
  116 + /**
  117 + * 尝试为消息解锁,加锁成功返回解锁成功的消息数量,解锁 失败,返回0
  118 + *
  119 + * @param message
  120 + * @return
  121 + */
  122 + public int tryReleaseLock(List<DtmsMessage> releaseList);
105 123 }
106 124 \ No newline at end of file
... ...
dtms-service/src/main/java/com/b2c/dtms/service/impl/DtmsMessageProducerImpl.java
1 1 package com.b2c.dtms.service.impl;
2 2  
  3 +import com.b2c.dtms.domain.DtmsMessage;
3 4 import com.b2c.dtms.common.CommonConstants;
4 5 import com.b2c.dtms.common.CommonUtils;
5 6 import com.b2c.dtms.common.enums.dtms.DtmsMessageStatus;
... ... @@ -7,7 +8,6 @@ import com.b2c.dtms.common.enums.dtms.LockStatus;
7 8 import com.b2c.dtms.common.tools.DateUtils;
8 9 import com.b2c.dtms.common.tools.ValidatorUtil;
9 10 import com.b2c.dtms.dao.dtms.DtmsMessageDao;
10   -import com.b2c.dtms.domain.DtmsMessage;
11 11 import com.b2c.dtms.service.DtmsMaxidService;
12 12 import com.b2c.dtms.service.DtmsMessageProducer;
13 13  
... ... @@ -21,148 +21,150 @@ import java.util.Date;
21 21 import java.util.List;
22 22  
23 23 @Service("dtmsMessageProducer")
24   -public class DtmsMessageProducerImpl implements DtmsMessageProducer{
25   -
26   - @Resource private DtmsMessageDao dtmsMessageDao;
27   - @Resource private DtmsMaxidService dtmsMaxidService;
28   -
29   - @Override
30   - @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
31   - public Long produce(DtmsMessage message) {
32   - if(message==null){
33   - return null;
34   - }
35   -
36   - message.setId(dtmsMaxidService.getMessageNewSeqNo());
37   - message.setRuntime(calRuntime(message));
38   - message.setLockStatus(LockStatus.UNLOCK.getIndex());
39   - message.setStatus(DtmsMessageStatus.WaitProcess.code());
40   - if(message.getCreateTime()==null){
41   - message.setCreateTime(DateUtils.getCurrentDate());
42   - }
43   - if(message.getWaitRetryNum()==null){
44   - message.setWaitRetryNum(DtmsMessage.defaultWaitRetryNum);
45   - }
46   - int ret=dtmsMessageDao.insertEntry(message);
47   - CommonUtils.throwAppException(ret!=1, "新增消息失败");
48   - return message.getId();
49   - }
50   -
51   - private Date calRuntime(DtmsMessage message){
52   - if(message.getDelaySeconds()==null && message.getRuntime()==null){
53   - return DateUtils.getCurrentDate();
54   - }else if(message.getDelaySeconds()!=null && message.getRuntime()==null){
55   - return DateUtils.addSeconds(DateUtils.getCurrentDate(), message.getDelaySeconds());
56   - }else if(message.getDelaySeconds()!=null && message.getRuntime()!=null){
57   - return DateUtils.addSeconds(message.getRuntime(), message.getDelaySeconds());
58   - }else if(message.getRuntime()!=null){
59   - return message.getRuntime();
60   - }
61   - return DateUtils.getCurrentDate();
62   - }
63   -
64   - @Override
65   - @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
66   - public List<Long> produce(List<DtmsMessage> messageList) {
67   - if(messageList==null || messageList.isEmpty()){
68   - return new ArrayList<Long>(0);
69   - }
70   - List<Long> idList=new ArrayList<Long>(messageList.size());
71   - List<DtmsMessage> list=new ArrayList<DtmsMessage>(messageList.size());
72   - for(DtmsMessage msg:messageList){
73   - if(msg==null){
74   - continue;
75   - }
76   - msg.setId(dtmsMaxidService.getMessageNewSeqNo());
77   - msg.setRuntime(calRuntime(msg));
78   - msg.setLockStatus(LockStatus.UNLOCK.getIndex());
79   - msg.setStatus(DtmsMessageStatus.WaitProcess.code());
80   - if(msg.getCreateTime()==null){
81   - msg.setCreateTime(DateUtils.getCurrentDate());
82   - }
83   - if(msg.getWaitRetryNum()==null){
84   - msg.setWaitRetryNum(DtmsMessage.defaultWaitRetryNum);
85   - }
86   - list.add(msg);
87   -
88   - idList.add(msg.getId());
89   - }
90   - int ret=dtmsMessageDao.insertBatch(list);
91   - CommonUtils.throwAppException(ret!=list.size(), "新增消息失败");
92   - return idList;
93   - }
94   -
95   - @Override
96   - @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
97   - public int remove(Long messageId) {
98   - int ret=dtmsMessageDao.deleteByKey(messageId);
99   - //CommonUtils.throwAppException(ret!=messageId.length, "删除消息失败");
100   - return ret;
101   - }
102   -
103   - @Override
104   - @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
105   - public int removeById(List<Long> msgIdList) {
106   - if (ValidatorUtil.isEmpty(msgIdList)) {
  24 +public class DtmsMessageProducerImpl implements DtmsMessageProducer {
  25 +
  26 + @Resource
  27 + private DtmsMessageDao dtmsMessageDao;
  28 + @Resource
  29 + private DtmsMaxidService dtmsMaxidService;
  30 +
  31 + @Override
  32 + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
  33 + public Long produce(DtmsMessage message) {
  34 + if (message == null) {
  35 + return null;
  36 + }
  37 +
  38 + message.setId(dtmsMaxidService.getMessageNewSeqNo());
  39 + message.setRuntime(calRuntime(message));
  40 + message.setLockStatus(LockStatus.UNLOCK.getIndex());
  41 + message.setStatus(DtmsMessageStatus.WaitProcess.code());
  42 + if (message.getCreateTime() == null) {
  43 + message.setCreateTime(DateUtils.getCurrentDate());
  44 + }
  45 + if (message.getWaitRetryNum() == null) {
  46 + message.setWaitRetryNum(DtmsMessage.defaultWaitRetryNum);
  47 + }
  48 + int ret = dtmsMessageDao.insertEntry(message);
  49 + CommonUtils.throwAppException(ret != 1, "新增消息失败");
  50 + return message.getId();
  51 + }
  52 +
  53 + private Date calRuntime(DtmsMessage message) {
  54 + if (message.getDelaySeconds() == null && message.getRuntime() == null) {
  55 + return DateUtils.getCurrentDate();
  56 + } else if (message.getDelaySeconds() != null && message.getRuntime() == null) {
  57 + return DateUtils.addSeconds(DateUtils.getCurrentDate(), message.getDelaySeconds());
  58 + } else if (message.getDelaySeconds() != null && message.getRuntime() != null) {
  59 + return DateUtils.addSeconds(message.getRuntime(), message.getDelaySeconds());
  60 + } else if (message.getRuntime() != null) {
  61 + return message.getRuntime();
  62 + }
  63 + return DateUtils.getCurrentDate();
  64 + }
  65 +
  66 + @Override
  67 + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
  68 + public List<Long> produce(List<DtmsMessage> messageList) {
  69 + if (messageList == null || messageList.isEmpty()) {
  70 + return new ArrayList<Long>(0);
  71 + }
  72 + List<Long> idList = new ArrayList<Long>(messageList.size());
  73 + List<DtmsMessage> list = new ArrayList<DtmsMessage>(messageList.size());
  74 + for (DtmsMessage msg : messageList) {
  75 + if (msg == null) {
  76 + continue;
  77 + }
  78 + msg.setId(dtmsMaxidService.getMessageNewSeqNo());
  79 + msg.setRuntime(calRuntime(msg));
  80 + msg.setLockStatus(LockStatus.UNLOCK.getIndex());
  81 + msg.setStatus(DtmsMessageStatus.WaitProcess.code());
  82 + if (msg.getCreateTime() == null) {
  83 + msg.setCreateTime(DateUtils.getCurrentDate());
  84 + }
  85 + if (msg.getWaitRetryNum() == null) {
  86 + msg.setWaitRetryNum(DtmsMessage.defaultWaitRetryNum);
  87 + }
  88 + list.add(msg);
  89 +
  90 + idList.add(msg.getId());
  91 + }
  92 + int ret = dtmsMessageDao.insertBatch(list);
  93 + CommonUtils.throwAppException(ret != list.size(), "新增消息失败");
  94 + return idList;
  95 + }
  96 +
  97 + @Override
  98 + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
  99 + public int remove(Long messageId) {
  100 + int ret = dtmsMessageDao.deleteByKey(messageId);
  101 + // CommonUtils.throwAppException(ret!=messageId.length, "删除消息失败");
  102 + return ret;
  103 + }
  104 +
  105 + @Override
  106 + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
  107 + public int removeById(List<Long> msgIdList) {
  108 + if (ValidatorUtil.isEmpty(msgIdList)) {
107 109 return CommonConstants.COMMONS_ZERO_INT;
108 110 }
109   - int ret=dtmsMessageDao.deleteByKey(msgIdList.toArray(new Long[msgIdList.size()]));
110   - //CommonUtils.throwAppException(ret!=messageIds.size(), "删除消息失败");
111   - return ret;
112   - }
113   -
114   -// @Override
115   -// @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
116   -// public int removeByTag(List<String> tagList) {
117   -// if(tagList==null || tagList.size()==0){
118   -// return 0;
119   -// }
120   -// DtmsMessage condition=new DtmsMessage();
121   -// condition.addQueryData("tags", tagList);
122   -// int ret=dtmsMessageDao.deleteByCondition(condition);
123   -// return ret;
124   -// }
125   -//
126   -// @Override
127   -// @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
128   -// public int confirm(List<String> tagList) {
129   -//
130   -// if(tagList==null || tagList.size()==0){
131   -// return 0;
132   -// }
133   -//
134   -// DtmsMessage dtmsMessage=new DtmsMessage();
135   -// //修改消息状态,已二次确认,消息可以被直接执行
136   -// dtmsMessage.setStatus(DtmsMessageStatus.SecondConfirmed.code());
137   -//
138   -// //查询条件
139   -// dtmsMessage.addQueryData("tags", tagList);
140   -// dtmsMessage.addQueryData("lockStatus",LockStatus.UNLOCK.getIndex());
141   -//
142   -// //更新
143   -// int ret=dtmsMessageDao.updateByCondition(dtmsMessage);
144   -// return ret;
145   -// }
146   -
147   - @Override
148   - @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
149   - public int confirmIds(List<Long> idList) {
150   -
151   - if (ValidatorUtil.isEmpty(idList)) {
  111 + int ret = dtmsMessageDao.deleteByKey(msgIdList.toArray(new Long[msgIdList.size()]));
  112 + // CommonUtils.throwAppException(ret!=messageIds.size(), "删除消息失败");
  113 + return ret;
  114 + }
  115 +
  116 + // @Override
  117 + // @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
  118 + // public int removeByTag(List<String> tagList) {
  119 + // if(tagList==null || tagList.size()==0){
  120 + // return 0;
  121 + // }
  122 + // DtmsMessage condition=new DtmsMessage();
  123 + // condition.addQueryData("tags", tagList);
  124 + // int ret=dtmsMessageDao.deleteByCondition(condition);
  125 + // return ret;
  126 + // }
  127 + //
  128 + // @Override
  129 + // @Transactional(propagation=Propagation.REQUIRES_NEW,rollbackFor=Exception.class)
  130 + // public int confirm(List<String> tagList) {
  131 + //
  132 + // if(tagList==null || tagList.size()==0){
  133 + // return 0;
  134 + // }
  135 + //
  136 + // DtmsMessage dtmsMessage=new DtmsMessage();
  137 + // //修改消息状态,已二次确认,消息可以被直接执行
  138 + // dtmsMessage.setStatus(DtmsMessageStatus.SecondConfirmed.code());
  139 + //
  140 + // //查询条件
  141 + // dtmsMessage.addQueryData("tags", tagList);
  142 + // dtmsMessage.addQueryData("lockStatus",LockStatus.UNLOCK.getIndex());
  143 + //
  144 + // //更新
  145 + // int ret=dtmsMessageDao.updateByCondition(dtmsMessage);
  146 + // return ret;
  147 + // }
  148 +
  149 + @Override
  150 + @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
  151 + public int confirmIds(List<Long> idList) {
  152 +
  153 + if (ValidatorUtil.isEmpty(idList)) {
152 154 return CommonConstants.COMMONS_ZERO_INT;
153 155 }
154   -
155   - DtmsMessage dtmsMessage=new DtmsMessage();
156   - //修改消息状态,已二次确认,消息可以被直接执行
157   - dtmsMessage.setStatus(DtmsMessageStatus.SecondConfirmed.code());
158   -
159   - //查询条件
160   - dtmsMessage.addQueryData("idList", idList);
161   - dtmsMessage.addQueryData("lockStatus",LockStatus.UNLOCK.getIndex());
162   -
163   - //更新
164   - int ret=dtmsMessageDao.updateByCondition(dtmsMessage);
165   - return ret;
166   - }
  156 +
  157 + DtmsMessage dtmsMessage = new DtmsMessage();
  158 + // 修改消息状态,已二次确认,消息可以被直接执行
  159 + dtmsMessage.setStatus(DtmsMessageStatus.SecondConfirmed.code());
  160 +
  161 + // 查询条件
  162 + dtmsMessage.addQueryData("idList", idList);
  163 + dtmsMessage.addQueryData("lockStatus", LockStatus.UNLOCK.getIndex());
  164 +
  165 + // 更新
  166 + int ret = dtmsMessageDao.updateByCondition(dtmsMessage);
  167 + return ret;
  168 + }
167 169  
168 170 }
... ...
dtms-service/src/main/java/com/b2c/dtms/service/impl/DtmsMessageServiceImpl.java
... ... @@ -4,13 +4,13 @@
4 4 */
5 5 package com.b2c.dtms.service.impl;
6 6  
  7 +import com.b2c.dtms.domain.DtmsMessage;
7 8 import com.b2c.dtms.common.CommonUtils;
8 9 import com.b2c.dtms.common.enums.dtms.DtmsMessageStatus;
9 10 import com.b2c.dtms.common.enums.dtms.LockStatus;
10 11 import com.b2c.dtms.common.tools.DateUtils;
11 12 import com.b2c.dtms.dao.base.BaseDao;
12 13 import com.b2c.dtms.dao.dtms.DtmsMessageDao;
13   -import com.b2c.dtms.domain.DtmsMessage;
14 14 import com.b2c.dtms.domain.DtmsMessageException;
15 15 import com.b2c.dtms.domain.DtmsMessageHistory;
16 16 import com.b2c.dtms.service.DtmsMaxidService;
... ... @@ -31,210 +31,213 @@ import java.util.List;
31 31  
32 32 /**
33 33 * DtmsMessageService 实现类
  34 + *
34 35 * @author dev-center
35 36 * @since 2015-03-30
36 37 */
37 38 @Service("dtmsMessageService")
38   -public class DtmsMessageServiceImpl extends BaseServiceImpl<DtmsMessage,Long> implements DtmsMessageService {
39   -
40   - @Resource private DtmsMessageDao dtmsMessageDao;
41   - @Resource private DtmsMaxidService dtmsMaxidService;
42   - @Resource private DtmsMessageExceptionService dtmsMessageExceptionService;
43   - @Resource private DtmsMessageHistoryService dtmsMessageHistoryService;
44   -
45   - protected static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageServiceImpl.class);
46   -
47   - public BaseDao<DtmsMessage,Long> getDao() {
48   - return dtmsMessageDao;
49   - }
50   -
51   - @Override
52   - public List<DtmsMessage> getListUnLockedBeforeSepcTime(int startRow, int endRow,Date specTime) {
53   - DtmsMessage condition=new DtmsMessage();
54   - condition.setStartIndex(startRow);
55   - condition.setEndIndex(endRow);
56   - condition.setRuntime(specTime);
57   - return dtmsMessageDao.getListUnLockedBeforeSepcTime(condition);
58   - }
59   -
60   -
61   - @Override
62   - @Transactional(rollbackFor=Exception.class)
63   - public int move2Exception(DtmsMessage message,String moveMemo){
64   -
65   -
66   - message.setStatus(DtmsMessageStatus.Exception.code());
67   -
68   - DtmsMessageException exceptionMessage=DtmsMessageUtils.message2exceptionWithoutId(message,moveMemo);
69   - exceptionMessage.setId(dtmsMaxidService.getMessageExceptionNewSeqNo());
70   -
71   - DtmsMessageHistory historyMessage=DtmsMessageUtils.message2HisotryWithoutId(message, moveMemo);
72   - historyMessage.setId(dtmsMaxidService.getMessageHistoryNewSeqNo());
73   -
74   - int ret=dtmsMessageDao.deleteByKeyAndVersion(message);
75   - /**
76   - * 因消息并发访问(多个任务同时执行一个消息),删除不到时,直接返回,不抛异常
77   - */
78   - //CommonUtils.throwAppException(ret!=1, "删除消息失败");
79   -
80   - /**
81   - * 放开重复执行之后 历史表及异常表记录,用于跟踪哪些消息被重复执行。TODO:以后可加统计任务,统计出哪些消息被重复执行力了
82   - */
83   -// if(ret==0){
84   -// return ret;
85   -// }
86   -
87   - ret=dtmsMessageHistoryService.insertEntry(historyMessage);
88   - CommonUtils.throwAppException(ret!=1, "增加历史消息失败");
89   - ret=dtmsMessageExceptionService.insertEntry(exceptionMessage);
90   - CommonUtils.throwAppException(ret!=1, "增加异常消息失败");
91   -
92   - return ret;
93   - }
94   -
95   - @Override
96   - @Transactional(rollbackFor=Exception.class)
97   - public int move2History(DtmsMessage message,String consumeLog) {
98   - if(message==null){
99   - return 0;
100   - }
101   - message.setStatus(DtmsMessageStatus.Porcessed.code());
102   -
103   - DtmsMessageHistory history=DtmsMessageUtils.message2HisotryWithoutId(message, consumeLog);
104   - history.setId(dtmsMaxidService.getMessageHistoryNewSeqNo());
105   -
106   - int ret=dtmsMessageDao.deleteByKeyAndVersion(message);
107   -
108   - /**
109   - * 因消息并发访问(多个任务同时执行一个消息),删除不到时,直接返回,不抛异常
110   - */
111   - //CommonUtils.throwAppException(ret!=1, "删除消息失败");
112   -
113   - /**
114   - * 放开重复执行之后 历史表及异常表记录,用于跟踪哪些消息被重复执行。TODO:以后可加统计任务,统计出哪些消息被重复执行力了
115   - */
116   -// if(ret==0){
117   -// return ret;
118   -// }
119   -
120   - ret=dtmsMessageHistoryService.insertEntry(history);
121   - CommonUtils.throwAppException(ret!=1, "增加历史消息失败");
122   - return 1;
123   - }
124   -
125   - @Override
126   - @Transactional(rollbackFor=Exception.class)
127   - public int reduceRetryCount(DtmsMessage message,int count,String consumeLog) {
128   - if(message==null){
129   - return 0;
130   - }
131   - DtmsMessage toUpdateMessage=new DtmsMessage();
132   - toUpdateMessage.setId(message.getId());
133   - toUpdateMessage.setWaitRetryNum(message.getWaitRetryNum()-count);
134   - toUpdateMessage.setLatestConsumeLog(consumeLog);
135   - toUpdateMessage.setVersion(message.getVersion());
136   - return dtmsMessageDao.reduceRetryCount(toUpdateMessage);
137   - }
138   -
139   - @Override
140   - @Transactional(rollbackFor=Exception.class)
141   - public int reduceRetryCountAndSetNextRuntime(DtmsMessage message,int count, Date nextRuntime,String consumeLog) {
142   - if(message==null){
143   - return 0;
144   - }
145   - DtmsMessage toUpdateMessage=new DtmsMessage();
146   - toUpdateMessage.setId(message.getId());
147   - toUpdateMessage.setWaitRetryNum(message.getWaitRetryNum()-count);
148   - toUpdateMessage.setRuntime(nextRuntime);
149   - toUpdateMessage.setLatestConsumeLog(consumeLog);
150   - toUpdateMessage.setVersion(message.getVersion());
151   - return dtmsMessageDao.reduceRetryCountAndSetNextRuntime(toUpdateMessage);
152   - }
153   -
154   - //@Transactional(rollbackFor=Exception.class)
155   - /**
156   - * 因加事务回滚之后,spring会自动处理为当整个方法执行完之后,事务才提交,数据才对其他
157   - * 调用者可见。为了使加锁立即生效,需要再执行完tryAddLock之后立即对其他调用者可见,因此此方法不能加事务回滚声明
158   - * 出现异常,直接记录异常即可,消息会被下次执行或者监控任务执行解锁
159   - */
160   - @Override
161   - public synchronized DtmsMessage tryAddLock(DtmsMessage message) {
162   - try{
163   - if(message==null){
164   - return null;
165   - }
166   - message.setLockTime(DateUtils.getCurrentDate());
167   - int lockedCount=dtmsMessageDao.tryAddLock(message);
168   - if(lockedCount==1){
169   - //提高处理效率,不重新查询
170   - message.setLockStatus(LockStatus.LOCKING.getIndex());
171   - message.setVersion(message.getVersion()+1);
172   - LOGGER.info(String.format("[DTMS]消息ID%s:被成功锁定,当前版本号%s,锁定时间%s",message.getId(),message.getVersion(),DateUtils.format(message.getLockTime())));
173   - return message;
174   - }
175   - }catch(Exception e){
176   - //此处,异常只记录日志
177   - LOGGER.error(String.format("[DTMS]消息ID%s:尝试加锁失败,发生异常[%s]",message.getId(),e.getMessage()));
178   - }
179   - return null;
180   - }
181   -
182   -// @Override
183   -// @Transactional(rollbackFor=Exception.class)
184   -// public DtmsMessage tryAddLock(DtmsMessage message) {
185   -// if(message==null){
186   -// return null;
187   -// }
188   -// DtmsMessage updateMsg=new DtmsMessage();
189   -// updateMsg.setId(message.getId());
190   -// updateMsg.setLockTime(DateUtils.getCurrentDate());
191   -// updateMsg.setLockStatus(LockStatus.LOCKING.getIndex());
192   -// updateMsg.setVersion(message.getVersion());
193   -// int lockedCount=dtmsMessageDao.updateByKey(updateMsg);
194   -// if(lockedCount==1){
195   -// //提高处理效率,不重新查询
196   -// // return dtmsMessageDao.selectEntry(message.getId());
197   -// message.setLockTime(updateMsg.getLockTime());
198   -// message.setLockStatus(updateMsg.getLockStatus());
199   -// message.setVersion(updateMsg.getVersion()+1);
200   -// return message;
201   -// }
202   -// return null;
203   -// }
204   -
205   - @Override
206   - @Transactional(rollbackFor=Exception.class)
207   - public int tryReleaseLock(DtmsMessage message) {
208   - if(message==null || message.getLockStatus()==LockStatus.UNLOCK.getIndex()){
209   - return 0;
210   - }
211   - DtmsMessage updateMessage=new DtmsMessage();
212   - updateMessage.setId(message.getId());
213   - //分布式或者多机环境下,需要版本控制
214   - updateMessage.setVersion(message.getVersion());
215   - updateMessage.setLockStatus(message.getLockStatus());
216   - updateMessage.setLockTime(message.getLockTime());
217   - return dtmsMessageDao.tryReleaseLock(updateMessage);
218   - }
219   -
220   -
221   - @Override
222   - @Transactional(rollbackFor=Exception.class)
223   - public int tryReleaseLock(List<DtmsMessage> list) {
224   - if(list==null || list.size()==0){
225   - return 0;
226   - }
227   - return dtmsMessageDao.tryReleaseLock(list);
228   - }
229   -
230   - @Override
231   - public List<DtmsMessage> getLockTimeOutList(DtmsMessage condition) {
232   - return dtmsMessageDao.getLockTimeOutList(condition);
233   - }
234   -
235   - @Override
236   - public boolean isExist(DtmsMessage dtmsMessage) {
237   - int ret= dtmsMessageDao.selectLockedCount(dtmsMessage);
238   - return ret==1?true:false;
239   - }
  39 +public class DtmsMessageServiceImpl extends BaseServiceImpl<DtmsMessage, Long> implements DtmsMessageService {
  40 +
  41 + @Resource
  42 + private DtmsMessageDao dtmsMessageDao;
  43 + @Resource
  44 + private DtmsMaxidService dtmsMaxidService;
  45 + @Resource
  46 + private DtmsMessageExceptionService dtmsMessageExceptionService;
  47 + @Resource
  48 + private DtmsMessageHistoryService dtmsMessageHistoryService;
  49 +
  50 + protected static final Logger LOGGER = LoggerFactory.getLogger(DtmsMessageServiceImpl.class);
  51 +
  52 + public BaseDao<DtmsMessage, Long> getDao() {
  53 + return dtmsMessageDao;
  54 + }
  55 +
  56 + @Override
  57 + public List<DtmsMessage> getListUnLockedBeforeSepcTime(int startRow, int endRow, Date specTime) {
  58 + DtmsMessage condition = new DtmsMessage();
  59 + condition.setStartIndex(startRow);
  60 + condition.setEndIndex(endRow);
  61 + condition.setRuntime(specTime);
  62 + return dtmsMessageDao.getListUnLockedBeforeSepcTime(condition);
  63 + }
  64 +
  65 + @Override
  66 + @Transactional(rollbackFor = Exception.class)
  67 + public int move2Exception(DtmsMessage message, String moveMemo) {
  68 +
  69 + message.setStatus(DtmsMessageStatus.Exception.code());
  70 +
  71 + DtmsMessageException exceptionMessage = DtmsMessageUtils.message2exceptionWithoutId(message, moveMemo);
  72 + exceptionMessage.setId(dtmsMaxidService.getMessageExceptionNewSeqNo());
  73 +
  74 + DtmsMessageHistory historyMessage = DtmsMessageUtils.message2HisotryWithoutId(message, moveMemo);
  75 + historyMessage.setId(dtmsMaxidService.getMessageHistoryNewSeqNo());
  76 +
  77 + int ret = dtmsMessageDao.deleteByKeyAndVersion(message);
  78 + /**
  79 + * 因消息并发访问(多个任务同时执行一个消息),删除不到时,直接返回,不抛异常
  80 + */
  81 + // CommonUtils.throwAppException(ret!=1, "删除消息失败");
  82 +
  83 + /**
  84 + * 放开重复执行之后 历史表及异常表记录,用于跟踪哪些消息被重复执行。TODO:以后可加统计任务,统计出哪些消息被重复执行力了
  85 + */
  86 + // if(ret==0){
  87 + // return ret;
  88 + // }
  89 +
  90 + ret = dtmsMessageHistoryService.insertEntry(historyMessage);
  91 + CommonUtils.throwAppException(ret != 1, "增加历史消息失败");
  92 + ret = dtmsMessageExceptionService.insertEntry(exceptionMessage);
  93 + CommonUtils.throwAppException(ret != 1, "增加异常消息失败");
  94 +
  95 + return ret;
  96 + }
  97 +
  98 + @Override
  99 + @Transactional(rollbackFor = Exception.class)
  100 + public int move2History(DtmsMessage message, String consumeLog) {
  101 + if (message == null) {
  102 + return 0;
  103 + }
  104 + message.setStatus(DtmsMessageStatus.Porcessed.code());
  105 +
  106 + DtmsMessageHistory history = DtmsMessageUtils.message2HisotryWithoutId(message, consumeLog);
  107 + history.setId(dtmsMaxidService.getMessageHistoryNewSeqNo());
  108 +
  109 + int ret = dtmsMessageDao.deleteByKeyAndVersion(message);
  110 +
  111 + /**
  112 + * 因消息并发访问(多个任务同时执行一个消息),删除不到时,直接返回,不抛异常
  113 + */
  114 + // CommonUtils.throwAppException(ret!=1, "删除消息失败");
  115 +
  116 + /**
  117 + * 放开重复执行之后 历史表及异常表记录,用于跟踪哪些消息被重复执行。TODO:以后可加统计任务,统计出哪些消息被重复执行力了
  118 + */
  119 + // if(ret==0){
  120 + // return ret;
  121 + // }
  122 +
  123 + ret = dtmsMessageHistoryService.insertEntry(history);
  124 + CommonUtils.throwAppException(ret != 1, "增加历史消息失败");
  125 + return 1;
  126 + }
  127 +
  128 + @Override
  129 + @Transactional(rollbackFor = Exception.class)
  130 + public int reduceRetryCount(DtmsMessage message, int count, String consumeLog) {
  131 + if (message == null) {
  132 + return 0;
  133 + }
  134 + DtmsMessage toUpdateMessage = new DtmsMessage();
  135 + toUpdateMessage.setId(message.getId());
  136 + toUpdateMessage.setWaitRetryNum(message.getWaitRetryNum() - count);
  137 + toUpdateMessage.setLatestConsumeLog(consumeLog);
  138 + toUpdateMessage.setVersion(message.getVersion());
  139 + return dtmsMessageDao.reduceRetryCount(toUpdateMessage);
  140 + }
  141 +
  142 + @Override
  143 + @Transactional(rollbackFor = Exception.class)
  144 + public int reduceRetryCountAndSetNextRuntime(DtmsMessage message, int count, Date nextRuntime, String consumeLog) {
  145 + if (message == null) {
  146 + return 0;
  147 + }
  148 + DtmsMessage toUpdateMessage = new DtmsMessage();
  149 + toUpdateMessage.setId(message.getId());
  150 + toUpdateMessage.setWaitRetryNum(message.getWaitRetryNum() - count);
  151 + toUpdateMessage.setRuntime(nextRuntime);
  152 + toUpdateMessage.setLatestConsumeLog(consumeLog);
  153 + toUpdateMessage.setVersion(message.getVersion());
  154 + return dtmsMessageDao.reduceRetryCountAndSetNextRuntime(toUpdateMessage);
  155 + }
  156 +
  157 + // @Transactional(rollbackFor=Exception.class)
  158 + /**
  159 + * 因加事务回滚之后,spring会自动处理为当整个方法执行完之后,事务才提交,数据才对其他
  160 + * 调用者可见。为了使加锁立即生效,需要再执行完tryAddLock之后立即对其他调用者可见,因此此方法不能加事务回滚声明
  161 + * 出现异常,直接记录异常即可,消息会被下次执行或者监控任务执行解锁
  162 + */
  163 + @Override
  164 + public synchronized DtmsMessage tryAddLock(DtmsMessage message) {
  165 + try {
  166 + if (message == null) {
  167 + return null;
  168 + }
  169 + message.setLockTime(DateUtils.getCurrentDate());
  170 + int lockedCount = dtmsMessageDao.tryAddLock(message);
  171 + if (lockedCount == 1) {
  172 + // 提高处理效率,不重新查询
  173 + message.setLockStatus(LockStatus.LOCKING.getIndex());
  174 + message.setVersion(message.getVersion() + 1);
  175 + LOGGER.info(String.format("[DTMS]消息ID%s:被成功锁定,当前版本号%s,锁定时间%s", message.getId(), message.getVersion(),
  176 + DateUtils.format(message.getLockTime())));
  177 + return message;
  178 + }
  179 + } catch (Exception e) {
  180 + // 此处,异常只记录日志
  181 + LOGGER.error(String.format("[DTMS]消息ID%s:尝试加锁失败,发生异常[%s]", message.getId(), e.getMessage()));
  182 + }
  183 + return null;
  184 + }
  185 +
  186 + // @Override
  187 + // @Transactional(rollbackFor=Exception.class)
  188 + // public DtmsMessage tryAddLock(DtmsMessage message) {
  189 + // if(message==null){
  190 + // return null;
  191 + // }
  192 + // DtmsMessage updateMsg=new DtmsMessage();
  193 + // updateMsg.setId(message.getId());
  194 + // updateMsg.setLockTime(DateUtils.getCurrentDate());
  195 + // updateMsg.setLockStatus(LockStatus.LOCKING.getIndex());
  196 + // updateMsg.setVersion(message.getVersion());
  197 + // int lockedCount=dtmsMessageDao.updateByKey(updateMsg);
  198 + // if(lockedCount==1){
  199 + // //提高处理效率,不重新查询
  200 + // // return dtmsMessageDao.selectEntry(message.getId());
  201 + // message.setLockTime(updateMsg.getLockTime());
  202 + // message.setLockStatus(updateMsg.getLockStatus());
  203 + // message.setVersion(updateMsg.getVersion()+1);
  204 + // return message;
  205 + // }
  206 + // return null;
  207 + // }
  208 +
  209 + @Override
  210 + @Transactional(rollbackFor = Exception.class)
  211 + public int tryReleaseLock(DtmsMessage message) {
  212 + if (message == null || message.getLockStatus() == LockStatus.UNLOCK.getIndex()) {
  213 + return 0;
  214 + }
  215 + DtmsMessage updateMessage = new DtmsMessage();
  216 + updateMessage.setId(message.getId());
  217 + // 分布式或者多机环境下,需要版本控制
  218 + updateMessage.setVersion(message.getVersion());
  219 + updateMessage.setLockStatus(message.getLockStatus());
  220 + updateMessage.setLockTime(message.getLockTime());
  221 + return dtmsMessageDao.tryReleaseLock(updateMessage);
  222 + }
  223 +
  224 + @Override
  225 + @Transactional(rollbackFor = Exception.class)
  226 + public int tryReleaseLock(List<DtmsMessage> list) {
  227 + if (list == null || list.size() == 0) {
  228 + return 0;
  229 + }
  230 + return dtmsMessageDao.tryReleaseLock(list);
  231 + }
  232 +
  233 + @Override
  234 + public List<DtmsMessage> getLockTimeOutList(DtmsMessage condition) {
  235 + return dtmsMessageDao.getLockTimeOutList(condition);
  236 + }
  237 +
  238 + @Override
  239 + public boolean isExist(DtmsMessage dtmsMessage) {
  240 + int ret = dtmsMessageDao.selectLockedCount(dtmsMessage);
  241 + return ret == 1 ? true : false;
  242 + }
240 243 }
241 244 \ No newline at end of file
... ...
dtms-service/src/main/java/com/b2c/dtms/task/ScheduleTask.java
1 1 package com.b2c.dtms.task;
2 2  
  3 +import com.b2c.dtms.domain.DtmsMessage;
3 4 import com.b2c.dtms.common.enums.dtms.DtmsMessageConfigStatus;
4 5 import com.b2c.dtms.common.enums.dtms.DtmsMessageTopic;
5 6 import com.b2c.dtms.common.enums.dtms.HandleCode;
6 7 import com.b2c.dtms.common.enums.dtms.TimeUnit;
7 8 import com.b2c.dtms.common.tools.DateUtils;
8   -import com.b2c.dtms.domain.DtmsMessage;
9 9 import com.b2c.dtms.domain.DtmsMessageConfig;
10 10 import com.b2c.dtms.handler.DtmsHandleContext;
11 11 import com.b2c.dtms.handler.DtmsHandler;
... ... @@ -97,6 +97,7 @@ public class ScheduleTask implements Runnable {
97 97  
98 98 // 处理执行之后的消息
99 99 HandleCode handleCode = HandleCode.getHandleCode(ctx.getReturnCode());
  100 + int rows = 0;
100 101 switch (handleCode) {
101 102 case SUCCESS:
102 103 // 处理成功,移动消息到历史表。不需要释放消息锁
... ... @@ -106,12 +107,15 @@ public class ScheduleTask implements Runnable {
106 107 case FAILED:
107 108 case CONFIRM_FAILED:
108 109 if (msgConfig.getRetryStep() == null || msgConfig.getRetryStepUnit() == null) {
109   - dtmsMessageService.reduceRetryCount(ctx.getWaitHandleDtmsMessage(), 1,
  110 + rows = dtmsMessageService.reduceRetryCount(ctx.getWaitHandleDtmsMessage(), 1,
110 111 serverName + ctx.getReturnMessage());
111 112 } else {
112 113 Date nextRuntime = calculateNextRuntime(ctx.getWaitHandleDtmsMessage(), msgConfig);
113   - dtmsMessageService.reduceRetryCountAndSetNextRuntime(ctx.getWaitHandleDtmsMessage(), 1, nextRuntime,
114   - serverName + ctx.getReturnMessage());
  114 + rows = dtmsMessageService.reduceRetryCountAndSetNextRuntime(ctx.getWaitHandleDtmsMessage(), 1,
  115 + nextRuntime, serverName + ctx.getReturnMessage());
  116 + }
  117 + if (rows > 0) {
  118 + this.dtmsMessage.setVersion(this.dtmsMessage.getVersion() + 1);
115 119 }
116 120 dtmsMessageService.tryReleaseLock(dtmsMessage);
117 121 break;
... ...
dtms-service/src/main/java/com/b2c/dtms/task/ScheduleTaskFactory.java
... ... @@ -9,24 +9,27 @@ import org.springframework.stereotype.Service;
9 9  
10 10 import javax.annotation.Resource;
11 11  
12   -
13 12 @Service
14 13 public class ScheduleTaskFactory {
15 14  
16   - @Resource private DtmsMessageService dtmsMessageService;
17   - @Resource private DtmsHandlerCache dtmsHandlerCache;
18   - @Resource private DtmsMessageConfigCache dtmsMessageConfigCache;
19   -
20   - public ScheduleTask create(DtmsMessage dtmsMessage,String serverName){
21   -
22   - if(dtmsMessage==null){
23   - return null;
24   - }
25   - //尝试加锁,失败则不执行任务
26   - DtmsMessage lockedDtmsMessage=dtmsMessageService.tryAddLock(dtmsMessage);
27   - if(lockedDtmsMessage==null){
28   - return null;
29   - }
30   - return new ScheduleTask(lockedDtmsMessage,dtmsMessageConfigCache,dtmsHandlerCache,dtmsMessageService,serverName);
31   - }
  15 + @Resource
  16 + private DtmsMessageService dtmsMessageService;
  17 + @Resource
  18 + private DtmsHandlerCache dtmsHandlerCache;
  19 + @Resource
  20 + private DtmsMessageConfigCache dtmsMessageConfigCache;
  21 +
  22 + public ScheduleTask create(DtmsMessage dtmsMessage, String serverName) {
  23 +
  24 + if (dtmsMessage == null) {
  25 + return null;
  26 + }
  27 + // 尝试加锁,失败则不执行任务
  28 + DtmsMessage lockedDtmsMessage = dtmsMessageService.tryAddLock(dtmsMessage);
  29 + if (lockedDtmsMessage == null) {
  30 + return null;
  31 + }
  32 + return new ScheduleTask(lockedDtmsMessage, dtmsMessageConfigCache, dtmsHandlerCache, dtmsMessageService,
  33 + serverName);
  34 + }
32 35 }
... ...
dtms-service/src/main/java/com/b2c/dtms/utils/DtmsMessageUtils.java
1 1 package com.b2c.dtms.utils;
2 2  
3   -import com.b2c.dtms.common.tools.DateUtils;
4 3 import com.b2c.dtms.domain.DtmsMessage;
  4 +import com.b2c.dtms.common.tools.DateUtils;
5 5 import com.b2c.dtms.domain.DtmsMessageException;
6 6 import com.b2c.dtms.domain.DtmsMessageHistory;
7 7  
8   -
9 8 public class DtmsMessageUtils {
10 9  
11   - public static DtmsMessageHistory message2HisotryWithoutId(DtmsMessage message,String moveMemo){
12   - DtmsMessageHistory history=new DtmsMessageHistory();
13   - history.setMsgId(message.getId());
14   - history.setBizId(message.getBizId());
15   - history.setContent(message.getContent());
16   - history.setCreateTime(message.getCreateTime());
17   - history.setRuntime(message.getRuntime());
18   - history.setDelaySeconds(message.getDelaySeconds());
19   - history.setTopic(message.getTopic());
20   - history.setMemo(message.getMemo());
21   - history.setWaitRetryNum(message.getWaitRetryNum());
22   - history.setVersion(message.getVersion());
23   - history.setConfirmUrl(message.getConfirmUrl());
24   - history.setCallUrl(message.getCallUrl());
25   - history.setTag(message.getTag());
26   - history.setLockStatus(message.getLockStatus());
27   - history.setLockTime(message.getLockTime());
28   - history.setStatus(message.getStatus());
29   - history.setHistoryTime(DateUtils.getCurrentDate());
30   - history.setHistoryMemo(moveMemo);
31   - history.setType(message.getType());
32   - history.setLatestConsumeLog(message.getLatestConsumeLog()); //此处记录移动到历史表原因
33   - return history;
34   - }
35   -
36   - public static DtmsMessageException message2exceptionWithoutId(DtmsMessage message,String exceptionMemo){
37   - DtmsMessageException exception=new DtmsMessageException();
38   - exception.setMsgId(message.getId());
39   - exception.setBizId(message.getBizId());
40   - exception.setStatus(message.getStatus());
41   - exception.setTopic(message.getTopic());
42   - exception.setContent(message.getContent());
43   - exception.setMemo(message.getMemo());
44   - exception.setCreateTime(message.getCreateTime());
45   - exception.setExceptionTime(DateUtils.getCurrentDate());
46   - exception.setExceptionMemo(exceptionMemo);
47   - exception.setLatestConsumeLog(message.getLatestConsumeLog()); //此处记录消息的最近一次的消费日志
48   - return exception;
49   - }
  10 + public static DtmsMessageHistory message2HisotryWithoutId(DtmsMessage message, String moveMemo) {
  11 + DtmsMessageHistory history = new DtmsMessageHistory();
  12 + history.setMsgId(message.getId());
  13 + history.setBizId(message.getBizId());
  14 + history.setContent(message.getContent());
  15 + history.setCreateTime(message.getCreateTime());
  16 + history.setRuntime(message.getRuntime());
  17 + history.setDelaySeconds(message.getDelaySeconds());
  18 + history.setTopic(message.getTopic());
  19 + history.setMemo(message.getMemo());
  20 + history.setWaitRetryNum(message.getWaitRetryNum());
  21 + history.setVersion(message.getVersion());
  22 + history.setConfirmUrl(message.getConfirmUrl());
  23 + history.setCallUrl(message.getCallUrl());
  24 + history.setTag(message.getTag());
  25 + history.setLockStatus(message.getLockStatus());
  26 + history.setLockTime(message.getLockTime());
  27 + history.setStatus(message.getStatus());
  28 + history.setHistoryTime(DateUtils.getCurrentDate());
  29 + history.setHistoryMemo(moveMemo);
  30 + history.setType(message.getType());
  31 + history.setLatestConsumeLog(message.getLatestConsumeLog()); // 此处记录移动到历史表原因
  32 + return history;
  33 + }
  34 +
  35 + public static DtmsMessageException message2exceptionWithoutId(DtmsMessage message, String exceptionMemo) {
  36 + DtmsMessageException exception = new DtmsMessageException();
  37 + exception.setMsgId(message.getId());
  38 + exception.setBizId(message.getBizId());
  39 + exception.setStatus(message.getStatus());
  40 + exception.setTopic(message.getTopic());
  41 + exception.setContent(message.getContent());
  42 + exception.setMemo(message.getMemo());
  43 + exception.setCreateTime(message.getCreateTime());
  44 + exception.setExceptionTime(DateUtils.getCurrentDate());
  45 + exception.setExceptionMemo(exceptionMemo);
  46 + exception.setLatestConsumeLog(message.getLatestConsumeLog()); // 此处记录消息的最近一次的消费日志
  47 + return exception;
  48 + }
50 49 }
... ...
dtms-web/pom.xml
... ... @@ -38,11 +38,6 @@
38 38 <artifactId>logback-core</artifactId>
39 39 </dependency>
40 40 <dependency>
41   - <groupId>${project.groupId}</groupId>
42   - <artifactId>dtms-client</artifactId>
43   - <version>${project.parent.version}</version>
44   - </dependency>
45   - <dependency>
46 41 <groupId>com.b2c.manage</groupId>
47 42 <artifactId>diligrp-manage-sdk</artifactId>
48 43 </dependency>
... ...
dtms-web/src/main/java/com/b2c/dtms/web/restful/DtmsRestController.java
... ... @@ -8,10 +8,10 @@ import org.springframework.web.bind.annotation.RequestBody;
8 8 import org.springframework.web.bind.annotation.RequestMapping;
9 9 import org.springframework.web.bind.annotation.RestController;
10 10  
  11 +import com.b2c.dtms.domain.DtmsMessage;
11 12 import com.b2c.dtms.client.domain.dto.request.DtmsProduceRequestDto;
12 13 import com.b2c.dtms.client.domain.dto.response.DtmsResponseDto;
13 14 import com.b2c.dtms.common.exception.AppException;
14   -import com.b2c.dtms.domain.DtmsMessage;
15 15 import com.b2c.dtms.service.DtmsMessageProducer;
16 16  
17 17 @RestController
... ...
... ... @@ -53,6 +53,11 @@
53 53 <dependencyManagement>
54 54 <dependencies>
55 55 <dependency>
  56 + <groupId>${project.groupId}</groupId>
  57 + <artifactId>dtms-client</artifactId>
  58 + <version>0.0.2-SNAPSHOT</version>
  59 + </dependency>
  60 + <dependency>
56 61 <groupId>com.b2c.manage</groupId>
57 62 <artifactId>diligrp-manage-sdk</artifactId>
58 63 <version>0.0.3-SNAPSHOT</version>
... ...