From 02d9503133746892b9382c3095f4e12cc7e51a4e Mon Sep 17 00:00:00 2001 From: zengxiaobo Date: Fri, 5 Jul 2024 09:20:54 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=8E=B7=E5=8F=96app=20step=206?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../web/support/TimerRefreshCache.java | 96 +++++++++++++++++++ .../axzo/foundation/web/support/apps/App.java | 40 ++++++++ .../web/support/apps/AppCenter.java | 9 ++ .../foundation/web/support/rpc/RpcClient.java | 44 +-------- 4 files changed, 149 insertions(+), 40 deletions(-) create mode 100644 web-support-lib/src/main/java/cn/axzo/foundation/web/support/TimerRefreshCache.java create mode 100644 web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/App.java create mode 100644 web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/AppCenter.java diff --git a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/TimerRefreshCache.java b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/TimerRefreshCache.java new file mode 100644 index 0000000..f286b35 --- /dev/null +++ b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/TimerRefreshCache.java @@ -0,0 +1,96 @@ +package cn.axzo.foundation.web.support; + +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * 间隔一定时间刷新的缓存。 + *
+ * 1、初次调用时,直接获取需要缓存的内容,并缓存到cache中
+ * 2、每间隔intervalMillis尝试刷新缓存,
+ *      如果刷新缓存成功,更新cache缓存的值
+ *      如果刷新缓存失败,则不更新
+ * 
+ * + * @param + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +@Slf4j +public class TimerRefreshCache { + private ScheduledThreadPoolExecutor executor; + private String name; + private Long initialDelayMillis; + private Long intervalMillis; + private Supplier refresher; + private Cache cache; + /** 缓存是否以初始化,如果为false,会进行主动加载进行初始化 */ + private AtomicBoolean initialized = new AtomicBoolean(false); + + public TimerRefreshCache(String name, Long initialDelayMillis, Long intervalMillis, + ScheduledThreadPoolExecutor executor, Supplier refresher) { + Preconditions.checkArgument(executor != null); + Preconditions.checkArgument(refresher != null); + Preconditions.checkArgument(intervalMillis != null); + + this.name = name; + this.initialDelayMillis = initialDelayMillis; + this.intervalMillis = intervalMillis; + this.refresher = refresher; + // 该方法获取的,往往是完整的缓存内容。所以maxSize暂定为2即可 + long maximumSize = 2L; + cache = CacheBuilder.newBuilder() + .maximumSize(maximumSize) + .build(); + this.executor = executor; + startTimer(); + } + + private void startTimer() { + executor.scheduleAtFixedRate(this::doRefresh, initialDelayMillis, intervalMillis, TimeUnit.MILLISECONDS); + } + + private boolean doRefresh() { + try { + T value = refresher.get(); + // 当返回为null,表示Value没有变化,这时不用刷新Cache + if (value == null) { + return true; + } + + cache.put(name, value); + if (log.isDebugEnabled()) { + log.debug("{} refreshed, new value={}", name, value); + } + } catch (Throwable e) { + log.error("{} refresh failed", name, e); + return false; + } + return true; + } + + public T get() { + // 如果没有加载过且没有缓存,手动加载一次 + if (!initialized.getAndSet(true) && cache.size() == 0) { + //如果第一次刷新缓存失败. 则重置initialized. 下次请求进入时再次尝试更新cache + if (!doRefresh()) { + initialized.set(false); + } + } + return cache.getIfPresent(name); + } + + // XXX: for unittest + public void put(T configs) { + log.info("{},set->config={}", TimerRefreshCache.this, configs); + cache.put(name, configs); + } +} diff --git a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/App.java b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/App.java new file mode 100644 index 0000000..bb3edf1 --- /dev/null +++ b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/App.java @@ -0,0 +1,40 @@ +package cn.axzo.foundation.web.support.apps; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.RegExUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.Optional; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class App { + Long id; + Long upstreamId; + String upstreamName; + String appName; + Integer port; + + String host; + + public String getHost() { + return Optional.ofNullable(host).orElse(String.format("http://%s:%s", appName, port)); + } + + /** + * 根据upstreamId的后4位生产appId, 如果后四位位0则替换为9 + * eg 20000000000000124 -> 9124, 20000000000003078 -> 3078 + */ + public String getAppId() { + String right = StringUtils.right(String.valueOf(upstreamId), 4); + if (!right.startsWith("0")) { + return right; + } + return RegExUtils.replaceFirst(right, "0", "9"); + } +} diff --git a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/AppCenter.java b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/AppCenter.java new file mode 100644 index 0000000..d98b5d8 --- /dev/null +++ b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/apps/AppCenter.java @@ -0,0 +1,9 @@ +package cn.axzo.foundation.web.support.apps; + +import java.util.List; + +public interface AppCenter { + List listAll(); + + App getByName(String appName); +} diff --git a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/rpc/RpcClient.java b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/rpc/RpcClient.java index 45f3119..f6e182b 100644 --- a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/rpc/RpcClient.java +++ b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/rpc/RpcClient.java @@ -2,22 +2,18 @@ package cn.axzo.foundation.web.support.rpc; import cn.axzo.foundation.page.PageResp; import cn.axzo.foundation.result.ApiResult; -import cn.axzo.foundation.web.support.context.AxContext; -import cn.axzo.foundation.web.support.interceptors.CallerAppInterceptor; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.TypeReference; import com.google.common.base.Preconditions; import com.google.common.base.Strings; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import lombok.extern.slf4j.Slf4j; -import java.util.*; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.function.BiFunction; import java.util.function.Function; -import java.util.function.Supplier; -import java.util.stream.Collectors; /** * Rpc调用客户端接口 @@ -101,40 +97,9 @@ public interface RpcClient { return delete(url, typeReference, requestParams); } - default ApiResult convert(String body, Class clz) { - return JSONObject.parseObject(body, new TypeReference>(clz) { - }); - } - - Set AXZO_HEADERS = ImmutableSet.of("workspaceId", "ouId", "Authorization", "terminal", "userinfo"); - - // XXX: http/2会把所有Header都转成小写, 历史定义的Header都是大写的,在http/2协议下会透传失败。 - TreeSet CASE_INSENSITIVE_AXZO_HEADERS = AXZO_HEADERS.stream() - .collect(Collectors.toCollection(() -> new TreeSet<>(String.CASE_INSENSITIVE_ORDER))); - - // 将axzo-开头的header复制到请求的下一跳 - String AZXO_HEADER_PREFIX = "axzo-"; - - List>> DEFAULT_HEADER_SUPPLIERS = ImmutableList.of( - () -> AxContext.getRequest() - .map(request -> Collections.list(request.getHeaderNames()).stream() - // 通过http2协议的请求,默认会把大写转成小写,如"_SESSION_OBJECT" -> "_session_object",导致session无法透传,下一跳需要通过session验签会失败 - .filter(p -> CASE_INSENSITIVE_AXZO_HEADERS.contains(p) || p.startsWith(AZXO_HEADER_PREFIX)) - .collect(Collectors.toMap(e -> e, e -> request.getHeader(e), (oldValue, newValue) -> newValue))) - .orElse(Collections.emptyMap()), - //设置callerApp - () -> AxContext.getRequest().map(e -> { - Object caller = e.getAttribute(CallerAppInterceptor.NEXT_HTTP_REQUEST_ATTRIBUTE); - return ImmutableMap.of(CallerAppInterceptor.HTTP_REQUEST_HEADER, JSONObject.toJSONString(caller)); - }).orElse(ImmutableMap.of()) - ); - - /** * 使用builder模式来发起请求, 先通过request()获得builder对象. 再构建具体的请求方式 * eg: String resp = rpcClient.request().url("/my").content(Map.of("key", "value")).clz(String.class).post(); - * - * @return */ default RpcRequestBuilder request() { return new RpcRequestBuilder(this); @@ -142,7 +107,6 @@ public interface RpcClient { @Slf4j class RpcRequestBuilder { - private static final long MAX_PER_PAGE_COUNT = 1000; private String url; private Object content; private Class clz;