0
  • 聊天消息
  • 系统消息
  • 评论与回复
登录后你可以
  • 下载海量资料
  • 学习在线课程
  • 观看技术视频
  • 写文章/发帖/加入社区
会员中心
创作中心

完善资料让更多小伙伴认识你,还能领取20积分哦,立即完善>

3天内不再提示

springboot统一异常处理

jf_ro2CN3Fa 来源:CSDN 2023-07-25 16:11 次阅读

一、背景

限流对于一个微服务架构系统来说具有非常重要的意义,否则其中的某个微服务将成为整个系统隐藏的雪崩因素,为什么这么说?

举例来讲,某个SAAS平台有100多个微服务应用,但是作为底层的某个或某几个应用来说,将会被所有上层应用频繁调用,业务高峰期时,如果底层应用不做限流处理,该应用必将面临着巨大的压力,尤其是那些个别被高频调用的接口来说,最直接的表现就是导致后续新进来的请求阻塞、排队、响应超时...最后直到该服务所在JVM资源被耗尽。

二、限流概述

在大多数的微服务架构在设计之初,比如在技术选型阶段,架构师会从一个全局的视角去规划技术栈的组合,比如结合当前产品的现状考虑是使用dubbo?还是springcloud?作为微服务治理的底层框架。甚至为了满足快速的上线、迭代和交付,直接以springboot为基座进行开发,后续再引入新的技术栈等...

所以在谈论某个业务场景具体的技术解决方案时不可一概而论,而是需要结合产品和业务的现状综合评估,以限流来说,在下面的不同的技术架构下具体在选择的时候可能也不一样。

2.1 dubbo 服务治理模式

选择dubbo框架作为基础服务治理对于那种偏向内部平台的应用还是不错的,dubbo底层走netty,这一点相比http协议来说,在一定场景下还是具有优势的,如果选择dubbo,在选择限流方案上可以做如下的参考。

2.1.1 dubbo框架级限流

dubbo官方提供了完善的服务治理,能够满足大多数开发场景中的需求,针对限流这个场景,具体来说包括如下手段,具体的配置,可以参考官方手册;

客户端限流

信号量限流 (通过统计的方式)

连接数限流 (socket->tcp)

服务端限流

线程池限流 (隔离手段)

信号量限流 (非隔离手段)

接收数限流 (socket->tcp)

2.1.2 线程池设置

多线程并发操作一定离不开线程池,Dubbo自身提供了支持了四种线程池类型支持。生产者标签中可配置线程池关键参数,线程池类型、阻塞队列大小、核心线程数量等,通过配置生产端的线程池数量可以在一定程度上起到限流的效果。

2.1.3 集成第三方组件

如果是springboot框架的项目,可以考虑直接引入地方的组件或SDK,比如hystrix,guava,sentinel原生SDK等,如果技术实力足够强甚至可以考虑自己造轮子。

2.2 springcloud 服务治理模式

如果你的服务治理框架选用的是springcloud或springcloud-alibaba,其框架自身的生态中已经包含了相应的限流组件,可以实现开箱即用,下面列举几种常用的基于springcloud框架的限流组件。

2.2.1 hystrix

Hystrix是Netflix开源的一款容错框架,在springcloud早期推出市场的时候,作为springcloud生态中用于限流、熔断、降级的一款组件。

Hystrix提供了限流功能,在springcloud架构的系统中,可以在网关启用Hystrix,进行限流处理,每个微服务也可以各自启用Hystrix进行限流。

Hystrix默认使用线程隔离模式,可以通过线程数+队列大小进行限流,具体参数配置可以参考官网相关资料

2.2.2 sentinel

Sentinel 号称分布式系统的流量防卫兵,属于springcloud-alibaba生态中的重要组件,面向分布式服务架构的流量控制组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。

2.3 网关层限流

随着微服务规模的增加,整个系统中很多微服务都需要实现限流这种需求时,就可以考虑在网关这一层进行限流了,通常来说,网关层的限流面向的是通用的业务,比如那些恶意的请求,爬虫,攻击等,简单来说,网关层面的限流提供了一层对系统整体的保护措施。

三、常用限流策略

3.1 限流常用的算法

不管是哪种限流组件,其底层的限流实现算法大同小异,这里列举几种常用的限流算法以供了解。

3.1.1 令牌桶算法

令牌桶算法是目前应用最为广泛的限流算法,顾名思义,它有以下两个关键角色:

令牌 :获取到令牌的Request才会被处理,其他Requests要么排队要么被直接丢弃;

桶 :用来装令牌的地方,所有Request都从这个桶里面获取令牌

cb3a5c3a-2a03-11ee-a368-dac502259ad0.png

令牌桶主要涉及到2个过程,即令牌的生成,令牌的获取

3.1.2 漏桶算法

漏桶算法的前半段和令牌桶类似,但是操作的对象不同,结合下图进行理解。

令牌桶是将令牌放入桶里,而漏桶是将访问请求的数据包放到桶里。同样的是,如果桶满了,那么后面新来的数据包将被丢弃。

