一、场景描述
很多做服务接口的人或多或少的遇到这样的场景,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统。
也就是面对大流量时,如何进行流量控制?
服务接口的流量控制策略:分流、降级、限流等。本文讨论下限流策略,虽然降低了服务接口的访问频率和并发量,却换取服务接口和业务应用系统的高可用。
实际场景中常用的限流策略:
Nginx前端限流
按照一定的规则如帐号、IP、系统调用逻辑等在Nginx层面做限流
业务应用系统限流
1、客户端限流
2、服务端限流
数据库限流
红线区,力保数据库
二、常用的限流算法
常用的限流算法由:楼桶算法和令牌桶算法。本文不具体的详细说明两种算法的原理,原理会在接下来的文章中做说明。
1、漏桶算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率.示意图如下:
![](http://upload-images.jianshu.io/upload_images/6954572-fb9e39784b70f9f2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(burst),另一个是水桶漏洞的大小(rate)。
因为漏桶的漏出速率是固定的参数,所以,即使网络中不存在资源冲突(没有发生拥塞),漏桶算法也不能使流突发(burst)到端口速率.因此,漏桶算法对于存在突发特性的流量来说缺乏效率.
2、令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
![](http://upload-images.jianshu.io/upload_images/6954572-e270efc4f6e59366.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
令牌桶的另外一个好处是可以方便的改变速度. 一旦需要提高速率,则按需提高放入桶中的令牌的速率. 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量.
三、基于Redis功能的实现
简陋的设计思路:假设一个用户(用IP判断)每分钟访问某一个服务接口的次数不能超过10次,那么我们可以在Redis中创建一个键,并此时我们就设置键的过期时间为60秒,每一个用户对此服务接口的访问就把键值加1,在60秒内当键值增加到10的时候,就禁止访问服务接口。在某种场景中添加访问时间间隔还是很有必要的。
1)使用Redis的incr命令,将计数器作为Lua脚本
```
local current
current = redis.call("incr",KEYS[1])
if tonumber(current) == 1 then
redis.call("expire",KEYS[1],1)
end
```
Lua脚本在Redis中运行,保证了incr和expire两个操作的原子性。
2)使用Reids的列表结构代替incr命令
```
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
ERROR "too many requests per second"
ELSE
IF EXISTS(ip) == FALSE
MULTI
RPUSH(ip,ip)
EXPIRE(ip,1)
EXEC
ELSE
RPUSHX(ip,ip)
END
PERFORM_API_CALL()
END
```
Rate Limit使用Redis的列表作为容器,LLEN用于对访问次数的检查,一个事物中包含了RPUSH和EXPIRE两个命令,用于在第一次执行计数是创建列表并设置过期时间,
RPUSHX在后续的计数操作中进行增加操作。
四、基于令牌桶算法的实现
令牌桶算法可以很好的支撑突然额流量的变化即满令牌桶数的峰值。
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.base.Preconditions;
import com.netease.datastream.util.framework.LifeCycle;
20 public class TokenBucket implements LifeCycle {
// 默认桶大小个数 即最大瞬间流量是64M
private static final int DEFAULT_BUCKET_SIZE = 1024 * 1024 * 64;
// 一个桶的单位是1字节
private int everyTokenSize = 1;
// 瞬间最大流量
private int maxFlowRate;
// 平均流量
private int avgFlowRate;
// 队列来缓存桶数量:最大的流量峰值就是 = everyTokenSize*DEFAULT_BUCKET_SIZE 64M = 1 * 1024 * 1024 * 64
private ArrayBlockingQueue<Byte> tokenQueue = new ArrayBlockingQueue<Byte>(DEFAULT_BUCKET_SIZE);
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isStart = false;
private ReentrantLock lock = new ReentrantLock(true);
private static final byte A_CHAR = 'a';
public TokenBucket() {
}
public TokenBucket(int maxFlowRate, int avgFlowRate) {
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public TokenBucket(int everyTokenSize, int maxFlowRate, int avgFlowRate) {
this.everyTokenSize = everyTokenSize;
this.maxFlowRate = maxFlowRate;
this.avgFlowRate = avgFlowRate;
}
public void addTokens(Integer tokenNum) {
// 若是桶已经满了,就不再家如新的令牌
for (int i = 0; i < tokenNum; i++) {
tokenQueue.offer(Byte.valueOf(A_CHAR));
}
}
public TokenBucket build() {
start();
return this;
}
/**
* 获取足够的令牌个数
*
* @return
*/
public boolean getTokens(byte[] dataSize) {
Preconditions.checkNotNull(dataSize);
Preconditions.checkArgument(isStart, "please invoke start method first !");
int needTokenNum = dataSize.length / everyTokenSize + 1;// 传输内容大小对应的桶个数
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean result = needTokenNum <= tokenQueue.size(); // 是否存在足够的桶数量
if (!result) {
return false;
}
int tokenCount = 0;
for (int i = 0; i < needTokenNum; i++) {
Byte poll = tokenQueue.poll();
if (poll != null) {
tokenCount++;
}
}
return tokenCount == needTokenNum;
} finally {
lock.unlock();
}
}
@Override
public void start() {
// 初始化桶队列大小
if (maxFlowRate != 0) {
tokenQueue = new ArrayBlockingQueue<Byte>(maxFlowRate);
}
// 初始化令牌生产者
TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
isStart = true;
}
@Override
public void stop() {
isStart = false;
scheduledExecutorService.shutdown();
}
@Override
public boolean isStarted() {
return isStart;
}
class TokenProducer implements Runnable {
private int avgFlowRate;
private TokenBucket tokenBucket;
public TokenProducer(int avgFlowRate, TokenBucket tokenBucket) {
this.avgFlowRate = avgFlowRate;
this.tokenBucket = tokenBucket;
}
@Override
public void run() {
tokenBucket.addTokens(avgFlowRate);
}
}
public static TokenBucket newBuilder() {
return new TokenBucket();
}
public TokenBucket everyTokenSize(int everyTokenSize) {
this.everyTokenSize = everyTokenSize;
return this;
}
public TokenBucket maxFlowRate(int maxFlowRate) {
this.maxFlowRate = maxFlowRate;
return this;
}
public TokenBucket avgFlowRate(int avgFlowRate) {
this.avgFlowRate = avgFlowRate;
return this;
}
private String stringCopy(String data, int copyNum) {
StringBuilder sbuilder = new StringBuilder(data.length() * copyNum);
for (int i = 0; i < copyNum; i++) {
sbuilder.append(data);
}
return sbuilder.toString();
}
public static void main(String[] args) throws IOException, InterruptedException {
tokenTest();
}
private static void arrayTest() {
ArrayBlockingQueue<Integer> tokenQueue = new ArrayBlockingQueue<Integer>(10);
tokenQueue.offer(1);
tokenQueue.offer(1);
tokenQueue.offer(1);
System.out.println(tokenQueue.size());
System.out.println(tokenQueue.remainingCapacity());
}
private static void tokenTest() throws InterruptedException, IOException {
TokenBucket tokenBucket = TokenBucket.newBuilder().avgFlowRate(512).maxFlowRate(1024).build();
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream("/tmp/ds_test")));
String data = "xxxx";// 四个字节
for (int i = 1; i <= 1000; i++) {
Random random = new Random();
int i1 = random.nextInt(100);
boolean tokens = tokenBucket.getTokens(tokenBucket.stringCopy(data, i1).getBytes());
TimeUnit.MILLISECONDS.sleep(100);
if (tokens) {
bufferedWriter.write("token pass --- index:" + i1);
System.out.println("token pass --- index:" + i1);
} else {
bufferedWriter.write("token rejuect --- index" + i1);
System.out.println("token rejuect --- index" + i1);
}
bufferedWriter.newLine();
bufferedWriter.flush();
}
bufferedWriter.close();
}
}
- 技能知识点
- 对死锁问题的理解
- 文件系统原理:如何用1分钟遍历一个100TB的文件?
- 数据库原理:为什么PrepareStatement性能更好更安全?
- Java Web程序的运行时环境到底是怎样的?
- 你真的知道自己要解决的问题是什么吗?
- 如何解决问题
- 经验分享
- GIT的HTTP方式免密pull、push
- 使用xhprof对php7程序进行性能分析
- 微信扫码登录和使用公众号方式进行扫码登录
- 关于curl跳转抓取
- Linux 下配置 Git 操作免登录 ssh 公钥
- Linux Memcached 安装
- php7安装3.4版本的phalcon扩展
- centos7下php7.0.x安装phalcon框架
- 将字符串按照指定长度分割
- 搜索html源码中标签包的纯文本
- 更换composer镜像源为阿里云
- mac 隐藏文件显示/隐藏
- 谷歌(google)世界各国网址大全
- 实战文档
- PHP7安装intl扩展和linux安装icu
- linux编译安装时常见错误解决办法
- linux删除文件后不释放磁盘空间解决方法
- PHP开启异步多线程执行脚本
- file_exists(): open_basedir restriction in effect. File完美解决方案
- PHP 7.1 安装 ssh2 扩展,用于PHP进行ssh连接
- php命令行加载的php.ini
- linux文件实时同步
- linux下php的psr.so扩展源码安装
- php将字符串中的\n变成真正的换行符?
- PHP7 下安装 memcache 和 memcached 扩展
- PHP 高级面试题 - 如果没有 mb 系列函数,如何切割多字节字符串
- PHP设置脚本最大执行时间的三种方法
- 升级Php 7.4带来的两个大坑
- 不同域名的iframe下,fckeditor在chrome下的SecurityError,解决办法~~
- Linux find+rm -rf 执行组合删除
- 从零搭建Prometheus监控报警系统
- Bug之group_concat默认长度限制
- PHP生成的XML显示无效的Char值27消息(PHP generated XML shows invalid Char value 27 message)
- XML 解析中,如何排除控制字符
- PHP各种时间获取
- nginx配置移动自适应跳转
- 已安装nginx动态添加模块
- auto_prepend_file与auto_append_file使用方法
- 利用nginx实现web页面插入统计代码
- Nginx中的rewrite指令(break,last,redirect,permanent)
- nginx 中 index try_files location 这三个配置项的作用
- linux安装git服务器
- PHP 中运用 elasticsearch
- PHP解析Mysql Binlog
- 好用的PHP学习网(持续更新中)
- 一篇写给准备升级PHP7的小伙伴的文章
- linux 安装php7 -系统centos7
- Linux 下多php 版本共存安装
- PHP编译安装时常见错误解决办法,php编译常见错误
- nginx upstream模块--负载均衡
- 如何解决Tomcat服务器打开不了HOST Manager的问题
- PHP的内存泄露问题与垃圾回收
- Redis数据结构 - string字符串
- PHP开发api接口安全验证
- 服务接口API限流 Rate Limit
- php内核分析---内存管理(一)
- PHP内存泄漏问题解析
- 【代码片-1】 MongoDB与PHP -- 高级查询
- 【代码片-1】 php7 mongoDB 简单封装
- php与mysql系统中出现大量数据库sleep的空连接问题分析
- 解决crond引发大量sendmail、postdrop进程问题
- PHP操作MongoDB GridFS 存储文件,如图片文件
- 浅谈php安全
- linux上keepalived+nginx实现高可用web负载均衡
- 整理php防注入和XSS攻击通用过滤