Merge branch 'dev' into test

# Conflicts:
#	common-lib/src/main/java/cn/axzo/foundation/result/ApiResult.java
#	web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/AppCenterImpl.java
#	web-support-lib/src/main/java/cn/axzo/foundation/web/support/config/ApiResultWrapper.java
#	web-support-lib/src/main/java/cn/axzo/foundation/web/support/context/AxContextInterceptor.java
#	web-support-lib/src/main/java/cn/axzo/foundation/web/support/rpc/RpcClientImpl.java
This commit is contained in:
zengxiaobo 2024-07-05 09:19:13 +08:00
commit 75f6c849b5
7 changed files with 183 additions and 81 deletions

View File

@ -1,10 +1,12 @@
package cn.axzo.foundation.result;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.base.Strings;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;
@Data
@Builder
@ -68,4 +70,23 @@ public class ApiResult<T> {
public boolean isSuccess() {
return SUCCESS_CODE.equals(getCode());
}
/**
* 根据appId 获取标准的code
* 如果code > 100000 则认为可能已经带了appId
* 否则拼接当前的appId 到appCode中
*/
public static Integer getStandardCode(String appId, Integer code) {
if (code == null || Strings.isNullOrEmpty(appId) || SUCCESS_CODE.equals(code)) {
return code;
}
if (code >= 1000000) {
return code;
}
try {
return Integer.parseInt(StringUtils.right(StringUtils.getDigits(appId), 4) + StringUtils.leftPad(code + "", 3, "0"));
} catch (Exception ex) {
return code;
}
}
}

View File

@ -0,0 +1,91 @@
package cn.axzo.foundation.web.support.apps;
import cn.axzo.foundation.page.PageResp;
import cn.axzo.foundation.util.PageUtils;
import cn.axzo.foundation.web.support.TimerRefreshCache;
import cn.axzo.foundation.web.support.rpc.RequestProxy;
import cn.axzo.foundation.web.support.rpc.RpcClient;
import cn.axzo.foundation.web.support.rpc.RpcClientImpl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Slf4j
public class AppCenterImpl implements AppCenter {
private static final long INITIAL_DELAY_MILLIS = 0;
private static final long REFRESH_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(60);
private TimerRefreshCache<Map<String, App>> appCache;
private RpcClient rpcClient;
private String debugHost;
private String listAppUrl;
private Map<String, String> debugAppRoutes;
/**
* executor 为刷新的executor
* debugHost: 本地调试时 连接的环境 eg: http://pre-api.axzo.cn
* debugAppRoutes: 本地调试时部分appName 与apiSix配置的路由不一致,
* eg data-collection 对应的测试地址http://test-api.axzo.cn/dataCollection 而非http://test-api.axzo.cn/data-collection
* 因此该类映射需要单独处理
*/
@Builder
public AppCenterImpl(ScheduledThreadPoolExecutor executor,
String debugHost,
Map<String, String> debugAppRoutes) {
Objects.requireNonNull(executor);
this.rpcClient = RpcClientImpl.builder().requestProxy(RequestProxy.SIMPLE_PROXY).build();
this.appCache = new TimerRefreshCache<>("appCenterCache", INITIAL_DELAY_MILLIS,
REFRESH_INTERVAL_MILLIS, executor, this::loadApps);
this.debugHost = debugHost;
this.listAppUrl = Optional.ofNullable(Strings.emptyToNull(debugHost)).map(e -> e + "/apisix-plat").orElse("http://apisix-plat:8080")
+ "/api/v1/upstream/list";
this.debugAppRoutes = Optional.ofNullable(debugAppRoutes).orElse(ImmutableMap.of());
}
private Map<String, App> loadApps() {
List<App> apps = PageUtils.drainAll(page -> {
JSONObject result = rpcClient.request()
.url(listAppUrl)
.content(new JSONObject()
.fluentPut("pageNum", page)
.fluentPut("pageSize", 50))
.clz(JSONObject.class)
.post();
//结构不一样, 转换为自己的pageResp
return PageResp.<App>builder()
.total(result.getLong("totalCount"))
.current(result.getLong("page"))
.size(result.getLong("pageSize"))
.data(Optional.ofNullable(result.getJSONArray("list")).orElse(new JSONArray())
.toJavaList(App.class))
.build();
});
if (!Strings.isNullOrEmpty(debugHost)) {
apps.forEach(e -> e.setHost(debugHost + "/" + debugAppRoutes.getOrDefault(e.getAppName(), e.getAppName())));
}
return apps.stream()
.collect(Collectors.toMap(App::getAppName, e -> e, (o, n) -> n));
}
@Override
public List<App> listAll() {
return new ArrayList<>(appCache.get().values());
}
@Override
public App getByName(String appName) {
return appCache.get().get(appName);
}
}

