以下是在學習中整理的一些內容,如有錯誤點,多謝指出,
ScheduledExecutorService
可以用來在給定延時后執行異步任務或者周期性執行任務,由于放入的任務不一定能夠立即執行,所以還是需要得放入佇列,然后獲取,看看是否滿足執行條件:時間是否滿足,
ScheduledExecutorService介面
public interface ScheduledExecutorService extends ExecutorService {
//在延遲delay時間后執行command,unit為時間單位,只調度一次
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//執行callable,
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);
//基于 上一次開始時間 來延遲固定時間后執行下一次任務
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//基于 上一次結束時間
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
ScheduledThreadPoolExecutor核心變數
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
//在關閉時應該取消周期性任務
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//如果在關閉時應該取消非周期性的任務
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//是否應該從佇列中洗掉
private volatile boolean removeOnCancel = false;
//順序號, 保證FIFO
private static final AtomicLong sequencer = new AtomicLong();
// ScheduledFutureTask類 用于封裝Runnable Callable 物件
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
//序號
private final long sequenceNumber;
//以納秒為單位,表明該任務下一次能夠被調度的時間
private long time;
//重復任務的周期
private final long period;
//被 reExecutePeriodic 方法重新加入佇列中的實際任務,默認當前任務
RunnableScheduledFuture<V> outerTask = this;
//延遲佇列的索引
int heapIndex;
//用于取消任務執行
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果取消成功,則從任務佇列里移除任務
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();//是不是周期性執行任務
//判斷是否能繼續執行
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)//不是周期性調度任務,直接呼叫run
ScheduledFutureTask.super.run(); //FutureTask的run方法
else if (ScheduledFutureTask.super.runAndReset()) {//是周期性調度任務runAndReset方法執行
setNextRunTime(); //設定下一次調度時間
reExecutePeriodic(outerTask);//通過這個進行調度
}
}
private void setNextRunTime() {
long p = period;
if (p > 0)//p>0 任務開始執行的時間 +周期調度時間
time += p;
else
//如果period 小于0 ,以任務執行完畢后的的時間來計算下一次執行的時間
time = triggerTime(-p);
}
//將任務重新放入任務佇列中執行
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//根據當前執行緒池狀態,判斷當前任務是否允許被執行
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task); //將任務添加到延遲佇列中
if (!canRunInCurrentRunState(true) && remove(task))//再次判斷是否應該執行
task.cancel(false); //取消任務執行
else
//正常情況下,保證執行緒池中至少有一個作業執行緒在處理任務
ensurePrestart();
}
}
}
//延時佇列的實作原理:DelayedWorkQueue :因為都是周期性任務 帶有時間的 對其排序 使用的是小根堆
static class DelayedWorkQueue extends AbstractQueue<Runnable> implements BlockingQueue<Runnable> {
//初始容量為 16
private static final int INITIAL_CAPACITY = 16;
//任務佇列 陣列
private RunnableScheduledFuture<?>[] queue = new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
//執行緒Leader
private Thread leader = null;
//條件變數用于作業執行緒等待執行任務
private final Condition available = lock.newCondition();
}
}
scheduleAtFixedRate實作
表示周期性任務調度,每次任務基于上一次任務開始執行的時間來決定下次啟動時間
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//triggerTime 用于計算該任務應被調度的時間,unit.toNanos(period) 用于執行周期變為納秒
ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
//封裝成了 ScheduledFutureTask 物件,通過 decorateTask, 呼叫 delayedExecute 方法執行
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
delayedExecute方法
//主執行周期性調度或者延遲任務的方法,
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())//SHURDOWN 了?
reject(task);
else {
super.getQueue().add(task); // 添加到佇列里去
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&//當前執行緒是不是在shutdown 的狀態下執行
remove(task))//為false的情況下 從當前佇列移除 然后取消當前任務
task.cancel(false);
else
//確保至少還有一個執行緒在執行任務
ensurePrestart();
}
}
ensurePrestart方法
如果執行緒池未關閉那么 ensurePrestart方法
//保證的是至少啟動一個核心執行緒
void ensurePrestart() {
int wc = workerCountOf(ctl.get());//獲取作業執行緒數
if (wc < corePoolSize)//如果小于 核心執行緒數
addWorker(null, true);//添加 核心執行緒數
else if (wc == 0)//至少保證還有一個作業執行緒
addWorker(null, false);
}
add添加
如果執行緒未關閉,那么直接呼叫 DelayedWorkQueue 里的 add -> offer 方法,將任務task 放入佇列中
public boolean add(Runnable e) {
return offer(e);
}
public boolean offer(Runnable x) {//向陣列中添加任務
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {//要不要擴容
int i = size;
if (i >= queue.length)
grow();
size = i + 1;
//之前沒有任務,放在第一位就行
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
//否則將任務放到最后一位,然后通過siftUp 方法調整
siftUp(i, e);
}
//如果佇列中的第一個任務是當前e則清除leader執行緒,然后喚醒一個等待佇列可用的執行緒來執行任務
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
grow 擴容方法
private void grow() {
int oldCapacity = queue.length;
//每次擴容 50%
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity < 0) //如果新容量小于0,那么表明溢位了
newCapacity = Integer.MAX_VALUE;
//將 old陣列任務 復制到相信陣列上
queue = Arrays.copyOf(queue, newCapacity);
}
siftUp調整方法
//調整堆
private void siftUp(int k, RunnableScheduledFuture<?> key) {
//當K>0時,不斷進行調整,k等于0表明調整到了根節點,也就是第一個元素,這時必須退出回圈
while (k > 0) {
// 找到他的 父親節點
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
//與父親節點比較 看看需不需要動,直到找到 位置
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
canRunInCurrentRunState
如果在添加到優先級佇列后,執行緒池已經關閉,需要通過 來判斷是否應該繼續執行該任務
/*通過是否是周期任務判斷 continueExistingPeriodicTasksAfterShutdown(執行緒池shutdown后是否執行周期性任務)
executeExistingDelayedTasksAfterShutdown(執行緒池shutdown后是否執行延遲任務) 是否應該繼續執行任務 */
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
take 方法
//從佇列中獲取任務,如果當前佇列沒有任務可取,則阻塞直到佇列有任務,即等待offer方法喚醒
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//可回應中斷的方式加鎖
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//如果佇列第一個任務為null,則證明沒有任務了,當前執行緒等待
if (first == null)
available.await();
else {//否則獲取第一個任務的剩余等待時間,判斷是否小于0.需不需要執行
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // 在執行緒等待任務可執行時不保留參考
//由于任務還需要等待一段時間才能執行,這時看看前面有沒有執行緒正在等待,如果有,則當前執行緒繼續等待
if (leader != null)//根本沒有必要讓拿到第一個任務的執行緒等待
available.await();
else {/*如果前面沒有執行緒等待,則把自己設定為leader執行緒,然后開始等待delay時間
這時如果再來別的執行緒獲取任務,就只能讓這個成為leader的執行緒延遲被喚醒*/
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
//將 leader 變數去掉
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
/*當任務被執行緒獲取后,判斷leader是否為空且佇列不為空,由于沒有執行緒去等待或者獲取佇列中的下一個任務,因此需要喚醒一個執行緒擔任leader等待或者獲取下一個任務*/
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
//完成最終的任務出隊,這里傳入的f 為第一個等待任務,由于任務被出隊,因此需要調整堆
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
//當前任務佇列內任務數量 --,然后取出佇列尾部的一個任務,呼叫siftDown重新調整堆的順序
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
siftDown : 調整堆
//調整堆,將任務向堆尾移動,
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1; //獲取左孩子節點索引
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1; //右孩子
if (right < size && c.compareTo(queue[right]) > 0)//比較左右孩子的大小
c = queue[child = right];
if (key.compareTo(c) <= 0)// 以左右孩子的min 來和 key比較
break;
queue[k] = c; //如果key 小于 他們的min 那么交換
setIndex(c, k);
k = child;
}
queue[k] = key; //此時k 即為傳入key 應該存放的下標
setIndex(key, k);
}
scheduleWithFixedDelay實作
scheduleAtFixedRate 和 scheduleWithFixedDelay,前者是基于任務開始時間計算的 ,后者 是 基于上一個任務執行完成的時間計算的
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
shutdown
ThreadPoolExecutor 的shutdown ->onShutdown
@Override void onShutdown() {
//首先獲取任務佇列 q
BlockingQueue<Runnable> q = super.getQueue();
//默認 true
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();//執行緒池關閉后是否應該繼續執行延遲任務標志
//默認false
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();//執行緒池關閉后是否應該繼續執行周期性任務標志
//判斷是否在執行緒池shutdown后繼續執行延遲任務,是否繼續執行周期性調度任務
if (!keepDelayed && !keepPeriodic) {//都不是,將任務佇列清空,同時取消任務執行
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
//否則遍歷任務佇列,分別處理周期任務和延遲任務
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // 這里的 t.isCancelled() 表示任務已經被取消,應移除
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();//呼叫該方法嘗試進一步轉換執行緒池狀態
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/382861.html
標籤:java
上一篇:Python游戲開發,pygame模塊,Python實作過迷宮小游戲
下一篇:蘋果 M1 支持 Linux 最新進展;英特爾發布“GSC”Linux 驅動程式;Linux 基金會研究揭示開源趨勢 | 開源日報
