# 服务容错-Sentinel [TOC] ## 1.Sentinel概述 随着微服务的流行,服务和服务之间的稳定性变得越来越重要。Sentinel 是面向分布式服务架构的**轻量级**流量控制产品,主要以流量为切入点,从流量控制、熔断降级、系统负载保护等多个维度来帮助您保护服务的稳定性。 ​     ## 2.整合Sentinel * 第一步: 加依赖 ```         <!--alibaba sentinel--> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId> </dependency>           <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> ``` * 第二步:写配置 ```yaml spring:   cloud:     nacos:       discovery:         server-addr: localhost:8848         namespace: 807451c6-1a54-4c3f-b652-ac1adacdd6f7     sentinel:       transport:         # 指定sentinel 控制台的地址         dashboard: localhost:8080     # 全端点开放 management:   endpoints:     web:       exposure:         include: "*" ``` * 第三步:核查是否成功 如果整合成功,就可以看到 actuator/sentine 端点 然后 我们启动项目进入端点,可以看到返回对应数据,说明整合成功 ![](https://img.kancloud.cn/cd/46/cd46899e0e5d06c94ed990f4be83705b_1091x710.png) ## 3.Sentinel-dashboard (控制台)--- 重点 ### 启动sentinel-dashboard ![](https://img.kancloud.cn/19/3c/193cf6fecdf75123346d9ecacb444a40_1610x876.png) ### 访问sentinel-dashboard ### [http://127.0.0.1:8989/](http://127.0.0.1:8989/) ### sentinel/sentinel ### 登录页面 ![](https://img.kancloud.cn/93/e5/93e59cc065413ce08a30a26c17ec005c_1920x606.png) ### 登录成功 ![](https://img.kancloud.cn/96/17/96174ff06e1225ace15f6b0245179426_1901x662.png) 注意点:一开始进去Sentinel 控制台 默认是懒加载的模式 就算应用注册上去,也是看不到该应用的;可以调用该应用的 /actuator/sentinel 端点,触发懒加模式; ![](https://img.kancloud.cn/73/e8/73e882478005ad079bd35f733a8fd593_1895x746.png) 以上整合 sentinel 完成,接下来开始sentinel的监控配置; ### 执行流程 Sentinel 的执行流程分为三个阶段: * Sentinel Core 与 Sentinel Dashboard 建立连接; > 第一步,建立连接。 Sentine Core 在初始化的时候,通过 application.yml 参数中指定的 Dashboard 的 IP地址,会主动向 dashboard 发起连接的请求。 ``` # Sentinel Dashboard通信地址 spring: cloud sentinel: transport: # 指定sentinel 控制台的地址 dashboard: 127.0.0.1:8080 eager: true ``` 该请求是以心跳包的方式定时向 Dashboard 发送,包含 Sentinel Core 的 AppName、IP、端口信息。这里有个重要细节:Sentinel Core为了能够持续接收到来自 Dashboard的数据,会在微服务实例设备上监听 8719 端口,在心跳包上报时也是上报这个 8719 端口。在 Sentinel Dashboard 接收到心跳包后,来自 Sentinel Core的AppName、IP、端口信息会被封装为 MachineInfo 对象放入 ConcurrentHashMap 保存在 JVM的内存中供后续使用。 ![](https://img.kancloud.cn/c5/f1/c5f1014c724d0251496bdc2260493683_886x399.png) * Sentinel Dashboard 向 Sentinel Core 下发新的保护规则; > 如果在 Dashboard 页面中设置了新的保护规则,会先从当前的 MachineInfo 中提取符合要求的微服务实例信息,之后通过 Dashboard内置的 transport 模块将新规则打包推送到微服务实例的 Sentinel Core,Sentinel Core收 到新规则在微服务应用中对本地规则进行更新,这些新规则会保存在微服务实例的 JVM 内存中。 ![](https://img.kancloud.cn/e5/a9/e5a9f692a8acb4ce762c7ba1f25556ec_867x371.png) * Sentinel Core 应用新的保护规则,实施限流、熔断等动作。 > Sentinel Core 为服务限流、熔断提供了核心拦截器 SentinelWebInterceptor,这个拦截器默认对所有请求 /** 进行拦截,然后开始请求的链式处理流程,在对于每一个处理请求的节点被称为 Slot(槽),通过多个槽的连接形成处理链,在请求的流转过程中,如果有任何一个 Slot 验证未通过,都会产生 BlockException,请求处理链便会中断,并返回“Blocked by sentinel" 异常信息。 ![](https://img.kancloud.cn/85/4b/854b733a2c2641a38445b31512df86d8_942x909.png) ## 4.专业名称 Sentinel 的一些概念 * 资源:资源是 Sentinel 的关键概念。资源,可以是一个方法、一段代码、由应用提供的接口,或者由应用调用其它应用的接口。 * 规则:围绕资源的实时状态设定的规则,包括流量控制规则、熔断降级规则以及系统保护规则、自定义规则。 * 降级:在流量剧增的情况下,为保证系统能够正常运行,根据资源的实时状态、访问流量以及系统负载有策略的拒绝掉一部分流量。 Sentinel 可以简单的分为 Sentinel 核心库和 Dashboard。核心库不依赖 Dashboard,但是结合 Dashboard 可以取得最好的效果。 1. **定义资源** 2. **定义规则** 3. **检验规则是否生效** 先把可能需要保护的资源定义好,之后再配置规则。也可以理解为,只要有了资源,我们就可以在任何时候灵活地定义各种流量控制规则。在编码的时候,只需要考虑这个代码是否需要保护,如果需要保护,就将之定义为一个资源。 对于主流的框架,我们提供适配,只需要按照适配中的说明配置,Sentinel 就会默认定义提供的服务,方法等为资源 。 ### 4.1.定义资源 我们启动用户中心模块,访问 /users-anon/login 端点 ![](https://img.kancloud.cn/62/ce/62ce4d2d4a811006592a573cf56fa406_1872x849.png) 在簇点链路中找到  **/users-anon/login** 定义的资源,然后点击流控 ![](https://img.kancloud.cn/d2/2c/d22c91267c9c119e55708adf45bedd15_1860x718.png) #### Sentinel 资源指标数据统计相关的类 Sentinel 中指标数据统计以资源为维度。资源使用 ResourceWrapper 对象表示,我们把 ResourceWrapper 对象称为资源 ID。如果一个资源描述的是一个接口,那么资源名称通常就是接口的 url,例如“GET:**/users-anon/login**。 ##### **ResourceWrapper** ~~~ public abstract class ResourceWrapper { protected final String name; protected final EntryType entryType; protected final int resourceType; public ResourceWrapper(String name, EntryType entryType, int resourceType) { this.name = name; this.entryType = entryType; this.resourceType = resourceType; } } ~~~ ResourceWrapper 有三个字段: * name 为资源名称,例如:“GET:**/users-anon/login**”。 * entryType 为流量类型,即流入流量还是流出流量,通俗点说就是发起请求还是接收请求。 * resourceType 表示资源的类型,例如 Dubbo RPC、Web MVC 或者 API Gateway 网关。 EntryType 是一个枚举类型: ~~~ public enum EntryType { IN("IN"), OUT("OUT"); } ~~~ 可以把 IN 和 OUT 简单理解为接收处理请求与发送请求。当接收到别的服务或者前端发来的请求,那么 entryType 为 IN;当向其他服务发起请求时,那么 entryType 就为 OUT。例如,在消费端向服务提供者发送请求,当请求失败率达到多少时触发熔断降级,那么服务消费端为实现熔断降级就需要统计资源的 OUT 类型流量。 Sentinel 目前支持的资源类型有以下几种: ~~~ public final class ResourceTypeConstants { public static final int COMMON = 0; public static final int COMMON_WEB = 1; public static final int COMMON_RPC = 2; public static final int COMMON_API_GATEWAY = 3; public static final int COMMON_DB_SQL = 4; } ~~~ * COMMON:默认 * COMMON_WEB:Web 应用的接口 * COMMON_RPC:Dubbo 框架的 RPC 接口 * COMMON_API_GATEWAY:用于 API Gateway 网关 * COMMON_DB_SQL:数据库 SQL 操作 ### 4.2.定义规则 #### 1.流量控制规则(FlowRule) ![](https://img.kancloud.cn/d1/58/d158e9fcb29ed8815912cb21f4172c80_1268x863.png) 资源名:资源名称,必需项 针对来源:默认 default,该项定义资源需要自定义某些规则,后面会说到; 阈值类型:2种类型,QPS 和 线程数 单机阈值:定义数量,超过则返回失败 是否集群:后续会说明; 流控模式:3种类型; * 直接拒绝:超过阈值直接抛异常  FlowException * 关联: 是指关联的资源到达阈值,当前资源无法访问,既做到了限流的效果 * 链路: 只会记录指定链路上的资源 流控效果:3种类型; * 1.快速失败:直接失败,抛异常 FlowException 源码位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.DefaultController @Override public boolean canPass(Node node, int acquireCount, boolean prioritized) {           //获取系统当前qps数量         int curCount = avgUsedTokens(node);           //如果当前qps + 请求qps > 定义阈值  并且 prioritized 这个定义为 false 所以直接抛出异常         if (curCount + acquireCount > count) {             if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {                 long currentTime;                 long waitInMs;                 currentTime = TimeUtil.currentTimeMillis();                 waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);                 if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {                     node.addWaitingRequest(currentTime + waitInMs, acquireCount);                     node.addOccupiedPass(acquireCount);                     sleep(waitInMs);                     // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.                     throw new PriorityWaitException(waitInMs);                 }             }             return false;         }         return true;     } ```    * 2.Warm up(慢启动模式):Warm Up(`RuleConstant.CONTROL_BEHAVIOR_WARM_UP`)方式,即预热/冷启动方式。当系统长期处于低水位的情况下,当流量突然增加时,直接把系统拉升到高水位可能瞬间把系统压垮。通过"冷启动",让通过的流量缓慢增加,在一定时间内逐渐增加到阈值上限,给冷系统一个预热的时间,避免冷系统被压垮。 源码位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.WarmUpController     @Override     public boolean canPass(Node node, int acquireCount, boolean prioritized) {         long passQps = (long) node.passQps();         long previousQps = (long) node.previousPassQps();         syncToken(previousQps);         // 开始计算它的斜率         // 如果进入了警戒线,开始调整他的qps         long restToken = storedTokens.get();         if (restToken >= warningToken) {             long aboveToken = restToken - warningToken;             // 消耗的速度要比warning快,但是要比慢             // current interval = restToken*slope+1/count             double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));             if (passQps + acquireCount <= warningQps) {                 return true;             }         } else {             if (passQps + acquireCount <= count) {                 return true;             }         }         return false;     } ``` ​     * 3.排队等待:匀速排队(`RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER`)方式会严格控制请求通过的间隔时间,也即是让请求以均匀的速度通过,对应的是漏桶算法;**阈值类型必须为QPS,否则无效** 源码位置 canPass方法 ``` com.alibaba.csp.sentinel.slots.block.flow.controller.RateLimiterController     @Override     public boolean canPass(Node node, int acquireCount, boolean prioritized) {         // Pass when acquire count is less or equal than 0.         if (acquireCount <= 0) {             return true;         }         // Reject when count is less or equal than 0.         // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.         if (count <= 0) {             return false;         }         long currentTime = TimeUtil.currentTimeMillis();         // Calculate the interval between every two requests.         long costTime = Math.round(1.0 * (acquireCount) / count * 1000);         // Expected pass time of this request.           // latestPassedTime 根据最后一次通过的时间,排队等待,其实定义好了每段时间经过,如果超过定义的超时时间,就丢弃请求         long expectedTime = costTime + latestPassedTime.get();         if (expectedTime <= currentTime) {             // Contention may exist here, but it's okay.               //记录最后一次时间 (这里可能存在争议,但没关系。)             latestPassedTime.set(currentTime);             return true;         } else {             // Calculate the time to wait.             long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();               //如果等待的时间 大于 我最大的等待时间 直接抛弃             if (waitTime > maxQueueingTimeMs) {                 return false;             } else {                 long oldTime = latestPassedTime.addAndGet(costTime);                 try {                     waitTime = oldTime - TimeUtil.currentTimeMillis();                     if (waitTime > maxQueueingTimeMs) {                         latestPassedTime.addAndGet(-costTime);                         return false;                     }                     // in race condition waitTime may <= 0                     if (waitTime > 0) {                         Thread.sleep(waitTime);                     }                     return true;                 } catch (InterruptedException e) {                 }             }         }         return false;     } ``` ##### 流量控制测试 1.打开 Sentinel 控制台,在簇点链路中找到  **/users-anon/login** 定义的资源,然后点击流控 ![](https://img.kancloud.cn/7e/b2/7eb23d7585b786fb57fb75dc55f807c2_1833x768.png) 我们尝试QPS 单机阈值 1,然后调用 http://localhost:7000/users-anon/login?username=admin 资源 发现返回 Blocked by Sentinel (flow limiting) 说明我们流控 快速失败成功了 ![](https://img.kancloud.cn/0b/b5/0bb5cd90262a11c5de6513c10d03eeb0_1346x198.png) #### 2.熔断降级规则(DegradeRule) ##### 1.概述 除了流量控制以外,对调用链路中不稳定的资源进行熔断降级也是保障高可用的重要措施之一。由于调用关系的复杂性,如果调用链路中的某个资源不稳定,最终会导致请求发生堆积。Sentinel **熔断降级**会在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联错误。当资源被降级后,在接下来的降级时间窗口之内,对该资源的调用都自动熔断(默认行为是抛出 `DegradeException`)。 ![](https://img.kancloud.cn/7d/6d/7d6dc00cf6c1a6621483a8ab7cee9c3b_1840x515.png) ##### 2.降级策略 - 平均响应时间 (`DEGRADE_GRADE_RT`):当 1s 内持续进入 5 个请求,对应时刻的平均响应时间(秒级)均超过阈值(`count`,以 ms 为单位),那么在接下的时间窗口(`DegradeRule` 中的 `timeWindow`,以 s 为单位)之内,对这个方法的调用都会自动地熔断(抛出 `DegradeException`)。注意 Sentinel 默认统计的 RT 上限是 4900 ms,**超出此阈值的都会算作 4900 ms**,若需要变更此上限可以通过启动配置项 `-Dcsp.sentinel.statistic.max.rt=xxx` 来配置。 - 异常比例 (`DEGRADE_GRADE_EXCEPTION_RATIO`):当资源的每秒请求量 >= 5,并且每秒异常总数占通过量的比值超过阈值(`DegradeRule` 中的 `count`)之后,资源进入降级状态,即在接下的时间窗口(`DegradeRule` 中的 `timeWindow`,以 s 为单位)之内,对这个方法的调用都会自动地返回。异常比率的阈值范围是 `[0.0, 1.0]`,代表 0% - 100%。 - 异常数 (`DEGRADE_GRADE_EXCEPTION_COUNT`):当资源近 1 分钟的异常数目超过阈值之后会进行熔断。注意由于统计时间窗口是分钟级别的,若 `timeWindow` 小于 60s,则结束熔断状态后仍可能再进入熔断状态。 核心代码: ```java com.alibaba.csp.sentinel.slots.block.degrade.DegradeRule ``` ```java public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {         if (cut.get()) {             return false;         }         ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());         if (clusterNode == null) {             return true;         }           // RT方式         if (grade == RuleConstant.DEGRADE_GRADE_RT) {               //rt的平均数             double rt = clusterNode.avgRt();             if (rt < this.count) {                 passCount.set(0);                 return true;             }             // Sentinel will degrade the service only if count exceeds.               // 当前通过数 小于 RT最大是 返回可以通过             if (passCount.incrementAndGet() < RT_MAX_EXCEED_N) {                 return true;             }         } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) { // 异常比例               //异常QPS             double exception = clusterNode.exceptionQps();             //成功QPS             double success = clusterNode.successQps();               //总数             double total = clusterNode.totalQps();             // if total qps less than RT_MAX_EXCEED_N, pass.               //总数小于最大的RT 直接通过             if (total < RT_MAX_EXCEED_N) {                 return true;             }               //可以访问的数量             double realSuccess = success - exception;             if (realSuccess <= 0 && exception < RT_MAX_EXCEED_N) {                 return true;             }               //异常数量 除以 成功数量 小于定义的数量 返回成功             if (exception / success < count) {                 return true;             }         } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {               // 总异常数量             double exception = clusterNode.totalException();               //总异常数量 小于 数量             if (exception < count) {                 return true;             }         }         if (cut.compareAndSet(false, true)) {             ResetTask resetTask = new ResetTask(this);             pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);         }         return false;     } ``` ##### 熔断降级测试 我们尝试RT   1ms ,时间窗口 5 s,然后调用 http://localhost:7000/users-anon/login?username=admin 资源 发现返回 Blocked by Sentinel (flow limiting) ![](https://img.kancloud.cn/44/ca/44ca50b83f64a5639f4d9d9da6199f45_1826x612.png) ![](https://img.kancloud.cn/e6/b3/e6b33ec72595e05a1b7d790e60a0ad1e_1140x267.png) **注意点:测试单个的时候应该把之前相关的规则删掉,因为Sentinel支持多个规则共存,到时候测试不出哪个是对应的规则;** **测试的debug图**,请同学们仔细看清楚上面的源码 ![](https://img.kancloud.cn/4a/5a/4a5ac546170faa6508a66e618bd527e2_1485x894.png) #### 3.热点规则 (ParamFlowRule) ##### 1.概述 何为热点?热点即经常访问的数据。很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制。比如: - 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制 - 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制 热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流。热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效 ![](https://img.kancloud.cn/e5/d6/e5d6b3aa7f1441d603d877a7052a639f_1207x471.png) ##### 热点规格测试 ![](https://img.kancloud.cn/bb/a1/bba1b6cea12ecde22f5dfbc347cb25f6_862x727.png) 这次我们针对 username 参数进行限流,如果我们只有一个参数,这样限流的意思也等于限流整个url,所以我们点击高级选项 ```java     //定义一个 Sentinel 资源 否则热点无效,资源名称随便取,不能重复即可            @SentinelResource("findByUsername")     @GetMapping(value = "/users-anon/login", params = "username")     @ApiOperation(value = "根据用户名查询用户")     @LogAnnotation(module="user-center",recordRequestParam=false)     public LoginAppUser findByUsername(String username) throws ControllerException {         try {             return sysUserService.findByUsername(username);         } catch (ServiceException e) {             throw new ControllerException(e);         }     } ``` ![](https://img.kancloud.cn/44/e6/44e6aebcde5a9c5c63112e3c6de34148_1815x476.png) ![](https://img.kancloud.cn/2b/15/2b1563fc6ccdeca1f75e9ebde1f0e852_1840x640.png) ![](https://img.kancloud.cn/91/3f/913f3747f65e805b22befb9b8e3b1f71_1701x831.png) 点击进行保存 这个时候,因为我们方法只有一个参数,不管怎么设置也是对整个方法进行限流,这种方式完全可以依靠上面的几个规则解决,所以,接下来我们点击编辑,高级选项,我们针对**admin** 限流10次 ,其他用户我们限流1次的规则 进行测试 ![](https://img.kancloud.cn/4d/69/4d690ff98072bbaa0ce64d888a29c8c0_1218x794.png) 这个就是访问一次之后,在进行访问,直接返回 500 错误,刷新2次,就出现错误 然后 我们访问admin,多刷新几次,也没有出现500错误,可能本人测试机比较慢,部分请求慢,这个同学们可以自己测试 **注意点:也就是热点参数,我们已经可以细粒度为某个参数进行限流;或者某个参数值进行限流;** **参数必须为基本类型,不可以为复杂类型** ```java com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowChecker ``` ```java public static boolean passCheck(ResourceWrapper resourceWrapper, /*@Valid*/ ParamFlowRule rule, /*@Valid*/ int count,                              Object... args) {         if (args == null) {             return true;         }         int paramIdx = rule.getParamIdx();         if (args.length <= paramIdx) {             return true;         }         // Get parameter value. If value is null, then pass.         Object value = args[paramIdx];         if (value == null) {             return true;         }         if (rule.isClusterMode() && rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {             return passClusterCheck(resourceWrapper, rule, count, value);         }         return passLocalCheck(resourceWrapper, rule, count, value);     } ``` ```java com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowSlot ``` ```java void checkFlow(ResourceWrapper resourceWrapper, int count, Object... args) throws BlockException {         if (args == null) {             return;         }         if (!ParamFlowRuleManager.hasRules(resourceWrapper.getName())) {             return;         }         List rules = ParamFlowRuleManager.getRulesOfResource(resourceWrapper.getName());         for (ParamFlowRule rule : rules) {             applyRealParamIdx(rule, args.length);             // Initialize the parameter metrics.             ParameterMetricStorage.initParamMetricsFor(resourceWrapper, rule);             if (!ParamFlowChecker.passCheck(resourceWrapper, rule, count, args)) {                 String triggeredParam = "";                 if (args.length > rule.getParamIdx()) {                     Object value = args[rule.getParamIdx()];                     triggeredParam = String.valueOf(value);                 }                 throw new ParamFlowException(resourceWrapper.getName(), triggeredParam, rule);             }         }     } ``` 抛出该异常,我们可以通过全局异常捕抓,然后自定义对应异常就行 ``` com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException: owen ``` ![](https://img.kancloud.cn/80/7d/807d166144bf66b671d6bff9008ae173_1189x432.png) #### 4.系统保护规则 (SystemRule) Sentinel 系统自适应限流从整体维度对应用入口流量进行控制,结合应用的 Load、总体平均 RT、入口 QPS 和线程数等几个维度的监控指标,让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。 ![](https://img.kancloud.cn/fb/56/fb56e4c24f0602583b861d5b1f737c60_1833x508.png) 系统保护规则是从应用级别的入口流量进行控制,从单台机器的总体 Load、RT、入口 QPS 和线程数四个维度监控应用数据,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。 系统保护规则是应用整体维度的,而不是资源维度的,并且**仅对入口流量生效**。入口流量指的是进入应用的流量(`EntryType.IN`),比如 Web 服务或 Dubbo 服务端接收的请求,都属于入口流量。 系统规则支持以下的阈值类型: \- **Load**(仅对 Linux/Unix-like 机器生效):当系统 load1 超过阈值,且系统当前的并发线程数超过系统容量时才会触发系统保护。系统容量由系统的 `maxQps * minRt` 计算得出。设定参考值一般是 `CPU cores * 2.5`。 \- **CPU usage**(1.5.0+ 版本):当系统 CPU 使用率超过阈值即触发系统保护(取值范围 0.0-1.0)。 \- **RT**:当单台机器上所有入口流量的平均 RT 达到阈值即触发系统保护,单位是毫秒。 \- **线程数**:当单台机器上所有入口流量的并发线程数达到阈值即触发系统保护。 \- **入口 QPS**:当单台机器上所有入口流量的 QPS 达到阈值即触发系统保护。 核心代码 ```java com.alibaba.csp.sentinel.slots.system.SystemRuleManager ``` ```java     private static boolean checkBbr(int currentThread) {         if (currentThread > 1 &&             currentThread > Constants.ENTRY_NODE.maxSuccessQps() * Constants.ENTRY_NODE.minRt() / 1000) {             return false;         }         return true;     } ``` ```java  public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {         // Ensure the checking switch is on.         if (!checkSystemStatus.get()) {             return;         }         // for inbound traffic only         if (resourceWrapper.getEntryType() != EntryType.IN) {             return;         }         // total qps         double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();         if (currentQps > qps) {             throw new SystemBlockException(resourceWrapper.getName(), "qps");         }         // total thread         int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();         if (currentThread > maxThread) {             throw new SystemBlockException(resourceWrapper.getName(), "thread");         }         double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();         if (rt > maxRt) {             throw new SystemBlockException(resourceWrapper.getName(), "rt");         }         // load. BBR algorithm.         if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {             if (!checkBbr(currentThread)) {                 throw new SystemBlockException(resourceWrapper.getName(), "load");             }         }         // cpu usage         if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {             if (!checkBbr(currentThread)) {                 throw new SystemBlockException(resourceWrapper.getName(), "cpu");             }         }     } ``` 因为系统设置,一般比较少用,这里测试也不方便,没有进行测试,贴出2段核心源码,如果有兴趣的同学,可以继续研究; #### 5.访问控制规则 (AuthorityRule) 很多时候,我们需要根据调用方来限制资源是否通过,这时候可以使用 Sentinel 的黑白名单控制的功能。黑白名单根据资源的请求来源(`origin`)限制资源是否通过,若配置白名单则只有请求来源位于白名单内时才可通过;若配置黑名单则请求来源位于黑名单时不通过,其余的请求通过。 ![](https://img.kancloud.cn/e0/f2/e0f2d359225caa0e2f0a68a29f3990e6_1837x636.png) ## 5.代码配置规则 ​    Sentinel支持页面的方式新增规则,也支持代码的方式新增规则; 下面定义流控规则 ```java private void initFlowQpsRule() {     List rules = new ArrayList<>();     FlowRule rule = new FlowRule(resourceName);     // set limit qps to 20     rule.setCount(20);     rule.setGrade(RuleConstant.FLOW_GRADE_QPS);     rule.setLimitApp("default");     rules.add(rule);     FlowRuleManager.loadRules(rules); } ``` 代码中 “resourceName” 指定随机一个资源名就行了 。 Sentinel 实现限流降级、熔断降级、黑白名单限流降级、系统自适应限流降级以及热点参数限流降级都是由 ProcessorSlot、Checker、Rule、RuleManager 组合完成。ProcessorSlot 作为调用链路的切入点,负责调用 Checker 检查当前请求是否可以放行;Checker 则根据资源名称从 RuleManager 中拿到为该资源配置的 Rule(规则),取 ClusterNode 统计的实时指标数据与规则对比,如果达到规则的阈值则抛出 Block 异常,抛出 Block 异常意味着请求被拒绝,也就实现了限流或熔断。 可以总结为以下三个步骤: 1. 在 ProcessorSlot#entry 方法中调用 Checker#check 方法,并将 DefaultNode 传递给 Checker。 2. Checker 从 DefaultNode 拿到 ClusterNode,并根据资源名称从 RuleManager 获取为该资源配置的规则。 3. Checker 从 ClusterNode 中获取当前时间窗口的某项指标数据(QPS、avgRt 等)与规则的阈值对比,如果达到规则的阈值则抛出 Block 异常(也有可能将 check 交给 Rule 去实现)。 ## 6.控制台相关配置项 ```properties #spring settings spring.http.encoding.force=true spring.http.encoding.charset=UTF-8 spring.http.encoding.enabled=true #logging settings logging.level.org.springframework.web=INFO logging.file=${user.home}/logs/csp/sentinel-dashboard.log logging.pattern.file= %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n #logging.pattern.console= %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n #auth settings auth.filter.exclude-urls=/,/auth/login,/auth/logout,/registry/machine,/version auth.filter.exclude-url-suffixes=htm,html,js,css,map,ico,ttf,woff,png # If auth.enabled=false, Sentinel console disable login auth.username=sentinel auth.password=sentinel # Inject the dashboard version. It's required to enable # filtering in pom.xml for this resource file. sentinel.dashboard.version=${project.version} ``` 前面我们也下载了一个 dashboard 查看 **application.properties**,可以指定修改sentinel账号与密码; ## 7.注解支持 Sentinel 提供了 `@SentinelResource` 注解用于定义资源,并提供了 AspectJ 的扩展用于自动定义资源、处理 `BlockException` 等。 ### @SentinelResource 注解 `@SentinelResource` 用于定义资源,并提供可选的异常处理和 fallback 配置项。 `@SentinelResource` 注解包含以下属性: \- `value`:资源名称,必需项(不能为空) \- `entryType`:entry 类型,可选项(默认为 `EntryType.OUT`) \- `blockHandler` / `blockHandlerClass`: `blockHandler `对应处理 `BlockException` 的函数名称,可选项。blockHandler 函数访问范围需要是 `public`,返回类型需要与原方法相匹配,参数类型需要和原方法相匹配并且最后加一个额外的参数,类型为 `BlockException`。blockHandler 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 `blockHandlerClass` 为对应的类的 `Class` 对象,注意对应的函数必需为 static 函数,否则无法解析。 \- `fallback`:fallback 函数名称,可选项,用于在抛出异常的时候提供 fallback 处理逻辑。fallback 函数可以针对所有类型的异常(除了`exceptionsToIgnore`里面排除掉的异常类型)进行处理。fallback 函数签名和位置要求:   - 返回值类型必须与原函数返回值类型一致;   - 方法参数列表需要和原函数一致,或者可以额外多一个 `Throwable` 类型的参数用于接收对应的异常。   - fallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 `fallbackClass` 为对应的类的 `Class` 对象,注意对应的函数必需为 static 函数,否则无法解析。 \- `defaultFallback`(since 1.6.0):默认的 fallback 函数名称,可选项,通常用于通用的 fallback 逻辑(即可以用于很多服务或方法)。默认 fallback 函数可以针对所有类型的异常(除了`exceptionsToIgnore`里面排除掉的异常类型)进行处理。若同时配置了 fallback 和 defaultFallback,则只有 fallback 会生效。defaultFallback 函数签名要求:   - 返回值类型必须与原函数返回值类型一致;   - 方法参数列表需要为空,或者可以额外多一个 `Throwable` 类型的参数用于接收对应的异常。   - defaultFallback 函数默认需要和原方法在同一个类中。若希望使用其他类的函数,则可以指定 `fallbackClass` 为对应的类的 `Class` 对象,注意对应的函数必需为 static 函数,否则无法解析。 \- `exceptionsToIgnore`(since 1.6.0):用于指定哪些异常被排除掉,不会计入异常统计中,也不会进入 fallback 逻辑中,而是会原样抛出。 \> 注:1.6.0 之前的版本 fallback 函数只针对降级异常(`DegradeException`)进行处理,**不能针对业务异常进行处理**。 特别地,若 blockHandler 和 fallback 都进行了配置,则被限流降级而抛出 `BlockException` 时只会进入 `blockHandler` 处理逻辑。若未配置 `blockHandler`、`fallback` 和 `defaultFallback`,则被限流降级时会将 `BlockException` **直接抛出**。 核心代码: ```java com.alibaba.csp.sentinel.annotation.aspectj.SentinelResourceAspect ``` ```java     @Around("sentinelResourceAnnotationPointcut()")     public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {         Method originMethod = resolveMethod(pjp);         SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);         if (annotation == null) {             // Should not go through here.             throw new IllegalStateException("Wrong state for SentinelResource annotation");         }         String resourceName = getResourceName(annotation.value(), originMethod);         EntryType entryType = annotation.entryType();         int resourceType = annotation.resourceType();         Entry entry = null;         try {             entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());             Object result = pjp.proceed();             return result;         } catch (BlockException ex) {             return handleBlockException(pjp, annotation, ex);         } catch (Throwable ex) {             Class[] exceptionsToIgnore = annotation.exceptionsToIgnore();             // The ignore list will be checked first.             if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {                 throw ex;             }             if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {                 traceException(ex, annotation);                 return handleFallback(pjp, annotation, ex);             }             // No fallback function can handle the exception, so throw it out.             throw ex;         } finally {             if (entry != null) {                 entry.exit(1, pjp.getArgs());             }         }     } ``` 上面源码核心逻辑,我们可以用简单的代码解释下 ```java Entry entry = null; // 务必保证finally会被执行 try {   // 资源名可使用任意有业务语义的字符串   entry = SphU.entry("自定义资源名");   // 被保护的业务逻辑   // do something... } catch (BlockException e1) {   // 资源访问阻止,被限流或被降级   // 进行相应的处理操作 } finally {   if (entry != null) {     entry.exit();   } } ``` ## 8.Sentinel 源码分析 ### 基于滑动窗口的实时指标数据统计 > Sentinel 是基于滑动窗口实现的实时指标数据统计,要深入理解 Sentinel 的限流实现原理,首先我们得要了解其指标数据统计的实现,例如如何统计 QPS。 * Bucket Sentinel 使用 Bucket 统计一个窗口时间内的各项指标数据,这些指标数据包括请求总数、成功总数、异常总数、总耗时、最小耗时、最大耗时等,而一个 Bucket 可以是记录一秒内的数据,也可以是 10 毫秒内的数据,这个时间长度称为窗口时间。 ![](https://img.kancloud.cn/87/1c/871cbf152fe94536361b4b40a5e2b570_958x355.png) 如上面代码所示,Bucket 记录一段时间内的各项指标数据用的是一个 LongAdder 数组,LongAdder 保证了数据修改的原子性,并且性能比 AtomicInteger 表现更好。数组的每个元素分别记录一个时间窗口内的请求总数、异常数、总耗时,如下图所示。 ![](https://img.kancloud.cn/d7/fc/d7fc6eabbe49f4ae31a17779e35dc864_1027x424.png) Sentinel 用枚举类型 MetricEvent 的 ordinal 属性作为下标,ordinal 的值从 0 开始,按枚举元素的顺序递增,正好可以用作数组的下标。 ![](https://img.kancloud.cn/a4/e9/a4e9a8dab87b4fd3ddc7addfd6866b20_1371x501.png) 当需要获取 Bucket 记录总的成功请求数或者异常总数、总的请求处理耗时,可根据事件类型(MetricEvent)从 Bucket 的 LongAdder 数组中获取对应的 LongAdder,并调用 sum 方法获取总数,如下代码所示。 ![](https://img.kancloud.cn/dc/7d/dc7df398793c4dbd131a1dfebd44e8d7_1264x692.png) 当需要 Bucket 记录一个成功请求或者一个异常请求、处理请求的耗时,可根据事件类型(MetricEvent)从 LongAdder 数组中获取对应的 LongAdder,并调用其 add 方法,如下代码所示。 ![](https://img.kancloud.cn/ce/57/ce5726e985652bc271682f47c31b2e08_1302x492.png) * 滑动窗口 Sentinel 定义一个 Bucket 数组,根据时间戳来定位到数组的下标。 ![](https://img.kancloud.cn/2e/43/2e438ac0348a37299de112b10b2fb559_1144x341.png) 假设我们需要统计每 1 秒处理的请求数等数据,且只需要保存最近一分钟的数据。那么 Bucket 数组的大小就可以设置为 60,每个 Bucket 的 windowLengthInMs(窗口时间)大小就是 1000 毫秒(1 秒),如下图所示构建时间轮数据结构。 ![](https://img.kancloud.cn/3a/1f/3a1fb1601cda345d262595c6eb2300a1_720x255.png) * WindowWrap Bucket 自身并不保存时间窗口信息, Sentinel 给 Bucket 加了一个包装类 WindowWrap,用于记录 Bucket 的时间窗口信息。 当接收到一个请求时,根据接收到请求的时间戳计算出一个数组索引,从滑动窗口(WindowWrap 数组)中获取一个 WindowWrap,从而获取 WindowWrap 包装的 Bucket,调用 Bucket 的 add 方法记录相应的事件。 根据当前时间戳定位 Bucket 的算法实现如下。 ![](https://img.kancloud.cn/6b/4a/6b4a7ededfae2be5f312fbaf1a7474c9_1625x672.png) 上面代码实现的是,通过当前时间戳计算出当前时间窗口的 Bucket(New Buket)在数组中的索引(cidx),以及 Bucket 时间窗口的开始时间,通过索引从数组中取得 Bucket(Old Bucket)。 * 当索引(cidx)处不存在 Bucket 时,创建一个新的 Bucket,并且确保线程安全写入到数组 cidx 处,将此 Bucket 返回; * 当 Old Bucket 不为空时,且 Old Bucket 时间窗口的开始时间与当前计算得到的 New Buket 的时间窗口开始时间相等时,该 Bucket 就是当前要找的 Bucket,直接返回; * 当计算出 New Bucket 时间窗口的开始时间大于当前数组 cidx 位置存储的 Old Bucket 时间窗口的开始时间时,可以复用这个 Old Bucket,确保线程安全重置 Bucket,并返回; * 当计算出 New Bucket 时间窗口的开始时间小于当前数组 cidx 位置存储的 Old Bucket 时间窗口的开始时间时,直接返回一个空的 Bucket,因为时间不会倒退。 * WindowWrap 用于包装 Bucket,随着 Bucket 一起创建。 * WindowWrap 数组实现滑动窗口,Bucket 只负责统计各项指标数据,WindowWrap 用于记录 Bucket 的时间窗口信息。 * 定位 Bucket 实际上是定位 WindowWrap,拿到 WindowWrap 就能拿到 Bucket。 * Node Node 用于持有实时统计的指标数据,Node 接口定义了一个 Node 类所需要提供的各项指标数据统计的相关功能,为外部屏蔽滑动窗口的存在。提供记录请求被拒绝、请求被放行、请求处理异常、请求处理成功的方法,以及获取当前时间窗口统计的请求总数、平均耗时等方法。Node 几个实现类DefaultNode、ClusterNode、EntranceNode、StatisticNode 的关系如下图所示。。 ![](https://img.kancloud.cn/9a/5c/9a5c8792e636cafcaa807d972403842a_998x460.png) * StatisticNode Statistic 即统计的意思,StatisticNode 是 Node 接口的实现类,是实现实时指标数据统计 Node。 ~~~ public class StatisticNode implements Node { // 秒级滑动窗口,2 个时间窗口大小为 500 毫秒的 Bucket private transient volatile Metric rollingCounterInSecond = new ArrayMetric(2,1000); // 分钟级滑动窗口,60 个 Bucket 数组,每个 Bucket 统计的时间窗口大小为 1 秒 private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false); // 统计并发使用的线程数 private LongAdder curThreadNum = new LongAdder(); } ~~~ 如代码所示,一个 StatisticNode 包含一个秒级和一个分钟级的滑动窗口,以及并行线程数计数器。秒级滑动窗口用于统计实时的 QPS,分钟级的滑动窗口用于保存最近一分钟内的历史指标数据,并行线程计数器用于统计当前并行占用的线程数。 StatisticNode 的分钟级和秒级滑动窗口统计的指标数据分别有不同的用处。例如,StatisticNode 记录请求成功和请求执行耗时的方法中调用了两个滑动窗口的对应指标项的记录方法,代码如下: ~~~ @Override public void addRtAndSuccess(long rt, int successCount) { // 秒级滑动窗口 rollingCounterInSecond.addSuccess(successCount); rollingCounterInSecond.addRT(rt); // 分钟级滑动窗口 rollingCounterInMinute.addSuccess(successCount); rollingCounterInMinute.addRT(rt); } ~~~ 获取前一秒被 Sentinel 拒绝的请求总数从分钟级滑动窗口获取,代码如下: ~~~ @Override public double previousBlockQps() { return this.rollingCounterInMinute.previousWindowBlock(); } ~~~ 而获取当前一秒内已经被 Sentinel 拒绝的请求总数则从秒级滑动窗口获取,代码如下: ~~~ @Override public double blockQps() { return rollingCounterInSecond.block() / rollingCounterInSecond.getWindowIntervalInSec(); } ~~~ 获取最小耗时也是从秒级的滑动窗口取的,代码如下: ~~~ @Override public double minRt() { // 秒级滑动窗口 return rollingCounterInSecond.minRt(); } ~~~ 由于方法比较多,这里就不详细介绍每个方法的实现了。 StatisticNode 还负责统计并行占用的线程数,用于实现信号量隔离,按资源所能并发占用的最大线程数实现限流。当接收到一个请求就将 curThreadNum 自增 1,当处理完请求时就将 curThreadNum 自减一,如果同时处理 10 个请求,那么 curThreadNum 的值就为 10。 假设我们配置 tomcat 处理请求的线程池大小为 200,通过控制并发线程数实现信号量隔离的好处就是不让一个接口同时使用完这 200 个线程,避免因为一个接口响应慢将 200 个线程都阻塞导致应用无法处理其他请求的问题,这也是实现信号量隔离的目的。 ### Sentinel 中的 ProcessorSlot ProcessorSlot 直译就是处理器插槽,是 Sentinel 实现限流降级、熔断降级、系统自适应降级等功能的切入点。Sentinel 提供的 ProcessorSlot 可以分为两类,一类是辅助完成资源指标数据统计的切入点,一类是实现降级功能的切入点。 辅助资源指标数据统计的 ProcessorSlot: * NodeSelectorSlot:为当前资源创建 DefaultNode,并且将 DefaultNode 赋值给 Context.curEntry.curNode(见倒数第二张图);如果当前调用链路上只出现过一次 SphU#entry 的情况,将该 DefaultNode 添加到的 Context.entranceNode 的子节点(如倒数第一张图所示,名为 sentinel\_spring\_web\_context 的 EntranceNode),否则添加到 Context.curEntry.parent 的子节点(childList)。有点抽象,我们在分析 NodeSelectorSlot 源码时再详细介绍。 * ClusterBuilderSlot:如果当前资源未创建 ClusterNode,则为资源创建 ClusterNode;将 ClusterNode 赋值给当前资源的 DefaultNode.clusterNode;如果调用来源(origin)不为空,则为调用来源创建 StatisticNode,用于实现按调用来源统计资源的指标数据,ClusterNode 持有每个调用来源的 StatisticNode。 * StatisticSlot:这是 Sentinel 最为重要的类之一,用于实现指标数据统计。先是调用后续的 ProcessorSlot#entry 判断是否放行请求,再根据判断结果进行相应的指标数据统计操作。 实现降级功能的 ProcessorSlot: * AuthoritySlot:实现黑白名单降级 * SystemSlot:实现系统自适应降级 * FlowSlot:实现限流降级 * DegradeSlot:实现熔断降级 Sentinel 的整体工具流程就是使用责任链模式将所有的 ProcessorSlot 按照一定的顺序串成一个单向链表。辅助完成资源指标数据统计的 ProcessorSlot 必须在实现降级功能的 ProcessorSlot 的前面,原因很简单,降级功能需要依据资源的指标数据做判断,当然,如果某个 ProcessorSlot 不依赖指标数据实现降级功能,那这个 ProcessorSlot 的位置就没有约束。 除了按分类排序外,同一个分类下的每个 ProcessorSlot 可能也需要有严格的排序。比如辅助完成资源指标数据统计的 ProcessorSlot 的排序顺序为: > NodeSelectorSlot->ClusterBuilderSlot->StatisticSlot 如果顺序乱了就会抛出异常,而实现降级功能的 ProcessorSlot 就没有严格的顺序要求,AuthoritySlot、SystemSlot、FlowSlot、DegradeSlot 这几个的顺序可以按需调整。 实现将 ProcessorSlot 串成一个单向链表的是 ProcessorSlotChain,这个 ProcessorSlotChain 是由 SlotChainBuilder 构造的,默认 SlotChainBuilder 构造的 ProcessorSlotChain 注册的 ProcessorSlot 以及顺序如下代码所示。 ~~~ public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new SystemSlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } } ~~~ ![](https://img.kancloud.cn/ab/98/ab985ac29dc9052007a418dac2220054_1062x368.png) #### Slot 作用 我们需要了解一下,默认 Slot 有7 个,前 3 个 Slot为前置处理,用于收集、统计、分析必要的数据;后 4 个为规则校验 Slot,从Dashboard 推送的新规则保存在“规则池”中,然后对应 Slot 进行读取并校验当前请求是否允许放行,允许放行则送入下一个 Slot 直到最终被 RestController 进行业务处理,不允许放行则直接抛出 BlockException 返回响应。 以下是每一个 Slot 的具体职责: NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级; ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT(运行时间), QPS, thread count(线程总数)等,这些信息将用作为多维度限流,降级的依据; StatistcSlot 则用于记录,统计不同维度的runtime 信息; SystemSlot 则通过系统的状态,例如CPU、内存的情况,来控制总的入口流量; AuthoritySlot 则根据黑白名单,来做黑白名单控制; FlowSlot 则用于根据预设的限流规则,以及前面 slot 统计的状态,来进行限流; DegradeSlot 则通过统计信息,以及预设的规则,来做熔断降级。 到这里我们理解了 Sentinel 通信与降级背后的执行过程,下面咱们学习如何有效配置 Sentinel 的限流策略。 ## 控制台是如何获取到实时数据的 Sentinel 能够被大家所认可,除了他自身的轻量级,高性能,可扩展之外,跟控制台的好用和易用也有着莫大的关系,因为通过控制台极大的方便了我们日常的运维工作。 我们可以在控制台上操作各种限流、降级、系统保护的规则,也可以查看每个资源的实时数据,还能管理集群环境下的服务端与客户端机器。 但是控制台只是一个独立的 spring boot 应用,他本身是没有任何数据的,他的数据都是从其他的 sentinel 实例中获取的,那他是如何获取到这些数据的呢?带着这个疑问我们从源码中寻找答案。 最简单的方法莫过于启动一个控制台的实例,然后从页面上查看每个接口请求的url,然后再到 dashboard 的代码中去深挖下去。 ### dashboard源码分析 dashboard 是通过一个叫 SentinelApiClient 的类去指定的 ip 和 port 处获取数据的。这个 ip 和 port 是前端页面直接提交给后端的,而前端页面又是通过 /app/{app}/machines.json 接口获取机器列表的 ![](https://img.kancloud.cn/88/23/8823bffc9e797af5524101874250c291_1118x863.png) ### 连接 dashboard 机器列表中展示的就是所有连接到 dashboard 上的 sentinel 的实例,包括普通限流的 sentinel-core 和集群模式下的 token-server 和 token-client。一个 sentinel-core 的实例要接入 dashboard 的几个步骤: 1. 引入 dashboard 的依赖 2. 配置 dashboard 的 ip 和 port 3. 初始化 sentinel-core,连接 dashboard sentinel-core 在初始化的时候,通过 JVM 参数中指定的 dashboard 的 ip 和 port,会主动向 dashboard 发起连接的请求,该请求是通过 HeartbeatSender 接口以心跳的方式发送的,并将自己的 ip 和 port 告知 dashboard。这里 sentinel-core 上报给 dashboard 的端口是 sentinel 对外暴露的自己的 CommandCenter 的端口。 HeartbeatSender 有两个实现类,一个是通过 http,另一个是通过 netty,我们看 http 的实现类: > SimpleHttpHeartbeatSender.java ~~~java private final HeartbeatMessage heartBeat = new HeartbeatMessage(); private final SimpleHttpClient httpClient = new SimpleHttpClient(); @Override public boolean sendHeartbeat() throws Exception { if (TransportConfig.getRuntimePort() <= 0) { RecordLog.info("[SimpleHttpHeartbeatSender] Runtime port not initialized, won't send heartbeat"); return false; } InetSocketAddress addr = getAvailableAddress(); if (addr == null) { return false; } SimpleHttpRequest request = new SimpleHttpRequest(addr, HEARTBEAT_PATH); request.setParams(heartBeat.generateCurrentMessage()); try { SimpleHttpResponse response = httpClient.post(request); if (response.getStatusCode() == OK_STATUS) { return true; } } catch (Exception e) { RecordLog.warn("[SimpleHttpHeartbeatSender] Failed to send heartbeat to " + addr + " : ", e); } return false; } ~~~ 通过一个 HttpClient 向 dashboard 发送了自己的信息,包括 ip port 和版本号等信息。 其中 consoleHost 和 consolePort 的值就是从 JVM 参数 csp.sentinel.dashboard.server 中获取的。 dashboard 在接收到 sentinel-core 的连接之后,就会与 sentinel-core 建立连接,并将 sentinel-core 上报的 ip 和 port 的信息包装成一个 MachineInfo 对象,然后通过 SimpleMachineDiscovery 将该对象保存在一个 map 中,如下图所示: ![](https://img.kancloud.cn/4b/cb/4bcb52bd960216466d362bb1ce35c40a_1183x637.png) ### 定时发送心跳 sentinel-core 连接上 dashboard 之后,并不是就结束了,事实上 sentinel-core 是通过一个 ScheduledExecutorService 的定时任务,每隔 10 秒钟向 dashboard 发送一次心跳信息。发送心跳的目的主要是告诉 dashboard 我这台 sentinel 的实例还活着,你可以继续向我请求数据。 这也就是为什么 dashboard 中每个 app 对应的机器列表要用 Set 来保存的原因,如果用 List 来保存的话就可能存在同一台机器保存了多次的情况。 心跳可以维持双方之间的连接是正常的,但是也有可能因为各种原因,某一方或者双方都离线了,那他们之间的连接就丢失了。 1.sentinel-core 宕机 如果是 sentinel-core 宕机了,那么这时 dashboard 中保存在内存里面的机器列表还是存在的。目前 dashboard 只是在接收到 sentinel-core 发送过来的心跳包的时候更新一次机器列表,当 sentinel-core 宕机了,不再发送心跳数据的时候,dashboard 是没有将 “失联” 的 sentinel-core 实例给去除的。而是页面上每次查询的时候,会去用当前时间减去机器上次心跳包的时间,如果时间差大于 5 分钟了,才会将该机器标记为 “失联”。 所以我们在页面上的机器列表中,需要至少等到 5 分钟之后,才会将具体失联的 sentinel-core 的机器标记为 “失联”。如下图所示: ![](https://img.kancloud.cn/63/b7/63b763e5d2d22a80b08ddcdaccefe98c_1512x482.png) 2.dashboard 宕机 如果 dashboard 宕机了,sentinel-core 的定时任务实际上是会一直请求下去的,只要 dashboard 恢复后就会自动重新连接上 dashboard,双方之间的连接又会恢复正常了,如果 dashboard 一直不恢复,那么 sentinel-core 就会一直报错,在 sentinel-record.log 中我们会看到如下的报错信息: ![](https://img.kancloud.cn/b8/b2/b8b20e224c6262bb1578d7ac1ce0d281_1167x433.png) 不过实际生产中,不可能出现 dashboard 宕机了一直没人去恢复的情况的,如果真出现这种情况的话,那就要吃故障了。 ### 请求数据 当 dashboard 有了具体的 sentinel-core 实例的 ip 和 port 之后,就可以去请求所需要的数据了。 让我们再回到最开始的地方,我在页面上查询某一台机器的限流的规则时,是将该机器的 ip 和 port 以及 appName 都传给了服务端,服务端通过这些信息去具体的远程实例中请求所需的数据,拿到数据后再封装成 dashboard 所需的格式返回给前端页面进行展示。 具体请求限流规则列表的代码在 SentinelApiClient 中,如下所示: > SentinelApiClient.java ~~~java public List<FlowRuleEntity> fetchFlowRuleOfMachine(String app, String ip, int port) { String url = "http://" + ip + ":" + port + "/" + GET_RULES_PATH + "?type=" + FLOW_RULE_TYPE; String body = httpGetContent(url); logger.info("FlowRule Body:{}", body); List<FlowRule> rules = RuleUtils.parseFlowRule(body); if (rules != null) { return rules.stream().map(rule -> FlowRuleEntity.fromFlowRule(app, ip, port, rule)) .collect(Collectors.toList()); } else { return null; } } ~~~ 可以看到也是通过一个 httpClient 请求的数据,然后再对结果进行转换,具体请求的过程是在 httpGetContent 方法中进行的,我们看下该方法,如下所示: ~~~java private String httpGetContent(String url) { final HttpGet httpGet = new HttpGet(url); final CountDownLatch latch = new CountDownLatch(1); final AtomicReference<String> reference = new AtomicReference<>(); httpClient.execute(httpGet, new FutureCallback<HttpResponse>() { @Override public void completed(final HttpResponse response) { try { reference.set(getBody(response)); } catch (Exception e) { logger.info("httpGetContent " + url + " error:", e); } finally { latch.countDown(); } } @Override public void failed(final Exception ex) { latch.countDown(); logger.info("httpGetContent " + url + " failed:", ex); } @Override public void cancelled() { latch.countDown(); } }); try { latch.await(5, TimeUnit.SECONDS); } catch (Exception e) { logger.info("wait http client error:", e); } return reference.get(); } ~~~ 从代码中可以看到,是通过一个异步的 httpClient 再结合 CountDownLatch 等待 5 秒的超时时间去获取结果的。 获取数据的请求从 dashboard 中发出去了,那 sentinel-core 中是怎么进行相应处理的呢?看过我其他文章的同学肯定还记得, sentinel-core 在启动的时候,执行了一个 InitExecutor.init 的方法,该方法会触发所有 InitFunc 实现类的 init 方法,其中就包括两个最重要的实现类: * HeartbeatSenderInitFunc * CommandCenterInitFunc HeartbeatSenderInitFunc 会启动一个 HeartbeatSender 来定时的向 dashboard 发送自己的心跳包,而 CommandCenterInitFunc 则会启动一个 CommandCenter 对外提供 sentinel-core 的数据服务,而这些数据服务是通过一个一个的 CommandHandler 来提供的,如下图所示: ![](https://img.kancloud.cn/dd/e6/dde65d0129aeb2e1c1504a9c8b5b7313_1021x766.png) ### 总结 现在我们已经知道了 dashboard 是如何获取到实时数据的了,具体的流程如下所示: 1.首先 sentinel-core 向 dashboard 发送心跳包 2.dashboard 将 sentinel-core 的机器信息保存在内存中 3.dashboard 根据 sentinel-core 的机器信息通过 httpClient 获取实时的数据 4.sentinel-core 接收到请求之后,会找到具体的 CommandHandler 来处理 5.sentinel-core 将处理好的结果返回给 dashboard ### 思考 1.数据安全性 >sentinel-dashboard 和 sentinel-core 之间的通讯是基于 http 的,没有进行加密或鉴权,可能会存在数据安全性的问题,不过这些数据并非是很机密的数据,对安全性要求并不是很高,另外增加了鉴权或加密之后,对于性能和实效性有一定的影响。 2.SentinelApiClient >目前所有的数据请求都是通过 SentinelApiClient 类去完成的,该类中充斥着大量的方法,都是发送 http 请求的。代码的可读性和可维护性不高,所以需要对该类进行重构,目前我能够想到的有两种方法: 1)通过将 sentinel-core 注册为 rpc 服务,dashboard 就像调用本地方法一样去调用 sentinel-core 中的方法,不过这样的话需要引入服务注册和发现的依赖了。 2)通过 netty 实现私有的协议,sentinel-core 通过 netty 启动一个 CommandCenter 来对外提供服务。dashboard 通过发送 Packet 来进行数据请求,sentinel-core 来处理 Packet。不过这种方法跟目前的做法没有太大的区别,唯一比较好的可能就是不需要为每种请求都写一个方法,只需要定义好具体的 Packet 就好了。