feat: 获取app step 3

This commit is contained in:
zengxiaobo 2024-07-04 16:33:50 +08:00
parent 8f39152834
commit 8431e71fde
6 changed files with 53 additions and 87 deletions

View File

@ -76,8 +76,8 @@ public class ApiResult<T> {
* 如果code > 100000 则认为可能已经带了appId
* 否则拼接当前的appId 到appCode中
*/
public Integer getStandardCode(String appId) {
if (code == null || Strings.isNullOrEmpty(appId) || isSuccess()) {
public static Integer getStandardCode(String appId, Integer code) {
if (code == null || Strings.isNullOrEmpty(appId) || SUCCESS_CODE.equals(code)) {
return code;
}
if (code >= 1000000) {

View File

@ -1,10 +1,12 @@
package cn.axzo.foundation.web.support.apps;
import cn.axzo.foundation.page.PageResp;
import cn.axzo.foundation.util.PageUtils;
import cn.axzo.foundation.web.support.TimerRefreshCache;
import cn.axzo.foundation.web.support.rpc.RequestProxy;
import cn.axzo.foundation.web.support.rpc.RpcClient;
import cn.axzo.foundation.web.support.rpc.RpcClientImpl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@ -42,20 +44,30 @@ public class AppCenterImpl implements AppCenter {
this.rpcClient = RpcClientImpl.builder().requestProxy(RequestProxy.SIMPLE_PROXY).build();
this.appCache = new TimerRefreshCache<>("appCenterCache", INITIAL_DELAY_MILLIS,
REFRESH_INTERVAL_MILLIS, executor, this::loadAllAppHosts);
REFRESH_INTERVAL_MILLIS, executor, this::loadApps);
this.debugHost = debugHost;
this.debugAppRoutes = Optional.ofNullable(debugAppRoutes).orElse(ImmutableMap.of());
}
private Map<String, App> loadAllAppHosts() {
private Map<String, App> loadApps() {
String host = Optional.ofNullable(debugHost).map(e -> e + "/apisix-plat").orElse("http://apisix-plat:8080");
List<App> apps = PageUtils.drainAll(page -> rpcClient.request()
List<App> apps = PageUtils.drainAll(page -> {
JSONObject result = rpcClient.request()
.url(host + "/api/v1/upstream/list")
.content(new JSONObject()
.fluentPut("pageNum", page)
.fluentPut("pageSize", 50))
.clz(App.class)
.postAndGetPage());
.clz(JSONObject.class)
.post();
//结构不一样, 转换为自己的pageResp
return PageResp.<App>builder()
.total(result.getLong("totalCount"))
.current(result.getLong("page"))
.size(result.getLong("pageSize"))
.data(Optional.ofNullable(result.getJSONArray("list")).orElse(new JSONArray())
.toJavaList(App.class))
.build();
});
if (!Strings.isNullOrEmpty(debugHost)) {
apps.forEach(e -> e.setHost(debugHost + "/" + debugAppRoutes.getOrDefault(e.getAppName(), e.getAppName())));

View File

@ -21,6 +21,6 @@ class ApiResultWrapper<T> extends ApiResult<T> {
this.httpCode = Optional.ofNullable(result.getHttpCode()).orElse(ResultCode.DEFAULT_HTTP_ERROR_CODE);
this.code = result.getStandardCode(appRuntime.getAppId());
this.code = getStandardCode(appRuntime.getAppId(), result.getCode());
}
}

View File

@ -84,7 +84,7 @@ public class DefaultWebMvcConfig extends DelegatingWebMvcConfiguration implement
private Boolean browserCompatible;
@Value("${web.debug.host:}")
private String contextSupplierHost;
private String debugHost;
/**
* 自定义对返回的errorMsg进行处理
@ -195,7 +195,7 @@ public class DefaultWebMvcConfig extends DelegatingWebMvcConfiguration implement
registry.addInterceptor(e);
});
}
registry.addInterceptor(new AxContextInterceptor(appRuntime, contextSupplierHost));
registry.addInterceptor(new AxContextInterceptor(appRuntime, debugHost));
}
/**

View File

@ -25,10 +25,6 @@ public class RpcClientImpl implements RpcClient {
@Getter
protected HttpClient httpClient;
protected RequestProxy requestProxy;
/** 外部服务的appId */
private String appId;
private static final Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的在http/2协议下会透传失败
private static final TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
@ -52,15 +48,13 @@ public class RpcClientImpl implements RpcClient {
@Builder
public RpcClientImpl(RequestProxy requestProxy,
HttpClient.Config config,
Supplier<Map<String, String>> requestHeaderSupplier,
String appId) {
Supplier<Map<String, String>> requestHeaderSupplier) {
this.requestProxy = Optional.ofNullable(requestProxy).orElse(RequestProxy.SIMPLE_PROXY);
this.httpClient = OkHttpClientImpl.builder()
.config(config)
.build();
this.customHeaderSupplier = requestHeaderSupplier;
this.appId = appId;
}
@Override
@ -70,8 +64,7 @@ public class RpcClientImpl implements RpcClient {
Optional<String> resp = requestBySupplier(requestParams, () -> this.getHttpClient().execute(httpMethod, url, requestParams));
ApiResult<T> result = converter.apply(resp.orElse(StringUtils.EMPTY));
if (!result.isSuccess()) {
Integer standardCode = result.getStandardCode(appId);
throw new BusinessException(standardCode + "", result.getMsg());
throw new BusinessException(result.getCode() + "", result.getMsg());
}
return result.getData();
}

View File

@ -1,10 +1,9 @@
package cn.axzo.foundation.web.support.rpc;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.rpc.exception.RpcNetworkException;
import cn.axzo.foundation.web.support.apps.App;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@ -12,13 +11,9 @@ import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* 一个RpcClient的wrapper. 可以方便的在RpcClient和其他Client切换.
@ -26,88 +21,54 @@ import java.util.stream.Collectors;
*/
@Slf4j
public class RpcClientWrapper implements RpcClient {
private final Supplier<List<String>> hostsResolver;
private Supplier<String> hostResolver;
private final Supplier<App> appResolver;
private final RpcClient normalRpcClient;
@Getter
private volatile RpcClient activeRpcClient;
private AtomicInteger roundRobinIndex = new AtomicInteger(0);
private RpcClient normalRpcClient;
final Cache<String, String> excludeHostCache = CacheBuilder.
newBuilder()
.expireAfterWrite(1, TimeUnit.MINUTES).build();
private RpcClient activeRpcClient;
@lombok.Builder
public RpcClientWrapper(RpcClient normalRpcClient,
Supplier<String> hostResolver,
Supplier<List<String>> hostsResolver,
Supplier<App> appResolver,
ClientType defaultClientType) {
Preconditions.checkArgument(normalRpcClient != null, "normalRpcClient不能为空");
if (normalRpcClient != null) {
Preconditions.checkArgument(hostResolver != null || hostsResolver != null, "如果是normalRpcClient, hostResolver必须有值");
Preconditions.checkArgument(appResolver != null, "如果是normalRpcClient, appResolver必须有值");
Preconditions.checkArgument(normalRpcClient instanceof RpcClientImpl, "normalRpcClient必须是RpcClientImpl.class实现");
}
this.normalRpcClient = normalRpcClient;
this.hostResolver = hostResolver;
this.hostsResolver = hostsResolver;
this.appResolver = appResolver;
activate(Optional.ofNullable(defaultClientType).orElse(ClientType.NORMAL));
}
@Override
public <T> T execute(HttpClient.HttpMethod httpMethod, String path, RequestParams requestParams, Function<String, ApiResult<T>> converter) {
String host = resolveHost();
try {
return activeRpcClient.execute(httpMethod, resolvePath(host, path), requestParams, converter);
} catch (RpcNetworkException e) {
excludeHostCache.put(host, "1");
log.warn("get network exception, add host {} to invalid pool and cooldown 1 minutes", host);
return activeRpcClient.execute(httpMethod, resolvePath(resolveHost(), path), requestParams, converter);
} catch (BusinessException e) {
int code = Integer.parseInt(e.getErrorCode());
Integer standardCode = ApiResult.getStandardCode(appResolver.get().getAppId(), code);
if (standardCode != code) {
throw new BusinessException(standardCode + "", e.getErrorMsg());
}
throw e;
}
}
@Override
public <R> R execute(HttpClient.HttpMethod httpMethod, String path, RequestParams requestParams, BiFunction<byte[], Map<String, List<String>>, R> responder) {
String host = resolveHost();
try {
return activeRpcClient.execute(httpMethod, resolvePath(host, path), requestParams, responder);
} catch (RpcNetworkException e) {
excludeHostCache.put(host, "1");
log.warn("get network exception, add host {} to invalid pool and cooldown 1 minutes", host);
return activeRpcClient.execute(httpMethod, resolvePath(resolveHost(), path), requestParams, responder);
} catch (BusinessException e) {
int code = Integer.parseInt(e.getErrorCode());
Integer standardCode = ApiResult.getStandardCode(appResolver.get().getAppId(), code);
if (standardCode != code) {
throw new BusinessException(standardCode + "", e.getErrorMsg());
}
throw e;
}
}
protected String resolveHost() {
if (hostResolver == null && hostsResolver == null) {
return StringUtils.EMPTY;
}
// 如果hostResolver不为空, 且path没有包含protocol与host. 则尝试将host与path拼接
if (hostResolver != null) {
return hostResolver.get();
}
List<String> hosts = hostsResolver.get();
// 如果只有 1 host没必要做选择
if (hosts.size() == 1) {
return hosts.get(0);
}
ConcurrentMap<String, String> excludeHosts = excludeHostCache.asMap();
List<String> availables = excludeHosts.size() == 0 ? hosts : hosts.stream().filter(i -> !excludeHosts.containsKey(i)).collect(Collectors.toList());
if (availables.size() == 0) {
return hosts.get(0);
} else if (availables.size() == 1) {
return availables.get(0);
}
// 使用 round robin 算法选择可用节点
int index = roundRobinIndex.getAndAccumulate(1, (x, y) -> {
if ((x + y) >= availables.size()) {
return 0;
}
return x + y;
});
return availables.get(index);
return appResolver.get().getHost();
}
protected String resolvePath(String host, String path) {
@ -126,7 +87,7 @@ public class RpcClientWrapper implements RpcClient {
public RpcClient activate(ClientType type) {
if (type == ClientType.NORMAL) {
Preconditions.checkState(normalRpcClient != null);
Preconditions.checkState(hostResolver != null || hostsResolver != null);
Preconditions.checkState(appResolver != null);
activeRpcClient = normalRpcClient;
} else {
throw new UnsupportedOperationException("unsupported clientType");