REQ-2010: 刷ouId

This commit is contained in:
yanglin 2024-02-26 11:32:09 +08:00
parent cf49d8faae
commit cc5cd5687a
11 changed files with 412 additions and 28 deletions

View File

@ -1,5 +1,14 @@
## REQ-2097 ## REQ-2097
```mysql ```mysql
ALTER TABLE pending_message_record ALTER TABLE pending_message_record
ADD hide_until DATETIME NULL COMMENT '在这个时间之前都不显示'; ADD hide_until DATETIME NULL COMMENT '在这个时间之前都不显示';
```
### hotfix/ou_issue_20240220
```mysql
ALTER TABLE pending_message_record
ADD is_ou_id_migrated VARCHAR(10) DEFAULT 'NO' NOT NULL COMMENT 'ouId是否已经迁移',
ADD record_ext JSON NULL COMMENT '保存额外的数据信息';
``` ```

View File

@ -21,6 +21,18 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>cn.axzo.maokai</groupId>
<artifactId>maokai-api</artifactId>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.pluto</groupId>
<artifactId>pluto-api</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.json</groupId> <groupId>org.json</groupId>
<artifactId>json</artifactId> <artifactId>json</artifactId>

View File

@ -7,6 +7,7 @@ import cn.axzo.msg.center.domain.entity.BizEventMapping;
import cn.axzo.msg.center.domain.entity.MessageBaseTemplate; import cn.axzo.msg.center.domain.entity.MessageBaseTemplate;
import cn.axzo.msg.center.domain.entity.RelationTemplateMap; import cn.axzo.msg.center.domain.entity.RelationTemplateMap;
import cn.axzo.msg.center.domain.enums.BizActionCategory; import cn.axzo.msg.center.domain.enums.BizActionCategory;
import cn.axzo.msg.center.message.migrate.OuIdMigrateService;
import cn.axzo.msg.center.service.bizevent.request.ReachDto; import cn.axzo.msg.center.service.bizevent.request.ReachDto;
import cn.azxo.framework.common.model.CommonResponse; import cn.azxo.framework.common.model.CommonResponse;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
@ -26,6 +27,7 @@ import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
/** /**
@ -38,6 +40,7 @@ import org.springframework.web.bind.annotation.RestController;
@RequestMapping("/private/message") @RequestMapping("/private/message")
public class PrivateMessageController { public class PrivateMessageController {
private final OuIdMigrateService ouIdMigrateService;
private final BizEventMappingDao bizEventMappingDao; private final BizEventMappingDao bizEventMappingDao;
private final RelationTemplateMapDao relationTemplateMapDao; private final RelationTemplateMapDao relationTemplateMapDao;
private final MessageBaseTemplateDao messageBaseTemplateDao; private final MessageBaseTemplateDao messageBaseTemplateDao;
@ -120,4 +123,13 @@ public class PrivateMessageController {
private List<Long> originalRelationIds; private List<Long> originalRelationIds;
} }
@PostMapping("/migrateOuId")
public String migrateOuId(
@RequestParam("token") String token,
@RequestBody OuIdMigrateService.Param param) {
if (!"13145".equals(token))
return "invalid token";
return ouIdMigrateService.execute(param);
}
} }

View File

@ -0,0 +1,285 @@
package cn.axzo.msg.center.message.migrate;
import cn.axzo.framework.domain.web.result.ApiResult;
import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi;
import cn.axzo.maokai.api.vo.response.OrganizationalTeamOuRelationResp;
import cn.axzo.msg.center.common.enums.TableIsDeleteEnum;
import cn.axzo.msg.center.common.utils.BizAssertions;
import cn.axzo.msg.center.dal.PendingMessageRecordDao;
import cn.axzo.msg.center.domain.entity.PendingMessageRecord;
import cn.axzo.msg.center.domain.entity.PendingRecordExt;
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
import cn.axzo.msg.center.service.enums.YesOrNo;
import cn.axzo.pluto.api.TeamServiceApi;
import cn.axzo.pluto.teams.common.enums.UserTerminalPerspective;
import cn.azxo.framework.common.model.CommonResponse;
import com.alibaba.fastjson.JSON;
import com.google.common.base.Throwables;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
/**
* @author yanglin
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class OuIdMigrateService {
private static final String MIGRATE_OU_CURSOR = "msg_center_migrate_ou_cursor";
private final RedisTemplate<Object, Object> redisTemplate;
private final PendingMessageRecordDao pendingMessageRecordDao;
private final OrganizationalTeamOuRelationApi organizationalTeamOuRelationApi;
private final TeamServiceApi teamServiceApi;
@Value("${message.pending.ouMigrateBatchSize:100}")
private final Long migrateBatchSize;
@Getter
private volatile boolean isRunning;
public String execute(Param param) {
String runningError = "another operation is still running...";
if (isRunning)
return runningError;
synchronized (this) {
if (isRunning)
return runningError;
isRunning = true;
}
try {
log.info("start migrating ou id...");
return executeImpl(param);
} catch (Exception e) {
log.warn("migration error, param={}", param, e);
return Throwables.getStackTraceAsString(e);
} finally {
log.info("finish migrating ou id...");
// a single winner thread can reach this statement
synchronized (this) {
isRunning = false;
}
}
}
private String executeImpl(Param param) {
log.info("receive ou migrate, param={}", param);
if (CollectionUtils.isNotEmpty(param.migrateRecordIds))
return migrateRecords(param.migrateRecordIds);
else if (param.scanMigrate)
return scanAndMigrate();
else {
log.warn("unknown param, param={}", param);
return "unknown command";
}
}
private String migrateRecords(List<Long> ids) {
if (CollectionUtils.isEmpty(ids)) {
return "ids can't be empty";
}
List<PendingMessageRecord> records = pendingMessageRecordDao.lambdaQuery()
.in(PendingMessageRecord::getId, ids)
.list();
if (records.isEmpty())
return String.format("can't find pending records with given ids. ids=%s", JSON.toJSONString(ids));
else {
migrate(records);
return "done!!";
}
}
private String scanAndMigrate() {
Long scrollId = (Long) redisTemplate.opsForValue().get(MIGRATE_OU_CURSOR);
if (scrollId == null || scrollId < 0)
scrollId = 0L;
int count = 0;
List<PendingMessageRecord> records = nextBatch(scrollId);
while (!records.isEmpty()) {
if (Thread.currentThread().isInterrupted()) {
log.info("aborted by user!!");
return "aborted by user!!";
}
migrate(records);
count += records.size();
PendingMessageRecord lastRecord = records.get(records.size() - 1);
scrollId = lastRecord.getId();
records = nextBatch(scrollId);
redisTemplate.opsForValue().set(MIGRATE_OU_CURSOR, scrollId);
if (records.size() < migrateBatchSize)
break;
}
return String.format("migrated %d records", count);
}
private List<PendingMessageRecord> nextBatch(Long scrollId) {
log.info("scanning pending message records, scrollId={}", scrollId);
return pendingMessageRecordDao.lambdaQuery()
.eq(PendingMessageRecord::getIsDelete, TableIsDeleteEnum.NORMAL.value)
.eq(PendingMessageRecord::getIsOuIdMigrated, YesOrNo.NO)
.gt(PendingMessageRecord::getId, scrollId)
.orderByAsc(PendingMessageRecord::getId)
.last("LIMIT " + migrateBatchSize)
.list();
}
private void migrate(List<PendingMessageRecord> records) {
try {
String ids = records.stream()
.map(BaseEntityExt::getId)
.map(String::valueOf)
.collect(joining(","));
log.info("start - migrate ou id: {}", ids);
migrateZeroOuId(records);
migrateTeamIdAsOuId(records);
log.info("end - migrate ou id: {}", ids);
} catch (Exception e) {
log.warn("ouId迁移失败", e);
}
}
private void migrateZeroOuId(List<PendingMessageRecord> records) {
// 业务发送待办时未传ouId
List<PendingMessageRecord> zeroOuIdRecords = records.stream()
.filter(i -> i.getOuId().equals(0L))
.collect(toList());
if (zeroOuIdRecords.isEmpty()) {
return;
}
for (PendingMessageRecord record : zeroOuIdRecords) {
Long identityId = record.getExecutorId();
DefaultTeamInfo defaultTeam = getDefaultTeamId(identityId);
if (defaultTeam == null) {
tryUpdate(record, null, "未通过身份id找到默认团队, 跳过迁移");
continue;
}
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo =
getOuInfoByTeamIds(Collections.singletonList(defaultTeam.teamId));
OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(defaultTeam.teamId);
if (teamInfo == null) {
tryUpdate(record, null, String.format(
"找到了默认团队, 但未通过默认团队找到对应的单位. teamId=%d", defaultTeam.teamId));
} else {
tryUpdate(record, teamInfo.getOuId(), String.format(
"找到了默认团队, 并通过身份id找到默认团队, 再找到对应的单位. teamId=%d, ouId=%d, 视角=%s, 视角type=%s",
defaultTeam.teamId, teamInfo.getOuId(), defaultTeam.perspectiveDesc, defaultTeam.perspective));
}
}
}
@Nullable
private DefaultTeamInfo getDefaultTeamId(Long identityId) {
Integer successCode = 200;
CommonResponse<Long> resp1 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.LEADER_PERSPECTIVE.code);
if (successCode.equals(resp1.getCode())) {
return new DefaultTeamInfo(resp1.getData(), UserTerminalPerspective.LEADER_PERSPECTIVE, "工人端-班组长视角");
}
CommonResponse<Long> resp2 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.CMP_LEADER_PERSPECTIVE.code);
if (successCode.equals(resp2.getCode())) {
return new DefaultTeamInfo(resp2.getData(), UserTerminalPerspective.CMP_LEADER_PERSPECTIVE, "管理端-班b组长视角");
}
CommonResponse<Long> resp3 = teamServiceApi.getDefaultTeam(
identityId, UserTerminalPerspective.WORKER_PERSPECTIVE.code);
if (successCode.equals(resp3.getCode())) {
return new DefaultTeamInfo(resp3.getData(), UserTerminalPerspective.WORKER_PERSPECTIVE, "工人端-工人视角");
}
return null;
}
private void migrateTeamIdAsOuId(List<PendingMessageRecord> records) {
// 老数据可能把班组id当成ouId存到待办中了
List<PendingMessageRecord> maybeTeamIdAsOuIdRecords = records.stream()
.filter(i -> i.getOuId() > 0L)
.collect(toList());
if (maybeTeamIdAsOuIdRecords.isEmpty()) return;
List<Long> maybeTeamIds = maybeTeamIdAsOuIdRecords.stream()
.map(PendingMessageRecord::getOuId)
.distinct()
.collect(toList());
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo = getOuInfoByTeamIds(maybeTeamIds);
if (teamId2OuInfo.isEmpty()) return;
for (PendingMessageRecord record : maybeTeamIdAsOuIdRecords) {
// 如果能通过ouId拿到对应的平台班组信息, 说明待办中ouId被存成了班组的平台id
// 因此在这里修改为正确的ouId
Long teamId = record.getOuId();
OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(teamId);
if (teamInfo != null) {
tryUpdate(record, teamInfo.getOuId(), String.format("直接通过团队id找到ouId. teamId=%d", teamId));
}
}
}
@NotNull
private Map<Long, OrganizationalTeamOuRelationResp> getOuInfoByTeamIds(List<Long> maybeTeamIds) {
ApiResult<List<OrganizationalTeamOuRelationResp>> maybeTeamIdsResp =
organizationalTeamOuRelationApi.getByTeamOuIds(maybeTeamIds);
return BizAssertions
.assertResponse(maybeTeamIdsResp, "can't get plat team info").stream()
.collect(Collectors.toMap(OrganizationalTeamOuRelationResp::getTeamOuId, Function.identity()));
}
private void tryUpdate(PendingMessageRecord record, Long newOuId, String ouIdMigrateDesc) {
PendingRecordExt ext = record.getRecordExt();
if (ext == null)
ext = new PendingRecordExt();
if (record.getIsOuIdMigrated() == YesOrNo.YES)
return;
// 最原始的ouId不要被覆盖了
if (ext.getMigrateOu().getOriginalOuId() != null)
ext.getMigrateOu().setOriginalOuId(record.getOuId());
ext.getMigrateOu().setOuIdMigrateTime(new Date());
ext.getMigrateOu().setOuIdMigrateDesc(ouIdMigrateDesc);
PendingMessageRecord update = new PendingMessageRecord();
update.setId(record.getId());
update.setOuId(newOuId);
update.setRecordExt(ext);
update.setUpdateAt(new Date());
update.setIsOuIdMigrated(YesOrNo.YES);
boolean updated = pendingMessageRecordDao.updateById(update);
log.warn("migrating pending record's ouId field. pending message id={}, originalOuId={}, newOuId={}, updated={}",
record.getId(), record.getOuId(), newOuId, updated);
}
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public static class Param {
private List<Long> migrateRecordIds;
private boolean scanMigrate;
}
@RequiredArgsConstructor
private static class DefaultTeamInfo {
private final Long teamId;
private final UserTerminalPerspective perspective;
private final String perspectiveDesc;
}
}

1
lombok.config Normal file
View File

@ -0,0 +1 @@
lombok.copyableAnnotations += org.springframework.beans.factory.annotation.Value

View File

@ -0,0 +1,25 @@
package cn.axzo.msg.center.service.enums;
import com.baomidou.mybatisplus.annotation.EnumValue;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author yanglin
*/
@Getter
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public enum YesOrNo {
YES("YES", ""),
NO("NO", "")
;
@EnumValue
private final String code;
private final String desc;
public boolean is(String code) {
return this.code.equals(code);
}
}

