主頁 > 後端開發 > 時間輪TimeWheel作業原理決議

時間輪TimeWheel作業原理決議

2023-02-17 07:31:54 後端開發

時間輪作業原理決議

一.時間輪介紹

1.時間輪的簡單介紹

時間輪(TimeWheel)作為一種高效率的計時器實作方案,在1987年發表的論文Hashed and Hierarchical Timing Wheels中被首次提出,
其被發明的主要目的在于解決當時作業系統的計時器功能實作中,維護一個定時器的開銷隨著所維護定時器數量的增多而逐漸變大的問題(時間復雜度為:O(n)、O(log n)),
這導致作業系統無法同時高效的維護大量計時器,進一步導致一些優秀的、需要使用到大量定時器的的網路協議、實時控制系統等程式的實際表現不盡人意,

2.傳統的計時器功能實作方式

計時器作為一種普遍的需求,理解起來是很簡單的,計時器主要由兩部分組成,即用戶指定一個任務(task),并在等待指定的時間(delayTime)后task將會被回呼執行,
在時間輪演算法被發明出來之前,作業系統計時器功能的實作方式主要可以分為兩種:基于無序佇列基于有序佇列

基于無序佇列實作的計時器
  1. 新創建的計時器直接放在佇列的末尾,時間復雜度為O(1),
  2. 在每次硬體時鐘tick中斷時(per tick),遍歷當前佇列中所有的計時器,將當前時間下過期的計時器移出佇列并調度執行task,時間復雜度O(n),
    基于無序佇列的計時器中,所維護的計時器總數量越多,則每次硬體時鐘中斷時的處理流程開銷越大,最壞情況下甚至無法在一次時鐘tick的間隔內完成計時器佇列的遍歷,
基于有序佇列實作的計時器
  1. 有序佇列下,所有計時器按照過期時間進行排序,新創建的計時器加入佇列時的時間復雜度為O(log n)(通常使用完全二叉堆來實作有序佇列),
  2. 在每次硬體時鐘tick中斷時,僅檢查佇列的頭部元素(最早過期的任務)是否過期,如果未過期則直接結束,如果已過期則將隊首元素出隊調度task,并再次重復上述程序,直至最新的隊首元素不過期或佇列為空,平均時間復雜度為O(1),
    基于有序佇列的計時器中,所維護的計時器總數量越多,則每次用戶創建新的計時器時的延遲越高,在需要反復創建大量計時器的場合下,性能不佳

可以看到,在基于佇列的計時器模塊運行時,最關鍵的兩個功能(創建新計時器/處理每次tick)至少有一個會隨著總計時器數量的增大,而引起性能大幅度的下降,
juc中自帶的ScheduledThreadPoolExecutor調度執行緒池就是基于有序串列(二叉堆)的計時器,因此netty等需要大量使用計時器的框架需要另辟蹊徑,采用時間輪來實作更高效的計時器功能,

不同計時器實作與排序演算法的關聯

對基礎資料結構有一定了解的讀者會知道,常用的快速排序、歸并排序等基于比較的高效排序演算法其時間復雜度為O(n*log n),
而基數排序(桶排序)的時間復雜度則是O(n),其性能比上述基于比較的排序演算法高出一個數量級,
但基排序最大的缺陷則是對所要排序的資料集的排布有很高的要求,如果要排序的資料集的范圍非常廣,則所需要的桶(bucket)會非常多,空間復雜度會高到不可忍受,
舉個例子,如果是對1萬副撲克(不算大小王,52張牌)進行排序,由于撲克牌只有13種可能(A-K),即使1萬副撲克中牌的總數為52萬張,基排序只需要13個桶就能在線性時間復雜度O(n)內完成排序,
但如果是對資料范圍為0-1億范圍內的1萬個亂數進行一次基排序,則基排序需要多達1億個桶,其空間效率非常低,遠遜于快速排序等基于比較的排序,

截止目前,我們已經明確了兩個關鍵點:

  1. 基于有序串列的計時器,由于其基于比較的特征,所以插入時的時間復雜度O(log n)會隨著計時器總量的增大而增加,在計時器總量成千上萬時效率會急劇降低,
  2. 對于一個較小的資料集范圍,基排序的效率遠高于快速排序等基于比較的排序演算法,

一般來說,一次時鐘硬體的tick間隔非常小(納秒級別),如果想要用類似基排序的思想,使用一個巨大的陣列來存盤不同過期時間的計時器,
在理論上是可行的,但空間效率卻低到無法在現有的記憶體硬體上實作(1納秒對應1個bucket),
但如果能容忍時鐘調度的時間不是那么精確,則可以極大減少所需要的bucket桶的數量,
舉個例子,1毫秒等于1百萬納秒,如果時鐘調度的精度不需要是納秒級別,而是毫秒級別,則同一毫秒內的所有計時器(第100納秒和第999999納秒超時的計時器)都可以放在同一個桶中,所需要的陣列空間減少了100萬倍!
時間輪演算法就是基于這一特點產生的,即一定程度上舍棄調度時間的精確性,參考基排序的思路,實作在常數時間內創建新計時器,并同時在常數時間內完成時鐘tick的處理,

3.時間輪計時器實作思路的簡單介紹

下面我們簡單的介紹一個基于時間輪的計時器的基本實作思路(還有很多可以優化的地方):

  1. 時間輪在創建時需要指定調度精度,即時間輪內部邏輯上1次tick的間隔,
    在上述例子中,調度精度為1毫秒,則時間輪實際上1次tick的間隔也就是1毫秒(類似的,我們平常見到的鐘表中1次tick的間隔則是1秒鐘),
  2. 維護一個桶陣列,由于不同超時時間的任務可能會被映射到同一個桶中,因此陣列桶中維護一個指向某一串列的指標(參考),
  3. 創建新計時器時,對于任意超時時間的任務基于tick間隔進行哈希,計算出需要存入的對應陣列桶的下標(第100納秒和第999999納秒超時的計時器,都放入第0個桶)并插入對應桶的串列中,
  4. 維護一個當前時間指標,指向某一個陣列桶,每1次tick處理時,推動該指標,令其指向下一個tick對應的桶,并將桶指向的串列中的全部任務取出,丟到一個執行緒池中異步處理,
  5. 為了節約空間,桶陣列通常以環形陣列的形式存盤以重復利用bucket槽,這也是時間輪名字中輪(wheel)的來源,

時間輪示意圖

二.不同實作方式的時間輪的介紹

上面介紹的時間輪實作思路中繞過了一個很重要的問題,即在時間輪tick間隔確定的情況下,
雖然環形陣列能夠復用之前使用過的bucket槽,但bucket桶的數量似乎限制了時間輪所能支持的最大超時時間,
舉個例子,假設tick間隔為1毫秒,那么僅僅是存盤距離當前時間1天(86400秒)后超時的任務就至少需要86400*1000個bucket,所占用的空間無疑是巨大的,
而一般的定時器模塊所要支持的最大超時時間一般也不止1天這么短,
雖然進一步的減少精度(比如tick間隔改為100毫秒,或者1秒)似乎能解決這個問題,但事實上時間輪的論文中還提到了一些更優秀的實作方案,使得能同時兼顧精度和減少空間占用,

