# Beanstalkd任务队列
> 高性能离不开异步,异步离不开队列。
## Beanstalkd是什么
Beanstalkd 是一个轻量级消息中间件,它最大特点是将自己定位为基于管道 (tube) 和任务 (job) 的工作队列。
Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。它的内部采用libevent,服务器-客户端之间采用类似Memcached的轻量级通讯协议,因此性能很高(enque: 9000 jobs/second, worker: 5200 jobs/second)。
尽管是内存队列, Beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。Beanstalkd支持过有9.5 million用户的Facebook Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过Memcached的用户会觉得Beanstalkd似曾相识。
Beanstalkd支持的语言有很多,可以参考这里:https://github.com/kr/beanstalkd/wiki/client-libraries
## Beanstalkd设计的核心概念
### job
一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。
### tube
一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。
### producer
Job的生产者,通过put命令来将一个job放到一个tube中。
### consumer
Job的消费者,通过`reserve/release/bury/delete`命令来获取job或改变job的状态。
当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。
consumer获取了当前READY的job后,该job的状态就迁移到RESERVED,这样其他的consumer就不能再操作该job。
当consumer完成该job后,可以选择delete, release或者bury操作:
* delete操作,job从系统消亡,之后不能再获取;
* release操作,可以重新把该job状态迁移回READY(也可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;
* bury操作,可以把该job休眠,等到需要的时候,再将休眠的job kick回READY状态,也可以delete buride状态的job。
![](https://box.kancloud.cn/2ee937085397575f7a28d2be61ca7e93_600x254.png)
#### 任务优先级 (priority):
任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn).
#### 延时任务 (delay):
有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with <delay>)。这种机制可以实现分布式的 Java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。
#### 任务超时重发 (time-to-run):
Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run).
#### 任务预留 (buried):
如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。
### Beanstalkd 协议
Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。
这些命令可以简单的分成三组:
1. 生产类 - use <tube> / put <priority> <delay> <ttr> [bytes]:
2. 生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).
3. 消费类 - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>
消费者用 watch 选择多个管道 (tube), 然后用 reserve 命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE), 释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。
维护类 - peek job / peek delayed / peek ready / peek buried / kick <n>
用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。
被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。
> 协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt
正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。
## Beanstalkd有什么不足?
Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。 和 Memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。
![](https://box.kancloud.cn/8e4eb7ee2aac45fbb31490c131b5571b_544x327.png)
一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。
Beanstalk速度非常快,协议简单,占用内存空间少,而且支持持久化。唯一的不足是挂了之后恢复慢,3G日志数据恢复了十多分钟。如何安装Beanstalkd?
## 安装
使用下面的命令进行安装,同时查看版本:
sudo apt-get install beanstalkd
beanstalkd -v
beanstalkd 1.9
Beanstalkd可以使用以下命令停止和启动:
root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd stop
* Stopping in-memory queueing server beanstalkd [ OK ]
root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd start
* Starting in-memory queueing server beanstalkd
root@ubuntu-vagrant:/usr/local/nginx/conf# service beanstalkd start
通过apt-get安装后的配置文件目录在/etc/default/beanstalkd,里面描述了Beanstalkd监听的地址和端口:
root@ubuntu-vagrant:/usr/local/nginx/conf# cat /etc/default/beanstalkd
## Defaults for the beanstalkd init script, /etc/init.d/beanstalkd on
## Debian systems.
BEANSTALKD_LISTEN_ADDR=127.0.0.1
BEANSTALKD_LISTEN_PORT=11300
# You can use BEANSTALKD_EXTRA to pass additional options. See beanstalkd(1)
# for a list of the available options. Uncomment the following line for
# persistent job storage.
# BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
## 使用composer组件来管理它
[GitHub - davidpersson/beanstalk: Minimalistic PHP client for beanstalkd.](https://github.com/davidpersson/beanstalk)我们可以使用这个组件来帮助操作beanstalkd。
### 安装composer依赖组件
此处我用composer没有拉下来,换了种方式直接git clone。
git clone https://github.com/davidpersson/beanstalk.git
### 代码实现
<?php
use Beanstalk\Client;
//
// A sample producer.
//
$beanstalk = new Client(); // For connection options see the
// class documentation.
$beanstalk->connect();
$beanstalk->useTube('flux'); // Begin to use tube `'flux'`.
$beanstalk->put(
23, // Give the job a priority of 23.
0, // Do not wait to put job into the ready queue.
60, // Give the job 1 minute to run.
'/path/to/cat-image.png' // The job's body.
);
$beanstalk->disconnect();
//
// A sample consumer.
//
$beanstalk = new Client();
$beanstalk->connect();
$beanstalk->watch('flux');
while (true) {
$job = $beanstalk->reserve(); // Block until job is available.
// Now $job is an array which contains its ID and body:
// ['id' => 123, 'body' => '/path/to/cat-image.png']
// Processing of the job...
$result = touch($job['body']);
if ($result) {
$beanstalk->delete($job['id']);
} else {
$beanstalk->bury($job['id']);
}
}
// When exiting i.e. on critical error conditions
// you may also want to disconnect the consumer.
// $beanstalk->disconnect();
## 控制台
为了可以可视化的在网页上查看消息队列的运行情况,我推荐使用控制台。
[ptrofimov/beanstalk_console - Packagist](https://packagist.org/packages/ptrofimov/beanstalk_console)我们使用这个项目作为消息队列的控制台,直观的查看队列任务的执行。
安装console项目:
composer create-project ptrofimov/beanstalk_console
然后给他配置虚拟站点,就可以进入控制台查看beanstalkd状态了。
![](https://box.kancloud.cn/e4128681a0bc929742def2074a661940_1996x802.png)
此处因为使用lnmp一键脚本搭建环境,踩了几个坑。有时间和大家分享下。
- 前言
- 读者须知
- 第一章 Linux
- HTTP
- 简介
- 状态码
- 特点
- URL
- Request
- Response
- 请求方式
- 工作原理
- 生命周期
- GET和POST区别
- 组成
- 端口
- 命令
- 常用命令
- chmod命令详解
- ubuntu apt-get命令
- 用户和用户组
- Nginx
- 四个基本功能
- 进程
- 进程管理[ps命令]
- 进程管理[top命令]
- 进程管理[kill命令]
- 进程管理[进程优先级]
- 进程管理[netstat命令]
- 定时任务
- crontab
- 实现每秒执行
- >/dev/null 2>&1说明
- 文件管理
- 工作管理
- 资源管理
- 第二章 NGINX
- 介绍
- 入门
- 特性
- 安装启动
- 基础必会
- 常用功能
- 反向代理
- 负载均衡
- 正向代理
- HTTP服务器
- 动静分离
- 技能点汇总
- 显示乱码
- 打开目录浏览功能
- 错误码原因和解决方案
- location用法
- 常用正则
- rewrite
- 全局变量
- if语句块
- https
- php后端处理(fast-cgi)
- flag标志位
- 过期功能
- gzip压缩
- 会话保持时间
- 配置nginx worker进程最大打开文件数
- sendfile
- 单个工作进程的最大连接数
- 选择事件驱动模型
- 隐藏ngxin版本号
- 网络连接的优化
- 缓存原理及机制
- 限流
- 日志配置
- 灰度发布
- 配置一键生成
- 第三章 MySQL
- 入门
- 简介
- 术语
- 特点
- 三范式
- 8.0 新特性
- 数据类型
- 数据类型详解
- 常用函数
- 命令速查
- MyISAM与InnoDB区别
- 服务器构成
- 事务
- 本质
- 特性
- 分类
- 隔离级别
- PHP中使用事务实例
- MVCC
- 问题和解决
- 调优原则
- 分布式事务
- 索引
- 简介
- 索引的分类
- 创建索引
- 删除索引
- 哈希索引
- btree索引和hash索引的区别
- 单列索引和多列索引
- 索引优化
- 查看SQL语句对索引的使用情况
- 锁
- 技能点
- 开发规范
- 导入导出数据库
- blob和text的区别
- char与varchar类型区别
- SQL查询语句优化
- 事务隔离和锁操作需要在语言级别来做吗
- 58到家数据库30条军规解读
- 数据迁移
- SKU数据库设计
- RBAC数据库设计
- 第四章 Redis
- 入门
- 简介
- 应用场景
- 安装启动
- 生命周期
- 事务
- 配置项
- 缓存
- 数据持久化
- 安全
- 数据类型
- string
- hash
- list
- set
- zset
- php代码实战
- 字符串缓存实战
- 队列实战
- 发布订阅实战
- 计数器实战
- 排行榜实战
- 字符串悲观锁实战
- 事务的乐观锁实战
- 高级应用
- 分片机制
- 主从复制
- 缓存问题
- 解决 Redis 并发竞争 Key 问题
- 淘汰策略
- 第五章 PHP
- composer
- 什么是composer
- composer常用概念解析
- 使用composer的正确姿势
- 消息队列
- 为何使用消息队列
- Beanstalkd
- PSR规范
- PSR-0
- PSR-1
- PSR-2
- PSR-3
- PSR-4
- OOP基础
- 面向对象概念
- 类和对象
- 类
- 操作对象成员
- this使用
- 构造方法和析构方法
- 封装
- __set(),__get(),__isset(),__unset()四个方法的应用
- 继承
- 重载新的方法(parent::)
- 访问类型(public,protected,private)
- final关键字的应用
- static和const关键字的使用(self::)
- static关键字
- __toString()方法
- 克隆对象__clone()方法
- __call()处理调用错误
- 抽象方法和抽象类(abstract)
- 接口(interface)
- 多态
- 把对象串行化serialize()方法,__sleep()方法,__wakeup()方法
- 自动加载类 __autoload()函数
- OOP进阶
- 语法糖
- 异常处理
- 后期静态绑定
- 后期静态绑定在框架的运用
- 代码优化思路
- Closure(闭包)
- 巧用PHP内置方法
- 数组操作的奇技淫巧
- 设计模式
- 单例模式(Singleton Pattern)
- 工厂模式(Factor Pattern)
- 建造者模式(Builder Pattern)
- 原型模式(Prototype Pattern)
- 适配器模式(Adapter Pattern)
- 装饰器模式(Decorator Pattern)
- 代理模式(Proxy Pattern)
- 外观模式(Facade Pattern)
- 桥接模式(Bridge Pattern)
- 组合模式(Composite Pattern)
- 享元模式 (Flyweight Pattern)
- 策略模式 ( Strategy Pattern )
- 模板模式 (Template Pattern)
- 观察者模式 (observer Pattern)
- 迭代模式(Iterator Pattern)
- 责任链模式(Chain of Responsibility Pattern)
- 命令模式 (Command Pattern)
- 备忘录模式(Memento Pattern)
- 状态模式 (State Pattern)
- 访问者模式(Visitor Pattern)
- 中介者模式(Mediator Pattern)
- 解释器模式(Interpreter Pattern)
- 数据映射模式(Data Mapper Pattern)
- 注册树模式(Registry Pattern)
- 空对象模式(Null Object Pattern)
- 搜索引擎
- Elasticsearch
- 安装
- 入门
- 实践
- 集群
- 查询
- API
- 接口调用
- cURL
- Guzzle
- RPC
- yar
- session
- 概念
- 客户端实现形式
- cookie与session的区别
- Cookies的安全性
- JWT
- 组成
- 入门
- 应用
- 知识点
- 常见
- $_SERVER
- php的引用
- 第六章 技术栈扩展
- 使用第三方静态资源服务
- 七牛对象存储实战
- 七牛对象存储之客户端上传
- aliyunOSS服务端文件上传
- aliyunOSS客户端文件上传
- 第三方支付
- 微信支付
- 支付宝支付
- SEO排名影响因素
- PHP架构师之路
- CTO职能
- web宏观分析
- 常见的企业软件系统
- 负载的优化思路
- 从容应对负载并发的前期准备
- 第七章 网络安全
- XSS
- CSRF
- DDoS
- SQL注入
- 停用js
- 文件上传
- 点击劫持
- APT
- 会话劫持
- 第八章 运维
- devops
- devops简介
- 常用工具
- 搭建运行环境
- Centos7 lnmp环境搭建
- ubuntu lnmp环境搭建
- Apache多站点配置
- docker
- 轻松使用和理解docker
- lnamp产品级环境搭建
- lnamp产品级环境搭建【第二版】
- 基于 Docker 容器的沙盒化评测系统
- vagrant
- vagrant入门
- vagrant之Vagrantfile
- vagrant之集成jenkins
- homestead
- gitlab
- gitlab简介
- webhook
- ssh堡垒机
- 第九章 测试
- 压力测试
- 单元测试
- 第十章 团队协作
- 软件开发模式
- 边做边改模型
- 瀑布模型
- 迭代模型
- 快速原型模型
- 增量模型
- 螺旋模型
- 敏捷软件开发
- 演化模型
- 喷泉模型
- 智能模型
- 混合模型
- 模型对比
- TDD
- git
- git_入门
- git_使用
- git_进阶
- git workflow
- git_高级
- git_小技巧
- okr工作法
- API接口文档管理系统
- 敏捷协作工具
- 第十一章 技术灯塔
- github项目
- 社区好货
- 纸质书
- 第十二章 代码之外
- 面试官的角度看面试
- 程序员的壮年思考