如果你曾经使用过API来获取服务,那么你可能经受过与速率限制相抗衡。速率限制使得某种资源每次访问的次数受限。资源可以是任何东西:API连接,磁盘I/O,网络包,错误。
你有没有想过为什么会需要制定速率限制?为什么不允许无限制地访问系统?最明显的答案是,通过对系统进行速率限制,可以防止整个系统遭受攻击。如果恶意用户可以在他们的资源允许的情况下极快地访问你的系统,那么他们可以做各种事情。
例如,他们可以用日志消息或有效的请求填满服务器的磁盘。如果你的日志配置错误,他们甚至可能会执行恶意的操作,然后提交足够的请求,将任何活动记录从日志中移出并放入/dev/null中导致日志系统完全崩溃。他们可能试图暴力访问资源,或者执行分布式拒绝服务攻击。如果你没有对系统进行请求限制,你的系统就成了一个走在街上不穿衣服的大美女。
更糟糕的是,非恶意请求有时也会造成上述结果。恶意使用并不是唯一的原因。 在分布式系统中,如果合法用户正在以足够高的速度执行操作,或者他们正在运行的代码有问题,则合法用户可能会降低系统的性能。这甚至可能导致我们之前讨论的死亡螺旋。 从产品的角度来看,这太糟糕了!通常情况下,你希望向用户提供某种类型的性能保证,以确保他们可以在一致的基础上达到不错的性能。如果一个用户可以影响该协议,那么你的日子就不好过了。用户对系统的访问通常被沙盒化,既不会影响其他用户的活动也不会受其他用户影响。如果你打破了这种思维模式,你的系统会表现出糟糕的设计使用感,甚至会导致用户生气或离开。
>我曾经在一个分布式系统上工作,通过启动新的进程来并行扩展(这允许它水平扩展到多台机器)。每个进程都会打开数据库连接,读取一些数据并进行一些计算。一段时间以来,我们在以这种方式扩展系统以满足客户需求方面取得了巨大成功。 但是,一段时间后,我们发现大量的数据库中读取数据超时。
>我们的数据库管理员仔细查看了日志,试图找出问题所在。最后他们发现,由于系统中没有设定任何速率限制,所以进程彼此重叠。由于不同的进程试图从磁盘的不同部分读取数据,因此磁盘竞争将达到100%并一直保持在这个水平。这反过来导致了一系列的循环——超时——重试——循环。工作永远不会完成。
>我们设计了一个系统来限制数据库上可能的连接数量,并且速率限制被放在连接可以读取的秒级单位,问题消失了。虽然客户不得不等待更长的时间,但毕竟他们的工作完成了,我们能够在接下来的时间里进行适当的容量规划,以系统化的方式扩展容量。
速率限制使得你可以通过某个界限来推断系统的性能和稳定性。如果你需要扩展这些边界,可以在大量测试后以可控的方式进行。
在密集用户操作的情况下,速率限制可以使你的系统与用户之间保持良好的关系。你可以允许他们在严格限制的速率下尝试系统的特性。Google的云产品证明了这一点,这种限制在某种意义上是可以保护用户的。
速率限制通常由资源的构建者角度来考虑,但对于用户来说,能够减少速率限制的影响会让其感到非常欣慰。
那么我们怎样用Go来实现呢?
大多数速率限制是通过使用称为令牌桶的算法完成的。这很容易理解,而且相对容易实施。 我们来看看它背后的理论。
假设要使用资源,你必须拥有资源的访问令牌。没有令牌,请求会被拒绝。想象这些令牌存储在等待被检索以供使用的桶中。该桶的深度为d,表示它一次可以容纳d个访问令牌。
现在,每次你需要访问资源时,你都会进入存储桶并删除令牌。如果你的存储桶包含五个令牌,那么您可以访问五次资源。在第六次访问时,没有访问令牌可用,那么必须将请求加入队列,直到令牌变为可用,或拒绝请求。
这里有一个时间表来帮助观察这个概念。time表示以秒为单位的时间增量,bucket表示桶中请求令牌的数量,并且请求列中的tok表示成功的请求。(在这个和未来的时间表中,我们假设这些请求是即时的。)
:-: ![](https://box.kancloud.cn/6345e8d9af8548890551b7633a774811_677x295.jpg)
可以看到,在第一秒之前,我们能够完成五个请求,然后我们被阻塞,因为没有更多的令牌可供使用。
到目前为止,这些都很容易理解。那么如何补充令牌呢?在令牌桶算法中,我们将r定义为将令牌添加回桶的速率。 它可以是一纳秒或一分钟。它就是我们通常认为的速率限制:因为我们必须等待新的令牌可用,我们将操作限制为令牌的刷新率。
以下是深度为1的令牌桶示例,速率为1令牌/秒:
:-: ![](https://box.kancloud.cn/6947416d0a4c667389ba741b182ae838_678x294.jpg)
可以看到我们立即可以提出请求,但是只能隔一秒钟将请求一次。速率限制执行的非常好!
所以我们现在有两个设置可以摆弄:有多少令牌可用于立即使用,即桶的深度d,以及它们补充的速率r。另外我们还可以考虑下当存储桶已满时可以进行多少次请求。
以下是深度为5的令牌桶示例,速率为0.5令牌/秒:
:-: ![](https://box.kancloud.cn/76ba3052e0be5e33bb6a2ea89363a9ea_680x430.jpg)
在这里,我们能够立即提出五个请求,在这之后,每两秒限制一次请求。请求的爆发是在桶最初装满的时候。
请注意,用户可能不会消耗一个长数据流中的整个令牌桶。桶的深度只控制桶的容量。这里有一个用户爆发两次请求的例子,然后四秒钟后爆发了五次:
:-: ![](https://box.kancloud.cn/1ffecd7885323d1caad92b79869b1137_683x291.jpg)
:-: ![](https://box.kancloud.cn/2c0311f69d82a680c36853d3a56f021d_679x169.jpg)
虽然用户有可用的令牌,但这种爆发性仅允许根据调用者的能力限制访问系统。对于只能间歇性访问系统但希望尽快往返的用户来说,这种爆发性的存在是很好的。你只需要确保系统能够同时处理所有用户的爆发操作,或者确保操作上不可能有足够的用户同时爆发操作以影响你的系统。无论哪种方式,速率限制都可以让你规避风险。
让我们使用这个算法来看看一个Go程序在执行令牌桶算法的时如何表现。
假设我们可以访问API,并且已经提供了客户端来使用它。该API有两个操作:一个用于读取文件,另一个用于将域名解析为IP地址。为了简单起见,我将忽略任何参数并返回实际访问服务所需的值。 所以这是我们的客户端:
```
func Open() *APIConnection {
return &APIConnection{}
}
type APIConnection struct{}
func (a *APIConnection) ReadFile(ctx context.Context) error {
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
// Pretend we do work here
return nil
}
```
由于理论上这个请求正在通过网络传递,所以我们将context.Context作为第一个参数,以防我们需要取消请求或将值传递给服务器。 通过前面的章节讨论,想必大家已经熟悉了这样的用法。
现在我们将创建一个简单的程序来访问这个API。程序需要读取10个文件并解析10个地址,但这些文件和地址彼此没有关系,因此程序可以并发调用。稍后这将有助于添加利率限制。
```
func main() {
defer log.Printf("Done.")
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
apiConnection := Open()
var wg sync.WaitGroup
wg.Add(20)
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ReadFile(context.Background())
if err != nil {
log.Printf("cannot ReadFile: %v", err)
}
}()
}
log.Printf("ReadFile")
for i := 0; i < 10; i++ {
go func() {
defer wg.Done()
err := apiConnection.ResolveAddress(context.Background())
if err != nil {
log.Printf("cannot ResolveAddress: %v", err)
}
}()
}
log.Printf("ResolveAddress")
wg.Wait()
}
```
这会输出:
```
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ResolveAddress
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 ReadFile
20:13:13 Done.
```
我们可以看到所有的API请求几乎同时进行。由于没有设定速率限制,所以用户可以随意随意访问系统。现在我需要提醒你,当前个可能导致无限循环的错误。
好的,让我们来介绍一个限速器。我打算在APIConnection中这样做,但通常会在服务器上运行速率限制器,这样用户就无法绕过它。生产系统还可能包括一个客户端速率限制器,以防止用户因不必要的调用而被拒绝,但这是一种优化,并不是必需的。就我们的目的而言,限速器使事情变得简单。
我们将看看golang.org/x/time/rate 中的令牌桶速率限制器实现的示例。我选择了这个包,因为这跟我所能得到的标准库较为相近。当然还有其他的软件包可以做更多的花里胡哨的工作。 golang.org/x/time/rate 非常简单,而且它适用于我们的目的。
我们将与这个包交互的前两种方法是Limit类型和NewLimiter函数,在这里定义:
```
// Limit 定义了事件的最大频率。
// Limit 被表示为每秒事件的数量。
// 值为0的Limit不允许任何事件。
type Limit float64
// ewLimiter返回一个新的Limiter实例,
// 件发生率为r,并允许至多b个令牌爆发。
func NewLimiter(r Limit, b int) *Limiter
```
在NewLimiter中,我们看到了两个熟悉的参数:r ,以及 b.r。b是我们之前讨论过的桶的深度。
rate 包还定义了一个有用的函数,Every,帮助将time.Duration转换为Limit:
```
// Every将事件之间的最小时间间隔转换为Limit。
func Every(interval time.Duration) Limit
```
Every函数是有意义的,但我想讨论每次rate限制了操作次数,而不是请求之间的时间间隔。 我们可以将其表述如下:
```
rate.Limit(events/timePeriod.Seconds())
```
但是我不想每次都输入这个值,Every函数有一些特殊的逻辑会返回rate.Inf——表示没有限制——如果提供的时间间隔为零。 正因为如此,我们将用这个词来表达我们的帮助函数:
```
func Per(eventCount int, duration time.Duration) rate.Limit {
return rate.Every(duration/time.Duration(eventCount))
}
```
在我们建立rate.Limiter后,我们希望使用它来阻塞我们的请求,直到获得访问令牌。 我们可以用Wait方法来做到这一点,它只是用参数1来调用WaitN:
```
// Wait 是 WaitN(ctx, 1)的简写。
func (lim *Limiter) Wait(ctx context.Context)
// WaitN 会发生阻塞直到 lim 允许的 n 个事件执行。
// 它返回一个 error 如果 n 超过了 Limiter的桶大小,
// Context会被取消, 或等待的时间超过了 Context 的 Deadline。
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
```
我们已经集齐了限制API请求的所有要素。让我们修改APIConnection类型并尝试一下:
```
func Open() *APIConnection {
return &APIConnection{
rateLimiter: rate.NewLimiter(rate.Limit(1), 1) //1
}
}
type APIConnection struct {
rateLimiter *rate.Limiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil { //2
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil { //2
return err
}
// Pretend we do work here
return nil
}
```
1. 我们将所有API连接的速率限制设置为每秒一个事件。
2. 我们等待速率限制器有足够的权限来完成我们的请求。
这会输出:
```
22:08:30 ResolveAddress
22:08:31 ReadFile
22:08:32 ReadFile
22:08:33 ReadFile
22:08:34 ResolveAddress
22:08:35 ResolveAddress
22:08:36 ResolveAddress
22:08:37 ResolveAddress
22:08:38 ResolveAddress
22:08:39 ReadFile
22:08:40 ResolveAddress
22:08:41 ResolveAddress
22:08:42 ResolveAddress
22:08:43 ResolveAddress
22:08:44 ReadFile
22:08:45 ReadFile
22:08:46 ReadFile
22:08:47 ReadFile
22:08:48 ReadFile
22:08:49 ReadFile
22:08:49 Done.
```
在生产中,我们可能会想要一些更复杂的东西。我们可能希望建立多层限制:细粒度控制以限制每秒请求数,粗粒度控制限制每分钟,每小时或每天的请求数。
在某些情况下,可以通过速率限制器来实现; 然而,在所有情况下都这么干是不可能的,并且通过尝试将单位时间的限制语义转换为单一层,你会因速率限制器而丢失大量信息。出于这些原因,我发现将限制器分开并将它们组合成一个限速器来管理交互更容易。 为此,我创建了一个简单的聚合速率限制器multiLimiter。 这是定义:
```
type RateLimiter interface { //1
Wait(context.Context) error
Limit() rate.Limit
}
func MultiLimiter(limiters ...RateLimiter) *multiLimiter {
byLimit := func(i, j int) bool {
return limiters[i].Limit() < limiters[j].Limit()
}
sort.Slice(limiters, byLimit) //2
return &multiLimiter{limiters: limiters}
}
type multiLimiter struct {
limiters []RateLimiter
}
func (l *multiLimiter) Wait(ctx context.Context) error {
for _, l := range l.limiters {
if err := l.Wait(ctx); err != nil {
return err
}
}
return nil
}
func (l *multiLimiter) Limit() rate.Limit {
return l.limiters[0].Limit() //3
}
```
1. 这里我们定义一个RateLimiter接口,以便MultiLimiter可以递归地定义其他MultiLimiter实例。
2. 这里我们实现一个优化,并按照每个RateLimiter的Limit()行排序。
3. 因为我们在multiLimiter实例化时对子RateLimiter实例进行排序,所以我们可以简单地返回限制性最高的limit,这将是切片中的第一个元素。
Wait方法遍历所有子限制器,并在每一个子限制器上调用Wait。这些调用可能会也可能不会阻塞,但我们需要通知每个子限制器,以便减少令牌桶内的令牌数量。通过等待每个限制器,我们保证最长的等待时间。这是因为如果我们执行时间较小的等待,那么最长的等待时间将被重新计算为剩余时间。在较早的等待被阻塞时,后者会等待令牌桶的填充。
经过思考,让我们重新定义APIConnection,对每秒和每分钟都进行限制:
```
func Open() *APIConnection {
secondLimit := rate.NewLimiter(Per(2, time.Second), 1) //1
minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) //2
return &APIConnection{
rateLimiter: MultiLimiter(secondLimit, minuteLimit) //3
}
}
type APIConnection struct {
rateLimiter RateLimiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
if err := a.rateLimiter.Wait(ctx); err != nil {
return err
}
// Pretend we do work here
return nil
}
```
1. 我们定义了每秒的极限。
2. 我们定义每分钟的突发极限为10,为用户提供初始池。每秒的限制将确保我们不会因请求而使系统过载。
3. 我们结合这两个限制,并将其设置为APIConnection的主限制器。
这会输出:
```
22:46:10 ResolveAddress
22:46:10 ReadFile
22:46:11 ReadFile
22:46:11 ReadFile
22:46:12 ReadFile
22:46:12 ReadFile
22:46:13 ReadFile
22:46:13 ReadFile
22:46:14 ReadFile
22:46:14 ReadFile
22:46:16 ResolveAddress
22:46:22 ResolveAddress
22:46:28 ReadFile
22:46:34 ResolveAddress
22:46:40 ResolveAddress
22:46:46 ResolveAddress
22:46:52 ResolveAddress
22:46:58 ResolveAddress
22:47:04 ResolveAddress
22:47:10 ResolveAddress
22:47:10 Done.
```
正如您所看到的,我们每秒发出两个请求,直到请求#11,此时我们开始每隔六秒发出一次请求。 这是因为我们耗尽了我们可用的每分钟请求令牌池,并受此限制。
为什么请求#11在两秒内发生而不是像其他请求那样发生,这可能有点违反直觉。 请记住,尽管我们将API请求限制为每分钟10个,但一分钟是一个动态的时间窗口。 当我们达到第十一个要求时,我们的每分钟限制器已经累积了另一个令牌。
通过定义这样的限制,我们可以清楚地表达了粗粒度限制,同时仍然以更详细的细节水平限制请求数量。
这种技术也使我们能够开始思考除时间之外的其他维度。 当你对系统进行限制时,你可能会限制不止一件事。 也可能对API请求的数量有一些限制,也会对其他资源(如磁盘访问,网络访问等)有限制。让我们稍微充实一下示例并设置磁盘和网络限制
```
func Open() *APIConnection {
return &APIConnection{
apiLimit: MultiLimiter( //1
rate.NewLimiter(Per(2, time.Second), 2),
rate.NewLimiter(Per(10, time.Minute), 10)),
diskLimit: MultiLimiter(rate.NewLimiter(rate.Limit(1), 1)), //2
networkLimit: MultiLimiter(rate.NewLimiter(Per(3, time.Second), 3)) //3
}
}
type APIConnection struct {
networkLimit,
diskLimit,
apiLimit RateLimiter
}
func (a *APIConnection) ReadFile(ctx context.Context) error {
err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx) //4
if err != nil {
return err
}
// Pretend we do work here
return nil
}
func (a *APIConnection) ResolveAddress(ctx context.Context) error {
err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx) //5
if err != nil {
return err
}
// Pretend we do work here
return nil
}
```
1. 我们为API调用设置了一个限制器。 每秒请求数和每分钟请求数都有限制
2. 我们为磁盘读取设置一个限制器。将其限制为每秒一次读取。
3. 对于网络,我们将设置每秒三个请求的限制。
4. 当我们读取文件时,将结合来自API限制器和磁盘限制器的限制。
5. 当我们需要网络访问时,将结合来自API限制器和网络限制器的限制。
这会输出:
```
01:40:15 ResolveAddress
01:40:15 ReadFile
01:40:16 ReadFile
01:40:17 ResolveAddress
01:40:17 ResolveAddress
01:40:17 ReadFile
01:40:18 ResolveAddress
01:40:18 ResolveAddress
01:40:19 ResolveAddress
01:40:19 ResolveAddress
01:40:21 ResolveAddress
01:40:27 ResolveAddress
01:40:33 ResolveAddress
01:40:39 ReadFile
01:40:45 ReadFile
01:40:51 ReadFile
01:40:57 ReadFile
01:41:03 ReadFile
01:41:09 ReadFile
01:41:15 ReadFile
01:41:15 Done.
```
让我们关注一下这样的事实,即我们可以将限制器组合成对每个调用都有意义的组,并且让APIClient执行正确的操作。如果我们想随便观察一下它是如何工作的,会注意到涉及网络访问的API调用似乎以更加规律的方式发生,并在前三分之二的调用中完成。这可能与goroutine调度有关,也是我们的限速器正在执行各自的工作。
我还应该提到的是,rate.Limiter类型有一些其他的技巧来优化不同的用例。这里只讨论了它等待令牌桶接收另一个令牌的能力,但如果你有兴趣使用它,可以查阅文档。
在本节中,我们考察了使用速率限制的理由,构建了令牌桶算法的Go实现,以及如何将令牌桶限制器组合为更大,更复杂的速率限制器。这应该能让你很好地了解速率限制,并帮助你开始使用它们。
* * * * *
学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
- 前序
- 谁适合读这本书
- 章节导读
- 在线资源
- 第一章 并发编程介绍
- 摩尔定律,可伸缩网络和我们所处的困境
- 为什么并发编程如此困难
- 数据竞争
- 原子性
- 内存访问同步
- 死锁,活锁和锁的饥饿问题
- 死锁
- 活锁
- 饥饿
- 并发安全性
- 优雅的面对复杂性
- 第二章 代码建模:序列化交互处理
- 并发与并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并发哲学
- 第三章 Go的并发构建模块
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select语句
- GOMAXPROCS
- 结论
- 第四章 Go的并发编程范式
- 访问范围约束
- fo-select循环
- 防止Goroutine泄漏
- or-channel
- 错误处理
- 管道
- 构建管道的最佳实践
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 队列
- context包
- 小结
- 第五章 可伸缩并发设计
- 错误传递
- 超时和取消
- 心跳
- 请求并发复制处理
- 速率限制
- Goroutines异常行为修复
- 本章小结
- 第六章 Goroutines和Go运行时
- 任务调度