[TOC] ## 什么是反应式编程(Reactive programming) 维基百科定义: > 在计算领域,响应式编程是一种关注数据流和变更传播的声明式编程范式。有了这个范例,就可以轻松地表达静态(例如,数组)或动态(例如,事件发射器)数据流,还可以传递关联执行模型中存在的推断依赖项,这有助于自动传播已更改的数据流。 在一个命令式编程中`a:= b + c`意味着`b+c`的结算结果赋值给`a`,后续`b,c`值的更改不会影响`a`的值。在反应式编程中,每当`b`或`c`的值发生变化时,`a`的值就会自动更新,而无需程序显式地重新执行语句`a:= b + c`来重新赋值。 </br> ## 反应式系统特质 [反应式宣言](https://www.reactivemanifesto.org/zh-CN) 是反应式的理论基础,定义了反应式系统的4个特质: **1. 即时响应性(Responsive)** 只要有可能,系统就会及时地做出响应。 **2. 回弹性(Resilient)** 系统在出现失败时依然保持即时响应性。 **3. 弹性(Elastic)** 系统在不断变化的工作负载之下依然保持即时响应性。 **4. 消息驱动(Message Driven)** 反应式系统依赖一步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。 </br> ![](https://img.kancloud.cn/a9/d8/a9d827c7812dc681e7efc0ab9c01aba7_2876x1206.png) ## 反应式流(Reactive Streams) 作为向反应式编程方向迈出的第一步,微软在.NET生态系统中创建了响应式扩展(`Rx`)库。然后`RxJava`在`JVM`上实现了响应式编程。随着时间的推移,通反应式流(`Reactive Streams`)的努力,`Java`的标准化出现了,该规范为`JVM`上的响应式库定义了一组接口和交互规则。它的接口已经集成到`Java 9`的`Flow`类下。 </br> 反应式流[Reactive Streams](https://www.reactive-streams.org/)是非阻塞背压异步流处理的规范,包括针对运行时环境(`JVM`和`JavaScript`)以及网络协议。 </br> ### 目标、设计与适用范围 在异步系统中,处理数据流—尤其是容量没有预先确定的“实时”数据—需要特别注意。最突出的问题是需要控制资源消耗,以便快速数据源不会压倒流目的地。为了在协作网络主机或一台机器中的多个CPU核上并行使用计算资源,需要异步。 Reactive Streams的主要目标是管理跨异步边界的流数据交换—比如将元素传递给另一个线程或线程池—同时确保接收端不会被迫缓冲任意数量的数据。换句话说,背压是该模型的一个组成部分,以便允许在线程之间进行中介的队列被绑定。如果背压的通信是同步的,那么异步处理的好处就会被抵消,因此必须谨慎地强制要求完全非阻塞。 该规范的意图是允许创建许多符合规范的实现,这些实现通过遵守规则将能够顺利地互操作,在流应用程序的整个处理图中保持上述优点和特征。 `Reactive Streams `的范围是找到一组最小的接口、方法和协议,这些接口、方法和协议将描述实现目标所需的操作和实体——具有非阻塞背压的异步数据流。 </br> 总之,`Reactive Streams`是JVM上面向流的库的一个标准和规范: * 处理一个潜在无限元素的数目 * 依次处理 * 异步地在组件之间传递元素 * 必须强制有非阻塞后压 `Reactive Streams`规范由以下部分组成: </br> 1、API 规定了需要实现的响应式流类型,并且在不同的实现间完成互操作性。 2、技术兼容套件(`TCK`)是用于实现一致性测试的标准测试套件。 各种实现可以自由地实现规范中没有提到的额外特性,只要它们遵从API要求和在TCK中通过测试。 ### API组件 反应式编程范式通常在面向对象语言中作为`Observer`设计模式的扩展而出现。你还可以将`Reactive Streams`模式与熟悉的`Iterator`设计模式进行比较。`Iterator`模式中`Iterable-Iterator`总是成对出现,一个主要的区别是,`Iterator`是基于`pull`的,而`Reactive Streams`是基于`push`的。 </br> 使用`Iterator`是一种命令式编程模式,尽管访问值的方法完全由`Iterable`负责。实际上,何时访问序列中的`next()`项取决于开发人员。在反应式流中,上面这一对的等效物是`Publisher-Subscriber`。但是`Publisher`在出现新可用值时通知`Subscriber`,而这个`push`方面是反应式的关键。 </br> 除了`push`值之外,错误处理和完成方面也以定义良好的方式进行了介绍。`Publisher`可以向其`Subscriber`推送新值(通过调用`onNext`),但也可以发出错误信号(通过调用`onError`)或完成信号(通过调用`onComplete`)。错误和完成都终止序列。可以总结如下: </br> ~~~java onNext x 0..N [onError | onComplete] ~~~ </br> 该API由以下组件组成,这些组件必须由`Reactive Streams`实现提供: 1. `Publisher` 发布者 2. `Subscriber` 订阅者 3. `Subscription` 订阅对象 4. `Processor` 处理者 下图展示了订阅者与发布者交互的典型场景: ![](https://img.kancloud.cn/9a/92/9a9230795192aaae140aba29f805da2b_700x321.png) 接口定义: ~~~java //Publisher将数据流发送到 Subscriber public static interface Publisher<T> { //指定订阅者 Subscriber public void subscribe(Subscriber<? super T> subscriber); } ~~~ ~~~java //消费处理 Publisher 发送过来的数据流 public static interface Subscriber<T> { //开启订阅Subscription具体的订阅对象 public void onSubscribe(Subscription subscription); //接收数据 public void onNext(T item); //错误处理 public void onError(Throwable throwable); //数据处理结束 public void onComplete(); } ~~~ ~~~java //订阅对象:发布者和订阅者之间交互的操作对象 public static interface Subscription { //订阅者拿到订阅对象后,通过调用订阅对象的request方法,根据自身消费能力请求n条数据 //request方法被调用时,会触发订阅者的onNext事件方法,把数据传输给订阅者。 //数据全部传输完成,则触发订阅者的onComplete事件方法。如果数据传输发生错误,则触发订阅者的onError事件方法。 public void request(long n); //调用cancel方法来停止接收数据 public void cancel(); } ~~~ ~~~java //处理者既是发布者又是订阅者,用于发布者和订阅者之间转换数据格式, //把发布者的T类型数据转换为订阅者接受的R类型数据。处理者作为数据转换的中介不是必须的。 public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } ~~~ ## Java9 Flow jdk9中的`java.util.concurrent.Flow`接口`1:1`语义上等价于对应的`Reactive Streams`,`Flow`接口约定了`Reactive`编程的一套规范,并没有具体的实现。常用的实现有`RxJava`、`Reactor`、`Akka`等,`Spring WebFlux`中集成的是`Reactor3.0`。