feat: 路由级别的服务降级

原因
    路由级别的服务降级

修改
    修改SimpleProxy, 支持降级
This commit is contained in:
zengxiaobo 2024-07-12 16:03:17 +08:00
parent 06a8b49c54
commit 0dfd96c83c
7 changed files with 154 additions and 87 deletions

View File

@ -214,15 +214,6 @@ public class BizGatewayImpl implements BizGateway {
((StopWatchHook) hook).initialize(); ((StopWatchHook) hook).initialize();
} }
private boolean isGrayStage(RequestContext requestContext) {
// TODO: correct gray stage
if (!CollectionUtils.isEmpty(requestContext.getHeaders())
&& StringUtils.equals(requestContext.getHeaders().get("gray_stage"), "true")) {
return true;
}
return false;
}
@Override @Override
public Proxy findProxy(RequestContext context) { public Proxy findProxy(RequestContext context) {
initializeStopWatchHook(); initializeStopWatchHook();
@ -377,9 +368,7 @@ public class BizGatewayImpl implements BizGateway {
if (getRouteRules() == null) { if (getRouteRules() == null) {
return ImmutableMap.of(); return ImmutableMap.of();
} }
return getRouteRules().stream().map(e -> { return getRouteRules().stream().map(this::buildProxy).collect(Collectors.toMap(Pair::getKey, Pair::getValue));
return buildProxy(e);
}).collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
} }

View File

@ -2,6 +2,7 @@ package cn.axzo.foundation.gateway.support.entity;
import cn.axzo.foundation.enums.AppEnvEnum; import cn.axzo.foundation.enums.AppEnvEnum;
import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain; import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain;
import cn.axzo.foundation.gateway.support.proxy.GateResponseSupplier;
import cn.axzo.foundation.gateway.support.utils.RpcClientProvider; import cn.axzo.foundation.gateway.support.utils.RpcClientProvider;
import cn.axzo.foundation.web.support.AppRuntime; import cn.axzo.foundation.web.support.AppRuntime;
import com.google.common.base.Strings; import com.google.common.base.Strings;
@ -34,20 +35,23 @@ public class GlobalContext {
@Getter @Getter
private AppEnvEnum gateEnv; private AppEnvEnum gateEnv;
@Getter @Getter
final private RpcClientProvider rpcClientProvider; private final RpcClientProvider rpcClientProvider;
@Getter @Getter
final private BiConsumer<String, Object[]> alertConsumer; private final BiConsumer<String, Object[]> alertConsumer;
@Getter @Getter
final private Function<AppEnvEnum, List<Service>> serviceSupplier; private final Function<AppEnvEnum, List<Service>> serviceSupplier;
/* 服务降级时默认返回 */
@Getter
private final Function<String, GateResponseSupplier> fallbackSupplier;
@Getter @Getter
/** 全局的代理Hook列表 */ /** 全局的代理Hook列表 */
final private ProxyHookChain proxyHookChain; private final ProxyHookChain proxyHookChain;
@Getter @Getter
final private Long blockingMillis; private final Long blockingMillis;
@Getter @Getter
final private Map<String, GateSettingResp.Proxy> localProxies; private final Map<String, GateSettingResp.Proxy> localProxies;
@Getter @Getter
/** 网关代理配置最后更新时间 */ /* 网关代理配置最后更新时间 */
private Long version; private Long version;
@Getter @Getter
/** 需要debug的URI列表 */ /** 需要debug的URI列表 */

View File

@ -0,0 +1,50 @@
package cn.axzo.foundation.gateway.support.fallback;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableSet;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class FallbackConfig {
String bean;
JSONObject config;
/** 策略, 支持FLOW_GRADE_QPS慢查询比例, DEGRADE_GRADE_EXCEPTION_RATIO异常比例, DEGRADE_GRADE_EXCEPTION_COUNT异常数 */
Integer grade = 2;
/** 默认10s内请求大于5个, 同时错误超过一半则降级30s */
Double count = 0.5D;
Integer timeWindow = 30;
Integer minRequestAmount = 5;
/** 慢比例有效 */
Double slowRatioThreshold = 1.0;
Integer statIntervalMs = 10000;
public static FallbackConfig fromConfig(JSONObject config) {
if (config == null || !config.containsKey("fallback")) {
return null;
}
return config.getJSONObject("fallback").toJavaObject(FallbackConfig.class);
}
public void registerRule(String resourceName) {
if (DegradeRuleManager.hasConfig(resourceName)) {
return;
}
DegradeRule degradeRule = new DegradeRule(resourceName);
degradeRule.setGrade(grade);
degradeRule.setCount(count);
degradeRule.setTimeWindow(timeWindow);
degradeRule.setMinRequestAmount(minRequestAmount);
degradeRule.setSlowRatioThreshold(slowRatioThreshold);
degradeRule.setStatIntervalMs(statIntervalMs);
DegradeRuleManager.setRulesForResource(resourceName, ImmutableSet.of(degradeRule));
}
}

View File

@ -1,9 +1,9 @@
package cn.axzo.foundation.gateway.support.plugin.impl; package cn.axzo.foundation.gateway.support.plugin.impl;
import cn.axzo.foundation.exception.BusinessException;
import cn.axzo.foundation.gateway.support.entity.GateResponse; import cn.axzo.foundation.gateway.support.entity.GateResponse;
import cn.axzo.foundation.gateway.support.entity.ProxyContext; import cn.axzo.foundation.gateway.support.entity.ProxyContext;
import cn.axzo.foundation.gateway.support.entity.RequestContext; import cn.axzo.foundation.gateway.support.entity.RequestContext;
import cn.axzo.foundation.gateway.support.fallback.FallbackConfig;
import cn.axzo.foundation.gateway.support.plugin.ProxyHook; import cn.axzo.foundation.gateway.support.plugin.ProxyHook;
import cn.axzo.foundation.gateway.support.plugin.impl.filters.*; import cn.axzo.foundation.gateway.support.plugin.impl.filters.*;
import cn.axzo.foundation.util.FastjsonUtils; import cn.axzo.foundation.util.FastjsonUtils;
@ -14,8 +14,6 @@ import com.alibaba.csp.sentinel.EntryType;
import com.alibaba.csp.sentinel.SphU; import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer; import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.slots.block.BlockException; import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -23,7 +21,6 @@ import com.google.common.base.Splitter;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.*; import lombok.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
@ -123,25 +120,24 @@ public class RequestFilterHook implements ProxyHook {
for (FilterBean bean : beans) { for (FilterBean bean : beans) {
RequestFilter requestFilter = filterBeanResolver.apply(bean.getName()); RequestFilter requestFilter = filterBeanResolver.apply(bean.getName());
Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册"); Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册");
FallBackConfig fallBackConfig = bean.getFallBackConfig(); FallbackConfig fallbackConfig = FallbackConfig.fromConfig(bean.getConfig());
Entry entry = null; Entry entry = null;
try { try {
if (fallBackConfig != null) { if (fallbackConfig != null) {
String resourceName = bean.getResourceName(reqContext.getRequestURI()); String resourceName = bean.getResourceName(reqContext.getRequestURI());
fallBackConfig.registerRule(resourceName); fallbackConfig.registerRule(resourceName);
entry = SphU.entry(resourceName, EntryType.IN); entry = SphU.entry(resourceName, EntryType.IN);
} }
requestBody = requestFilter.filterIn(reqContext, requestBody, bean.getConfig()); requestBody = requestFilter.filterIn(reqContext, requestBody, bean.getConfig());
} catch (BlockException ex) { } catch (BlockException ex) {
String filterBean = StringUtils.firstNonBlank(fallbackConfig.getBean(), "emptyFilter");
log.warn("request fall back: {}, filter: {}, fallbackBean : {}",
reqContext.getRequestURI(), bean.getName(), filterBean, ex);
//降级处理 //降级处理
requestBody = filterBeanResolver.apply(fallBackConfig.getFallBackFilter()) requestBody = filterBeanResolver.apply(filterBean).filterIn(reqContext, requestBody, fallbackConfig.getConfig());
.filterIn(reqContext, requestBody, fallBackConfig.getConfig());
} catch (Exception ex) { } catch (Exception ex) {
//非业务异常记录trace
if (!BusinessException.class.isAssignableFrom(ex.getClass())) {
Tracer.traceEntry(ex, entry); Tracer.traceEntry(ex, entry);
}
throw ex; throw ex;
} finally { } finally {
if (entry != null) { if (entry != null) {
@ -176,25 +172,24 @@ public class RequestFilterHook implements ProxyHook {
for (FilterBean bean : beans) { for (FilterBean bean : beans) {
RequestFilter requestFilter = filterBeanResolver.apply(bean.getName()); RequestFilter requestFilter = filterBeanResolver.apply(bean.getName());
Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册"); Preconditions.checkState(requestFilter != null, bean.getName() + " 没在系统注册");
FallBackConfig fallBackConfig = bean.getFallBackConfig(); FallbackConfig fallbackConfig = FallbackConfig.fromConfig(bean.getConfig());
Entry entry = null; Entry entry = null;
try { try {
if (fallBackConfig != null) { if (fallbackConfig != null) {
String resourceName = bean.getResourceName(reqContext.getRequestURI()); String resourceName = bean.getResourceName(reqContext.getRequestURI());
fallBackConfig.registerRule(resourceName); fallbackConfig.registerRule(resourceName);
entry = SphU.entry(resourceName, EntryType.OUT); entry = SphU.entry(resourceName, EntryType.OUT);
} }
responseBody = requestFilter.filterOut(reqContext, responseBody, bean.getConfig()); responseBody = requestFilter.filterOut(reqContext, responseBody, bean.getConfig());
} catch (BlockException ex) { } catch (BlockException ex) {
String filterBean = StringUtils.firstNonBlank(fallbackConfig.getBean(), "emptyFilter");
log.warn("response fall back: {}, filter: {}, fallbackBean : {}",
reqContext.getRequestURI(), bean.getName(), filterBean, ex);
//降级处理 //降级处理
responseBody = filterBeanResolver.apply(fallBackConfig.getFallBackFilter()) responseBody = filterBeanResolver.apply(filterBean).filterOut(reqContext, responseBody, fallbackConfig.getConfig());
.filterIn(reqContext, responseBody, fallBackConfig.getConfig());
} catch (Exception ex) { } catch (Exception ex) {
//非业务异常记录trace
if (!BusinessException.class.isAssignableFrom(ex.getClass())) {
Tracer.traceEntry(ex, entry); Tracer.traceEntry(ex, entry);
}
throw ex; throw ex;
} finally { } finally {
if (entry != null) { if (entry != null) {
@ -263,48 +258,8 @@ public class RequestFilterHook implements ProxyHook {
private String name; private String name;
private JSONObject config; private JSONObject config;
public FallBackConfig getFallBackConfig() {
return Optional.ofNullable(config)
.flatMap(e -> Optional.ofNullable(e.getJSONObject("fallBack"))
.map(fallBack -> fallBack.toJavaObject(FallBackConfig.class)))
.orElse(null);
}
public String getResourceName(String requestUrl) { public String getResourceName(String requestUrl) {
return "RequestFilterHook:" + name + "@" + requestUrl; return "RequestFilterHook:" + name + "@" + requestUrl;
} }
} }
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
private static class FallBackConfig {
String fallBackFilter = "emptyFilter";
JSONObject config;
/** 策略, 支持FLOW_GRADE_QPS慢查询比例, DEGRADE_GRADE_EXCEPTION_RATIO异常比例, DEGRADE_GRADE_EXCEPTION_COUNT异常数 */
Integer grade = 2;
/** 默认10s内请求大于5个, 同时错误超过一半则降级30s */
Double count = 0.5D;
Integer timeWindow = 30;
Integer minRequestAmount = 5;
/** 慢比例有效 */
Double slowRatioThreshold = 1.0;
Integer statIntervalMs = 10000;
public void registerRule(String resourceName) {
if (DegradeRuleManager.hasConfig(resourceName)) {
return;
}
DegradeRule degradeRule = new DegradeRule(resourceName);
degradeRule.setGrade(grade);
degradeRule.setCount(count);
degradeRule.setTimeWindow(timeWindow);
degradeRule.setMinRequestAmount(minRequestAmount);
degradeRule.setSlowRatioThreshold(slowRatioThreshold);
degradeRule.setStatIntervalMs(statIntervalMs);
DegradeRuleManager.setRulesForResource(resourceName, ImmutableSet.of(degradeRule));
}
}
} }

View File

@ -0,0 +1,31 @@
package cn.axzo.foundation.gateway.support.proxy;
import cn.axzo.foundation.gateway.support.entity.GateResponse;
import cn.axzo.foundation.gateway.support.entity.RequestContext;
import cn.axzo.foundation.result.ApiResult;
import cn.axzo.foundation.web.support.rpc.RequestParams;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.springframework.http.HttpStatus;
import java.nio.charset.StandardCharsets;
/**
* 返回GateResponse
* 目前在服务降级时会调用
*/
public interface GateResponseSupplier {
GateResponse getResponse(RequestContext requestContext, String resolvedUrl, RequestParams requestParams);
GateResponseSupplier DEFAULT = (requestContext, resolvedUrl, requestParams) -> {
byte[] bytes = JSON.toJSONString(ApiResult.success()).getBytes(StandardCharsets.UTF_8);
return GateResponse.builder()
.status(HttpStatus.OK)
.content(bytes)
.headers(ImmutableMap.of("content-type", ImmutableList.of("application/json;charset=UTF-8")
, "content-length", ImmutableList.of(bytes.length + "")))
.build();
};
}

View File

@ -3,8 +3,16 @@ package cn.axzo.foundation.gateway.support.proxy.impl;
import cn.axzo.foundation.gateway.support.entity.GateResponse; import cn.axzo.foundation.gateway.support.entity.GateResponse;
import cn.axzo.foundation.gateway.support.entity.ProxyContext; import cn.axzo.foundation.gateway.support.entity.ProxyContext;
import cn.axzo.foundation.gateway.support.entity.RequestContext; import cn.axzo.foundation.gateway.support.entity.RequestContext;
import cn.axzo.foundation.gateway.support.exception.ApiNotFoundException;
import cn.axzo.foundation.gateway.support.fallback.FallbackConfig;
import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain; import cn.axzo.foundation.gateway.support.plugin.ProxyHookChain;
import cn.axzo.foundation.gateway.support.proxy.GateResponseSupplier;
import cn.axzo.foundation.web.support.rpc.RequestParams; import cn.axzo.foundation.web.support.rpc.RequestParams;
import cn.axzo.foundation.web.support.rpc.RpcClient;
import com.alibaba.csp.sentinel.Entry;
import com.alibaba.csp.sentinel.SphU;
import com.alibaba.csp.sentinel.Tracer;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONException;
@ -14,6 +22,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.net.UrlEscapers; import com.google.common.net.UrlEscapers;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType; import okhttp3.MediaType;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
@ -23,12 +32,10 @@ import javax.servlet.http.HttpServletRequest;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import cn.axzo.foundation.gateway.support.exception.ApiNotFoundException;
import cn.axzo.foundation.web.support.rpc.RpcClient;
/** /**
* 简单请求Proxy, 根据配置中的服务编码对应的域名, 在具体实现代理中进行URL及参数转换. * 简单请求Proxy, 根据配置中的服务编码对应的域名, 在具体实现代理中进行URL及参数转换.
*/ */
@Slf4j
public abstract class SimpleProxy extends AbstractProxy { public abstract class SimpleProxy extends AbstractProxy {
abstract String resolveServiceCode(RequestContext context); abstract String resolveServiceCode(RequestContext context);
@ -74,8 +81,33 @@ public abstract class SimpleProxy extends AbstractProxy {
RpcClient rpcClient = getServiceResolver().getRpcClient(requestContext, hookedCode); RpcClient rpcClient = getServiceResolver().getRpcClient(requestContext, hookedCode);
RequestParams resolvedParams = getHookChain().preRequest(requestContext, getContext(), rpcClient, resolvedUrl, requestParams); RequestParams resolvedParams = getHookChain().preRequest(requestContext, getContext(), rpcClient, resolvedUrl, requestParams);
GateResponse response = requestContext.getRequestMethod().request( FallbackConfig fallbackConfig = FallbackConfig.fromConfig(getContext().getProxyParam());
GateResponse response;
Entry entry = null;
try {
if (fallbackConfig != null) {
String resourceName = "SimpleProxy:" + requestContext.getServiceCode() + "@" + requestContext.getRequestURI();
fallbackConfig.registerRule(resourceName);
entry = SphU.entry(resourceName);
}
response = requestContext.getRequestMethod().request(
rpcClient, resolvedUrl, resolvedParams); rpcClient, resolvedUrl, resolvedParams);
} catch (BlockException ex) {
log.warn("proxy fall back: {}, fallbackBean : {}", requestContext.getRequestURI(), fallbackConfig.getBean(), ex);
//降级处理
response = Optional.ofNullable(getContext().getGlobalContext().getFallbackSupplier())
.flatMap(e -> Optional.ofNullable(e.apply(fallbackConfig.getBean())))
.orElse(GateResponseSupplier.DEFAULT).getResponse(requestContext, resolvedUrl, requestParams);
} catch (Exception ex) {
Tracer.traceEntry(ex, entry);
throw ex;
} finally {
if (entry != null) {
entry.exit();
}
}
GateResponse res = getHookChain().postResponse(requestContext, getContext(), response); GateResponse res = getHookChain().postResponse(requestContext, getContext(), response);
filterResponse(requestContext, res); filterResponse(requestContext, res);
@ -102,7 +134,7 @@ public abstract class SimpleProxy extends AbstractProxy {
Optional<String> subtypeOptional = Optional.ofNullable(context.getOriginalRequest()) Optional<String> subtypeOptional = Optional.ofNullable(context.getOriginalRequest())
.map(HttpServletRequest::getContentType) .map(HttpServletRequest::getContentType)
.map(MediaType::parse) .map(MediaType::parse)
.map(e->e.subtype().toLowerCase()); .map(e -> e.subtype().toLowerCase());
if (subtypeOptional.isPresent() && JSON_YML_SUBTYPES.contains(subtypeOptional.get())) { if (subtypeOptional.isPresent() && JSON_YML_SUBTYPES.contains(subtypeOptional.get())) {
String requestBody = StringUtils.firstNonBlank(context.getRequestBody(), "{}"); String requestBody = StringUtils.firstNonBlank(context.getRequestBody(), "{}");
Object body; Object body;

View File

@ -1,6 +1,8 @@
package cn.axzo.foundation.web.support.alert; package cn.axzo.foundation.web.support.alert;
import cn.axzo.foundation.enums.AppEnvEnum;
import cn.axzo.foundation.web.support.AppRuntime; import cn.axzo.foundation.web.support.AppRuntime;
import com.alibaba.fastjson.JSONObject;
import lombok.Builder; import lombok.Builder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -57,6 +59,10 @@ public class EmailAlertConsumer implements Consumer<Map<AlertClient.AlertKey, Co
@Override @Override
public void accept(Map<AlertClient.AlertKey, Collection<AlertClient.AlertMessage>> alerts) { public void accept(Map<AlertClient.AlertKey, Collection<AlertClient.AlertMessage>> alerts) {
if (appRuntime.getEnv() == AppEnvEnum.local) {
log.error("local alerts: {}", JSONObject.toJSONString(alerts));
return;
}
Message message = new MimeMessage(session); Message message = new MimeMessage(session);
try { try {
message.setFrom(new InternetAddress(from)); message.setFrom(new InternetAddress(from));