REQ-3282: 删除无用代码
This commit is contained in:
parent
2dcc3455e4
commit
0807ba5ce8
@ -1,250 +0,0 @@
|
||||
package cn.axzo.msg.center.message.migrate;
|
||||
|
||||
import cn.axzo.apollo.api.ApolloWorkerTaskOrderApi;
|
||||
import cn.axzo.framework.domain.web.result.ApiResult;
|
||||
import cn.axzo.maokai.api.client.CooperateShipQueryApi;
|
||||
import cn.axzo.maokai.api.client.OrganizationalNodeUserApi;
|
||||
import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi;
|
||||
import cn.axzo.maokai.api.vo.response.OrganizationalTeamOuRelationResp;
|
||||
import cn.axzo.msg.center.common.utils.BizAssertions;
|
||||
import cn.axzo.msg.center.dal.PendingMessageRecordDao;
|
||||
import cn.axzo.msg.center.domain.entity.PendingMessageRecord;
|
||||
import cn.axzo.msg.center.domain.entity.PendingRecordExt;
|
||||
import cn.axzo.msg.center.domain.enums.YesNoEnum;
|
||||
import cn.axzo.msg.center.domain.persistence.BaseEntityExt;
|
||||
import cn.axzo.msg.center.service.enums.YesOrNo;
|
||||
import cn.axzo.pluto.api.TeamServiceApi;
|
||||
import cn.axzo.pluto.teams.common.enums.UserTerminalPerspective;
|
||||
import cn.azxo.framework.common.model.CommonResponse;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.collect.Lists;
|
||||
import jodd.util.concurrent.ThreadFactoryBuilder;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Data;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.ToString;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.joining;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class OuIdMigrateService {
|
||||
|
||||
private static final String MIGRATE_OU_CURSOR = "msg_center_migrate_ou_cursor";
|
||||
|
||||
private final RedisTemplate<Object, Object> redisTemplate;
|
||||
private final PendingMessageRecordDao pendingMessageRecordDao;
|
||||
private final OrganizationalTeamOuRelationApi organizationalTeamOuRelationApi;
|
||||
private final TeamServiceApi teamServiceApi;
|
||||
private final CooperateShipQueryApi cooperateShipQueryApi;
|
||||
private final ApolloWorkerTaskOrderApi apolloWorkerTaskOrderApi;
|
||||
private final OrganizationalNodeUserApi organizationalNodeUserApi;
|
||||
|
||||
@Value("${message.pending.ouMigrateBatchSize:100}")
|
||||
private final Long migrateBatchSize;
|
||||
|
||||
@Getter
|
||||
private volatile boolean isRunning;
|
||||
|
||||
|
||||
|
||||
private final ThreadFactory moduleStatisticTaskThreadFactory = ThreadFactoryBuilder.create()
|
||||
.setDaemon(true)
|
||||
.setNameFormat("MODULE_STATISTIC_TASK_%d")
|
||||
.get();
|
||||
private final ExecutorService taskThreadPool = new ThreadPoolExecutor(20, 100, 10, TimeUnit.SECONDS,
|
||||
new ArrayBlockingQueue<>(2000), moduleStatisticTaskThreadFactory);
|
||||
|
||||
public void migrate(List<PendingMessageRecord> records) {
|
||||
try {
|
||||
String ids = records.stream()
|
||||
.map(BaseEntityExt::getId)
|
||||
.map(String::valueOf)
|
||||
.collect(joining(","));
|
||||
log.info("start - migrate ou id: {}", ids);
|
||||
// 迁移ou_id=0的
|
||||
// 逻辑: 通过身份查询人默认的班组,获取到班组后,再获取班组对应的团队,取团队的id
|
||||
migrateZeroOuId(records);
|
||||
log.info("end - migrate ou id: {}", ids);
|
||||
} catch (Exception e) {
|
||||
log.warn("ouId fail", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void migrateZeroOuId(List<PendingMessageRecord> records) throws ExecutionException, InterruptedException {
|
||||
// 业务发送待办时未传ouId
|
||||
List<PendingMessageRecord> zeroOuIdRecords = records.stream()
|
||||
.filter(i -> i.getOuId().equals(0L))
|
||||
.collect(toList());
|
||||
if (zeroOuIdRecords.isEmpty()) {
|
||||
log.info("updateOuIdJob migrate is empty");
|
||||
return;
|
||||
}
|
||||
|
||||
List<List<PendingMessageRecord>> partitions = Lists
|
||||
.partition(zeroOuIdRecords, 20);
|
||||
List<CompletableFuture> futures = new ArrayList<>();
|
||||
for (int i = 0; i < partitions.size(); i++) {
|
||||
List<PendingMessageRecord> dataList = partitions.get(i);
|
||||
futures.add(CompletableFuture.runAsync(() ->
|
||||
updateZero(dataList), taskThreadPool));
|
||||
}
|
||||
CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get();
|
||||
}
|
||||
|
||||
private void updateZero(List<PendingMessageRecord> records) {
|
||||
List<PendingMessageRecord> update = records.stream()
|
||||
.map(record -> {
|
||||
try {
|
||||
log.info("zeroOuId, record:{}", record.getId());
|
||||
Long identityId = record.getExecutorId();
|
||||
|
||||
DefaultTeamInfo defaultTeam = getDefaultTeamId(identityId);
|
||||
log.info("zeroOuId, record:{}, defaultTeam:{}", record.getId(), JSONObject.toJSONString(defaultTeam));
|
||||
if (defaultTeam == null) {
|
||||
PendingMessageRecord pendingMessageRecord = new PendingMessageRecord();
|
||||
pendingMessageRecord.setId(record.getId());
|
||||
pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue());
|
||||
return pendingMessageRecord;
|
||||
}
|
||||
|
||||
Map<Long, OrganizationalTeamOuRelationResp> teamId2OuInfo =
|
||||
getOuInfoByTeamId(Lists.newArrayList(defaultTeam.teamId));
|
||||
OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(defaultTeam.teamId);
|
||||
log.info("zeroOuId, record:{}, teamInfo:{}", record.getId(), JSONObject.toJSONString(teamInfo));
|
||||
if (teamInfo == null) {
|
||||
PendingMessageRecord pendingMessageRecord = new PendingMessageRecord();
|
||||
pendingMessageRecord.setId(record.getId());
|
||||
pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue());
|
||||
return pendingMessageRecord;
|
||||
} else {
|
||||
return tryUpdate(record, teamInfo.getOuId(), String.format(
|
||||
"找到了默认团队, 并通过身份id找到默认团队, 再找到对应的单位. teamId=%d, ouId=%d, 视角=%s, 视角type=%s",
|
||||
defaultTeam.teamId, teamInfo.getOuId(), defaultTeam.perspectiveDesc, defaultTeam.perspective));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.warn("ouId fail, recordId:{}; e:{}", record.getId(), e);
|
||||
}
|
||||
return null;
|
||||
})
|
||||
.filter(Objects::nonNull)
|
||||
.collect(toList());
|
||||
List<Long> removeIds = update.stream()
|
||||
.filter(record -> Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue()))
|
||||
.map(PendingMessageRecord::getId)
|
||||
.collect(toList());
|
||||
if (!CollectionUtils.isEmpty(removeIds)) {
|
||||
pendingMessageRecordDao.removeByIds(removeIds);
|
||||
}
|
||||
|
||||
List<PendingMessageRecord> updates = update.stream()
|
||||
.filter(record -> !Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue()))
|
||||
.collect(toList());
|
||||
if (!CollectionUtils.isEmpty(updates)) {
|
||||
pendingMessageRecordDao.updateBatchById(updates);
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private DefaultTeamInfo getDefaultTeamId(Long identityId) {
|
||||
Integer successCode = 200;
|
||||
CommonResponse<Long> resp1 = teamServiceApi.getDefaultTeam(
|
||||
identityId, UserTerminalPerspective.LEADER_PERSPECTIVE.code);
|
||||
if (successCode.equals(resp1.getCode())) {
|
||||
return new DefaultTeamInfo(resp1.getData(), UserTerminalPerspective.LEADER_PERSPECTIVE, "工人端-班组长视角");
|
||||
}
|
||||
CommonResponse<Long> resp2 = teamServiceApi.getDefaultTeam(
|
||||
identityId, UserTerminalPerspective.CMP_LEADER_PERSPECTIVE.code);
|
||||
if (successCode.equals(resp2.getCode())) {
|
||||
return new DefaultTeamInfo(resp2.getData(), UserTerminalPerspective.CMP_LEADER_PERSPECTIVE, "管理端-班b组长视角");
|
||||
}
|
||||
CommonResponse<Long> resp3 = teamServiceApi.getDefaultTeam(
|
||||
identityId, UserTerminalPerspective.WORKER_PERSPECTIVE.code);
|
||||
if (successCode.equals(resp3.getCode())) {
|
||||
return new DefaultTeamInfo(resp3.getData(), UserTerminalPerspective.WORKER_PERSPECTIVE, "工人端-工人视角");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
private Map<Long, OrganizationalTeamOuRelationResp> getOuInfoByTeamId(List<Long> maybeTeamIds) {
|
||||
ApiResult<List<OrganizationalTeamOuRelationResp>> maybeTeamIdsResp =
|
||||
organizationalTeamOuRelationApi.getByTeamOuIds(Lists.newArrayList(maybeTeamIds));
|
||||
return BizAssertions
|
||||
.assertResponse(maybeTeamIdsResp, "can't get plat team info").stream()
|
||||
.collect(Collectors.toMap(OrganizationalTeamOuRelationResp::getTeamOuId, Function.identity()));
|
||||
}
|
||||
|
||||
private PendingMessageRecord tryUpdate(PendingMessageRecord record, Long newOuId, String ouIdMigrateDesc) {
|
||||
PendingRecordExt ext = record.getRecordExt();
|
||||
if (ext == null)
|
||||
ext = new PendingRecordExt();
|
||||
if (record.getIsOuIdMigrated() == YesOrNo.YES)
|
||||
return null;
|
||||
// 找不到的newOuId不进行更新
|
||||
if (newOuId == null || newOuId == 0 ) {
|
||||
return null;
|
||||
}
|
||||
|
||||
// 最原始的ouId不要被覆盖了
|
||||
if (ext.getMigrateOu().getOriginalOuId() != null)
|
||||
ext.getMigrateOu().setOriginalOuId(record.getOuId());
|
||||
ext.getMigrateOu().setOuIdMigrateTime(new Date());
|
||||
ext.getMigrateOu().setOuIdMigrateDesc(ouIdMigrateDesc);
|
||||
|
||||
PendingMessageRecord update = new PendingMessageRecord();
|
||||
update.setId(record.getId());
|
||||
update.setOuId(newOuId);
|
||||
update.setRecordExt(ext);
|
||||
update.setUpdateAt(new Date());
|
||||
update.setIsOuIdMigrated(YesOrNo.YES);
|
||||
log.warn("migrating pending record's ouId field. pending message id={}, originalOuId={}, newOuId={}",
|
||||
record.getId(), record.getOuId(), newOuId);
|
||||
return update;
|
||||
}
|
||||
|
||||
@Data
|
||||
@ToString
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class Param {
|
||||
private List<Long> migrateRecordIds;
|
||||
private boolean scanMigrate;
|
||||
}
|
||||
|
||||
@RequiredArgsConstructor
|
||||
private static class DefaultTeamInfo {
|
||||
private final Long teamId;
|
||||
private final UserTerminalPerspective perspective;
|
||||
private final String perspectiveDesc;
|
||||
}
|
||||
}
|
||||
@ -1,24 +0,0 @@
|
||||
package cn.axzo.msg.center.message.migrate;
|
||||
|
||||
import cn.axzo.msg.center.MsgCenterApplication;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = MsgCenterApplication.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class OuIdMigrateServiceTest {
|
||||
|
||||
private final OuIdMigrateService ouIdMigrateService;
|
||||
|
||||
@Test
|
||||
void foo() {
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user