Merge remote-tracking branch 'origin/feature/REQ-3057' into feature/REQ-3057
This commit is contained in:
commit
be26aa75e9
@ -0,0 +1,143 @@
|
||||
package cn.axzo.im.job;
|
||||
|
||||
import cn.axzo.basics.common.constant.enums.TableIsDeleteEnum;
|
||||
import cn.axzo.basics.profiles.api.UserProfileServiceApi;
|
||||
import cn.axzo.basics.profiles.dto.basic.BasicDto;
|
||||
import cn.axzo.basics.profiles.dto.basic.PersonProfileDto;
|
||||
import cn.axzo.im.center.common.enums.AccountTypeEnum;
|
||||
import cn.axzo.im.channel.netease.client.NimClient;
|
||||
import cn.axzo.im.channel.netease.dto.GetAccountInfoRequest;
|
||||
import cn.axzo.im.channel.netease.dto.GetAccountInfoResponse;
|
||||
import cn.axzo.im.channel.netease.dto.UpdateAccountInfoRequest;
|
||||
import cn.axzo.im.channel.netease.dto.UpdateAccountInfoResponse;
|
||||
import cn.axzo.im.dao.repository.AccountRegisterDao;
|
||||
import cn.axzo.im.entity.AccountRegister;
|
||||
import cn.axzo.im.utils.BizAssertions;
|
||||
import cn.axzo.maokai.api.util.Ref;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.google.common.util.concurrent.RateLimiter;
|
||||
import com.xxl.job.core.biz.model.ReturnT;
|
||||
import com.xxl.job.core.handler.annotation.XxlJob;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class UpdateImAccountPersonInfoJob {
|
||||
|
||||
private final AccountRegisterDao accountRegisterDao;
|
||||
private final NimClient nimClient;
|
||||
private final UserProfileServiceApi userProfileServiceApi;
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
@XxlJob("updateImAccountPersonInfoJob")
|
||||
public ReturnT<String> execute(String jsonStr) throws Exception {
|
||||
try {
|
||||
log.info("job start");
|
||||
executeImpl();
|
||||
log.info("job end");
|
||||
return ReturnT.SUCCESS;
|
||||
} catch (Exception e) {
|
||||
log.warn("job failed", e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
private void executeImpl() {
|
||||
RateLimiter rateLimiter = RateLimiter.create(60);
|
||||
Supplier<List<AccountRegister>> cursor = accountsCursor();
|
||||
int count = 0;
|
||||
for (List<AccountRegister> accounts = cursor.get(); !accounts.isEmpty(); accounts = cursor.get()) {
|
||||
log.info("update account info, count: {}", count += accounts.size());
|
||||
GetAccountInfoRequest getAccountInfoRequest = new GetAccountInfoRequest();
|
||||
getAccountInfoRequest.setImAccountIds(accounts.stream()
|
||||
.map(AccountRegister::getImAccount)
|
||||
.distinct()
|
||||
.collect(toList()));
|
||||
GetAccountInfoResponse getAccountInfoResponse = nimClient.getAccountInfo(getAccountInfoRequest);
|
||||
if (!getAccountInfoResponse.isSuccess()) {
|
||||
log.warn("get account info failed, {}", getAccountInfoResponse);
|
||||
continue;
|
||||
}
|
||||
List<Long> personIds = accounts.stream()
|
||||
.map(AccountRegister::getAccountId)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(NumberUtils::isCreatable)
|
||||
.map(Long::parseLong)
|
||||
.distinct()
|
||||
.collect(toList());
|
||||
Map<Long, PersonProfileDto> id2PersonProfile = BizAssertions.assertResponse(
|
||||
userProfileServiceApi.getPersonProfiles(personIds)).stream()
|
||||
.collect(Collectors.toMap(BasicDto::getId, i -> i));
|
||||
for (AccountRegister account : accounts) {
|
||||
GetAccountInfoResponse.AccountInfo accountInfo = getAccountInfoResponse
|
||||
.findImAccountInfo(account.getImAccount()).orElse(null);
|
||||
if (accountInfo == null) continue;
|
||||
|
||||
JSONObject ext = accountInfo.getOrCreateExtObject();
|
||||
ext.put("personId", account.getAccountId());
|
||||
|
||||
UpdateAccountInfoRequest updateAccountInfoRequest = new UpdateAccountInfoRequest();
|
||||
updateAccountInfoRequest.setImAccountId(accountInfo.getAccid());
|
||||
updateAccountInfoRequest.addExt(ext);
|
||||
|
||||
if (account.getAccountId() != null && NumberUtils.isCreatable(account.getAccountId())) {
|
||||
long personId = Long.parseLong(account.getAccountId());
|
||||
PersonProfileDto person = id2PersonProfile.get(personId);
|
||||
if (person != null) {
|
||||
if (StringUtils.isNotBlank(person.getAvatarUrl())) {
|
||||
updateAccountInfoRequest.setIcon(person.getAvatarUrl());
|
||||
}
|
||||
if (StringUtils.isNotBlank(person.getRealName())) {
|
||||
updateAccountInfoRequest.setName(person.getRealName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rateLimiter.acquire();
|
||||
UpdateAccountInfoResponse updateAccountInfoResponse = nimClient
|
||||
.updateAccountInfo(updateAccountInfoRequest);
|
||||
if (updateAccountInfoResponse.isSuccess()) {
|
||||
log.info("update account info success, {}", updateAccountInfoResponse);
|
||||
} else {
|
||||
log.warn("update account info failed, {}", updateAccountInfoResponse);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Supplier<List<AccountRegister>> accountsCursor() {
|
||||
Ref<Long> maxId = Ref.create(0L);
|
||||
return () -> {
|
||||
List<AccountRegister> accounts = accountRegisterDao
|
||||
.lambdaQuery()
|
||||
.eq(AccountRegister::getAccountType, AccountTypeEnum.USER.getCode())
|
||||
.eq(AccountRegister::getIsDelete, TableIsDeleteEnum.NORMAL.value)
|
||||
.gt(AccountRegister::getId, maxId.get())
|
||||
.orderByAsc(AccountRegister::getId)
|
||||
.last("LIMIT 200")
|
||||
.list();
|
||||
if (!accounts.isEmpty()) {
|
||||
maxId.set(accounts.get(accounts.size() - 1).getId());
|
||||
log.info("update cursor id to: {}", maxId.get());
|
||||
}
|
||||
return accounts;
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
@ -0,0 +1,25 @@
|
||||
package cn.axzo.im.job;
|
||||
|
||||
import cn.axzo.im.Application;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.context.SpringBootTest;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
/**
|
||||
* @author yanglin
|
||||
*/
|
||||
@SpringBootTest(classes = Application.class)
|
||||
@RequiredArgsConstructor(onConstructor_ = @Autowired)
|
||||
class UpdateImAccountPersonInfoJobTest {
|
||||
|
||||
private final UpdateImAccountPersonInfoJob updateImAccountPersonInfoJob;
|
||||
|
||||
@Test
|
||||
void exec() throws Exception {
|
||||
updateImAccountPersonInfoJob.execute(null);
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user