作者:后青春期的Keats
https://www.cnblogs.com/keatsCoder/
前言
在 Java7 之前,如果想要并行處理一個集合,我們需要以下幾步:
-
手動分成幾部分
-
為每部分創建執行緒
-
在適當的時候合并,并且還需要關注多個執行緒之間共享變數的修改問題,
而 Java8 為我們提供了并行流,可以一鍵開啟并行模式,是不是很酷呢?讓我們來看看吧
并行流
認識和開啟并行流
什么是并行流: 并行流就是將一個流的內容分成多個資料塊,并用不同的執行緒分別處理每個不同資料塊的流,例如有這么一個需求:
有一個 List 集合,而 list 中每個 apple 物件只有重量,我們也知道 apple 的單價是 5元/kg,現在需要計算出每個 apple 的單價,傳統的方式是這樣:
List<Apple> appleList = new ArrayList<>(); // 假裝資料是從庫里查出來的
for (Apple apple : appleList) {
apple.setPrice(5.0 * apple.getWeight() / 1000);
}
我們通過迭代器遍歷 list 中的 apple 物件,完成了每個 apple 價格的計算,而這個演算法的時間復雜度是 O(list.size()) 隨著 list 大小的增加,耗時也會跟著線性增加,并行流可以大大縮短這個時間,
并行流處理該集合的方法如下:
appleList.parallelStream().forEach(apple -> apple.setPrice(5.0 * apple.getWeight() / 1000));
和普通流的區別是這里呼叫的 parallelStream() 方法,當然也可以通過 stream.parallel() 將普通流轉換成并行流,推薦看下:Java 8 創建 Stream 的 10 種方式,更多可以關注Java技術堆疊公眾號回復java獲取系列教程,
并行流也能通過 sequential() 方法轉換為順序流,但要注意:流的并行和順序轉換不會對流本身做任何實際的變化,僅僅是打了個標記而已,并且在一條流水線上對流進行多次并行 / 順序的轉換,生效的是最后一次的方法呼叫
并行流如此方便,它的執行緒從那里來呢?有多少個?怎么配置呢?
并行流內部使用了默認的 ForkJoinPool 執行緒池,默認的執行緒數量就是處理器的核心數,而配置系統核心屬性:java.util.concurrent.ForkJoinPool.common.parallelism 可以改變執行緒池大小,不過該值是全域變數,
改變他會影響所有并行流,目前還無法為每個流配置專屬的執行緒數,一般來說采用處理器核心數是不錯的選擇
測驗并行流的性能
為了更容易的測驗性能,我們在每次計算完蘋果價格后,讓執行緒睡 1s,表示在這期間執行了其他 IO 相關的操作,并輸出程式執行耗時,順序執行的耗時:
public static void main(String[] args) throws InterruptedException {
List<Apple> appleList = initAppleList();
Date begin = new Date();
for (Apple apple : appleList) {
apple.setPrice(5.0 * apple.getWeight() / 1000);
Thread.sleep(1000);
}
Date end = new Date();
log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
}
并行版本
List<Apple> appleList = initAppleList();
Date begin = new Date();
appleList.parallelStream()
.forEach(apple ->
{
apple.setPrice(5.0 * apple.getWeight() / 1000);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
);
Date end = new Date();
log.info("蘋果數量:{}個, 耗時:{}s", appleList.size(), (end.getTime() - begin.getTime()) /1000);
耗時情況

跟我們的預測一致,我的電腦是 四核I5 處理器,開啟并行后四個處理器每人執行一個執行緒,最后 1s 完成了任務!
并行流可以隨便用嗎?
可拆分性影響流的速度
通過上面的測驗,有的人會輕易得到一個結論:并行流很快,我們可以完全放棄 foreach/fori/iter 外部迭代,使用 Stream 提供的內部迭代來實作了,
事實真的是這樣嗎?并行流真的如此完美嗎?答案當然是否定的,大家可以復制下面的代碼,在自己的電腦上測驗,測驗完后可以發現,并行流并不總是最快的處理方式,
-
對于 iterate 方法來處理的前 n 個數字來說,不管并行與否,它總是慢于回圈的,非并行版本可以理解為流化操作沒有回圈更偏向底層導致的慢,可并行版本是為什么慢呢?這里有兩個需要注意的點:
-
iterate 生成的是裝箱的物件,必須拆箱成數字才能求和
-
我們很難把 iterate 分成多個獨立的塊來并行執行
這個問題很有意思,我們必須意識到某些流操作比其他操作更容易并行化,對于 iterate 來說,每次應用這個函式都要依賴于前一次應用的結果,因此在這種情況下,我們不僅不能有效的將流劃分成小塊處理,反而還因為并行化再次增加了開支,
-
而對于 LongStream.rangeClosed() 方法來說,就不存在 iterate 的第兩個痛點了,它生成的是基本型別的值,不用拆裝箱操作,另外它可以直接將要生成的數字 1 - n 拆分成 1 - n/4, 1n/4 - 2n/4, ... 3n/4 - n 這樣四部分,因此并行狀態下的 rangeClosed() 是快于 for 回圈外部迭代的
package lambdasinaction.chap7;
import java.util.stream.*;
public class ParallelStreams {
public static long iterativeSum(long n) {
long result = 0;
for (long i = 0; i <= n; i++) {
result += i;
}
return result;
}
public static long sequentialSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).reduce(Long::sum).get();
}
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1).limit(n).parallel().reduce(Long::sum).get();
}
public static long rangedSum(long n) {
return LongStream.rangeClosed(1, n).reduce(Long::sum).getAsLong();
}
public static long parallelRangedSum(long n) {
return LongStream.rangeClosed(1, n).parallel().reduce(Long::sum).getAsLong();
}
}
package lambdasinaction.chap7;
import java.util.concurrent.*;
import java.util.function.*;
public class ParallelStreamsHarness {
public static final ForkJoinPool FORK_JOIN_POOL = new ForkJoinPool();
public static void main(String[] args) {
System.out.println("Iterative Sum done in: " + measurePerf(ParallelStreams::iterativeSum, 10_000_000L) + " msecs");
System.out.println("Sequential Sum done in: " + measurePerf(ParallelStreams::sequentialSum, 10_000_000L) + " msecs");
System.out.println("Parallel forkJoinSum done in: " + measurePerf(ParallelStreams::parallelSum, 10_000_000L) + " msecs" );
System.out.println("Range forkJoinSum done in: " + measurePerf(ParallelStreams::rangedSum, 10_000_000L) + " msecs");
System.out.println("Parallel range forkJoinSum done in: " + measurePerf(ParallelStreams::parallelRangedSum, 10_000_000L) + " msecs" );
}
public static <T, R> long measurePerf(Function<T, R> f, T input) {
long fastest = Long.MAX_VALUE;
for (int i = 0; i < 10; i++) {
long start = System.nanoTime();
R result = f.apply(input);
long duration = (System.nanoTime() - start) / 1_000_000;
System.out.println("Result: " + result);
if (duration < fastest) fastest = duration;
}
return fastest;
}
}
共享變數修改的問題
并行流雖然輕易的實作了多執行緒,但是仍未解決多執行緒中共享變數的修改問題,下面代碼中存在共享變數 total,分別使用順序流和并行流計算前n個自然數的和
public static long sideEffectSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).forEach(accumulator::add);
return accumulator.total;
}
public static long sideEffectParallelSum(long n) {
Accumulator accumulator = new Accumulator();
LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
return accumulator.total;
}
public static class Accumulator {
private long total = 0;
public void add(long value) {
total += value;
}
}
順序執行每次輸出的結果都是:50000005000000,而并行執行的結果卻五花八門了,這是因為每次訪問 totle 都會存在資料競爭,關于資料競爭的原因,大家可以看看關于 volatile 的博客,因此當代碼中存在修改共享變數的操作時,是不建議使用并行流的,
并行流的使用注意
在并行流的使用上有下面幾點需要注意:
-
盡量使用 LongStream / IntStream / DoubleStream 等原始資料流代替 Stream 來處理數字,以避免頻繁拆裝箱帶來的額外開銷
-
要考慮流的操作流水線的總計算成本,假設 N 是要操作的任務總數,Q 是每次操作的時間,N * Q 就是操作的總時間,Q 值越大就意味著使用并行流帶來收益的可能性越大
例如:前端傳來幾種型別的資源,需要存盤到資料庫,每種資源對應不同的表,我們可以視作型別數為 N,存盤資料庫的網路耗時 + 插入操作耗時為 Q,一般情況下網路耗時都是比較大的,因此該操作就比較適合并行處理,當然當型別數目大于核心數時,該操作的性能提升就會打一定的折扣了,更好的優化方法在日后的博客會為大家奉上
-
對于較少的資料量,不建議使用并行流
-
容易拆分成塊的流資料,建議使用并行流
以下是一些常見的集合框架對應流的可拆分性能表:

碼字不易,如果你覺得讀完以后有識訓,不妨點個推薦讓更多的人看到吧!
關注公眾號Java技術堆疊回復"面試"獲取我整理的2020最全面試題及答案,
推薦去我的博客閱讀更多:
1.Java JVM、集合、多執行緒、新特性系列教程
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
4.Java、后端、架構、阿里巴巴等大廠最新面試題
覺得不錯,別忘了點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/137123.html
標籤:Java
