feat: 获取app & 优化apiResult.code

This commit is contained in:
zengxiaobo 2024-07-04 15:48:19 +08:00
parent e520e2d550
commit 1f8c2eb5d0
9 changed files with 298 additions and 55 deletions

View File

@ -1,10 +1,12 @@
package cn.axzo.foundation.result;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
@Data
@Builder
@ -68,4 +70,23 @@ public class ApiResult<T> {
public boolean isSuccess() {
return SUCCESS_CODE.equals(getCode());
}
/**
* 根据appId 获取标准的code
* 如果code > 100000 则认为可能已经带了appId
* 否则拼接当前的appId 到appCode中
*/
public Integer getStandardCode(String appId) {
if (code == null || Strings.isNullOrEmpty(appId) || isSuccess()) {
return code;
}
if (code >= 1000000) {
return code;
}
try {
return Integer.parseInt(StringUtils.right(StringUtils.getDigits(appId), 4) + StringUtils.leftPad(code + "", 3, "0"));
} catch (Exception ex) {
return code;
}
}
}

View File

@ -0,0 +1,96 @@
package cn.axzo.foundation.web.support;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* 间隔一定时间刷新的缓存
* <pre>
* 1初次调用时直接获取需要缓存的内容并缓存到cache中
* 2每间隔intervalMillis尝试刷新缓存
* 如果刷新缓存成功更新cache缓存的值
* 如果刷新缓存失败则不更新
* </pre>
*
* @param <T>
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class TimerRefreshCache<T> {
private ScheduledThreadPoolExecutor executor;
private String name;
private Long initialDelayMillis;
private Long intervalMillis;
private Supplier<T> refresher;
private Cache<String, T> cache;
/** 缓存是否以初始化如果为false会进行主动加载进行初始化 */
private AtomicBoolean initialized = new AtomicBoolean(false);
public TimerRefreshCache(String name, Long initialDelayMillis, Long intervalMillis,
ScheduledThreadPoolExecutor executor, Supplier<T> refresher) {
Preconditions.checkArgument(executor != null);
Preconditions.checkArgument(refresher != null);
Preconditions.checkArgument(intervalMillis != null);
this.name = name;
this.initialDelayMillis = initialDelayMillis;
this.intervalMillis = intervalMillis;
this.refresher = refresher;
// 该方法获取的往往是完整的缓存内容所以maxSize暂定为2即可
long maximumSize = 2L;
cache = CacheBuilder.newBuilder()
.maximumSize(maximumSize)
.build();
this.executor = executor;
startTimer();
}
private void startTimer() {
executor.scheduleAtFixedRate(this::doRefresh, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
}
private boolean doRefresh() {
try {
T value = refresher.get();
// 当返回为null表示Value没有变化这时不用刷新Cache
if (value == null) {
return true;
}
cache.put(name, value);
if (log.isDebugEnabled()) {
log.debug("{} refreshed, new value={}", name, value);
}
} catch (Throwable e) {
log.error("{} refresh failed", name, e);
return false;
}
return true;
}
public T get() {
// 如果没有加载过且没有缓存手动加载一次
if (!initialized.getAndSet(true) && cache.size() == 0) {
//如果第一次刷新缓存失败. 则重置initialized. 下次请求进入时再次尝试更新cache
if (!doRefresh()) {
initialized.set(false);
}
}
return cache.getIfPresent(name);
}
// XXX: for unittest
public void put(T configs) {
log.info("{},set->config={}", TimerRefreshCache.this, configs);
cache.put(name, configs);
}
}

View File

@ -0,0 +1,40 @@
package cn.axzo.foundation.web.support.apps;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.RegExUtils;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class App {
Long id;
Long upstreamId;
String upstreamName;
String appName;
Integer port;
String host;
public String getHost() {
return Optional.ofNullable(host).orElse(String.format("http://%s:%s", appName, port));
}
/**
* 根据upstreamId的后4位生产appId, 如果后四位位0则替换为9
* eg 20000000000000124 -> 9124, 20000000000003078 -> 3078
*/
public String getAppId() {
String right = StringUtils.right(String.valueOf(upstreamId), 4);
if (!right.startsWith("0")) {
return right;
}
return RegExUtils.replaceFirst(right, "0", "9");
}
}

View File

@ -0,0 +1,9 @@
package cn.axzo.foundation.web.support.apps;
import java.util.List;
public interface AppCenter {
List<App> listAll();
App getByName(String appName);
}

View File

@ -0,0 +1,77 @@
package cn.axzo.foundation.web.support.apps;
import cn.axzo.foundation.util.PageUtils;
import cn.axzo.foundation.web.support.TimerRefreshCache;
import cn.axzo.foundation.web.support.rpc.RequestProxy;
import cn.axzo.foundation.web.support.rpc.RpcClient;
import cn.axzo.foundation.web.support.rpc.RpcClientImpl;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
public class AppCenterImpl implements AppCenter {
private static final long INITIAL_DELAY_MILLIS = 0;
private static final long REFRESH_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(60);
private TimerRefreshCache<Map<String, App>> appCache;
private RpcClient rpcClient;
private String debugHost;
private Map<String, String> debugAppRoutes;
/**
* executor 为刷新的executor
* debugHost: 本地调试时 连接的环境 eg: http://pre-api.axzo.cn
* debugAppRoutes: 本地调试时部分appName 与apiSix配置的路由不一致,
* eg data-collection 对应的测试地址http://test-api.axzo.cn/dataCollection 而非http://test-api.axzo.cn/data-collection
* 因此该类映射需要单独处理
*/
@Builder
public AppCenterImpl(ScheduledThreadPoolExecutor executor,
String debugHost,
Map<String, String> debugAppRoutes) {
Objects.requireNonNull(executor);
this.rpcClient = RpcClientImpl.builder().requestProxy(RequestProxy.SIMPLE_PROXY).build();
this.appCache = new TimerRefreshCache<>("appCenterCache", INITIAL_DELAY_MILLIS,
REFRESH_INTERVAL_MILLIS, executor, this::loadAllAppHosts);
this.debugHost = debugHost;
this.debugAppRoutes = Optional.ofNullable(debugAppRoutes).orElse(ImmutableMap.of());
}
private Map<String, App> loadAllAppHosts() {
String host = Optional.ofNullable(debugHost).map(e -> e + "/apisix-plat").orElse("http://apisix-plat:8080");
List<App> apps = PageUtils.drainAll(page -> rpcClient.request()
.url(host + "/api/v1/upstream/list")
.content(new JSONObject()
.fluentPut("pageNum", page)
.fluentPut("pageSize", 50))
.clz(App.class)
.postAndGetPage());
if (!Strings.isNullOrEmpty(debugHost)) {
apps.forEach(e -> e.setHost(debugHost + "/" + debugAppRoutes.getOrDefault(e.getAppName(), e.getAppName())));
}
return apps.stream()
.collect(Collectors.toMap(App::getAppName, e -> e, (o, n) -> n));
}
@Override
public List<App> listAll() {
return new ArrayList<>(appCache.get().values());
}
@Override
public App getByName(String appName) {
return appCache.get().get(appName);
}
}

View File

@ -4,7 +4,6 @@ import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.web.support.AppRuntime;
import lombok.Builder;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
@ -22,10 +21,6 @@ class ApiResultWrapper<T> extends ApiResult<T> {
this.httpCode = Optional.ofNullable(result.getHttpCode()).orElse(ResultCode.DEFAULT_HTTP_ERROR_CODE);
//没有appId时沿用之前的拼装逻辑"${appName}_${httpCode}${ErrorCode}"
//存在appId时拼装逻辑调整为"${appId}${ErrorCode}"
if (!StringUtils.isEmpty(appRuntime.getAppId()) && StringUtils.length(result.getCode() + "") < 6) {
this.code = Integer.parseInt(StringUtils.getDigits(appRuntime.getAppId() + StringUtils.leftPad(result.getCode() + "", 3, "0")));
}
this.code = result.getStandardCode(appRuntime.getAppId());
}
}