單層多輪次時間輪

第一種方式是引入輪次(round)的概念(論文中提到的方案6),即每一個bucket中的串列元素帶上一個round屬性,
假設一個時間輪的tick間隔為1秒,并且環形陣列有86400個bucket桶,那么這個時間輪明面上可以支持的最大超時時間只有1天,而引入了輪次的概念后,則理論上可以支持的最大超時時間是沒有限制的,

單層多輪次時間輪創建新任務

舉個例子,假設有一個定時器任務的超時時間為2天10小時20分鐘30秒,那么在創建新計時器任務時基于當前時間輪單輪次可以支持的最大超時時間(即一天)進行求余,
可以得到10小時20分鐘30秒,根據余數我們可以計算出當前任務應該被插入到哪個bucket槽的串列中,而超時時間/最大超時時間(1天)得到除法的結果就是round輪次,即round=2,

單層多輪次時間輪tick處理

同時在每次tick處理當前時間指標所指向的串列時,不再簡單的將串列中的所有任務一并取出執行,而是對其進行遍歷,

  1. 只有round為0的任務才會被撈出來執行
  2. 而round大于0的任務其邏輯上并沒有真的超時,而只是將round自減1,等到后面的輪次處理并最終自減為0后才代表著其真的超時而需要出隊執行,

可以看到,引入了round概念后,多輪次的時間輪兼顧了精度的同時,也能夠在有限、可控的空間內支持足夠大的超時時間,

多層時間輪

論文中提到的另一種實作方案便是多層次時間輪(如論文題目所指Hashed and Hierarchical Timing Wheels),
多層時間輪的靈感來自于我們日常生活中隨處可見的機械鐘表,通常機械鐘表有一個秒針(60秒),一個分針(60分鐘)和一個時針(12小時),其本質上相當于一個tick間隔為1秒,支持的最大超時時間為12小時的多層時間輪,
12小時有60 * 60 * 12=43200秒,但是鐘表中實際上并沒有這么多的bucket,卻也能準確的表達12小時中的任何一秒,

這是因為鐘表中的秒針、分針和時針本質上相當于三個不同層次的時間輪:

  1. 秒針對應的時間輪是最底層的,共60個bucket,tick間隔為1秒鐘
  2. 分針對應的時間輪是第二層的,也是60個bucket,tick間隔為1分鐘
  3. 時針對應的時間輪是最上層的,共12個bucket,tick間隔為1小時

在多層時間輪的實作中,可以建立N個不同層次的時間輪,其中上一層時間輪的tick間隔等于下一層時間輪走完一周的時間(類似1分鐘等于60秒,1小時等于60分鐘),
如果時間輪的層次足夠多,理論上也能支持足夠大范圍的超時時間,
舉個例子,精度為秒的的時間輪,只需要5層共(60+60+24+365+100)=609個bucket就能支持最大100年的超時時間(假設一年都是365天),

多層時間輪創建新任務

創建新計時器時,根據超時時間,先嘗試著放入最底層的時間輪,如果最底層的時間輪能放的下(比如第0分鐘58秒過期的),就根據當前時間輪的tick間隔做除法來計算出需要放入的具體bucket,
如果當前時間輪放不下(比如距離當前時間10分鐘20秒過期的,無法直接放入最大60秒的秒級時間輪,但能放到最大支持60分鐘的分鐘時間輪中),則嘗試著放到上一層的時間輪中,但是是基于上一層的時間輪的tick間隔來做除法來計算出具體要放入的bucket槽,
如果還是放不下(比如距離當前時間3小時20分鐘18秒過期的,只能放到最大12小時的小時級時間輪中),
回圈往復這一程序,直到放到合適層次的時間輪中,

多層時間輪tick處理

多層次的時間輪中的基礎tick間隔是由最底層的時間輪決定的,
每次tick時會推動當前時間,首先將最底層的時間輪中新指向的插槽中的任務全部取出進行調度;
接著判斷當前時間輪是否走完了一整圈,如果是的話則推動上一層級的時間輪推進而指向新的bucket槽(比如秒級時間輪走完了60秒,則推進分針前進1格),
被推動的上層時間輪需要將新指向的bucket槽中的任務全部取出,嘗試著放到下層時間輪中
(下一層或者下N層都有可能,比如超時時間為1小時10分鐘30秒的任務會在小時時間輪從0推進到1時放到分鐘時間輪里,而超時時間為1小時0分鐘30秒的任務則會被直接放到最下層的秒鐘時間輪里),
層級時間輪的tick推動是從下層蔓延到上層的,每次tick可能都會推動1至N層時間輪(比如第0小時第59分鐘59秒->第1小時第0分鐘第0秒就推動了2層),

三.時間輪實作的原始碼級分析

上面介紹的時間輪實作方式是很粗略的,連偽代碼都不算,要想真正理解時間輪的作業原理,最好的辦法還是通過參考已有實作,并自己親手實作一遍才會印象深刻,
在本篇博客中將會結合原始碼介紹三種實作方式略有不同的時間輪,分別是:

  1. 單層多輪次時間輪(參考netty的HashedWheelTimer實作)
  2. 多層次時間輪(存在空轉問題)
  3. 解決了空轉問題的多層次時間輪(參考kafka的Timer實作)

為了便于讀者理解和閱讀原始碼,相比netty或kafka中的工程化的實作,博客中實作的版本是簡化過的,其只聚焦于時間輪本身的作業原理,而舍棄掉了關于取消定時任務、優雅啟動/停止等相關的邏輯,

為了便于測驗,所有的時間輪實作都實作了一個自定義的Timer介面

public interface Timer {

    /**
     * 啟動時間輪
     * */
    void startTimeWheel();

    /**
     * 創建新的超時任務(必須先startTimeWheel完成后,才能創建新任務)
     * @param task 超時時需要調度的自定義任務
     * @param delayTime 延遲時間
     * @param timeUnit 延遲時間delayTime的單位
     * */
    void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit);
}