cb3ecdba-2a03-11ee-a368-dac502259ad0.png

3.1.3 滑动时间窗口

根据下图,简单描述下滑动时间窗口这种过程:

黑色大框为时间窗口,可以设定窗口时间单位为5秒,它会随着时间推移向后滑动。我们将窗口内的时间划分为五个小格子,每个格子代表1秒钟,同时这个格子还包含一个计数器,用来计算在当前时间内访问的请求数量。那么这个时间窗口内的总访问量就是所有格子计数器累加后的数值;

比如说,我们在每一秒内有5个用户访问,第5秒内有10个用户访问,那么在0到5秒这个时间窗口内访问量就是15。如果我们的接口设置了时间窗口内访问上限是20,那么当时间到第六秒的时候,这个时间窗口内的计数总和就变成了10,因为1秒的格子已经退出了时间窗口,因此在第六秒内可以接收的访问量就是20-10=10个;

cb4ab60c-2a03-11ee-a368-dac502259ad0.png

滑动窗口其实也是一种计算器算法,它有一个显著特点,当时间窗口的跨度越长时,限流效果就越平滑。打个比方,如果当前时间窗口只有两秒,而访问请求全部集中在第一秒的时候,当时间向后滑动一秒后,当前窗口的计数量将发生较大的变化,拉长时间窗口可以降低这种情况的发生概率

四、通用限流实现方案

抛开网关层的限流先不说,在微服务应用中,考虑到技术栈的组合,团队人员的开发水平,以及易维护性等因素,一个比较通用的做法是,利用AOP技术+自定义注解实现对特定的方法或接口进行限流,下面基于这个思路来分别介绍下几种常用的限流方案的实现。

4.1 基于guava限流实现

guava为谷歌开源的一个比较实用的组件,利用这个组件可以帮助开发人员完成常规的限流操作,接下来看具体的实现步骤。

4.1.1 引入guava依赖

版本可以选择更高的或其他版本


com.google.guava
guava
23.0

4.1.2 自定义限流注解

自定义一个限流用的注解,后面在需要限流的方法或接口上面只需添加该注解即可;

importjava.lang.annotation.ElementType;
importjava.lang.annotation.Retention;
importjava.lang.annotation.RetentionPolicy;
importjava.lang.annotation.Target;

@Target(value=ElementType.METHOD)
@Retention(value=RetentionPolicy.RUNTIME)
public@interfaceRateConfigAnno{

StringlimitType();

doublelimitCount()default5d;
}

4.1.3 限流AOP类

通过AOP前置通知的方式拦截添加了上述自定义限流注解的方法,解析注解中的属性值,并以该属性值作为guava提供的限流参数,该类为整个实现的核心所在。

importcom.alibaba.fastjson2.JSONObject;
importcom.google.common.util.concurrent.RateLimiter;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Before;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.stereotype.Component;
importorg.springframework.web.context.request.RequestContextHolder;
importorg.springframework.web.context.request.ServletRequestAttributes;

importjavax.servlet.ServletOutputStream;
importjavax.servlet.http.HttpServletResponse;
importjava.io.IOException;
importjava.lang.reflect.Method;
importjava.util.Objects;

@Aspect
@Component
publicclassGuavaLimitAop{

privatestaticLoggerlogger=LoggerFactory.getLogger(GuavaLimitAop.class);

@Before("execution(@RateConfigAnno**(..))")
publicvoidlimit(JoinPointjoinPoint){
//1、获取当前的调用方法
MethodcurrentMethod=getCurrentMethod(joinPoint);
if(Objects.isNull(currentMethod)){
return;
}
//2、从方法注解定义上获取限流的类型
StringlimitType=currentMethod.getAnnotation(RateConfigAnno.class).limitType();
doublelimitCount=currentMethod.getAnnotation(RateConfigAnno.class).limitCount();
//使用guava的令牌桶算法获取一个令牌,获取不到先等待
RateLimiterrateLimiter=RateLimitHelper.getRateLimiter(limitType,limitCount);
booleanb=rateLimiter.tryAcquire();
if(b){
System.out.println("获取到令牌");
}else{
HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse();
JSONObjectjsonObject=newJSONObject();
jsonObject.put("success",false);
jsonObject.put("msg","限流中");
try{
output(resp,jsonObject.toJSONString());
}catch(Exceptione){
logger.error("error,e:{}",e);
}
}
}

privateMethodgetCurrentMethod(JoinPointjoinPoint){
Method[]methods=joinPoint.getTarget().getClass().getMethods();
Methodtarget=null;
for(Methodmethod:methods){
if(method.getName().equals(joinPoint.getSignature().getName())){
target=method;
break;
}
}
returntarget;
}

publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{
response.setContentType("application/json;charset=UTF-8");
ServletOutputStreamoutputStream=null;
try{
outputStream=response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
}catch(IOExceptione){
e.printStackTrace();
}finally{
outputStream.flush();
outputStream.close();
}
}
}

其中限流的核心API即为RateLimiter这个对象,涉及到的RateLimitHelper类如下

