[TOC]
## 什么是反应式编程(Reactive programming)
维基百科定义:
> 在计算领域,响应式编程是一种关注数据流和变更传播的声明式编程范式。有了这个范例,就可以轻松地表达静态(例如,数组)或动态(例如,事件发射器)数据流,还可以传递关联执行模型中存在的推断依赖项,这有助于自动传播已更改的数据流。
在一个命令式编程中`a:= b + c`意味着`b+c`的结算结果赋值给`a`,后续`b,c`值的更改不会影响`a`的值。在反应式编程中,每当`b`或`c`的值发生变化时,`a`的值就会自动更新,而无需程序显式地重新执行语句`a:= b + c`来重新赋值。
</br>
## 反应式系统特质
[反应式宣言](https://www.reactivemanifesto.org/zh-CN) 是反应式的理论基础,定义了反应式系统的4个特质:
**1. 即时响应性(Responsive)**
只要有可能,系统就会及时地做出响应。
**2. 回弹性(Resilient)**
系统在出现失败时依然保持即时响应性。
**3. 弹性(Elastic)**
系统在不断变化的工作负载之下依然保持即时响应性。
**4. 消息驱动(Message Driven)**
反应式系统依赖一步的消息传递,从而确保了松耦合、隔离、位置透明的组件之间有着明确边界。
</br>
![](https://img.kancloud.cn/a9/d8/a9d827c7812dc681e7efc0ab9c01aba7_2876x1206.png)
## 反应式流(Reactive Streams)
作为向反应式编程方向迈出的第一步,微软在.NET生态系统中创建了响应式扩展(`Rx`)库。然后`RxJava`在`JVM`上实现了响应式编程。随着时间的推移,通反应式流(`Reactive Streams`)的努力,`Java`的标准化出现了,该规范为`JVM`上的响应式库定义了一组接口和交互规则。它的接口已经集成到`Java 9`的`Flow`类下。
</br>
反应式流[Reactive Streams](https://www.reactive-streams.org/)是非阻塞背压异步流处理的规范,包括针对运行时环境(`JVM`和`JavaScript`)以及网络协议。
</br>
### 目标、设计与适用范围
在异步系统中,处理数据流—尤其是容量没有预先确定的“实时”数据—需要特别注意。最突出的问题是需要控制资源消耗,以便快速数据源不会压倒流目的地。为了在协作网络主机或一台机器中的多个CPU核上并行使用计算资源,需要异步。
Reactive Streams的主要目标是管理跨异步边界的流数据交换—比如将元素传递给另一个线程或线程池—同时确保接收端不会被迫缓冲任意数量的数据。换句话说,背压是该模型的一个组成部分,以便允许在线程之间进行中介的队列被绑定。如果背压的通信是同步的,那么异步处理的好处就会被抵消,因此必须谨慎地强制要求完全非阻塞。
该规范的意图是允许创建许多符合规范的实现,这些实现通过遵守规则将能够顺利地互操作,在流应用程序的整个处理图中保持上述优点和特征。
`Reactive Streams `的范围是找到一组最小的接口、方法和协议,这些接口、方法和协议将描述实现目标所需的操作和实体——具有非阻塞背压的异步数据流。
</br>
总之,`Reactive Streams`是JVM上面向流的库的一个标准和规范:
* 处理一个潜在无限元素的数目
* 依次处理
* 异步地在组件之间传递元素
* 必须强制有非阻塞后压
`Reactive Streams`规范由以下部分组成:
</br>
1、API 规定了需要实现的响应式流类型,并且在不同的实现间完成互操作性。
2、技术兼容套件(`TCK`)是用于实现一致性测试的标准测试套件。
各种实现可以自由地实现规范中没有提到的额外特性,只要它们遵从API要求和在TCK中通过测试。
### API组件
反应式编程范式通常在面向对象语言中作为`Observer`设计模式的扩展而出现。你还可以将`Reactive Streams`模式与熟悉的`Iterator`设计模式进行比较。`Iterator`模式中`Iterable-Iterator`总是成对出现,一个主要的区别是,`Iterator`是基于`pull`的,而`Reactive Streams`是基于`push`的。
</br>
使用`Iterator`是一种命令式编程模式,尽管访问值的方法完全由`Iterable`负责。实际上,何时访问序列中的`next()`项取决于开发人员。在反应式流中,上面这一对的等效物是`Publisher-Subscriber`。但是`Publisher`在出现新可用值时通知`Subscriber`,而这个`push`方面是反应式的关键。
</br>
除了`push`值之外,错误处理和完成方面也以定义良好的方式进行了介绍。`Publisher`可以向其`Subscriber`推送新值(通过调用`onNext`),但也可以发出错误信号(通过调用`onError`)或完成信号(通过调用`onComplete`)。错误和完成都终止序列。可以总结如下:
</br>
~~~java
onNext x 0..N [onError | onComplete]
~~~
</br>
该API由以下组件组成,这些组件必须由`Reactive Streams`实现提供:
1. `Publisher` 发布者
2. `Subscriber` 订阅者
3. `Subscription` 订阅对象
4. `Processor` 处理者
下图展示了订阅者与发布者交互的典型场景:
![](https://img.kancloud.cn/9a/92/9a9230795192aaae140aba29f805da2b_700x321.png)
接口定义:
~~~java
//Publisher将数据流发送到 Subscriber
public static interface Publisher<T> {
//指定订阅者 Subscriber
public void subscribe(Subscriber<? super T> subscriber);
}
~~~
~~~java
//消费处理 Publisher 发送过来的数据流
public static interface Subscriber<T> {
//开启订阅Subscription具体的订阅对象
public void onSubscribe(Subscription subscription);
//接收数据
public void onNext(T item);
//错误处理
public void onError(Throwable throwable);
//数据处理结束
public void onComplete();
}
~~~
~~~java
//订阅对象:发布者和订阅者之间交互的操作对象
public static interface Subscription {
//订阅者拿到订阅对象后,通过调用订阅对象的request方法,根据自身消费能力请求n条数据
//request方法被调用时,会触发订阅者的onNext事件方法,把数据传输给订阅者。
//数据全部传输完成,则触发订阅者的onComplete事件方法。如果数据传输发生错误,则触发订阅者的onError事件方法。
public void request(long n);
//调用cancel方法来停止接收数据
public void cancel();
}
~~~
~~~java
//处理者既是发布者又是订阅者,用于发布者和订阅者之间转换数据格式,
//把发布者的T类型数据转换为订阅者接受的R类型数据。处理者作为数据转换的中介不是必须的。
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}
~~~
## Java9 Flow
jdk9中的`java.util.concurrent.Flow`接口`1:1`语义上等价于对应的`Reactive Streams`,`Flow`接口约定了`Reactive`编程的一套规范,并没有具体的实现。常用的实现有`RxJava`、`Reactor`、`Akka`等,`Spring WebFlux`中集成的是`Reactor3.0`。
- 1.反应式编程概述
- 2.Reactor框架
- Flux
- Mono
- 订阅(Subscribe)
- 编程创建序列
- 线程和调度器
- 错误处理
- 3.Spring WebFlux概述
- 4.Spring WebFlux核心组件
- HttpHandler
- WebHandler
- ServerWebExchange
- 编码和解码器
- JSON
- Form Data
- Multipart Data
- 过滤器
- 异常处理器
- DispatcherHandler
- 5.Spring Boot启动WebFlux
- 6.Spring WebFlux注解控制器
- 请求映射
- 处理程序方法
- 方法参数
- 返回值
- 类型转换
- 模型(Model)
- 数据绑定(DataBinder)
- 异常管理
- @ControllerAdvice
- 7.Spring WebFlux函数端点
- HandlerFunction
- RouterFunction
- 运行服务
- 函数过滤器
- 8.Spring Boot中使用函数端点
- 9.Spring Webflux请求处理流程
- 10.Spring WebFlux配置
- 11.Spring WebFlux使用R2DBC访问MySQL
- 12.Spring WebFlux访问Redis
- 13.Spring WebFlux访问MongoDB
- 14.Spring WebFlux集成Thymeleaf
- 15.Spring WebFlux集成FreeMarker
- 16.Spring WebFlux WebClient
- 17.Spring WebFlux WebSocket
- 18.测试
- 19.RSocket