ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
[TOC] ### `Future`特点(trait) `Future`特点的定义如下: ``` trait Future { /// The type of the value returned when the future completes. type Item; /// The type representing errors that occurred while processing the computation. type Error; /// The function that will be repeatedly called to see if the future is /// has completed or not fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>; } ``` #### Item和Error `Item`是`Future`在完成时将产生的值的类型,错误是如果在导致`Future`能够完成之前出现错误,`Future`可能会产生的错误类型。 #### Poll方法 `poll`是在`tokio`运行时调用的,以便查看Future是否已完成。 如果你很好奇:Async是一个带有值的枚举,Ready(Item)或者NotReady告诉`tokio`运行时`Future`是否完成。 ***** ### 流(Streams) 流是同类`Future`的迭代器。流不会在未来的某个时间点产生值,而是在将来的某个时刻产生一组值。换句话说,像`Future`一样,流在未来的某一点上不会产生一个值。他们随着时间的推移继续产生值。 Streams在实现过程中非常类似于future: ``` trait Stream { /// The type of the value yielded by the stream. type Item; /// The type representing errors that occurred while processing the computation. type Error; /// The function that will be repeatedly called to see if the stream has /// another value it can yield fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>; } ``` ### Futures是用来管理异步逻辑的基本构件 #### EchoServer的实现 #### 创建项目echo_server 生成一个新的项目 ``` $ cargo new echo_server $ cd echo_server ``` 在`Cargo.toml`中添加依赖项: ``` [dependencies] tokio = "0.1" futures = "0.1" ``` 文件src/main.rs: ``` extern crate tokio; extern crate futures; use tokio::io; use tokio::net::TcpListener; use tokio::prelude::*; ``` #### 服务器必要结构: - 绑定`TcpListener`到本地端口。 - 定义接受入站连接并处理它们的任务。 - 生成服务器任务。 - 启动TokioRuntime #### 编写一个什么都不做的服务器 ``` let addr = "127.0.0.1:7878".parse().unwrap(); let listener = TcpListener::bind(&addr).unwrap(); let _server = listener.incoming().for_each(|socket| { Ok(()) }).map_err(|err|{ println!("accept error:{:?}", err) }); tokio::run(_server); ``` `TcpListener`是监听器,`incoming`将监听器转换为入站客户端连接流,用`for_each`将产生每个入站客户端连接。 目前我们没有对此入站连接做任何事情. 一旦我们拥有了我们的服务器,我们就可以将它交给`tokio::run`。到目前为止,我们的服务器功能一无所获。由Tokio运行时驱动我们的`Future`完成。 ` ` 注意:我们必须在服务器上调用`map_err`,因为`tokio :: run`需要一个`Item`为type()和`Error`为type()的`Future`。 这是为了确保在将`Future`交付给运行时之前处理所有值和错误。 ` ` #### 处理连接 将从套接字读取的所有数据复制回套接字本身,使用`io::copy`函数,他需要两个参数,但是我们只有一个参数socket,但是socket有一个函数`split`,它将可读和可写的流分成两半。 然后,`copy`函数返回一个`Future`,当复制操作完成时,将接收此`Future`,解析为复制的数据量: ``` let server = listener.incoming().for_each(|socket| { let (reader, writer) = socket.split(); let amount = io::copy(reader, writer); let msg = amount.then(|result| { match result { Ok((amount, _, _)) => println!("wrote {} bytes", amount), Err(e) => println!("error: {}", e), } Ok(()) }); tokio::spawn(msg); Ok(()) }) ``` #### tokio::spawn [tokio::spawn](https://docs.rs/tokio-executor/0.1/tokio_executor/fn.spawn.html)的调用是关键所在,至关重要的是我们希望所有`clients`同时取得进展,而不是在完成另一个`client`时阻止其中一个。 为此,使用`tokio :: spawn`函数在后台执行工作。 如果我们没有这样做,那么`for_each`中块的每次调用都会在一次解决,这意味着我们永远不会同时处理两个客户端连接! ***** ###