View File

@ -4,7 +4,6 @@ import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.result.ResultCode;
import cn.axzo.foundation.web.support.AppRuntime;
import lombok.Builder;
import org.apache.commons.lang3.StringUtils;
import java.util.Optional;
@ -22,10 +21,6 @@ class ApiResultWrapper<T> extends ApiResult<T> {
this.httpCode = Optional.ofNullable(result.getHttpCode()).orElse(ResultCode.DEFAULT_HTTP_ERROR_CODE);
//没有appId时沿用之前的拼装逻辑"${appName}_${httpCode}${ErrorCode}"
//存在appId时拼装逻辑调整为"${appId}${ErrorCode}"
if (!StringUtils.isEmpty(appRuntime.getAppId()) && StringUtils.length(result.getCode() + "") < 6) {
this.code = Integer.parseInt(StringUtils.getDigits(appRuntime.getAppId() + StringUtils.leftPad(result.getCode() + "", 3, "0")));
}
this.code = getStandardCode(appRuntime.getAppId(), result.getCode());
}
}

View File

@ -83,8 +83,8 @@ public class DefaultWebMvcConfig extends DelegatingWebMvcConfiguration implement
@Value("${web.serialize.browser-compatible.enabled:true}")
private Boolean browserCompatible;
@Value("${web.context.supplier.url:}")
private String contextSupplierUrl;
@Value("${web.debug.host:}")
private String debugHost;
/**
* 自定义对返回的errorMsg进行处理
@ -195,7 +195,7 @@ public class DefaultWebMvcConfig extends DelegatingWebMvcConfiguration implement
registry.addInterceptor(e);
});
}
registry.addInterceptor(new AxContextInterceptor(appRuntime, contextSupplierUrl));
registry.addInterceptor(new AxContextInterceptor(appRuntime, debugHost));
}
/**

View File

@ -8,7 +8,7 @@ import cn.axzo.foundation.web.support.rpc.RequestParams;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import lombok.RequiredArgsConstructor;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
@ -24,11 +24,17 @@ import java.util.Optional;
* 3. 非prd环境支持通过token到puge换
*/
@Slf4j
@RequiredArgsConstructor
public class AxContextInterceptor implements HandlerInterceptor {
private final AppRuntime appRuntime;
private final String supplierHost;
private final String supplierUrl;
@Builder
public AxContextInterceptor(AppRuntime appRuntime, String debugHost) {
this.appRuntime = appRuntime;
this.supplierUrl = Optional.ofNullable(Strings.emptyToNull(debugHost)).map(e -> e + "/pudge")
.orElse("http://pudge:10099") + "/webApi/oauth/apisix/authentication";
}
private final static HttpClient HTTP_CLIENT = OkHttpClientImpl.builder().build();
@ -83,18 +89,18 @@ public class AxContextInterceptor implements HandlerInterceptor {
return JSONObject.parseObject(StringUtils.removeStart(authorization, "Raw "), AxContext.class);
}
if (authorization.startsWith("Bearer")) {
if (Strings.isNullOrEmpty(supplierHost)) {
if (Strings.isNullOrEmpty(supplierUrl)) {
return null;
}
String result;
try {
result = HTTP_CLIENT.get(supplierHost, RequestParams.FormParams.builder()
result = HTTP_CLIENT.get(supplierUrl, RequestParams.FormParams.builder()
.headers(ImmutableMap.of("Authorization", authorization,
"terminal", request.getHeader("terminal")))
.logEnable(true)
.build());
} catch (Exception ex) {
log.error("获取登陆信息错误, url = {}, authorization = {}", supplierHost, authorization, ex);
log.error("获取登陆信息错误, url = {}, authorization = {}", supplierUrl, authorization, ex);
return null;
}
//这里是一个非标准返回

View File

@ -2,6 +2,12 @@ package cn.axzo.foundation.web.support.rpc;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.context.AxContext;
import cn.axzo.foundation.web.support.interceptors.CallerAppInterceptor;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@ -11,6 +17,7 @@ import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@Slf4j
public class RpcClientImpl implements RpcClient {
@ -18,15 +25,36 @@ public class RpcClientImpl implements RpcClient {
@Getter
protected HttpClient httpClient;
protected RequestProxy requestProxy;
private static final Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的在http/2协议下会透传失败
private static final TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
// 将axzo-开头的header复制到请求的下一跳
private static final String AZXO_HEADER_PREFIX = "axzo-";
private static final List<Supplier<Map<String, String>>> DEFAULT_HEADER_SUPPLIERS = ImmutableList.of(
() -> AxContext.getRequest()
.map(request -> Collections.list(request.getHeaderNames()).stream()
// 通过http2协议的请求默认会把大写转成小写"_SESSION_OBJECT" -> "_session_object"导致session无法透传下一跳需要通过session验签会失败
.filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX))
.collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue)))
.orElse(Collections.emptyMap()),
//设置callerApp
() -> AxContext.getRequest().map(e -> {
Object caller = e.getAttribute(CallerAppInterceptor.NEXT_HTTP_REQUEST_ATTRIBUTE);
return ImmutableMap.of(CallerAppInterceptor.HTTP_REQUEST_HEADER, JSONObject.toJSONString(caller));
}).orElse(ImmutableMap.of())
);
@Builder
public RpcClientImpl(RequestProxy requestProxy, HttpClient.Config config, Supplier<Map<String, String>> requestHeaderSupplier) {
public RpcClientImpl(RequestProxy requestProxy,
HttpClient.Config config,
Supplier<Map<String, String>> requestHeaderSupplier) {
this.requestProxy = Optional.ofNullable(requestProxy).orElse(RequestProxy.SIMPLE_PROXY);
this.httpClient = OkHttpClientImpl.builder()
.config(config)
.build();
customHeaderSupplier = requestHeaderSupplier;
this.customHeaderSupplier = requestHeaderSupplier;
}
@Override

View File

@ -1,10 +1,9 @@
package cn.axzo.foundation.web.support.rpc;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.rpc.exception.RpcNetworkException;
import cn.axzo.foundation.web.support.apps.App;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -12,13 +11,9 @@ import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* 一个RpcClient的wrapper. 可以方便的在RpcClient和其他Client切换.
@ -26,88 +21,54 @@ import java.util.stream.Collectors;
*/
@Slf4j
public class RpcClientWrapper implements RpcClient {
private final Supplier<List<String>> hostsResolver;
private Supplier<String> hostResolver;
private final Supplier<App> appResolver;
private final RpcClient normalRpcClient;
@Getter
private volatile RpcClient activeRpcClient;
private AtomicInteger roundRobinIndex = new AtomicInteger(0);
private RpcClient normalRpcClient;
final Cache<String, String> excludeHostCache = CacheBuilder.
newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES).build();
private RpcClient activeRpcClient;
@lombok.Builder
public RpcClientWrapper(RpcClient normalRpcClient,
Supplier<String> hostResolver,
Supplier<List<String>> hostsResolver,
Supplier<App> appResolver,
ClientType defaultClientType) {
Preconditions.checkArgument(normalRpcClient != null, "normalRpcClient不能为空");
if (normalRpcClient != null) {
Preconditions.checkArgument(hostResolver != null || hostsResolver != null, "如果是normalRpcClient, hostResolver必须有值");
Preconditions.checkArgument(normalRpcClient instanceof RpcClientImpl, "normalRpcClient必须是RpcClientImpl.class实现");
}
Preconditions.checkArgument(appResolver != null, "如果是normalRpcClient, appResolver必须有值");
Preconditions.checkArgument(normalRpcClient instanceof RpcClientImpl, "normalRpcClient必须是RpcClientImpl.class实现");
this.normalRpcClient = normalRpcClient;
this.hostResolver = hostResolver;
this.hostsResolver = hostsResolver;
this.appResolver = appResolver;
activate(Optional.ofNullable(defaultClientType).orElse(ClientType.NORMAL));
}
@Override
public <T> T execute(HttpClient.HttpMethod httpMethod, String path, RequestParams requestParams, Function<String, ApiResult<T>> converter) {
String host = resolveHost();
try {
return activeRpcClient.execute(httpMethod, resolvePath(host, path), requestParams, converter);
} catch (RpcNetworkException e) {
excludeHostCache.put(host, "1");
log.warn("get network exception, add host {} to invalid pool and cooldown 1 minutes", host);
return activeRpcClient.execute(httpMethod, resolvePath(resolveHost(), path), requestParams, converter);
} catch (BusinessException e) {
int code = Integer.parseInt(e.getErrorCode());
Integer standardCode = ApiResult.getStandardCode(appResolver.get().getAppId(), code);
if (standardCode != code) {
throw new BusinessException(standardCode + "", e.getErrorMsg());
}
throw e;
}
}
@Override
public <R> R execute(HttpClient.HttpMethod httpMethod, String path, RequestParams requestParams, BiFunction<byte[], Map<String, List<String>>, R> responder) {
String host = resolveHost();
try {
return activeRpcClient.execute(httpMethod, resolvePath(host, path), requestParams, responder);
} catch (RpcNetworkException e) {
excludeHostCache.put(host, "1");
log.warn("get network exception, add host {} to invalid pool and cooldown 1 minutes", host);
return activeRpcClient.execute(httpMethod, resolvePath(resolveHost(), path), requestParams, responder);
} catch (BusinessException e) {
int code = Integer.parseInt(e.getErrorCode());
Integer standardCode = ApiResult.getStandardCode(appResolver.get().getAppId(), code);
if (standardCode != code) {
throw new BusinessException(standardCode + "", e.getErrorMsg());
}
throw e;
}
}
protected String resolveHost() {
if (hostResolver == null && hostsResolver == null) {
return StringUtils.EMPTY;
}
// 如果hostResolver不为空, 且path没有包含protocol与host. 则尝试将host与path拼接
if (hostResolver != null) {
return hostResolver.get();
}
List<String> hosts = hostsResolver.get();
// 如果只有 1 host没必要做选择
if (hosts.size() == 1) {
return hosts.get(0);
}
ConcurrentMap<String, String> excludeHosts = excludeHostCache.asMap();
List<String> availables = excludeHosts.size() == 0 ? hosts : hosts.stream().filter(i -> !excludeHosts.containsKey(i)).collect(Collectors.toList());
if (availables.size() == 0) {
return hosts.get(0);
} else if (availables.size() == 1) {
return availables.get(0);
}
// 使用 round robin 算法选择可用节点
int index = roundRobinIndex.getAndAccumulate(1, (x, y) -> {
if ((x + y) >= availables.size()) {
return 0;
}
return x + y;
});
return availables.get(index);
return appResolver.get().getHost();
}
protected String resolvePath(String host, String path) {
@ -126,7 +87,7 @@ public class RpcClientWrapper implements RpcClient {
public RpcClient activate(ClientType type) {
if (type == ClientType.NORMAL) {
Preconditions.checkState(normalRpcClient != null);
Preconditions.checkState(hostResolver != null || hostsResolver != null);
Preconditions.checkState(appResolver != null);
activeRpcClient = normalRpcClient;
} else {
throw new UnsupportedOperationException("unsupported clientType");