hotfix: mysql deadlock issue
This commit is contained in:
parent
2ec2ca6c8a
commit
89af1222df
@ -33,13 +33,9 @@ import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
|
||||
import cn.axzo.msg.center.inside.notices.service.MessageRelationService;
|
||||
import cn.axzo.msg.center.inside.notices.service.MessageRouterService;
|
||||
import cn.axzo.msg.center.inside.notices.service.RawMessageRecordService;
|
||||
import cn.axzo.msg.center.message.domain.param.MessageGroupNodeStatisticParam;
|
||||
import cn.axzo.msg.center.message.service.PendingMessageNewService;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.dto.PersonDTO;
|
||||
import cn.axzo.msg.center.service.enums.AppTerminalTypeEnum;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
@ -51,10 +51,10 @@ import cn.axzo.msg.center.inside.notices.event.SendMessageEvent;
|
||||
import cn.axzo.msg.center.inside.notices.service.MessageRecordService;
|
||||
import cn.axzo.msg.center.message.service.GeneralMessageMapperService;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.dto.IdentifyAndReceiveType;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import cn.axzo.msg.center.utils.PersonIdentityUtil;
|
||||
import cn.azxo.framework.common.utils.LogUtil;
|
||||
import cn.azxo.framework.common.utils.LogUtil.ErrorLevel;
|
||||
|
||||
@ -15,11 +15,11 @@ import cn.axzo.msg.center.message.service.impl.GeneralMessageOldServiceImpl;
|
||||
import cn.axzo.msg.center.message.service.impl.oldmsg.OldMsgStatCacheValue.IdentityResponse;
|
||||
import cn.axzo.msg.center.message.service.impl.person.PersonIdInfo;
|
||||
import cn.axzo.msg.center.message.service.impl.person.PersonService;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.notices.common.enums.ReturnCodeEnum;
|
||||
import cn.axzo.msg.center.notices.common.exception.BizException;
|
||||
import cn.axzo.msg.center.service.dto.IdentityDTO;
|
||||
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import cn.axzo.msg.center.service.general.request.GeneralMessageOldDataStatisticRequest;
|
||||
import cn.axzo.msg.center.service.general.response.GeneralMessageOldDataStatisticResponse;
|
||||
import cn.azxo.framework.common.model.Page;
|
||||
|
||||
@ -22,11 +22,11 @@ import cn.axzo.msg.center.message.service.todo.manage.broadcast.TodoMqBroadcaste
|
||||
import cn.axzo.msg.center.message.service.todo.manage.event.HandoverEvent;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
|
||||
import cn.axzo.msg.center.service.enums.MessageCategoryEnum;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
|
||||
import cn.axzo.msg.center.service.enums.TodoType;
|
||||
import cn.axzo.msg.center.service.enums.YesOrNo;
|
||||
@ -49,6 +49,7 @@ import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapp
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.util.concurrent.Uninterruptibles;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@ -60,6 +61,7 @@ import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.transaction.support.TransactionTemplate;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
@ -67,6 +69,7 @@ import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
@ -90,19 +93,32 @@ public class TodoManager {
|
||||
private final PendingMessageBizConfig cfg;
|
||||
private final ApplicationContext applicationContext;
|
||||
private final TodoBroadcaster todoBroadcaster;
|
||||
private final TransactionTemplate transactionTemplate;
|
||||
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<PushPendingMessageDTO> send(PendingMessagePushParam request) {
|
||||
TodoRequestContext ctx = TodoRequestContext.create("send", request.normalize());
|
||||
// 因为有外层事务, 所以这里可以直接调用
|
||||
return send(ctx, request);
|
||||
}
|
||||
|
||||
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
|
||||
// 3 seconds at most
|
||||
for (int i = 0; i < 30; i++) {
|
||||
try {
|
||||
return transactionTemplate.execute(unused -> sendImpl(ctx, request));
|
||||
} catch (DuplicateKeyException e) {
|
||||
log.info("Try to save todo but todo business already exists, request={}. " +
|
||||
"RetryCount={}", request, i);
|
||||
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
throw new ServiceException("服务器内部错误");
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送待办或抄送
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<PushPendingMessageDTO> send(TodoRequestContext ctx, PendingMessagePushParam request) {
|
||||
private List<PushPendingMessageDTO> sendImpl(TodoRequestContext ctx, PendingMessagePushParam request) {
|
||||
// 可能会存在重新查询模版的情况, 成本不高
|
||||
MessageTemplateDTO template = messageTemplateNewService
|
||||
.queryEnableTemplateByCode(request.getTemplateCode())
|
||||
@ -120,25 +136,18 @@ public class TodoManager {
|
||||
request.setBizCode(UUIDUtil.uuidString());
|
||||
else if (StringUtils.isBlank(request.getBizCode()))
|
||||
request.setBizCode("");
|
||||
Supplier<TodoBusiness> todoBusinessQueryFun = () -> todoBusinessDao
|
||||
.findByBiz(request.getTemplateCode(), request.getBizCode(), true)
|
||||
.orElse(null);
|
||||
// 如果已经存在对应的待办业务, 就把待办追加到对应的待办业务上
|
||||
// 流程会并发, 这里对业务进行加锁
|
||||
TodoBusiness business = todoBusinessQueryFun.get();
|
||||
TodoBusiness business = todoBusinessDao
|
||||
.findByBiz(request.getTemplateCode(), request.getBizCode(), true)
|
||||
.orElse(null);
|
||||
boolean businessCreated = false;
|
||||
TodoExt ext = new TodoExt(request, template);
|
||||
if (business == null) {
|
||||
business = todoRecordBuilder.buildBusiness(request, ext);
|
||||
business.getRecordExt().setGenBizCode(genBizCode);
|
||||
try {
|
||||
todoBusinessDao.save(business);
|
||||
businessCreated = true;
|
||||
} catch (DuplicateKeyException e) {
|
||||
log.warn("Found duplicated business. Try to find saved one, request={}", request, e);
|
||||
business = todoBusinessQueryFun.get();
|
||||
log.warn("Found duplicated business. Found business={}, request={}", business, request, e);
|
||||
}
|
||||
todoBusinessDao.save(business);
|
||||
businessCreated = true;
|
||||
}
|
||||
//批量默认为false
|
||||
YesOrNo supportBatchProcess = request.getSupportBatchProcess() == null ? YesOrNo.NO : request.getSupportBatchProcess();
|
||||
|
||||
@ -9,8 +9,8 @@ import cn.axzo.msg.center.domain.entity.Todo;
|
||||
import cn.axzo.msg.center.domain.entity.TodoBusiness;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.TodoExt;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
|
||||
|
||||
@ -4,6 +4,7 @@ import cn.axzo.framework.rocketmq.Event;
|
||||
import cn.axzo.framework.rocketmq.EventProducer;
|
||||
import cn.axzo.msg.center.api.mq.MqMessage;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import cn.axzo.msg.center.service.enums.MqMessageType;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
package cn.axzo.msg.center.mq;
|
||||
package cn.axzo.msg.center.service.enums;
|
||||
|
||||
import cn.axzo.framework.rocketmq.Event;
|
||||
import lombok.AccessLevel;
|
||||
Loading…
Reference in New Issue
Block a user