如果你曾经使用过API来获取服务,那么你可能经受过与速率限制相抗衡。速率限制使得某种资源每次访问的次数受限。资源可以是任何东西:API连接,磁盘I/O,网络包,错误。 你有没有想过为什么会需要制定速率限制?为什么不允许无限制地访问系统?最明显的答案是,通过对系统进行速率限制,可以防止整个系统遭受攻击。如果恶意用户可以在他们的资源允许的情况下极快地访问你的系统,那么他们可以做各种事情。 例如,他们可以用日志消息或有效的请求填满服务器的磁盘。如果你的日志配置错误,他们甚至可能会执行恶意的操作,然后提交足够的请求,将任何活动记录从日志中移出并放入/dev/null中导致日志系统完全崩溃。他们可能试图暴力访问资源,或者执行分布式拒绝服务攻击。如果你没有对系统进行请求限制,你的系统就成了一个走在街上不穿衣服的大美女。 更糟糕的是,非恶意请求有时也会造成上述结果。恶意使用并不是唯一的原因。 在分布式系统中,如果合法用户正在以足够高的速度执行操作,或者他们正在运行的代码有问题,则合法用户可能会降低系统的性能。这甚至可能导致我们之前讨论的死亡螺旋。 从产品的角度来看,这太糟糕了!通常情况下,你希望向用户提供某种类型的性能保证,以确保他们可以在一致的基础上达到不错的性能。如果一个用户可以影响该协议,那么你的日子就不好过了。用户对系统的访问通常被沙盒化,既不会影响其他用户的活动也不会受其他用户影响。如果你打破了这种思维模式,你的系统会表现出糟糕的设计使用感,甚至会导致用户生气或离开。 >我曾经在一个分布式系统上工作,通过启动新的进程来并行扩展(这允许它水平扩展到多台机器)。每个进程都会打开数据库连接,读取一些数据并进行一些计算。一段时间以来,我们在以这种方式扩展系统以满足客户需求方面取得了巨大成功。 但是,一段时间后,我们发现大量的数据库中读取数据超时。 >我们的数据库管理员仔细查看了日志,试图找出问题所在。最后他们发现,由于系统中没有设定任何速率限制,所以进程彼此重叠。由于不同的进程试图从磁盘的不同部分读取数据,因此磁盘竞争将达到100%并一直保持在这个水平。这反过来导致了一系列的循环——超时——重试——循环。工作永远不会完成。 >我们设计了一个系统来限制数据库上可能的连接数量,并且速率限制被放在连接可以读取的秒级单位,问题消失了。虽然客户不得不等待更长的时间,但毕竟他们的工作完成了,我们能够在接下来的时间里进行适当的容量规划,以系统化的方式扩展容量。 速率限制使得你可以通过某个界限来推断系统的性能和稳定性。如果你需要扩展这些边界,可以在大量测试后以可控的方式进行。 在密集用户操作的情况下,速率限制可以使你的系统与用户之间保持良好的关系。你可以允许他们在严格限制的速率下尝试系统的特性。Google的云产品证明了这一点,这种限制在某种意义上是可以保护用户的。 速率限制通常由资源的构建者角度来考虑,但对于用户来说,能够减少速率限制的影响会让其感到非常欣慰。 那么我们怎样用Go来实现呢? 大多数速率限制是通过使用称为令牌桶的算法完成的。这很容易理解,而且相对容易实施。 我们来看看它背后的理论。 假设要使用资源,你必须拥有资源的访问令牌。没有令牌,请求会被拒绝。想象这些令牌存储在等待被检索以供使用的桶中。该桶的深度为d,表示它一次可以容纳d个访问令牌。 现在,每次你需要访问资源时,你都会进入存储桶并删除令牌。如果你的存储桶包含五个令牌,那么您可以访问五次资源。在第六次访问时,没有访问令牌可用,那么必须将请求加入队列,直到令牌变为可用,或拒绝请求。 这里有一个时间表来帮助观察这个概念。time表示以秒为单位的时间增量,bucket表示桶中请求令牌的数量,并且请求列中的tok表示成功的请求。(在这个和未来的时间表中,我们假设这些请求是即时的。) :-: ![](https://box.kancloud.cn/6345e8d9af8548890551b7633a774811_677x295.jpg) 可以看到,在第一秒之前,我们能够完成五个请求,然后我们被阻塞,因为没有更多的令牌可供使用。 到目前为止,这些都很容易理解。那么如何补充令牌呢?在令牌桶算法中,我们将r定义为将令牌添加回桶的速率。 它可以是一纳秒或一分钟。它就是我们通常认为的速率限制:因为我们必须等待新的令牌可用,我们将操作限制为令牌的刷新率。 以下是深度为1的令牌桶示例,速率为1令牌/秒: :-: ![](https://box.kancloud.cn/6947416d0a4c667389ba741b182ae838_678x294.jpg) 可以看到我们立即可以提出请求,但是只能隔一秒钟将请求一次。速率限制执行的非常好! 所以我们现在有两个设置可以摆弄:有多少令牌可用于立即使用,即桶的深度d,以及它们补充的速率r。另外我们还可以考虑下当存储桶已满时可以进行多少次请求。 以下是深度为5的令牌桶示例,速率为0.5令牌/秒: :-: ![](https://box.kancloud.cn/76ba3052e0be5e33bb6a2ea89363a9ea_680x430.jpg) 在这里,我们能够立即提出五个请求,在这之后,每两秒限制一次请求。请求的爆发是在桶最初装满的时候。 请注意,用户可能不会消耗一个长数据流中的整个令牌桶。桶的深度只控制桶的容量。这里有一个用户爆发两次请求的例子,然后四秒钟后爆发了五次: :-: ![](https://box.kancloud.cn/1ffecd7885323d1caad92b79869b1137_683x291.jpg) :-: ![](https://box.kancloud.cn/2c0311f69d82a680c36853d3a56f021d_679x169.jpg) 虽然用户有可用的令牌,但这种爆发性仅允许根据调用者的能力限制访问系统。对于只能间歇性访问系统但希望尽快往返的用户来说,这种爆发性的存在是很好的。你只需要确保系统能够同时处理所有用户的爆发操作,或者确保操作上不可能有足够的用户同时爆发操作以影响你的系统。无论哪种方式,速率限制都可以让你规避风险。 让我们使用这个算法来看看一个Go程序在执行令牌桶算法的时如何表现。 假设我们可以访问API,并且已经提供了客户端来使用它。该API有两个操作:一个用于读取文件,另一个用于将域名解析为IP地址。为了简单起见,我将忽略任何参数并返回实际访问服务所需的值。 所以这是我们的客户端: ``` func Open() *APIConnection { return &APIConnection{} } type APIConnection struct{} func (a *APIConnection) ReadFile(ctx context.Context) error { // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { // Pretend we do work here return nil } ``` 由于理论上这个请求正在通过网络传递,所以我们将context.Context作为第一个参数,以防我们需要取消请求或将值传递给服务器。 通过前面的章节讨论,想必大家已经熟悉了这样的用法。 现在我们将创建一个简单的程序来访问这个API。程序需要读取10个文件并解析10个地址,但这些文件和地址彼此没有关系,因此程序可以并发调用。稍后这将有助于添加利率限制。 ``` func main() { defer log.Printf("Done.") log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) apiConnection := Open() var wg sync.WaitGroup wg.Add(20) for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ReadFile(context.Background()) if err != nil { log.Printf("cannot ReadFile: %v", err) } }() } log.Printf("ReadFile") for i := 0; i < 10; i++ { go func() { defer wg.Done() err := apiConnection.ResolveAddress(context.Background()) if err != nil { log.Printf("cannot ResolveAddress: %v", err) } }() } log.Printf("ResolveAddress") wg.Wait() } ``` 这会输出: ``` 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ResolveAddress 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 ReadFile 20:13:13 Done. ``` 我们可以看到所有的API请求几乎同时进行。由于没有设定速率限制,所以用户可以随意随意访问系统。现在我需要提醒你,当前个可能导致无限循环的错误。 好的,让我们来介绍一个限速器。我打算在APIConnection中这样做,但通常会在服务器上运行速率限制器,这样用户就无法绕过它。生产系统还可能包括一个客户端速率限制器,以防止用户因不必要的调用而被拒绝,但这是一种优化,并不是必需的。就我们的目的而言,限速器使事情变得简单。 我们将看看golang.org/x/time/rate 中的令牌桶速率限制器实现的示例。我选择了这个包,因为这跟我所能得到的标准库较为相近。当然还有其他的软件包可以做更多的花里胡哨的工作。 golang.org/x/time/rate 非常简单,而且它适用于我们的目的。 我们将与这个包交互的前两种方法是Limit类型和NewLimiter函数,在这里定义: ``` // Limit 定义了事件的最大频率。 // Limit 被表示为每秒事件的数量。 // 值为0的Limit不允许任何事件。 type Limit float64 // ewLimiter返回一个新的Limiter实例, // 件发生率为r,并允许至多b个令牌爆发。 func NewLimiter(r Limit, b int) *Limiter ``` 在NewLimiter中,我们看到了两个熟悉的参数:r ,以及 b.r。b是我们之前讨论过的桶的深度。 rate 包还定义了一个有用的函数,Every,帮助将time.Duration转换为Limit: ``` // Every将事件之间的最小时间间隔转换为Limit。 func Every(interval time.Duration) Limit ``` Every函数是有意义的,但我想讨论每次rate限制了操作次数,而不是请求之间的时间间隔。 我们可以将其表述如下: ``` rate.Limit(events/timePeriod.Seconds()) ``` 但是我不想每次都输入这个值,Every函数有一些特殊的逻辑会返回rate.Inf——表示没有限制——如果提供的时间间隔为零。 正因为如此,我们将用这个词来表达我们的帮助函数: ``` func Per(eventCount int, duration time.Duration) rate.Limit { return rate.Every(duration/time.Duration(eventCount)) } ``` 在我们建立rate.Limiter后,我们希望使用它来阻塞我们的请求,直到获得访问令牌。 我们可以用Wait方法来做到这一点,它只是用参数1来调用WaitN: ``` // Wait 是 WaitN(ctx, 1)的简写。 func (lim *Limiter) Wait(ctx context.Context) // WaitN 会发生阻塞直到 lim 允许的 n 个事件执行。 // 它返回一个 error 如果 n 超过了 Limiter的桶大小, // Context会被取消, 或等待的时间超过了 Context 的 Deadline。 func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) ``` 我们已经集齐了限制API请求的所有要素。让我们修改APIConnection类型并尝试一下: ``` func Open() *APIConnection { return &APIConnection{ rateLimiter: rate.NewLimiter(rate.Limit(1), 1) //1 } } type APIConnection struct { rateLimiter *rate.Limiter } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { //2 return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { //2 return err } // Pretend we do work here return nil } ``` 1. 我们将所有API连接的速率限制设置为每秒一个事件。 2. 我们等待速率限制器有足够的权限来完成我们的请求。 这会输出: ``` 22:08:30 ResolveAddress 22:08:31 ReadFile 22:08:32 ReadFile 22:08:33 ReadFile 22:08:34 ResolveAddress 22:08:35 ResolveAddress 22:08:36 ResolveAddress 22:08:37 ResolveAddress 22:08:38 ResolveAddress 22:08:39 ReadFile 22:08:40 ResolveAddress 22:08:41 ResolveAddress 22:08:42 ResolveAddress 22:08:43 ResolveAddress 22:08:44 ReadFile 22:08:45 ReadFile 22:08:46 ReadFile 22:08:47 ReadFile 22:08:48 ReadFile 22:08:49 ReadFile 22:08:49 Done. ``` 在生产中,我们可能会想要一些更复杂的东西。我们可能希望建立多层限制:细粒度控制以限制每秒请求数,粗粒度控制限制每分钟,每小时或每天的请求数。 在某些情况下,可以通过速率限制器来实现; 然而,在所有情况下都这么干是不可能的,并且通过尝试将单位时间的限制语义转换为单一层,你会因速率限制器而丢失大量信息。出于这些原因,我发现将限制器分开并将它们组合成一个限速器来管理交互更容易。 为此,我创建了一个简单的聚合速率限制器multiLimiter。 这是定义: ``` type RateLimiter interface { //1 Wait(context.Context) error Limit() rate.Limit } func MultiLimiter(limiters ...RateLimiter) *multiLimiter { byLimit := func(i, j int) bool { return limiters[i].Limit() < limiters[j].Limit() } sort.Slice(limiters, byLimit) //2 return &multiLimiter{limiters: limiters} } type multiLimiter struct { limiters []RateLimiter } func (l *multiLimiter) Wait(ctx context.Context) error { for _, l := range l.limiters { if err := l.Wait(ctx); err != nil { return err } } return nil } func (l *multiLimiter) Limit() rate.Limit { return l.limiters[0].Limit() //3 } ``` 1. 这里我们定义一个RateLimiter接口,以便MultiLimiter可以递归地定义其他MultiLimiter实例。 2. 这里我们实现一个优化,并按照每个RateLimiter的Limit()行排序。 3. 因为我们在multiLimiter实例化时对子RateLimiter实例进行排序,所以我们可以简单地返回限制性最高的limit,这将是切片中的第一个元素。 Wait方法遍历所有子限制器,并在每一个子限制器上调用Wait。这些调用可能会也可能不会阻塞,但我们需要通知每个子限制器,以便减少令牌桶内的令牌数量。通过等待每个限制器,我们保证最长的等待时间。这是因为如果我们执行时间较小的等待,那么最长的等待时间将被重新计算为剩余时间。在较早的等待被阻塞时,后者会等待令牌桶的填充。 经过思考,让我们重新定义APIConnection,对每秒和每分钟都进行限制: ``` func Open() *APIConnection { secondLimit := rate.NewLimiter(Per(2, time.Second), 1) //1 minuteLimit := rate.NewLimiter(Per(10, time.Minute), 10) //2 return &APIConnection{ rateLimiter: MultiLimiter(secondLimit, minuteLimit) //3 } } type APIConnection struct { rateLimiter RateLimiter } func (a *APIConnection) ReadFile(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { if err := a.rateLimiter.Wait(ctx); err != nil { return err } // Pretend we do work here return nil } ``` 1. 我们定义了每秒的极限。 2. 我们定义每分钟的突发极限为10,为用户提供初始池。每秒的限制将确保我们不会因请求而使系统过载。 3. 我们结合这两个限制,并将其设置为APIConnection的主限制器。 这会输出: ``` 22:46:10 ResolveAddress 22:46:10 ReadFile 22:46:11 ReadFile 22:46:11 ReadFile 22:46:12 ReadFile 22:46:12 ReadFile 22:46:13 ReadFile 22:46:13 ReadFile 22:46:14 ReadFile 22:46:14 ReadFile 22:46:16 ResolveAddress 22:46:22 ResolveAddress 22:46:28 ReadFile 22:46:34 ResolveAddress 22:46:40 ResolveAddress 22:46:46 ResolveAddress 22:46:52 ResolveAddress 22:46:58 ResolveAddress 22:47:04 ResolveAddress 22:47:10 ResolveAddress 22:47:10 Done. ``` 正如您所看到的,我们每秒发出两个请求,直到请求#11,此时我们开始每隔六秒发出一次请求。 这是因为我们耗尽了我们可用的每分钟请求令牌池,并受此限制。 为什么请求#11在两秒内发生而不是像其他请求那样发生,这可能有点违反直觉。 请记住,尽管我们将API请求限制为每分钟10个,但一分钟是一个动态的时间窗口。 当我们达到第十一个要求时,我们的每分钟限制器已经累积了另一个令牌。 通过定义这样的限制,我们可以清楚地表达了粗粒度限制,同时仍然以更详细的细节水平限制请求数量。 这种技术也使我们能够开始思考除时间之外的其他维度。 当你对系统进行限制时,你可能会限制不止一件事。 也可能对API请求的数量有一些限制,也会对其他资源(如磁盘访问,网络访问等)有限制。让我们稍微充实一下示例并设置磁盘和网络限制 ``` func Open() *APIConnection { return &APIConnection{ apiLimit: MultiLimiter( //1 rate.NewLimiter(Per(2, time.Second), 2), rate.NewLimiter(Per(10, time.Minute), 10)), diskLimit: MultiLimiter(rate.NewLimiter(rate.Limit(1), 1)), //2 networkLimit: MultiLimiter(rate.NewLimiter(Per(3, time.Second), 3)) //3 } } type APIConnection struct { networkLimit, diskLimit, apiLimit RateLimiter } func (a *APIConnection) ReadFile(ctx context.Context) error { err := MultiLimiter(a.apiLimit, a.diskLimit).Wait(ctx) //4 if err != nil { return err } // Pretend we do work here return nil } func (a *APIConnection) ResolveAddress(ctx context.Context) error { err := MultiLimiter(a.apiLimit, a.networkLimit).Wait(ctx) //5 if err != nil { return err } // Pretend we do work here return nil } ``` 1. 我们为API调用设置了一个限制器。 每秒请求数和每分钟请求数都有限制 2. 我们为磁盘读取设置一个限制器。将其限制为每秒一次读取。 3. 对于网络,我们将设置每秒三个请求的限制。 4. 当我们读取文件时,将结合来自API限制器和磁盘限制器的限制。 5. 当我们需要网络访问时,将结合来自API限制器和网络限制器的限制。 这会输出: ``` 01:40:15 ResolveAddress 01:40:15 ReadFile 01:40:16 ReadFile 01:40:17 ResolveAddress 01:40:17 ResolveAddress 01:40:17 ReadFile 01:40:18 ResolveAddress 01:40:18 ResolveAddress 01:40:19 ResolveAddress 01:40:19 ResolveAddress 01:40:21 ResolveAddress 01:40:27 ResolveAddress 01:40:33 ResolveAddress 01:40:39 ReadFile 01:40:45 ReadFile 01:40:51 ReadFile 01:40:57 ReadFile 01:41:03 ReadFile 01:41:09 ReadFile 01:41:15 ReadFile 01:41:15 Done. ``` 让我们关注一下这样的事实,即我们可以将限制器组合成对每个调用都有意义的组,并且让APIClient执行正确的操作。如果我们想随便观察一下它是如何工作的,会注意到涉及网络访问的API调用似乎以更加规律的方式发生,并在前三分之二的调用中完成。这可能与goroutine调度有关,也是我们的限速器正在执行各自的工作。 我还应该提到的是,rate.Limiter类型有一些其他的技巧来优化不同的用例。这里只讨论了它等待令牌桶接收另一个令牌的能力,但如果你有兴趣使用它,可以查阅文档。 在本节中,我们考察了使用速率限制的理由,构建了令牌桶算法的Go实现,以及如何将令牌桶限制器组合为更大,更复杂的速率限制器。这应该能让你很好地了解速率限制,并帮助你开始使用它们。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。