feat: BUG-10980 清洗156981aecdea44ab8a969b586dc600dd模板的代办消息

This commit is contained in:
lilong 2024-03-18 14:53:15 +08:00
parent a1b2165b39
commit a09acb3634
5 changed files with 81 additions and 283 deletions

View File

@ -0,0 +1,22 @@
package cn.axzo.msg.center.message.controller;
import cn.axzo.msg.center.message.xxl.UpdateOuIdJob;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.validation.Valid;
@RestController
public class PrivateJobController {
@Autowired
private UpdateOuIdJob updateOuIdJob;
@PostMapping("private/ou/update")
public Object updateOuIdJob(@RequestBody @Valid UpdateOuIdJob.UpdateOuIdParam req) throws Exception {
return updateOuIdJob.execute(JSONObject.toJSONString(req));
}
}

View File

@ -1,45 +1,50 @@
package cn.axzo.msg.center.message.migrate;
import cn.axzo.apollo.api.ApolloWorkerTaskOrderApi;
import cn.axzo.apollo.api.rsp.TaskOrderRsp;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.maokai.api.client.CooperateShipQueryApi;
import cn.axzo.maokai.api.client.OrganizationalNodeUserApi;
import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi;
import cn.axzo.maokai.api.vo.request.CooperateShipQueryReq;
import cn.axzo.maokai.api.vo.request.OrganizationalNodeUserBasicQueryVO;
import cn.axzo.maokai.api.vo.response.CooperateShipResp;
import cn.axzo.maokai.api.vo.response.OrganizationalNodeUserBasicVO;
import cn.axzo.maokai.api.vo.response.OrganizationalTeamOuRelationResp;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.dal.PendingMessageRecordDao;
import cn.axzo.msg.center.domain.entity.PendingMessageRecord;
import cn.axzo.msg.center.domain.entity.PendingRecordExt;
import cn.axzo.msg.center.domain.enums.YesNoEnum;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.service.enums.YesOrNo;
import cn.axzo.pluto.api.TeamServiceApi;
import cn.axzo.pluto.teams.common.enums.UserTerminalPerspective;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import jodd.util.concurrent.ThreadFactoryBuilder;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -70,10 +75,6 @@ public class OuIdMigrateService {
@Getter
private volatile boolean isRunning;
private static final Map<Long, DefaultTeamInfo> DEFAULT_TEAM_INFO_MAP = Maps.newConcurrentMap();
private static final Map<Long, OrganizationalTeamOuRelationResp> ORGANIZATIONAL_TEAM_OU_RELATION_RESP_MAP = Maps.newConcurrentMap();
private static final Map<String, CooperateShipResp> COOPERATE_SHIP_RESP_MAP = Maps.newConcurrentMap();
private final ThreadFactory moduleStatisticTaskThreadFactory = ThreadFactoryBuilder.create()
@ -83,92 +84,6 @@ public class OuIdMigrateService {
private final ExecutorService taskThreadPool = new ThreadPoolExecutor(20, 100, 10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000), moduleStatisticTaskThreadFactory);
public String execute(Param param) {
String runningError = "another operation is still running...";
if (isRunning)
return runningError;
synchronized (this) {
if (isRunning)
return runningError;
isRunning = true;
}
try {
log.info("start migrating ou id...");
return executeImpl(param);
} catch (Exception e) {
log.warn("migration error, param={}", param, e);
return Throwables.getStackTraceAsString(e);
} finally {
log.info("finish migrating ou id...");
// a single winner thread can reach this statement
synchronized (this) {
isRunning = false;
}
}
}
private String executeImpl(Param param) {
log.info("receive ou migrate, param={}", param);
if (CollectionUtils.isNotEmpty(param.migrateRecordIds))
return migrateRecords(param.migrateRecordIds);
else if (param.scanMigrate)
return scanAndMigrate();
else {
log.warn("unknown param, param={}", param);
return "unknown command";
}
}
private String migrateRecords(List<Long> ids) {
if (CollectionUtils.isEmpty(ids)) {
return "ids can't be empty";
}
List<PendingMessageRecord> records = pendingMessageRecordDao.lambdaQuery()
.in(PendingMessageRecord::getId, ids)
.list();
if (records.isEmpty())
return String.format("can't find pending records with given ids. ids=%s", JSON.toJSONString(ids));
else {
migrate(records);
return "done!!";
}
}
private String scanAndMigrate() {
Long scrollId = (Long) redisTemplate.opsForValue().get(MIGRATE_OU_CURSOR);
if (scrollId == null || scrollId < 0)
scrollId = 0L;
int count = 0;
List<PendingMessageRecord> records = nextBatch(scrollId);
while (!records.isEmpty()) {
if (Thread.currentThread().isInterrupted()) {
log.info("aborted by user!!");
return "aborted by user!!";
}
migrate(records);
count += records.size();
PendingMessageRecord lastRecord = records.get(records.size() - 1);
scrollId = lastRecord.getId();
records = nextBatch(scrollId);
redisTemplate.opsForValue().set(MIGRATE_OU_CURSOR, scrollId);
if (records.size() < migrateBatchSize)
break;
}
return String.format("migrated %d records", count);
}
private List<PendingMessageRecord> nextBatch(Long scrollId) {
log.info("scanning pending message records, scrollId={}", scrollId);
return pendingMessageRecordDao.lambdaQuery()
.eq(PendingMessageRecord::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.eq(PendingMessageRecord::getIsOuIdMigrated, YesOrNo.NO)
.gt(PendingMessageRecord::getId, scrollId)
.orderByAsc(PendingMessageRecord::getId)
.last("LIMIT " + migrateBatchSize)
.list();
}
public void migrate(List<PendingMessageRecord> records) {
try {
String ids = records.stream()
@ -179,9 +94,6 @@ public class OuIdMigrateService {
// 迁移ou_id=0的
// 逻辑: 通过身份查询人默认的班组获取到班组后再获取班组对应的团队取团队的id
migrateZeroOuId(records);
// 迁移ouId为班组id的情况
// 逻辑获取班组对应的团队取团队的id
migrateTeamIdAsOuId(records);
log.info("end - migrate ou id: {}", ids);
} catch (Exception e) {
log.warn("ouId fail", e);
@ -219,15 +131,21 @@ public class OuIdMigrateService {
DefaultTeamInfo defaultTeam = getDefaultTeamId(identityId);
log.info("zeroOuId, record:{}, defaultTeam:{}", record.getId(), JSONObject.toJSONString(defaultTeam));
if (defaultTeam == null) {
return updateOuId(record);
PendingMessageRecord pendingMessageRecord = new PendingMessageRecord();
pendingMessageRecord.setId(record.getId());
pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue());
return pendingMessageRecord;
}
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo =
getOuInfoByTeamId(defaultTeam.teamId);
getOuInfoByTeamId(Lists.newArrayList(defaultTeam.teamId));
OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(defaultTeam.teamId);
log.info("zeroOuId, record:{}, teamInfo:{}", record.getId(), JSONObject.toJSONString(teamInfo));
if (teamInfo == null) {
return updateOuId(record);
PendingMessageRecord pendingMessageRecord = new PendingMessageRecord();
pendingMessageRecord.setId(record.getId());
pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue());
return pendingMessageRecord;
} else {
return tryUpdate(record, teamInfo.getOuId(), String.format(
"找到了默认团队, 并通过身份id找到默认团队, 再找到对应的单位. teamId=%d, ouId=%d, 视角=%s, 视角type=%s",
@ -240,185 +158,47 @@ public class OuIdMigrateService {
})
.filter(Objects::nonNull)
.collect(toList());
pendingMessageRecordDao.updateBatchById(update);
}
private PendingMessageRecord updateOuId(PendingMessageRecord record) {
CooperateShipResp cooperateShip = getCooperateShipResp(record);
log.info("zeroOuId, record:{}, cooperateShip:{}", record.getId(), JSONObject.toJSONString(cooperateShip));
if (cooperateShip == null) {
PendingMessageRecord.RouterParams routerParams = record.resolveRouterParams();
String taskNo = Optional.ofNullable(routerParams.getTaskNo())
.orElseGet(() -> routerParams.getTaskId());
if (StringUtils.isBlank(taskNo)) {
return tryUpdate(record, null, "routerParams里面的taskNo不存在");
}
TaskOrderRsp taskOrderRsp = Optional.ofNullable(apolloWorkerTaskOrderApi.listTaskOrderInfo(Lists.newArrayList(taskNo)).getData())
.map(data -> data.stream()
.findFirst()
.orElse(null))
.orElse(null);
if (taskOrderRsp == null) {
return tryUpdate(record, null, "未通过bizCode找到对应的taskOrder");
}
Long orgNodeId = null;
if (Sets.newHashSet("fdf662e201c945dba040fc54db50a702",
"b8115314233d478ca70e9f50ca0b0dc3",
"a97ff39e4d39484ab7fccc55d50ea714",
"5791020e815741e4876328c98aa3bd34").contains(record.getTemplateCode())) {
orgNodeId = taskOrderRsp.getDistributorOrgNodeId();
} else if (Sets.newHashSet("f40eef2ee88e4a80a699389d009a1561",
"d1d4793c84e14d20ae1cbc399338efa3").contains(record.getTemplateCode())) {
orgNodeId = taskOrderRsp.getRecipientOrgNodeId();
}
if (orgNodeId == null) {
return tryUpdate(record, null, "模板code未找到对应的taskOrderNoderId");
}
OrganizationalNodeUserBasicQueryVO organizationalNodeUserBasicQueryVO = new OrganizationalNodeUserBasicQueryVO();
organizationalNodeUserBasicQueryVO.setNodeIds(Lists.newArrayList(orgNodeId));
organizationalNodeUserBasicQueryVO.setPersonIds(Sets.newHashSet(record.getExecutorPersonId()));
Optional<OrganizationalNodeUserBasicVO> nodeUserBasic = organizationalNodeUserApi.queryUserBasic(organizationalNodeUserBasicQueryVO).getData()
.stream()
.findFirst();
if (nodeUserBasic.isPresent()) {
return tryUpdate(record, nodeUserBasic.get().getOrganizationalUnitId(), "通过taskOrder的nodeId找到nodeUser的ouId");
} else {
return tryUpdate(record, null, "未通过taskOrder的orgNodeId找到nodeUser");
}
} else {
return tryUpdate(record, cooperateShip.getOrganizationalUnitId(), "通过cooperate ship找到对应的单位");
List<Long> removeIds = update.stream()
.filter(record -> Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue()))
.map(PendingMessageRecord::getId)
.collect(toList());
if (!CollectionUtils.isEmpty(removeIds)) {
pendingMessageRecordDao.removeByIds(removeIds);
}
}
@Nullable
private CooperateShipResp getCooperateShipResp(PendingMessageRecord record) {
CooperateShipResp cooperateShipResp = COOPERATE_SHIP_RESP_MAP.get(record.getOrgId() + "_" + record.getExecutorPersonId());
if (cooperateShipResp != null) {
return cooperateShipResp;
List<PendingMessageRecord> updates = update.stream()
.filter(record -> !Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue()))
.collect(toList());
if (!CollectionUtils.isEmpty(updates)) {
pendingMessageRecordDao.updateBatchById(updates);
}
CooperateShipResp cooperateShip = cooperateShipQueryApi.genericQuery(
CooperateShipQueryReq.builder()
.workspaceId(record.getOrgId())
.personId(record.getExecutorPersonId())
.build())
.getData()
.stream()
.findFirst()
.orElse(null);
if (cooperateShip == null) {
return cooperateShip;
}
COOPERATE_SHIP_RESP_MAP.put(record.getOrgId() + "_" + record.getExecutorPersonId(), cooperateShip);
return cooperateShip;
}
@Nullable
private DefaultTeamInfo getDefaultTeamId(Long identityId) {
Integer successCode = 200;
DefaultTeamInfo defaultTeamInfo = DEFAULT_TEAM_INFO_MAP.get(identityId);
if (defaultTeamInfo != null) {
return defaultTeamInfo;
}
CommonResponse<Long> resp1 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.LEADER_PERSPECTIVE.code);
if (successCode.equals(resp1.getCode())) {
defaultTeamInfo = new DefaultTeamInfo(resp1.getData(), UserTerminalPerspective.LEADER_PERSPECTIVE, "工人端-班组长视角");
DEFAULT_TEAM_INFO_MAP.put(identityId, defaultTeamInfo);
return defaultTeamInfo;
return new DefaultTeamInfo(resp1.getData(), UserTerminalPerspective.LEADER_PERSPECTIVE, "工人端-班组长视角");
}
CommonResponse<Long> resp2 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.CMP_LEADER_PERSPECTIVE.code);
if (successCode.equals(resp2.getCode())) {
defaultTeamInfo = new DefaultTeamInfo(resp2.getData(), UserTerminalPerspective.CMP_LEADER_PERSPECTIVE, "管理端-班b组长视角");
DEFAULT_TEAM_INFO_MAP.put(identityId, defaultTeamInfo);
return defaultTeamInfo;
return new DefaultTeamInfo(resp2.getData(), UserTerminalPerspective.CMP_LEADER_PERSPECTIVE, "管理端-班b组长视角");
}
CommonResponse<Long> resp3 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.WORKER_PERSPECTIVE.code);
if (successCode.equals(resp3.getCode())) {
defaultTeamInfo = new DefaultTeamInfo(resp3.getData(), UserTerminalPerspective.WORKER_PERSPECTIVE, "工人端-工人视角");
DEFAULT_TEAM_INFO_MAP.put(identityId, defaultTeamInfo);
return defaultTeamInfo;
return new DefaultTeamInfo(resp3.getData(), UserTerminalPerspective.WORKER_PERSPECTIVE, "工人端-工人视角");
}
return null;
}
private void migrateTeamIdAsOuId(List<PendingMessageRecord> records) throws InterruptedException, ExecutionException {
// 老数据可能把班组id当成ouId存到待办中了
List<PendingMessageRecord> maybeTeamIdAsOuIdRecords = records.stream()
.filter(i -> i.getOuId() > 0L)
.collect(toList());
if (maybeTeamIdAsOuIdRecords.isEmpty()) return;
List<Long> maybeTeamIds = maybeTeamIdAsOuIdRecords.stream()
.map(PendingMessageRecord::getOuId)
.distinct()
.collect(toList());
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo = getOuInfoByTeamIds(maybeTeamIds);
if (teamId2OuInfo.isEmpty()) return;
List<List<PendingMessageRecord>> partitions = Lists
.partition(maybeTeamIdAsOuIdRecords, 20);
List<CompletableFuture> futures = new ArrayList<>();
for (int i = 0; i < partitions.size(); i++) {
List<PendingMessageRecord> dataList = partitions.get(i);
futures.add(CompletableFuture.runAsync(() ->
updateNotZero(dataList, teamId2OuInfo), taskThreadPool));
}
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get();
}
private void updateNotZero(List<PendingMessageRecord> records,
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo) {
List<PendingMessageRecord> update = records.stream()
.map(record -> {
try {
// 如果能通过ouId拿到对应的平台班组信息, 说明待办中ouId被存成了班组的平台id
// 因此在这里修改为正确的ouId
Long teamId = record.getOuId();
OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(teamId);
if (teamInfo != null) {
return tryUpdate(record, teamInfo.getOuId(), String.format("直接通过团队id找到ouId. teamId=%d", teamId));
}
} catch (Exception e) {
log.warn("ouId fail, recordId:{}; e:{}", record.getId(), e);
}
return null;
})
.filter(Objects::nonNull)
.collect(toList());
pendingMessageRecordDao.updateBatchById(update);
}
@NotNull
private Map<Long, OrganizationalTeamOuRelationResp> getOuInfoByTeamIds(List<Long> maybeTeamId) {
private Map<Long, OrganizationalTeamOuRelationResp> getOuInfoByTeamId(List<Long> maybeTeamIds) {
ApiResult<List<OrganizationalTeamOuRelationResp>> maybeTeamIdsResp =
organizationalTeamOuRelationApi.getByTeamOuIds(Lists.newArrayList(maybeTeamId));
return BizAssertions
.assertResponse(maybeTeamIdsResp, "can't get plat team info").stream()
.collect(Collectors.toMap(OrganizationalTeamOuRelationResp::getTeamOuId, Function.identity()));
}
@NotNull
private Map<Long, OrganizationalTeamOuRelationResp> getOuInfoByTeamId(Long maybeTeamId) {
OrganizationalTeamOuRelationResp cached = ORGANIZATIONAL_TEAM_OU_RELATION_RESP_MAP.get(maybeTeamId);
if (cached != null) {
Map<Long, OrganizationalTeamOuRelationResp> result = Maps.newHashMap();
result.put(maybeTeamId, cached);
return result;
}
ApiResult<List<OrganizationalTeamOuRelationResp>> maybeTeamIdsResp =
organizationalTeamOuRelationApi.getByTeamOuIds(Lists.newArrayList(maybeTeamId));
if (CollectionUtils.isNotEmpty(maybeTeamIdsResp.getData())) {
ORGANIZATIONAL_TEAM_OU_RELATION_RESP_MAP.put(maybeTeamId, maybeTeamIdsResp.getData().get(0));
}
organizationalTeamOuRelationApi.getByTeamOuIds(Lists.newArrayList(maybeTeamIds));
return BizAssertions
.assertResponse(maybeTeamIdsResp, "can't get plat team info").stream()
.collect(Collectors.toMap(OrganizationalTeamOuRelationResp::getTeamOuId, Function.identity()));

View File

@ -53,27 +53,22 @@ public class UpdateOuIdJob extends IJobHandler {
.orElseGet(() -> UpdateOuIdParam.builder().build());
long pageNumber = 1;
long minId = 0;
while (true) {
PendingMessageRecordService.PendingMessageRecordPageReq req = PendingMessageRecordService.PendingMessageRecordPageReq.builder()
.isOuIdMigrated(YesOrNo.NO)
.ids(updateOuIdParam.getIds())
.idGT(minId)
.templateCodes(UpdateRouterJob.TEMPLATE_CODES)
.build();
req.setPage(pageNumber);
req.setPage(pageNumber++);
req.setPageSize(DEFAULT_PAGE_SIZE);
Page<PendingMessageRecord> page = pendingMessageRecordService.page(req);
log.info("start updateOuIdJob,pageNumber:{}, pageListSize:{}", pageNumber, page.getList().size());
if (CollectionUtils.isNotEmpty(page.getList())) {
log.info("updateOuIdJob begin updateExecutorPerson");
updateExecutorPerson(page.getList());
log.info("updateOuIdJob begin migrate");
// 一条一个更新需要优化
ouIdMigrateService.migrate(page.getList());
log.info("updateOuIdJob end migrate");
minId = page.getList().get(page.getList().size() - 1).getId();
}
if (!UpdateRouterJob.hasNext(page)) {

View File

@ -40,16 +40,7 @@ public class UpdateRouterJob extends IJobHandler {
/**
* 只需要清洗templateCode
*/
public static final List<String> TEMPLATE_CODES = Lists.newArrayList("c3a4579f7f004af3bdeabe8f79ffcf1b",
"d1d4793c84e14d20ae1cbc399338efa3",
"b8115314233d478ca70e9f50ca0b0dc3",
"a97ff39e4d39484ab7fccc55d50ea714",
"f40eef2ee88e4a80a699389d009a1561",
"e67a037581f948ec9343e2217b828034",
"5791020e815741e4876328c98aa3bd34",
"fdf662e201c945dba040fc54db50a702",
"52ae3e8ec48242e485e9389202e102ce",
"e5571b2bec7c433d997f70856ecd2929");
public static final List<String> TEMPLATE_CODES = Lists.newArrayList("156981aecdea44ab8a969b586dc600dd");
@XxlJob("updateRouterJob")
@Override

View File

@ -1,13 +1,23 @@
package cn.axzo.msg.center.domain.entity;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.service.enums.*;
import cn.axzo.msg.center.service.enums.BizCategoryEnum;
import cn.axzo.msg.center.service.enums.BizFinalStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
import cn.axzo.msg.center.service.enums.YesOrNo;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import java.io.Serializable;
import java.util.Date;