本篇文章主要基于Redisson中實作的分布式鎖機制繼續進行展開,分析Redisson中的時間輪機制,
在前面分析的Redisson的分布式鎖實作中,有一個Watch Dog機制來對鎖鍵進行續約,代碼如下:
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//用到了時間輪機制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
//添加一個任務到時間輪
//省略部分代碼....
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次間隔租期的1/3時間執行
ee.setTimeout(task);
}
實際上是構建了一個TimerTask,通過timer.newTimeout(task, delay, unit);添加到時間輪中,
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
try {
//delay: 延遲執行時間
//unit: 延遲執行時間單位
return timer.newTimeout(task, delay, unit);
} catch (IllegalStateException e) {
if (isShuttingDown()) {
return DUMMY_TIMEOUT;
}
throw e;
}
}
private HashedWheelTimer timer;
先來了解一下什么是時間輪
時間輪這個技術其實出來很久了,在kafka、zookeeper等技術中都有時間輪使用的方式,我第一次聽這個概念,是當時我一個朋友在拼多多,負責整體架構設計時需要考慮到超時訂單的自動關單,而訂單交易量又特別多,直接去輪詢資料的效率有點低,所以當時溝通下來聊到了時間輪這個東西,什么是時間輪呢?
簡單來說: 時間輪是一種高效利用執行緒資源進行批量化調度的一種調度模型,把大批量的調度任務全部系結到同一個調度器上,使用這一個調度器來進行所有任務的管理、觸發、以及運行,
所以時間輪的模型能夠高效管理各種延時任務、周期任務、通知任務, 以后大家在作業中遇到類似的功能,可以采用時間輪機制,
如圖3-11,時間輪,從圖片上來看,就和手表的表圈是一樣,所以稱為時間輪,是因為它是以時間作為刻度組成的一個環形佇列,這個環形佇列采用陣列來實作,陣列的每個元素稱為槽,每個槽可以放一個定時任務串列,叫HashedWheelBucket,它是一個雙向鏈表,量表的每一項表示一個定時任務項(HashedWhellTimeout),其中封裝了真正的定時任務TimerTask,
時間輪是由多個時間格組成,下圖中有8個時間格,每個時間格代表當前時間輪的基本時間跨度(tickDuration),其中時間輪的時間格的個數是固定的,
在下圖中,有8個時間格(槽),假設每個時間格的單位為1s,那么整個時間輪走完一圈需要8s鐘,每秒鐘指標會沿著順時針方向移動一個,這個單位可以設定,比如以秒為單位,可以以一小時為單位,這個單位可以代表時間精度,通過指標移動,來獲得每個時間格中的任務串列,然后遍歷這一個時間格中的雙向鏈表來執行任務,以此回圈,

