# 自动加载
*boot.php*
```
<?php
// +----------------------------------------------------------------------
// | boot.php
// +----------------------------------------------------------------------
// | Description: 自动加载
// +----------------------------------------------------------------------
// | Time: 2018/12/19 下午5:51
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
define('ROOT',realpath('./'));
spl_autoload_register(function ($class) {
$file_path = str_replace('\\', '/', $class);
$file = $file_path . '.php';
if (!file_exists($file)) {
throw new Exception('找不到文件:' . $file);
}
include_once $file;
return true;
});
```
# Mysql 队列生产者
*Producer.php*
```
<?php
// +----------------------------------------------------------------------
// | Producer.php
// +----------------------------------------------------------------------
// | Description: 生产者
// +----------------------------------------------------------------------
// | Time: 2018/12/19 下午3:05
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
include_once 'boot.php';
try {
Queue::init('Mysql', [
'dsn' => 'mysql:host=mysql;dbname=test',
'username' => 'root',
'password' => 'root',
'table' => 'queues',
'ttr' => 60,
]); // 队列初始化
// 生产者放入消息
$job = new Driver\Job([
'job_data' => json_encode(['order_id' => time(), 'user_id' => 0001]),
'tube' => 'test'
]);
$job = Queue::put($job);
} catch (Exception $e) {
var_dump($e->getMessage());
}
```
# Mysql 队列消费者
*Consumer.php*
```
<?php
// +----------------------------------------------------------------------
// | Consumer.php
// +----------------------------------------------------------------------
// | Description: 消费者
// +----------------------------------------------------------------------
// | Time: 2018/12/19 下午4:55
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
include_once 'boot.php';
try {
Queue::init('Mysql', [
'dsn' => 'mysql:host=mysql;dbname=test',
'username' => 'root',
'password' => 'root',
'table' => 'queues',
'ttr' => 60,
]);
while (1) {
// 死循环,使进程一直在cli中运行,不断从消息队列读取数据
$job = Queue::reserve('test');
if (!$job->isEmpty()) {
echo $job->job_data . PHP_EOL;
sleep(2);
if (Queue::delete($job)) {
echo "job was deleted" . PHP_EOL;
} else {
echo "delete failed" . PHP_EOL;
}
}
}
} catch (Exception $e) {
var_dump($e->getMessage());
}
```
# Redis 队列生产者
*RedisProducer.php*
```
<?php
// +----------------------------------------------------------------------
// | RedisProducer.php
// +----------------------------------------------------------------------
// | Description: 生产者
// +----------------------------------------------------------------------
// | Time: 2018/12/19 下午3:05
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
include_once 'boot.php';
try {
Queue::init('Redis', [
'ip' => 'redis',
'port' => 6379,
'tubes' => 'tubes'
]); // 队列初始化
// 生产者放入消息
$job = new Driver\Job([
'job_data' => json_encode(['order_id' => time(), 'user_id' => '0001']),
'tube' => 'default'
]);
$job = Queue::put($job);
echo $job->id . PHP_EOL;
} catch (Exception $e) {
var_dump($e->getMessage());
}
```
# Redis 队列消费者
*RedisConsumer.php*
```
<?php
// +----------------------------------------------------------------------
// | RedisConsumer.php
// +----------------------------------------------------------------------
// | Description: 消费者
// +----------------------------------------------------------------------
// | Time: 2018/12/19 下午4:55
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
include_once 'boot.php';
try {
Queue::init('Redis', [
'ip' => 'redis',
'port' => 6379,
'tubes' => 'tubes'
]); // 队列初始化
while (1) {
// 死循环,使进程一直在cli中运行,不断从消息队列读取数据
$job = Queue::reserve('default');
if (!$job->isEmpty()) {
echo $job->job_data . PHP_EOL;
sleep(2);
if (Queue::delete($job)) {
echo "job was deleted" . PHP_EOL;
} else {
echo "delete failed" . PHP_EOL;
}
}
}
} catch (Exception $e) {
var_dump($e->getMessage());
}
```