1.單層/多輪次時間輪(參考netty的實作)

  • MyHashedTimeWheel是參考netty實作的單層多輪時間輪,其包含有一個環形陣列ringBucketArray,陣列中的每個槽(MyHashedTimeWheelBucket)都對應著一個存盤任務節點的鏈表,
  • 為了支持多執行緒并發的創建新任務,在創建新任務時,不是直接將其放入時間輪的環形陣列中,而是先暫時存盤在一個阻塞佇列unProcessTaskQueue中,
    而由模擬tick,推動當前時間的Worker執行緒來將其轉移到環形陣列中的(一個時間輪計時器只有一個Worker執行緒,所以是單執行緒操作無需考慮并發),
  • Worker執行緒會在時間輪啟動后開始運行,其主要完成以下幾個任務
    1. 最初啟動時,設定時間輪的當前時間(System.nanoTime()區別于System.currentTimeMillis()不是獲取現實中的絕對時間),
    2. 隨后執行一個無限回圈,主要用于推進時間輪的當前時間,
    3. 因為java無法直接訪問硬體時鐘,本質上需要依賴作業系統層面的計時器來感知硬體時鐘的變化,
      所以無限回圈中waitForNextTick方法中,基于Thread.sleep來模擬每次tick的間隔,以避免浪費CPU資源,
    4. 隨后在waitForNextTick回傳后,代表著當前時間輪推進了1tick,接著通過transferTaskToBuckets將當前unProcessTaskQueue佇列中的新任務單執行緒挨個的加入時間輪中,
      計算的程序如第二章中所描述的那樣,基于實際需要等待的超時時間與當前時間輪最大間隔的余數獲得應該插入的bucket槽的下標;基于除數獲得剩余的rounds,
    5. 再然后處理當前時間指向的bucket槽中的所有任務(bucket.expireTimeoutTask),如果任務的round<=0,則代表已經超時了,將其丟入指定的執行緒池中異步處理,
      如果round>0,則將其自減1,等待后續的expireTimeoutTask最終將其減至0,
/**
 * 參考netty實作的單層時間輪
 * */
public class MyHashedTimeWheel implements Timer{

    /**
     * 環形陣列
     * */
    private final MyHashedTimeWheelBucket[] ringBucketArray;

    /**
     * 世間輪啟動時的具體時間戳(單位:納秒nanos)
     * */
    private long startTime;

    /**
     * 是否已啟動
     * */
    private final AtomicBoolean started = new AtomicBoolean(false);

    /**
     * 時間輪每次轉動的時間(單位:納秒nanos)
     * (perTickTime越短,調度會更精確,但cpu開銷也會越大)
     * */
    private final long perTickTime;

    /**
     * 總tick數
     * */
    private long totalTick = 0;

    /**
     * 待處理任務的佇列
     * (多外部生產者寫入,時間輪內的單worker消費者讀取,所以netty的實作里使用了效率更高的MpscQueue,Mpsc即MultiProducerSingleConsumer)
     * */
    private final Queue<MyTimeoutTaskNode> unProcessTaskQueue = new LinkedBlockingDeque<>();

    /**
     * 用于實際執行到期任務的執行緒池
     * */
    private final Executor taskExecutor;

    private Thread workerThread;

    /**
     * 建構式
     * */
    public MyHashedTimeWheel(int ringArraySize, long perTickTime, Executor taskExecutor) {
        this.ringBucketArray = new MyHashedTimeWheelBucket[ringArraySize];
        for(int i=0; i<ringArraySize; i++){
            // 初始化,填充滿時間輪喚醒陣列
            this.ringBucketArray[i] = new MyHashedTimeWheelBucket();
        }

        this.perTickTime = perTickTime;
        this.taskExecutor = taskExecutor;
    }

    /**
     * 啟動worker執行緒等初始化操作,必須執行完成后才能正常作業
     * (簡單起見,和netty不一樣不是等任務被創建時才懶加載的,必須提前啟動)
     * */
    @Override
    public void startTimeWheel(){
        // 啟動worker執行緒
        this.workerThread = new Thread(new Worker());
        this.workerThread.start();

        while (!this.started.get()){
            // 自旋回圈,等待一會
        }

        System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName());
    }

    @Override
    public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){
        long deadline = System.nanoTime() + timeUnit.toNanos(delayTime);

        // Guard against overflow.
        if (delayTime > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }

        MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode();
        newTimeoutTaskNode.setTargetTask(task);
        newTimeoutTaskNode.setDeadline(deadline);

        unProcessTaskQueue.add(newTimeoutTaskNode);
    }

    private final class Worker implements Runnable{

        @Override
        public void run() {
            MyHashedTimeWheel.this.startTime = System.nanoTime();

            // 啟動
            MyHashedTimeWheel.this.started.set(true);

            // 簡單起見,不考慮優雅啟動和暫停的邏輯
            while (true){
                // 等待perTick
                waitForNextTick();

                // 在撈取當前tick下需要處理的bucket前,先將加入到佇列中的任務轉移到環形陣列中(可能包含在當前tick下就要處理的任務)
                transferTaskToBuckets();

                // 基于總tick數,對環形陣列的長度取模,計算出當前tick下需要處理的bucket桶的下標
                int idx = (int) (MyHashedTimeWheel.this.totalTick % MyHashedTimeWheel.this.ringBucketArray.length);
                MyHashedTimeWheelBucket bucket = MyHashedTimeWheel.this.ringBucketArray[idx];
                // 處理當前插槽內的任務(遍歷鏈表中的所有任務,round全部減一,如果減為負數了則說明這個任務超時到期了,將其從鏈表中移除后并交給執行緒池執行指定的任務)
                bucket.expireTimeoutTask(MyHashedTimeWheel.this.taskExecutor);
                // 回圈tick一次,總tick數自增1
                MyHashedTimeWheel.this.totalTick++;
            }
        }

        /**
         * per tick時鐘跳動,基于Thread.sleep
         * */
        private void waitForNextTick(){
            // 由于Thread.sleep并不是絕對精確的被喚醒,所以只能通過(('總的tick數+1' * '每次tick的間隔') + '時間輪啟動時間')來計算精確的下一次tick時間
            // 而不能簡單的Thread.sleep(每次tick的間隔)

            long nextTickTime = (MyHashedTimeWheel.this.totalTick + 1) * MyHashedTimeWheel.this.perTickTime
                            + MyHashedTimeWheel.this.startTime;

            // 因為nextTickTime是納秒,sleep需要的是毫秒,需要保證納秒數過小時,導致直接計算出來的毫秒數為0
            // 因此(‘實際休眠的納秒數’+999999)/1000000,保證了納秒轉毫秒時,至少會是1毫秒,而不會出現sleep(0毫秒)令cpu空轉
            long needSleepTime = (nextTickTime - System.nanoTime() + 999999) / 1000000;
            try {
                // 比起netty,忽略了一些處理特殊場景bug的邏輯
                Thread.sleep(needSleepTime);
            } catch (InterruptedException ignored) {

            }
        }

        private void transferTaskToBuckets() {
            // 為了避免worker執行緒在一次回圈中處理太多的任務,所以直接限制了一個最大值100000
            // 如果真的有這么多,就等到下次tick回圈的時候再去做,
            // 因為這個操作是cpu密集型的,處理太多的話,可能導致無法在一個短的tick周期內完成一次回圈
            for (int i = 0; i < 100000; i++) {
                MyTimeoutTaskNode timeoutTaskNode = MyHashedTimeWheel.this.unProcessTaskQueue.poll();
                if (timeoutTaskNode == null) {
                    // 佇列為空了,直接結束
                    return;
                }

                // 計算到任務超時時,應該執行多少次tick
                // (和netty里的不一樣,這里的deadline是超時時間的絕對時間,所以需要先減去時間輪的startTime)
                // (netty中是生產者執行緒在add時事先減去了startTime,比起由worker執行緒統一處理效率更高,但個人覺得這里的寫法會更直觀)
                long totalTickWhenTimeout = (timeoutTaskNode.getDeadline() - MyHashedTimeWheel.this.startTime) / MyHashedTimeWheel.this.perTickTime;
                // 減去當前時間輪已經進行過的tick數量
                long remainingTickWhenTimeout = (totalTickWhenTimeout - MyHashedTimeWheel.this.totalTick);
                // 因為一次時間輪旋轉會經過ringBucketArray.length次tick,所以求個余數
                long remainingRounds = remainingTickWhenTimeout / MyHashedTimeWheel.this.ringBucketArray.length;
                // 計算出當前任務需要轉多少圈之后才會超時
                timeoutTaskNode.setRounds(remainingRounds);

                // 如果傳入的deadline早于當前系統時間,則totalTickWhenTimeout可能會小于當前的totalTick
                // 這種情況下,讓這個任務在當前tick下就立即超時而被調度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調度(所以必須取兩者的最大值)
                final long ticks = Math.max(totalTickWhenTimeout, MyHashedTimeWheel.this.totalTick); // Ensure we don't schedule for past.
                // 如果能限制環形陣列的長度為2的冪,則可以改為ticks & mask,位運算效率更高
                int stopIndex = (int) (ticks % MyHashedTimeWheel.this.ringBucketArray.length);
                MyHashedTimeWheelBucket bucket = MyHashedTimeWheel.this.ringBucketArray[stopIndex];
                // 計算并找到應該被放置的那個bucket后,將其插入當前bucket指向的鏈表中
                bucket.addTimeout(timeoutTaskNode);
            }
        }
    }
}
/**
 * 時間輪環形陣列下標對應的桶(保存一個超時任務MyTimeoutTaskNode的鏈表)
 * */
