ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## 创建和运行任务 如果无法通过并行流实现并发,则必须创建并运行自己的任务。稍后你将看到运行任务的理想Java 8方法是CompletableFuture,但我们将使用更基本的工具介绍概念。 Java并发的历史始于非常原始和有问题的机制,并且充满了各种尝试的改进。这些主要归入附录:[低级并发(Appendix: Low-Level Concurrency)](./Appendix-Low-Level-Concurrency.md)。在这里,我们将展示一个规范形式,表示创建和运行任务的最简单,最好的方法。与并发中的所有内容一样,存在各种变体,但这些变体要么降级到该附录,要么超出本书的范围。 - Tasks and Executors 在Java的早期版本中,你通过直接创建自己的Thread对象来使用线程,甚至将它们子类化以创建你自己的特定“任务线程”对象。你手动调用了构造函数并自己启动了线程。 创建所有这些线程的开销变得非常重要,现在不鼓励采用手动操作方法。在Java 5中,添加了类来为你处理线程池。你可以将任务创建为单独的类型,然后将其交给ExecutorService以运行该任务,而不是为每种不同类型的任务创建新的Thread子类型。ExecutorService为你管理线程,并且在运行任务后重新循环线程而不是丢弃线程。 首先,我们将创建一个几乎不执行任务的任务。它“sleep”(暂停执行)100毫秒,显示其标识符和正在执行任务的线程的名称,然后完成: ```java // concurrent/NapTask.java import onjava.Nap; public class NapTask implements Runnable { final int id; public NapTask(int id) { this.id = id; } @Override public void run() { new Nap(0.1);// Seconds System.out.println(this + " "+ Thread.currentThread().getName()); } @Override public String toString() { return"NapTask[" + id + "]"; } } ``` 这只是一个**Runnable**:一个包含**run()**方法的类。它没有包含实际运行任务的机制。我们使用**Nap**类中的“sleep”: ```java // onjava/Nap.java package onjava; import java.util.concurrent.*; public class Nap { public Nap(double t) { // Seconds try { TimeUnit.MILLISECONDS.sleep((int)(1000 * t)); } catch(InterruptedException e){ throw new RuntimeException(e); } } public Nap(double t, String msg) { this(t); System.out.println(msg); } } ``` 为了消除异常处理的视觉干扰,这被定义为实用程序。第二个构造函数在超时时显示一条消息 对**TimeUnit.MILLISECONDS.sleep()**的调用获取“当前线程”并在参数中将其置于休眠状态,这意味着该线程被挂起。这并不意味着底层处理器停止。操作系统将其切换到其他任务,例如在你的计算机上运行另一个窗口。OS任务管理器定期检查**sleep()**是否超时。当它执行时,线程被“唤醒”并给予更多处理时间。 你可以看到**sleep()**抛出一个受检的**InterruptedException**;这是原始Java设计中的一个工件,它通过突然断开它们来终止任务。因为它往往会产生不稳定的状态,所以后来不鼓励终止。但是,我们必须在需要或仍然发生终止的情况下捕获异常。 要执行任务,我们将从最简单的方法--SingleThreadExecutor开始: ```java //concurrent/SingleThreadExecutor.java import java.util.concurrent.*; import java.util.stream.*; import onjava.*; public class SingleThreadExecutor { public static void main(String[] args) { ExecutorService exec = Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); System.out.println("All tasks submitted"); exec.shutdown(); while(!exec.isTerminated()) { System.out.println( Thread.currentThread().getName()+ " awaiting termination"); new Nap(0.1); } } } ``` 输出结果: ``` All tasks submitted main awaiting termination main awaiting termination NapTask[0] pool-1-thread-1 main awaiting termination NapTask[1] pool-1-thread-1 main awaiting termination NapTask[2] pool-1-thread-1 main awaiting termination NapTask[3] pool-1-thread-1 main awaiting termination NapTask[4] pool-1-thread-1 main awaiting termination NapTask[5] pool-1-thread-1 main awaiting termination NapTask[6] pool-1-thread-1 main awaiting termination NapTask[7] pool-1-thread-1 main awaiting termination NapTask[8] pool-1-thread-1 main awaiting termination NapTask[9] pool-1-thread-1 ``` 首先请注意,没有**SingleThreadExecutor**类。**newSingleThreadExecutor()**是**Executors**中的工厂,它创建特定类型的[^4] 我创建了十个NapTasks并将它们提交给ExecutorService,这意味着它们开始自己运行。然而,在此期间,main()继续做事。当我运行callexec.shutdown()时,它告诉ExecutorService完成已经提交的任务,但不接受任何新任务。此时,这些任务仍然在运行,因此我们必须等到它们在退出main()之前完成。这是通过检查exec.isTerminated()来实现的,这在所有任务完成后变为true。 请注意,main()中线程的名称是main,并且只有一个其他线程pool-1-thread-1。此外,交错输出显示两个线程确实同时运行。 如果你只是调用exec.shutdown(),程序将完成所有任务。也就是说,不需要**while(!exec.isTerminated())**。 ```java // concurrent/SingleThreadExecutor2.java import java.util.concurrent.*; import java.util.stream.*; public class SingleThreadExecutor2 { public static void main(String[] args)throws InterruptedException { ExecutorService exec =Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 输出结果: ``` NapTask[0] pool-1-thread-1 NapTask[1] pool-1-thread-1 NapTask[2] pool-1-thread-1 NapTask[3] pool-1-thread-1 NapTask[4] pool-1-thread-1 NapTask[5] pool-1-thread-1 NapTask[6] pool-1-thread-1 NapTask[7] pool-1-thread-1 NapTask[8] pool-1-thread-1 NapTask[9] pool-1-thread-1 ``` 一旦你callexec.shutdown(),尝试提交新任务将抛出RejectedExecutionException。 ```java // concurrent/MoreTasksAfterShutdown.java import java.util.concurrent.*; public class MoreTasksAfterShutdown { public static void main(String[] args) { ExecutorService exec =Executors.newSingleThreadExecutor(); exec.execute(newNapTask(1)); exec.shutdown(); try { exec.execute(newNapTask(99)); } catch(RejectedExecutionException e) { System.out.println(e); } } } ``` 输出结果: ``` java.util.concurrent.RejectedExecutionException: TaskNapTask[99] rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Shutting down, pool size = 1,active threads = 1, queued tasks = 0, completed tasks =0]NapTask[1] pool-1-thread-1 ``` **exec.shutdown()**的替代方法是**exec.shutdownNow()**,它除了不接受新任务外,还会尝试通过中断任务来停止任何当前正在运行的任务。同样,中断是错误的,容易出错并且不鼓励。 - 使用更多线程 使用线程的重点是(几乎总是)更快地完成任务,那么我们为什么要限制自己使用SingleThreadExecutor呢?查看执行**Executors**的Javadoc,你将看到更多选项。例如CachedThreadPool: ```java // concurrent/CachedThreadPool.java import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool { public static void main(String[] args) { ExecutorService exec =Executors.newCachedThreadPool(); IntStream.range(0, 10) .mapToObj(NapTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 输出结果: ``` NapTask[7] pool-1-thread-8 NapTask[4] pool-1-thread-5 NapTask[1] pool-1-thread-2 NapTask[3] pool-1-thread-4 NapTask[0] pool-1-thread-1 NapTask[8] pool-1-thread-9 NapTask[2] pool-1-thread-3 NapTask[9] pool-1-thread-10 NapTask[6] pool-1-thread-7 NapTask[5] pool-1-thread-6 ``` 当你运行这个程序时,你会发现它完成得更快。这是有道理的,每个任务都有自己的线程,所以它们都并行运行,而不是使用相同的线程来顺序运行每个任务。这似乎没毛病,很难理解为什么有人会使用SingleThreadExecutor。 要理解这个问题,我们需要一个更复杂的任务: ```java // concurrent/InterferingTask.java public class InterferingTask implements Runnable { final int id; private static Integer val = 0; public InterferingTask(int id) { this.id = id; } @Override public void run() { for(int i = 0; i < 100; i++) val++; System.out.println(id + " "+ Thread.currentThread().getName() + " " + val); } } ``` 每个任务增加val一百次。这似乎很简单。让我们用CachedThreadPool尝试一下: ```java // concurrent/CachedThreadPool2.java import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool2 { public static void main(String[] args) { ExecutorService exec =Executors.newCachedThreadPool(); IntStream.range(0, 10) .mapToObj(InterferingTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 输出结果: ``` 0 pool-1-thread-1 200 1 pool-1-thread-2 200 4 pool-1-thread-5 300 5 pool-1-thread-6 400 8 pool-1-thread-9 500 9 pool-1-thread-10 600 2 pool-1-thread-3 700 7 pool-1-thread-8 800 3 pool-1-thread-4 900 6 pool-1-thread-7 1000 ``` 输出不是我们所期望的,并且从一次运行到下一次运行会有所不同。问题是所有的任务都试图写入val的单个实例,并且他们正在踩着彼此的脚趾。我们称这样的类是线程不安全的。让我们看看SingleThreadExecutor会发生什么: ```java // concurrent/SingleThreadExecutor3.java import java.util.concurrent.*; import java.util.stream.*; public class SingleThreadExecutor3 { public static void main(String[] args)throws InterruptedException { ExecutorService exec =Executors.newSingleThreadExecutor(); IntStream.range(0, 10) .mapToObj(InterferingTask::new) .forEach(exec::execute); exec.shutdown(); } } ``` 输出结果: ``` 0 pool-1-thread-1 100 1 pool-1-thread-1 200 2 pool-1-thread-1 300 3 pool-1-thread-1 400 4 pool-1-thread-1 500 5 pool-1-thread-1 600 6 pool-1-thread-1 700 7 pool-1-thread-1 800 8 pool-1-thread-1 900 9 pool-1-thread-1 1000 ``` 现在我们每次都得到一致的结果,尽管**InterferingTask**缺乏线程安全性。这是SingleThreadExecutor的主要好处 - 因为它一次运行一个任务,这些任务不会相互干扰,因此强加了线程安全性。这种现象称为线程封闭,因为在单线程上运行任务限制了它们的影响。线程封闭限制了加速,但可以节省很多困难的调试和重写。 - 产生结果 因为**InterferingTask**是一个**Runnable**,它没有返回值,因此只能使用副作用产生结果 - 操纵缓冲值而不是返回结果。副作用是并发编程中的主要问题之一,因为我们看到了**CachedThreadPool2.java**。**InterferingTask**中的**val**被称为可变共享状态,这就是问题所在:多个任务同时修改同一个变量会产生竞争。结果取决于首先在终点线上执行哪个任务,并修改变量(以及其他可能性的各种变化)。 避免竞争条件的最好方法是避免可变的共享状态。我们可以称之为自私的孩子原则:什么都不分享。 使用**InterferingTask**,最好删除副作用并返回任务结果。为此,我们创建**Callable**而不是**Runnable**: ```java // concurrent/CountingTask.java import java.util.concurrent.*; public class CountingTask implements Callable<Integer> { final int id; public CountingTask(int id) { this.id = id; } @Override public Integer call() { Integer val = 0; for(int i = 0; i < 100; i++) val++; System.out.println(id + " " + Thread.currentThread().getName() + " " + val); return val; } } ``` **call()完全独立于所有其他CountingTasks生成其结果**,这意味着没有可变的共享状态 **ExecutorService**允许你使用**invokeAll()**启动集合中的每个Callable: ```java // concurrent/CachedThreadPool3.java import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class CachedThreadPool3 { public static Integer extractResult(Future<Integer> f) { try { return f.get(); } catch(Exception e) { throw new RuntimeException(e); } } public static void main(String[] args)throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); List<CountingTask> tasks = IntStream.range(0, 10) .mapToObj(CountingTask::new) .collect(Collectors.toList()); List<Future<Integer>> futures = exec.invokeAll(tasks); Integer sum = futures.stream() .map(CachedThreadPool3::extractResult) .reduce(0, Integer::sum); System.out.println("sum = " + sum); exec.shutdown(); } } ``` 输出结果: ``` 1 pool-1-thread-2 100 0 pool-1-thread-1 100 4 pool-1-thread-5 100 5 pool-1-thread-6 100 8 pool-1-thread-9 100 9 pool-1-thread-10 100 2 pool-1-thread-3 100 3 pool-1-thread-4 100 6 pool-1-thread-7 100 7 pool-1-thread-8 100 sum = 1000 ``` 只有在所有任务完成后,**invokeAll()**才会返回一个**Future**列表,每个任务一个**Future**。**Future**是Java 5中引入的机制,允许你提交任务而无需等待它完成。在这里,我们使用**ExecutorService.submit()**: ```java // concurrent/Futures.java import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class Futures { public static void main(String[] args)throws InterruptedException, ExecutionException { ExecutorService exec =Executors.newSingleThreadExecutor(); Future<Integer> f = exec.submit(newCountingTask(99)); System.out.println(f.get()); // [1] exec.shutdown(); } } ``` 输出结果: ``` 99 pool-1-thread-1 100 100 ``` - [1] 当你的任务在尚未完成的**Future**上调用**get()**时,调用会阻塞(等待)直到结果可用。 但这意味着,在**CachedThreadPool3.java**中,**Future**似乎是多余的,因为**invokeAll()**甚至在所有任务完成之前都不会返回。但是,这里的Future并不用于延迟结果,而是用于捕获任何可能发生的异常。 还要注意在**CachedThreadPool3.java.get()**中抛出异常,因此**extractResult()**在Stream中执行此提取。 因为当你调用**get()**时,**Future**会阻塞,所以它只能解决等待任务完成才暴露问题。最终,**Futures**被认为是一种无效的解决方案,现在不鼓励,我们推荐Java 8的**CompletableFuture**,这将在本章后面探讨。当然,你仍会在遗留库中遇到Futures。 我们可以使用并行Stream以更简单,更优雅的方式解决这个问题: ```java // concurrent/CountingStream.java // {VisuallyInspectOutput} import java.util.*; import java.util.concurrent.*; import java.util.stream.*; public class CountingStream { public static void main(String[] args) { System.out.println( IntStream.range(0, 10) .parallel() .mapToObj(CountingTask::new) .map(ct -> ct.call()) .reduce(0, Integer::sum)); } } ``` 输出结果: ``` 1 ForkJoinPool.commonPool-worker-3 100 8 ForkJoinPool.commonPool-worker-2 100 0 ForkJoinPool.commonPool-worker-6 100 2 ForkJoinPool.commonPool-worker-1 100 4 ForkJoinPool.commonPool-worker-5 100 9 ForkJoinPool.commonPool-worker-7 100 6 main 100 7 ForkJoinPool.commonPool-worker-4 100 5 ForkJoinPool.commonPool-worker-2 100 3 ForkJoinPool.commonPool-worker-3 100 1000 ``` 这不仅更容易理解,而且我们需要做的就是将 `parallel()` 插入到其他顺序操作中,然后一切都在同时运行。 - Lambda和方法引用作为任务 在 **java8** 有了 **lambdas** 和方法引用,你不需要受限于只能使用 **Runnable** 和 **Callable** 。因为 java8 的**lambdas** 和方法引用可以通过匹配方法签名来使用(即,它支持结构一致性),所以我们可以将非 **Runnable** 或 **Callable** 的参数传递给 `ExecutorService` : ```java // concurrent/LambdasAndMethodReferences.java import java.util.concurrent.*; class NotRunnable { public void go() { System.out.println("NotRunnable"); } } class NotCallable { public Integer get() { System.out.println("NotCallable"); return 1; } } public class LambdasAndMethodReferences { public static void main(String[] args)throws InterruptedException { ExecutorService exec = Executors.newCachedThreadPool(); exec.submit(() -> System.out.println("Lambda1")); exec.submit(new NotRunnable()::go); exec.submit(() -> { System.out.println("Lambda2"); return 1; }); exec.submit(new NotCallable()::get); exec.shutdown(); } } ``` 输出结果: ``` Lambda1 NotCallable NotRunnable Lambda2 ``` 这里,前两个**submit()**调用可以改为调用**execute()**。所有**submit()**调用都返回**Futures**,你可以在后两次调用的情况下提取结果。