# 环境
首先,确定好你的环境已经拥有这些内容:
- php
- redis
- mysql
- phpredis 扩展
而接下来,我们通过运行 Mysql 和 Redis 各自的生产者、消费者,来看消息队列是否已经在工作。
# 实例化 Queue
打开生产者和消费者,我们可以看到有这么一段代码:
```
Queue::init('Mysql', [
'dsn' => 'mysql:host=mysql;dbname=test',
'username' => 'root',
'password' => 'root',
'table' => 'queues',
'ttr' => 60,
]); // 队列初始化
```
或:
```
Queue::init('Redis', [
'ip' => 'redis',
'port' => 6379,
'tubes' => 'tubes'
]); // 队列初始化
```
如果前面实现环节,我们有达成共识,那么,相信这里你一定已经知道,这段代码是用来传入连接参数的。
你需要根据你运行的环境,去单独设置它。
# 测试步骤
- 打开 4 个 client ( cmd , terminal )窗口
- 在其中两个命令行中,分别运行如下命令:
```
php Consumer.php // 运行 Mysql 消费者
php RedisConsumer.php // 运行 Redis 消费者
```
运行成功后,这两个窗口将处于运行状态,两个消费者都会始终处于 reserve 循环中,直到接收到数据,才会输出。
- 再另外两个窗口,运行如下命令:
```
php Producer.php
php RedisProducer.php
```
# 小结
到此为止,我们的模拟消息队列就成功了。
我们可以看到,当我们运行生产者的时候,各自对应的消费者窗口中,就会弹出相关的信息。
这就是一个简单的消息队列案例。
**但是,这里需要注意,这只是一个模拟案例,它帮助我们理解消息队列,但并不能应用于实际项目中。**
我们可以推理一下,消息队列,本该可以使用在高并发的场景,但我们现在实现的消息队列,它还有许多漏洞,根本无法应用在实战场景中。
比如:
- mysql 驱动的 reserve 方法,它先接收,接收成功后再修改 job 的状态,这在高并发的情况下,将导致我们的 job 被重复接收,因为在「接收」和「修改状态」两个环节中间,可能会有另一个「请求」进来,此时的 job 还处于 ready 状态
- redis 驱动的 reserve 方法,使用了 redis 的 rPop 操作,这个方法,会在取出数据的同时,将数据从 list 中删除,那么,如果我们消费者处理失败,这个数据就有可能丢失
- redis 驱动的 put 方法,使用了 lPush 操作,此操作执行成功后,将会返回 list 的长度,我们的 redis 驱动会使用这个长度来作为该 job 的下标( id )。然而, list 的长度会不断变化,并发场景下,你 put 成功后,可能还在处理其他流程,此进程并未结束,此时若有消费者接收走一个消息,则 list 的长度就发生了变化,你在 put 时的 id 就无法与 list 中的数据对应起来