feat: 优化rpc构建
This commit is contained in:
parent
537ccadd0a
commit
944fcaf70a
@ -3,7 +3,6 @@ package cn.axzo.foundation.web.support.apps;
|
||||
import cn.axzo.foundation.page.PageResp;
|
||||
import cn.axzo.foundation.util.PageUtils;
|
||||
import cn.axzo.foundation.web.support.TimerRefreshCache;
|
||||
import cn.axzo.foundation.web.support.rpc.RequestProxy;
|
||||
import cn.axzo.foundation.web.support.rpc.RpcClient;
|
||||
import cn.axzo.foundation.web.support.rpc.RpcClientImpl;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
@ -39,11 +38,12 @@ public class AppCenterImpl implements AppCenter {
|
||||
@Builder
|
||||
public AppCenterImpl(ScheduledThreadPoolExecutor executor,
|
||||
String debugHost,
|
||||
Map<String, String> debugAppRoutes) {
|
||||
Map<String, String> debugAppRoutes,
|
||||
RpcClient rpcClient) {
|
||||
|
||||
Objects.requireNonNull(executor);
|
||||
|
||||
this.rpcClient = RpcClientImpl.builder().requestProxy(RequestProxy.SIMPLE_PROXY).build();
|
||||
this.rpcClient = Optional.ofNullable(rpcClient).orElseGet(() -> RpcClientImpl.builder().build());
|
||||
this.appCache = new TimerRefreshCache<>("appCenterCache", INITIAL_DELAY_MILLIS,
|
||||
REFRESH_INTERVAL_MILLIS, executor, this::loadApps);
|
||||
this.debugHost = debugHost;
|
||||
|
||||
@ -22,12 +22,11 @@ import java.util.function.Function;
|
||||
public class OkHttpClientImpl implements HttpClient {
|
||||
private OkHttpClient okHttpClient;
|
||||
private Call.Factory callFactory;
|
||||
|
||||
private Config config;
|
||||
private List<Interceptor> interceptorList;
|
||||
private Set<String> logResponseHeaderNames;
|
||||
private final List<Interceptor> interceptorList;
|
||||
private final Set<String> logResponseHeaderNames;
|
||||
|
||||
private final static long MAX_RESPONSE_LOG_LENGTH = 10_240L;
|
||||
private static final long MAX_RESPONSE_LOG_LENGTH = 10_240L;
|
||||
|
||||
@Builder
|
||||
public OkHttpClientImpl(Config config, List<Interceptor> interceptorList,
|
||||
@ -169,7 +168,7 @@ public class OkHttpClientImpl implements HttpClient {
|
||||
.connectionPool(new ConnectionPool(config.getMaxIdleConnections(), config.getKeepAliveMinutes(), TimeUnit.MINUTES))
|
||||
.dispatcher(dispatcher);
|
||||
|
||||
interceptorList.forEach(interceptor -> builder.addInterceptor(interceptor));
|
||||
interceptorList.forEach(builder::addInterceptor);
|
||||
|
||||
if (null != clientBuilder) {
|
||||
clientBuilder.accept(builder);
|
||||
|
||||
@ -25,7 +25,7 @@ public class RpcClientImpl implements RpcClient {
|
||||
@Getter
|
||||
protected HttpClient httpClient;
|
||||
protected RequestProxy requestProxy;
|
||||
private static final Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
|
||||
private static final Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo", "ctxLogId", "traceId");
|
||||
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的,在http/2协议下会透传失败。
|
||||
private static final TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
|
||||
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
|
||||
@ -36,7 +36,7 @@ public class RpcClientImpl implements RpcClient {
|
||||
.map(request -> Collections.list(request.getHeaderNames()).stream()
|
||||
// 通过http2协议的请求,默认会把大写转成小写,如"_SESSION_OBJECT" -> "_session_object",导致session无法透传,下一跳需要通过session验签会失败
|
||||
.filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX))
|
||||
.collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue)))
|
||||
.collect(Collectors.toMap(e -> e, request::getHeader, (oldValue, newValue) -> newValue)))
|
||||
.orElse(Collections.emptyMap()),
|
||||
//设置callerApp
|
||||
() -> AxContext.getRequest().map(e -> {
|
||||
@ -47,12 +47,12 @@ public class RpcClientImpl implements RpcClient {
|
||||
|
||||
@Builder
|
||||
public RpcClientImpl(RequestProxy requestProxy,
|
||||
HttpClient.Config config,
|
||||
Supplier<Map<String, String>> requestHeaderSupplier) {
|
||||
Supplier<Map<String, String>> requestHeaderSupplier,
|
||||
HttpClient httpClient) {
|
||||
this.requestProxy = Optional.ofNullable(requestProxy).orElse(RequestProxy.SIMPLE_PROXY);
|
||||
this.httpClient = OkHttpClientImpl.builder()
|
||||
.config(config)
|
||||
.build();
|
||||
this.httpClient = Optional.ofNullable(httpClient).orElseGet(() -> OkHttpClientImpl.builder()
|
||||
.config(HttpClient.Config.DEFAULT)
|
||||
.build());
|
||||
|
||||
this.customHeaderSupplier = requestHeaderSupplier;
|
||||
}
|
||||
@ -80,17 +80,12 @@ public class RpcClientImpl implements RpcClient {
|
||||
Objects.requireNonNull(requestParams);
|
||||
|
||||
//XXX 附加默认的header, 以及自定义的header
|
||||
DEFAULT_HEADER_SUPPLIERS.stream()
|
||||
.forEach(headerSupplier -> headerSupplier.get().entrySet()
|
||||
.forEach(headerEntry ->
|
||||
requestParams.addHeaderIfAbsent(headerEntry.getKey(), headerEntry.getValue())
|
||||
));
|
||||
DEFAULT_HEADER_SUPPLIERS
|
||||
.forEach(headerSupplier -> headerSupplier.get().forEach(requestParams::addHeaderIfAbsent));
|
||||
|
||||
if (customHeaderSupplier != null) {
|
||||
Map<String, String> map = customHeaderSupplier.get();
|
||||
map.entrySet().forEach(e -> {
|
||||
requestParams.addHeader(e.getKey(), e.getValue());
|
||||
});
|
||||
map.forEach(requestParams::addHeader);
|
||||
}
|
||||
|
||||
return requestProxy.request(token -> {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user