尼恩说在前面
Sentinel熔断降级,是如何实现的? Sentinel底层滑动时间窗限流算法怎么实现的?
特别说明, 本文属于 穿透 SpringCloud 工业级 底座工程(一共包括 15大学习圣经 )的 其中之一,并且,此系列15大学习圣经底座正在录制视频, 帮助大家一举成为技术高手。
本文,就是 Sentinel 学习圣经。稍后会录制视频。录完之后,Sentinel 学习圣经 正式版本会有更新, 最新版本找尼恩获取。
本文目录
- 尼恩说在前面
- 阿里面试真题:Sentinel熔断降级,是如何实现的?
- 第一个维度,Sentinel主要功能:
- 第二个维度, Sentinel 的基本组件:
- 第三个维度, Sentinel 的流量治理几个核心步骤:
- 第四个维度, Sentinel 的源码层面的两个核心架构:
- 说在最后: “offer自由” 很容易的
- 美团面试:Sentinel底层滑动时间窗限流算法怎么实现的?
- 滑动窗口的核心数据结构
-ArrayMetric 源码
-LeapArray 源码
-MetricBucket 源码
-WindowWrap 源码
- 滑动窗口 统计 源码实现
-如何 定位 Bucket?
- MetricBucket 的LongAdder
- 参考文章
- 《sentinel 学习圣经 V4》版本说明:
- 老牌hystrix 服务保护
- 新贵 Sentinel 服务保护
- 开始《sentinel 学习圣经》:一组核心基本概念
- 1. 响应时间(RT)
- 2. 吞吐量(Throughput)
- 3. 并发用户数
- 什么是服务雪崩效应?
- 1、什么是Sentinel:
- Sentinel 具有以下特征:
- Sentinel主要特性:
-Sentinel与Hystrix的区别
-Hystrix 迁移Sentinel 方案
- sentinel组件介绍
-Sentinel两个部分:
-sentinel 核心概念
- Sentinel 的使用
- Sentinel中的管理控制台
-1)获取 Sentinel 控制台
-2)sentinel服务启动
-启动 sentinel
-控制台端口:
-控制台登录
-启动日志
- Sentinel 控制台使用
-默认用户名和密码都是 sentinel
-查看机器列表以及健康情况
- SpringCloud客户端能接入控制台
- 2、使用 Sentinel 来进行熔断与限流
- 2.1 Java普通应用限流
-1. 引入 Sentinel 依赖
-2. 定义资源
-3. 定义规则
-4. 检查效果
-5.接入控制台
- 2.2 定义资源
-资源注解@SentinelResource
-@SentinelResource 注解
-fallback 函数签名和位置要求:
-defaultFallback 函数签名要求:
- 2.3 定义规则
- 3、sentinel 熔断降级
- 3.1 什么是熔断降级
- 3.2 熔断降级规则
- 3.3 几种降级策略
- 3.4 熔断降级代码实现
- 3.5 控制台降级规则
- 3.6 与Hystrix的熔断对比:
- 4、Sentinel 流控(限流)
- 基本的参数
- 流控的几种`strategy`:
- 4.1 直接失败模式
-使用API进行资源定义
-代码限流规则
-网页限流规则配置
-测试
- 4.2 关联模式
-使用注解进行资源定义
-代码配置关联限流规则
-网页限流规则配置
-测试
- 4.3 Warm up(预热)模式
-使用注解定义资源
-代码预热规则
-网页预热规则配置
- 4.4 排队等待模式
-示例
-使用注解定义资源
-代码限流规则
-网页限流规则配置
-通过jmeter进行测试
- 4.5 热点规则 (ParamFlowRule)
-自定义资源
-限流规则代码:
-网页限流规则配置
- 5、Sentinel 系统保护
-系统保护的目的
-系统保护规则的应用
-网页限流规则配置
- 6、黑白名单规则
- 访问控制规则 (AuthorityRule)
- 7、如何定义资源
- 方式一:主流框架的默认适配
- 方式二:抛出异常的方式定义资源
- 方式三:返回布尔值方式定义资源
- 方式四:注解方式定义资源
- 方式五:异步调用支持
- 8、Sentinel中基本概念
- 8.1 ProcessorSlotChain
- 8.2 Node
- 8.3 Entry
- 8.3.1.自定义资源
- 8.3.2.基于注解标记资源
- 8.4 Context
- 8.4.1 什么是Context
- 8.4.2 Context的初始化
-8.4.2.1.自动装配
-8.4.2.2.AbstractSentinelInterceptor
-8.4.2.3.ContextUtil
- 9、Sentinel源码分析
- 9.1.入口
- 9.2.DefaultProcessorSlotChain
- 9.3.NodeSelectorSlot
- 9.4.ClusterBuilderSlot
- 9.5.StatisticSlot
- 9.6.AuthoritySlot
- 9.7.SystemSlot
- 9.8.ParamFlowSlot
- 9.8.1.令牌桶
- 9.9.1.核心流程
- 9.9.2.滑动时间窗口
-9.9.2.1.时间窗口请求量统计
-9.9.2.2.滑动窗口QPS计算
- 9.9.3.令牌桶
- 9.10.DegradeSlot
- 9.10.1.CircuitBreaker
- 9.10.2.触发断路器
- 8、核心组件 源码分析
- 源码环境搭建
- Resource
- Context
-Context的创建与销毁
- Entry
- DefaultNode
- StatisticNode
- 9、插槽Slot 源码分析
- Sentinel中的责任链模式
- 责任链SlotChain 如何创建?
- 责任链模式的重要性
- NodeSelectorSlot 原理分析+ 源码分析
-核心的概念1: Resource
-核心的概念2: Context
-核心的概念3: Entry
-核心的概念4: Node
-调用链树 的例子
- 调用链树
-构造树干
-创建context
-创建Entry
-退出Entry
-构造叶子节点
-保存子节点
- ClusterBuilderSlot
- StatistcSlot
- SystemSlot
- AuthoritySlot
- FlowSlot
- DegradeSlot
- DefaultProcessorSlotChain
- slot总结
- 10、sentinel滑动窗口 sliding window 源码分析
- 10.1 基本原理
- 10.2 sentinel使用滑动窗口都统计啥
- 10.3 滑动窗口源码实现
-10.3.1 MetricBucket 源码分析
-10.3.2 WindowWrap 源码分析
-10.3.3 LeapArray 源码分析
- Sentinel的限流与Gateway的限流有什么差别?
- 补充
- 固定窗口算法
- 滑动窗口计数器算法
- 令牌桶算法
- 漏桶算法
- 漏桶算法
- Sentinel与Hystix的线程隔离有什么差别
- 11、使用Nacos存储规则及双向同步
-1、Sentinel 动态规则扩展
-2、规则管理及推送
-3、DataSource 扩展
- DataSource(接口)
-1、导入依赖
-2、配置
-3、Nacos中创建限流规则的配置
- 12、Nacos与Sentinel互相同步限流规则
-1、流控推送规则
-2、改造sentinel-dashboard
- 说在最后:有问题找老架构取经
阿里面试真题:Sentinel熔断降级,是如何实现的?
问题1:Sentinel高可用熔断降级,是如何实现的? 问题2:Sentinel底层滑动时间窗限流算法怎么实现的?
第一个维度,Sentinel主要功能:
Sentinel使用滑动窗口统计请求的成功和失败情况。这些统计信息包括成功的请求数、失败的请求数等。 当某个资源(例如一个API接口)的错误率超过阈值或其他指标达到预设的条件,Sentinel将触发熔断机制。 一旦熔断触发,Sentinel将暂时阻止对该资源的请求,防止继续失败的请求对系统造成更大的影响。
Sentinel还提供了降级机制,可以在资源负载过重或其他异常情况下,限制资源的访问速率,以保护系统免受过多的请求冲击。 降级策略可以根据需要定制,可以是慢调用降级、异常比例降级等。
第二个维度, Sentinel 的基本组件:
资源是我们想要保护的对象,比如一个远程服务、一个数据库连接等。 规则是定义如何保护资源的,比如我们可以通过设置阈值、时间窗口等方式来决定何时进行限流、熔断等操作。 上下文是一个临时的存储空间,用于存储资源的状态信息,比如当前的 QPS 等。 插槽属于责任链模式中的处理器/过滤器, 完成资源规则的计算和验证。
第三个维度, Sentinel 的流量治理几个核心步骤:
资源注册:当一个资源被创建时,需要将其注册到 Sentinel。在注册过程中,会为资源创建一个对应的上下文,并将资源的规则存储到插槽中。 流量控制:当有请求访问资源时,Sentinel 会根据资源的规则进行流量控制。如果当前 QPS 超过了规则设定的阈值,Sentinel 就会拒绝请求,以防止系统过载。 熔断降级:当资源出现异常时,Sentinel 会根据规则进行熔断或降级处理。熔断是指暂时切断对资源的访问,以防止异常扩散。降级则是提供一种备用策略,当主策略无法正常工作时,可以切换到备用策略。 规则更新:在某些情况下,我们可能需要动态调整资源的规则。Sentinel 提供了 API 接口,可以方便地更新资源的规则。
第四个维度, Sentinel 的源码层面的两个核心架构:
责任链模式架构 滑动窗口数据统计架构
尼恩说明: 两大架构的源码,简单说说就可以了,具体可以参见《Sentinel 学习圣经》 最新版本。
说在最后: “offer自由” 很容易的
美团面试:Sentinel底层滑动时间窗限流算法怎么实现的?
资源的调用关系,例如资源的调用链路,资源和资源之间的关系; 运行指标,例如 QPS、线程池、系统负载等; 控制的效果,例如直接限流、冷启动、排队等。
时间窗口划分:将整个时间范围划分为多个固定大小的时间窗口(例如1秒一个窗口)。这些时间窗口会随着时间的流逝依次滑动。 计数器:为每个时间窗口维护一个计数器,用于记录在该时间窗口内的请求数。 请求计数:当有请求到来时,将其计入当前时间窗口的计数器中。 滑动时间窗口:定期滑动时间窗口,将过期的时间窗口删除,并创建新的时间窗口。这样可以保持时间窗口的滚动。 限流判断:当有请求到来时,Sentinel会检查当前时间窗口内的请求数是否超过了预设的限制阈值。如果超过了限制阈值,请求将被拒绝或执行降级策略。 计数重置:定期重置过期时间窗口的计数器,以确保计数器不会无限增长。
滑动窗口的核心数据结构
ArrayMetric:滑动窗口核心实现类。 LeapArray:滑动窗口顶层数据结构,包含一个一个的窗口数据。 WindowWrap:每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。 MetricBucket:指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。 MetricEvent:指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。
ArrayMetric 源码
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
int intervalInMs
:表示一个采集的时间间隔,即滑动窗口的总时间,例如 1 分钟。int sampleCount
:在一个采集间隔中抽样的个数,默认为 2,即一个采集间隔中会包含两个相等的区间,一个区间就是一个窗口。boolean enableOccupy
:是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量。
LeapArray 源码
array
,AtomicReferenceArray<WindowWrap<T>>
,保证创建窗口的原子性(CAS)。public abstract class LeapArray<T> {
//每一个窗口的时间间隔,单位为毫秒
protected int windowLengthInMs;
//抽样个数,就一个统计时间间隔中包含的滑动窗口个数
protected int sampleCount;
//一个统计的时间间隔
protected int intervalInMs;
//滑动窗口的数组,滑动窗口类型为 WindowWrap<MetricBucket>
protected final AtomicReferenceArray<WindowWrap<T>> array;
private final ReentrantLock updateLock = new ReentrantLock();
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
MetricBucket 源码
public class MetricBucket {
/**
* 存储各事件的计数,比如异常总数、请求总数等
*/
private final LongAdder[] counters;
/**
* 这段事件内的最小耗时
*/
private volatile long minRt;
}
public enum MetricEvent {
PASS,
BLOCK,
EXCEPTION,
SUCCESS,
RT,
OCCUPIED_PASS
}
public long get(MetricEvent event) {
return counters[event.ordinal()].sum();
}
public void add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
}
WindowWrap 源码
public class WindowWrap<T> {
/**
* 单个窗口的时间长度(毫秒)
*/
private final long windowLengthInMs;
/**
* 窗口的开始时间戳(毫秒)
*/
private long windowStart;
/**
* 统计数据
*/
private T value;
}
WindowWrap 用于包装 Bucket,随着 Bucket 一起创建。 WindowWrap 数组 实现滑动窗口,Bucket 只负责统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息。 定位 Bucket 实际上是定位 WindowWrap,拿到 WindowWrap 就能拿到 Bucket。
滑动窗口 统计 源码实现
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
//计算时间窗口开始时间戳
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
//判断时间戳是否在当前时间窗口内
public boolean isTimeInWindow(long timeMillis) {
return windowStart <= timeMillis && timeMillis < windowStart + windowLengthInMs;
}
如何 定位 Bucket?
/**
* 根据时间戳获取 bucket
* @param timeMillis 时间戳(毫秒)
* @return 如果时间有效,则在提供的时间戳处显示当前存储桶项;如果时间无效,则为空
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 获取时间戳映射到的数组索引
int idx = calculateTimeIdx(timeMillis);
// 计算 bucket 时间窗口的开始时间
long windowStart = calculateWindowStart(timeMillis);
// 从数组中死循环查找当前的时间窗口,因为可能多个线程都在获取当前时间窗口
while (true) {
WindowWrap<T> old = array.get(idx);
// 一般是项目启动时,时间未到达一个周期,数组还没有存储满,没有到复用阶段,所以数组元素可能为空
if (old == null) {
// 创建新的 bucket,并创建一个 bucket 包装器
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// cas 写入,确保线程安全,期望数组下标的元素是空的,否则就不写入,而是复用
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
}
// 如果 WindowWrap 的 windowStart 正好是当前时间戳计算出的时间窗口的开始时间,则就是我们想要的 bucket
else if (windowStart == old.windowStart()) {
return old;
}
// 复用旧的 bucket
else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// 重置 bucket,并指定 bucket 的新时间窗口的开始时间
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
}
// 计算出来的当前 bucket 时间窗口的开始时间比数组当前存储的 bucket 的时间窗口开始时间还小,
// 直接返回一个空的 bucket 就行
else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
如果旧的 bucket 不存在,那么我们在 windowStart 处创建一个新的 bucket,然后尝试通过 CAS 操作更新环形数组。只有一个线程可以成功更新,保证原子性。 如果当前 windowStart 等于旧桶的开始时间戳,表示时间在桶内,所以直接返回桶。 如果旧桶的开始时间戳落后于所提供的时间,这意味着桶已弃用,我们可以复用该桶,并将桶重置为当前的 windowStart。注意重置和清理操作很难是原子的,所以我们需要一个更新锁来保证桶更新的正确性。只有当 bucket 已弃用才会上锁,所以在大多数情况下它不会导致性能损失。 不应该通过这里,因为提供的时间已经落后了,一般是时钟回拨导致的。
MetricBucket 的LongAdder
java.util.concurrent.atomic
包中提供了一个新的原子类:LongAdder
。分散热点
,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回。参考文章
《sentinel 学习圣经 V4》版本说明:
老牌hystrix 服务保护
新贵 Sentinel 服务保护
开始《sentinel 学习圣经》:一组核心基本概念
1. 响应时间(RT)
2. 吞吐量(Throughput)
3. 并发用户数
QPS每秒查询率(Query Per Second)
什么是服务雪崩效应?
硬件故障:如服务器宕机、机房断电、光纤被挖断等。 流量激增:如异常流量、重试加大流量等。 缓存穿透:一般发生在应用重启,所有缓存失效时,以及短时间内大量缓存失效时,因大量的缓存不命中,使请求直击后端服务,造成服务提供者超负荷运行,引起服务不可用。 程序bug:如程序逻辑导致死循环或者内存泄漏等。
超时机制:在上游服务调用下游服务的时候,设置一个最大响应时间,如果超过这个时间,下游未作出反应,就断开请求,释放掉线程。 限流机制:限流就是限制系统的输入和输出流量已达到保护系统的目的。为了保证系统的稳固运行,一旦达到的需要限制的阈值,就需要限制流量并采取少量措施以完成限制流量的目的。 熔断机制:在互联网系统中,当下游服务因访问压力过大而响应变慢或失败,上游服务为了保护系统整体的可用性,可以暂时切断对下游服务的调用。这种牺牲局部,保全整体的措施就叫做熔断。 降级机制:降级是从系统功能优先级的角度考虑如何应对系统故障。 服务降级指的是当服务器压力剧增的情况下,根据当前业务情况及流量对一些服务和页面有策略的降级,以此释放服务器资源以保证核心任务的正常运行。降级其实就是为服务提供一个兜底方案,一旦服务无法正常调用,就使用兜底方案。
1、什么是Sentinel:
分布式系统的流量防卫兵 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度保护服务的稳定性。
丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应用等。 完备的实时监控:Sentinel 同时提供实时的监控功能。您可以在控制台中看到接入应用的单台机器秒级数据,甚至 500 台以下规模的集群的汇总运行情况。 广泛的开源生态:Sentinel 提供开箱即用的与其它开源框架/库的整合模块,例如与 Spring Cloud、Dubbo、gRPC 的整合。您只需要引入相应的依赖并进行简单的配置即可快速地接入 Sentinel。 完善的 SPI 扩展点:Sentinel 提供简单易用、完善的 SPI 扩展接口。您可以通过实现扩展接口来快速地定制逻辑。例如定制规则管理、适配动态数据源等。
Sentinel 具有以下特征:
Sentinel主要特性:
Sentinel与Hystrix的区别
Sentinel | Hystrix | |
Hystrix 迁移Sentinel 方案
sentinel组件介绍
Sentinel两个部分:
控制台(Dashboard):Sentinel 提供的一个轻量级的开源控制台,它为用户提供了机器自发现、簇点链路自发现、监控、规则配置等功能。控制台主要负责管理推送规则、监控、集群限流分配管理、机器发现等。 核心库(Java 客户端):不依赖任何框架/库,能够运行于 Java 7 及以上的版本的运行时环境,同时对 Dubbo / Spring Cloud 等框架也有较好的支持。
sentinel 核心概念
参数 | 解释 |
Sentinel 的使用
Sentinel中的管理控制台
1)获取 Sentinel 控制台
https://github.com/alibaba/Sentinel/releases/download/1.6.3/sentinel-dashboard-1.7.1.jar
下载 控制台 工程 使用以下命令将代码打包成一个 fat jar: mvn clean package
2)sentinel服务启动
启动 sentinel
java -server -Xms64m -Xmx256m -Dserver.port=8849 -Dcsp.sentinel.dashboard.server=localhost:8849 -Dproject.name=sentinel-dashboard -jar /work/sentinel-dashboard-1.8.6.jar
java -server -Xms64m -Xmx256m -Dserver.port=8849 -Dcsp.sentinel.dashboard.server=localhost:8849 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.6.jar
nohup java -server -Xms64m -Xmx256m -Dserver.port=8849 -Dcsp.sentinel.dashboard.server=localhost:8849 -Dproject.name=sentinel-dashboard -jar /work/sentinel-dashboard-1.8.6.jar 2>&1 &
或者
/usr/bin/su - root -c "nohup java -server -Xms64m -Xmx256m -Dserver.port=8849 -Dcsp.sentinel.dashboard.server=localhost:8849 -Dproject.name=sentinel-dashboard -jar /work/sentinel-dashboard-1.8.6.jar 2>&1 &"
控制台端口:
-Dserver.port=8849 用于指定 Sentinel 控制台端口为 8849
控制台登录
sentinel
。注:若您的应用为 Spring Boot 或 Spring Cloud 应用,您可以通过 Spring 配置文件来指定配置,详情请参考 Spring Cloud Alibaba Sentinel 文档。
启动日志
[root@192 ~]# java -Dserver.port=8888 -Dcsp.sentinel.dashboard.server=localhost:8888 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.8.6.jar
INFO: log base dir is: /root/logs/csp/
INFO: log name use pid is: false
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.0.5.RELEASE)
2020-02-08 13:07:29.316 INFO 114031 --- [ main] c.a.c.s.dashboard.DashboardApplication : Starting DashboardApplication on 192.168.180.137 with PID 114031 (/root/sentinel-dashboard-1.6.3.jar started by root in /root)
2020-02-08 13:07:29.319 INFO 114031 --- [ main] c.a.c.s.dashboard.DashboardApplication : No active profile set, falling back to default profiles: default
2020-02-08 13:07:29.456 INFO 114031 --- [ main] ConfigServletWebServerApplicationContext : Refreshing org.springframework.boot.web.servlet.context.AnnotationConfigServletWebServerApplicationContext@59690aa4: startup date [Sat Feb 08 13:07:29 CST 2020]; root of context hierarchy
2020-02-08 13:07:33.783 INFO 114031 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8888 (http)
Sentinel 控制台使用
默认用户名和密码都是 sentinel
查看机器列表以及健康情况
SpringCloud客户端能接入控制台
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
spring:
cloud:
sentinel:
transport:
dashboard: 192.168.180.137:8849 #sentinel控制台的请求地址
2、使用 Sentinel 来进行熔断与限流
定义资源 资源:可以是任何东西,一个服务,服务里的方法,甚至是一段代码。 定义规则 规则:Sentinel 支持以下几种规则:流量控制规则、熔断降级规则、系统保护规则、来源访问控制规则 和 热点参数规则。 检验规则是否生效
2.1 Java普通应用限流
1. 引入 Sentinel 依赖
pom.xml
文件中加入以下代码即可:<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.6</version>
</dependency>
2. 定义资源
SphU.entry("HelloWorld")
和 entry.exit()
包围起来即可。在下面的例子中,我们将 System.out.println("hello world");
作为资源(被保护的逻辑),用 API 包装起来。参考代码如下:public static void main(String[] args) {
// 配置规则.
initFlowRules();
while (true) {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保护的逻辑
System.out.println("hello world");
} catch (BlockException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
}
}
@SentinelResource("HelloWorld")
public void helloWorld() {
// 资源中的逻辑
System.out.println("hello world");
}
helloWorld()
方法就成了我们的一个资源。注意注解支持模块需要配合 Spring AOP 或者 AspectJ 一起使用。3. 定义规则
HelloWorld
每秒最多只能通过 20 个请求。private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
4. 检查效果
~/logs/csp/${appName}-metrics.log.xxx
里看到下面的输出:|--timestamp-|------date time----|--resource-|p |block|s |e|rt
1529998904000|2018-06-26 15:41:44|hello world|20|0 |20|0|0
1529998905000|2018-06-26 15:41:45|hello world|20|5579 |20|0|728
1529998906000|2018-06-26 15:41:46|hello world|20|15698|20|0|0
1529998907000|2018-06-26 15:41:47|hello world|20|19262|20|0|0
1529998908000|2018-06-26 15:41:48|hello world|20|19502|20|0|0
1529998909000|2018-06-26 15:41:49|hello world|20|18386|20|0|0
p
代表通过的请求, block
代表被阻止的请求, s
代表成功执行完成的请求个数, e
代表用户自定义的异常, rt
代表平均响应时长。5.接入控制台
pom.xml
引入 JAR 包:<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-transport-simple-http</artifactId>
</dependency>
-Dcsp.sentinel.dashboard.server=consoleIp:port
指定控制台地址和端口。-Dcsp.sentinel.dashboard.server=cdh1:8849 -Dproject.name=api-gateway
-Dcsp.sentinel.dashboard.server=192.169.56.121:8849 -Dproject.name=api-gateway
telnet 192.168.56.121 8849
2.2 定义资源
Entry entry = null;
try {
// 定义一个sentinel保护的资源,名称为test-sentinel-api
entry = SphU.entry(resourceName);
// 模拟执行被保护的业务逻辑耗时
Thread.sleep(100);
return a;
} catch (BlockException e) {
// 如果被保护的资源被限流或者降级了,就会抛出BlockException
log.warn("资源被限流或降级了", e);
return "资源被限流或降级了";
} catch (InterruptedException e) {
return "发生InterruptedException";
} finally {
if (entry != null) {
entry.exit();
}
ContextUtil.exit();
}
}
public static void main(String[] args) {
// 配置规则.
initFlowRules();
while (true) {
// 1.5.0 版本开始可以直接利用 try-with-resources 特性
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保护的逻辑
System.out.println("hello world");
} catch (BlockException ex) {
// 处理被流控的逻辑
System.out.println("blocked!");
}
}
}
资源注解@SentinelResource
@SentinelResource("HelloWorld")
public void helloWorld() {
// 资源中的逻辑
System.out.println("hello world");
}
@SentinelResource 注解
注意:注解方式埋点不支持 private 方法。
@SentinelResource
用于定义资源,并提供可选的异常处理和 fallback 配置项。@SentinelResource
注解包含以下属性:value:资源名称,必需项(不能为空) entryType:entry 类型,可选项(默认为 EntryType.OUT) blockHandler / blockHandlerClass:
fallback /fallbackClass fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑。fallback 函数可以针对所有类型的异常(除了exceptionsToIgnore里面排除掉的异常类型)进行处理。 defaultFallback (since 1.6.0):默认的 fallback 函数名称,可选项,通常用于通用的 fallback 逻辑(即可以用于很多服务或方法)。默认 fallback 函数可以针对所有类型的异常(除了exceptionsToIgnore里面排除掉的异常类型)进行处理。若同时配置了 fallback 和 defaultFallback,则只有 fallback 会生效。
fallback 函数签名和位置要求:
返回值类型必须与原函数返回值类型一致; 方法参数列表需要和原函数一致,或者可以额外多一个 Throwable
类型的参数用于接收对应的异常。fallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 fallbackClass为对应的类的 Class 对象,注意对应的函数必需为 static 函数,否则无法解析。
defaultFallback 函数签名要求:
返回值类型必须与原函数返回值类型一致; 方法参数列表需要为空,或者可以额外多一个 Throwable
类型的参数用于接收对应的异常。defaultFallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 fallbackClass 为对应的类的 Class 对象,注意对应的函数必需为 static 函数,否则无法解析。 exceptionsToIgnore(since 1.6.0):用于指定哪些异常被排除掉,不会计入异常统计中,也不会进入 fallback 逻辑中,而是会原样抛出。
2.3 定义规则
private void initSystemRule() {
List<SystemRule> rules = new ArrayList<>();
SystemRule rule = new SystemRule();
rule.setHighestSystemLoad(10);
rules.add(rule);
SystemRuleManager.loadRules(rules);
}
FlowRuleManager.loadRules(List<FlowRule> rules); // 修改流控规则
DegradeRuleManager.loadRules(List<DegradeRule> rules); // 修改降级规则
SystemRuleManager.loadRules(List<SystemRule> rules); // 修改系统规则
AuthorityRuleManager.loadRules(List<AuthorityRule> rules); // 修改授权规则
3、sentinel 熔断降级
3.1 什么是熔断降级
Closed:关闭状态(断路器关闭),所有请求都正常访问。 Open:打开状态(断路器打开),所有请求都会被降级。熔断器会对请求情况计数,当一定时间内失败请求百分比达到阈值,则触发熔断,断路器会完全打开。 Half Open:半开状态,不是永久的,断路器打开后会进入休眠时间。随后断路器会自动进入半开状态。此时会释放部分请求通过,若这些请求都是健康的,则会关闭断路器,否则继续保持打开,再次进行休眠计时。
3.2 熔断降级规则
Field | 说明 | 默认值 |
3.3 几种降级策略
平均响应时间 (DEGRADE_GRADE_RT): 当资源的平均响应时间超过阈值( DegradeRule
中的count
,以 ms 为单位)之后,资源进入准降级状态。接下来如果持续进入 5 个请求,它们的 RT 都持续超过这个阈值,那么在接下的时间窗口(DegradeRule
中的timeWindow
,以 s 为单位)之内,对这个方法的调用都会自动地返回(抛出DegradeException
)。在下一个时间窗口到来时, 会接着再放入5个请求, 再重复上面的判断.// 初始化降级规则
DegradeRule degradeRule = new DegradeRule();
degradeRule.setResource("queryGoodsInfo");
//响应时间
degradeRule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
//超过100ms
degradeRule.setCount(100);
//熔断打开后,经过多少时间进入半打开
degradeRule.setTimeWindow(10);注意 Sentinel 默认统计的 RT 上限是 4900 ms,超出此阈值的都会算作 4900 ms,若需要变更此上限可以通过启动配置项 -Dcsp.sentinel.statistic.max.rt=xxx 来配置。 异常比例 (DEGRADE_GRADE_EXCEPTION_RATIO): 当资源的每秒请求量 >= N(可配置),并且每秒异常总数占通过量的比值超过阈值(DegradeRule 中的 count)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。 当资源的每秒异常总数占通过量的比值超过阈值(DegradeRule 中的 count)之后,资源进入降级状态,即在接下的时间窗口(DegradeRule 中的 timeWindow,以 s 为单位)之内,对这个方法的调用都会自动地返回。 异常比率的阈值范围是 [0.0, 1.0],代表 0% - 100%。 异常数 (DEGRADE_GRADE_EXCEPTION_COUNT): 当资源近 1 分钟的异常数目超过阈值之后会进行熔断。 当资源近 1 分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间窗口是分钟级别的,若 timeWindow 小于 60s,则结束熔断状态后仍可能再进入熔断状态。 注意由于统计时间窗口是分钟级别的,若 timeWindow 小于 60s,则结束熔断状态后仍可能再进入熔断状态。
3.4 熔断降级代码实现
@PostConstruct
public void initSentinelRule()
{
//熔断规则: 5s内调用接口出现异常次数超过5的时候, 进行熔断
List<DegradeRule> degradeRules = new ArrayList<>();
DegradeRule rule = new DegradeRule();
rule.setGrade(RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT);//熔断规则
rule.setResource("queryGoodsInfo");
rule.setCount(5);
rule.setTimeWindow(5);
degradeRules.add(rule);
DegradeRuleManager.loadRules(degradeRules);
}
具体源码,请参见疯狂创客圈crazy-springcloud 源码工程
// 初始化降级规则
DegradeRule degradeRule = new DegradeRule();
degradeRule.setResource("queryGoodsInfo");
//响应时间
degradeRule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
//平均响应时间超过100ms
degradeRule.setCount(100);
//熔断打开后,经过多少时间进入半打开
degradeRule.setTimeWindow(10);
DegradeRuleManager.loadRules(Collections.singletonList(degradeRule));
3.5 控制台降级规则
Field | 说明 | 默认值 |
3.6 与Hystrix的熔断对比:
4、Sentinel 流控(限流)
private static void initFlowRules(){
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource("HelloWorld");
rule.setGrade(RuleConstant.FLOW_GRADE_QPS);
// Set limit QPS to 20.
rule.setCount(20);
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
resource
:资源名,即限流规则的作用对象count
: 限流阈值grade
: 限流阈值类型(QPS 或并发线程数)limitApp
: 流控针对的调用来源,若为default
则不区分调用来源strategy
: 调用关系限流策略controlBehavior
: 流量控制效果(直接拒绝、Warm Up、匀速排队)
基本的参数
QPS:每秒请求数,当前调用该api的QPS到达阈值的时候进行限流 线程数:当调用该api的线程数到达阈值的时候,进行限流
流控的几种strategy:
直接:当api大达到限流条件时,直接限流 关联:当关联的资源到达阈值,就限流自己 链路:只记录指定路上的流量,指定资源从入口资源进来的流量,如果达到阈值,就进行限流,api级别的限流
4.1 直接失败模式
使用API进行资源定义
/**
* 限流实现方式一: 抛出异常的方式定义资源
*
* @param orderId
* @return
*/
@ApiOperation(value = "纯代码限流")
@GetMapping("/getOrder")
@ResponseBody
public String getOrder(@RequestParam(value = "orderId", required = false)String orderId)
{
Entry entry = null;
// 资源名
String resourceName = "getOrder";
try
{
// entry可以理解成入口登记
entry = SphU.entry(resourceName);
// 被保护的逻辑, 这里为订单查询接口
return "正常的业务逻辑 OrderInfo :" + orderId;
} catch (BlockException blockException)
{
// 接口被限流的时候, 会进入到这里
log.warn("---getOrder1接口被限流了---, exception: ", blockException);
return "接口限流, 返回空";
} finally
{
// SphU.entry(xxx) 需要与 entry.exit() 成对出现,否则会导致调用链记录异常
if (entry != null)
{
entry.exit();
}
}
}
代码限流规则
//限流规则 QPS mode,
List<FlowRule> rules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource("getOrder");
// QPS控制在2以内
rule1.setCount(2);
// QPS限流
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
FlowRuleManager.loadRules(rules);
网页限流规则配置
Field | 说明 | 默认值 |
default ,代表不区分调用来源 | ||
refResource ),还是根据链路入口 | ||
测试
4.2 关联模式
NodeSelectorSlot
建立不同资源间的调用的关系,并且通过 ClusterBuilderSlot
记录每个资源的实时统计信息。read_db
和 write_db
这两个资源分别代表数据库读写,我们可以给 read_db
设置限流规则来达到写优先的目的。具体的方法:设置 `strategy` 为 `RuleConstant.STRATEGY_RELATE`
设置 `refResource` 为 `write_db`。
这样当写库操作过于频繁时,读数据的请求会被限流。
使用注解进行资源定义
@SentinelResource(value = "test1", blockHandler = "exceptionHandler")
@GetMapping("/test1")
public String test1()
{
log.info(Thread.currentThread().getName() + "\t" + "...test1");
return "-------hello baby,i am test1";
}
// Block 异常处理函数,参数最后多一个 BlockException,其余与原函数一致.
public String exceptionHandler(BlockException ex)
{
// Do some log here.
ex.printStackTrace();
log.info(Thread.currentThread().getName() + "\t" + "...exceptionHandler");
return String.format("error: test1 is not OK");
}
@SentinelResource(value = "test1_ref")
@GetMapping("/test1_ref")
public String test1_ref()
{
log.info(Thread.currentThread().getName() + "\t" + "...test1_related");
return "-------hello baby,i am test1_ref";
}
代码配置关联限流规则
// 关联模式流控 QPS控制在1以内
String refResource = "test1_ref";
FlowRule rRule = new FlowRule("test1")
.setCount(1) // QPS控制在1以内
.setStrategy(RuleConstant.STRATEGY_RELATE)
.setRefResource(refResource);
rules.add(rRule);
FlowRuleManager.loadRules(rules);
网页限流规则配置
测试
4.3 Warm up(预热)模式
db连接池 , 最少的连接数 线程池, 很少线程数 其他的池
使用注解定义资源
@SentinelResource(value = "testWarmUP", blockHandler = "exceptionHandlerOfWarmUp")
@GetMapping("/testWarmUP")
public String testWarmUP()
{
log.info(Thread.currentThread().getName() + "\t" + "...test1");
return "-------hello baby,i am testWarmUP";
}
代码预热规则
FlowRule warmUPRule = new FlowRule();
warmUPRule.setResource("testWarmUP");
warmUPRule.setCount(10);
warmUPRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
warmUPRule.setLimitApp("default");
warmUPRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_WARM_UP);
warmUPRule.setWarmUpPeriodSec(5);
网页预热规则配置
4.4 排队等待模式
示例
使用注解定义资源
@SentinelResource(value = "testLineUp",
blockHandler = "exceptionHandlerOftestLineUp")
@GetMapping("/testLineUp")
public String testLineUp()
{
log.info(Thread.currentThread().getName() + "\t" + "...test1");
return "-------hello baby,i am testLineUp";
}
代码限流规则
FlowRule lineUpRule = new FlowRule();
lineUpRule.setResource("testLineUp");
lineUpRule.setCount(10);
lineUpRule.setGrade(RuleConstant.FLOW_GRADE_QPS);
lineUpRule.setLimitApp("default");
lineUpRule.setMaxQueueingTimeMs(20 * 1000);
// CONTROL_BEHAVIOR_DEFAULT means requests more than threshold will be rejected immediately.
// CONTROL_BEHAVIOR_DEFAULT将超过阈值的流量立即拒绝掉.
lineUpRule.setControlBehavior(RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER);
rules.add(lineUpRule);
网页限流规则配置
通过jmeter进行测试
4.5 热点规则 (ParamFlowRule)
商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制 热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效。 使用该规则需要引入依赖:
ParamFlowRule
)类似于流量控制规则(FlowRule
):属性 | 说明 | 默认值 |
SphU.entry(xxx, args) 中的参数索引位置 | ||
count 阈值的限制。仅支持基本类型和字符串类型 | ||
false | ||
自定义资源
@GetMapping("/byHotKey")
@SentinelResource(value = "byHotKey",
blockHandler = "userAccessError")
public String test4(@RequestParam(value = "userId", required = false) String userId,
@RequestParam(value = "goodId", required = false) int goodId)
{
log.info(Thread.currentThread().getName() + "\t" + "...byHotKey");
return "-----------by HotKey: UserId";
}
限流规则代码:
ParamFlowRule rule = new ParamFlowRule(resourceName)
.setParamIdx(0)
.setCount(5);
// 针对 int 类型的参数 PARAM_B,单独设置限流 QPS 阈值为 10,而不是全局的阈值 5.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(PARAM_B))
.setClassType(int.class.getName())
.setCount(10);
rule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(rule));
ParamFlowRule pRule = new ParamFlowRule("byHotKey")
.setParamIdx(1)
.setCount(1);
// 针对 参数值1000,单独设置限流 QPS 阈值为 5,而不是全局的阈值 1.
ParamFlowItem item = new ParamFlowItem().setObject(String.valueOf(1000))
.setClassType(int.class.getName())
.setCount(5);
pRule.setParamFlowItemList(Collections.singletonList(item));
ParamFlowRuleManager.loadRules(Collections.singletonList(pRule));
网页限流规则配置
5、Sentinel 系统保护
系统保护的目的
保证系统不被拖垮 在系统稳定的前提下,保持系统的吞吐量
load 是一个“结果”,如果根据 load 的情况来调节流量的通过率,那么就始终有延迟性。也就意味着通过率的任何调整,都会过一段时间才能看到效果。当前通过率是使 load 恶化的一个动作,那么也至少要过 1 秒之后才能观测到;同理,如果当前通过率调整是让 load 好转的一个动作,也需要 1 秒之后才能继续调整,这样就浪费了系统的处理能力。所以我们看到的曲线,总是会有抖动。 恢复慢。想象一下这样的一个场景(真实),出现了这样一个问题,下游应用不可靠,导致应用 RT 很高,从而 load 到了一个很高的点。过了一段时间之后下游应用恢复了,应用 RT 也相应减少。这个时候,其实应该大幅度增大流量的通过率;但是由于这个时候 load 仍然很高,通过率的恢复仍然不高。
系统保护规则的应用
Load 自适应(仅对 Linux/Unix-like 机器生效):系统的 load1 作为启发指标,进行自适应系统保护。当系统 load1 超过设定的启发值,且系统当前的并发线程数超过估算的系统容量时才会触发系统保护(BBR 阶段)。系统容量由系统的 maxQps * minRt
估算得出。设定参考值一般是CPU cores * 2.5
。CPU usage(1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0),比较灵敏。 平均 RT:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。 并发线程数:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。 入口 QPS:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。
EntryType.IN
),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。highestSystemLoad 最大的 load1,参考值 -1 (不生效) avgRt 所有入口流量的平均响应时间 -1 (不生效) maxThread 入口流量的最大并发数 -1 (不生效) qps 所有入口资源的 QPS -1 (不生效)
List<SystemRule> srules = new ArrayList<>();
SystemRule srule = new SystemRule();
srule.setAvgRt(3000);
srules.add(srule);
SystemRuleManager.loadRules(srules);
网页限流规则配置
6、黑白名单规则
调用方信息通过 ContextUtil.enter(resourceName, origin) 方法中的 origin 参数传入。也可以在spring容器中注册RequestOriginParser
访问控制规则 (AuthorityRule)
resource:资源名,即限流规则的作用对象 limitApp:对应的黑名单/白名单,不同 origin 用 , 分隔,如 appA,appB strategy:限制模式,AUTHORITY_WHITE 为白名单模式,AUTHORITY_BLACK 为黑名单模式,默认为白名单模式 比如我们希望控制对资源 test 的访问设置白名单,只有来源为 appA 和 appB 的请求才可通过,则可以配置如下白名单规则:
AuthorityRule rule = new AuthorityRule();
rule.setResource("test");
rule.setStrategy(RuleConstant.AUTHORITY_WHITE);
rule.setLimitApp("appA,appB");
AuthorityRuleManager.loadRules(Collections.singletonList(rule));
@Component
public class CustomerRequestOriginParser implements RequestOriginParser {
//通过Request获取来源标识,交给授权规则进行匹配
@Override
public String parseOrigin(HttpServletRequest request) {
String origin = request.getHeader("origin"); //可以自定义
if (origin == null || "".equals(origin)) {
throw new RuntimeException("origin 为空");
}
return origin;
}
}
7、如何定义资源
定义资源 定义规则 检验规则是否生效
方式一:主流框架的默认适配
方式二:抛出异常的方式定义资源
SphU
包含了 try-catch 风格的 API。用这种方式,当资源发生了限流之后会抛出 BlockException
。这个时候可以捕捉异常,进行限流之后的逻辑处理。示例代码如下:// 1.5.0 版本开始可以利用 try-with-resources 特性(使用有限制)
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try (Entry entry = SphU.entry("resourceName")) {
// 被保护的业务逻辑
// do something here...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
}
exit(count, args)
),否则可能会有统计错误。这个时候不能使用 try-with-resources 的方式。Tracer.trace(ex)
来统计异常信息时,由于 try-with-resources 语法中 catch 调用顺序的问题,会导致无法正确统计异常数,因此统计异常信息时也不能在 try-with-resources 的 catch 块中调用 Tracer.trace(ex)
。Entry entry = null;
// 务必保证 finally 会被执行
try {
// 资源名可使用任意有业务语义的字符串,注意数目不能太多(超过 1K),超出几千请作为参数传入而不要直接作为资源名
// EntryType 代表流量类型(inbound/outbound),其中系统规则只对 IN 类型的埋点生效
entry = SphU.entry("自定义资源名");
// 被保护的业务逻辑
// do something...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 进行相应的处理操作
} catch (Exception ex) {
// 若需要配置降级规则,需要通过这种方式记录业务异常
Tracer.traceEntry(ex, entry);
} finally {
// 务必保证 exit,务必保证每个 entry 与 exit 配对
if (entry != null) {
entry.exit();
}
}
Entry entry = null;
try {
// 若需要配置例外项,则传入的参数只支持基本类型。
// EntryType 代表流量类型,其中系统规则只对 IN 类型的埋点生效
// count 大多数情况都填 1,代表统计为一次调用。
entry = SphU.entry(resourceName, EntryType.IN, 1, paramA, paramB);
// Your logic here.
} catch (BlockException ex) {
// Handle request rejection.
} finally {
// 注意:exit 的时候也一定要带上对应的参数,否则可能会有统计错误。
if (entry != null) {
entry.exit(1, paramA, paramB);
}
}
SphU.entry()
的参数描述:参数名 | 类型 | 解释 | 默认值 |
EntryType | EntryType.IN )还是出口流量(EntryType.OUT ),注意系统规则只对 IN 生效 | EntryType.OUT | |
int | |||
Object[] |
SphU.entry(xxx)
需要与 entry.exit()
方法成对出现,匹配调用,否则会导致调用链记录异常,抛出 ErrorEntryFreeException
异常。常见的错误:自定义埋点只调用 SphU.entry()
,没有调用entry.exit()
顺序错误,比如: entry1 -> entry2 -> exit1 -> exit2
,应该为entry1 -> entry2 -> exit2 -> exit1
方式三:返回布尔值方式定义资源
SphO
提供 if-else 风格的 API。用这种方式,当资源发生了限流之后会返回 false
,这个时候可以根据返回值,进行限流之后的逻辑处理。示例代码如下:// 资源名可使用任意有业务语义的字符串
if (SphO.entry("自定义资源名")) {
// 务必保证finally会被执行
try {
/**
* 被保护的业务逻辑
*/
} finally {
SphO.exit();
}
} else {
// 资源访问阻止,被限流或被降级
// 进行相应的处理操作
}
SphO.entry(xxx)
需要与 SphO.exit()方法成对出现,匹配调用,位置正确,否则会导致调用链记录异常,抛出
ErrorEntryFreeException` 异常。方式四:注解方式定义资源
@SentinelResource
注解定义资源并配置 blockHandler
和 fallback
函数来进行限流之后的处理。示例:// 原本的业务方法.
@SentinelResource(blockHandler = "blockHandlerForGetUser")
public User getUserById(String id) {
throw new RuntimeException("getUserById command failed");
}
// blockHandler 函数,原方法调用被限流/降级/系统保护的时候调用
public User blockHandlerForGetUser(String id, BlockException ex) {
return new User("admin");
}
blockHandler
函数会在原方法被限流/降级/系统保护的时候调用,而 fallback
函数会针对所有类型的异常。请注意 blockHandler
和 fallback
函数的形式要求,更多指引可以参见 Sentinel 注解支持文档。方式五:异步调用支持
SphU.asyncEntry(xxx)
方法定义资源,并通常需要在异步的回调函数中调用 exit
方法。以下是一个简单的示例:try {
AsyncEntry entry = SphU.asyncEntry(resourceName);
// 异步调用.
doAsync(userId, result -> {
try {
// 在此处处理异步调用的结果.
} finally {
// 在回调结束后 exit.
entry.exit();
}
});
} catch (BlockException ex) {
// Request blocked.
// Handle the exception (e.g. retry or fallback).
}
SphU.asyncEntry(xxx)
不会影响当前(调用线程)的 Context,因此以下两个 entry 在调用链上是平级关系(处于同一层),而不是嵌套关系:// 调用链类似于:
// -parent
// ---asyncResource
// ---syncResource
asyncEntry = SphU.asyncEntry(asyncResource);
entry = SphU.entry(normalResource);
entry
还是 asyncEntry
),只需要借助 Sentinel 提供的上下文切换功能,在对应的地方通过 ContextUtil.runOnContext(context, f)
进行 Context 变换,将对应资源调用处的 Context 切换为生成的异步 Context,即可维持正确的调用链路关系。示例如下:public void handleResult(String result) {
Entry entry = null;
try {
entry = SphU.entry("handleResultForAsync");
// Handle your result here.
} catch (BlockException ex) {
// Blocked for the result handler.
} finally {
if (entry != null) {
entry.exit();
}
}
}
public void someAsync() {
try {
AsyncEntry entry = SphU.asyncEntry(resourceName);
// Asynchronous invocation.
doAsync(userId, result -> {
// 在异步回调中进行上下文变换,通过 AsyncEntry 的 getAsyncContext 方法获取异步 Context
ContextUtil.runOnContext(entry.getAsyncContext(), () -> {
try {
// 此处嵌套正常的资源调用.
handleResult(result);
} finally {
entry.exit();
}
});
});
} catch (BlockException ex) {
// Request blocked.
// Handle the exception (e.g. retry or fallback).
}
}
-parent
---asyncInvocation
-----handleResultForAsync
8 Sentinel中基本概念
统计数据:统计某个资源的访问数据(QPS、RT等信息) 规则判断:判断限流规则、隔离规则、降级规则、熔断规则是否满足
8.1 ProcessorSlotChain
统计数据构建部分(statistic) NodeSelectorSlot:负责构建簇点链路中的节点(DefaultNode),将这些节点形成链路树 ClusterBuilderSlot:负责构建某个资源的ClusterNode,ClusterNode可以保存资源的运行信息(响应时间、QPS、block 数目、线程数、异常数等)以及来源信息(origin名称) StatisticSlot:负责统计实时调用数据,包括运行信息、来源信息等 规则判断部分(rule checking) AuthoritySlot:负责授权规则(来源控制) SystemSlot:负责系统保护规则 ParamFlowSlot:负责热点参数限流规则 FlowSlot:负责限流规则 DegradeSlot:负责降级规则
8.2 Node
DefaultNode:代表链路树中的每一个资源,一个资源出现在不同链路中时,会创建不同的DefaultNode节点。而树的入口节点叫EntranceNode,是一种特殊的DefaultNode ClusterNode:代表资源,一个资源不管出现在多少链路中,只会有一个ClusterNode。记录的是当前资源被访问的所有统计数据之和。
节点 | 作用 |
业务1:controller中的资源/order/query访问了service中的资源/goods 业务2:controller中的资源/order/save访问了service中的资源/goods
8.3 Entry
// 资源名可使用任意有业务语义的字符串,比如方法名、接口名或其它可唯一标识的字符串。
try (Entry entry = SphU.entry("resourceName")) {
// 被保护的业务逻辑
// do something here...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 在此处进行相应的处理操作
}
1.3.1.自定义资源
OrderService
的queryOrderById()
方法标记为一个资源。<!--sentinel-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
spring:
cloud:
sentinel:
transport:
dashboard: localhost:8089 # 这里我的sentinel用了8089的端口
public Order queryOrderById(Long orderId) {
// 创建Entry,标记资源,资源名为resource1
try (Entry entry = SphU.entry("resource1")) {
// 1.查询订单,这里是假数据
Order order = Order.build(101L, 4999L, "小米 MIX4", 1, 1L, null);
// 2.查询用户,基于Feign的远程调用
User user = userClient.findById(order.getUserId());
// 3.设置
order.setUser(user);
// 4.返回
return order;
}catch (BlockException e){
log.error("被限流或降级", e);
return null;
}
}
1.3.2.基于注解标记资源
SentinelAutoConfiguration
这个类:SentinelResourceAspect
:
/**
* Aspect for methods with {@link SentinelResource} annotation.
*
* @author Eric Zhao
*/
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 切点是添加了 @SentinelResource注解的类
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
// 环绕增强
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
// 获取受保护的方法
Method originMethod = resolveMethod(pjp);
// 获取 @SentinelResource注解
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
// Should not go through here.
throw new IllegalStateException("Wrong state for SentinelResource annotation");
}
// 获取注解上的资源名称
String resourceName = getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
Entry entry = null;
try {
// 创建资源 Entry
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 执行受保护的方法
Object result = pjp.proceed();
return result;
} catch (BlockException ex) {
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
// The ignore list will be checked first.
if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
throw ex;
}
if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
traceException(ex);
return handleFallback(pjp, annotation, ex);
}
// No fallback function can handle the exception, so throw it out.
throw ex;
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
Entry
)的创建。8.4 Context
8.4.1 什么是Context
Context 代表调用链路上下文,贯穿一次调用链路中的所有资源( Entry),基于ThreadLocal。 Context 维持着入口节点(entranceNode)、本次调用链路的 curNode(当前资源节点)、调用来源(origin)等信息。 后续的Slot都可以通过Context拿到DefaultNode或者ClusterNode,从而获取统计数据,完成规则判断 Context初始化的过程中,会创建EntranceNode,contextName就是EntranceNode的名称
// 创建context,包含两个参数:context名称、 来源名称
ContextUtil.enter("contextName", "originName");
8.4.2 Context的初始化
8.4.2.1.自动装配
HandlerInterceptor拦截器会拦截一切进入controller的方法,执行preHandle前置拦截方法,而Context的初始化就是在这里完成的。
8.4.2.2.AbstractSentinelInterceptor
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {
try {
// 获取资源名称,一般是controller方法的@RequestMapping路径,例如/order/{orderId}
String resourceName = getResourceName(request);
if (StringUtil.isEmpty(resourceName)) {
return true;
}
// 从request中获取请求来源,将来做 授权规则 判断时会用
String origin = parseOrigin(request);
// 获取 contextName,默认是sentinel_spring_web_context
String contextName = getContextName(request);
// 创建 Context
ContextUtil.enter(contextName, origin);
// 创建资源,名称就是当前请求的controller方法的映射路径
Entry entry = SphU.entry(resourceName, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
request.setAttribute(baseWebMvcConfig.getRequestAttributeName(), entry);
return true;
} catch (BlockException e) {
try {
handleBlockException(request, response, e);
} finally {
ContextUtil.exit();
}
return false;
}
}
8.4.2.3.ContextUtil
public static Context enter(String name, String origin) {
if (Constants.CONTEXT_DEFAULT_NAME.equals(name)) {
throw new ContextNameDefineException(
"The " + Constants.CONTEXT_DEFAULT_NAME + " can't be permit to defined!");
}
return trueEnter(name, origin);
}
protected static Context trueEnter(String name, String origin) {
// 尝试获取context
Context context = contextHolder.get();
// 判空
if (context == null) {
// 如果为空,开始初始化
Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
// 尝试获取入口节点
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
// 入口节点为空,初始化入口节点 EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// 添加入口节点到 ROOT
Constants.ROOT.addChild(node);
// 将入口节点放入缓存
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
} finally {
LOCK.unlock();
}
}
// 创建Context,参数为:入口节点 和 contextName
context = new Context(node, name);
// 设置请求来源 origin
context.setOrigin(origin);
// 放入ThreadLocal
contextHolder.set(context);
}
// 返回
return context;
}
9 Sentinel源码分析
9.1.入口
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args)
throws BlockException {
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
// 将 资源名称等基本信息 封装为一个 StringResourceWrapper对象
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
// 继续
return entryWithPriority(resource, count, prioritized, args);
}
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取 Context
Context context = ContextUtil.getContext();
if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
、 // 获取 Slot执行链,同一个资源,会创建一个执行链,放入缓存
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 创建 Entry,并将 resource、chain、context 记录在 Entry中
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 执行 slotChain
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
9.2.DefaultProcessorSlotChain
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args)
throws Throwable {
// first,就是责任链中的第一个 slot
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}
9.3.NodeSelectorSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 尝试获取 当前资源的 DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
// 如果为空,为当前资源创建一个新的 DefaultNode
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
// 放入缓存中,注意这里的 key是contextName,
// 这样不同链路进入相同资源,就会创建多个 DefaultNode
cacheMap.put(context.getName(), node);
map = cacheMap;
// 当前节点加入上一节点的 child中,这样就构成了调用链路树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
// context中的curNode(当前节点)设置为新的 node
context.setCurNode(node);
// 执行下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
为当前资源创建 DefaultNode 将DefaultNode放入缓存中,key是contextName,这样不同链路入口的请求,将会创建多个DefaultNode,相同链路则只有一个DefaultNode 将当前资源的DefaultNode设置为上一个资源的childNode 将当前资源的DefaultNode设置为Context中的curNode(当前节点)
9.4.ClusterBuilderSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args)
throws Throwable {
// 判空,注意ClusterNode是共享的成员变量,也就是说一个资源只有一个ClusterNode,与链路无关
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// 创建 cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
// 放入缓存,可以是nodeId,也就是resource名称
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
// 将资源的 DefaultNode与 ClusterNode关联
node.setClusterNode(clusterNode);
// 记录请求来源 origin 将 origin放入 entry
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
// 继续下一个slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
9.5.StatisticSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
try {
// 放行到下一个 slot,做限流、降级等判断
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 请求通过了, 线程计数器 +1 ,用作线程隔离
node.increaseThreadNum();
// 请求计数器 +1 用作限流
node.addPassRequest(count);
if (context.getCurEntry().getOriginNode() != null) {
// 如果有 origin,来源计数器也都要 +1
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// 如果是入口资源,还要给全局计数器 +1.
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// 请求通过后的回调.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (Throwable e) {
// 各种异常处理就省略了。。。
context.getCurEntry().setError(e);
throw e;
}
}
@Override
public void addPassRequest(int count) {
// DefaultNode的计数器,代表当前链路的 计数器
super.addPassRequest(count);
// ClusterNode计数器,代表当前资源的 总计数器
this.clusterNode.addPassRequest(count);
}
AuthoritySlot:负责授权规则(来源控制) SystemSlot:负责系统保护规则 ParamFlowSlot:负责热点参数限流规则 FlowSlot:负责限流规则 DegradeSlot:负责降级规则
9.6.AuthoritySlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args)
throws Throwable {
// 校验黑白名单
checkBlackWhiteAuthority(resourceWrapper, context);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// 获取授权规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// 遍历规则并判断
for (AuthorityRule rule : rules) {
if (!AuthorityRuleChecker.passCheck(rule, context)) {
// 规则不通过,直接抛出异常
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
static boolean passCheck(AuthorityRule rule, Context context) {
// 得到请求来源 origin
String requester = context.getOrigin();
// 来源为空,或者规则为空,都直接放行
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// rule.getLimitApp()得到的就是 白名单 或 黑名单 的字符串,这里先用 indexOf方法判断
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
if (contain) {
// 如果包含 origin,还要进一步做精确判断,把名单列表以","分割,逐个判断
boolean exactlyMatch = false;
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 如果是黑名单,并且包含origin,则返回false
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 如果是白名单,并且不包含origin,则返回false
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
// 其它情况返回true
return true;
}
9.7.SystemSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count,boolean prioritized, Object... args) throws Throwable {
// 系统规则校验
SystemRuleManager.checkSystem(resourceWrapper);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
if (!checkSystemStatus.get()) {
return;
}
// 只针对入口资源做校验,其它直接返回
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// 全局 QPS校验
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// 全局 线程数 校验
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// 全局平均 RT校验
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// 全局 系统负载 校验
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// 全局 CPU使用率 校验
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
9.8.ParamFlowSlot
这里的单机阈值,就是最大令牌数量:maxCount 这里的统计窗口时长,就是统计时长:duration
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node,
int count, boolean prioritized, Object... args) throws Throwable {
// 如果没有设置热点规则,直接放行
if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {
fireEntry(context, resourceWrapper, node, count, prioritized, args);
return;
}
// 热点规则判断
checkFlow(resourceWrapper, count, args);
// 进入下一个 slot
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
9.8.1.令牌桶
tokenCounters:用来记录剩余令牌数量 timeCounters:用来记录上一个请求的时间
三种流控模式:直接模式、关联模式、链路模式 三种流控效果:快速失败、warm up、排队等待
对进入资源的所有请求(ClusterNode)做限流统计:直接模式、关联模式 对进入资源的部分链路(DefaultNode)做限流统计:链路模式
滑动时间窗口算法:快速失败、warm up 漏桶算法:排队等待效果
9.9.1.核心流程
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 限流规则检测
checkFlow(resourceWrapper, context, node, count, prioritized);
// 放行
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// checker是 FlowRuleChecker 类的一个对象
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider,
ResourceWrapper resource,Context context, DefaultNode node,
int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// 获取当前资源的所有限流规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 遍历,逐个规则做校验
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public class FlowRule extends AbstractRule {
/**
* 阈值类型 (0: 线程, 1: QPS).
*/
private int grade = RuleConstant.FLOW_GRADE_QPS;
/**
* 阈值.
*/
private double count;
/**
* 三种限流模式.
*
* {@link RuleConstant#STRATEGY_DIRECT} 直连模式;
* {@link RuleConstant#STRATEGY_RELATE} 关联模式;
* {@link RuleConstant#STRATEGY_CHAIN} 链路模式.
*/
private int strategy = RuleConstant.STRATEGY_DIRECT;
/**
* 关联模式关联的资源名称.
*/
private String refResource;
/**
* 3种流控效果.
* 0. 快速失败, 1. warm up, 2. 排队等待, 3. warm up + 排队等待
*/
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
// 预热时长
private int warmUpPeriodSec = 10;
/**
* 队列最大等待时间.
*/
private int maxQueueingTimeMs = 500;
// 。。。 略
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
// 获取限流资源名称
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
// 校验规则
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node,
int acquireCount, boolean prioritized) {
// 基于限流模式判断要统计的节点,
// 如果是直连模式,关联模式,对ClusterNode统计,如果是链路模式,则对DefaultNode统计
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 判断规则
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法 WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的 RateLimiterController:排队等待模式,基于漏桶算法
9.9.2.滑动时间窗口
一是时间区间窗口的QPS计数功能,这个是在StatisticSlot中调用的 二是对滑动窗口内的时间区间窗口QPS累加,这个是在FlowRule中调用的
9.9.2.1.时间窗口请求量统计
// intervalInMs:是滑动窗口的时间间隔,默认为 1 秒
// sampleCount: 时间窗口的分隔数量,默认为 2,就是把 1秒分为 2个小时间窗
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
@Override
public void addPass(int count) {
// 获取当前时间所在的时间窗
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 计数器 +1
wrap.value().addPass(count);
}
public abstract class LeapArray<T> {
// 小窗口的时间长度,默认是500ms ,值 = intervalInMs / sampleCount
protected int windowLengthInMs;
// 滑动窗口内的 小窗口 数量,默认为 2
protected int sampleCount;
// 滑动窗口的时间间隔,默认为 1000ms
protected int intervalInMs;
// 滑动窗口的时间间隔,单位为秒,默认为 1
private double intervalInSecond;
}
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 计算当前时间对应的数组角标
int idx = calculateTimeIdx(timeMillis);
// 计算当前时间所在窗口的开始时间.
long windowStart = calculateWindowStart(timeMillis);
/*
* 先根据角标获取数组中保存的 oldWindow 对象,可能是旧数据,需要判断.
*
* (1) oldWindow 不存在, 说明是第一次,创建新 window并存入,然后返回即可
* (2) oldWindow的 starTime = 本次请求的 windowStar, 说明正是要找的窗口,直接返回.
* (3) oldWindow的 starTime < 本次请求的 windowStar, 说明是旧数据,需要被覆盖,创建
* 新窗口,覆盖旧窗口
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 创建新 window
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
// 基于CAS写入数组,避免线程安全问题
if (array.compareAndSet(idx, null, window)) {
// 写入成功,返回新的 window
return window;
} else {
// 写入失败,说明有并发更新,等待其它人更新完成即可
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
// 获取并发锁,覆盖旧窗口并返回
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 获取锁失败,等待其它线程处理就可以了
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 这种情况不应该存在,写这里只是以防万一。
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
9.9.2.2.滑动窗口QPS计算
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法 WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的 RateLimiterController:排队等待模式,基于漏桶算法
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 计算目前为止滑动窗口内已经存在的请求量
int curCount = avgUsedTokens(node);
// 判断:已使用请求量 + 需要的请求量(1) 是否大于 窗口的请求阈值
if (curCount + acquireCount > count) {
// 大于,说明超出阈值,返回false
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
// 小于等于,说明在阈值范围内,返回true
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
// 这里又进入了 StatisticNode类
@Override
public double passQps() {
// 请求量 ÷ 滑动窗口时间间隔 ,得到的就是QPS
return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
}
// rollingCounterInSecond 本质是ArrayMetric,之前说过
@Override
public long pass() {
// 获取当前窗口
data.currentWindow();
long pass = 0;
// 获取 当前时间的 滑动窗口范围内 的所有小窗口
List<MetricBucket> list = data.values();
// 遍历
for (MetricBucket window : list) {
// 累加求和
pass += window.pass();
}
// 返回
return pass;
}
// 此处进入LeapArray类中:
public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
// 创建空集合,大小等于 LeapArray长度
int size = array.length();
List<T> result = new ArrayList<T>(size);
// 遍历 LeapArray
for (int i = 0; i < size; i++) {
// 获取每一个小窗口
WindowWrap<T> windowWrap = array.get(i);
// 判断这个小窗口是否在 滑动窗口时间范围内(1秒内)
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
// 不在范围内,则跳过
continue;
}
// 在范围内,则添加到集合中
result.add(windowWrap.value());
}
// 返回集合
return result;
}
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
// 当前时间 - 窗口开始时间 是否大于 滑动窗口的最大间隔(1秒)
// 也就是说,我们要统计的时 距离当前时间1秒内的 小窗口的 count之和
return time - windowWrap.windowStart() > intervalInMs;
}
9.9.3.令牌桶
DefaultController:快速失败,默认的方式,基于滑动时间窗口算法 WarmUpController:预热模式,基于滑动时间窗口算法,只不过阈值是动态的 RateLimiterController:排队等待模式,基于令牌桶算法
9.10.2.触发断路器
> ....尼恩提示:此小结删掉 3000字, 由于整个PDF 文档 6W多,超过了 5W字限制,不得不进行删除。 删除部分内容,请参见 本文的PDF 版本 《Sentinel 学习圣经》
8、核心组件 源码分析
> ....尼恩提示:此小结删掉 3000字, 由于整个PDF 文档 6W多,超过了 5W字限制,不得不进行删除。 删除部分内容,请参见 本文的PDF 版本 《Sentinel 学习圣经》
9、插槽Slot 源码分析
10、sentinel滑动窗口 sliding window 源码分析
说在最后:有问题找老架构取经
空窗1年/空窗2年,如何通过一份绝世好简历, 起死回生 ?
空窗8月:中厂大龄34岁,被裁8月收一大厂offer, 年薪65W,转架构后逆天改命!
空窗2年:42岁被裁2年,天快塌了,急救1个月,拿到开发经理offer,起死回生
空窗半年:35岁被裁6个月, 职业绝望,转架构急救上岸,DDD和3高项目太重要了
空窗1.5年:失业15个月,学习40天拿offer, 绝境翻盘,如何实现?
100W 年薪 大逆袭, 如何实现 ?
100W案例,100W年薪的底层逻辑是什么? 如何实现年薪百万? 如何远离 中年危机?
如何 评价一份绝世好简历, 实现逆天改命,包含AI、大数据、golang、Java 等
实现职业转型,极速上岸
关注职业救助站公众号,获取每天职业干货
助您实现职业转型、职业升级、极速上岸
---------------------------------
实现架构转型,再无中年危机
关注技术自由圈公众号,获取每天技术千货
一起成为牛逼的未来超级架构师
几十篇架构笔记、5000页面试宝典、20个技术圣经
请加尼恩个人微信 免费拿走
暗号,请在 公众号后台 发送消息:领电子书
如有收获,请点击底部的"在看"和"赞",谢谢