public class MyHashedTimeWheelBucket {

    private final LinkedList<MyTimeoutTaskNode> linkedList = new LinkedList<>();

    public void addTimeout(MyTimeoutTaskNode timeout) {
        linkedList.add(timeout);
    }

    /**
     * 遍歷鏈表中的所有任務,round全部減一,如果減為負數了則說明這個任務超時到期了,將其從鏈表中移除后并交給執行緒池執行指定的任務
     * */
    public void expireTimeoutTask(Executor executor){
        Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator();
        while(iterator.hasNext()){
            MyTimeoutTaskNode currentNode = iterator.next();
            long currentNodeRound = currentNode.getRounds();
            if(currentNodeRound <= 0){
                // 將其從鏈表中移除
                iterator.remove();
                // count小于等于0,說明超時了,交給執行緒池去異步執行
                executor.execute(currentNode.getTargetTask());
            }else{
                // 當前節點還未超時,round自減1
                currentNode.setRounds(currentNodeRound-1);
            }

            // 簡單起見,不考慮任務被外部自己取消的case(netty里的timeout.isCancelled())
        }
    }
}
public class MyTimeoutTaskNode {

    /**
     * 任務具體的到期時間(絕對時間)
     * */
    private long deadline;

    /**
     * 存盤在時間輪中,需要等待的輪次
     * (rounds在初始化后,每次時間輪轉動一周便自減1,當減為0時便代表當前任務需要被調度)
     * */
    private long rounds;

    /**
     * 創建任務時,用戶指定的到期時進行調度的任務
     * */
    private Runnable targetTask;

    public long getDeadline() {
        return deadline;
    }

    public void setDeadline(long deadline) {
        this.deadline = deadline;
    }

    public long getRounds() {
        return rounds;
    }

    public void setRounds(long rounds) {
        this.rounds = rounds;
    }

    public Runnable getTargetTask() {
        return targetTask;
    }

    public void setTargetTask(Runnable targetTask) {
        this.targetTask = targetTask;
    }
}

2.層次時間輪(存在空轉問題)

層次時間輪MyHierarchicalHashedTimerV1的主體邏輯與單層多輪次時間輪MyHashedTimeWheel基本保持一致,主要的區別有幾點:

  1. 由于是多層次的時間輪,所以單獨抽象出了Timer(MyHierarchicalHashedTimerV1)和TimerWheel(MyHierarchicalHashedTimeWheelV1)這兩個類,
    Timer類中只持有最底層的時間輪lowestTimeWheel,而單獨的時間輪類MyHierarchicalHashedTimeWheelV1中也存盤了更上層時間輪的參考overFlowWheel,
    不同層次的時間輪之間按照層級構成了一個單向鏈表,
  2. 從unProcessTaskQueue中轉移計時器任務到環形陣列時(MyHierarchicalHashedTimeWheelV1.addTimeoutTask),
    如果當前時間輪的最大間隔內也放不下任務,則會嘗試著將其放入上層的時間輪中;如果上層時間輪不存在則創建之(lazy加載),
    考慮到超時時間可能會很大,所以addTimeoutTask方法可能會遞回呼叫多次,直到找到一個間隔足夠大的時間輪來存盤任務,
  3. 在推動tick時(advanceClockByTick),先推動最底層的時間輪(level為0),將指向的bucket串列中的任務全部交給指定的執行緒池執行,
    同時,如果當前時間輪已經走完一圈后,則去推動上一層的時間輪(可能遞回多次),
    上層的時間輪(level>0)在推動時,通過重新執行advanceClockByTick,將對應bucket串列中的任務轉移到更下層的時間輪中,
/**
 * 層次時間輪,會存在空轉問題
 * */
public class MyHierarchicalHashedTimerV1 implements Timer {

    /**
     * 是否已啟動
     * */
    private AtomicBoolean started = new AtomicBoolean(false);

    /**
     * 世間輪啟動時的具體時間戳(單位:納秒nanos)
     * */
    private long startTime;

    /**
     * 時間輪每次轉動的時間(單位:納秒nanos)
     * (perTickTime越短,調度會更精確,但cpu開銷也會越大)
     * */
    private final long perTickTime;

    /**
     * 總tick數
     * */
    private long totalTick = 0;

    /**
     * 待處理任務的佇列
     * (多外部生產者寫入,時間輪內的單worker消費者讀取,所以netty的實作里使用了效率更高的MpscQueue,Mpsc即MultiProducerSingleConsumer)
     * */
    private final Queue<MyTimeoutTaskNode> unProcessTaskQueue = new LinkedBlockingDeque<>();

