ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
<!-- Parallel Streams --> ## 并行流 Java 8流的一个显著优点是,在某些情况下,它们可以很容易地并行化。这来自仔细的库设计,特别是流使用内部迭代的方式 - 也就是说,它们控制着自己的迭代器。特别是,他们使用一种特殊的迭代器,称为Spliterator,它被限制为易于自动分割。我们只需要念 `.parallel()` 就会产生魔法般的结果,流中的所有内容都作为一组并行任务运行。如果你的代码是使用Streams编写的,那么并行化以提高速度似乎是一种琐事 例如,考虑来自Streams的Prime.java。查找质数可能是一个耗时的过程,我们可以看到该程序的计时: ```java // concurrent/ParallelPrime.java import java.util.*; import java.util.stream.*; import static java.util.stream.LongStream.*; import java.io.*; import java.nio.file.*; import onjava.Timer; public class ParallelPrime { static final int COUNT = 100_000; public static boolean isPrime(long n){ return rangeClosed(2, (long)Math.sqrt(n)).noneMatch(i -> n % i == 0); } public static void main(String[] args) throws IOException { Timer timer = new Timer(); List<String> primes = iterate(2, i -> i + 1) .parallel() // [1] .filter(ParallelPrime::isPrime) .limit(COUNT) .mapToObj(Long::toString) .collect(Collectors.toList()); System.out.println(timer.duration()); Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE); } } ``` 输出结果: ``` Output: 1224 ``` 请注意,这不是微基准测试,因为我们计时整个程序。我们将数据保存在磁盘上以防止编译器过激的优化;如果我们没有对结果做任何事情,那么一个高级的编译器可能会观察到程序没有意义并且终止了计算(这不太可能,但并非不可能)。请注意使用nio2库编写文件的简单性(在[文件](./17-Files.md)一章中有描述)。 当我注释掉[1] parallel()行时,我的结果用时大约是parallel()的三倍。 并行流似乎是一个甜蜜的交易。你所需要做的就是将编程问题转换为流,然后插入parallel()以加快速度。实际上,有时候这很容易。但遗憾的是,有许多陷阱。 - parallel()不是灵丹妙药 作为对流和并行流的不确定性的探索,让我们看一个看似简单的问题:对增长的数字序列进行求和。事实证明有大量的方式去实现它,并且我将冒险用计时器将它们进行比较 - 我会尽量小心,但我承认我可能会在计时代码执行时遇到许多基本陷阱之一。结果可能有一些缺陷(例如JVM没有“热身”),但我认为它仍然提供了一些有用的指示。 我将从一个计时方法**timeTest()**开始,它采用**LongSupplier**,测量**getAsLong()**调用的长度,将结果与**checkValue**进行比较并显示结果。 请注意,一切都必须严格使用**long**;我花了一些时间发现隐蔽的溢出,然后才意识到在重要的地方错过了**long**。 所有关于时间和内存的数字和讨论都是指“我的机器”。你的经历可能会有所不同。 ```java // concurrent/Summing.java import java.util.stream.*; import java.util.function.*; import onjava.Timer; public class Summing { static void timeTest(String id, long checkValue, LongSupplier operation){ System.out.print(id + ": "); Timer timer = new Timer(); long result = operation.getAsLong(); if(result == checkValue) System.out.println(timer.duration() + "ms"); else System.out.format("result: %d%ncheckValue: %d%n", result, checkValue); } public static final int SZ = 100_000_000; // This even works: // public static final int SZ = 1_000_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; // Gauss's formula public static void main(String[] args){ System.out.println(CHECK); timeTest("Sum Stream", CHECK, () -> LongStream.rangeClosed(0, SZ).sum()); timeTest("Sum Stream Parallel", CHECK, () -> LongStream.rangeClosed(0, SZ).parallel().sum()); timeTest("Sum Iterated", CHECK, () -> LongStream.iterate(0, i -> i + 1) .limit(SZ+1).sum()); // Slower & runs out of memory above 1_000_000: // timeTest("Sum Iterated Parallel", CHECK, () -> // LongStream.iterate(0, i -> i + 1) // .parallel() // .limit(SZ+1).sum()); } } ``` 输出结果: ``` 5000000050000000 Sum Stream: 167ms Sum Stream Parallel: 46ms Sum Iterated: 284ms ``` **CHECK**值是使用Carl Friedrich Gauss(高斯)在1700年代后期还在上小学的时候创建的公式计算出来的. **main()** 的第一个版本使用直接生成 **Stream** 并调用 **sum()** 的方法。我们看到流的好处在于即使SZ为十亿(1_000_000_000)程序也可以很好地处理而没有溢出(为了让程序运行得快一点,我使用了较小的数字)。使用 **parallel()** 的基本范围操作明显更快。 如果使用**iterate()**来生成序列,则减速是相当明显的,可能是因为每次生成数字时都必须调用lambda。但是如果我们尝试并行化,当**SZ**超过一百万时,结果不仅比非并行版本花费的时间更长,而且也会耗尽内存(在某些机器上)。当然,当你可以使用**range()**时,你不会使用**iterate()**,但如果你生成的东西不是简单的序列,你必须使用**iterate()**。应用**parallel()**是一个合理的尝试,但会产生令人惊讶的结果。我们将在后面的部分中探讨内存限制的原因,但我们可以对流并行算法进行初步观察: - 流并行性将输入数据分成多个部分,因此算法可以应用于那些单独的部分。 - 数组分割成本低,分割均匀且对分割的大小有着完美的掌控。 - 链表没有这些属性;“拆分”一个链表仅仅意味着把它分成“第一元素”和“其余元素”,这相对无用。 - 无状态生成器的行为类似于数组;上面使用的 **range()** 就是无状态的。 - 迭代生成器的行为类似于链表; **iterate()** 是一个迭代生成器。 现在让我们尝试通过在数组中填充值并对数组求和来解决问题。因为数组只分配了一次,所以我们不太可能遇到垃圾收集时序问题。 首先我们将尝试一个充满原始**long**的数组: ```java // concurrent/Summing2.java // {ExcludeFromTravisCI}import java.util.*; public class Summing2 { static long basicSum(long[] ia) { long sum = 0; int size = ia.length; for(int i = 0; i < size; i++) sum += ia[i];return sum; } // Approximate largest value of SZ before // running out of memory on mymachine: public static final int SZ = 20_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; public static void main(String[] args) { System.out.println(CHECK); long[] la = newlong[SZ+1]; Arrays.parallelSetAll(la, i -> i); Summing.timeTest("Array Stream Sum", CHECK, () -> Arrays.stream(la).sum()); Summing.timeTest("Parallel", CHECK, () -> Arrays.stream(la).parallel().sum()); Summing.timeTest("Basic Sum", CHECK, () -> basicSum(la));// Destructive summation: Summing.timeTest("parallelPrefix", CHECK, () -> { Arrays.parallelPrefix(la, Long::sum); return la[la.length - 1]; }); } } ``` 输出结果: ``` 200000010000000 Array Stream Sum: 104ms Parallel: 81ms Basic Sum: 106ms parallelPrefix: 265ms ``` 第一个限制是内存大小;因为数组是预先分配的,所以我们不能创建几乎与以前版本一样大的任何东西。并行化可以加快速度,甚至比使用 **basicSum()** 循环更快。有趣的是, **Arrays.parallelPrefix()** 似乎实际上减慢了速度。但是,这些技术中的任何一种在其他条件下都可能更有用 - 这就是为什么你不能做出任何确定性的声明,除了“你必须尝试一下”。 最后,考虑使用包装类**Long**的效果: ```java // concurrent/Summing3.java // {ExcludeFromTravisCI} import java.util.*; public class Summing3 { static long basicSum(Long[] ia) { long sum = 0; int size = ia.length; for(int i = 0; i < size; i++) sum += ia[i]; return sum; } // Approximate largest value of SZ before // running out of memory on my machine: public static final int SZ = 10_000_000; public static final long CHECK = (long)SZ * ((long)SZ + 1)/2; public static void main(String[] args) { System.out.println(CHECK); Long[] aL = newLong[SZ+1]; Arrays.parallelSetAll(aL, i -> (long)i); Summing.timeTest("Long Array Stream Reduce", CHECK, () -> Arrays.stream(aL).reduce(0L, Long::sum)); Summing.timeTest("Long Basic Sum", CHECK, () -> basicSum(aL)); // Destructive summation: Summing.timeTest("Long parallelPrefix",CHECK, ()-> { Arrays.parallelPrefix(aL, Long::sum); return aL[aL.length - 1]; }); } } ``` 输出结果: ``` 50000005000000 Long Array Stream Reduce: 1038ms Long Basic Sum: 21ms Long parallelPrefix: 3616ms ``` 现在可用的内存量大约减半,并且所有情况下所需的时间都会很长,除了**basicSum()**,它只是循环遍历数组。令人惊讶的是, **Arrays.parallelPrefix()** 比任何其他方法都要花费更长的时间。 我将 **parallel()** 版本分开了,因为在上面的程序中运行它导致了一个冗长的垃圾收集,扭曲了结果: ```java // concurrent/Summing4.java // {ExcludeFromTravisCI} import java.util.*; public class Summing4 { public static void main(String[] args) { System.out.println(Summing3.CHECK); Long[] aL = newLong[Summing3.SZ+1]; Arrays.parallelSetAll(aL, i -> (long)i); Summing.timeTest("Long Parallel", Summing3.CHECK, () -> Arrays.stream(aL) .parallel() .reduce(0L,Long::sum)); } } ``` 输出结果: ``` 50000005000000 Long Parallel: 1014ms ``` 它比非parallel()版本略快,但并不显着。 导致时间增加的一个重要原因是处理器内存缓存。使用**Summing2.java**中的原始**long**,数组**la**是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 **Long parallelPrefix** 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为**Long**生成一个超出缓存的引用。 使用**Summing3.java**和**Summing4.java**,**aL**是一个**Long**数组,它不是一个连续的数据数组,而是一个连续的**Long**对象引用数组。尽管该数组可能会在缓存中出现,但指向的对象几乎总是不在缓存中。 这些示例使用不同的SZ值来显示内存限制。 为了进行时间比较,以下是SZ设置为最小值1000万的结果: **Sum Stream: 69msSum Stream Parallel: 18msSum Iterated: 277ms Array Stream Sum: 57ms Parallel: 14ms Basic Sum: 16ms parallelPrefix: 28ms Long Array Stream Reduce: 1046ms Long Basic Sum: 21ms Long parallelPrefix: 3287ms Long Parallel: 1008ms** 虽然Java 8的各种内置“并行”工具非常棒,但我认为它们被视为神奇的灵丹妙药:“只需添加parallel()并且它会更快!” 我希望我已经开始表明情况并非所有都是如此,并且盲目地应用内置的“并行”操作有时甚至会使运行速度明显变慢。 - parallel()/limit()交点 使用**parallel()**时会有更复杂的问题。从其他语言中吸取的流机制被设计为大约是一个无限的流模型。如果你拥有有限数量的元素,则可以使用集合以及为有限大小的集合设计的关联算法。如果你使用无限流,则使用针对流优化的算法。 Java 8将两者合并起来。例如,**Collections**没有内置的**map()**操作。在**Collection**和**Map**中唯一类似流的批处理操作是**forEach()**。如果要执行**map()**和**reduce()**等操作,必须首先将**Collection**转换为存在这些操作的**Stream**: ```java // concurrent/CollectionIntoStream.java import onjava.*; import java.util.*; import java.util.stream.*; public class CollectionIntoStream { public static void main(String[] args) { List<String> strings = Stream.generate(new Rand.String(5)) .limit(10) .collect(Collectors.toList()); strings.forEach(System.out::println); // Convert to a Stream for many more options: String result = strings.stream() .map(String::toUpperCase) .map(s -> s.substring(2)) .reduce(":", (s1, s2) -> s1 + s2); System.out.println(result); } } ``` 输出结果: ``` btpen pccux szgvg meinn eeloz tdvew cippc ygpoa lkljl bynxt :PENCUXGVGINNLOZVEWPPCPOALJLNXT ``` **Collection**确实有一些批处理操作,如**removeAll()**,**removeIf()**和**retainAll()**,但这些都是破坏性的操作。**ConcurrentHashMap**对**forEach**和**reduce**操作有特别广泛的支持。 在许多情况下,只在集合上调用**stream()**或者**parallelStream()**没有问题。但是,有时将**Stream**与**Collection**混合会产生意想不到的结果。这是一个有趣的难题: ```java // concurrent/ParallelStreamPuzzle.java import java.util.*; import java.util.function.*; import java.util.stream.*; public class ParallelStreamPuzzle { static class IntGenerator implements Supplier<Integer> { private int current = 0; @Override public Integer get() { return current++; } } public static void main(String[] args) { List<Integer> x = Stream.generate(newIntGenerator()) .limit(10) .parallel() // [1] .collect(Collectors.toList()); System.out.println(x); } } /* Output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] */ ``` 如果[1]注释运行它,它会产生预期的: **[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]** 每次。但是包含了parallel(),它看起来像一个随机数生成器,带有输出(从一次运行到下一次运行不同),如: **[0, 3, 6, 8, 11, 14, 17, 20, 23, 26]** 这样一个简单的程序怎么会如此糟糕呢?让我们考虑一下我们在这里要实现的目标:“并行生成。”那意味着什么?一堆线程都在从一个生成器取值,然后以某种方式选择有限的结果集?代码看起来很简单,但它变成了一个特别棘手的问题。 为了看到它,我们将添加一些仪器。由于我们正在处理线程,因此我们必须将任何跟踪信息捕获到并发数据结构中。在这里我使用**ConcurrentLinkedDeque**: ```java // concurrent/ParallelStreamPuzzle2.java import java.util.*; import java.util.function.*; import java.util.stream.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; import java.nio.file.*; public class ParallelStreamPuzzle2 { public static final Deque<String> TRACE = new ConcurrentLinkedDeque<>(); static class IntGenerator implements Supplier<Integer> { private AtomicInteger current = new AtomicInteger(); @Override public Integer get() { TRACE.add(current.get() + ": " +Thread.currentThread().getName()); return current.getAndIncrement(); } } public static void main(String[] args) throws Exception { List<Integer> x = Stream.generate(newIntGenerator()) .limit(10) .parallel() .collect(Collectors.toList()); System.out.println(x); Files.write(Paths.get("PSP2.txt"), TRACE); } } ``` 输出结果: ``` [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` current是使用线程安全的 **AtomicInteger** 类定义的,可以防止竞争条件;**parallel()**允许多个线程调用**get()**。 在查看 **PSP2.txt**.**IntGenerator.get()** 被调用1024次时,你可能会感到惊讶。 **0: main 1: ForkJoinPool.commonPool-worker-1 2: ForkJoinPool.commonPool-worker-2 3: ForkJoinPool.commonPool-worker-2 4: ForkJoinPool.commonPool-worker-1 5: ForkJoinPool.commonPool-worker-1 6: ForkJoinPool.commonPool-worker-1 7: ForkJoinPool.commonPool-worker-1 8: ForkJoinPool.commonPool-worker-4 9: ForkJoinPool.commonPool-worker-4 10: ForkJoinPool.commonPool-worker-4 11: main 12: main 13: main 14: main 15: main...10 17: ForkJoinPool.commonPool-worker-110 18: ForkJoinPool.commonPool-worker-610 19: ForkJoinPool.commonPool-worker-610 20: ForkJoinPool.commonPool-worker-110 21: ForkJoinPool.commonPool-worker-110 22: ForkJoinPool.commonPool-worker-110 23: ForkJoinPool.commonPool-worker-1** 这个块大小似乎是内部实现的一部分(尝试使用`limit()` 的不同参数来查看不同的块大小)。将`parallel()`与`limit()`结合使用可以预取一串值,作为流输出。 试着想象一下这里发生了什么:一个流抽象出无限序列,按需生成。当你要求它并行产生流时,你要求所有这些线程尽可能地调用`get()`。添加`limit()`,你说“只需要这些。”基本上,当你为了随机输出而选择将`parallel()`与`limit()`结合使用时,这种方法可能对你正在解决的问题有效。但是当你这样做时,你必须明白。这是一个仅限专家的功能,而不是要争辩说“Java弄错了”。 什么是更合理的方法来解决问题?好吧,如果你想生成一个int流,你可以使用IntStream.range(),如下所示: ```java // concurrent/ParallelStreamPuzzle3.java // {VisuallyInspectOutput} import java.util.*; import java.util.stream.*; public class ParallelStreamPuzzle3 { public static void main(String[] args) { List<Integer> x = IntStream.range(0, 30) .peek(e -> System.out.println(e + ": " +Thread.currentThread() .getName())) .limit(10) .parallel() .boxed() .collect(Collectors.toList()); System.out.println(x); } } ``` 输出结果: ``` 8: main 6: ForkJoinPool.commonPool-worker-5 3: ForkJoinPool.commonPool-worker-7 5: ForkJoinPool.commonPool-worker-5 1: ForkJoinPool.commonPool-worker-3 2: ForkJoinPool.commonPool-worker-6 4: ForkJoinPool.commonPool-worker-1 0: ForkJoinPool.commonPool-worker-4 7: ForkJoinPool.commonPool-worker-1 9: ForkJoinPool.commonPool-worker-2 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] ``` 为了表明**parallel()**确实有效,我添加了一个对**peek()**的调用,这是一个主要用于调试的流函数:它从流中提取一个值并执行某些操作但不影响从流向下传递的元素。注意这会干扰线程行为,但我只是尝试在这里做一些事情,而不是实际调试任何东西。 你还可以看到**boxed()**的添加,它接受**int**流并将其转换为**Integer**流。 现在我们得到多个线程产生不同的值,但它只产生10个请求的值,而不是1024个产生10个值。 它更快吗?一个更好的问题是:什么时候开始有意义?当然不是这么小的一套;上下文切换的代价远远超过并行性的任何加速。很难想象什么时候用并行生成一个简单的数字序列会有意义。如果你要生成的东西需要很高的成本,它可能有意义 - 但这都是猜测。只有通过测试我们才能知道用并行是否有效。记住这句格言:“首先使它工作,然后使它更快地工作 - 只有当你必须这样做时。”**parallel()**和**limit()**仅供专家使用(把话说在前面,我不认为自己是这里的专家)。 - 并行流只看起来很容易 实际上,在许多情况下,并行流确实可以毫不费力地更快地产生结果。但正如你所见,仅仅将**parallel()**加到你的Stream操作上并不一定是安全的事情。在使用**parallel()**之前,你必须了解并行性如何帮助或损害你的操作。一个基本认知错误就是认为使用并行性总是一个好主意。事实上并不是。Stream意味着你不需要重写所有代码便可以并行运行它。但是流的出现并不意味着你可以不用理解并行的原理以及不用考虑并行是否真的有助于实现你的目标。