importcom.google.common.util.concurrent.RateLimiter;

importjava.util.HashMap;
importjava.util.Map;

publicclassRateLimitHelper{

privateRateLimitHelper(){}

privatestaticMaprateMap=newHashMap<>();

publicstaticRateLimitergetRateLimiter(StringlimitType,doublelimitCount){
RateLimiterrateLimiter=rateMap.get(limitType);
if(rateLimiter==null){
rateLimiter=RateLimiter.create(limitCount);
rateMap.put(limitType,rateLimiter);
}
returnrateLimiter;
}

}

4.1.4 测试接口

下面添加一个测试接口,测试一下上面的代码是否生效

@RestController
publicclassOrderController{

//localhost:8081/save
@GetMapping("/save")
@RateConfigAnno(limitType="saveOrder",limitCount=1)
publicStringsave(){
return"success";
}

}

在接口中为了模拟出效果,我们将参数设置的非常小,即QPS为1,可以预想当每秒请求超过1时将会出现被限流的提示,启动工程并验证接口,每秒1次的请求,可以正常得到结果,效果如下:

cb518608-2a03-11ee-a368-dac502259ad0.png

快速刷接口,将会看到下面的效果

cb5911d4-2a03-11ee-a368-dac502259ad0.png

4.2 基于sentinel限流实现

在不少同学的意识中,sentinel通常是需要结合springcloud-alibaba框架一起实用的,而且与框架集成之后,可以配合控制台一起使用达到更好的效果,实际上,sentinel官方也提供了相对原生的SDK可供使用,接下来就以这种方式进行整合。

4.2.1 引入sentinel核心依赖包


com.alibaba.csp
sentinel-core
1.8.0

4.2.2 自定义限流注解

可以根据需要,添加更多的属性

importjava.lang.annotation.ElementType;
importjava.lang.annotation.Retention;
importjava.lang.annotation.RetentionPolicy;
importjava.lang.annotation.Target;

@Target(value=ElementType.METHOD)
@Retention(value=RetentionPolicy.RUNTIME)
public@interfaceSentinelLimitAnnotation{

StringresourceName();

intlimitCount()default5;

}

4.2.3 自定义AOP类实现限流

该类的实现思路与上述使用guava类似,不同的是,这里使用的是sentinel原生的限流相关的API,对此不够属性的可以查阅官方的文档进行学习,这里就不展开来说了。

importcom.alibaba.csp.sentinel.Entry;
importcom.alibaba.csp.sentinel.SphU;
importcom.alibaba.csp.sentinel.Tracer;
importcom.alibaba.csp.sentinel.slots.block.BlockException;
importcom.alibaba.csp.sentinel.slots.block.RuleConstant;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
importorg.apache.commons.lang3.StringUtils;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.stereotype.Component;

importjava.lang.reflect.Method;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Objects;

@Aspect
@Component
publicclassSentinelMethodLimitAop{

privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){
Listrules=newArrayList<>();
FlowRulerule=newFlowRule();
//设置受保护的资源
rule.setResource(resourceName);
//设置流控规则QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule.setCount(limitCount);
rules.add(rule);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
}

@Pointcut(value="@annotation(com.congge.sentinel.SentinelLimitAnnotation)")
publicvoidrateLimit(){

}

@Around("rateLimit()")
publicObjectaround(ProceedingJoinPointjoinPoint){
//1、获取当前的调用方法
MethodcurrentMethod=getCurrentMethod(joinPoint);
if(Objects.isNull(currentMethod)){
returnnull;
}
//2、从方法注解定义上获取限流的类型
StringresourceName=currentMethod.getAnnotation(SentinelLimitAnnotation.class).resourceName();
if(StringUtils.isEmpty(resourceName)){
thrownewRuntimeException("资源名称为空");
}
intlimitCount=currentMethod.getAnnotation(SentinelLimitAnnotation.class).limitCount();
initFlowRule(resourceName,limitCount);

Entryentry=null;
Objectresult=null;
try{
entry=SphU.entry(resourceName);
try{
result=joinPoint.proceed();
}catch(Throwablethrowable){
throwable.printStackTrace();
}
}catch(BlockExceptionex){
//资源访问阻止,被限流或被降级
//在此处进行相应的处理操作
System.out.println("blocked");
return"被限流了";
}catch(Exceptione){
Tracer.traceEntry(e,entry);
}finally{
if(entry!=null){
entry.exit();
}
}
returnresult;
}

privateMethodgetCurrentMethod(JoinPointjoinPoint){
Method[]methods=joinPoint.getTarget().getClass().getMethods();
Methodtarget=null;
for(Methodmethod:methods){
if(method.getName().equals(joinPoint.getSignature().getName())){
target=method;
break;
}
}
returntarget;
}
}

4.2.4 自定义测试接口

为了模拟效果,这里将QPS的数量设置为1

//localhost:8081/limit
@GetMapping("/limit")
@SentinelLimitAnnotation(limitCount=1,resourceName="sentinelLimit")
publicStringsentinelLimit(){
return"sentinelLimit";
}

