From 0dfd96c83c4325efdf5152d8028a375f34971330 Mon Sep 17 00:00:00 2001 From: zengxiaobo Date: Fri, 12 Jul 2024 16:03:17 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=B7=AF=E7=94=B1=E7=BA=A7=E5=88=AB?= =?UTF-8?q?=E7=9A=84=E6=9C=8D=E5=8A=A1=E9=99=8D=E7=BA=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原因 路由级别的服务降级 修改 修改SimpleProxy, 支持降级 --- .../gateway/support/BizGatewayImpl.java | 13 +-- .../gateway/support/entity/GlobalContext.java | 18 +++-- .../support/fallback/FallbackConfig.java | 50 ++++++++++++ .../plugin/impl/RequestFilterHook.java | 79 ++++--------------- .../support/proxy/GateResponseSupplier.java | 31 ++++++++ .../support/proxy/impl/SimpleProxy.java | 44 +++++++++-- .../web/support/alert/EmailAlertConsumer.java | 6 ++ 7 files changed, 154 insertions(+), 87 deletions(-) create mode 100644 gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/fallback/FallbackConfig.java create mode 100644 gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/GateResponseSupplier.java diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/BizGatewayImpl.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/BizGatewayImpl.java index fdbf9d8..b05cebf 100755 --- a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/BizGatewayImpl.java +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/BizGatewayImpl.java @@ -214,15 +214,6 @@ public class BizGatewayImpl implements BizGateway { ((StopWatchHook) hook).initialize(); } - private boolean isGrayStage(RequestContext requestContext) { - // TODO: correct gray stage - if (!CollectionUtils.isEmpty(requestContext.getHeaders()) - && StringUtils.equals(requestContext.getHeaders().get("gray_stage"), "true")) { - return true; - } - return false; - } - @Override public Proxy findProxy(RequestContext context) { initializeStopWatchHook(); @@ -377,9 +368,7 @@ public class BizGatewayImpl implements BizGateway { if (getRouteRules() == null) { return ImmutableMap.of(); } - return getRouteRules().stream().map(e -> { - return buildProxy(e); - }).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())); + return getRouteRules().stream().map(this::buildProxy).collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/entity/GlobalContext.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/entity/GlobalContext.java index f93aa24..94cddc7 100755 --- a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/entity/GlobalContext.java +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/entity/GlobalContext.java @@ -2,6 +2,7 @@ package cn.axzo.foundation.gateway.support.entity; import cn.axzo.foundation.enums.AppEnvEnum; import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain; +import cn.axzo.foundation.gateway.support.proxy.GateResponseSupplier; import cn.axzo.foundation.gateway.support.utils.RpcClientProvider; import cn.axzo.foundation.web.support.AppRuntime; import com.google.common.base.Strings; @@ -34,20 +35,23 @@ public class GlobalContext { @Getter private AppEnvEnum gateEnv; @Getter - final private RpcClientProvider rpcClientProvider; + private final RpcClientProvider rpcClientProvider; @Getter - final private BiConsumer alertConsumer; + private final BiConsumer alertConsumer; @Getter - final private Function> serviceSupplier; + private final Function> serviceSupplier; + /* 服务降级时默认返回 */ + @Getter + private final Function fallbackSupplier; @Getter /** 全局的代理Hook列表 */ - final private ProxyHookChain proxyHookChain; + private final ProxyHookChain proxyHookChain; @Getter - final private Long blockingMillis; + private final Long blockingMillis; @Getter - final private Map localProxies; + private final Map localProxies; @Getter - /** 网关代理配置最后更新时间 */ + /* 网关代理配置最后更新时间 */ private Long version; @Getter /** 需要debug的URI列表 */ diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/fallback/FallbackConfig.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/fallback/FallbackConfig.java new file mode 100644 index 0000000..34eb1b2 --- /dev/null +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/fallback/FallbackConfig.java @@ -0,0 +1,50 @@ +package cn.axzo.foundation.gateway.support.fallback; + +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; +import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.ImmutableSet; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class FallbackConfig { + String bean; + JSONObject config; + /** 策略, 支持FLOW_GRADE_QPS慢查询比例, DEGRADE_GRADE_EXCEPTION_RATIO异常比例, DEGRADE_GRADE_EXCEPTION_COUNT异常数 */ + Integer grade = 2; + /** 默认10s内请求大于5个, 同时错误超过一半则降级30s */ + Double count = 0.5D; + Integer timeWindow = 30; + Integer minRequestAmount = 5; + /** 慢比例有效 */ + Double slowRatioThreshold = 1.0; + Integer statIntervalMs = 10000; + + public static FallbackConfig fromConfig(JSONObject config) { + if (config == null || !config.containsKey("fallback")) { + return null; + } + return config.getJSONObject("fallback").toJavaObject(FallbackConfig.class); + } + + public void registerRule(String resourceName) { + if (DegradeRuleManager.hasConfig(resourceName)) { + return; + } + DegradeRule degradeRule = new DegradeRule(resourceName); + degradeRule.setGrade(grade); + degradeRule.setCount(count); + degradeRule.setTimeWindow(timeWindow); + degradeRule.setMinRequestAmount(minRequestAmount); + degradeRule.setSlowRatioThreshold(slowRatioThreshold); + degradeRule.setStatIntervalMs(statIntervalMs); + + DegradeRuleManager.setRulesForResource(resourceName, ImmutableSet.of(degradeRule)); + } +} \ No newline at end of file diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/plugin/impl/RequestFilterHook.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/plugin/impl/RequestFilterHook.java index 719175b..16b86e1 100644 --- a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/plugin/impl/RequestFilterHook.java +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/plugin/impl/RequestFilterHook.java @@ -1,9 +1,9 @@ package cn.axzo.foundation.gateway.support.plugin.impl; -import cn.axzo.foundation.exception.BusinessException; import cn.axzo.foundation.gateway.support.entity.GateResponse; import cn.axzo.foundation.gateway.support.entity.ProxyContext; import cn.axzo.foundation.gateway.support.entity.RequestContext; +import cn.axzo.foundation.gateway.support.fallback.FallbackConfig; import cn.axzo.foundation.gateway.support.plugin.ProxyHook; import cn.axzo.foundation.gateway.support.plugin.impl.filters.*; import cn.axzo.foundation.util.FastjsonUtils; @@ -14,8 +14,6 @@ import com.alibaba.csp.sentinel.EntryType; import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.slots.block.BlockException; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule; -import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.base.Preconditions; @@ -23,7 +21,6 @@ import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import lombok.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -123,25 +120,24 @@ public class RequestFilterHook implements ProxyHook { for (FilterBean bean : beans) { RequestFilter requestFilter = filterBeanResolver.apply(bean.getName()); Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册"); - FallBackConfig fallBackConfig = bean.getFallBackConfig(); + FallbackConfig fallbackConfig = FallbackConfig.fromConfig(bean.getConfig()); Entry entry = null; try { - if (fallBackConfig != null) { + if (fallbackConfig != null) { String resourceName = bean.getResourceName(reqContext.getRequestURI()); - fallBackConfig.registerRule(resourceName); + fallbackConfig.registerRule(resourceName); entry = SphU.entry(resourceName, EntryType.IN); } requestBody = requestFilter.filterIn(reqContext, requestBody, bean.getConfig()); } catch (BlockException ex) { + String filterBean = StringUtils.firstNonBlank(fallbackConfig.getBean(), "emptyFilter"); + log.warn("request fall back: {}, filter: {}, fallbackBean : {}", + reqContext.getRequestURI(), bean.getName(), filterBean, ex); //降级处理 - requestBody = filterBeanResolver.apply(fallBackConfig.getFallBackFilter()) - .filterIn(reqContext, requestBody, fallBackConfig.getConfig()); + requestBody = filterBeanResolver.apply(filterBean).filterIn(reqContext, requestBody, fallbackConfig.getConfig()); } catch (Exception ex) { - //非业务异常记录trace - if (!BusinessException.class.isAssignableFrom(ex.getClass())) { - Tracer.traceEntry(ex, entry); - } + Tracer.traceEntry(ex, entry); throw ex; } finally { if (entry != null) { @@ -176,25 +172,24 @@ public class RequestFilterHook implements ProxyHook { for (FilterBean bean : beans) { RequestFilter requestFilter = filterBeanResolver.apply(bean.getName()); Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册"); - FallBackConfig fallBackConfig = bean.getFallBackConfig(); + FallbackConfig fallbackConfig = FallbackConfig.fromConfig(bean.getConfig()); Entry entry = null; try { - if (fallBackConfig != null) { + if (fallbackConfig != null) { String resourceName = bean.getResourceName(reqContext.getRequestURI()); - fallBackConfig.registerRule(resourceName); + fallbackConfig.registerRule(resourceName); entry = SphU.entry(resourceName, EntryType.OUT); } responseBody = requestFilter.filterOut(reqContext, responseBody, bean.getConfig()); } catch (BlockException ex) { + String filterBean = StringUtils.firstNonBlank(fallbackConfig.getBean(), "emptyFilter"); + log.warn("response fall back: {}, filter: {}, fallbackBean : {}", + reqContext.getRequestURI(), bean.getName(), filterBean, ex); //降级处理 - responseBody = filterBeanResolver.apply(fallBackConfig.getFallBackFilter()) - .filterIn(reqContext, responseBody, fallBackConfig.getConfig()); + responseBody = filterBeanResolver.apply(filterBean).filterOut(reqContext, responseBody, fallbackConfig.getConfig()); } catch (Exception ex) { - //非业务异常记录trace - if (!BusinessException.class.isAssignableFrom(ex.getClass())) { - Tracer.traceEntry(ex, entry); - } + Tracer.traceEntry(ex, entry); throw ex; } finally { if (entry != null) { @@ -263,48 +258,8 @@ public class RequestFilterHook implements ProxyHook { private String name; private JSONObject config; - public FallBackConfig getFallBackConfig() { - return Optional.ofNullable(config) - .flatMap(e -> Optional.ofNullable(e.getJSONObject("fallBack")) - .map(fallBack -> fallBack.toJavaObject(FallBackConfig.class))) - .orElse(null); - } - public String getResourceName(String requestUrl) { return "RequestFilterHook:" + name + "@" + requestUrl; } } - - @Data - @Builder - @NoArgsConstructor - @AllArgsConstructor - private static class FallBackConfig { - String fallBackFilter = "emptyFilter"; - JSONObject config; - /** 策略, 支持FLOW_GRADE_QPS慢查询比例, DEGRADE_GRADE_EXCEPTION_RATIO异常比例, DEGRADE_GRADE_EXCEPTION_COUNT异常数 */ - Integer grade = 2; - /** 默认10s内请求大于5个, 同时错误超过一半则降级30s */ - Double count = 0.5D; - Integer timeWindow = 30; - Integer minRequestAmount = 5; - /** 慢比例有效 */ - Double slowRatioThreshold = 1.0; - Integer statIntervalMs = 10000; - - public void registerRule(String resourceName) { - if (DegradeRuleManager.hasConfig(resourceName)) { - return; - } - DegradeRule degradeRule = new DegradeRule(resourceName); - degradeRule.setGrade(grade); - degradeRule.setCount(count); - degradeRule.setTimeWindow(timeWindow); - degradeRule.setMinRequestAmount(minRequestAmount); - degradeRule.setSlowRatioThreshold(slowRatioThreshold); - degradeRule.setStatIntervalMs(statIntervalMs); - - DegradeRuleManager.setRulesForResource(resourceName, ImmutableSet.of(degradeRule)); - } - } } diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/GateResponseSupplier.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/GateResponseSupplier.java new file mode 100644 index 0000000..afdffe2 --- /dev/null +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/GateResponseSupplier.java @@ -0,0 +1,31 @@ +package cn.axzo.foundation.gateway.support.proxy; + +import cn.axzo.foundation.gateway.support.entity.GateResponse; +import cn.axzo.foundation.gateway.support.entity.RequestContext; +import cn.axzo.foundation.result.ApiResult; +import cn.axzo.foundation.web.support.rpc.RequestParams; +import com.alibaba.fastjson.JSON; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.springframework.http.HttpStatus; + +import java.nio.charset.StandardCharsets; + +/** + * 返回GateResponse + * 目前在服务降级时会调用 + */ +public interface GateResponseSupplier { + + GateResponse getResponse(RequestContext requestContext, String resolvedUrl, RequestParams requestParams); + + GateResponseSupplier DEFAULT = (requestContext, resolvedUrl, requestParams) -> { + byte[] bytes = JSON.toJSONString(ApiResult.success()).getBytes(StandardCharsets.UTF_8); + return GateResponse.builder() + .status(HttpStatus.OK) + .content(bytes) + .headers(ImmutableMap.of("content-type", ImmutableList.of("application/json;charset=UTF-8") + , "content-length", ImmutableList.of(bytes.length + ""))) + .build(); + }; +} diff --git a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/impl/SimpleProxy.java b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/impl/SimpleProxy.java index 2c07cb1..75e3fd2 100755 --- a/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/impl/SimpleProxy.java +++ b/gateway-support-lib/src/main/java/cn/axzo/foundation/gateway/support/proxy/impl/SimpleProxy.java @@ -3,8 +3,16 @@ package cn.axzo.foundation.gateway.support.proxy.impl; import cn.axzo.foundation.gateway.support.entity.GateResponse; import cn.axzo.foundation.gateway.support.entity.ProxyContext; import cn.axzo.foundation.gateway.support.entity.RequestContext; +import cn.axzo.foundation.gateway.support.exception.ApiNotFoundException; +import cn.axzo.foundation.gateway.support.fallback.FallbackConfig; import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain; +import cn.axzo.foundation.gateway.support.proxy.GateResponseSupplier; import cn.axzo.foundation.web.support.rpc.RequestParams; +import cn.axzo.foundation.web.support.rpc.RpcClient; +import com.alibaba.csp.sentinel.Entry; +import com.alibaba.csp.sentinel.SphU; +import com.alibaba.csp.sentinel.Tracer; +import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONException; @@ -14,6 +22,7 @@ import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.net.UrlEscapers; +import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import org.apache.commons.lang3.StringUtils; import org.springframework.util.CollectionUtils; @@ -23,12 +32,10 @@ import javax.servlet.http.HttpServletRequest; import java.util.*; import java.util.stream.Collectors; -import cn.axzo.foundation.gateway.support.exception.ApiNotFoundException; -import cn.axzo.foundation.web.support.rpc.RpcClient; - /** * 简单请求Proxy, 根据配置中的服务编码对应的域名, 在具体实现代理中进行URL及参数转换. */ +@Slf4j public abstract class SimpleProxy extends AbstractProxy { abstract String resolveServiceCode(RequestContext context); @@ -74,8 +81,33 @@ public abstract class SimpleProxy extends AbstractProxy { RpcClient rpcClient = getServiceResolver().getRpcClient(requestContext, hookedCode); RequestParams resolvedParams = getHookChain().preRequest(requestContext, getContext(), rpcClient, resolvedUrl, requestParams); - GateResponse response = requestContext.getRequestMethod().request( - rpcClient, resolvedUrl, resolvedParams); + FallbackConfig fallbackConfig = FallbackConfig.fromConfig(getContext().getProxyParam()); + + GateResponse response; + Entry entry = null; + try { + if (fallbackConfig != null) { + String resourceName = "SimpleProxy:" + requestContext.getServiceCode() + "@" + requestContext.getRequestURI(); + fallbackConfig.registerRule(resourceName); + entry = SphU.entry(resourceName); + } + response = requestContext.getRequestMethod().request( + rpcClient, resolvedUrl, resolvedParams); + } catch (BlockException ex) { + log.warn("proxy fall back: {}, fallbackBean : {}", requestContext.getRequestURI(), fallbackConfig.getBean(), ex); + //降级处理 + response = Optional.ofNullable(getContext().getGlobalContext().getFallbackSupplier()) + .flatMap(e -> Optional.ofNullable(e.apply(fallbackConfig.getBean()))) + .orElse(GateResponseSupplier.DEFAULT).getResponse(requestContext, resolvedUrl, requestParams); + } catch (Exception ex) { + Tracer.traceEntry(ex, entry); + throw ex; + } finally { + if (entry != null) { + entry.exit(); + } + } + GateResponse res = getHookChain().postResponse(requestContext, getContext(), response); filterResponse(requestContext, res); @@ -102,7 +134,7 @@ public abstract class SimpleProxy extends AbstractProxy { Optional subtypeOptional = Optional.ofNullable(context.getOriginalRequest()) .map(HttpServletRequest::getContentType) .map(MediaType::parse) - .map(e->e.subtype().toLowerCase()); + .map(e -> e.subtype().toLowerCase()); if (subtypeOptional.isPresent() && JSON_YML_SUBTYPES.contains(subtypeOptional.get())) { String requestBody = StringUtils.firstNonBlank(context.getRequestBody(), "{}"); Object body; diff --git a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/alert/EmailAlertConsumer.java b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/alert/EmailAlertConsumer.java index 09a9c87..9d986b1 100644 --- a/web-support-lib/src/main/java/cn/axzo/foundation/web/support/alert/EmailAlertConsumer.java +++ b/web-support-lib/src/main/java/cn/axzo/foundation/web/support/alert/EmailAlertConsumer.java @@ -1,6 +1,8 @@ package cn.axzo.foundation.web.support.alert; +import cn.axzo.foundation.enums.AppEnvEnum; import cn.axzo.foundation.web.support.AppRuntime; +import com.alibaba.fastjson.JSONObject; import lombok.Builder; import lombok.extern.slf4j.Slf4j; @@ -57,6 +59,10 @@ public class EmailAlertConsumer implements Consumer> alerts) { + if (appRuntime.getEnv() == AppEnvEnum.local) { + log.error("local alerts: {}", JSONObject.toJSONString(alerts)); + return; + } Message message = new MimeMessage(session); try { message.setFrom(new InternetAddress(from));