ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
# 自动加载 *boot.php* ``` <?php // +---------------------------------------------------------------------- // | boot.php // +---------------------------------------------------------------------- // | Description: 自动加载 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午5:51 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- define('ROOT',realpath('./')); spl_autoload_register(function ($class) { $file_path = str_replace('\\', '/', $class); $file = $file_path . '.php'; if (!file_exists($file)) { throw new Exception('找不到文件:' . $file); } include_once $file; return true; }); ``` # Mysql 队列生产者 *Producer.php* ``` <?php // +---------------------------------------------------------------------- // | Producer.php // +---------------------------------------------------------------------- // | Description: 生产者 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午3:05 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- include_once 'boot.php'; try { Queue::init('Mysql', [ 'dsn' => 'mysql:host=mysql;dbname=test', 'username' => 'root', 'password' => 'root', 'table' => 'queues', 'ttr' => 60, ]); // 队列初始化 // 生产者放入消息 $job = new Driver\Job([ 'job_data' => json_encode(['order_id' => time(), 'user_id' => 0001]), 'tube' => 'test' ]); $job = Queue::put($job); } catch (Exception $e) { var_dump($e->getMessage()); } ``` # Mysql 队列消费者 *Consumer.php* ``` <?php // +---------------------------------------------------------------------- // | Consumer.php // +---------------------------------------------------------------------- // | Description: 消费者 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午4:55 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- include_once 'boot.php'; try { Queue::init('Mysql', [ 'dsn' => 'mysql:host=mysql;dbname=test', 'username' => 'root', 'password' => 'root', 'table' => 'queues', 'ttr' => 60, ]); while (1) { // 死循环,使进程一直在cli中运行,不断从消息队列读取数据 $job = Queue::reserve('test'); if (!$job->isEmpty()) { echo $job->job_data . PHP_EOL; sleep(2); if (Queue::delete($job)) { echo "job was deleted" . PHP_EOL; } else { echo "delete failed" . PHP_EOL; } } } } catch (Exception $e) { var_dump($e->getMessage()); } ``` # Redis 队列生产者 *RedisProducer.php* ``` <?php // +---------------------------------------------------------------------- // | RedisProducer.php // +---------------------------------------------------------------------- // | Description: 生产者 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午3:05 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- include_once 'boot.php'; try { Queue::init('Redis', [ 'ip' => 'redis', 'port' => 6379, 'tubes' => 'tubes' ]); // 队列初始化 // 生产者放入消息 $job = new Driver\Job([ 'job_data' => json_encode(['order_id' => time(), 'user_id' => '0001']), 'tube' => 'default' ]); $job = Queue::put($job); echo $job->id . PHP_EOL; } catch (Exception $e) { var_dump($e->getMessage()); } ``` # Redis 队列消费者 *RedisConsumer.php* ``` <?php // +---------------------------------------------------------------------- // | RedisConsumer.php // +---------------------------------------------------------------------- // | Description: 消费者 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午4:55 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- include_once 'boot.php'; try { Queue::init('Redis', [ 'ip' => 'redis', 'port' => 6379, 'tubes' => 'tubes' ]); // 队列初始化 while (1) { // 死循环,使进程一直在cli中运行,不断从消息队列读取数据 $job = Queue::reserve('default'); if (!$job->isEmpty()) { echo $job->job_data . PHP_EOL; sleep(2); if (Queue::delete($job)) { echo "job was deleted" . PHP_EOL; } else { echo "delete failed" . PHP_EOL; } } } } catch (Exception $e) { var_dump($e->getMessage()); } ```