企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# rxjs中breakpressure(背压)概念 当涉及到数据流时,流可能发送太快及至于订阅者跟不上其速度。为此,我们需要一些机制来控制发送源,使订阅者不至于被大量数据淹没。这些机制可以根据需要采用有损或者无损操作形式来处理背压。例如,如果你错过了几次鼠标点击,这没有什么大碍,但是,如果你错过了几次银行交易,这是一个很明确的问题。 例如,想象一下使用`zip`操作符将2个无限大的Observable压缩在一起,其中一个Observable的发送速度是另外一个的2倍。`zip`操作符内部处理这种情形的机制是采用一个扩充缓冲区来存储发送较快的Observable,待较慢的Observable发出消息时再取出缓存的内容并将它们最终合并在一起。这将导致RXJS占用大量系统资源。 ## Hot and Cold Observables and Multicast Cold Observable会发出特定元素序列值(例如:时间序列,数组元素),一旦有观察者开始订阅时,就开始发送其数据,在发送过程中RxJS会保证该序列值的完整性。例如,将类似于Array、Map、Set或者生成器(generator)这样的可迭代数据转换成Observable,无论是第一次订阅还是后面反复订阅都会发出相同的值。Cold Observable发出的值来源可能包括数据库查询、文件检索或者Web请求结果等。 Hot Observable的特点是一旦被创建就会立即发射数据流。订阅者通常会从序列的中间(处于数据开始发射和结束之间)位置开始观察发出的Observable,并且在第一次发射数据后开始建立起subscription。这样一个Observable在在发射数据时拥有自己的独立空间,并由观察者维护其状态。Hot Observable的例子可能包括鼠标&键盘事件、系统事件或者股票价格波动。 当一个Cold Observable是多播的(当它被转换成一个`ConnectableObservable`对象,并且它的`connect`方法被调用),它实际上已经变**hot**了,即出于背压和流量控制的目的,它应被视为一个Hot Observable。 在处理背压问题上,Cold Observable是一个**拉取**(pull)模型的理想主体。Hot Observable通常不是用来处理拉取取问题,而是作为本文讨论的流程控制策略的最佳选择,比如使用`pausableBuffered`和`pausable`操作符,通过节流(throttling)、缓存(buffers)或者窗口(windows)方式。 > 译注:这里的"windows",是指RxJS中的一种数据控制方式,和`buffer`相似,不同之处在于它发送的是嵌套的Observable而不是数组形式。 ## 有损背压(Lossy Backpressure) 有很多方法可以控制可观察序列,从而让消费者不至于因有损操作而失去方向,这也意味着数据包可能在暂停(pause)和恢复(resume)操作切换期间丢弃。 ### 去抖动(Debounce) 应对有损背压问题的第一种方法称为**去抖动**,只有在一个特定的时间段通过并且当前Observable没有再次发射数据时,才会从源Observable获取发射的数据流。这在一些场景下很有用,例如用户输入太快,我们并不希望在每次用户触发按键后立即开始执行,而是等待半秒钟,直到用户没有输入后开始执行。 ```javascript var debounced = Rx.Observable.fromEvent(input, 'keyup') .map(function (e) { return e.target.value; }) .debounce(500 /* ms */); debounced.subscribeOnNext(function (value) { console.log('Input value: %s', value); }); ``` ### 节流(Throttling) 应对背压问题的另一种技术是通过使用周期性时间间隔在Observable发出第一条数据后使用`throttle`方法节流。这种节流方式特别适用于像窗口改变大小和滚动等触发频率很快的事件处理。 ```javascript var throttled = Rx.Observable.fromEvent(window, 'resize') .throttle(250 /* ms */); throttled.subscribeOnNext(function (e) { console.log('Window inner height: %d', window.innerHeight); console.log('Window inner width: %d', window.innerWidth); }); ``` ### 采样(Sampling) Observables 您也可以在一定的间隔内使用`sample`方法从观察序列中抽取值,而不需要消耗整个可观察序列。 ```javascript var sampled = getStockData() .sample(5000 /* ms */); sampled.subscribeOnNext(function (data) { console.log('Stock data: %o', data); }); ``` ### 可暂停(Pausable) Observables 暂停和恢复的能力也是RxJS在有损和无损版本中提供的强大概念。 在有损背压的情况下,`pausable`操作符可以在可观察序列分别调用`pause`和`resume`之后停止或者恢复监听。例如我们可以获取一些可观察序列并调用`pausable`方法,然后调用`pause`暂停序列并在5秒钟内恢复。这里需要注意的是,在暂停和恢复期间产生的数据都将丢失。这种情况只适用于Hot Observable,并不适合Code Observable,原因在于在恢复后它们将重新启动。 ```javascript var pausable = getSomeObservableSource() .pausable(); pausable.subscribeOnNext(function (data) { console.log('Data: %o', data); }); pausable.pause(); // Resume in five seconds setTimeout(function () { pausable.resume(); }, 5000); ``` > 注:rxjs 5.*版本中并没有`pausable`, `pause`和`resume`操作符 ## 无损背压(Loss-less Backpressure) 除了支持有损的背压机制,RxJS还支持以数据获取方式,使其能够以消费者自己的速度完全消费。 在工作中有许多策略,包括使用与时间盘,计数或两者兼容的缓冲区,可暂停缓冲区,反应拉动等。 ### 缓冲区(Buffers) and Windows 处理背压问题第一个策略是使用缓冲区,这允许消费者设置他们希望等待的时间、项目数量,或两者,以较先获得者为准。这在很多情况下是很有用的,例如您希望将窗口中的某些数据出于比较的目的,对数据根据需要进行分块。 `bufferWithCount`方法允许我们在将消息传递给缓冲区数组之前指定要捕获的项目数。 这是不切实际而有趣的用法是计算用户是否输入了Konami代码。 > 注:在rxjs 5.*中为`bufferCount` ```javascript var codes = [ 38, // up 38, // up 40, // down 40, // down 37, // left 39, // right 37, // left 39, // right 66, // b 65 // a ]; function isKonamiCode(buffer) { return codes.toString() === buffer.toString(); } var keys = Rx.Observable.fromEvent(document, 'keyup') .map(function (e) { return e.keyCode; }) .bufferWithCount(10, 1) .filter(isKonamiCode) .subscribeOnNext(function () { console.log('KONAMI!'); }); ``` 另一方面,您也可以使用`bufferWithTime`在缓冲区内获取一段给定时间的数据。 这是有用的,例如,如果您正在跟踪网络中的数据量,然后可以统一处理。 > 注:rxjs 5.* 中为 `bufferTime` ```javascript var source = getStockData() .bufferWithTime(5000, 1000) // time in milliseconds .subscribeOnNext(function (data) { data.forEach(function (d) { console.log('Stock: %o', d); }); }); ``` 为了避免缓冲区填充太快,有一种方法可以通过指定计数和时间间隔的天花板来限制缓冲区,以先到者为准。 例如,网络可能特别快,数据在指定的时间内,其他时间不是,所以为了保持数据级别,您可以通过`bufferWithTimeOrCount`指定这个阈值 ```javascript var source = getStockData() .bufferWithTimeOrCount(5000 /* ms */, 100 /* items */) .subscribeOnNext(function (data) { data.forEach(function (d) { console.log('Stock: %o', d); }); }); ``` ### 可暂停缓冲区(Pausable Buffers) `pausable`方法在处理热(hot)可观察对象时很有用,在这里你可以在丢弃数据时暂停和恢复,但您可能希望在此期间保留数据。为此,我们引入`pausableBuffered`方法,它在`pause`调用之前保持一个运行缓冲区, 当`resume`被调用时,它被回收。然后由开发人员决定何时暂停和恢复,同时也不会丢失任何数据。 ```javascript var source = getStockData() .pausableBuffered(); source.subscribeOnNext(function (stock) { console.log('Stock data: %o', stock); }); source.pause(); // Resume after five seconds setTimeout(function () { // Drains the buffer and subscribeOnNext is called with the data source.resume(); }, 5000); ``` ### 受控可观察对象(Controlled Observables) 在更高级的情况下,您可能希望控制在给定时间收到的项目的绝对数量,其余的通过`controlled`方法进行缓冲。 例如,您可以拉10个项目,其次是20个项目,由开发人员决定。 这更加符合“活动流”努力有效地将推送流转化为推/拉流的努力。 ```javascript var source = getStockData() .controlled(); source.subscribeOnNext(function (stock) { console.log('Stock data: %o', stock); }); source.request(2); // Keep getting more after 5 seconds setInterval(function () { source.request(2); }, 5000); ``` ## 未来计划(Future Work) 这当然只是背压工作的开始,因为还有很多其他可以考虑的策略。 在未来版本的RxJS中,受控观察者的想法将被烘焙到订阅本身中,然后允许背压成为合同的重要部分或者要求n个项目。 --- ## 译注 由于本人英文只有4级水平,所以全文借助Google进行翻译,有能力者还是建议直接看[原文](https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/backpressure.md),同时需要注意的是这里所有的示例都是基于Rx.js 4以前的版本。虽然内容有些过时,但我们可以感受到RxJS作者们考虑得真全面,集成了大部分业务场景需要解决的数据问题。