    /**
     * timer持有的最低層的時間輪
     * */
    private final MyHierarchicalHashedTimeWheelV1 lowestTimeWheel;

    /**
     * 建構式
     * */
    public MyHierarchicalHashedTimerV1(int ringArraySize, long perTickTime, Executor taskExecutor) {
        this.perTickTime = perTickTime;

        // 初始化最底層的時間輪
        this.lowestTimeWheel = new MyHierarchicalHashedTimeWheelV1(ringArraySize,perTickTime,taskExecutor,0);
    }

    /**
     * 啟動worker執行緒等初始化操作,必須執行完成后才能正常作業
     * (簡單起見,和netty不一樣不是等任務被創建時才懶加載的,必須提前啟動)
     * */
    @Override
    public void startTimeWheel(){
        // 啟動worker執行緒
        new Thread(new Worker()).start();

        while (!this.started.get()){
            // 自旋回圈,等待一會
        }

        System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName());
    }

    @Override
    public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){
        long deadline = System.nanoTime() + timeUnit.toNanos(delayTime);

        // Guard against overflow.
        if (delayTime > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }

        MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode();
        newTimeoutTaskNode.setTargetTask(task);
        newTimeoutTaskNode.setDeadline(deadline);

        this.unProcessTaskQueue.add(newTimeoutTaskNode);
    }

    private final class Worker implements Runnable{

        @Override
        public void run() {
            MyHierarchicalHashedTimerV1.this.startTime = System.nanoTime();
            // 啟動
            MyHierarchicalHashedTimerV1.this.started.set(true);

            // 簡單起見,不考慮優雅啟動和暫停的邏輯
            while (true){
                // 等待perTick
                waitForNextTick();

                // 在撈取當前tick下需要處理的bucket前,先將加入到佇列中的任務轉移到時間輪中(可能包含在當前tick下就要處理的任務)
                // 層級時間輪內部會做進一步的分配(放不下的話就溢位到更上一層的時間輪)
                transferTaskToTimeWheel();

                // 推進時間輪(層級時間輪內部滿了一圈就會進一步的推進更上一層的時間輪)
                MyHierarchicalHashedTimerV1.this.lowestTimeWheel.advanceClockByTick(
                    (taskNode)->
                        // 參考kafka的寫法,避免Timer里的一些屬性被傳到各個bucket里面
                        MyHierarchicalHashedTimerV1.this.lowestTimeWheel
                            .addTimeoutTask(MyHierarchicalHashedTimerV1.this.startTime, taskNode)
                );

                // 回圈tick一次,總tick數自增1
                MyHierarchicalHashedTimerV1.this.totalTick++;
            }
        }

        /**
         * per tick時鐘跳動,基于Thread.sleep
         * */
        private void waitForNextTick(){
            // 由于Thread.sleep并不是絕對精確的被喚醒,所以只能通過(('總的tick數+1' * '每次tick的間隔') + '時間輪啟動時間')來計算精確的下一次tick時間
            // 而不能簡單的Thread.sleep(每次tick的間隔)

            long nextTickTime = (MyHierarchicalHashedTimerV1.this.totalTick + 1) * MyHierarchicalHashedTimerV1.this.perTickTime
                + MyHierarchicalHashedTimerV1.this.startTime;

            // 因為nextTickTime是納秒,sleep需要的是毫秒,需要保證納秒數過小時,導致直接計算出來的毫秒數為0
            // 因此(‘實際休眠的納秒數’+999999)/1000000,保證了納秒轉毫秒時,至少會是1毫秒,而不會出現sleep(0毫秒)令cpu空轉
            long needSleepTime = (nextTickTime - System.nanoTime() + 999999) / 1000000;
            
            try {
                // 比起netty,忽略了一些處理特殊場景bug的邏輯
                Thread.sleep(needSleepTime);
            } catch (InterruptedException ignored) {

            }
        }

        /**
         * 加入到佇列中的任務轉移到時間輪中
         * */
        private void transferTaskToTimeWheel() {
            // 為了避免worker執行緒在一次回圈中處理太多的任務,所以直接限制了一個最大值100000
            // 如果真的有這么多,就等到下次tick回圈的時候再去做,
            // 因為這個操作是cpu密集型的,處理太多的話,可能導致無法在一個短的tick周期內完成一次回圈
            for (int i = 0; i < 100000; i++) {
                MyTimeoutTaskNode timeoutTaskNode = MyHierarchicalHashedTimerV1.this.unProcessTaskQueue.poll();
                if (timeoutTaskNode == null) {
                    // 佇列為空了,直接結束
                    return;
                }

                // 層級時間輪內部會做進一步的分配(放不下的話就溢位到更上一層的時間輪)
                MyHierarchicalHashedTimerV1.this.lowestTimeWheel.addTimeoutTask(
                    MyHierarchicalHashedTimerV1.this.startTime, timeoutTaskNode);
            }
        }
    }
}
public class MyHierarchicalHashedTimeWheelV1 {

    private final MyHierarchyHashedTimeWheelBucketV1[] ringBucketArray;

    /**
     * 總tick數
     * */
    private long totalTick = 0;

    /**
     * 當前時間輪所能承載的時間間隔
     * */
    private final long interval;

    /**
     * 時間輪每次轉動的時間(單位:納秒nanos)
     * (perTickTime越短,調度會更精確,但cpu開銷也會越大)
     * */
    private final long perTickTime;

    /**
     * 上一層時間跨度更大的時間輪
     * */
    private MyHierarchicalHashedTimeWheelV1 overFlowWheel;

    /**
     * 用于實際執行到期任務的執行緒池
     * */
    private final Executor taskExecutor;

    /**
     * 是否是最底層的時間輪(只有最底層的時間輪才真正的對任務進行調度)
     * */
    private final int level;

    public MyHierarchicalHashedTimeWheelV1(int ringArraySize,long perTickTime, Executor taskExecutor,int level) {
        this.ringBucketArray = new MyHierarchyHashedTimeWheelBucketV1[ringArraySize];
        for(int i=0; i<ringArraySize; i++){
            // 初始化,填充滿時間輪喚醒陣列
            this.ringBucketArray[i] = new MyHierarchyHashedTimeWheelBucketV1();
        }

        this.perTickTime = perTickTime;
        this.taskExecutor = taskExecutor;
        this.interval = perTickTime * ringArraySize;
        this.level = level;

        if(level > 0){
            this.totalTick = 1;
        }
    }

