在單生產者單消費者場景下,這個queue執行緒安全嗎?
takeIndex并且putIndex需要AtomicLong?Object[] ar需要AtomicReferenceArray這樣我們才能AtomicReferenceArray#set用來更新array。
我的理解是Object[] arr需要是一個AtomicReferenceArray以確保可見性,以便在producer放置元素時,消費者可以立即看到它。否則,當陣列中已經有一個元素時,可能consumer無法立即消費它(可能需要下一個poll才能獲取資料),這會導致資料消費延遲?但是不會帶來例外行為,比如兩次消費同一個元素,或者拋出一些意外的例外(這些都是不可接受的)。
public class SpscArrayQueue<E> implements Queue<E> {
private final int cap;
private Object[] arr; // change to AtomicReferenceArray?
long takeIndex; // change to AtomicLong?
long putIndex; // change to AtomicLong?
final int mask;
static int ceilingNextPowerOfTwo(int x) {
return 1 << (Integer.SIZE - Integer.numberOfLeadingZeros(x - 1));
}
public SpscArrayQueue(int cap) {
this.cap = ceilingNextPowerOfTwo(cap);
arr = new Object[this.cap];
mask = this.cap - 1;
}
@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null element");
}
final long index = this.putIndex;
final int mask = this.mask;
final int offset = (int) (mask & index);
if (arr[offset] != null) {
return false;
}
arr[offset] = e; // use unsafe.xxx ? or AtomicReferenceArray#set?
putIndex = (index 1); // use AtomicLong#increment?
return true;
}
@Override
@SuppressWarnings("unchecked")
public E poll() {
final long index = this.takeIndex;
final int mask = this.mask;
final int offset = (int) (mask & index);
final E e = (E) arr[offset];
if (e == null) {
return null;
}
takeIndex = (index 1); // use AtomicLong#increment?
arr[offset] = null; // use unsafe.xxx ? or AtomicReferenceArray#set?
return e;
}
// other methods
}
uj5u.com熱心網友回復:
當前的實作不是執行緒安全的,因為在生產和消費專案之間沒有發生之前的邊緣。并且在釋放插槽和獲取插槽之間的邊緣之前沒有發生。
我玩環形緩沖區已經有一段時間了。但它應該足以使 take 和 put 索引“同步”(例如 AtomicLong 或 volatile long)和對陣列的簡單訪問。這應該在寫入和讀取專案之間創建發生之前的邊緣。并且通常使用環緩沖區的頭和尾序列來確定是否有專案,不檢查陣列中是否為空/非空。
如果你想在生產中使用這樣的東西,我建議看看JCTools。
JCTools 已經過適當測驗,并且已經解決了諸如虛假共享之類的典型性能瓶頸。除了使用相對昂貴的成熟的 volatile 之外,JCTools 還提供了使用釋放/獲取語意的更輕松的方法,這在大多數情況下已經足夠了。
如果是出于教育目的,我會查看 JCTools 代碼以及以下java-ring-buffer。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/371826.html