View File

@ -8,7 +8,7 @@ import cn.axzo.foundation.web.support.rpc.RequestParams;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
@ -24,11 +24,17 @@ import java.util.Optional;
* 3. 非prd环境支持通过token到puge换
*/
@Slf4j
@RequiredArgsConstructor
public class AxContextInterceptor implements HandlerInterceptor {
private final AppRuntime appRuntime;
private final String supplierHost;
private final String supplierUrl;
@Builder
public AxContextInterceptor(AppRuntime appRuntime, String debugHost) {
this.appRuntime = appRuntime;
this.supplierUrl = Optional.ofNullable(Strings.emptyToNull(debugHost))
.orElse("http://pudge:10099") + "/webApi/oauth/apisix/authentication";
}
private final static HttpClient HTTP_CLIENT = OkHttpClientImpl.builder().build();
@ -83,18 +89,18 @@ public class AxContextInterceptor implements HandlerInterceptor {
return JSONObject.parseObject(StringUtils.removeStart(authorization, "Raw "), AxContext.class);
}
if (authorization.startsWith("Bearer")) {
if (Strings.isNullOrEmpty(supplierHost)) {
if (Strings.isNullOrEmpty(supplierUrl)) {
return null;
}
String result;
try {
result = HTTP_CLIENT.get(supplierHost, RequestParams.FormParams.builder()
result = HTTP_CLIENT.get(supplierUrl, RequestParams.FormParams.builder()
.headers(ImmutableMap.of("Authorization", authorization,
"terminal", request.getHeader("terminal")))
.logEnable(true)
.build());
} catch (Exception ex) {
log.error("获取登陆信息错误, url = {}, authorization = {}", supplierHost, authorization, ex);
log.error("获取登陆信息错误, url = {}, authorization = {}", supplierUrl, authorization, ex);
return null;
}
//这里是一个非标准返回

View File

@ -2,22 +2,18 @@ package cn.axzo.foundation.web.support.rpc;
import cn.axzo.foundation.page.PageResp;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.context.AxContext;
import cn.axzo.foundation.web.support.interceptors.CallerAppInterceptor;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Rpc调用客户端接口
@ -101,40 +97,9 @@ public interface RpcClient {
return delete(url, typeReference, requestParams);
}
default <T> ApiResult<T> convert(String body, Class<T> clz) {
return JSONObject.parseObject(body, new TypeReference<ApiResult<T>>(clz) {
});
}
Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的在http/2协议下会透传失败
TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
// 将axzo-开头的header复制到请求的下一跳
String AZXO_HEADER_PREFIX = "axzo-";
List<Supplier<Map<String, String>>> DEFAULT_HEADER_SUPPLIERS = ImmutableList.of(
() -> AxContext.getRequest()
.map(request -> Collections.list(request.getHeaderNames()).stream()
// 通过http2协议的请求默认会把大写转成小写"_SESSION_OBJECT" -> "_session_object"导致session无法透传下一跳需要通过session验签会失败
.filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX))
.collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue)))
.orElse(Collections.emptyMap()),
//设置callerApp
() -> AxContext.getRequest().map(e -> {
Object caller = e.getAttribute(CallerAppInterceptor.NEXT_HTTP_REQUEST_ATTRIBUTE);
return ImmutableMap.of(CallerAppInterceptor.HTTP_REQUEST_HEADER, JSONObject.toJSONString(caller));
}).orElse(ImmutableMap.of())
);
/**
* 使用builder模式来发起请求, 先通过request()获得builder对象. 再构建具体的请求方式
* eg: String resp = rpcClient.request().url("/my").content(Map.of("key", "value")).clz(String.class).post();
*
* @return
*/
default RpcRequestBuilder request() {
return new RpcRequestBuilder(this);
@ -142,7 +107,6 @@ public interface RpcClient {
@Slf4j
class RpcRequestBuilder {
private static final long MAX_PER_PAGE_COUNT = 1000;
private String url;
private Object content;
private Class clz;

View File

@ -2,6 +2,12 @@ package cn.axzo.foundation.web.support.rpc;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.context.AxContext;
import cn.axzo.foundation.web.support.interceptors.CallerAppInterceptor;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -11,6 +17,7 @@ import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@Slf4j
public class RpcClientImpl implements RpcClient {
@ -19,14 +26,41 @@ public class RpcClientImpl implements RpcClient {
protected HttpClient httpClient;
protected RequestProxy requestProxy;
/** 外部服务的appId */
private String appId;
private static final Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的在http/2协议下会透传失败
private static final TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
// 将axzo-开头的header复制到请求的下一跳
private static final String AZXO_HEADER_PREFIX = "axzo-";
private static final List<Supplier<Map<String, String>>> DEFAULT_HEADER_SUPPLIERS = ImmutableList.of(
() -> AxContext.getRequest()
.map(request -> Collections.list(request.getHeaderNames()).stream()
// 通过http2协议的请求默认会把大写转成小写"_SESSION_OBJECT" -> "_session_object"导致session无法透传下一跳需要通过session验签会失败
.filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX))
.collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue)))
.orElse(Collections.emptyMap()),
//设置callerApp
() -> AxContext.getRequest().map(e -> {
Object caller = e.getAttribute(CallerAppInterceptor.NEXT_HTTP_REQUEST_ATTRIBUTE);
return ImmutableMap.of(CallerAppInterceptor.HTTP_REQUEST_HEADER, JSONObject.toJSONString(caller));
}).orElse(ImmutableMap.of())
);
@Builder
public RpcClientImpl(RequestProxy requestProxy, HttpClient.Config config, Supplier<Map<String, String>> requestHeaderSupplier) {
public RpcClientImpl(RequestProxy requestProxy,
HttpClient.Config config,
Supplier<Map<String, String>> requestHeaderSupplier,
String appId) {
this.requestProxy = Optional.ofNullable(requestProxy).orElse(RequestProxy.SIMPLE_PROXY);
this.httpClient = OkHttpClientImpl.builder()
.config(config)
.build();
customHeaderSupplier = requestHeaderSupplier;
this.customHeaderSupplier = requestHeaderSupplier;
this.appId = appId;
}
@Override
@ -36,7 +70,8 @@ public class RpcClientImpl implements RpcClient {
Optional<String> resp = requestBySupplier(requestParams, () -> this.getHttpClient().execute(httpMethod, url, requestParams));
ApiResult<T> result = converter.apply(resp.orElse(StringUtils.EMPTY));
if (!result.isSuccess()) {
throw new BusinessException(result.getCode() + "", result.getMsg());
Integer standardCode = result.getStandardCode(appId);
throw new BusinessException(standardCode + "", result.getMsg());
}
return result.getData();
}