订阅发布主要用于广播系统,例如聊天系统中的群聊,消息推送,同步操作等等;这里以同步操作为例子作为讲解
![订阅发布](https://box.kancloud.cn/c223c3affae5211b70410259cf1a1572_814x471.png)
图片来源于https://www.cnblogs.com/JasonLeemz/p/5116814.html
##### 大概流程如下:
- pc端游览器与websocket建立长连接
- websocket订阅redis (subscribe channel),一旦channel有数据,websocket主动推送数据给pc端游览器
- 手机设备修改数据,并执行发布 (publish channel message)
- websocket获取到channel的message,主动推送给pc游览器
##### 注意点:
- redis的订阅发布使用php内置的socket,默认超时时间为60秒,需要设置ini_set('default_socket_timeout', -1);
- php的redis扩展为阻塞IO,当websocket订阅redis,整个进程会阻塞,导致服务端的websocket不能处理其他事件,从而无法记录长连接fd,解决方法用异步redis
##### 下面是一个简单的基于swoole异步redis实现的websocket服务端
```php
class WebsocketServer
{
public $fdMaps = [];
/** @var \Swoole\WebSocket\Server */
public $ws;
public function __construct()
{
$this->ws = new Swoole\WebSocket\Server('0.0.0.0', 9503);
$this->ws->on('open', [$this, 'open']);
$this->ws->on('close', [$this, 'close']);
$this->ws->on('workerStart', [$this, 'workerStart']);
$this->ws->on('message', [$this, 'message']);
$this->ws->set([
'worker_num' => 1,
'daemonize' => 0,
]);
$this->ws->start();
}
public function message($server, $frame)
{
$server->push($frame->fd, "hello");
}
public function workerStart(\Swoole\WebSocket\Server $server, $workerID)
{
$client = new \Swoole\Redis();
$client->on('message', function ($client, $result) use ($server) {
if ($result[0] == 'message') {
foreach ($server->connections as $fd) {
$server->push($fd, $result[2]);
}
}
});
$client->connect('127.0.0.1', 6379, function ($client, $result) {
$client->subscribe('channel');
});
}
public function open($server, $request)
{
echo "fd $request->fd connect \n";
$this->fdMaps[$request->fd] = $request->fd;
}
public function close($server, $fd)
{
if (isset($this->fdMaps[$fd])) {
unset($this->fdMaps[$fd]);
echo "fd $fd close \n";
} else {
echo "error\n";
}
}
}
new WebsocketServer();
```
html代码:
```html
<html>
<body>
<h1>Redis publish/subscribe</h1>
<div id="show-list"></div>
</body>
</html>
<script type="text/javascript" src="/chat/js/jquery.min.js"></script>
<script>
$(function () {
var ws = new WebSocket('ws://127.0.0.1:9503');
ws.onopen = function () {
console.log('connect success');
};
// 接受消息
ws.onmessage = function (evt) {
console.log(evt.data);
var html = '<h3>' + evt.data + '</h3>';
$('#show-list').append(html);
};
ws.onclose = function () {
console.log('connect close');
};
});
</script>
```
调试:
```bash
127.0.0.1:6379> publish channel hello
(integer) 1
# 响应的pc游览器会显示hello
```
- php
- 编译安装
- 基本概念
- 垃圾回收机制
- 生命周期
- zval底层实现
- c扩展开发
- gdb调试工具
- 自定义扩展简单demo
- 钩子函数
- 读取php.ini配置
- 数组
- 函数
- 类
- yaf扩展底层源码
- swoole扩展底层源码
- memoryGlobal内存池
- swoole协程使用记录
- 单点登录sso原理
- compser使用
- session实现机制
- c & linux
- gcc
- 指针
- 结构体,联合和位字段
- 宏定义井号说明
- printf家族函数和可变参数
- 共享函数
- 静态库和动态库
- makefile自动化构建
- 信号一
- 信号二
- inotify监控文件事件
- socket编程
- 简介
- UNIX DOMAIN
- Internet DOMAIN
- TCP/IP
- 文件IO多路复用
- 内存管理
- 进程组,会话和控制终端
- daemon守护进程
- 多进程
- 多线程
- 常用进制转换
- go
- 入门知识
- 字节和整数装换
- python
- redis
- 应用场景
- 消息队列
- 热点数据
- 扫码登录
- 订阅发布
- 次数限制
- 抢购超卖
- 持久化机制
- mysql
- 工作流程
- MyISAM和InnoDB区别
- 用户和权限管理
- 执行计划
- sql优化
- 事务和锁
- 慢查询日志
- case...when...then...end用法
- sql
- 参考
- linux
- 内核参数优化
- 防火墙设置
- docker
- docker入门知识
- 算法
- 多维数组合
- DFA算法
- 红包金额分配