feat: 获取app step 6
This commit is contained in:
parent
75f6c849b5
commit
02d9503133
@ -0,0 +1,96 @@
|
||||
package cn.axzo.foundation.web.support;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.cache.Cache;
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* 间隔一定时间刷新的缓存。
|
||||
* <pre>
|
||||
* 1、初次调用时,直接获取需要缓存的内容,并缓存到cache中
|
||||
* 2、每间隔intervalMillis尝试刷新缓存,
|
||||
* 如果刷新缓存成功,更新cache缓存的值
|
||||
* 如果刷新缓存失败,则不更新
|
||||
* </pre>
|
||||
*
|
||||
* @param <T>
|
||||
*/
|
||||
@NoArgsConstructor(access = AccessLevel.PRIVATE)
|
||||
@Slf4j
|
||||
public class TimerRefreshCache<T> {
|
||||
private ScheduledThreadPoolExecutor executor;
|
||||
private String name;
|
||||
private Long initialDelayMillis;
|
||||
private Long intervalMillis;
|
||||
private Supplier<T> refresher;
|
||||
private Cache<String, T> cache;
|
||||
/** 缓存是否以初始化,如果为false,会进行主动加载进行初始化 */
|
||||
private AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
|
||||
public TimerRefreshCache(String name, Long initialDelayMillis, Long intervalMillis,
|
||||
ScheduledThreadPoolExecutor executor, Supplier<T> refresher) {
|
||||
Preconditions.checkArgument(executor != null);
|
||||
Preconditions.checkArgument(refresher != null);
|
||||
Preconditions.checkArgument(intervalMillis != null);
|
||||
|
||||
this.name = name;
|
||||
this.initialDelayMillis = initialDelayMillis;
|
||||
this.intervalMillis = intervalMillis;
|
||||
this.refresher = refresher;
|
||||
// 该方法获取的,往往是完整的缓存内容。所以maxSize暂定为2即可
|
||||
long maximumSize = 2L;
|
||||
cache = CacheBuilder.newBuilder()
|
||||
.maximumSize(maximumSize)
|
||||
.build();
|
||||
this.executor = executor;
|
||||
startTimer();
|
||||
}
|
||||
|
||||
private void startTimer() {
|
||||
executor.scheduleAtFixedRate(this::doRefresh, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
private boolean doRefresh() {
|
||||
try {
|
||||
T value = refresher.get();
|
||||
// 当返回为null,表示Value没有变化,这时不用刷新Cache
|
||||
if (value == null) {
|
||||
return true;
|
||||
}
|
||||
|
||||
cache.put(name, value);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("{} refreshed, new value={}", name, value);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
log.error("{} refresh failed", name, e);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public T get() {
|
||||
// 如果没有加载过且没有缓存,手动加载一次
|
||||
if (!initialized.getAndSet(true) && cache.size() == 0) {
|
||||
//如果第一次刷新缓存失败. 则重置initialized. 下次请求进入时再次尝试更新cache
|
||||
if (!doRefresh()) {
|
||||
initialized.set(false);
|
||||
}
|
||||
}
|
||||
return cache.getIfPresent(name);
|
||||
}
|
||||
|
||||
// XXX: for unittest
|
||||
public void put(T configs) {
|
||||
log.info("{},set->config={}", TimerRefreshCache.this, configs);
|
||||
cache.put(name, configs);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,40 @@
|
||||
package cn.axzo.foundation.web.support.apps;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import org.apache.commons.lang3.RegExUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class App {
|
||||
Long id;
|
||||
Long upstreamId;
|
||||
String upstreamName;
|
||||
String appName;
|
||||
Integer port;
|
||||
|
||||
String host;
|
||||
|
||||
public String getHost() {
|
||||
return Optional.ofNullable(host).orElse(String.format("http://%s:%s", appName, port));
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据upstreamId的后4位生产appId, 如果后四位位0则替换为9
|
||||
* eg 20000000000000124 -> 9124, 20000000000003078 -> 3078
|
||||
*/
|
||||
public String getAppId() {
|
||||
String right = StringUtils.right(String.valueOf(upstreamId), 4);
|
||||
if (!right.startsWith("0")) {
|
||||
return right;
|
||||
}
|
||||
return RegExUtils.replaceFirst(right, "0", "9");
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,9 @@
|
||||
package cn.axzo.foundation.web.support.apps;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface AppCenter {
|
||||
List<App> listAll();
|
||||
|
||||
App getByName(String appName);
|
||||
}
|
||||
@ -2,22 +2,18 @@ package cn.axzo.foundation.web.support.rpc;
|
||||
|
||||
import cn.axzo.foundation.page.PageResp;
|
||||
import cn.axzo.foundation.result.ApiResult;
|
||||
import cn.axzo.foundation.web.support.context.AxContext;
|
||||
import cn.axzo.foundation.web.support.interceptors.CallerAppInterceptor;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.alibaba.fastjson.TypeReference;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Rpc调用客户端接口
|
||||
@ -101,40 +97,9 @@ public interface RpcClient {
|
||||
return delete(url, typeReference, requestParams);
|
||||
}
|
||||
|
||||
default <T> ApiResult<T> convert(String body, Class<T> clz) {
|
||||
return JSONObject.parseObject(body, new TypeReference<ApiResult<T>>(clz) {
|
||||
});
|
||||
}
|
||||
|
||||
Set<String> AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo");
|
||||
|
||||
// XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的,在http/2协议下会透传失败。
|
||||
TreeSet<String> CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream()
|
||||
.collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER)));
|
||||
|
||||
// 将axzo-开头的header复制到请求的下一跳
|
||||
String AZXO_HEADER_PREFIX = "axzo-";
|
||||
|
||||
List<Supplier<Map<String, String>>> DEFAULT_HEADER_SUPPLIERS = ImmutableList.of(
|
||||
() -> AxContext.getRequest()
|
||||
.map(request -> Collections.list(request.getHeaderNames()).stream()
|
||||
// 通过http2协议的请求,默认会把大写转成小写,如"_SESSION_OBJECT" -> "_session_object",导致session无法透传,下一跳需要通过session验签会失败
|
||||
.filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX))
|
||||
.collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue)))
|
||||
.orElse(Collections.emptyMap()),
|
||||
//设置callerApp
|
||||
() -> AxContext.getRequest().map(e -> {
|
||||
Object caller = e.getAttribute(CallerAppInterceptor.NEXT_HTTP_REQUEST_ATTRIBUTE);
|
||||
return ImmutableMap.of(CallerAppInterceptor.HTTP_REQUEST_HEADER, JSONObject.toJSONString(caller));
|
||||
}).orElse(ImmutableMap.of())
|
||||
);
|
||||
|
||||
|
||||
/**
|
||||
* 使用builder模式来发起请求, 先通过request()获得builder对象. 再构建具体的请求方式
|
||||
* eg: String resp = rpcClient.request().url("/my").content(Map.of("key", "value")).clz(String.class).post();
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
default RpcRequestBuilder request() {
|
||||
return new RpcRequestBuilder(this);
|
||||
@ -142,7 +107,6 @@ public interface RpcClient {
|
||||
|
||||
@Slf4j
|
||||
class RpcRequestBuilder {
|
||||
private static final long MAX_PER_PAGE_COUNT = 1000;
|
||||
private String url;
|
||||
private Object content;
|
||||
private Class clz;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user