## Buffer
定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。
![buffer](https://box.kancloud.cn/167fec2ac6641d6e6f6308ed4d2adaf4_1280x640.png)
`Buffer`操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生的Observable发射这些数据的缓存集合。`Buffer`操作符在很多语言特定的实现中有很多种变体,它们在如何缓存这个问题上存在区别。
注意:如果原来的Observable发射了一个`onError`通知,`Buffer`会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。
`Window`操作符与`Buffer`类似,但是它在发射之前把收集到的数据放进单独的Observable,而不是放进一个数据结构。
在RxJava中有许多`Buffer`的变体:
### buffer(count)
![buffer3](https://box.kancloud.cn/6d4bbbc26e6ebb247acd5d55a10fe707_1280x640.png)
`buffer(count)`以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始Observable的count项数据(最后发射的列表数据可能少于count项)
* Javadoc: [buffer(int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(int))
### buffer(count, skip)
![buffer4](https://box.kancloud.cn/e1947a86c65f181b1be894e480a21b85_1280x640.png)
`buffer(count, skip)`从原始Observable的第一项数据开始创建新的缓存,此后每当收到`skip`项数据,用`count`项数据填充缓存:开头的一项和后续的`count-1`项,它以列表(List)的形式发射缓存,取决于`count`和`skip`的值,这些缓存可能会有重叠部分(比如skip < count时),也可能会有间隙(比如skip > count时)。
* Javadoc: [buffer(int,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(int,%20int))
### buffer(bufferClosingSelector)
![buffer1](https://box.kancloud.cn/cab7c8585f4d3416f74f554786110a51_1280x790.png)
当它订阅原来的Observable时,`buffer(bufferClosingSelector)`开始将数据收集到一个`List`,然后它调用`bufferClosingSelector`生成第二个Observable,当第二个Observable发射一个`TClosing`时,`buffer`发射当前的`List`,然后重复这个过程:开始组装一个新的`List`,然后调用`bufferClosingSelector`创建一个新的Observable并监视它。它会一直这样做直到原来的Observable执行完成。
* Javadoc: [buffer(Func0)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.functions.Func0))
### buffer(boundary)
![buffer8](https://box.kancloud.cn/f6aeb2c10004740c2111653551bf2892_1280x790.png)
`buffer(boundary)`监视一个名叫`boundary`的Observable,每当这个Observable发射了一个值,它就创建一个新的`List`开始收集来自原始Observable的数据并发射原来的`List`。
* Javadoc: [buffer(Observable)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable))
* Javadoc: [buffer(Observable,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20int))
### buffer(bufferOpenings, bufferClosingSelector)
![buffer2](https://box.kancloud.cn/98124630f619c88deac2fb9e0bef25a9_1280x940.png)
`buffer(bufferOpenings, bufferClosingSelector)`监视这个叫`bufferOpenings`的Observable(它发射`BufferOpening`对象),每当`bufferOpenings`发射了一个数据时,它就创建一个新的`List`开始收集原始Observable的数据,并将`bufferOpenings`传递给`closingSelector`函数。这个函数返回一个Observable。`buffer`监视这个Observable,当它检测到一个来自这个Observable的数据时,就关闭`List`并且发射它自己的数据(之前的那个List)。
* Javadoc: [buffer(Observable,Func1)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(rx.Observable,%20rx.functions.Func1))
### buffer(timespan, unit[, scheduler])
![buffer5](https://box.kancloud.cn/f1b90ccf4df51821fc4241eea0577f2e_1280x640.png)
`buffer(timespan, unit)`定期以`List`的形式发射新的数据,每个时间段,收集来自原始Observable的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的Observale之后开始)。还有另一个版本的`buffer`接受一个`Scheduler`参数,默认情况下会使用`computation`调度器。
* Javadoc: [buffer(long,TimeUnit)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit))
* Javadoc: [buffer(long,TimeUnit,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20rx.Scheduler))
### buffer(timespan, unit, count[, scheduler])
![buffer6](https://box.kancloud.cn/7b0bc3ae4f1d3bcca2aa4ec914c279f3_1280x640.png)
每当收到来自原始Observable的count项数据,或者每过了一段指定的时间后,`buffer(timespan, unit, count)`就以`List`的形式发射这期间的数据,即使数据项少于count项。还有另一个版本的`buffer`接受一个`Scheduler`参数,默认情况下会使用`computation`调度器。
* Javadoc: [buffer(long,TimeUnit,int)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20int))
* Javadoc: [buffer(long,TimeUnit,int,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20java.util.concurrent.TimeUnit,%20int,%20rx.Scheduler))
### buffer(timespan, timeshift, unit[, scheduler])
![buffer7](https://box.kancloud.cn/dff665e72ba228174e2b4c73c08050c9_1280x640.png)
`buffer(timespan, timeshift, unit)`在每一个`timeshift`时期内都创建一个新的`List`,然后用原始Observable发射的每一项数据填充这个列表(在把这个`List`当做自己的数据发射前,从创建时开始,直到过了`timespan`这么长的时间)。如果`timespan`长于`timeshift`,它发射的数据包将会重叠,因此可能包含重复的数据项。
还有另一个版本的`buffer`接受一个`Scheduler`参数,默认情况下会使用`computation`调度器。
* Javadoc: [buffer(long,long,TimeUnit)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20long,%20java.util.concurrent.TimeUnit))
* Javadoc: [buffer(long,long,TimeUnit,Scheduler)](http://reactivex.io/RxJava/javadoc/rx/Observable.html#buffer(long,%20long,%20java.util.concurrent.TimeUnit,%20rx.Scheduler))
### buffer-backpressure
你可以使用`Buffer`操作符实现反压`backpressure`(意思是,处理这样一个Observable:它产生数据的速度可能比它的观察者消费数据的速度快)。
![bp.buffer2](https://box.kancloud.cn/b054d465a4c90fbb9e6c4423176f8c94_1280x540.png)
Buffer操作符可以将大量的数据序列缩减为较少的数据缓存序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据缓存。这相当于一个缓冲区。
示例代码
```java
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
```
![bp.buffer1](https://box.kancloud.cn/50e6a772804ab92a71eb0cee56da7a9b_1280x1000.png)
或者,如果你想更进一步,可以在爆发期将数据收集到缓存,然后在爆发期终止时发射这些数据,使用 [`Debounce`](Filtering-Operators#Debounce) 操作符给`buffer`操作符发射一个缓存关闭指示器(`buffer closing indicator`)可以做到这一点。
代码示例:
```java
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
```
### 参见
* [DebouncedBuffer With RxJava by Gopal Kaushik](http://nerds.weddingpartyapp.com/tech/2015/01/05/debouncedbuffer-used-in-rxbus-example/)
- ReactiveX
- Observables
- Single
- Subject
- Scheduler
- Operators
- 创建操作
- Create
- Defer
- Empty/Never/Throw
- From
- Interval
- Just
- Range
- Repeat
- Start
- Timer
- 变换操作
- Buffer
- FlatMap
- GroupBy
- Map
- Scan
- Window
- 过滤操作
- Debounce
- Distinct
- ElementAt
- Filter
- First
- IgnoreElements
- Last
- Sample
- Skip
- SkipLast
- Take
- TakeLast
- 结合操作
- And/Then/When
- CombineLatest
- Join
- Merge
- StartWith
- Switch
- Zip
- 错误处理
- Catch
- Retry
- 辅助操作
- Delay
- Do
- Materialize/Dematerialize
- ObserveOn
- Serialize
- Subscribe
- SubscribeOn
- TimeInterval
- Timeout
- Timestamp
- Using
- To
- 条件和布尔操作
- All/Contains/Amb
- DefaultIfEmpty
- SequenceEqual
- SkipUntil/SkipWhile
- TakeUntil/TakeWhile
- 算术和聚合操作
- Average/Concat/Reduce
- Max/Min/Count/Sum
- 异步操作
- 连接操作
- Connect
- Publish
- RefCount
- Replay
- 转换操作
- 阻塞操作
- 字符串操作
- 按字母顺序的操作符列表
- RxJava文档和教程
- RxJava入门指南
- RxJava使用示例
- 实现自定义操作符
- 自定义插件
- Backpressure
- 错误处理
- Android模块
- 参与开发
- 补充阅读材料