    /**
     * 當前時間輪加入任務(溢位的話,則需要放到上一層的時間輪中)
     * */
    public void addTimeoutTask(long startTime, MyTimeoutTaskNode timeoutTaskNode){
        long deadline = timeoutTaskNode.getDeadline();

        // 當前時間輪所能承載的最大絕對時間為:每個tick的間隔 * 插槽數 + (基于startTime的當前絕對時間)
        long currentWheelMaxRange = this.interval + (startTime + this.perTickTime * this.totalTick);

        if(deadline < currentWheelMaxRange){
            // 當前時間輪能夠承載這個任務,無需放到上一層時間輪中

            // 計算到任務超時時,應該執行多少次tick
            // (和netty里的不一樣,這里的deadline是超時時間的絕對時間,所以需要先減去時間輪的startTime)
            // (netty中是生產者執行緒在add時事先減去了startTime,比起由worker執行緒統一處理效率更高,但個人覺得這里的寫法會更直觀)
            long totalTickWhenTimeout = (deadline - startTime) / this.perTickTime;

            // 如果傳入的deadline早于當前系統時間,則totalTickWhenTimeout可能會小于當前的totalTick
            // 這種情況下,讓這個任務在當前tick下就立即超時而被調度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調度(所以必須取兩者的最大值)
            final long ticks = Math.max(totalTickWhenTimeout, this.totalTick); // Ensure we don't schedule for past.
            // 如果能限制環形陣列的長度為2的冪,則可以改為ticks & mask,位運算效率更高
            int stopIndex = (int) (ticks % this.ringBucketArray.length);

            MyHierarchyHashedTimeWheelBucketV1 bucket = this.ringBucketArray[stopIndex];
            // 計算并找到應該被放置的那個bucket后,將其插入當前bucket指向的鏈表中
            bucket.addTimeout(timeoutTaskNode);
        }else{
            // 當前時間輪無法承載這個任務,需要放到上一層時間輪中

            // 上層時間輪不存在,創建之
            if(this.overFlowWheel == null){
                // 上層時間輪的環形陣列大小保持不變,perTick是當前時間輪的整個間隔(類似低層的60秒等于上一層的1分鐘)
                this.overFlowWheel = new MyHierarchicalHashedTimeWheelV1(
                    this.ringBucketArray.length, this.interval, taskExecutor,this.level+1);
            }

            // 加入到上一層的時間輪中(對于較大的deadline,addTimeoutTask操作可能會遞回數次,放到第N層的時間輪中)
            this.overFlowWheel.addTimeoutTask(startTime,timeoutTaskNode);
        }
    }

    public void advanceClockByTick(Consumer<MyTimeoutTaskNode> flushInLowerWheelFn){
        // 基于總tick數,對環形陣列的長度取模,計算出當前tick下需要處理的bucket桶的下標
        int idx = (int) (this.totalTick % this.ringBucketArray.length);

        MyHierarchyHashedTimeWheelBucketV1 bucket = this.ringBucketArray[idx];

        if(this.level == 0){
            // 如果是最底層的時間輪,將當前tick下命中的bucket中的任務丟到taskExecutor中執行
            bucket.expireTimeoutTask(this.taskExecutor);
        }else{
            // 如果不是最底層的時間輪,將當前tick下命中的bucket中的任務交給下一層的時間輪
            // 這里轉交到下一層有兩種方式:第一種是從上到下的轉交,另一種是當做新任務一樣還是從最下層的時間輪開始放,放不下再往上溢位
            // 選用后一種邏輯,最大的復用已有的創建新任務的邏輯,會好理解一點
            bucket.flush(flushInLowerWheelFn);
        }

        // 當前時間輪的總tick自增1
        this.totalTick++;

        // 當前時間輪的總tick數滿了一圈之后,推進上一層時間輪進行一次tick(如果上一層時間輪存在的話)
        if(this.totalTick % this.ringBucketArray.length == 0 && this.overFlowWheel != null){
            this.overFlowWheel.advanceClockByTick(flushInLowerWheelFn);
        }
    }
}
/**
 * 時間輪環形陣列下標對應的桶(保存一個超時任務MyTimeoutTaskNode的鏈表)
 * */
public class MyHierarchyHashedTimeWheelBucketV1 {

    private final LinkedList<MyTimeoutTaskNode> linkedList = new LinkedList<>();

    public void addTimeout(MyTimeoutTaskNode timeout) {
        linkedList.add(timeout);
    }

    /**
     * 遍歷鏈表中的所有任務,round全部減一,如果減為負數了則說明這個任務超時到期了,將其從鏈表中移除后并交給執行緒池執行指定的任務
     * */
    public void expireTimeoutTask(Executor executor){
        Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator();
        while(iterator.hasNext()){
            MyTimeoutTaskNode currentNode = iterator.next();
            long currentNodeRound = currentNode.getRounds();
            if(currentNodeRound <= 0){
                // 將其從鏈表中移除
                iterator.remove();
                // count小于等于0,說明超時了,交給執行緒池去異步執行
                executor.execute(currentNode.getTargetTask());
            }else{
                // 當前節點還未超時,round自減1
                currentNode.setRounds(currentNodeRound-1);
            }

            // 簡單起見,不考慮任務被外部自己取消的case(netty里的timeout.isCancelled())
        }
    }

    /**
     * 將當前bucket中的資料,通過flushInLowerWheelFn,全部轉移到更底層的時間輪中
     * */
    public void flush(Consumer<MyTimeoutTaskNode> flushInLowerWheelFn){
        Iterator<MyTimeoutTaskNode> iterator = linkedList.iterator();
        while(iterator.hasNext()){
            MyTimeoutTaskNode currentNode = iterator.next();
            // 先從鏈表中移除
            iterator.remove();
            // 通過flushInLowerWheelFn,轉移到更底層的時間輪中
            flushInLowerWheelFn.accept(currentNode);

            // 簡單起見,不考慮任務被外部自己取消的case(netty里的timeout.isCancelled())
        }
    }
}

3.解決了空轉問題的層次時間輪(參考kafka的實作)

上面實作的單層多輪時間輪以及層次時間輪都存在一個問題,即時間輪論文中提到的空轉問題(step through an empty bucket),
舉個例子,假設時間輪的tick間隔被設定為1秒,用戶創建了一個10秒后過期的任務和一個10小時后過期的任務,在處理完了第一個10秒后過期的任務后,剩下的幾萬次tick都由于每個時間輪當前時間指向的bucket是一個空串列而在做無用功,
生產環境中為了保證一定的調度精度,tick間隔一般會設定為毫秒級別甚至更低,那么時間輪空轉對CPU的浪費就不是一個可以忽視的問題了,

在著名的訊息佇列kafka中就實作了一個能解決空轉問題的層次時間輪(Timer/TimingWheel),其解決時間輪空轉的方式是引入延遲佇列,
請注意:這里的延遲佇列不是用于存盤計時器任務的,而是用來存盤bucket槽的(MyHierarchyHashedTimeWheelBucketV2),
前面提到,時間輪插槽的數量是相對固定的,其遠遠少于計時器任務的數量,所以不會出現性能瓶頸,