启动工程之后,浏览器调用接口测试一下,每秒一个请求,可以正常通过

cb6841cc-2a03-11ee-a368-dac502259ad0.png

快速刷接口,超过每秒1次时,效果如下

cb716dec-2a03-11ee-a368-dac502259ad0.png

这里只是为了演示出效果,建议在真实的项目中使用时,对返回结果做一个封装。

4.3 基于redis+lua限流实现

redis是线程安全的,天然具有线程安全的特性,支持原子性操作,限流服务不仅需要承接超高QPS,还要保证限流逻辑的执行层面具备线程安全的特性,利用Redis这些特性做限流,既能保证线程安全,也能保证性能。基于redis的限流实现完整流程如下图:

cb7998dc-2a03-11ee-a368-dac502259ad0.png

结合上面的流程图,这里梳理出一个整体的实现思路:

编写lua脚本,指定入参的限流规则,比如对特定的接口限流时,可以根据某个或几个参数进行判定,调用该接口的请求,在一定的时间窗口内监控请求次数;

既然是限流,最好能够通用,可将限流规则应用到任何接口上,那么最合适的方式就是通过自定义注解形式切入;

提供一个配置类,被spring的容器管理,redisTemplate中提供了DefaultRedisScript这个bean;

提供一个能动态解析接口参数的类,根据接口参数进行规则匹配后触发限流;

4.3.1 引入redis依赖


org.springframework.boot
spring-boot-starter-data-redis

4.3.2 自定义注解

@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public@interfaceRedisLimitAnnotation{

/**
*key
*/
Stringkey()default"";
/**
*Key的前缀
*/
Stringprefix()default"";
/**
*一定时间内最多访问次数
*/
intcount();
/**
*给定的时间范围单位(秒)
*/
intperiod();
/**
*限流的类型(用户自定义key或者请求ip)
*/
LimitTypelimitType()defaultLimitType.CUSTOMER;

}

4.3.3 自定义redis配置类

importorg.springframework.context.annotation.Bean;
importorg.springframework.core.io.ClassPathResource;
importorg.springframework.data.redis.connection.RedisConnectionFactory;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.data.redis.core.script.DefaultRedisScript;
importorg.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
importorg.springframework.data.redis.serializer.StringRedisSerializer;
importorg.springframework.scripting.support.ResourceScriptSource;
importorg.springframework.stereotype.Component;

importjava.io.Serializable;

@Component
publicclassRedisConfiguration{

@Bean
publicDefaultRedisScriptredisluaScript(){
DefaultRedisScriptredisScript=newDefaultRedisScript<>();
redisScript.setScriptSource(newResourceScriptSource(newClassPathResource("limit.lua")));
redisScript.setResultType(Number.class);
returnredisScript;
}

@Bean("redisTemplate")
publicRedisTemplateredisTemplate(RedisConnectionFactoryredisConnectionFactory){
RedisTemplateredisTemplate=newRedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
Jackson2JsonRedisSerializerjackson2JsonRedisSerializer=newJackson2JsonRedisSerializer(Object.class);

//设置value的序列化方式为JSOn
redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
//设置key的序列化方式为String
redisTemplate.setKeySerializer(newStringRedisSerializer());

redisTemplate.setHashKeySerializer(newStringRedisSerializer());
redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();

returnredisTemplate;
}

}

4.3.4 自定义限流AOP类

importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.aspectj.lang.reflect.MethodSignature;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.data.redis.core.RedisTemplate;
importorg.springframework.data.redis.core.script.DefaultRedisScript;
importorg.springframework.web.context.request.RequestContextHolder;
importorg.springframework.web.context.request.ServletRequestAttributes;

importjavax.servlet.http.HttpServletRequest;
importjava.io.Serializable;
importjava.lang.reflect.Method;
importjava.util.Collections;
importjava.util.List;

