[TOC]
## WebSocket介绍
`WebSocket`协议,[RFC 6455](https://tools.ietf.org/html/rfc6455),提供了一种标准化的方法,通过单个`TCP`连接在客户端和服务器之间建立全双工、双向的通信通道。它是一种不同于`HTTP`的`TCP`协议,但被设计在`HTTP`上工作,使用端口`80`和`443`,并允许重用现有的防火墙规则。
`WebSocket`交互从一个`HTTP`请求开始,该请求使用`HTTP Upgrade`头进行升级,或者在本例中,切换到`WebSocket`协议。下面的例子展示了这样的交互:
~~~
GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
//Upgrade请求头
Upgrade: websocket
//通过Upgrade连接
Connection: Upgrade
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
~~~
而不是通常的`200`状态码,一个支持`WebSocket`的服务器返回类似如下的输出:
~~~
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
~~~
成功握手之后,`HTTP`升级请求底层的`TCP`套接字将保持打开状态,以便客户机和服务器继续发送和接收消息。
注意,如果`WebSocket`服务器运行在`web`服务器(例如`nginx`)后面,你可能需要配置它来将`WebSocket`升级请求传递给`WebSocket`服务器。同样,如果应用程序运行在云环境中,请检查云提供商有关`WebSocket`支持的说明。
## HTTP与WebSocket
尽管`WebSocket`被设计成与`HTTP`兼容,并且从`HTTP`请求开始,但重要的是要理解这两种协议导致非常不同的体系结构和应用程序编程模型。
在`HTTP`和`REST`中,应用程序被建模为多个`url`。为了与应用程序交互,客户端以请求-响应的方式访问这些`url`。服务器根据`HTTP URL`、方法和头将请求路由到适当的处理程序。
相反,在`WebSockets`中,通常只有一个`URL`用于初始连接。随后,所有应用程序消息在同一`TCP`连接上流动。这指向一个完全不同的异步、事件驱动的消息传递体系结构。
`WebSocket`也是一种低级传输协议,与`HTTP`不同,它没有规定消息内容的任何语义。这意味着,除非客户端和服务器在消息语义上达成一致,否则无法路由或处理消息。
`WebSocket`客户端和服务器可以通过`HTTP`握手请求的`Sec-WebSocket-Protocol`报头来协商使用更高级别的消息传递协议(例如`STOMP`)。
## 何时使用WebSockets
`WebSockets`可以使网页具有动态和交互性。然而,在许多情况下,`Ajax`和`HTTP`流或长轮询的组合可以提供一个简单而有效的解决方案。例如,新闻、邮件和社交源需要动态更新,但每隔几分钟更新一次可能完全没有问题。另一方面,协作、游戏和金融应用需要更接近实时。
延迟本身并不是一个决定性因素。如果消息量相对较低(例如,监视网络故障),`HTTP`流或轮询可以提供有效的解决方案。低延迟、高频率和高容量的结合是`WebSocket`的最佳使用情况。
在`Internet`上,你控制范围之外的限制性代理可能会阻止`WebSocket`交互,要么因为它们没有配置为传递`Upgrade`头,要么因为它们关闭了看起来空闲的长时间连接。这意味着将`WebSocket`用于防火墙内的内部应用程序比用于面向公共的应用程序要简单得多。
## WebSocket使用
`Spring`框架提供了一个`WebSocket API`,你可以使用它来编写处理`WebSocket`消息的客户端和服务器端应用程序。
### 服务端
要创建`WebSocket`服务器,你可以先创建一个`WebSocketHandler`。下面的例子展示了如何做到这一点:
~~~
public class MyWebSocketHandler implements WebSocketHandler {
private Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1L), Duration.ofSeconds(1L));
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.send(intervalFlux.map(item -> session.textMessage(item + "")))
.and(session.receive().doOnNext(msg -> {
String msgText = msg.getPayloadAsText();
System.out.println("收到客户端消息:" + msgText);
}).then());
}
}
~~~
### 客户端
`Spring WebFlux`提供了一个`WebSocketClient`抽象,实现了`Reactor Netty`、`Tomcat`、`Jetty`、`Undertow`和标准`Java`(即`JSR-356`)。
要启动`WebSocket`会话,你可以创建一个客户端的实例,并使用它的`execute`方法:
~~~
WebSocketClient client = new ReactorNettyWebSocketClient();
URI url = new URI("ws://localhost:8080/path");
client.execute(url,
session -> session.send(Mono.just(session.textMessage("hello world")))
.thenMany(session.receive().map(WebSocketMessage::getPayloadAsText).log())
.then())
.block();
~~~
服务端日志:
![](https://img.kancloud.cn/5d/61/5d610f611e4a9014053541b233fdfce3_968x268.png)
客户端日志:
![](https://img.kancloud.cn/b4/42/b4428b7a486d3d24f22964b1f89c0407_1198x448.png)
有些客户端,比如`Jetty`,实现了`Lifecycle`,需要在使用它们之前停止和启动。所有客户端都有与底层`WebSocket`客户端配置相关的构造函数选项。
### `WebSocketHandler`
`WebSocketHandler`的`handle`方法接受`WebSocketSession`并返回`Mono<Void>`来指示应用程序对会话的处理何时完成。会话通过两个流处理,一个用于输入消息,另一个用于输出消息。下表描述了处理流的两种方法:
| WebSocketSession方法 | 说明 |
| --- | --- |
| `Flux<WebSocketMessage> receive()` | 提供对输入消息流的访问,并在连接关闭时完成。 |
|`Mono<Void> send(Publisher<WebSocketMessage>)` | 获取输出消息的源,写入消息,并返回一个`Mono<Void>`,该`Mono<Void>`在源完成且写入完成时完成。|
`WebSocketHandler`必须将输入和输出流组合成一个统一的流,并返回一个`Mono<Void>`,以反映该流的完成。根据应用需求,统一流程在以下情况下完成:
* 输入或输出消息流完成。
* 输入流完成(即连接关闭),而输出流是无限的。
* 在选定的点,通过`WebSocketSession`的`close`方法。
当输入和输出消息流组合在一起时,不需要检查连接是否打开,因为`Reactive streams`会发出结束活动的信号。输出流接收到完成或错误信号,而输出流接收到取消信号。
处理程序的最基本实现是处理输入流。下面的例子展示了这样一个实现:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
return session.receive()//访问输入消息流
.doOnNext(message -> {
// ...处理每条消息。
})
.concatMap(message -> {
// 执行使用消息内容的嵌套异步操作。 当接收完成时,返回一个Mono<Void>。
})
.then();//当接收完成时,返回一个Mono<Void>。
}
}
~~~
> 对于嵌套的异步操作,你可能需要在使用数据池缓冲区的底层服务器上调用`message.retain()(`例如`Netty`)。否则,数据缓冲区可能会在你有机会读取数据之前被释放。
下面的实现组合了输入和输出流:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Flux<WebSocketMessage> output = session.receive()//处理输入消息流
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
// 创建输出消息,生成组合流。
.map(value -> session.textMessage("Echo " + value));
//返回一个Mono<Void>,当继续接收时它不会完成。
return session.send(output);
}
}
~~~
输入流和输出流可以是独立的,并且只能在完成时进行连接,如下面的示例所示:
~~~
class ExampleHandler implements WebSocketHandler {
@Override
public Mono<Void> handle(WebSocketSession session) {
Mono<Void> input = session.receive()//处理输入消息
.doOnNext(message -> {
// ...
})
.concatMap(message -> {
// ...
})
.then();
Flux<String> source = ... ;
//发送消息
Mono<Void> output = session.send(source.map(session::textMessage));
//加入两个流并返回一个Mono<Void>,该函数在任意一个流结束时结束。
return Mono.zip(input, output).then();
}
}
~~~
### 握手
`WebSocketHandlerAdapter`委托给`WebSocketService`。默认情况下,它是一个`HandshakeWebSocketService`的实例,它对`WebSocket`请求执行基本的检查,然后对正在使用的服务器使用`RequestUpgradeStrategy`。目前,它内置了对`Reactor Netty`、`Tomcat`、`Jetty`和`Undertow`的支持。
`HandshakeWebSocketService`公开了一个`sessionAttributePredicate`属性,该属性允许设置`Predicate`来从`WebSession`中提取属性,并将它们插入到`WebSocketSession`的属性中。
~~~
@Bean
public WebSocketService webSocketService() {
ReactorNettyRequestUpgradeStrategy strategy=new ReactorNettyRequestUpgradeStrategy();
HandshakeWebSocketService handshakeWebSocketService = new HandshakeWebSocketService(strategy);
handshakeWebSocketService.setSessionAttributePredicate(Predicates.isTrue());
return handshakeWebSocketService;
}
~~~
### 服务端配置
每个服务器的`RequestUpgradeStrategy`公开了特定于底层`WebSocket`服务器引擎的配置。
~~~
@Configuration
class WebConfig {
@Bean
public WebSocketService webSocketService() {
TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
strategy.setMaxSessionIdleTimeout(0L);
return new HandshakeWebSocketService(strategy);
}
}
~~~
检查服务器的`upgrade`策略,看看有哪些可用选项。目前,只有`Tomcat`和`Jetty`公开了这些选项。
### 跨域
配置`CORS`和限制对`WebSocket`端点访问的最简单的方法是让你的`WebSocketHandler`实现`CorsConfigurationSource`并返回一个`CorsConfiguration`,包含允许的源、头和其他细节。如果你不能这样做,你还可以在`SimpleUrlHandler`上设置`corsConfigurations`属性,以通过`URL`模式指定`CORS`设置。如果两者都指定了,它们将通过使用`CorsConfiguration`上的`combine`方法进行组合。
~~~
public class MyWebSocketHandler implements WebSocketHandler, CorsConfigurationSource {
@Override
public CorsConfiguration getCorsConfiguration(ServerWebExchange exchange) {
return new CorsConfiguration();
}
}
~~~
- 1.反应式编程概述
- 2.Reactor框架
- Flux
- Mono
- 订阅(Subscribe)
- 编程创建序列
- 线程和调度器
- 错误处理
- 3.Spring WebFlux概述
- 4.Spring WebFlux核心组件
- HttpHandler
- WebHandler
- ServerWebExchange
- 编码和解码器
- JSON
- Form Data
- Multipart Data
- 过滤器
- 异常处理器
- DispatcherHandler
- 5.Spring Boot启动WebFlux
- 6.Spring WebFlux注解控制器
- 请求映射
- 处理程序方法
- 方法参数
- 返回值
- 类型转换
- 模型(Model)
- 数据绑定(DataBinder)
- 异常管理
- @ControllerAdvice
- 7.Spring WebFlux函数端点
- HandlerFunction
- RouterFunction
- 运行服务
- 函数过滤器
- 8.Spring Boot中使用函数端点
- 9.Spring Webflux请求处理流程
- 10.Spring WebFlux配置
- 11.Spring WebFlux使用R2DBC访问MySQL
- 12.Spring WebFlux访问Redis
- 13.Spring WebFlux访问MongoDB
- 14.Spring WebFlux集成Thymeleaf
- 15.Spring WebFlux集成FreeMarker
- 16.Spring WebFlux WebClient
- 17.Spring WebFlux WebSocket
- 18.测试
- 19.RSocket