MyHierarchicalHashedTimerV2由于引入了延遲佇列,所以在實作上相對復雜了一些,

  • 在每次bucket槽中插入第一個新元素時(兩種情況:一是時間輪剛剛初始化從未插入過元素,二是當前bucket槽中的元素已經在之前的一次tick中被全部處理完了),
    將當前bucket插槽插入延遲佇列(DelayQueue)中,
  • bucket插槽中維護了一個expiration超時時間屬性,其代表著當前插槽距離下一次被當前時間指標推動而被指到的絕對時間,
    假設有一個時分秒三層的時間輪,當前時間為1小時5分0秒,如果一個超時時間為2分10秒的任務創建時,其將會被放入分鐘時間輪的第6個插槽中(下標從0開始),
    由于對應插槽將會在2分鐘后被當前時間指標指到,所以其expiration的值當前時間1小時5分0秒+2分,
  • bucket是實作了Delayed介面的,其實際回傳的是expiration減去當前時間的值(之所以減去當前時間,是因為延遲佇列中只有getDelay小于等于0才可以出隊),
    bucket在被加入延遲佇列時,會實際上會按照getDelayed計算的值來進行排序,因此時間輪中理論上越早會被調度的bucket槽,越先出隊,
  • 與v1版本不同,Worker執行緒不再是基于固定的tick間隔來休眠并推進時間,而是監聽延遲佇列(bucketDelayQueue.take),
    當延遲佇列中的bucket到了超時時間時,便會被Worker取出,并進行同樣的推動操作;而那些空的bucket則不會被感知到,從而解決了空轉問題,
  • 同樣的例子,如果1秒的tick間隔下,1個10秒過期和1個10小時過期的任務創建并最終處理,
    MyHierarchicalHashedTimerV2中的Worker執行緒總共只會在當前時間指向的bucket不為空時才會被喚醒(個位數級別的tick處理),而不會一直空轉,
public class MyHierarchicalHashedTimerV2 implements Timer {

    /**
     * 是否已啟動
     * */
    private AtomicBoolean started = new AtomicBoolean(false);

    /**
     * 關聯的最底層時間輪
     * */
    private volatile MyHierarchicalHashedTimeWheelV2 lowestTimeWheel;

    /**
     * 時間輪的啟動時間(單位:納秒)
     * */
    private long startTime;

    /**
     * 每次tick的間隔(單位:納秒)
     * */
    private final long perTickTime;

    /**
     * 時間輪的大小
     * */
    private final int timeWheelSize;

    /**
     * 用于實際執行到期任務的執行緒池
     * */
    private final Executor taskExecutor;

    /**
     * 用于存盤bucket元素的延遲佇列,用于解決時間輪空轉的問題
     * */
    private final DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue = new DelayQueue<>();

    public MyHierarchicalHashedTimerV2(int timeWheelSize,long perTickTime, Executor taskExecutor) {
        this.timeWheelSize = timeWheelSize;
        this.perTickTime = perTickTime;
        this.taskExecutor = taskExecutor;
    }

    /**
     * 啟動worker執行緒等初始化操作,必須執行完成后才能正常作業
     * (簡單起見,和netty不一樣不是等任務被創建時才懶加載的,必須提前啟動)
     * */
    @Override
    public void startTimeWheel(){
        // 啟動worker執行緒
        new Thread(new Worker()).start();

        while (!this.started.get()){
            // 自旋回圈,等待一會
        }

        System.out.println("startTimeWheel 啟動完成:" + this.getClass().getSimpleName());
    }

    @Override
    public void newTimeoutTask(Runnable task, long delayTime, TimeUnit timeUnit){
        long deadline = System.nanoTime() + timeUnit.toNanos(delayTime);

        // Guard against overflow.
        if (delayTime > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }

        MyTimeoutTaskNode newTimeoutTaskNode = new MyTimeoutTaskNode();
        newTimeoutTaskNode.setTargetTask(task);
        newTimeoutTaskNode.setDeadline(deadline);

        // 加入到最底層的時間輪中,當前時間輪放不下的會溢位都上一層時間輪
        this.lowestTimeWheel.addTimeoutTask(newTimeoutTaskNode);
    }

    private void advanceClock(){
        try {
            MyHierarchyHashedTimeWheelBucketV2 bucket = this.bucketDelayQueue.take();
            lowestTimeWheel.advanceClockByTick(bucket.getExpiration());
            bucket.flush((node)->{
                // 當前選中的bucket中的任務,重新插入到時間輪中
                // 1 原本處于高層的bucket中的任務會被放到更底層
                // 2 原本就處于最低一層的bucket中的任務會被直接執行
                this.lowestTimeWheel.addTimeoutTask(node);
            });
            // 將當前時間輪的資料
        } catch (Exception e) {
            // 忽略掉例外
            e.printStackTrace();
        }
    }


    private final class Worker implements Runnable {
        @Override
        public void run() {
            MyHierarchicalHashedTimerV2.this.startTime = System.nanoTime();

            // 初始化最底層的時間輪
            MyHierarchicalHashedTimerV2.this.lowestTimeWheel = new MyHierarchicalHashedTimeWheelV2(
                MyHierarchicalHashedTimerV2.this.startTime,
                MyHierarchicalHashedTimerV2.this.perTickTime,
                MyHierarchicalHashedTimerV2.this.timeWheelSize,
                MyHierarchicalHashedTimerV2.this.taskExecutor,
                MyHierarchicalHashedTimerV2.this.bucketDelayQueue
            );

            // 啟動
            MyHierarchicalHashedTimerV2.this.started.set(true);

            while (true){
                // 一直無限回圈,不斷推進時間
                advanceClock();
            }
        }
    }
}
public class MyHierarchicalHashedTimeWheelV2 {

    /**
     * 上層時間輪(生產者/消費者都會訪問到,volatile修飾)
     * */
    private volatile MyHierarchicalHashedTimeWheelV2 overflowTimeWheel;

    /**
     * 每次tick的間隔(單位:納秒)
     * */
    private final long perTickTime;

    /**
     * 時間輪環形陣列
     * */
    private final MyHierarchyHashedTimeWheelBucketV2[] ringBucketArray;

    /**
     * 用于實際執行到期任務的執行緒池
     * */
    private final Executor taskExecutor;

    /**
     * 時間輪的當前時間
     * */
    private long currentTime;

    /**
     * 當前時間輪的間隔(每次tick的時間 * 時間輪的大小)
     * */
    private final long interval;

    private final DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue;

    public MyHierarchicalHashedTimeWheelV2(long startTime, long perTickTime, int wheelSize, Executor taskExecutor,
                                           DelayQueue<MyHierarchyHashedTimeWheelBucketV2> bucketDelayQueue) {
        // 初始化環形陣列
        this.ringBucketArray = new MyHierarchyHashedTimeWheelBucketV2[wheelSize];
        for(int i=0; i<wheelSize; i++){
            this.ringBucketArray[i] = new MyHierarchyHashedTimeWheelBucketV2();
        }

        // 初始化時,當前時間為startTime
        this.currentTime = startTime - (startTime % perTickTime);
        this.perTickTime = perTickTime;
        this.taskExecutor = taskExecutor;
        this.interval = perTickTime * wheelSize;
        this.bucketDelayQueue = bucketDelayQueue;
    }

