parallel()然而,今天我使用的是在映射之后執行操作的流;底層源是一個迭代器,它不是執行緒安全的,類似于BufferedReader.lines實作。
我原本以為會在創建的執行緒上呼叫 trySplit,但是;我觀察到對迭代器的訪問來自多個執行緒。
例如,以下愚蠢的迭代器實作只是設定了足夠的元素來導致拆分,并且還跟蹤訪問該hasNext方法的唯一執行緒。
class SillyIterator implements Iterator<String> {
private final ArrayDeque<String> src =
IntStream.range(1, 10000)
.mapToObj(Integer::toString)
.collect(toCollection(ArrayDeque::new));
private Map<String, String> ts = new ConcurrentHashMap<>();
public Set<String> threads() { return ts.keySet(); }
private String nextRecord = null;
@Override
public boolean hasNext() {
var n = Thread.currentThread().getName();
ts.put(n, n);
if (nextRecord != null) {
return true;
} else {
nextRecord = src.poll();
return nextRecord != null;
}
}
@Override
public String next() {
if (nextRecord != null || hasNext()) {
var rec = nextRecord;
nextRecord = null;
return rec;
}
throw new NoSuchElementException();
}
}
使用它來創建一個流,如下所示:
var iter = new SillyIterator();
StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
iter, Spliterator.ORDERED | Spliterator.NONNULL
), false)
.map(n -> "value = " n)
.parallel()
.collect(toList());
System.out.println(iter.threads());
這在我的系統上輸出了兩個 fork 連接執行緒以及主執行緒,這讓我很害怕。
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main]
uj5u.com熱心網友回復:
執行緒安全并不一定意味著只能被一個執行緒訪問。重要的方面是沒有并發訪問,即沒有多個執行緒同時訪問。如果不同執行緒的訪問是臨時排序的,并且這種排序也確保了必要的記憶體可見性,這是??呼叫者的責任,它仍然是執行緒安全的用法。
該Spliterator檔案說:
盡管它們在并行演算法中具有明顯的實用性,但預計分離器不是執行緒安全的。相反,使用拆分器的并行演算法實作應確保拆分器一次僅被一個執行緒使用。這通常可以通過串行執行緒限制輕松實作,這通常是通過遞回分解作業的典型并行演算法的自然結果。
拆分器不需要在其整個生命周期中都限制在同一個執行緒中,但是呼叫方應該有一個明確的切換,以確保舊執行緒在新執行緒開始使用它之前停止使用它。
但重要的一點是,spliterator 不需要是執行緒安全的,因此,被 spliterator 包裝的迭代器也不需要是執行緒安全的。
請注意,一個典型的行為是在開始遍歷之前進行拆分和移交,但是由于普通的Iterator不支持拆分,因此包裝拆分器必須迭代和緩沖元素以實作拆分。因此,Iterator從Stream實作的角度來看,當遍歷尚未開始時,會經歷不同執行緒(但一次一個)的遍歷。
也就是說, 的lines()實作BufferedReader是一個你不應該遵循的壞例子。由于它以單個readLine()呼叫為中心,因此Spliterator直接實作而不是實作更復雜的Iterator并通過spliteratorUnknownSize(…).
由于您的示例同樣以單個poll()呼叫為中心,因此Spliterator直接實作也很簡單:
class SillySpliterator extends Spliterators.AbstractSpliterator<String> {
private final ArrayDeque<String> src = IntStream.range(1, 10000)
.mapToObj(Integer::toString).collect(toCollection(ArrayDeque::new));
SillySpliterator() {
super(Long.MAX_VALUE, ORDERED | NONNULL);
}
@Override
public boolean tryAdvance(Consumer<? super String> action) {
String nextRecord = src.poll();
if(nextRecord == null) return false;
action.accept(nextRecord);
return true;
}
}
根據您的實際情況,您還可以將實際雙端佇列大小傳遞給建構式并提供SIZED特征。
然后,你可以像這樣使用它
var result = StreamSupport.stream(new SillySpliterator(), true)
.map(n -> "value = " n)
.collect(toList());
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/382679.html
上一篇:行程和執行緒之間共享和更新串列
