feat:feature-REQ/2129

1、im_account_register增加ouId,
  历史的网易云信账号清洗,
  设置成当前员工最新的ouId
This commit is contained in:
lilong 2024-03-13 18:19:09 +08:00
parent ec82c8155a
commit 6727594463
14 changed files with 355 additions and 24 deletions

View File

@ -26,4 +26,10 @@ public class AccountAbsentQuery {
@NotNull(message = "注册用户personId不能为空")
private String personId;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
}

View File

@ -36,4 +36,10 @@ public class AccountQuery {
*/
private String imAccount;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
}

View File

@ -5,7 +5,6 @@ import com.google.common.collect.Maps;
import lombok.Data;
import javax.validation.constraints.NotNull;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -66,6 +65,12 @@ public class MessageInfo {
@NotNull(message = "消息模板内容不能为空")
private String msgTemplateContent;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
/**
* 消息扩展信息
*/

View File

@ -51,4 +51,9 @@ public class UserAccountReq {
*/
private Map<String, String> attachments;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
}

View File

@ -43,5 +43,9 @@ public class UserAccountResp {
*/
private String appType;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
private Long organizationalUnitId;
}

View File

@ -98,6 +98,17 @@
<artifactId>axzo-common-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>cn.axzo.framework</groupId>
<artifactId>dao-support</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>cn.axzo.maokai</groupId>
<artifactId>maokai-api</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@ -3,6 +3,7 @@ package cn.axzo.im.dao.mapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.entity.RobotInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.springframework.stereotype.Repository;
/**
@ -12,6 +13,7 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
* @version V1.0
* @date 2023/10/10 10:06
*/
@Repository
public interface AccountRegisterMapper extends BaseMapper<AccountRegister> {
}

View File

@ -32,7 +32,7 @@ public class AccountRegister extends BaseEntity<AccountRegister> implements Ser
private String accountId;
/**
* 普通用户通过appType包装
* 普通用户通过appType和ouId包装
* 包装以后进行账户注册
*/
@TableField("account_wrapper")
@ -76,4 +76,10 @@ public class AccountRegister extends BaseEntity<AccountRegister> implements Ser
*/
@TableField("token")
private String token;
/**
* 企业id
*/
@TableField("organizational_unit_id")
private Long organizationalUnitId;
}

View File

