💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
# 14. listen/notify 之 queue\_classic #### 1. 介绍 [queue\_classic](https://github.com/QueueClassic/queue_classic)是一个ruby的gem,用来实现PostgreSQL的消息队列。它基于[PostgreSQL的listen/notify](http://www.rails365.net/articles/2015-10-12-postgresql-listen-notify-xiao-xi-dui-lie-lie-yi),有很多接口,使用起来比较简单。 如果你有使用过[sidekiq](https://github.com/mperham/sidekiq),[resque](https://github.com/resque/resque),[delayed\_job](https://github.com/collectiveidea/delayed_job),就会发现基本每个这种消息队列的gem的使用方法都是类似的,里面的概念也是差不多,也就是说,学会了[queue\_classic](https://github.com/QueueClassic/queue_classic)就等于会其中三种,只要掌握思想就好了。 #### 2. 安装 假如我们已经有一个rails项目了。 在Gemfile添加下面这行。 ``` gem "queue_classic", "~> 3.0.0" ``` 执行`bundle install` 正如我们上面所说的,任务是需要存储的。所以要创建相应的表来存。 ``` # 创建queue_classic_jobs表 rails generate queue_classic:install # username替换为你自己的数据库的用户名,password是数据库的密码,rails365_dev是数据库名 export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev" bundle exec rake db:migrate ``` #### 3. 测试 我们先在`rails console`里测试。 前面说过,消息队列是跑在一个进程里的。所以要启动那个进程。 ``` bundle exec rake qc:work ``` 你会发现delayed\_job,sidekiq也是差不多的启动方法。 还可以指定队列来启动。 ``` QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work ``` 启动好后,我们进入`rails console`中,执行下面这行语句。 ``` ➜ rails365 git:(master) ✗ rails c Loading development environment (Rails 4.2.3) 2.2.2 :001 > QC.enqueue("Kernel.puts", "hello world") nil ``` 你会在进程里看到类似这样的输出,就说明成功了。 ``` ➜ rails365 git:(master) ✗ bundle exec rake qc:work hello world ``` QC.enqueue就是后面的命令加上参数作为任务push到队列中。 上面只是个最简单的例子,还有可以在指定时间,指定队列来执行,具体更为详细的命令要看官方的readme文档。 如果需要调试或查看日志,可以开启调试功能,有两种不同的日志,分别是: ``` export QC_MEASURE="true" # or export DEBUG="true" ``` 在运行`rake qc:work`之前运行,具体的效果,尝试下就知道的。 #### 4. 在rails中使用 以[本站](https://github.com/yinsigan/rails365)为例,是一个放博客的网站,文章是存放在articles这张表,当时为了查看哪篇文章最受欢迎,就在articles存了一个字段叫visit\_count,但每次用户查看文章时,就会往这个字段加1。这个动作是用rails的[ActiveSupport::Notifications](http://api.rubyonrails.org/classes/ActiveSupport/Notifications.html)配合sidekiq的消息队列来做的,现在要改成用queue\_classic来做。 我们来看下相关的代码。 ``` # app/workers/update_article_visit_count_worker.rb class UpdateArticleVisitCountWorker include Sidekiq::Worker def perform(article_id) logger.info 'update article visit count begin' @article = Article.find(article_id) @article.visit_count += 1 @article.save!(validate: false) logger.info 'update article visit count end' end end ``` 在哪里调用呢,我们是结合ActiveSupport::Notifications来做的,这个先不管,你也可以在articles\_controller的show action直接调用。 ``` # config/initializers/notification.rb ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload| Rails.logger.info payload if payload[:controller] == "ArticlesController" && payload[:action] == "show" UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present? end end ``` 上文提过,queue\_classic主要是利用`QC.enqueue`这条命令把任务push到队列中的。只需要把这行`UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?`改成我们需要的就可以了。 把增加visit\_count的值的逻辑移动model中去,然后在`ActiveSupport::Notifications.subscribe`用`QC.enqueue`中调用就好了。 改造之后是这样的。 ``` # app/models/article.rb class Article < ActiveRecord::Base def self.update_article_visit_count(article_id) article = Article.find(article_id) article.visit_count += 1 article.save!(validate: false) end end ``` ``` # config/initializers/notification.rb ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload| Rails.logger.info payload if payload[:controller] == "ArticlesController" && payload[:action] == "show" QC.enqueue "Article.update_article_visit_count",payload[:params]["id"] if payload[:params]["id"].present? end end ``` 现在需要重启下`rails server`和`bundle exec rake qc:work`。 重启`rails server`运行好`export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"`这个命令。 为了让`rake qc:work`更能明显地看到日志信息,在运行`rake qc:work`前先执行`export QC_MEASURE="true"`。 现在可以去页面上测试的。 #### 5. 注意事项 第一点是关于环境变量,也就是`QC_DATABASE_URL`、`QC_MEASURE`、`DEBUG`这三个,当部署到线上环境时,就要把这三个变量写进shell的配置文件,如比ubuntu系统,就写进~/.bashrc\_profile就好了。 第二点是关于错误的任务,任务也是有可能会报错的,但是我们不知道哪个任务报错了,所以很不方便,其实官方提供了接口的,你自己可以捕获那个错误信息,捕获后就可以进行自己想要的处理了。其实错误的任务都会一直存在表queue\_classic\_jobs中,这样查看就好,至于那个接口,就是[worker.rb](https://github.com/QueueClassic/queue_classic/blob/master/lib/queue_classic/worker.rb)中的handle\_failure方法,官方readme文档也有示例,这里不再深究。 完结。