@Aspect
@Configuration
publicclassLimitRestAspect{

privatestaticfinalLoggerlogger=LoggerFactory.getLogger(LimitRestAspect.class);

@Autowired
privateRedisTemplateredisTemplate;

@Autowired
privateDefaultRedisScriptredisluaScript;


@Pointcut(value="@annotation(com.congge.config.limit.RedisLimitAnnotation)")
publicvoidrateLimit(){

}

@Around("rateLimit()")
publicObjectinterceptor(ProceedingJoinPointjoinPoint)throwsThrowable{
MethodSignaturesignature=(MethodSignature)joinPoint.getSignature();
Methodmethod=signature.getMethod();
ClasstargetClass=method.getDeclaringClass();
RedisLimitAnnotationrateLimit=method.getAnnotation(RedisLimitAnnotation.class);
if(rateLimit!=null){
HttpServletRequestrequest=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getRequest();
StringipAddress=getIpAddr(request);
StringBufferstringBuffer=newStringBuffer();
stringBuffer.append(ipAddress).append("-")
.append(targetClass.getName()).append("-")
.append(method.getName()).append("-")
.append(rateLimit.key());
Listkeys=Collections.singletonList(stringBuffer.toString());
//调用lua脚本,获取返回结果,这里即为请求的次数
Numbernumber=redisTemplate.execute(
redisluaScript,
keys,
rateLimit.count(),
rateLimit.period()
);
if(number!=null&&number.intValue()!=0&&number.intValue()<= rateLimit.count()) {
                logger.info("限流时间段内访问了第:{} 次", number.toString());
                return joinPoint.proceed();
            }
        } else {
            return joinPoint.proceed();
        }
        throw new RuntimeException("访问频率过快,被限流了");
    }
 
    /**
     * 获取请求的IP方法
     * @param request
     * @return
     */
    private static String getIpAddr(HttpServletRequest request) {
        String ipAddress = null;
        try {
            ipAddress = request.getHeader("x-forwarded-for");
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getHeader("WL-Proxy-Client-IP");
            }
            if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
                ipAddress = request.getRemoteAddr();
            }
            // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割
            if (ipAddress != null && ipAddress.length() >15){
if(ipAddress.indexOf(",")>0){
ipAddress=ipAddress.substring(0,ipAddress.indexOf(","));
}
}
}catch(Exceptione){
ipAddress="";
}
returnipAddress;
}

}

该类要做的事情和上面的两种限流措施类似,不过在这里核心的限流是通过读取lua脚步,通过参数传递给lua脚步实现的。

4.3.5 自定义lua脚本

在工程的resources目录下,添加如下的lua脚本

localkey="rate.limit:"..KEYS[1]

locallimit=tonumber(ARGV[1])

localcurrent=tonumber(redis.call('get',key)or"0")

ifcurrent+1>limitthen
return0
else
--没有超阈值,将当前访问数量+1,并设置2秒过期(可根据自己的业务情况调整)
redis.call("INCRBY",key,"1")
redis.call("expire",key,"2")
returncurrent+1
end

4.3.6 添加测试接口

@RestController
publicclassRedisController{

//localhost:8081/redis/limit
@GetMapping("/redis/limit")
@RedisLimitAnnotation(key="queryFromRedis",period=1,count=1)
publicStringqueryFromRedis(){
return"success";
}

}

为了模拟效果,这里将QPS设置为1 ,启动工程后(提前启动redis服务),调用一下接口,正常的效果如下:

cb81f504-2a03-11ee-a368-dac502259ad0.png

快速刷接口,超过每秒1次的请求时看到如下效果

cb8c793e-2a03-11ee-a368-dac502259ad0.png

五、自定义starter限流实现

上面通过案例介绍了几种常用的限流实现,不过细心的同学可以看到,这些限流的实现都是在具体的工程模块中嵌入的,事实上,在真实的微服务开发中,一个项目可能包含了众多的微服务模块,为了减少重复造轮子,避免每个微服务模块中单独实现,可以考虑将限流的逻辑实现封装成一个SDK,即作为一个springboot的starter的方式被其他微服务模块进行引用即可。这也是目前很多生产实践中比较通用的做法,接下来看看具体的实现吧。

5.1 前置准备

创建一个空的springboot工程,工程目录结构如下图,目录说明:

annotation:存放自定义的限流相关的注解;

aop:存放不同的限流实现,比如基于guava的aop,基于sentinel的aop实现等;

spring.factories:自定义待装配的aop实现类;

cb9342f0-2a03-11ee-a368-dac502259ad0.png

5.2 代码整合完成步骤

5.2.1 导入基础的依赖

这里包括如下几个必须的依赖,其他的依赖可以结合自身的情况合理选择;

spring-boot-starter;

guava;

spring-boot-autoconfigure;

sentinel-core;


org.springframework.boot
spring-boot-starter-parent
2.2.1.RELEASE




UTF-8
UTF-8
1.8





org.springframework.boot
spring-boot-starter-aop



log4j
log4j
1.2.17



org.springframework.boot
spring-boot-starter



org.springframework.boot
spring-boot-starter-web



org.projectlombok
lombok


 

com.google.guava
guava
23.0



org.springframework.boot
spring-boot-autoconfigure
2.2.1.RELEASE



org.springframework.boot
spring-boot-configuration-processor
2.2.1.RELEASE



com.alibaba.csp
sentinel-core
1.8.0



org.apache.commons
commons-lang3
3.4



com.alibaba.fastjson2
fastjson2
2.0.22







src/main/resources

