我必須管理系統中的計劃檔案復制。檔案復制由用戶安排,我需要限制復制期間使用的系統資源量。每次復制可能花費的時間量未定義(即復制可能被安排為每 15 分鐘運行一次,而在下一次運行到期時上一次運行可能仍在運行)并且如果復制已經排隊,則不應排隊或運行。
我有一個調度程式,它會定期檢查到期的檔案復制,并且對于每個復制,(1) 如果它沒有排隊也沒有運行,則將它添加到阻塞佇列中,或者 (2) 否則將其洗掉。
private final Object scheduledReplicationsLock = new Object();
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
private final Set<Long> queuedReplicationIds = new HashSet<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (scheduledReplicationsLock) {
// If the replication job is either still executing or is already queued, do not add it.
if (queuedReplicationIds.contains(replication.id) || runningReplicationIds.contains(replication.id)) {
return false;
}
replicationQueue.add(replication)
queuedReplicationIds.add(replication.id);
}
我還有一個執行緒池,等待佇列中有復制并執行它。下面是執行緒池中各個執行緒的main方法:
public void run() {
while (True) {
Replication replication = null;
synchronized (scheduledReplicationsLock) {
// This will block until a replication job is ready to be run or the current thread is interrupted.
replication = replicationQueue.take();
// Move the ID value out of the queued set and into the active set
Long replicationId = replication.getId();
queuedReplicationIds.remove(replicationId);
runningReplicationIds.add(replicationId);
}
executeReplication(replication)
}
}
此代碼陷入死鎖,因為執行緒輪詢中的第一個執行緒將獲得 scheduleLock 并阻止調度程式向佇列添加復制。將 replicationQueue.take() 移出同步塊將消除死鎖,但隨后有可能從佇列中移除元素并且散列集未用它自動更新,這可能導致復制被錯誤洗掉。
如果佇列為空,我應該使用 BlockingQueue.poll() 并釋放鎖 睡眠而不是使用 BlockingQueue.take() 嗎?
歡迎對當前解決方案或滿足要求的其他解決方案進行修復。
uj5u.com熱心網友回復:
等待/通知
保持相同的控制流,而不是BlockingQueue在持有互斥鎖時阻塞實體,您可以wait通知scheduledReplicationsLock強制作業執行緒釋放鎖并回傳等待池。
這是您的生產商的簡化樣本:
private final List<Replication> replicationQueue = new LinkedList<>();
private final Set<Long> runningReplicationIds = new HashSet<>();
public boolean add(Replication replication) {
synchronized (replicationQueue) {
// If the replication job is either still executing or is already queued, do not add it.
if (replicationQueue.contains(replication) || runningReplicationIds.contains(replication.id)) {
return false;
} else {
replicationQueue.add(replication);
replicationQueue.notifyAll();
}
}
}
然后作業人員Runnable將更新如下:
public void run() {
synchronized (replicationQueue) {
while (true) {
if (replicationQueue.isEmpty()) {
scheduledReplicationsLock.wait();
}
if (!replicationQueue.isEmpty()) {
Replication replication = replicationQueue.poll();
runningReplicationIds.add(replication.getId())
executeReplication(replication);
}
}
}
}
阻塞佇列
通常,您最好使用BlockingQueue來協調您的生產者和復制作業池。
的BlockingQueue是,顧名思義,自然阻塞,并會導致呼叫執行緒阻塞僅當專案無法拉/從/推到了佇列中。
同時,請注意,您必須更新運行/入隊狀態管理,因為您只會同步BlockingQueue洗掉任何約束的專案。這將取決于背景關系,這是否可以接受。
這樣,您將洗掉所有其他使用過的互斥鎖并將其BlockingQueue用作同步狀態:
private final BlockingQueue<Replication> replicationQueue = new LinkedBlockingQueue<>();
public boolean add(Replication replication) {
// not sure if this is the proper invariant to check as at some point the replication would be neither queued nor running while still have been processed
if (replicationQueue.contains(replication)) {
return false;
}
// use `put` instead of `add` as this will block waiting for free space
replicationQueue.put(replication);
return true;
}
工人將take無限期地從BlockingQueue:
public void run() {
while (true) {
Replication replication = replicationQueue.take();
executeReplication(replication);
}
}
uj5u.com熱心網友回復:
如果您使用 BlockingQueue,則無需使用任何額外的同步塊
參考自 docs ( https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html )
BlockingQueue 實作是執行緒安全的。所有排隊方法都使用內部鎖或其他形式的并發控制以原子方式實作其效果。
只需使用這樣的東西
public void run() {
try {
while (replicationQueue.take()) { //Thread will be wait for the next element in the queue
Long replicationId = replication.getId();
queuedReplicationIds.remove(replicationId);
runningReplicationIds.add(replicationId);
executeReplication(replication);
}
} catch (InterruptedException ex) {
//if interrupted while waiting next element
}
}
}
查看 javadoc https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/LinkedBlockingQueue.html#take()
或者您可以將 BlockinQueue.pool() 與超時設定一起使用
UPD:經過討論,我使用兩個 ConcurrentHashSet 擴展 LinkedBlockingQueue 并添加方法 afterTake() 以洗掉處理的副本。您不需要佇列外的額外同步。只需將副本放在第一個執行緒中并在另一個執行緒中獲取它,并在復制完成后呼叫 afterTake() 。如果你想使用它,你需要覆寫其他方法。
package ru.everytag;
import io.vertx.core.impl.ConcurrentHashSet;
import java.util.concurrent.LinkedBlockingQueue;
public class TwoPhaseBlockingQueue<E> extends LinkedBlockingQueue<E> {
private ConcurrentHashSet<E> items = new ConcurrentHashSet<>();
private ConcurrentHashSet<E> taken = new ConcurrentHashSet<>();
@Override
public void put(E e) throws InterruptedException {
if (!items.contains(e)) {
items.add(e);
super.put(e);
}
}
public E take() {
E item = take();
taken.add(item);
items.remove(item);
return item;
}
public void afterTake(E e) {
if (taken.contains(e)) {
taken.remove(e);
} else if (items.contains(e)) {
throw new IllegalArgumentException("Element still in the queue");
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/355681.html
上一篇:從嵌套的Mongoose陣列原子操作中使用$pull的問題
下一篇:如何定期檢查執行緒是否完成