    public void addTimeoutTask(MyTimeoutTaskNode timeoutTaskNode) {
        long deadline = timeoutTaskNode.getDeadline();
        if(deadline < this.currentTime + this.perTickTime){
            // 超時時間小于1tick,直接執行
            this.taskExecutor.execute(timeoutTaskNode.getTargetTask());
        }else if(deadline < this.currentTime + this.interval){
            // 當前時間輪放的下

            // 在超時時,理論上總共需要的tick數
            long totalTick = deadline / this.perTickTime;

            // 如果傳入的deadline早于當前系統時間,則totalTickWhenTimeout可能會小于當前的totalTick
            // 這種情況下,讓這個任務在當前tick下就立即超時而被調度是最合理的,而不能在求余后放到一個錯誤的位置而等一段時間才調度(所以必須取兩者的最大值)
            // 如果能限制環形陣列的長度為2的冪,則可以改為ticks & mask,位運算效率更高
            int stopIndex = (int) (totalTick % this.ringBucketArray.length);

            MyHierarchyHashedTimeWheelBucketV2 bucket = this.ringBucketArray[stopIndex];
            // 計算并找到應該被放置的那個bucket后,將其插入當前bucket指向的鏈表中
            bucket.addTimeout(timeoutTaskNode);

            // deadline先除以this.perTickTime再乘以this.perTickTime,可以保證放在同一個插槽下的任務,expiration都是一樣的
            long expiration = totalTick * this.perTickTime;
            boolean isNewRound = bucket.setExpiration(expiration);
            if(isNewRound){
                this.bucketDelayQueue.offer(bucket);
            }
        }else{
            // 當前時間輪放不下
            if(this.overflowTimeWheel == null){
                createOverflowWheel();
            }

            // 加入到上層的時間輪中(較大的deadline會遞回多次)
            this.overflowTimeWheel.addTimeoutTask(timeoutTaskNode);
        }
    }

    /**
     * 推進當前時間輪的時鐘
     * 舉個例子:假設當前時間輪的當前時間是第10分鐘,perTickTime是1分鐘,
     * 1.如果expiration是第10分鐘第1秒,則不用推動當前時間
     * 2.如果expiration是第11分鐘第0秒,則需要推動當前時間
     * */
    public void advanceClockByTick(long expiration){
        // 只會在tick推進時才會被呼叫,引數expiration可以認為是當前時間輪的系統時間
        if(expiration >= this.currentTime + this.perTickTime){
            // 超過了1tick,則需要推進當前時間輪 (始終保持當前時間是perTickTime的整數倍,邏輯上的totalTick)
            this.currentTime = expiration - (expiration % this.perTickTime);
            if(this.overflowTimeWheel != null){
                // 如果上層時間輪存在,則遞回的繼續推進
                this.overflowTimeWheel.advanceClockByTick(expiration);
            }
        }
    }

    private synchronized void createOverflowWheel(){
        if(this.overflowTimeWheel == null){
            // 創建上層時間輪,上層時間輪的perTickTime = 當前時間輪的interval
            this.overflowTimeWheel = new MyHierarchicalHashedTimeWheelV2(
                this.currentTime, this.interval, this.ringBucketArray.length, this.taskExecutor, this.bucketDelayQueue);
        }
    }
}
public class MyHierarchyHashedTimeWheelBucketV2 implements Delayed {

    private final LinkedList<MyTimeoutTaskNode> taskList = new LinkedList<>();

    private final AtomicLong expiration = new AtomicLong(-1);

    public synchronized void addTimeout(MyTimeoutTaskNode timeout) {
        taskList.add(timeout);
    }

    public synchronized void flush(Consumer<MyTimeoutTaskNode> flush) {
        Iterator<MyTimeoutTaskNode> iterator = taskList.iterator();
        while (iterator.hasNext()){
            MyTimeoutTaskNode node = iterator.next();
            // 從當前bucket中移除,轉移到更下層的時間輪中
            iterator.remove();
            flush.accept(node);

            // 簡單起見,不考慮任務被外部自己取消的case(netty里的timeout.isCancelled())
        }

        this.expiration.set(-1L);
    }

    /**
     * 設定當前bucket的超時時間
     * @return 是否是一個新的bucket  true:是
     * */
    public boolean setExpiration(long expiration){
        long oldValue = https://www.cnblogs.com/xiaoxiongcanguan/archive/2023/02/16/this.expiration.getAndSet(expiration);

        // 如果不一樣,說明當前的expiration已經超過了原來的expiration一圈了,邏輯上不再是同一個bucket
        return oldValue != expiration;
    }

    public long getExpiration(){
        return this.expiration.get();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // 還剩余多少時間過期
        long delayNanos = Math.max(this.expiration.get() - System.nanoTime(), 0);

        // 將納秒單位基于unit轉換
        return unit.convert(delayNanos,TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(o instanceof MyHierarchyHashedTimeWheelBucketV2){
            return Long.compare(this.expiration.get(),((MyHierarchyHashedTimeWheelBucketV2) o).expiration.get());
        }

        return 0;
    }
}

為什么netty的時間輪不解決空轉問題?(個人理解)

netty作為一個網路框架,大量的計時器任務的超時時間都是相對較短的(最大一般是秒級),時間上的排布相對密集,時間輪空轉的問題不是特別大(rounds的值也會很小,從創建到被調度的開銷很低),
而kafka的計時器模塊所要處理的任務其超時時間的跨度就相對大很多,時間上的排布很稀疏,所以引入延遲佇列來解決空轉問題收益就會大很多,

總結

  • 雖然很早就了解過時間輪的概念,但直到自己造RPC框架輪子玩的時候才發現自己對時間輪的作業原理了解的并不深,
    說來慚愧,當時的我甚至無法很好的回答為什么netty、dubbo等框架要用到計時器的地方不去使用jdk現成的ScheduledThreadPoolExecutor而要自己寫一個時間輪,
  • 基于費曼學習法,我仔細的研究了時間輪的論文并參考已有的開源實作,重新實作了幾種簡化版的時間輪,并以技術博客的形式分享出來,希望能幫助到對時間輪作業原理感興趣的人,
  • 本篇博客的完整代碼在我的github上:https://github.com/1399852153/Reinventing-the-wheel-for-learning(timeWheel模塊) 內容如有錯誤,還請多多指教,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/544099.html

標籤:其他

上一篇:中文標題相似度檢測

下一篇:day11-JSON處理和HttpMessageConverter<T>

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more