**/**




5.2.2 自定义注解

目前该SDK支持三种限流方式,即后续其他微服务工程中可以通过添加这3种注解即可实现限流,分别是基于guava的令牌桶,基于sentinel的限流,基于java自带的Semaphore限流,三个自定义注解类如下:

令牌桶

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)

public@interfaceTokenBucketLimiter{
intvalue()default50;
}

Semaphore

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public@interfaceShLimiter{
intvalue()default50;
}

sentinel

@Target(value=ElementType.METHOD)
@Retention(value=RetentionPolicy.RUNTIME)
public@interfaceSentinelLimiter{

StringresourceName();

intlimitCount()default50;

}

5.2.3 限流实现AOP类

具体的限流在AOP中进行实现,思路和上一章节类似,即通过环绕通知的方式,先解析那些添加了限流注解的方法,然后解析里面的参数,进行限流的业务实现。

基于guava的aop实现

importcom.alibaba.fastjson2.JSONObject;
importcom.congge.annotation.TokenBucketLimiter;
importcom.google.common.util.concurrent.RateLimiter;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.cglib.core.ReflectUtils;
importorg.springframework.stereotype.Component;
importorg.springframework.web.context.request.RequestContextHolder;
importorg.springframework.web.context.request.ServletRequestAttributes;

importjavax.servlet.ServletOutputStream;
importjavax.servlet.http.HttpServletResponse;
importjava.io.IOException;
importjava.lang.reflect.Method;
importjava.util.Arrays;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;

@Aspect
@Component
@Slf4j
publicclassGuavaLimiterAop{

privatefinalMaprateLimiters=newConcurrentHashMap();

@Pointcut("@annotation(com.congge.annotation.TokenBucketLimiter)")
publicvoidaspect(){
}

@Around(value="aspect()")
publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{
log.debug("准备限流");
Objecttarget=point.getTarget();
StringtargetName=target.getClass().getName();
StringmethodName=point.getSignature().getName();
Object[]arguments=point.getArgs();
ClasstargetClass=Class.forName(targetName);
Class[]argTypes=ReflectUtils.getClasses(arguments);
Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes);
//获取目标method上的限流注解@Limiter
TokenBucketLimiterlimiter=method.getAnnotation(TokenBucketLimiter.class);
RateLimiterrateLimiter=null;
Objectresult=null;
if(null!=limiter){
//以class+method+parameters为key,避免重载、重写带来的混乱
Stringkey=targetName+"."+methodName+Arrays.toString(argTypes);
rateLimiter=rateLimiters.get(key);
if(null==rateLimiter){
//获取限定的流量
//为了防止并发
rateLimiters.putIfAbsent(key,RateLimiter.create(limiter.value()));
rateLimiter=rateLimiters.get(key);
}
booleanb=rateLimiter.tryAcquire();
if(b){
log.debug("得到令牌,准备执行业务");
result=point.proceed();
}else{
HttpServletResponseresp=((ServletRequestAttributes)RequestContextHolder.getRequestAttributes()).getResponse();
JSONObjectjsonObject=newJSONObject();
jsonObject.put("success",false);
jsonObject.put("msg","限流中");
try{
output(resp,jsonObject.toJSONString());
}catch(Exceptione){
log.error("error,e:{}",e);
}
}
}else{
result=point.proceed();
}
log.debug("退出限流");
returnresult;
}

publicvoidoutput(HttpServletResponseresponse,Stringmsg)throwsIOException{
response.setContentType("application/json;charset=UTF-8");
ServletOutputStreamoutputStream=null;
try{
outputStream=response.getOutputStream();
outputStream.write(msg.getBytes("UTF-8"));
}catch(IOExceptione){
e.printStackTrace();
}finally{
outputStream.flush();
outputStream.close();
}
}
}

基于Semaphore的aop实现

importcom.congge.annotation.ShLimiter;
importlombok.extern.slf4j.Slf4j;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.cglib.core.ReflectUtils;
importorg.springframework.stereotype.Component;

importjava.lang.reflect.Method;
importjava.util.Arrays;
importjava.util.Map;
importjava.util.concurrent.ConcurrentHashMap;
importjava.util.concurrent.Semaphore;

@Aspect
@Component
@Slf4j
publicclassSemaphoreLimiterAop{

privatefinalMapsemaphores=newConcurrentHashMap();
privatefinalstaticLoggerLOG=LoggerFactory.getLogger(SemaphoreLimiterAop.class);

@Pointcut("@annotation(com.congge.annotation.ShLimiter)")
publicvoidaspect(){

}

@Around(value="aspect()")
publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{
log.debug("进入限流aop");
Objecttarget=point.getTarget();
StringtargetName=target.getClass().getName();
StringmethodName=point.getSignature().getName();
Object[]arguments=point.getArgs();
ClasstargetClass=Class.forName(targetName);
Class[]argTypes=ReflectUtils.getClasses(arguments);
Methodmethod=targetClass.getDeclaredMethod(methodName,argTypes);
//获取目标method上的限流注解@Limiter
ShLimiterlimiter=method.getAnnotation(ShLimiter.class);
Objectresult=null;
if(null!=limiter){
//以class+method+parameters为key,避免重载、重写带来的混乱
Stringkey=targetName+"."+methodName+Arrays.toString(argTypes);
//获取限定的流量
Semaphoresemaphore=semaphores.get(key);
if(null==semaphore){
semaphores.putIfAbsent(key,newSemaphore(limiter.value()));
semaphore=semaphores.get(key);
}
try{
semaphore.acquire();
result=point.proceed();
}finally{
if(null!=semaphore){
semaphore.release();
}
}
}else{
result=point.proceed();
}
log.debug("退出限流");
returnresult;
}

}

基于sentinel的aop实现

importcom.alibaba.csp.sentinel.Entry;
importcom.alibaba.csp.sentinel.SphU;
importcom.alibaba.csp.sentinel.Tracer;
importcom.alibaba.csp.sentinel.slots.block.BlockException;
importcom.alibaba.csp.sentinel.slots.block.RuleConstant;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRule;
importcom.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
importcom.congge.annotation.SentinelLimiter;
importorg.apache.commons.lang3.StringUtils;
importorg.aspectj.lang.JoinPoint;
importorg.aspectj.lang.ProceedingJoinPoint;
importorg.aspectj.lang.annotation.Around;
importorg.aspectj.lang.annotation.Aspect;
importorg.aspectj.lang.annotation.Pointcut;
importorg.springframework.stereotype.Component;

importjava.lang.reflect.Method;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Objects;

@Aspect
@Component
publicclassSentinelLimiterAop{

privatestaticvoidinitFlowRule(StringresourceName,intlimitCount){
Listrules=newArrayList<>();
FlowRulerule=newFlowRule();
//设置受保护的资源
rule.setResource(resourceName);
//设置流控规则QPS
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
//设置受保护的资源阈值
rule.setCount(limitCount);
rules.add(rule);
//加载配置好的规则
FlowRuleManager.loadRules(rules);
}

@Pointcut(value="@annotation(com.congge.annotation.SentinelLimiter)")
publicvoidrateLimit(){

}

@Around("rateLimit()")
publicObjectaround(ProceedingJoinPointjoinPoint){
//1、获取当前的调用方法
MethodcurrentMethod=getCurrentMethod(joinPoint);
if(Objects.isNull(currentMethod)){
returnnull;
}
//2、从方法注解定义上获取限流的类型
StringresourceName=currentMethod.getAnnotation(SentinelLimiter.class).resourceName();
if(StringUtils.isEmpty(resourceName)){
thrownewRuntimeException("资源名称为空");
}
intlimitCount=currentMethod.getAnnotation(SentinelLimiter.class).limitCount();
initFlowRule(resourceName,limitCount);

Entryentry=null;
Objectresult=null;
try{
entry=SphU.entry(resourceName);
try{
result=joinPoint.proceed();
}catch(Throwablethrowable){
throwable.printStackTrace();
}
}catch(BlockExceptionex){
//资源访问阻止,被限流或被降级
//在此处进行相应的处理操作
System.out.println("blocked");
return"被限流了";
}catch(Exceptione){
Tracer.traceEntry(e,entry);
}finally{
if(entry!=null){
entry.exit();
}
}
returnresult;
}

privateMethodgetCurrentMethod(JoinPointjoinPoint){
Method[]methods=joinPoint.getTarget().getClass().getMethods();
Methodtarget=null;
for(Methodmethod:methods){
if(method.getName().equals(joinPoint.getSignature().getName())){
target=method;
break;
}
}
returntarget;
}

}

5.2.4 配置自动装配AOP实现

在resources目录下创建上述的spring.factories文件,内容如下,通过这种方式配置后,其他应用模块引入了当前的SDK的jar之后,就可以实现开箱即用了;

org.springframework.boot.autoconfigure.EnableAutoConfiguration=
com.congge.aop.SemaphoreLimiterAop,
com.congge.aop.GuavaLimiterAop,
com.congge.aop.SemaphoreLimiterAop

5.2.5 将工程打成jar进行安装

这一步比较简单就跳过了

cb9f6260-2a03-11ee-a368-dac502259ad0.png

5.2.6 在其他的工程中引入上述SDK


cm.congge
biz-limit
1.0-SNAPSHOT

5.2.7 编写测试接口

在其他工程中,编写一个测试接口,并使用上面的注解,这里以guava的限流注解为例进行说明

importcom.congge.annotation.TokenBucketLimiter;
importorg.springframework.web.bind.annotation.GetMapping;
importorg.springframework.web.bind.annotation.RestController;

@RestController
publicclassSdkController{

//localhost:8081/query
@GetMapping("/query")
@TokenBucketLimiter(1)
publicStringqueryUser(){
return"queryUser";
}

}

5.2.8 功能测试

启动当前的工程后,正常调用接口,每秒一次的请求,可以正常得到结果

cba7f63c-2a03-11ee-a368-dac502259ad0.png

快速刷接口,QPS超过1之后,将会触发限流,看到如下效果

cbb0bb14-2a03-11ee-a368-dac502259ad0.png

通过上面这种方式,也可以得到预期的效果,其他两种限流注解有兴趣的同学也可以继续测试验证,篇幅原因就不再赘述了。

上述通过starter的方式实现了一种更优雅的限流集成方式,也是生产中比较推荐的一种方式,不过当前的案例还比较粗糙,需要使用的同学还需根据自己的情况完善里面的逻辑,进一步的封装以期得到更好的效果。





审核编辑:刘清

声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉
  • 接收机
    +关注

    关注

    8

    文章

    1158

    浏览量

    53155
  • 计数器
    +关注

    关注

    32

    文章

    2241

    浏览量

    93962
  • JVM
    JVM
    +关注

    关注

    0

    文章

    155

    浏览量

    12168
  • QPS
    QPS
    +关注

    关注

    0

    文章

    24

    浏览量

    8777
  • 负载保护器
    +关注

    关注

    0

    文章

    4

    浏览量

    5414
  • SpringBoot
    +关注

    关注

    0

    文章

    172

    浏览量

    145

原文标题:SpringBoot 通用限流方案(VIP珍藏版)

文章出处:【微信号:芋道源码,微信公众号:芋道源码】欢迎添加关注!文章转载请注明出处。

收藏 人收藏

    评论

    相关推荐

    SpringBoot知识总结

    SpringBoot干货学习总结
    发表于 08-01 10:40

    怎么学习SpringBoot

    SpringBoot学习之路(X5)- 整合JPA
    发表于 06-10 14:52

    怎样去使用springboot

    怎样去使用springboot呢?学习springboot需要懂得哪些?
    发表于 10-25 07:13

    SpringBoot应用启动运行run方法

    )、refreshContext(context);SpringBoot刷新IOC容器【创建IOC容器对象,并初始化容器,创建容器中的每个组件】;如果是web应用创建**AnnotationConfigEmbeddedWebApplicationContext**,否则
    发表于 12-20 06:16

    Springboot是如何获取自定义异常并进行返回的

    源码剖析Springboot是如何获取自定义异常并进行返回的。来吧!第步:肯定是在Springboot启动的过程中进行的异常
    发表于 03-22 14:15

    java异常处理的设计与重构

    在程序设计中,进行异常处理是非常关键和重要的部分。个程序的异常处理框架的好坏直接影响到整个项
    发表于 09-27 15:40 1次下载
    java<b class='flag-5'>异常</b><b class='flag-5'>处理</b>的设计与重构

    java异常处理设计和些建议

    程序设计在程序设计中,进行异常处理是非常关键和重要的部分。个程序的异常处理框架的好坏直接影响
    发表于 09-28 11:48 0次下载
    java<b class='flag-5'>异常</b><b class='flag-5'>处理</b>设计和<b class='flag-5'>一</b>些建议

    Spring Boot 系列(八)@ControllerAdvice 拦截异常统一处理

    Spring Boot 系列(八)@ControllerAdvice 拦截异常统一处理 在spring 3.2中,新增了@ControllerAdvice 注解,可以用于定义
    发表于 01-16 18:39 281次阅读

    关于SpringBoot如何优雅的全局异常处理

    SpringBoot全局异常准备说明:如果想直接获取工程那么可以直接跳到底部,通过链接下载工程代码。 开发准备 环境要求JDK:1.8SpringBoot:1.5.17.RELEASE 首先还是
    的头像 发表于 05-31 14:25 1558次阅读
    关于<b class='flag-5'>SpringBoot</b>如何优雅的全局<b class='flag-5'>异常</b><b class='flag-5'>处理</b>

    公司这套架构统一处理try...catch真香!

    有大量的冗余代码,而且还影响代码的可读性。这样就需要定义个全局统一异常处理器,以便业务层再也不必处理异常
    的头像 发表于 02-27 10:47 409次阅读

    什么是 SpringBoot

    本文从为什么要有 `SpringBoot`,以及 `SpringBoot` 到底方便在哪里开始入手,逐步分析了 `SpringBoot` 自动装配的原理,最后手写了个简单的 `sta
    的头像 发表于 04-07 11:28 1180次阅读
    什么是 <b class='flag-5'>SpringBoot</b>?

    SpringBoot统一功能处理

    最初用户登录效验: 在每个方法中获取 Session 和 Session 中的用户信息,如果存在用户,那么就认为登录成功了,否则就登录失败了
    的头像 发表于 04-19 14:51 527次阅读

    SpringBoot拦截器与统一功能处理实战

    Spring AOP是个基于面向切面编程的框架,用于将横切性关注点(如日志记录、事务管理)与业务逻辑分离,通过代理对象将这些关注点织入到目标对象的方法执行前后、抛出异常或返回结果时等特定位置执行,从而提高程序的可复用性、可维护性和灵活性。
    的头像 发表于 08-27 10:44 771次阅读
    <b class='flag-5'>SpringBoot</b>拦截器与<b class='flag-5'>统一</b>功能<b class='flag-5'>处理</b>实战

    异常处理和错误码管理

    团队达成共识,统一规范就可以。 下面介绍下我使用的处理异常的方式。 自定义异常 创建个业务
    的头像 发表于 09-25 14:51 573次阅读
    <b class='flag-5'>异常</b><b class='flag-5'>处理</b>和错误码管理

    站式统一返回值封装、异常处理异常错误码解决方案—最强的Sping Boot接口优雅响应处理

    1. 前言 统一返回值封装、统一异常处理异常错误码体系的意义在于提高代码的可维护性和可读性,使得代码更加健壮和稳定。
    的头像 发表于 06-20 15:42 334次阅读