時間輪的使用
這里使用的時間輪是Netty這個包中提供的,使用方法比較簡單,
- 先構建一個HashedWheelTimer時間輪,
- tickDuration: 100 ,表示每個時間格代表當前時間輪的基本時間跨度,這里是100ms,也就是指標100ms跳動一次,每次跳動一個窗格
- ticksPerWheel:1024,表示時間輪上一共有多少個窗格,分配的窗格越多,占用記憶體空間就越大
- leakDetection:是否開啟記憶體泄漏檢測,
- maxPendingTimeouts[可選引數],最大允許等待的任務數,默認沒有限制,
- 通過newTimeout()把需要延遲執行的任務添加到時間輪中
@RestController
public class RedissonController {
@Autowired
RedissonClient redissonClient;
HashedWheelTimer hashedWheelTimer= new HashedWheelTimer(new DefaultThreadFactory("demo-timer"), 100, TimeUnit.MILLISECONDS, 1024, false);
/**
* 添加延遲任務
* @param delay
*/
@GetMapping("/{delay}")
public void tick(@PathVariable("delay")Long delay){
System.out.println("currentDate:"+new Date());
hashedWheelTimer.newTimeout(timeout -> {
System.out.println("executeDate:"+new Date());
}, delay, TimeUnit.SECONDS);
}
}
時間輪的原理決議
時間輪的整體原理,分為幾個部分,
-
創建時間輪
時間輪本質上是一個環狀陣列,比如我們初始化時間輪時:ticksPerWheel=8,那么意味著這個環狀陣列的長度是8,如圖3-12所示,
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
圖3-12 -
添加任務,如圖3-13所示
-
當通過newTimeout()方法添加一個延遲任務時,該任務首先會加入到一個阻塞佇列中中,
-
然后會有一個定時任務從該佇列獲取任務,添加到時間輪的指定位置,計算方法如下,
//當前任務的開始執行時間除以每個視窗的時間間隔,得到一個calculated值(表示需要經過多少tick,指標沒跳動一個窗格,tick會遞增),單位為nanos(微毫秒) long calculated = timeout.deadline / tickDuration; //計算當前任務需要在時間輪中經歷的圈數,因為當前任務執行時間有可能大于完整一圈的時間,所以需要計算經過幾圈之后才能執行該任務, timeout.remainingRounds = (calculated - tick) / wheel.length; //取最大的一個tick,有可能當前任務在佇列中已經過了執行時間,這種情況下直接用calculated這個值就沒意義了, final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. int stopIndex = (int) (ticks & mask); //通過ticks取模mask,得到一個下標 HashedWheelBucket bucket = wheel[stopIndex]; //把任務添加到指定陣列下標位置
圖3-13 -
-
任務執行
Worker執行緒按照每次間隔時間轉動后,得到該時間窗格中的任務鏈表,然后從鏈表的head開始逐個取出任務,有兩個判斷條件
- 當前任務需要轉動的圈數為0,表示任務是當前圈開始執行
- 當前任務達到了delay時間,也就是
timeout.deadline <= deadline - 最終呼叫timeout.expire()方法執行任務,
public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds --; } timeout = next; } }
時間輪的原始碼分析
HashedWheelTimer的構造
- 呼叫createWheel創建一個時間輪,時間輪陣列一定是2的冪次方,比如傳入的ticksPerWheel=6,那么初始化的wheel長度一定是8,這樣是便于時間格的計算,
- tickDuration,表示時間輪的跨度,代表每個時間格的時間精度,以納秒的方式來表現,
- 把作業執行緒Worker封裝成WorkerThread,從名字可以知道,它就是最終那個負責干活的執行緒,
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel,
long maxPendingTimeouts) {
// 創建時間輪基本的資料結構,一個陣列,長度為不小于ticksPerWheel的最小2的n次方
wheel = createWheel(ticksPerWheel);
// 這是一個標示符,用來快速計算任務應該呆的格子,
// 我們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.但是%操作是個相對耗時的操作,所以使用一種變通的位運算代替:
// 因為一圈的長度為2的n次方,mask = 2^n-1后低位將全部是1,然后deadline&mast == deadline%wheel.length
// java中的HashMap在進行hash之后,進行index的hash尋址尋址的演算法也是和這個一樣的
mask = wheel.length - 1;
//時間輪的基本時間跨度,(tickDuration傳入是1的話,這里會轉換成1000000)
this.tickDuration = unit.toNanos(tickDuration);
// 校驗是否存在溢位,即指標轉動的時間間隔不能太長而導致tickDuration*wheel.length>Long.MAX_VALUE
if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
//把worker包裝成thread
workerThread = threadFactory.newThread(worker);
this.maxPendingTimeouts = maxPendingTimeouts;
//如果HashedWheelTimer實體太多,那么就會列印一個error日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
- 對傳入的ticksPerWheel進行整形
- 初始化固定長度的HashedWheelBucket
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}
//對傳入的時間輪大小進行整形,整形成2的冪次方
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
//初始化一個固定長度的Bucket陣列
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
添加任務到時間輪
完成時間輪的初始化之后,并沒有去啟動時間輪,繼續看FailbackClusterInvoker中的代碼,
構建了一個RetryTimerTask,也就是一個重試的定時任務,接著把這個任務通過newTimeout加入到時間輪中,其中
- retryTimerTask,表示具體的重試任務
- RETRY_FAILED_PERIOD , 表示重試間隔時間,默認為5s
RetryTimerTask retryTimerTask = new RetryTimerTask(loadbalance, invocation, invokers, lastInvoker, retries, RETRY_FAILED_PERIOD);
failTimer.newTimeout(retryTimerTask, RETRY_FAILED_PERIOD, TimeUnit.SECONDS);
呼叫newTimeout方法,把任務添加進來,
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//統計任務個數
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//判斷最大任務數量是否超過限制
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//如果時間輪沒有啟動,則通過start方法進行啟動
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
//計算任務的延遲時間,通過當前的時間+當前任務執行的延遲時間-時間輪啟動的時間,
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
//在delay為正數的情況下,deadline是不可能為負數
//如果為負數,那么說明超過了long的最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
//創建一個Timeout任務,理論上來說,這個任務應該要加入到時間輪的時間格子中,但是這里并不是先添加到時間格,而是先
//加入到一個阻塞佇列,然后等到時間輪執行到下一個格子時,再從佇列中取出最多100000個任務添加到指定的時間格(槽)中,
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
start
任務添加到阻塞佇列之后,我們再來看啟動方法
start方法會根據當前的workerState狀態來啟動時間輪,并且用了startTimeInitialized來控制執行緒的運行,如果workerThread沒有啟動起來,那么newTimeout方法會一直阻塞在運行start方法中,如果不阻塞,newTimeout方法會獲取不到startTime,
public void start() {
//workerState一開始的時候是0(WORKER_STATE_INIT),然后才會設定為1(WORKER_STATE_STARTED)
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// 等待worker執行緒初始化時間輪的啟動時間
while (startTime == 0) {
try {
//這里使用countDownLauch來確保調度的執行緒已經被啟動
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
啟動時間輪
呼叫start()方法, 會呼叫workerThread.start();來啟動一個作業執行緒,這個作業執行緒是在構造方法中初始化的,包裝的是一個Worker內部執行緒類,
所以直接進入到Worker這個類的run方法,了解下它的設計邏輯
public void run() {
// 初始化startTime,表示時間輪的啟動時間
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// 喚醒被阻塞的start()方法,
startTimeInitialized.countDown();
do {
//回傳每tick一次的時間間隔
final long deadline = waitForNextTick();
if (deadline > 0) {
//計算時間輪的槽位
int idx = (int) (tick & mask);
//移除掉CancelledTask
processCancelledTasks();
//得到當前指標位置的時間槽
HashedWheelBucket bucket =
wheel[idx];
//將newTimeout()方法中加入到待處理定時任務佇列中的任務加入到指定的格子中
transferTimeoutsToBuckets();
//運行目前指標指向的槽中的bucket鏈表中的任務
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
//如果Worker_State一只是started狀態,就一直回圈
// Fill the unprocessedTimeouts so we can return them from stop() method.
for (HashedWheelBucket bucket : wheel) {
bucket.clearTimeouts(unprocessedTimeouts); //清除時間輪中不需要處理的任務
}
for (; ; ) {
//遍歷任務佇列,發現如果有任務被取消,則添加到unprocessedTimeouts,也就是不需要處理的佇列中,
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//處理被取消的任務.
processCancelledTasks();
}
時間輪指標跳動
這個方法的主要作用就是回傳下一個指標指向的時間間隔,然后進行sleep操作,
大家可以想象一下,一個鐘表上秒與秒之間是有時間間隔的,那么waitForNextTick就是根據當前時間計算出跳動到下個時間的時間間隔,然后進行sleep,然后再回傳當前時間距離時間輪啟動時間的時間間隔,
說得再直白一點:,假設當前的tickDuration的間隔是1s,tick默認=0, 此時第一次進來,得到的deadline=1,也就是下一次跳動的時間間隔是1s,假設當前處于
private long waitForNextTick() {
//tick表示總的tick數
//tickDuration表示每個時間格的跨度,所以deadline回傳的是下一次時間輪指標跳動的時間
long deadline = tickDuration * (tick + 1);
for (; ; ) {
//計算當前時間距離啟動時間的時間間隔
final long currentTime = System.nanoTime() - startTime;
//通過下一次指標跳動的延遲時間距離當前時間的差額,這個作為sleep時間使用,
// 其實執行緒是以睡眠一定的時候再來執行下一個ticket的任務的
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
//sleepTimeMs小于零表示走到了下一個時間槽位置
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
if (isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
//進入到這里進行sleep,表示當前時間距離下一次tick時間還有一段距離,需要sleep,
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
transferTimeoutsToBuckets
轉移任務到時間輪中,前面我們講過,任務添加進來時,是先放入到阻塞佇列,
而在現在這個方法中,就是把阻塞佇列中的資料轉移到時間輪的指定位置,
在這個轉移方法中,寫死了一個回圈,每次都只轉移10萬個任務,
然后根據HashedWheelTimeout的deadline延遲時間計算出時間輪需要運行多少次才能運行當前的任務,如果當前的任務延遲時間大于時間輪跑一圈所需要的時間,那么就計算需要跑幾圈才能到這個任務運行,
最后計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中,
private void transferTimeoutsToBuckets() {
// 回圈100000次,也就是每次轉移10w個任務
for (int i = 0; i < 100000; i++) {
//從阻塞佇列中獲得具體的任務
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//計算tick次數,deadline表示當前任務的延遲時間,tickDuration表示時間槽的間隔,兩者相除就可以計算當前任務需要tick幾次才能被執行
long calculated = timeout.deadline / tickDuration;
// 計算剩余的輪數, 只有 timer 走夠輪數, 并且到達了 task 所在的 slot, task 才會過期.(被執行)
timeout.remainingRounds = (calculated - tick) / wheel.length;
//如果任務在timeouts佇列里面放久了, 以至于已經過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法呼叫完后就會被執行
final long ticks = Math.max(calculated, tick);
// 算出任務應該插入的 wheel 的 slot, stopIndex = tick 次數 & mask, mask = wheel.length - 1
int stopIndex = (int) (ticks & mask);
//把timeout任務插入到指定的bucket鏈中,
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);
}
}
運行時間輪中的任務
當指標跳動到某一個時間槽中時,會就觸發這個槽中的任務的執行,該功能是通過expireTimeouts來實作
這個方法的主要作用是: 過期并執行格子中到期的任務,也就是當tick進入到指定格子時,worker執行緒會呼叫這個方法
HashedWheelBucket是一個鏈表,所以我們需要從head節點往下進行遍歷,如果鏈表沒有遍歷到鏈表尾部那么就繼續往下遍歷,
獲取的timeout節點節點,如果剩余輪數remainingRounds大于0,那么就說明要到下一圈才能運行,所以將剩余輪數減一;
如果當前剩余輪數小于等于零了,那么就將當前節點從bucket鏈表中移除,并判斷一下當前的時間是否大于timeout的延遲時間,如果是則呼叫timeout的expire執行任務,
void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// 遍歷當前時間槽中的所有任務
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
//如果當前任務要被執行,那么remainingRounds應該小于或者等于0
if (timeout.remainingRounds <= 0) {
//從bucket鏈表中移除當前timeout,并回傳鏈表中下一個timeout
next = remove(timeout);
//如果timeout的時間小于當前的時間,那么就呼叫expire執行task
if (timeout.deadline <= deadline) {
timeout.expire();
} else {
//不可能發生的情況,就是說round已經為0了,deadline卻>當前槽的deadline
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
next = remove(timeout);
} else {
//因為當前的槽位已經過了,說明已經走了一圈了,把輪數減一
timeout.remainingRounds--;
}
//把指標放置到下一個timeout
timeout = next;
}
}
關注[跟著Mic學架構]公眾號,獲取更多精品原創

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/331948.html
標籤:Java
