ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
~~~ 5.0 全新数据类型 数据流streams 看起来比pubsub可靠多的消息队列。pubsub不靠谱? 很不靠谱,网络一断或buffer一大就会主动清理数据。stream的设计参考了kafka的消费组模型 以更抽象的方式建模日志的数据结构。Redis的streams主要是一个append only的数据结构,至少在概念上它是一种在内存中表示的抽象数据类型,只不过它们实现了更强大的操作,以克服日志文件本身的限制 如果你了解MQ,那么可以把streams当做MQ。如果你还了解kafka,那么甚至可以把streams当做kafka 这个功能有点类似于redis以前的Pub/Sub,但是也有基本的不同: streams支持多个客户端(消费者)等待数据(Linux环境开多个窗口执行XREAD即可模拟),并且每个客户端得到的是完全相同的数据 Pub/Sub是发送忘记的方式,并且不存储任何数据;而streams模式下,所有消息被无限期追加在streams中,除非用于显示执行删除(XDEL) streams的Consumer Groups也是Pub/Sub无法实现的控制方式 streams数据结构本身非常简单,但是streams依然是Redis到目前为止最复杂的类型,其原因是实现的一些额外的功能:一系列的阻塞操作允许消费者等待生产者加入到streams的新数据。另外还有一个称为Consumer Groups的概念,这个概念最先由kafka提出,Redis有一个类似实现,和kafka的Consumer Groups的目的是一样的:允许一组客户端协调消费相同的信息流 stream适用于允许丢失数据的业务场景,因为redis本身是不支持数据的绝对可靠的,哪怕aof调成always Redis Stream——作为消息队列的典型应用场景 https://segmentfault.com/a/1190000016777728 xAck($stream, $group, $arr_messages_ids) - 确认一条或多条待处理的消息 $redis->xAck('mystream_key', 'group1', ['1530063064286-0', '1530063064286-1']); xAdd($str_key, $str_id(* 代表redis自动生成流序列id), $arr_message) - 向流添加消息 返回添加消息的id $obj_redis->xAdd('mystream_key', "*", ['field' => 'value']);//1530063064286-0 xClaim($str_key, $str_group, $str_consumer, $min_idle_time, $arr_messages_ids, [$arr_options]) - 获取待处理信息的所有权 返回消息ID数组以及相应的数据 $ids = ['1530113681011-0', '1530113681011-1', '1530113681011-2']; // 无options $obj_redis->xClaim( 'mystream', 'group1', 'myconsumer1', 0, $ids ); // 有 options $obj_redis->xClaim( 'mystream', 'group1', 'myconsumer2', 0, $ids, [ 'IDLE' => time() * 1000, 'RETRYCOUNT' => 5, 'FORCE', 'JUSTID' ] ); xDel(stream_key, $arr_messages_ids) - 从流中删除消息 $redis->xDel('mystream', ['1530115304877-0', '1530115305731-0']); Group - 创建,销毁或管理消费者群组 //创建 $redis->xGroup('CREATE', $str_key, $str_group, $str_msg_id, [$boo_mkstream]); //删除 $redis->xGroup('DESTROY', $str_key, $str_group); //管理 $obj_redis->xGroup('HELP'); $obj_redis->xGroup('SETID', $str_key, $str_group, $str_msg_id); $obj_redis->xGroup('DELCONSUMER', $str_key, $str_group, $str_consumer_name); //删除消费者 xInfo('CONSUMERS/GROUPS/STREAM/HELP', $str_stream, $str_group) - 获取有关流的信息 xLen(stream_key) - 获取流的总数据个数 xPending($str_stream, $str_group [, $str_start, $str_end, $i_count, $str_consumer]) - 检查流中的待处理消息 $redis->xPending('mystream', 'mygroup', '-', '+', 1, 'consumer-1'); xRange($str_stream, $str_start, $str_end [, $i_count]) - 查询流中的一系列消息 $redis->xRange('mystream', '-', '+', 2); xRead($arr_streams [, $i_count, $i_block) - 从流中读取消息 $redis->xRead(['stream1' => '1535222584555-0', 'stream2' => '1535222584555-0']); //仅接收新消息(($=last id)并无限期等待一个新消息 $redis->xRead(['stream1' => '$'], 1, 0); // xReadGroup($str_group, $str_consumer, $arr_streams [, $i_count, $i_block]) - 使用组和消费者一起读取流消息 类似于xRead,但它支持读取特定消费者组的消息 返回传递给此使用者组的消息(如果有) $redis->xReadGroup('mygroup', 'consumer2', ['s1' => 0, 's2' => 0], 1, 1000); xRevRange($str_stream, $str_end, $str_start [, $i_count]) - 查询从start到end的一条或多条消息 $redis->xRevRange('mystream', '+', '-'); xTrim($str_stream, $i_max_len [, $boo_approximate])- 流裁剪为指定数量的项目 $obj_redis->xTrim('mystream', 100, true); ~~~