View File

@ -6,11 +6,13 @@ import cn.axzo.msg.center.service.enums.BizFinalStateEnum;
import cn.axzo.msg.center.service.enums.IdentityTypeEnum; import cn.axzo.msg.center.service.enums.IdentityTypeEnum;
import cn.axzo.msg.center.service.enums.OrganizationTypeEnum; import cn.axzo.msg.center.service.enums.OrganizationTypeEnum;
import cn.axzo.msg.center.service.enums.PendingMessageStateEnum; import cn.axzo.msg.center.service.enums.PendingMessageStateEnum;
import cn.axzo.msg.center.service.enums.YesOrNo;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.extension.handlers.FastjsonTypeHandler;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import org.springframework.data.annotation.Transient;
import java.io.Serializable; import java.io.Serializable;
import java.util.Date; import java.util.Date;
@ -149,6 +151,15 @@ public class PendingMessageRecord extends BaseEntityExt<PendingMessageRecord> im
* 在这个时间之前不显示 * 在这个时间之前不显示
*/ */
private Date hideUntil; private Date hideUntil;
/**
* 保存额外的数据信息
*/
@TableField(value = "record_ext", typeHandler = FastjsonTypeHandler.class)
private PendingRecordExt recordExt;
/**
* ouId是否已经迁移 (解决ouId不准的问题)
*/
private YesOrNo isOuIdMigrated;
@Override @Override
public String toString() { public String toString() {

View File

@ -0,0 +1,24 @@
package cn.axzo.msg.center.domain.entity;
import lombok.Data;
import java.util.Date;
/**
* @author yanglin
*/
@Data
public class PendingRecordExt {
private MigrateOu migrateOu = new MigrateOu();
@Data
public static class MigrateOu {
// 最原始的ouId
private Long originalOuId;
// ouId迁移时间
private Date ouIdMigrateTime;
// ouId迁移描述
private String ouIdMigrateDesc;
}
}

View File

@ -19,7 +19,10 @@ import org.springframework.scheduling.annotation.EnableAsync;
@Slf4j @Slf4j
@MapperScan(value = {"cn.axzo.msg.center.dal"}) @MapperScan(value = {"cn.axzo.msg.center.dal"})
@SpringBootApplication(scanBasePackages = {"cn.axzo.msg"}) @SpringBootApplication(scanBasePackages = {"cn.axzo.msg"})
@EnableFeignClients(basePackages = "cn.axzo.log") @EnableFeignClients(basePackages = {
"cn.axzo.log",
"cn.axzo.pluto.api"
})
/*@EnableAsync*/ /*@EnableAsync*/
public class MsgCenterApplication { public class MsgCenterApplication {
public static void main(String[] args) { public static void main(String[] args) {

View File

@ -15,3 +15,27 @@ spring:
#指定激活环境 #指定激活环境
profiles: profiles:
active: ${NACOS_PROFILES_ACTIVE:dev} active: ${NACOS_PROFILES_ACTIVE:dev}
---
#测试环境
spring:
config:
activate:
on-profile: test
cloud:
nacos:
config:
server-addr: ${NACOS_HOST:https://test-nacos.axzo.cn}:${NACOS_PORT:443}
file-extension: yaml
namespace: ${NACOS_NAMESPACE_ID:f3c0f0d2-bac4-4498-bee7-9c3636b3afdf}
---
#预发布环境
spring:
config:
activate:
on-profile: pre
cloud:
nacos:
config:
server-addr: ${NACOS_HOST:https://pre-nacos.axzo.cn}:${NACOS_PORT:443}
file-extension: yaml
namespace: ${NACOS_NAMESPACE_ID:8b4cf725-7595-4c92-b2a6-9260a51ce078}

View File

@ -1,13 +1,7 @@
package cn.axzo.msg.center.message.service; package cn.axzo.msg.center.message.service;
import cn.axzo.msg.center.MsgCenterApplication; import cn.axzo.msg.center.MsgCenterApplication;
import cn.axzo.msg.center.service.pending.request.CompletePendingBySubCodeRequest; import cn.axzo.msg.center.message.domain.param.PendingMessagePushParam;
import cn.axzo.msg.center.service.pending.request.CompletePendingMessageByIdRequest;
import cn.axzo.msg.center.service.pending.request.CompletePendingMessageRequest;
import cn.axzo.msg.center.service.pending.request.PendingMessagePageRequest;
import cn.axzo.msg.center.service.pending.request.SetHideRequest;
import cn.axzo.msg.center.service.pending.response.PendingMessageResponse;
import cn.azxo.framework.common.model.Page;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -15,10 +9,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.annotation.Commit; import org.springframework.test.annotation.Commit;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
/** /**
* @author yanglin * @author yanglin
*/ */
@ -30,21 +20,9 @@ class PendingMessageNewServiceTest {
@Test @Test
@Commit @Commit
void foo() { void foo() {
String jsonStr = "{\"msgId\":null,\"subBizCode\":\"202401301728000000030\",\"hideSeconds\":null}\n"; String str = "{\"bizCategory\":\"OTHER\",\"bizCode\":\"200000700321808\",\"bizExtParams\":\"{\\\"teamLeaderName\\\":\\\"袁均清\\\"}\",\"executor\":[{\"id\":16562,\"identity\":{\"id\":0,\"type\":\"NOT_SUPPORT\",\"valid\":true},\"name\":\"马元猛\",\"valid\":true},{\"id\":16563,\"identity\":{\"id\":0,\"type\":\"NOT_SUPPORT\",\"valid\":true},\"name\":\"辛宁\",\"valid\":true}],\"orgType\":\"PROJECT\",\"ouId\":6066,\"promoter\":{\"id\":16444,\"identity\":{\"id\":2004889,\"type\":\"PRACTITIONER\",\"valid\":true},\"name\":\"袁均清\",\"valid\":true},\"routerParams\":\"{\\\"acceptanceNo\\\":\\\"700013411\\\",\\\"status\\\":\\\"1\\\"}\",\"templateCode\":\"52ae3e8ec48242e485e9389202e102ce\",\"workspaceId\":375}";
SetHideRequest req = JSON.parseObject(jsonStr, SetHideRequest.class); PendingMessagePushParam param = JSON.parseObject(str, PendingMessagePushParam.class);
Boolean b = pendingMessageNewService.setHide(req); pendingMessageNewService.push(param);
System.out.println();
//SetHideRequest req = new SetHideRequest();
//req.setSubBizCode("08345cb9-669a-11ee-8d91-02550a00001f");
//pendingMessageNewService.setHide(req);
//String jsonStr = "{\"appTerminalType\":\"CMS_WEB_PC\",\"groupNodeCode\":\"\",\"identityId\":1593,\"identityType\":\"PRACTITIONER\",\"msgState\":\"HAS_BEEN_SENT\",\"ouId\":5812,\"page\":1,\"pageSize\":10,\"personId\":2849,\"roleCategory\":\"EXECUTOR\",\"terminalType\":\"WEB\"}";
//PendingMessagePageRequest request = JSON.parseObject(jsonStr, PendingMessagePageRequest.class);
//Page<PendingMessageResponse> pendingMessageResponsePage = pendingMessageNewService.pageQuery(request);
//System.out.println();
//pendingMessageNewService.complete("650d3eaa3d3a4064b34a64b7b1c8dffb");
//pendingMessageNewService.completeById(CompletePendingMessageByIdRequest.builder().id(20578L).build());
//pendingMessageNewService.completeByTemplateCodeBizCode(CompletePendingMessageRequest.builder().templateCode("75047e6339484e81bcce244d56fb2363").bizCode("DFX202311231600002").build());
//pendingMessageNewService.completeByTemplateCodeSubBizCode(CompletePendingBySubCodeRequest.builder().bizCode("0818e560-669a-11ee-8d91-02550a00001f").subBizCode("08345cb9-669a-11ee-8d91-02550a00001f").templateCode("a97760e573674658b6b351e2f3a6e379").build());
} }
} }