From 0807ba5ce869701892b6d57f2ea3ea4647fed5f2 Mon Sep 17 00:00:00 2001 From: yanglin Date: Mon, 23 Dec 2024 10:22:08 +0800 Subject: [PATCH] =?UTF-8?q?REQ-3282:=20=E5=88=A0=E9=99=A4=E6=97=A0?= =?UTF-8?q?=E7=94=A8=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../message/migrate/OuIdMigrateService.java | 250 ------------------ .../migrate/OuIdMigrateServiceTest.java | 24 -- 2 files changed, 274 deletions(-) delete mode 100644 inside-notices/src/main/java/cn/axzo/msg/center/message/migrate/OuIdMigrateService.java delete mode 100644 start/src/test/java/cn/axzo/msg/center/message/migrate/OuIdMigrateServiceTest.java diff --git a/inside-notices/src/main/java/cn/axzo/msg/center/message/migrate/OuIdMigrateService.java b/inside-notices/src/main/java/cn/axzo/msg/center/message/migrate/OuIdMigrateService.java deleted file mode 100644 index 1eab458f..00000000 --- a/inside-notices/src/main/java/cn/axzo/msg/center/message/migrate/OuIdMigrateService.java +++ /dev/null @@ -1,250 +0,0 @@ -package cn.axzo.msg.center.message.migrate; - -import cn.axzo.apollo.api.ApolloWorkerTaskOrderApi; -import cn.axzo.framework.domain.web.result.ApiResult; -import cn.axzo.maokai.api.client.CooperateShipQueryApi; -import cn.axzo.maokai.api.client.OrganizationalNodeUserApi; -import cn.axzo.maokai.api.client.OrganizationalTeamOuRelationApi; -import cn.axzo.maokai.api.vo.response.OrganizationalTeamOuRelationResp; -import cn.axzo.msg.center.common.utils.BizAssertions; -import cn.axzo.msg.center.dal.PendingMessageRecordDao; -import cn.axzo.msg.center.domain.entity.PendingMessageRecord; -import cn.axzo.msg.center.domain.entity.PendingRecordExt; -import cn.axzo.msg.center.domain.enums.YesNoEnum; -import cn.axzo.msg.center.domain.persistence.BaseEntityExt; -import cn.axzo.msg.center.service.enums.YesOrNo; -import cn.axzo.pluto.api.TeamServiceApi; -import cn.axzo.pluto.teams.common.enums.UserTerminalPerspective; -import cn.azxo.framework.common.model.CommonResponse; -import com.alibaba.fastjson.JSONObject; -import com.google.common.collect.Lists; -import jodd.util.concurrent.ThreadFactoryBuilder; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.RequiredArgsConstructor; -import lombok.ToString; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.stereotype.Component; -import org.springframework.util.CollectionUtils; - -import java.util.ArrayList; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - -import static java.util.stream.Collectors.joining; -import static java.util.stream.Collectors.toList; - -/** - * @author yanglin - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class OuIdMigrateService { - - private static final String MIGRATE_OU_CURSOR = "msg_center_migrate_ou_cursor"; - - private final RedisTemplate redisTemplate; - private final PendingMessageRecordDao pendingMessageRecordDao; - private final OrganizationalTeamOuRelationApi organizationalTeamOuRelationApi; - private final TeamServiceApi teamServiceApi; - private final CooperateShipQueryApi cooperateShipQueryApi; - private final ApolloWorkerTaskOrderApi apolloWorkerTaskOrderApi; - private final OrganizationalNodeUserApi organizationalNodeUserApi; - - @Value("${message.pending.ouMigrateBatchSize:100}") - private final Long migrateBatchSize; - - @Getter - private volatile boolean isRunning; - - - - private final ThreadFactory moduleStatisticTaskThreadFactory = ThreadFactoryBuilder.create() - .setDaemon(true) - .setNameFormat("MODULE_STATISTIC_TASK_%d") - .get(); - private final ExecutorService taskThreadPool = new ThreadPoolExecutor(20, 100, 10, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(2000), moduleStatisticTaskThreadFactory); - - public void migrate(List records) { - try { - String ids = records.stream() - .map(BaseEntityExt::getId) - .map(String::valueOf) - .collect(joining(",")); - log.info("start - migrate ou id: {}", ids); - // 迁移ou_id=0的 - // 逻辑: 通过身份查询人默认的班组,获取到班组后,再获取班组对应的团队,取团队的id - migrateZeroOuId(records); - log.info("end - migrate ou id: {}", ids); - } catch (Exception e) { - log.warn("ouId fail", e); - } - } - - private void migrateZeroOuId(List records) throws ExecutionException, InterruptedException { - // 业务发送待办时未传ouId - List zeroOuIdRecords = records.stream() - .filter(i -> i.getOuId().equals(0L)) - .collect(toList()); - if (zeroOuIdRecords.isEmpty()) { - log.info("updateOuIdJob migrate is empty"); - return; - } - - List> partitions = Lists - .partition(zeroOuIdRecords, 20); - List futures = new ArrayList<>(); - for (int i = 0; i < partitions.size(); i++) { - List dataList = partitions.get(i); - futures.add(CompletableFuture.runAsync(() -> - updateZero(dataList), taskThreadPool)); - } - CompletableFuture.allOf(futures.stream().toArray(CompletableFuture[]::new)).get(); - } - - private void updateZero(List records) { - List update = records.stream() - .map(record -> { - try { - log.info("zeroOuId, record:{}", record.getId()); - Long identityId = record.getExecutorId(); - - DefaultTeamInfo defaultTeam = getDefaultTeamId(identityId); - log.info("zeroOuId, record:{}, defaultTeam:{}", record.getId(), JSONObject.toJSONString(defaultTeam)); - if (defaultTeam == null) { - PendingMessageRecord pendingMessageRecord = new PendingMessageRecord(); - pendingMessageRecord.setId(record.getId()); - pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue()); - return pendingMessageRecord; - } - - Map teamId2OuInfo = - getOuInfoByTeamId(Lists.newArrayList(defaultTeam.teamId)); - OrganizationalTeamOuRelationResp teamInfo = teamId2OuInfo.get(defaultTeam.teamId); - log.info("zeroOuId, record:{}, teamInfo:{}", record.getId(), JSONObject.toJSONString(teamInfo)); - if (teamInfo == null) { - PendingMessageRecord pendingMessageRecord = new PendingMessageRecord(); - pendingMessageRecord.setId(record.getId()); - pendingMessageRecord.setIsDelete(YesNoEnum.YES.getCode().longValue()); - return pendingMessageRecord; - } else { - return tryUpdate(record, teamInfo.getOuId(), String.format( - "找到了默认团队, 并通过身份id找到默认团队, 再找到对应的单位. teamId=%d, ouId=%d, 视角=%s, 视角type=%s", - defaultTeam.teamId, teamInfo.getOuId(), defaultTeam.perspectiveDesc, defaultTeam.perspective)); - } - } catch (Exception e) { - log.warn("ouId fail, recordId:{}; e:{}", record.getId(), e); - } - return null; - }) - .filter(Objects::nonNull) - .collect(toList()); - List removeIds = update.stream() - .filter(record -> Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue())) - .map(PendingMessageRecord::getId) - .collect(toList()); - if (!CollectionUtils.isEmpty(removeIds)) { - pendingMessageRecordDao.removeByIds(removeIds); - } - - List updates = update.stream() - .filter(record -> !Objects.equals(record.getIsDelete(), YesNoEnum.YES.getCode().longValue())) - .collect(toList()); - if (!CollectionUtils.isEmpty(updates)) { - pendingMessageRecordDao.updateBatchById(updates); - } - } - - @Nullable - private DefaultTeamInfo getDefaultTeamId(Long identityId) { - Integer successCode = 200; - CommonResponse resp1 = teamServiceApi.getDefaultTeam( - identityId, UserTerminalPerspective.LEADER_PERSPECTIVE.code); - if (successCode.equals(resp1.getCode())) { - return new DefaultTeamInfo(resp1.getData(), UserTerminalPerspective.LEADER_PERSPECTIVE, "工人端-班组长视角"); - } - CommonResponse resp2 = teamServiceApi.getDefaultTeam( - identityId, UserTerminalPerspective.CMP_LEADER_PERSPECTIVE.code); - if (successCode.equals(resp2.getCode())) { - return new DefaultTeamInfo(resp2.getData(), UserTerminalPerspective.CMP_LEADER_PERSPECTIVE, "管理端-班b组长视角"); - } - CommonResponse resp3 = teamServiceApi.getDefaultTeam( - identityId, UserTerminalPerspective.WORKER_PERSPECTIVE.code); - if (successCode.equals(resp3.getCode())) { - return new DefaultTeamInfo(resp3.getData(), UserTerminalPerspective.WORKER_PERSPECTIVE, "工人端-工人视角"); - } - return null; - } - - @NotNull - private Map getOuInfoByTeamId(List maybeTeamIds) { - ApiResult> maybeTeamIdsResp = - organizationalTeamOuRelationApi.getByTeamOuIds(Lists.newArrayList(maybeTeamIds)); - return BizAssertions - .assertResponse(maybeTeamIdsResp, "can't get plat team info").stream() - .collect(Collectors.toMap(OrganizationalTeamOuRelationResp::getTeamOuId, Function.identity())); - } - - private PendingMessageRecord tryUpdate(PendingMessageRecord record, Long newOuId, String ouIdMigrateDesc) { - PendingRecordExt ext = record.getRecordExt(); - if (ext == null) - ext = new PendingRecordExt(); - if (record.getIsOuIdMigrated() == YesOrNo.YES) - return null; - // 找不到的newOuId不进行更新 - if (newOuId == null || newOuId == 0 ) { - return null; - } - - // 最原始的ouId不要被覆盖了 - if (ext.getMigrateOu().getOriginalOuId() != null) - ext.getMigrateOu().setOriginalOuId(record.getOuId()); - ext.getMigrateOu().setOuIdMigrateTime(new Date()); - ext.getMigrateOu().setOuIdMigrateDesc(ouIdMigrateDesc); - - PendingMessageRecord update = new PendingMessageRecord(); - update.setId(record.getId()); - update.setOuId(newOuId); - update.setRecordExt(ext); - update.setUpdateAt(new Date()); - update.setIsOuIdMigrated(YesOrNo.YES); - log.warn("migrating pending record's ouId field. pending message id={}, originalOuId={}, newOuId={}", - record.getId(), record.getOuId(), newOuId); - return update; - } - - @Data - @ToString - @NoArgsConstructor - @AllArgsConstructor - public static class Param { - private List migrateRecordIds; - private boolean scanMigrate; - } - - @RequiredArgsConstructor - private static class DefaultTeamInfo { - private final Long teamId; - private final UserTerminalPerspective perspective; - private final String perspectiveDesc; - } -} \ No newline at end of file diff --git a/start/src/test/java/cn/axzo/msg/center/message/migrate/OuIdMigrateServiceTest.java b/start/src/test/java/cn/axzo/msg/center/message/migrate/OuIdMigrateServiceTest.java deleted file mode 100644 index 88085e45..00000000 --- a/start/src/test/java/cn/axzo/msg/center/message/migrate/OuIdMigrateServiceTest.java +++ /dev/null @@ -1,24 +0,0 @@ -package cn.axzo.msg.center.message.migrate; - -import cn.axzo.msg.center.MsgCenterApplication; -import lombok.RequiredArgsConstructor; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; - -import static org.junit.jupiter.api.Assertions.*; - -/** - * @author yanglin - */ -@SpringBootTest(classes = MsgCenterApplication.class) -@RequiredArgsConstructor(onConstructor_ = @Autowired) -class OuIdMigrateServiceTest { - - private final OuIdMigrateService ouIdMigrateService; - - @Test - void foo() { - } - -} \ No newline at end of file