@ -0,0 +1,136 @@
package cn.axzo.im.job;
import cn.axzo.im.center.common.enums.AccountTypeEnum;
import cn.axzo.im.center.common.enums.AppTypeEnum;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.service.AccountRegisterService;
import cn.axzo.maokai.api.client.OrganizationalNodeUserApi;
import cn.axzo.maokai.api.vo.request.OrganizationalNodeUserBasicQueryVO;
import cn.axzo.maokai.api.vo.response.OrganizationalNodeUserBasicVO;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
* 把appType = CMPaccountType = 'user'的账号的ouId更新成用户最后加入的企业id
* 因为用户在管理版的IM云信消息要按照企业隔离历史的账号是跟企业没绑定要保持数据不丢失
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class UpdateImAccountOuIdJob extends IJobHandler {
@Autowired
private AccountRegisterService accountRegisterService;
@Autowired
private OrganizationalNodeUserApi organizationalNodeUserApi;
private static final Integer DEFAULT_PAGE_SIZE = 500;
@Override
@XxlJob("updateImAccountOuIdJob")
public ReturnT<String> execute(String s) throws Exception {
log.info("start updateImAccountOuIdJob,s:{}", s);
UpdateImAccountOuIdParam updateImAccountOuIdParam = Optional.ofNullable(s)
.map(e -> JSONObject.parseObject(e, UpdateImAccountOuIdParam.class))
.orElseGet(() -> UpdateImAccountOuIdParam.builder().build());
Integer pageNumber = 1;
while (true) {
AccountRegisterService.PageAccountRegisterParam req = AccountRegisterService.PageAccountRegisterParam.builder()
.ids(updateImAccountOuIdParam.getIds())
.appType(AppTypeEnum.CMP.getCode())
.accountType(AccountTypeEnum.USER.getCode())
.pageNumber(pageNumber++)
.pageSize(DEFAULT_PAGE_SIZE)
.build();
Page<AccountRegister> page = accountRegisterService.page(req);
if (CollectionUtils.isNotEmpty(page.getRecords())) {
Map<Long, OrganizationalNodeUserBasicVO> nodeUsers = listNodeUsers(page.getRecords());
updateAccountRegister(page.getRecords(), nodeUsers);
}
if (!page.hasNext()) {
break;
}
}
log.info("end updateImAccountOuIdJob");
return ReturnT.SUCCESS;
}
private void updateAccountRegister(List<AccountRegister> accountRegisters, Map<Long, OrganizationalNodeUserBasicVO> nodeUsers) {
List<AccountRegister> update = accountRegisters.stream()
.map(accountRegister -> {
OrganizationalNodeUserBasicVO nodeUser = nodeUsers.get(Long.valueOf(accountRegister.getAccountId()));
if (nodeUser == null) {
log.info("updateImAccountOuIdJob: accountRegisterId :{} not found node user",
accountRegister.getId());
return null;
}
AccountRegister result = new AccountRegister();
result.setId(accountRegister.getId());
result.setOrganizationalUnitId(nodeUser.getOrganizationalUnitId());
return result;
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
if (CollectionUtils.isEmpty(accountRegisters)) {
log.info("updateImAccountOuIdJob: no data update");
return;
}
accountRegisterService.updateBatchById(update);
}
private Map<Long, OrganizationalNodeUserBasicVO> listNodeUsers(List<AccountRegister> accountRegisters) {
if (CollectionUtils.isEmpty(accountRegisters)) {
return Collections.EMPTY_MAP;
}
Set<Long> accountIds = accountRegisters.stream()
.map(AccountRegister::getAccountId)
.filter(Objects::nonNull)
.map(Long::valueOf)
.collect(Collectors.toSet());
if (CollectionUtils.isEmpty(accountIds)) {
return Collections.EMPTY_MAP;
}
OrganizationalNodeUserBasicQueryVO queryVO = new OrganizationalNodeUserBasicQueryVO();
queryVO.setPersonIds(accountIds);
List<OrganizationalNodeUserBasicVO> nodeUsers = organizationalNodeUserApi.queryUserBasic(queryVO).getData();
return nodeUsers.stream()
.collect(Collectors.toMap(OrganizationalNodeUserBasicVO::getPersonId, Function.identity(), (f, s) -> s));
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
static class UpdateImAccountOuIdParam {
private List<Long> ids;
}
}

View File

@ -0,0 +1,77 @@
package cn.axzo.im.service;
import cn.axzo.bfs.support.data.page.IPageParam;
import cn.axzo.bfs.support.data.wrapper.CriteriaField;
import cn.axzo.bfs.support.data.wrapper.Operator;
import cn.axzo.im.entity.AccountRegister;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import java.util.List;
public interface AccountRegisterService extends IService<AccountRegister> {
Page<AccountRegister> page(PageAccountRegisterParam param);
List<AccountRegister> list(ListAccountRegisterParam param);
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class ListAccountRegisterParam {
@CriteriaField(field = "id", operator = Operator.IN)
private List<Long> ids;
@CriteriaField(field = "appType", operator = Operator.EQ)
private String appType;
/**
* 注册用户ID唯一
* 普通用户personId机器人robotId
*/
@CriteriaField(field = "accountId", operator = Operator.EQ)
private String accountId;
/**
* 注册用户ID唯一
*/
@CriteriaField(field = "imAccount", operator = Operator.EQ)
private String imAccount;
/**
* appType = AppTypeEnum.CMP时因为网易云信无法对同一个账号做企业隔离只能一个企业一个账号
* 所以需要根据organizationalUnitId获取账号
*/
@CriteriaField(field = "organizationalUnitId", operator = Operator.EQ)
private Long organizationalUnitId;
@CriteriaField(field = "accountType", operator = Operator.EQ)
private String accountType;
}
@SuperBuilder
@Data
@NoArgsConstructor
@AllArgsConstructor
class PageAccountRegisterParam extends ListAccountRegisterParam implements IPageParam {
@CriteriaField(ignore = true)
Integer pageNumber;
@CriteriaField(ignore = true)
Integer pageSize;
/**
* 排序使用示例createTime__DESC
*/
@CriteriaField(ignore = true)
List<String> sort;
}
}

View File

@ -25,24 +25,24 @@ import cn.axzo.im.entity.bo.AccountQueryParam;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.conditions.query.LambdaQueryChainWrapper;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.validation.Valid;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import javax.validation.Valid;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RestController;
/**
* IM账户服务
@ -56,6 +56,9 @@ import org.springframework.web.bind.annotation.RestController;
@RequiredArgsConstructor
public class AccountService {
@Autowired
private AccountRegisterService accountRegisterService;
@Resource
private IMChannelProvider imChannelProvider;
@ -116,7 +119,7 @@ public class AccountService {
if (appTypeEnum == null) {
throw new ServiceException("当前appType,服务器不支持该类型!!");
}
String userIdWrapper = buildUserIdWrapper(userAccountReq.getUserId(), userAccountReq.getAppType());
String userIdWrapper = buildUserIdWrapper(userAccountReq.getUserId(), userAccountReq.getAppType(), userAccountReq.getOrganizationalUnitId());
//后续AppKey可能会更换普通用户通过userIdappTypeappKey维度来保证数据库唯一性
UserAccountResp userAccountResp = createAccountRegister(userAccountReq.getUserId(), userIdWrapper, appType,
AccountTypeEnum.USER.getCode(), userAccountReq.getHeadImageUrl(), userAccountReq.getNickName());
@ -127,13 +130,17 @@ public class AccountService {
return userAccountResp;
}
private String buildUserIdWrapper(String userId, String appType) {
private String buildUserIdWrapper(String userId, String appType, Long organizationalUnitId) {
String env = environment.getProperty("spring.profiles.active");
StringBuilder buf = new StringBuilder();
if (StringUtils.isNotBlank(liveEnvPrefix)) {
buf.append(liveEnvPrefix).append("_");
}
buf.append(env).append(userId).append("_").append(appType);
// 新的用户在使用管理版时如果有企业则只能根据企业产生唯一账号
if (organizationalUnitId != null) {
buf.append("_").append(organizationalUnitId);
}
return buf.toString();
}
@ -147,7 +154,7 @@ public class AccountService {
if (appTypeEnum == null) {
throw new ServiceException("当前appType,服务器不支持该类型!!");
}
String userIdWrapper = buildUserIdWrapper(userAccountReq.getUserId(), appTypeEnum.getCode());
String userIdWrapper = buildUserIdWrapper(userAccountReq.getUserId(), appTypeEnum.getCode(), userAccountReq.getOrganizationalUnitId());
String appKey = imChannelProvider.getProviderAppKey();
AccountRegister customAccountRegister = queryCustomAccount(AccountTypeEnum.CUSTOM,
@ -267,11 +274,18 @@ public class AccountService {
return userAccountResp;
}
/**
* 建议用更通用的AccountRegisterService.page解耦
* @param accountQuery
* @return
*/
@Deprecated
public List<UserAccountResp> queryAccountInfo(@Valid AccountQuery accountQuery) {
//如果存在多个appKey一个账户会有多条数据分别对应不同的appKey
//机器人的账户 不进行wapper判断
// TODO 这块逻辑不通用新的数据userIdWrapper会增加ouId历史的数据只会补ouId不会把userIdWrapper补上ouId不应该放在通用的query里面
if (!AppTypeEnum.SYSTEM.getCode().equals(accountQuery.getAppType()) && StringUtils.isEmpty(accountQuery.getImAccount())) {
String userIdWrapper = buildUserIdWrapper(accountQuery.getAccountId(), accountQuery.getAppType());
String userIdWrapper = buildUserIdWrapper(accountQuery.getAccountId(), accountQuery.getAppType(), null);
accountQuery.setImAccount(userIdWrapper);
}
List<AccountRegister> accountRegisterList = accountRegisterDao.lambdaQuery().eq(AccountRegister::getIsDelete, 0)
@ -316,13 +330,28 @@ public class AccountService {
} else {
target = AppTypeEnum.values();
}
// TODO 待优化兼容移动端因为原接口会返回cmcmp两个端的账号但是cmp这个端查询的时候如果有传ouId则返回对应ouId的账号
for (AppTypeEnum appTypeEnum : target) {
AccountQuery accountQuery = new AccountQuery();
accountQuery.setAppType(appTypeEnum.getCode());
accountQuery.setAccountId(accountAbsentQuery.getPersonId());
List<UserAccountResp> userAccountRespList = queryAccountInfo(accountQuery);
if (CollectionUtils.isNotEmpty(userAccountRespList)) {
userAccountAll.addAll(userAccountRespList);
AccountRegisterService.ListAccountRegisterParam listAccountRegisterParam = AccountRegisterService.ListAccountRegisterParam.builder()
.appType(appTypeEnum.getCode())
.accountId(accountAbsentQuery.getPersonId())
.build();
if (appTypeEnum == AppTypeEnum.CMP) {
listAccountRegisterParam.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
}
List<AccountRegister> accountRegisters = accountRegisterService.list(listAccountRegisterParam);
if (CollectionUtils.isNotEmpty(accountRegisters)) {
userAccountAll.addAll(accountRegisters.stream()
.map(accountRegister -> {
UserAccountResp userAccountResp = new UserAccountResp();
userAccountResp.setImAccount(accountRegister.getImAccount());
userAccountResp.setUserId(accountRegister.getAccountId());
userAccountResp.setAppType(accountRegister.getAppType());
userAccountResp.setToken(accountRegister.getToken());
return userAccountResp;
})
.collect(Collectors.toList()));
} else {
if (appTypeEnum == AppTypeEnum.SYSTEM) {
//log.warn("PersonId=[" + accountAbsentQuery.getPersonId() + "],不允许创建AppType=[system]账户!");
@ -332,6 +361,7 @@ public class AccountService {
userAccountReq.setAppType(appTypeEnum.getCode());
userAccountReq.setUserId(accountAbsentQuery.getPersonId());
userAccountReq.setNickName(DEFAULT_NICK_NAME + accountAbsentQuery.getPersonId());
userAccountReq.setOrganizationalUnitId(accountAbsentQuery.getOrganizationalUnitId());
UserAccountResp accountResp = generateAccount(userAccountReq, iNotifyService);
if (StringUtils.isEmpty(accountResp.getToken())) {
continue;

View File

@ -117,6 +117,7 @@ public class MessageService {
List<MessageDispatchResp> messageDispatchRespList;
log.info("sendMessage发送消息,msgReceiverLimit:{},msgReceiverThreshold:{},msgSendPersonOfOneBatch:{}"
, msgReceiverLimit, msgReceiverThreshold, msgSendPersonOfOneBatch);
// 异步发送消息1同步发送接口性能不好2第三方接口有接口限流处理
int personCount = messageInfo.getToPersonIdList().size();
//小于阈值就走单个IM消息发送接口
if (personCount <= msgReceiverThreshold) {

View File

@ -0,0 +1,40 @@
package cn.axzo.im.service.impl;
import cn.axzo.bfs.support.data.converter.PageConverter;
import cn.axzo.bfs.support.data.mysql.QueryWrapperHelper;
import cn.axzo.im.dao.mapper.AccountRegisterMapper;
import cn.axzo.im.entity.AccountRegister;
import cn.axzo.im.service.AccountRegisterService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service
public class AccountRegisterServiceImpl extends ServiceImpl<AccountRegisterMapper, AccountRegister>
implements AccountRegisterService {
@Override
public Page<AccountRegister> page(PageAccountRegisterParam param) {
QueryWrapper<AccountRegister> wrapper = QueryWrapperHelper.fromBean(param, AccountRegister.class);
wrapper.eq("is_delete", 0);
return this.page(PageConverter.convertToMybatis(param, AccountRegister.class), wrapper);
}
@Override
public List<AccountRegister> list(ListAccountRegisterParam param) {
return PageConverter.drainAll(pageNumber -> {
PageAccountRegisterParam pageParam = PageAccountRegisterParam.builder().build();
BeanUtils.copyProperties(param, pageParam);
pageParam.setPageNumber(pageNumber);
pageParam.setPageSize(500);
return page(pageParam);
});
}
}

View File

@ -98,3 +98,5 @@ CREATE TABLE IF NOT EXISTS im_message_history
create index idx_im_from_account
on im_message_history (from_account);
ALTER TABLE im_account_register ADD COLUMN organizational_unit_id bigint null comment '企业id';