🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# 7. redis 实现消息队列 #### 1. 介绍 redis有一个数据类型叫list(列表),它的每个子元素都是 string 类型的双向链表。我们可以通过 push,pop 操作从链表的头部或者尾部添加删除元素。这使得 list 既可以用作栈,也可以用作队列。 假如,我们有一个队列系统,把一个个任务放到队列中,另一个进程就把队列中的任务取出来执行。 放到队列我们使用`LPUSH`,也就是往双向链表的尾部填充一个元素,这一端也叫生产者,是产生内容的一端。 另一个进程使用`RPOP`往头部取出元素来执行,这一端也叫消费者。 如果仅仅是这种方式来实现队列,它就是需要进程不断地循环队列,判断队列是不是有新元素,有的话就取出来执行,没有的话,就继续循环,但是这个总有一个时间间隔,你总得规定每隔一段时间去循环,虽然这个时间很小,但总有延迟,这种方式叫作轮循。有没有一种方式就是让不断执行一个redis命令,而redis中的列队有值就会通过命令通知程序呢?有的,那就是阻塞操作的`RPOP`,它叫作`BRPOP`。 官方文档有一篇文章[An introduction to Redis data types and abstractions](http://redis.io/topics/data-types-intro#lists)是介绍了redis的各种数据结构,其中谈到了list。list部分谈到"Blocking operations on lists",这种就是阻塞版本的list,通常就是用它来实现消息队列的。 #### 2. 实现 我们来演示一下它是如何实现的。 ``` $ redis-cli 127.0.0.1:6379> BRPOP list1 0 ``` 先执行BRPOP,假如队列list1没有值,它会返回nil,并且阻塞在那,在等另一个程序或进程往list1中填值。 我们开启另一个`redis`端终。 ``` $ redis-cli 127.0.0.1:6379> LPUSH list1 a (integer) 1 ``` 我们再来看之前的结果。 ``` 127.0.0.1:6379> BRPOP list1 0 1) "list1" 2) "a" (16.99s) ``` 这样就能把列表的值给取到了。 #### 3. ost 下面我们通过这个叫[ost](https://github.com/soveran/ost)的ruby gem来实现消息队列,并来分析它的源码,来了解redis是如何结合编程语言来实现消息队列的。 先把下面一行添加到Gemfile文件中。 ``` gem 'ost' ``` 接着在`config/initializers`添加一个文件叫ost.rb,内容如下。 ``` require "ost" Ost.redis = Redic.new("redis://127.0.0.1:6379") ``` ost是使用一个轻量级的ruby客户端[redic](https://github.com/amakawa/redic)来连接redis的。 消息队列的模型生成两个部分,分别是生产者和消费者,生产者部分就是把访问的文章放到队列中,那就把文章的唯一标识id放到队列就好了。 ``` class ArticlesController < ApplicationController def show @article = Article.find(params[:id]) Ost[:article] << @article.id end end ``` `Ost[:article] << @article.id`这一部分就相当于上文提到的`LPUSH`。 现在可以打开`redis-cli`,运行监控命令来查看redis中的状态。 ``` $ redis-cli 127.0.0.1:6379> monitor OK ``` 我们在页面上随便刷新一篇文章,然后可以在`monitor`中看到类似下面的信息。 ``` 1446890795.971666 [0 127.0.0.1:53622] "LPUSH" "ost:article" "21" ``` 现在生产者好了,要来处理消费者部分。 一般来说我们是要开启另一个进程,但现在我们的重点不在这,我们就用`rails console`来摸似就好了。 在`console`中运行下面的命令。 ``` Ost[:article].each do |article_id| @article = Article.find(article_id) @article.visit_count += 1 @article.save!(validate: false) end ``` 现在到页面上刷新,再观察visit\_count的变化,会发现文章的visit\_count会加1的。 而且在`monitor`中出不断地出现下面的字样。 ``` 1446891057.725337 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2" 1446891059.807253 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2" 1446891061.827532 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2" 1446891063.881999 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2" 1446891065.897304 [0 127.0.0.1:54316] "BRPOPLPUSH" "ost:article" "ost:article:MacintoshdeMacBook-Air.local:4188" "2" ``` `console`中的执令就是调用redis的阻塞式的`RPOP`。 现在整个流程已经完成了,ost是怎么实现的呢,这就需要来分析它的源码。 #### 4. ost源码分析 ost这个gem只有一个[源文件](https://github.com/soveran/ost/blob/master/lib/ost.rb),总共有77行代码。 其中最主要的有下面的部分。 ``` def push(value) redis.call("LPUSH", @key, value) end def each(&block) loop do item = redis.call("BRPOPLPUSH", @key, @backup, TIMEOUT) if item block.call(item) redis.call("LPOP", @backup) end break if @stopping end end alias << push alias pop each ``` `Ost[:article] << @article.id`就对应上面的`push`方法,而`Ost[:article].each`部分就对应`each`方法。 从`each`方法的代码中可以看到调用了loop循环`BRPOPLPUSH`,`BRPOPLPUSH`是另一个阻塞版本的`RPOP`,它可以接超时的时间。 完结。