REQ-2135: fix bugs
This commit is contained in:
parent
2618b57e88
commit
01aae2d782
@ -15,7 +15,7 @@ import java.util.List;
|
||||
class StateAdvanceResult {
|
||||
private final boolean advanced;
|
||||
private final TodoBusiness business;
|
||||
private final List<Todo> todos;
|
||||
private final List<Todo> advancedTodos;
|
||||
|
||||
Long getBusinessId() {
|
||||
return business == null ? 0L : business.getId();
|
||||
|
||||
@ -13,6 +13,7 @@ import cn.axzo.msg.center.inside.notices.config.PendingMessageBizConfig;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
|
||||
import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
|
||||
import cn.axzo.msg.center.message.service.MessageTemplateNewService;
|
||||
import cn.axzo.msg.center.message.service.todo.manage.event.NewTodoEvent;
|
||||
import cn.axzo.msg.center.mq.MqMessageRecord;
|
||||
import cn.axzo.msg.center.mq.MqMessageType;
|
||||
import cn.axzo.msg.center.mq.MqProducer;
|
||||
@ -31,13 +32,18 @@ import cn.axzo.msg.center.utils.DateFormatUtil;
|
||||
import cn.axzo.msg.center.utils.QueryFormatter;
|
||||
import cn.axzo.msg.center.utils.UUIDUtil;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.joda.time.DateTime;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@ -68,7 +74,7 @@ public class TodoManager {
|
||||
private final MqProducer mqProducer;
|
||||
private final ErrorAssembler errorAssembler;
|
||||
private final PendingMessageBizConfig cfg;
|
||||
private final TodoPushClient todoPushClient;
|
||||
private final ApplicationContext applicationContext;
|
||||
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public List<PushPendingMessageDTO> send(PendingMessagePushParam request) {
|
||||
@ -138,7 +144,8 @@ public class TodoManager {
|
||||
if (businessCreated)
|
||||
todoLogger.logBusinessUpdated(ctx, business);
|
||||
todoLogger.logTodosUpdated(ctx, todos);
|
||||
todoPushClient.sendPush(template, todos);
|
||||
applicationContext.publishEvent(
|
||||
new NewTodoEvent(this, template, todos));
|
||||
return todos.stream()
|
||||
.map(todo -> new PushPendingMessageDTO(
|
||||
todo.getId(), todo.getIdentityCode(),
|
||||
@ -162,7 +169,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.COMPLETED));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -175,17 +182,18 @@ public class TodoManager {
|
||||
StateAdvanceResult advanceResult = advanceState(ctx, execAdvanceBuilder()
|
||||
.eq(Todo::getId, request.getId())
|
||||
.set(Todo::getState, PendingMessageStateEnum.COMPLETED));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
if (request.getBizExtParams() != null) {
|
||||
boolean businessUpdated = false;
|
||||
if (advanceResult.getBusiness() != null && request.getBizExtParams() != null) {
|
||||
JSONObject bizExtParams = JSONUtils.parseObjectOrThrow(
|
||||
"bizExtParams", request.getBizExtParams());
|
||||
todoBusinessDao.updateBizExtParams(advanceResult.getBusinessId(), bizExtParams);
|
||||
businessUpdated = todoBusinessDao.updateBizExtParams(advanceResult.getBusinessId(), bizExtParams);
|
||||
ctx.addLogContent("bizExtParam", bizExtParams);
|
||||
ctx.addLogContent("businessUpdated", businessUpdated);
|
||||
todoLogger.logBusinessUpdated(ctx, advanceResult.getBusiness());
|
||||
}
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getTodos());
|
||||
return true;
|
||||
if (advanceResult.isAdvanced())
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getAdvancedTodos());
|
||||
return advanceResult.isAdvanced() || businessUpdated;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -198,17 +206,18 @@ public class TodoManager {
|
||||
.eq(Todo::getTemplateCode, request.getTemplateCode())
|
||||
.eq(Todo::getBizCode, request.getBizCode())
|
||||
.set(Todo::getState, PendingMessageStateEnum.COMPLETED));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
if (request.determineUpdateFinalBizState()) {
|
||||
todoBusinessDao.updateBizFinalState(
|
||||
boolean businessUpdated = false;
|
||||
if (advanceResult.getBusiness() != null && request.determineUpdateFinalBizState()) {
|
||||
businessUpdated = todoBusinessDao.updateBizFinalState(
|
||||
advanceResult.getBusinessId(), request.getBizFinalStateEnum());
|
||||
ctx.addLogContent("bizFinalState", request.getBizFinalStateEnum());
|
||||
ctx.addLogContent("businessId", advanceResult.getBusiness().getId());
|
||||
ctx.addLogContent("businessUpdated", businessUpdated);
|
||||
todoLogger.logBusinessUpdated(ctx, advanceResult.getBusiness());
|
||||
}
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getTodos());
|
||||
return true;
|
||||
if (advanceResult.isAdvanced())
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getAdvancedTodos());
|
||||
return advanceResult.isAdvanced() || businessUpdated;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -224,7 +233,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.COMPLETED));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -243,7 +252,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.RETRACT));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -258,7 +267,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.RETRACT));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -273,7 +282,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.RETRACT));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -289,7 +298,7 @@ public class TodoManager {
|
||||
.set(Todo::getState, PendingMessageStateEnum.RETRACT));
|
||||
if (!advanceResult.isAdvanced())
|
||||
return false;
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoRevoked(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -382,7 +391,7 @@ public class TodoManager {
|
||||
}
|
||||
// 如果不是重复发送, 就只记一条日志. 所以这个记录日志不能提前
|
||||
if (advanceResult.isAdvanced())
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodoCompleted(ctx, advanceResult.getAdvancedTodos());
|
||||
return isAdvancedOrCompleted;
|
||||
}
|
||||
|
||||
@ -449,41 +458,39 @@ public class TodoManager {
|
||||
errorAssembler.exposeSetCopiedToMeReadFailReason(identityCode);
|
||||
return false;
|
||||
}
|
||||
todoLogger.logTodosUpdated(ctx, advanceResult.getTodos());
|
||||
todoLogger.logTodosUpdated(ctx, advanceResult.getAdvancedTodos());
|
||||
return true;
|
||||
}
|
||||
|
||||
// !! advance state
|
||||
|
||||
private StateAdvanceResult advanceState(TodoRequestContext ctx, StateAdvanceBuilder builder) {
|
||||
Runnable advanceFailLogger = () -> {
|
||||
Supplier<BusinessTodos> advanceFailLogger = () -> {
|
||||
// rerun no state query in case of race condition
|
||||
List<Todo> noStateTodos = builder.getNoStateQuery()
|
||||
.orderByDesc(Todo::getId)
|
||||
.list();
|
||||
Todo currentStateSample = noStateTodos.isEmpty() ? null : noStateTodos.get(0);
|
||||
String failReason = currentStateSample == null ? "没有找到对应的待办" : "起初状态不匹配";
|
||||
BusinessTodos noStateBusinessTodos = getBusinessTodos(builder.getNoStateQuery());
|
||||
String failReason = noStateBusinessTodos.sampleTodo() == null ? "没有找到对应的待办" : "起初状态不匹配";
|
||||
ctx.addLogContent("stateAdvanced", false);
|
||||
if (!noStateTodos.isEmpty()) {
|
||||
if (!noStateBusinessTodos.todos.isEmpty()) {
|
||||
HashMap<String, Object> fail = new HashMap<>();
|
||||
fail.put("failReason", failReason);
|
||||
fail.put("currentStateSample", currentStateSample);
|
||||
fail.put("currentStateSample", noStateBusinessTodos.sampleTodo());
|
||||
ctx.addLogContent("stateAdvanceFail", fail);
|
||||
todoLogger.logTodosUpdated(ctx, noStateTodos);
|
||||
todoLogger.logTodosUpdated(ctx, noStateBusinessTodos.todos);
|
||||
}
|
||||
// throw an error? dunno
|
||||
log.warn("尝试推进待办状态, 但是 {}. ctx={}, query={}, currentStateSample={}",
|
||||
failReason, ctx, QueryFormatter.format(builder.getQuery()), currentStateSample);
|
||||
failReason, ctx, QueryFormatter.format(builder.getQuery()), noStateBusinessTodos.sampleTodo());
|
||||
return noStateBusinessTodos;
|
||||
};
|
||||
List<Todo> todos = builder.getQuery().list();
|
||||
if (todos.isEmpty()) {
|
||||
advanceFailLogger.run();
|
||||
return new StateAdvanceResult(false, null, Collections.emptyList());
|
||||
BusinessTodos businessTodos = getBusinessTodos(builder.getQuery());
|
||||
if (businessTodos.todos.isEmpty()) {
|
||||
BusinessTodos noStateBusinessTodos = advanceFailLogger.get();
|
||||
return new StateAdvanceResult(false, noStateBusinessTodos.business, Collections.emptyList());
|
||||
}
|
||||
boolean updated = builder.getUpdate().update();
|
||||
// race condition
|
||||
if (updated && builder.isSendPullNotification()) {
|
||||
Set<Long> executorPersonIds = todos.stream()
|
||||
Set<Long> executorPersonIds = businessTodos.todos.stream()
|
||||
.map(Todo::getExecutorPersonId)
|
||||
.collect(toSet());
|
||||
pullTodoBroadcaster.fireTodoChanged(executorPersonIds);
|
||||
@ -491,10 +498,8 @@ public class TodoManager {
|
||||
if (updated)
|
||||
ctx.addLogContent("stateAdvanced", true);
|
||||
else
|
||||
advanceFailLogger.run();
|
||||
Todo sample = todos.get(0);
|
||||
TodoBusiness business = todoBusinessDao.getById(sample.getTodoBusinessId());
|
||||
return new StateAdvanceResult(updated, business, todos);
|
||||
advanceFailLogger.get();
|
||||
return new StateAdvanceResult(updated, businessTodos.business, businessTodos.todos);
|
||||
}
|
||||
|
||||
private StateAdvanceBuilder execAdvanceBuilder() {
|
||||
@ -513,4 +518,28 @@ public class TodoManager {
|
||||
.eq(Todo::getIsDelete, TableIsDeleteEnum.NORMAL.value);
|
||||
}
|
||||
|
||||
BusinessTodos getBusinessTodos(LambdaQueryChainWrapper<Todo> query) {
|
||||
List<Todo> todos = query
|
||||
.orderByDesc(Todo::getId)
|
||||
.list();
|
||||
if (todos == null)
|
||||
return new BusinessTodos(null, null);
|
||||
TodoBusiness business = null;
|
||||
if (CollectionUtils.isNotEmpty(todos)) {
|
||||
Todo sample = todos.get(0);
|
||||
business = todoBusinessDao.getById(sample.getTodoBusinessId());
|
||||
}
|
||||
return new BusinessTodos(business, todos);
|
||||
}
|
||||
|
||||
@Getter
|
||||
@RequiredArgsConstructor
|
||||
static class BusinessTodos {
|
||||
private final TodoBusiness business;
|
||||
private final List<Todo> todos;
|
||||
|
||||
Todo sampleTodo() {
|
||||
return CollectionUtils.isEmpty(todos) ? null : todos.get(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,33 @@
|
||||
package cn.axzo.msg.center.message.service.todo.manage.event;
|
||||
|
||||
import cn.axzo.msg.center.domain.entity.Todo;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import lombok.Getter;
|
||||
import org.springframework.context.ApplicationEvent;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Getter
|
||||
public class NewTodoEvent extends ApplicationEvent {
|
||||
private final MessageTemplateDTO template;
|
||||
private final List<Todo> todos;
|
||||
|
||||
public NewTodoEvent(Object source, MessageTemplateDTO template, List<Todo> todos) {
|
||||
super(source);
|
||||
this.template = template;
|
||||
this.todos = todos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
HashMap<String, Object> fields = new HashMap<>();
|
||||
fields.put("template", template);
|
||||
fields.put("todos", todos);
|
||||
return JSON.toJSONString(fields);
|
||||
}
|
||||
}
|
||||
@ -1,15 +1,19 @@
|
||||
package cn.axzo.msg.center.message.service.todo.manage;
|
||||
package cn.axzo.msg.center.message.service.todo.manage.event;
|
||||
|
||||
import cn.axzo.msg.center.domain.entity.Todo;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateDTO;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO.MessageRouteDetailDTO;
|
||||
import cn.axzo.msg.center.message.domain.dto.MessageTemplateRouterDTO.MessageRouterConfigDTO;
|
||||
import cn.axzo.msg.center.message.service.youmeng.YoumengPush;
|
||||
import cn.axzo.msg.center.message.service.youmeng.YoumengPush.PlatformUrl;
|
||||
import cn.axzo.msg.center.message.service.youmeng.YoumengTemplateClient;
|
||||
import cn.axzo.msg.center.utils.MessageRouterUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
@ -19,30 +23,33 @@ import java.util.List;
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
class TodoPushClient {
|
||||
class TodoPushSender implements ApplicationListener<NewTodoEvent> {
|
||||
|
||||
private final YoumengTemplateClient youmengTemplateClient;
|
||||
|
||||
void sendPush(MessageTemplateDTO template, List<Todo> todos) {
|
||||
@Override
|
||||
public void onApplicationEvent(@NotNull NewTodoEvent event) {
|
||||
log.info("Prepare sending todo push. event={}", event);
|
||||
ArrayList<YoumengPush> pushes = new ArrayList<>();
|
||||
for (Todo todo : todos) {
|
||||
for (Todo todo : event.getTodos()) {
|
||||
YoumengPush push = new YoumengPush();
|
||||
push.setTitle(todo.getTitle());
|
||||
push.setContent(todo.getContent());
|
||||
push.setReceiverPersonId(todo.getExecutorPersonId());
|
||||
push.setReceiverOuId(todo.getOuId());
|
||||
push.setPlatforms(getPlatformUrl(template, todo));
|
||||
push.setPlatforms(getPlatformUrl(event.getTemplate(), todo));
|
||||
pushes.add(push);
|
||||
}
|
||||
youmengTemplateClient.asyncSend(template.getId(), pushes);
|
||||
youmengTemplateClient.asyncSend(event.getTemplate().getId(), pushes);
|
||||
}
|
||||
|
||||
private List<PlatformUrl> getPlatformUrl(MessageTemplateDTO template, Todo todo) {
|
||||
if (template.getMsgTemplateRouter() == null)
|
||||
return Collections.emptyList();
|
||||
MessageTemplateRouterDTO.MessageRouteDetailDTO detail = template.getMsgTemplateRouter().getRouteDetail();
|
||||
MessageRouteDetailDTO detail = template.getMsgTemplateRouter().getRouteDetail();
|
||||
if (detail == null)
|
||||
return Collections.emptyList();
|
||||
List<MessageRouterConfigDTO> routerConfigs = detail.getRouterConfigs();
|
||||
@ -57,5 +64,4 @@ class TodoPushClient {
|
||||
platforms.add(new PlatformUrl(routerCfg.getTerminalType(), routerCfg.getUrl()));
|
||||
return platforms;
|
||||
}
|
||||
|
||||
}
|
||||
@ -40,9 +40,9 @@ public class YoumengTemplateClient {
|
||||
private final MessageBaseTemplateMapper messageBaseTemplateMapper;
|
||||
|
||||
private final ExecutorService executor = new ThreadPoolExecutor(
|
||||
1, 15,
|
||||
5, 10,
|
||||
5, TimeUnit.MINUTES,
|
||||
new ArrayBlockingQueue<>(1000),
|
||||
new ArrayBlockingQueue<>(500),
|
||||
new NamedThreadFactory(getClass().getSimpleName()));
|
||||
|
||||
public void asyncSend(Long templateId, List<YoumengPush> pushes) {
|
||||
@ -72,13 +72,13 @@ public class YoumengTemplateClient {
|
||||
log.info("Skip sending push, because push terminals is empty.");
|
||||
return;
|
||||
}
|
||||
JSONObject pushDataObj = template.getPushData();
|
||||
if (pushDataObj == null) {
|
||||
JSONObject pushCfgObj = template.getPushData();
|
||||
if (pushCfgObj == null) {
|
||||
log.info("Skip sending push because push configuration is missing. template={}", template);
|
||||
return;
|
||||
}
|
||||
PushData pushData = pushDataObj.toJavaObject(PushData.class);
|
||||
if (!pushData.isSwitchOn()) {
|
||||
PushData pushCfg = pushCfgObj.toJavaObject(PushData.class);
|
||||
if (!pushCfg.isSwitchOn()) {
|
||||
log.info("Skip sending push because push is turned off. template={}", template);
|
||||
return;
|
||||
}
|
||||
@ -94,7 +94,7 @@ public class YoumengTemplateClient {
|
||||
.appClient(terminal.toAppType().getCode())
|
||||
.m(push.getContent())
|
||||
.m3(push.getReceiverPersonId() + "")
|
||||
.m2(configurePlatforms(pushData, push))
|
||||
.m2(configurePlatforms(pushCfg, push))
|
||||
// 为了兼容老版本,保证老版本的工人端能收到push,所以新的push都alias都是person
|
||||
.t("not_identity" + push.getReceiverPersonId())
|
||||
.ouId(push.determineReceiverOuId())
|
||||
|
||||
@ -57,15 +57,15 @@ public class TodoBusinessDao extends ServiceImpl<TodoBusinessMapper, TodoBusines
|
||||
.list();
|
||||
}
|
||||
|
||||
public void updateBizExtParams(Long businessId, JSONObject bizExtParams) {
|
||||
public boolean updateBizExtParams(Long businessId, JSONObject bizExtParams) {
|
||||
TodoBusiness update = new TodoBusiness();
|
||||
update.setId(businessId);
|
||||
update.setBizExtParam(bizExtParams);
|
||||
updateById(update);
|
||||
return updateById(update);
|
||||
}
|
||||
|
||||
public void updateBizFinalState(Long businessId, BizFinalStateEnum bizFinalState) {
|
||||
lambdaUpdate()
|
||||
public boolean updateBizFinalState(Long businessId, BizFinalStateEnum bizFinalState) {
|
||||
return lambdaUpdate()
|
||||
.eq(TodoBusiness::getId, businessId)
|
||||
.set(TodoBusiness::getBizFinalState, bizFinalState)
|
||||
.update();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user