企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
### 基本用法 这是一个带有静态方法**work()**的类,它对该类的对象执行某些工作: ```java // concurrent/Machina.java import onjava.Nap; public class Machina { public enum State { START, ONE, TWO, THREE, END; State step() { if(equals(END)) return END; return values()[ordinal() + 1]; } } private State state = State.START; private final int id; public Machina(int id) { this.id = id; } public static Machina work(Machina m) { if(!m.state.equals(State.END)){ new Nap(0.1); m.state = m.state.step(); } System.out.println(m); return m; } @Override public String toString() { return"Machina" + id + ": " + (state.equals(State.END)? "complete" : state); } } ``` 这是一个有限状态机,一个微不足道的机器,因为它没有分支......它只是从头到尾遍历一条路径。**work()**方法将机器从一个状态移动到下一个状态,并且需要100毫秒才能完成“工作”。 **CompletableFuture**可以被用来做的一件事是, 使用**completedFuture()**将它感兴趣的对象进行包装。 ```java // concurrent/CompletedMachina.java import java.util.concurrent.*; public class CompletedMachina { public static void main(String[] args) { CompletableFuture<Machina> cf = CompletableFuture.completedFuture( new Machina(0)); try { Machina m = cf.get(); // Doesn't block } catch(InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } } ``` **completedFuture()**创建一个“已经完成”的**CompletableFuture**。对这样一个未来做的唯一有用的事情是**get()**里面的对象,所以这看起来似乎没有用。注意**CompletableFuture**被输入到它包含的对象。这个很重要。 通常,**get()**在等待结果时阻塞调用线程。此块可以通过**InterruptedException**或**ExecutionException**中断。在这种情况下,阻止永远不会发生,因为CompletableFutureis已经完成,所以答案立即可用。 当我们将**handle()**包装在**CompletableFuture**中时,发现我们可以在**CompletableFuture**上添加操作来处理所包含的对象,使得事情变得更加有趣: ```java // concurrent/CompletableApply.java import java.util.concurrent.*; public class CompletableApply { public static void main(String[] args) { CompletableFuture<Machina> cf = CompletableFuture.completedFuture( new Machina(0)); CompletableFuture<Machina> cf2 = cf.thenApply(Machina::work); CompletableFuture<Machina> cf3 = cf2.thenApply(Machina::work); CompletableFuture<Machina> cf4 = cf3.thenApply(Machina::work); CompletableFuture<Machina> cf5 = cf4.thenApply(Machina::work); } } ``` **输出结果**: ``` Machina0: ONE Machina0: TWO Machina0: THREE Machina0: complete ``` `thenApply()` 应用一个接收输入并产生输出的函数。在本例中,`work()` 函数产生的类型与它所接收的类型相同 (`Machina`),因此每个 `CompletableFuture`添加的操作的返回类型都为 `Machina`,但是(类似于流中的 `map()` )函数也可以返回不同的类型,这将体现在返回类型上。 你可以在此处看到有关**CompletableFutures**的重要信息:它们会在你执行操作时自动解包并重新包装它们所携带的对象。这使得编写和理解代码变得更加简单, 而不会在陷入在麻烦的细节中。 我们可以消除中间变量并将操作链接在一起,就像我们使用Streams一样: ```java // concurrent/CompletableApplyChained.javaimport java.util.concurrent.*; import onjava.Timer; public class CompletableApplyChained { public static void main(String[] args) { Timer timer = new Timer(); CompletableFuture<Machina> cf = CompletableFuture.completedFuture( new Machina(0)) .thenApply(Machina::work) .thenApply(Machina::work) .thenApply(Machina::work) .thenApply(Machina::work); System.out.println(timer.duration()); } } ``` 输出结果: ``` Machina0: ONE Machina0: TWO Machina0: THREE Machina0: complete 514 ``` 这里我们还添加了一个 `Timer`,它的功能在每一步都显性地增加 100ms 等待时间之外,还将 `CompletableFuture` 内部 `thenApply` 带来的额外开销给体现出来了。 **CompletableFutures** 的一个重要好处是它们鼓励使用私有子类原则(不共享任何东西)。默认情况下,使用 **thenApply()** 来应用一个不对外通信的函数 - 它只需要一个参数并返回一个结果。这是函数式编程的基础,并且它在并发特性方面非常有效[^5]。并行流和 `ComplempleFutures` 旨在支持这些原则。只要你不决定共享数据(共享非常容易导致意外发生)你就可以编写出相对安全的并发程序。 回调 `thenApply()` 一旦开始一个操作,在完成所有任务之前,不会完成 **CompletableFuture** 的构建。虽然这有时很有用,但是开始所有任务通常更有价值,这样就可以运行继续前进并执行其他操作。我们可通过`thenApplyAsync()` 来实现此目的: ```java // concurrent/CompletableApplyAsync.java import java.util.concurrent.*; import onjava.*; public class CompletableApplyAsync { public static void main(String[] args) { Timer timer = new Timer(); CompletableFuture<Machina> cf = CompletableFuture.completedFuture( new Machina(0)) .thenApplyAsync(Machina::work) .thenApplyAsync(Machina::work) .thenApplyAsync(Machina::work) .thenApplyAsync(Machina::work); System.out.println(timer.duration()); System.out.println(cf.join()); System.out.println(timer.duration()); } } ``` 输出结果: ``` 116 Machina0: ONE Machina0: TWO Machina0:THREE Machina0: complete Machina0: complete 552 ``` 同步调用(我们通常使用的那种)意味着:“当你完成工作时,才返回”,而异步调用以意味着: “立刻返回并继续后续工作”。 正如你所看到的,`cf` 的创建现在发生的更快。每次调用 `thenApplyAsync()` 都会立刻返回,因此可以进行下一次调用,整个调用链路完成速度比以前快得多。 事实上,如果没有回调 `cf.join()` 方法,程序会在完成其工作之前退出。而 `cf.join()` 直到cf操作完成之前,阻止 `main()` 进程结束。我们还可以看出本示例大部分时间消耗在 `cf.join()` 这。 这种“立即返回”的异步能力需要 `CompletableFuture` 库进行一些秘密(`client` 无感)工作。特别是,它将你需要的操作链存储为一组回调。当操作的第一个链路(后台操作)完成并返回时,第二个链路(后台操作)必须获取生成的 `Machina` 并开始工作,以此类推! 但这种异步机制没有我们可以通过程序调用栈控制的普通函数调用序列,它的调用链路顺序会丢失,因此它使用一个函数地址来存储的回调来解决这个问题。 幸运的是,这就是你需要了解的有关回调的全部信息。程序员将这种人为制造的混乱称为 callback hell(回调地狱)。通过异步调用,`CompletableFuture` 帮你管理所有回调。 除非你知道系统的一些具体的变化,否则你更想使用异步调用来实现程序。 - 其他操作 当你查看`CompletableFuture`的 `Javadoc` 时,你会看到它有很多方法,但这个方法的大部分来自不同操作的变体。例如,有 `thenApply()`,`thenApplyAsync()` 和第二种形式的 `thenApplyAsync()`,它们使用 `Executor` 来运行任务(在本书中,我们忽略了 `Executor` 选项)。 下面的示例展示了所有"基本"操作,这些操作既不涉及组合两个 `CompletableFuture`,也不涉及异常(我们将在后面介绍)。首先,为了提供简洁性和方便性,我们应该重用以下两个实用程序: ```java package onjava; import java.util.concurrent.*; public class CompletableUtilities { // Get and show value stored in a CF: public static void showr(CompletableFuture<?> c) { try { System.out.println(c.get()); } catch(InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } // For CF operations that have no value: public static void voidr(CompletableFuture<Void> c) { try { c.get(); // Returns void } catch(InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } } ``` `showr()` 在 `CompletableFuture<Integer>` 上调用 `get()`,并显示结果,`try/catch` 两个可能会出现的异常。 `voidr()` 是 `CompletableFuture<Void>` 的 `showr()` 版本,也就是说,`CompletableFutures` 只为任务完成或失败时显示信息。 为简单起见,下面的 `CompletableFutures` 只包装整数。`cfi()` 是一个便利的方法,它把一个整数包装在一个完整的 `CompletableFuture<Integer>` : ```java // concurrent/CompletableOperations.java import java.util.concurrent.*; import static onjava.CompletableUtilities.*; public class CompletableOperations { static CompletableFuture<Integer> cfi(int i) { return CompletableFuture.completedFuture( Integer.valueOf(i)); } public static void main(String[] args) { showr(cfi(1)); // Basic test voidr(cfi(2).runAsync(() -> System.out.println("runAsync"))); voidr(cfi(3).thenRunAsync(() -> System.out.println("thenRunAsync"))); voidr(CompletableFuture.runAsync(() -> System.out.println("runAsync is static"))); showr(CompletableFuture.supplyAsync(() -> 99)); voidr(cfi(4).thenAcceptAsync(i -> System.out.println("thenAcceptAsync: " + i))); showr(cfi(5).thenApplyAsync(i -> i + 42)); showr(cfi(6).thenComposeAsync(i -> cfi(i + 99))); CompletableFuture<Integer> c = cfi(7); c.obtrudeValue(111); showr(c); showr(cfi(8).toCompletableFuture()); c = new CompletableFuture<>(); c.complete(9); showr(c); c = new CompletableFuture<>(); c.cancel(true); System.out.println("cancelled: " + c.isCancelled()); System.out.println("completed exceptionally: " + c.isCompletedExceptionally()); System.out.println("done: " + c.isDone()); System.out.println(c); c = new CompletableFuture<>(); System.out.println(c.getNow(777)); c = new CompletableFuture<>(); c.thenApplyAsync(i -> i + 42) .thenApplyAsync(i -> i * 12); System.out.println("dependents: " + c.getNumberOfDependents()); c.thenApplyAsync(i -> i / 2); System.out.println("dependents: " + c.getNumberOfDependents()); } } ``` **输出结果** : ``` 1 runAsync thenRunAsync runAsync is static 99 thenAcceptAsync: 4 47 105 111 8 9 cancelled: true completed exceptionally: true done: true java.util.concurrent.CompletableFuture@6d311334[Complet ed exceptionally] 777 dependents: 1 dependents: 2 ``` - `main()` 包含一系列可由其 `int` 值引用的测试。 - `cfi(1)` 演示了 `showr()` 正常工作。 - `cfi(2)` 是调用 `runAsync()` 的示例。由于 `Runnable` 不产生返回值,因此使用了返回 `CompletableFuture <Void>` 的`voidr()` 方法。 - 注意使用 `cfi(3)`,`thenRunAsync()` 效果似乎与 上例 `cfi(2)` 使用的 `runAsync()`相同,差异在后续的测试中体现: - `runAsync()` 是一个 `static` 方法,所以你通常不会像`cfi(2)`一样调用它。相反你可以在 `QuittingCompletable.java` 中使用它。 - 后续测试中表明 `supplyAsync()` 也是静态方法,区别在于它需要一个 `Supplier` 而不是`Runnable`, 并产生一个`CompletableFuture<Integer>` 而不是 `CompletableFuture<Void>`。 - `then` 系列方法将对现有的 `CompletableFuture<Integer>` 进一步操作。 - 与 `thenRunAsync()` 不同,`cfi(4)`,`cfi(5)` 和`cfi(6)` "then" 方法的参数是未包装的 `Integer`。 - 通过使用 `voidr()`方法可以看到: - `AcceptAsync()`接收了一个 `Consumer`,因此不会产生结果。 - `thenApplyAsync()` 接收一个`Function`, 并生成一个结果(该结果的类型可以不同于其输入类型)。 - `thenComposeAsync()` 与 `thenApplyAsync()`非常相似,唯一区别在于其 `Function` 必须产生已经包装在`CompletableFuture`中的结果。 - `cfi(7)` 示例演示了 `obtrudeValue()`,它强制将值作为结果。 - `cfi(8)` 使用 `toCompletableFuture()` 从 `CompletionStage` 生成一个`CompletableFuture`。 - `c.complete(9)` 显示了如何通过给它一个结果来完成一个`task`(`future`)(与 `obtrudeValue()` 相对,后者可能会迫使其结果替换该结果)。 - 如果你调用 `CompletableFuture`中的 `cancel()`方法,如果已经完成此任务,则正常结束。 如果尚未完成,则使用 `CancellationException` 完成此 `CompletableFuture`。 - 如果任务(`future`)完成,则**getNow()**方法返回`CompletableFuture`的完成值,否则返回`getNow()`的替换参数。 - 最后,我们看一下依赖(`dependents`)的概念。如果我们将两个`thenApplyAsync()`调用链路到`CompletableFuture`上,则依赖项的数量不会增加,保持为1。但是,如果我们另外将另一个`thenApplyAsync()`直接附加到`c`,则现在有两个依赖项:两个一起的链路和另一个单独附加的链路。 - 这表明你可以使用一个`CompletionStage`,当它完成时,可以根据其结果派生多个新任务。