主頁 > 後端開發 > Java并發編程的藝術(5-10)學習總結

Java并發編程的藝術(5-10)學習總結

2021-12-07 10:28:13 後端開發

本文參考學習Java并發編程的藝術

第5章 Java中的鎖

5.1 Lock介面

  • synchronized沒有的特性
    • 嘗試非阻塞獲取鎖
    • 能夠中斷獲取鎖
    • 超時獲取鎖

image-20211205001858151

5.2 佇列同步器

  • 佇列同步器AbstractQueuedSynchronizer用來構建鎖,或者其它同步組件,用一個int成員變數表示同步狀態,通過內置的FIFO佇列完成資源獲取執行緒的排隊作業,
  • 同步器的實作主要是繼承,同步器需要提供(getState()、setState(int newState)和compareAndSetState(int expect,int update))方法來獲取同步的狀態,
  • 同步器支持獨占或者是共享地獲取鎖,

5.2.1 佇列同步器的介面與示例

  • 同步器的實作基于模板方法,繼承并重寫,

image-20211205003008781

image-20211205003024682

  • 模板方法包括3類,獨占式的獲取和釋放同步狀態,共享式的獲取和釋放同步狀態,查詢同步佇列的等待狀態執行緒情況,

通過獨占鎖來說明情況

  • 獨占鎖只能一個執行緒獲取鎖,其它執行緒只能進入到同步佇列,
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

class Mutex implements Lock {
  // 靜態內部類,自定義同步器
  private static class Sync extends AbstractQueuedSynchronizer {
    // 是否處于占用狀態
    protected boolean isHeldExclusively() {
      return getState() == 1;
    }
    // 當狀態為0的時候獲取鎖
    public boolean tryAcquire(int acquires) {
      if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
      }
      return false;
    }
    // 釋放鎖,將狀態設定為0
    protected boolean tryRelease(int releases) {
      if (getState() == 0) throw new
              IllegalMonitorStateException();
      setExclusiveOwnerThread(null);
      setState(0);
      return true;
    }
    // 回傳一個Condition,每個condition都包含了一個condition佇列
    Condition newCondition() { return new ConditionObject(); }
  }
  // 僅需要將操作代理到Sync上即可
  private final Sync sync = new Sync();
  public void lock() { sync.acquire(1); }
  public boolean tryLock() { return sync.tryAcquire(1); }
  public void unlock() { sync.release(1); }
  public Condition newCondition() { return sync.newCondition(); }
  public boolean isLocked() { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
}
  • 上面的Mutex只有在tryAcquire的CAS設定成功才能夠說明獲取了同步狀態,
  • tryRelease把同步狀態設定為0,
  • 獲取狀態失敗就會進入到阻塞佇列,

5.2.2 佇列同步器的實作分析

1.同步佇列

  • 同步器依賴內部的同步佇列完成同步狀態管理,執行緒獲取同步狀態失敗,同步器就會把當前的執行緒以及等待狀態資訊構成節點Node存入到同步佇列,

image-20211205003737960

  • 節點是構成佇列的基礎,有首尾節點,如果執行緒沒有獲取同步狀態成功就會進入到佇列的尾部
  • 加入到尾部的時候一定要是一個執行緒安全的狀態,所以有方法compareAndSetTail(Node expect,Node update),
  • 每次喚醒都是先從頭部開始,

image-20211205003906078

2.獨占式同步狀態獲取與釋放

  • 同步器acquire(int arg)可以去獲取同步狀態,對中斷不敏感,也就是執行緒不會從同步佇列中移出去,
  • 首先是呼叫tryAcquire(int arg)保證執行緒安全獲取同步狀態,
  • 如果失敗構造同步節點Node.EXCLUSIVE,并且通過addWaiter(Node node)加入到同步佇列的尾部,
  • 再通過acquireQueued(Node node,int arg)進入死回圈獲取同步狀態,
  • 只有前驅的節點頭才能夠獲取同步狀態
    • 頭結點獲取同步狀態的節點,釋放之后會喚醒下一個節點
    • 維護FIFO原則,
public final void acquire(int arg) {
 if (!tryAcquire(arg) &&
 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
 selfInterrupt();
}

  • compareAndSetTail(Node expect,Node update)保證了節點執行緒安全加入,enq通過死回圈保證節點被正確添加,
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
// 快速嘗試在尾部添加
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • acquireQueued(final Node node,int arg)死回圈獲取同步狀態,
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

image-20211205004448874

image-20211205004527635

  • 接著就是release,喚醒頭結點后面的一個節點

3.共享式同步狀態獲取與釋放

  • 共享鎖可以多執行緒獲取同步狀態,

image-20211205004758005

  • acquireShared(int arg)共享式獲取同步狀態,
  • 同步器呼叫tryAcquireShared(int arg)來獲取同步狀態,回傳值大于等于0說明獲取成功,
 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

4.獨占式超時獲取同步狀態

  • 可以通過呼叫同步器的doAcquireNanos(int arg,long nanosTimeout)可以超時獲取同步狀態,
  • 如果是呼叫了acquireInterruptibly(int arg),那么只要執行緒被中斷就會報InterruptedException,
  • 但是doAcquireNanos(int arg,long nanosTimeout)能夠中斷,而且可以計算出需要睡眠的時間,nanosTimeout-=now-lastTime如果是大于0說明還沒有超時,否則就是超時了,
  • 如果 nanosTimeout小于等于spinForTimeoutThreshold(1000納秒)的時候,執行緒就不會進入到超時等待了,而是進入到快速自旋,直到超時,
 private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        long lastTime = System.nanoTime();
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                if (nanosTimeout <= 0)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node)
                        && nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                long now = System.nanoTime();
//計算時間,當前時間now減去睡眠之前的時間lastTime得到已經睡眠
//的時間delta,然后被原有超時時間nanosTimeout減去,得到了
//還應該睡眠的時間
                nanosTimeout -= now - lastTime;
                lastTime = now;
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

5.3 重入鎖

  • 支持一個執行緒多次獲取鎖,
  • 公平鎖效率未必比非公平的高,

1.實作重進入

  • 執行緒再次獲取鎖,需要鎖去識別當前獲取鎖的執行緒是不是和鎖的持有執行緒一樣,
  • 鎖的釋放,要求的就是計數重復獲取鎖的數量減低為0,
  • 下面的方法就增加了執行緒的判斷,增加了同步狀態的值,
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • 同樣要求在釋放的時候,減去狀態的值,
protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

2.公平與非公平獲取鎖的區別

  • 鎖是公平那么一定符合FIFO請求的絕對時間順序,
  • 對于非公平鎖來說只要CAS成功,那么就算是同步狀態成功,
  • 對于公平鎖,每次獲取鎖的時候還需要判斷佇列是不是有執行緒等待,才能夠獲取,
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
  • 非公平鎖只要CAS狀態成功就算是獲取鎖,所以可能會導致一個執行緒連續獲取鎖,
  • 而且公平鎖每次獲取鎖的執行緒不同每次都要切換,但是非公平鎖可以連續一個執行緒獲取鎖,減少切換的成本,

5.4 讀寫鎖

  • 讀寫鎖允許同一個時刻多個讀執行緒訪問,
  • 讀寫鎖維護了一對鎖,
  • ReentrantReadWriteLock的特性
    • 公平性選擇:支持公平和非公平獲取
    • 可重入
    • 鎖降級

5.4.1 讀寫鎖的介面與示例

image-20211205021529068

5.4.2 讀寫鎖的實作分析

1.讀寫狀態的設計

  • 同樣是依靠同步器實作同步的功能,
  • 維護讀寫鎖的同步狀態有多個狀態,所以通過按位切割使用,高16位是讀,低16位是寫,
  • 當前的同步狀態是讀鎖被同一個執行緒獲取了寫鎖,重入了兩次,而且還獲取了兩次讀鎖,

image-20211205021832670

2.寫鎖的獲取與釋放

  • 寫鎖是支持可重入的排它鎖,

  • 如果當前執行緒獲取了寫鎖,那么就增加寫狀態,如果當前執行緒在獲取寫鎖時,讀鎖已經被獲取或者該執行緒不是已經獲取寫鎖的執行緒,那么執行緒進入到等待狀態,

  • 這里除了判斷可重入,還判斷是否存在讀鎖,如果存在讀鎖,那么寫鎖就不能被獲取,

  • 因為讀寫鎖需要保證寫鎖的操作對讀鎖是可見的,因為讀鎖被獲取的狀況,去獲取寫鎖,那么當前運行的執行緒是沒有辦法感知當前寫執行緒的操作,

 protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        int c = getState();
        int w = exclusiveCount(c);
        if (c != 0) {
// 存在讀鎖或者當前獲取執行緒不是已經獲取寫鎖的執行緒
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            setState(c + acquires);
            return true;
        }
        if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
            return false;
        }
        setExclusiveOwnerThread(current);
        return true;
    }

3.讀鎖的獲取與釋放

  • 支持可重入的共享鎖,
  • 能被多個執行緒獲取,在沒有別的寫執行緒訪問的情況下,讀鎖會被成功獲取,
  • 如果當前執行緒已經獲取了讀鎖,那么就增加讀狀態,
  • 如果當前執行緒獲取讀鎖的時候,發現寫鎖被獲取,那么就會進入到等待狀態,
protected final int tryAcquireShared(int unused) {
        for (;;) {
            int c = getState();
            int nextc = c + (1 << 16);
            if (nextc < c)
                throw new Error("Maximum lock count exceeded");
            if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
                return -1;
            if (compareAndSetState(c, nextc))
                return 1;
        }
    }

4.鎖降級

  • 鎖降級指的是寫鎖降級成為讀鎖,意思是拿到寫鎖之后再獲取讀鎖
  • 鎖降級的獲取讀鎖是否有必要?如果不獲取讀鎖,直接釋放寫鎖的問題就是另一個執行緒獲取寫鎖并且修改資料,那么當前執行緒無法感知執行緒T的資料更新,

5.5 LockSupport工具

image-20211205023437669

  • park(Object blocker)、parkNanos(Object blocker,long nanos) 和parkUntil(Object blocker,long deadline)阻塞當前執行緒,blocker是標識執行緒等待的物件,

5.6 Condition介面

5.6.2 Condition的實作分析

1.等待佇列

  • 是一個FIFO佇列,如果執行緒呼叫await就會進入Condition的等待佇列,
  • 由于await一定是在獲取鎖的情況執行,所以不需要CAS保證執行緒安全性,

image-20211205023810296

image-20211205023927029

2.等待

  • 釋放鎖,并且執行緒進入到等待佇列,
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
// 當前執行緒加入等待佇列
        Node node = addConditionWaiter();
// 釋放同步狀態,也就是釋放鎖
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

3.通知

  • 喚醒之后回到同步佇列,

image-20211205024106731

第6章 Java并發容器和框架

6.1 ConcurrentHashMap的實作原理與使用

6.1.1 為什么要使用ConcurrentHashMap

  • 并發編程的HashMap容易產生死回圈,而且HashTable效率太低,
  • (1)執行緒不安全的HashMap
    • 多執行緒下的put會造成死回圈,
    • HashMap會進入到一個環形Enrty鏈表,next永遠不是空的,
final HashMap<String, String> map = new HashMap<String, String>(2);
    Thread t = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 10000; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        map.put(UUID.randomUUID().toString(), "");
                    }
                }, "ftf" + i).start();
            }
        }
    }, "ftf");
t.start();
t.join();
  • (2)效率低下的HashTable
    • 由于使用synchronized所以效率會比較差一些,
  • (3)ConcurrentHashMap鎖分段技術有效提高了并發訪問率,
    • 多執行緒可以訪問容器不同資料段的資料,而且執行緒之間不存在鎖競爭,
    • ConcurrentHashMap把資料分段,分配多把鎖,

6.1.2 ConcurrentHashMap的結構

  • ConcurrentHashMap通過Segment陣列結構和HashEntry陣列結構組成,
  • Segment是可重入鎖,HashEntry用于鍵值對存盤資料,修改HashEntry陣列必須先獲取這段鎖,

image-20211205024820830

6.1.3 ConcurrentHashMap的初始化

1.初始化segments陣列

  • segments長度必須是2^N
if (concurrencyLevel > MAX_SEGMENTS)
     concurrencyLevel = MAX_SEGMENTS;
     int sshift = 0;
     int ssize = 1;
     while (ssize < concurrencyLevel) {
     ++sshift;
     ssize <<= 1;
     }
     segmentShift = 32 - sshift;
     segmentMask = ssize - 1;
    this.segments = Segment.newArray(ssize);

2.初始化segmentShift和segmentMask

  • sshift等于ssize左移位數,默認concurrencyLevel=16,也就是需要移動4次,
  • segmentShift等于32減sshift,等于的是28,32是因為ConcurrentHashMap里的hash()方法輸出最大是32.也就是演算法輸出的二進制數的長度是32位,
  • segmentMask是散列運算的掩碼,等于ssize-1,
  • sshift的意思就是ssize的1向左移動的次數,由于ssize一定是2n,所以只有一個1存在,也就是取高位的幾位,取決于Segment的大小的2n占了多少位,

3.初始化每個segment

  • initialCapacity是ConcurrentHashMap的初始化容量,loadfactor是每個segment的負載因子,

  • c就是初始容量initialCapacity/ssize,也就是除以Segment陣列的大小,可以發現初始的倍數就是1.

  • cap就是等于c,

  • cap是Segment里面HashEntry陣列的長度,

  • 可以看到一開始的擴容閾值是threshold=(int)cap*loadFacto=0,也就是只要插入資料就會擴容,

  if (initialCapacity > MAXIMUM_CAPACITY)
    initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
            ++c;
    int cap = 1;
while (cap < c)
    cap <<= 1;
for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);

6.1.4 定位Segment

  • 讓元素能夠均勻分配到每個Segment,所以需要再一次進行hash操作,避免散列沖突嚴重,主要根據元素定位Segment的位置,
  • 默認下segmentShift為28,segmentMask為15,所以每次散列之后的值,右移28位,相當于就是高4位來進行散列運算,避免了大量的散列沖突,相當于hash演算法一次,+右移segmentShift與Segment陣列大小相與一次一次,
private static int hash(int h) {
    h += (h << 15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h << 3);
    h ^= (h >>> 6);
    h += (h << 2) + (h << 14);
    return h ^ (h >>> 16);
}

6.1.5 ConcurrentHashMap的操作

1.get操作

  • 首先經過一次散列,然后找到segment,然后再次通過散列找到對應的HashEntry陣列的元素,
  • get不加鎖,只有值是空的時候才會加鎖重讀,
  • get不加鎖的原因是計算segment大小的count和存盤值的value都是定義為volatile物件,保證了不會讀到過期的值,但是只能被單執行緒寫,
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}	
  • HashEntry是直接重新散列,并沒有使用元素的hashCode防止Segment和HashEntry的散列是一樣的,
hash >>> segmentShift) & segmentMask // 定位Segment所使用的hash演算法
int index = hash & (tab.length - 1); // 定位HashEntry所使用的hash演算法

2.put操作

  • put的時候必須加鎖,先定位Segment,然后進行插入操作,插入的步驟
    • 判斷Segment的HashEntry是否需要擴容,
    • 第二步就是添加元素的位置,把它放到HashEntry上面,
  • 是否需要擴容,
    • 首先判斷HashEntry的大小是不是大于threshold,如果是那么就擴容,
  • 如何擴容
    • 創建兩倍大的陣列,然后把之前陣列的元素進行再散列插入到新的陣列,ConcurrentHashMap只會對部分的Segment進行擴容,

3.size操作

  • 現在是不是把所有Segment的size相加就是能夠得到ConcurrentHashMap的大小?
    • 可能相加的時候Segment已經發生了變化,最安全的辦法就是把put、remove、clean都給鎖住,但是做法的效率非常低下,
    • ConcurrentHashMap的做法是先嘗試兩次統計各個Segment大小,如果count發生變化,那么才會加鎖,
    • 那么ConcurrentHashMap如果判斷容器變化了,主要是看modCount也就是put和clean還有remove操作的時候這個變數都會發生變化+1,所以統計size的時候比較前后的modCount是否發生變化,就能夠得知容器是否發生變化,

6.2 ConcurrentLinkedQueue

  • 實作執行緒安全的佇列的兩種方式阻塞和非阻塞,
    • 阻塞演算法使用的是一把鎖或者兩個鎖實作入隊和出隊,
    • 非阻塞可以使用CAS解決,
  • 那么非阻塞是如何做到的?
  • ConcurrentLinkedQueue基于鏈表的節點的無界執行緒安全佇列,先進先出進行的排序,每次加入都加入到佇列的尾部,

6.2.1 ConcurrentLinkedQueue的結構

image-20211205123741808

  • 有head和tail節點,每個節點通過next關聯起來,默認head是空節點,tail指向head節點,

6.2.2 入佇列

1.入佇列的程序

  • 入隊就是把節點加入到佇列的末尾,并且把末尾指標指向最后一個節點,

image-20211205124051741

  • 那么如果發生插隊是如何搶占和插入節點的?
    • 入隊之前會創建一個節點
    • 創建一個指向末尾節點的臨時節點,
    • 然后開始先去檢查p是否有下一個節點,如果沒有下一個節點,那么就能夠把n設定為下一個節點,
    • 如果有那么就回圈指向下一個節點,說明已經被別人搶先插入,hops++
    • 接著如果回圈發現next是空的,那么就插入n進去,這個插入是一個CAS,允許失敗,也就是可能會多個執行緒爭搶這個位置,如果成功,那么就結束回圈,
    • 如果失敗,那么就p指向下一個節點再一次進入回圈,
  • 總結來說就是判斷是不是尾節點,如果不是就回圈,如果是那么就CAS插入,
 public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
// 入隊前,創建一個入隊節點
        Node<E> n = new Node<E>(e);
        retry:
// 死回圈,入隊不成功反復入隊,
        for (;;) {
// 創建一個指向tail節點的參考
            Node<E> t = tail;
// p用來表示佇列的尾節點,默認情況下等于tail節點,
            Node<E> p = t;
            for (int hops = 0; ; hops++) {
// 獲得p節點的下一個節點,
                Node<E> next = succ(p);
// next節點不為空,說明p不是尾節點,需要更新p后在將它指向next節點
                if (next != null) {
// 回圈了兩次及其以上,并且當前節點還是不等于尾節點
                    if (hops > HOPS && t != tail)
                        continue retry;
                    p = next;
                }
// 如果p是尾節點,則設定p節點的next節點為入隊節點,
                else if (p.casNext(null, n)) {
/*如果tail節點有大于等于1個next節點,則將入隊節點設定成tail節點,
更新失敗了也沒關系,因為失敗了表示有其他執行緒成功更新了tail節點*/
                    if (hops >= HOPS)
                        casTail(t, n); // 更新tail節點,允許失敗
                    return true;
                }
// p有next節點,表示p的next節點是尾節點,則重新設定p節點
                else {
                    p = succ(p);
                }
            }
        }
    }

2.定位尾節點

  • 通過succ方法來定位,因為可能尾節點是tail節點,也可能是next,
final Node<E> succ(Node<E> p) {
    Node<E> next = p.getNext();
    return (p == next) head : next;
}

3.設定入隊節點為尾節點

  • p.casNext(null,n),就是把入隊節點設定為p.next的指向節點,

4.HOPS的設計意圖

  • 下面的意思就是回圈去找到tail節點,并且設定next節點是n,并且重新設定tail節點,這樣可行嗎?
  • 這樣的問題其實就是CAS的次數太多,每次進來都需要CAS,那么如何減少CAS?
  • 使用hops變數,并不是每次都把tail節點更新為尾節點,而是tail節點和尾節點大于等于常量HOPS的時候才會更新,距離越長,那么CAS更新的次數就會越少,帶來一個問題就是距離越長,定位的時間也就越長,本質上就是增加volatile的讀來減少volatile的寫,因為寫volatile消耗更大,需要增加屏障,
  • 那么為什么會增加了volatile的讀?原因就是CAS的操作是volatile讀寫一起的,但是純粹的succ訪問下一節點只是一個volatile讀操作,前面是知道volatile寫操作還需要加上一個StoreLoad屏障,但是volatile讀已經不需要加任何屏障,因為X86不允許讀寫和讀讀的重排序,
 public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        for (;;) {
            Node<E> t = tail;
            if (t.casNext(null, n) && casTail(t, n)) {
                return true;
            }
        }
    }

6.2.3 出佇列

image-20211205131113733

  • 出隊也是利用了hops,
    • 首先是p指向了head節點,
    • 然后獲取p的元素,
    • 如果元素不是空的,那么就設定p的元素是null,而且不是每次都會直接更新頭節點,而是等待頭結點和真正的頭結點有一段距離的時候才會更新,
  • 總結來說就是判斷頭節點的元素是不是空,如果是,那么就找到下一個頭結點回圈判斷,如果不是空,那么就設定為空,并且等待超過HOPS的時候才能夠重新設定頭節點,也是通過增加volatile讀的方式來減少volatile的寫,
 public E poll() {
        Node<E> h = head;
// p表示頭節點,需要出隊的節點
        Node<E> p = h;
        for (int hops = 0;; hops++) {
// 獲取p節點的元素
            E item = p.getItem();
// 如果p節點的元素不為空,使用CAS設定p節點參考的元素為null,
// 如果成功則回傳p節點的元素,
            if (item != null && p.casItem(item, null)) {
                if (hops >= HOPS) {
// 將p節點下一個節點設定成head節點
                    Node<E> q = p.getNext();
                    updateHead(h, (q != null) q : p);
                }
                return item;
            }
// 如果頭節點的元素為慷訓頭節點發生了變化,這說明頭節點已經被另外
// 一個執行緒修改了,那么獲取p節點的下一個節點
            Node<E> next = succ(p);
// 如果p的下一個節點也為空,說明這個佇列已經空了
            if (next == null) {
// 更新頭節點,
                updateHead(h, p);
                break;
            }
// 如果下一個元素不為空,則將頭節點的下一個節點設定成頭節點
            p = next;
        }
        return null;
    }

6.3 Java中的阻塞佇列

6.3.1 什么是阻塞佇列

  • 支持阻塞的插入:佇列滿的時候會阻塞插入元素的執行緒
  • 支持阻塞的移除:佇列是空的時候,佇列阻塞移除的執行緒,等待非空,

image-20211205131758980

遇到佇列滿和佇列空的處理方式,

  • 拋出例外:佇列滿的時候插入元素就會拋出例外

  • 回傳特殊值:當往佇列插入元素的時候,成功true,移除方法取出失敗回傳null,

  • 一直阻塞:阻塞佇列滿的時候,繼續put的執行緒會被阻塞,如果是空的時候get,那么也會被阻塞,直到不空的時候

  • 超時退出:阻塞佇列滿的時候,生產者執行緒往佇列里面插入元素,佇列就會阻塞生產者執行緒一段時間,如果超出一段時間生產者執行緒就會退出,

6.3.2 Java里的阻塞佇列

  • ArrayBlockingQueue:陣列有界阻塞佇列
  • LinkedBlockingQueue鏈表有界阻塞佇列
  • PriorityBlockingQueue:支持優先級的無界阻塞佇列
  • DelayQueue:使用優先級佇列的無界阻塞佇列,
  • SynchronousQueue:不存盤元素的無界阻塞佇列,
  • LinkedTransferQueue:鏈表組成的無界阻塞佇列
  • LinkedBlockingDeque:鏈表組成的雙向阻塞佇列,
  1. ArrayBlockingQueue
  • 陣列實作的有界阻塞佇列,遵循FIFO
  • 默認不保證執行緒公平訪問佇列,也就是阻塞和新進來的執行緒都能夠爭奪資源,
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
    throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull = lock.newCondition();
}

  1. LinkedBlockingQueue
  • 鏈表實作的有界阻塞佇列,也是FIFO
  1. PriorityBlockingQueue
  • 支持優先級的無界阻塞佇列,默認是升序,
  1. DelayQueue
  • 支持延時獲取元素的無界阻塞佇列,使用的是優先級佇列實作的,
  • 佇列介面必須實作Delayed介面,創建元素多久才能夠被獲取,
  • 應用
    • 快取系統設計:佇列保存快取的有效期,并且執行緒輪詢,
    • 定時任務調度:一旦能夠獲取任務,就可以執行,
  • (1)如何實作Delayed介面
    • 初始化基本資料,time記錄物件延遲什么時候可以使用,
    • 實作getDelay方法,回傳當前元素還需要延時的時間,
    • compareTo指定元素的順序,把延時比較長的放到末尾,
  • (2)如何實作延時阻塞佇列
    • 獲取元素的時候沒有達到延時時間就會被阻塞,
  1. SynchronousQueue
  • 不存盤元素的阻塞佇列,
  • 每個put必須等待一個take,否則不能繼續添加元素,
  • 支持公平訪問佇列,默認是非公平的,
  • 傳遞的速度非常快
  1. LinkedTransferQueue
  • 由鏈表結構組成的無界阻塞TransferQueue佇列
  • 多出tryTransfer和transfer方法,
  • (1)transfer方法
    • transfer可以把元素立刻傳輸給消費者,如果沒有消費者,那么就存放到尾部,
    • 下面的關鍵代碼
    • 第一行嘗試把s當前節點作為tail節點,
    • 第二行是CPU自旋等待消費者消費元素,自旋一定次數會呼叫yield來切換元素,
Node pred = tryAppend(s, haveData);
return awaitMatch(s, pred, e, (how == TIMED), nanos);
  • (2)tryTransfer方法
    • 試探生產者的元素是否能夠傳給消費者,
    • 可以設定時間等待,tryTransfer(E e,long timeout,TimeUnit unit)如果沒有指定的消費者消費,那么就會回傳false,
  1. LinkedBlockingDeque
  • 由鏈表結構組成的雙向阻塞佇列

6.3.3 阻塞佇列的實作原理

  • 消費者和生產者如何知道佇列的情況?

  • 如何高效通信?

  • 使用通知模式實作,生產者添加元素的時候阻塞住生產者,消費者消費了一個元素之后會通知生產者佇列可用,可以看到ArrayBlockingQueue使用了Condition實作,

private final Condition notFull;
    private final Condition notEmpty;
    public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代碼
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        } finally {
            lock.unlock();
        }
    }
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        } finally {
            lock.unlock();
        }
    }
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }
  • 而且這里的await的原理實際上就是LockSupport的阻塞,park的原理是unsafe的park來阻塞當前的執行緒,這是一個native的方法,
//await的原理,
public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

//Lock.park
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}
//Unsafe.park
public native void park(boolean isAbsolute, long time);


  • 只有在park對應的unpark執行的時候才會回傳,
  • 或者是執行緒中斷
  • 或者是park超時
  • 或者是出現了例外,

JVM如何實作park,

  • Linux系統的pthread_cond_wait實作的,

6.4 Fork/Join框架

6.4.1 什么是Fork/Join框架

  • 把大任務切分成各種小任務,并且最后把結果匯總的框架,

image-20211205134633342

6.4.2 作業竊取演算法

  • 某個執行緒從其它佇列竊取任務執行,
  • 為了減少競爭我們會把各個切分的子任務放到不同的佇列,并且一個佇列創建一個執行緒來執行,
  • 但是有的執行緒把任務執行完之后去幫助別的執行緒處理,所以為了減少兩個執行緒競爭,它會把佇列弄成雙端的,
  • 優點是充分利用執行緒計算,減少執行緒間的競爭
  • 缺點是雙端佇列只有一個任務的時候,仍然存在競爭,演算法消耗資源,創建多個執行緒和佇列,

image-20211205134847943

6.4.3 Fork/Join框架的設計

  1. 分割任務
  2. 執行并合并任務,分割的任務放到雙端佇列,結果統一放到一個佇列,
  3. 啟動執行緒并且從佇列拿出資料合并,

Fork/Join使用兩個類來完成以上兩件事情

  • 使用ForkJoinTask的子類
    • RecursiveAction:沒有結果的任務
    • RecursiveTask:有回傳結果的任務,
  • ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行,

6.4.4 使用Fork/Join框架

  • 實際上就是一個二分遞回演算法,
public class CountTask extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 2; // 閾值
        private int start;
        private int end;
        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
            int sum = 0;
// 如果任務足夠小就計算任務
            boolean canCompute = (end - start) <= THRESHOLD;
            if (canCompute) {
                for (int i = start; i <= end; i++) {
                    sum += i;
                }
            } else {
// 如果任務大于閾值,就分裂成兩個子任務計算
                int middle = (start + end) / 2;
                CountTask leftTask = new CountTask(start, middle);
                CountTask rightTask = new CountTask(middle + 1, end);
// 執行子任務
                leftTask.fork();
                rightTask.fork();
// 等待子任務執行完,并得到其結果
                int leftResult=leftTask.join();
                int rightResult=rightTask.join();
// 合并子任務
                sum = leftResult + rightResult;
            }
            return sum;
        }
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一個計算任務,負責計算1+2+3+4
            CountTask task = new CountTask(1, 4);
// 執行一個任務
            Future<Integer> result = forkJoinPool.submit(task);
            try {
                System.out.println(result.get());
            } catch (InterruptedException e) {
            } catch (ExecutionException e) {
            }
        }
    }

6.4.5 Fork/Join框架的例外處理

  • 可以通過isCompletedAbnormally()檢查任務是否出現例外,

6.4.6 Fork/Join框架的實作原理

  • ForkJoinPool由ForkJoinTask陣列和ForkJoinWorkerThread陣列組成,一個表示任務,一個就是執行緒執行任務,
  • (1)ForkJoinTask的fork方法實作原理
    • 實際上就是異步執行這個任務,呼叫了一個thread,
    • pushTask把任務存放到task陣列上,并且呼叫ForkJoinPool的signalWork方法喚醒一個執行緒執行,
public final ForkJoinTask<V> fork() {
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}
  • (2)ForkJoinTask的join方法實作原理
    • 阻塞當前執行緒并且等待結果,
    • 呼叫了doJon方法,得到任務的狀態,判斷任務是否完成,任務的分為三個狀態,已完成(NORMAL)、被取消(CANCELLED)、信號(SIGNAL)和出現例外 (EXCEPTIONAL),
    • 如果發現任務沒有完成,那么就去拿出執行緒來執行
    • 如果完成那么就回傳
    • 如果出現例外那么就拋出例外,
//join
public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }
    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }

//doJoin
private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s;
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            return w.joinTask(this);
        }
        else
            return externalAwaitDone();
    }

第7章 Java中的13個原子操作類

7.1 原子更新基本型別類

  • ·AtomicBoolean:原子更新布爾型別,
  • ·AtomicInteger:原子更新整型,
  • ·AtomicLong:原子更新長整型,

提供的方法

  • int addAndGet(int delta)輸入值與實體的值相加,并且回傳結果
  • boolean compareAndSet(int expect,int update):輸入的值與預期的一樣,那么就會把原來的值設定為新的值,
  • int getAndIncrement():原子的方式自增,
  • void lazySet(int newValue):最侄訓設定為新值,可能會延遲,
  • ·int getAndSet(int newValue):原子的方式設定新的值,并且回傳舊的值,

getAndIncrement如何實作原子操作

  • 首先是獲取原來Atomoic存盤的值
  • 然后進行加一,
  • compareAndSet(current, next)進行原子操作,檢查當前的值是否等于current,如果沒有被修改就能賦值,
 public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
  • Atomic的類基本是Unsafe實作的,
 /**
     * 如果當前數值是expected,則原子的將Java變數更新成x
     * @return 如果更新成功則回傳true
     */
    public final native boolean compareAndSwapObject(Object o,
                                                     long offset,
                                                     Object expected,
                                                     Object x);
    public final native boolean compareAndSwapInt(Object o, long offset,
                                                  int expected,
                                                  int x);
    public final native boolean compareAndSwapLong(Object o, long offset,
                                                   long expected,
                                                   long x);

7.2 原子更新陣列

  • ·AtomicIntegerArray:原子更新整型陣列里的元素,

  • ·AtomicLongArray:原子更新長整型陣列里的元素,

  • ·AtomicReferenceArray:原子更新參考型別陣列里的元素,

  • ·AtomicIntegerArray類主要是提供原子的方式更新陣列里的整型

他們的方法與api

  • int addAndGet(int i,int delta):原子方式把delta與索引是i的值相加
  • boolean compareAndSet(int i,int expect,int update):是否等于預期的值,如果是才能夠修改,

7.3 原子更新參考型別

  • ·AtomicReference:原子更新參考型別,
  • ·AtomicReferenceFieldUpdater:原子更新參考型別里的欄位,
  • ·AtomicMarkableReference:原子更新帶有標記位的參考型別,
  • 主要是能夠原子更新AtomicReference也就是它指向的參考,

7.4 原子更新欄位類

  • ·AtomicIntegerFieldUpdater:原子更新整型的欄位的更新器,

  • ·AtomicLongFieldUpdater:原子更新長整型欄位的更新器,

  • ·AtomicStampedReference:原子更新帶有版本號的參考型別,

  • 使用方式就是定位位置,然后呼叫方法即可,

 public class AtomicIntegerFieldUpdaterTest {
        // 創建原子更新器,并設定需要更新的物件類和物件的屬性
        private static AtomicIntegerFieldUpdater<User> a = AtomicIntegerFieldUpdater.
                newUpdater(User.class"old");
        public static void main(String[] args) {
// 設定柯南的年齡是10歲
            User conan = new User("conan"10);
// 柯南長了一歲,但是仍然會輸出舊的年齡
            System.out.println(a.getAndIncrement(conan));
// 輸出柯南現在的年齡
            System.out.println(a.get(conan));
        }
        public static class User {
            private String name;
            public volatile int old;
            public User(String name, int old) {
                this.name = name;
                this.old = old;
            }
            public String getName() {
                return name;
            }
            public int getOld() {
                return old;
            }
        }
    }

第9章 Java中的執行緒池

執行緒池的好處

  • 降低資源消耗,利用已經創建執行緒,減少執行緒的創建和銷毀帶來的性能消耗,
  • 提高回應速度,當任務到達的時候,任務可以不需要執行緒創建就能夠立即執行,
  • 提高執行緒的可管理性,

9.1 執行緒池的實作原理

  • 執行緒池判斷核心執行緒池里面的執行緒是否都在執行任務,如果不是那么就創建一個新的執行緒來執行任務,如果核心執行緒池執行緒都在執行任務,那么進入下一個流程,
  • 執行緒池判斷作業佇列是不是滿了,如果沒有滿,那么就把任務存盤在佇列,如果滿了,那么進入到下一個流程
  • 執行緒池判斷執行緒池的執行緒是不是都處于作業狀態,如果沒有就創建一個執行任務,否則只能執行飽和策略來執行任務了,

image-20211205155847511

ThreadPoolExecutor執行execute方法分四種,

  1. 當前運行的執行緒數少于corePoolSize,那么創建新的執行緒作業,
  2. 如果當前運行執行緒數大于corePoolSize,那么就把任務加入到BlockingQueue,
  3. 如果無法將任務加入到BlockingQueue也就是佇列滿的情況,那么就要創建新的執行緒執行,
  4. 如果執行緒超出了maximumPoolSize那么執行執行飽和策略了,

image-20211205160152922

原始碼分析

  • command就是任務,判斷邏輯基本上是和上面是一模一樣的,
   public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
// 如果執行緒數小于基本執行緒數,則創建執行緒并執行當前任務
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
// 如執行緒數大于等于基本執行緒數或執行緒創建失敗,則將當前任務放到作業佇列中,
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
// 如果執行緒池不處于運行中或任務無法放入佇列,并且當前執行緒數量小于最大允許的執行緒數量,
// 則創建一個執行緒執行任務,
            else if (!addIfUnderMaximumPoolSize(command))
// 拋出RejectedExecutionException例外
                reject(command); // is shutdown or saturated
        }
    }

作業執行緒

  • 執行緒會被封裝為Worker,Worker執行任務之后還會回圈獲取作業佇列里面的任務執行,
  • 可以看看run方法,就是不斷回圈,并且獲取任務執行,
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }

image-20211205160839933

  • 執行緒池執行任務分為兩種情況
    • execute方法創建執行緒,并且執行任務
    • 執行緒執行任務之后,會取到BlockingQueue獲取任務執行,

9.2 執行緒池的使用

9.2.1 執行緒池的創建

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
milliseconds,runnableTaskQueue, handler);
  • corePoolSize:核心執行緒數
  • runnableTaskQueue:任務佇列,也就是上面提到的各種阻塞佇列,比如ArrayBlockingQueue,
    • ArrayBlockingQueue
    • ·LinkedBlockingQueue
    • ·SynchronousQueue
    • PriorityBlockingQueue
  • maximumPoolSize:執行緒的最大數量,
  • ThreadFactory:創建執行緒的工廠,執行緒工廠可以給執行緒賦予名字,
  • RejectedExecutionHandler:飽和策略,佇列和執行緒池都已經滿的情況如何處理任務,
    • AbortPolicy:拋出例外
    • CallerRunsPolicy:呼叫者所在的執行緒執行任務
    • DiscardOldestPolicy:丟棄佇列最近的一個任務
    • DiscardPolicy:直接丟棄,
  • keepAliveTime:超過corePoolSize的執行緒空閑存活時間,
  • TimeUnit:上面存活時間的一個單位,

9.2.2 向執行緒池提交任務

  • execute不需要回傳值,只需要輸入一個Runnable任務,
   threadsPool.execute(new Runnable() {
        @Override
        public void run() {
// TODO Auto-generated method stub
        }
    });
  • submit回傳一個Future物件,相當于就是資料封裝到了Future這里,
 Future<Object> future = executor.submit(harReturnValuetask);
        try {
            Object s = future.get();
        } catch (InterruptedException e) {
// 處理中斷例外
        } catch (ExecutionException e) {
// 處理無法執行任務例外
        } finally {
// 關閉執行緒池
            executor.shutdown();
        }

9.2.3 關閉執行緒池

  • 呼叫執行緒池的shutdown或者是shutdownNow方法關閉執行緒池,原理是遍歷作業執行緒,發出中斷信號,
    • shutdownNow設定執行緒狀態是STOP然后嘗試停止所有執行緒
    • shutdown只是設定執行緒狀態SHUTDOWN,中斷所有沒有執行任務的執行緒,
  • 所有任務關閉,那么執行緒池關閉成功,

9.2.4 合理地配置執行緒池

從哪方面考量?

  • 任務性質:CPU密集型還是IO密集型
  • 任務的優先級
  • 任務執行時間
  • 任務的依賴性,

如何選擇執行緒池

  • CPU密集型選擇執行緒盡可能少一點的,
  • 對于IO密集型的選擇執行緒多一些,因為IO密集說明不是所有執行緒都能作業,有的被阻塞導致CPU無法很好被使用,
  • 優先級不同可以使用PriorityBlockingQueue來進行處理,能夠讓優先級更高的任務先執行,
  • 依賴資料庫連接池的任務,執行緒提交SQL之后會進入阻塞,所以使用更多執行緒會更好使用CPU,
  • 建議使用有界佇列,避免任務的積壓,任務積壓導致的記憶體撐滿的問題,會導致程式直接結束,

9.2.5 執行緒池的監控

如何監控?

  • 通過各種執行緒池引數
    • taskCount:執行緒池需要執行的任務數量
    • completedTaskCount:執行緒池完成任務的數量
    • largestPoolSize:創建過的執行緒池最大的數量,
    • getPoolSize:執行緒池的執行緒數量,
    • getActiveCount:獲取活動的執行緒數,

第10章 Executor框架

10.1 Executor框架簡介

10.1.1 Executor框架的兩級調度模型

  • 在HotSpot VM模型里面,java執行緒被一對一映射為本地作業系統的執行緒,作業系統可以調度執行緒,分配給CPU,
  • java多執行緒會把應用分成多個任務,然后使用用戶級的調度器,把任務映射為固定數量的執行緒,作業系統內核會把這些執行緒映射到硬體處理器上,
  • 也就是說Executor把任務分配給執行緒執行,執行緒通過作業系統內核來映射到CPU的調度,

10.1.2 Executor框架的結構與成員

image-20211205162907845

1.Executor框架的結構

組成部分

  • 任務:實作任務的Callable和Runnable介面
  • 任務執行:任務執行機制的核心介面Executor,以及集成Executor的ExecutorService,還有兩個實作了ExecutorService介面的ThreadPoolExecutor和ScheduledThreadPoolExecutor,
  • 異步計算的結果,也就是Future介面和實作Future介面的FutureTask類

類和介面的簡介

  • Executor是Executor框架的基礎,把任務的提交和任務的執行分離開,
  • ThreadPoolExecutor:執行緒池的核心實作類,執行被被提交的任務,
  • ScheduledThreadPoolExecutor:延遲執行命令,或者定期執行命令,
  • Future:代表異步計算結果,
  • Runnable和Callable的實作類可以被ThreadPoolExecutpor和ScheduledThreadPoolExecutor,

image-20211205164408937

Executor框架的使用,

image-20211205164425841

  • 首先是主執行緒要創建Runnable或者是Callable的實作物件,
  • 然后把任務物件Runnable交給ExecutorService執行execute,
  • 如果任務物件是Callable那么就是執行submit,回傳一個Future異步結果,
  • 主執行緒可以通過Future的get來等待任務執行結束,

2.Executor框架的成員

  • (1)ThreadPoolExecutor
    • 這個通常直接通過工廠類Executors直接創建,
    • Executors創建3種型別的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool,
    • 1)FixedThreadPool,:創建固定執行緒的執行緒池,適用負載比較重的服務器,
    • 2)SingleThreadExecutor:順序完成任務,所以只有一個執行緒在執行,
    • 3)CachedThreadPool,:根據需要創建執行緒,執行緒數量是無限的,可以執行很多小任務,適用于負載比較輕的,
  • (2)ScheduledThreadPoolExecutor
    • 兩種型別,
    • ScheduledThreadPoolExecutor包含若干個執行緒的型別
      • 適合多個后臺的周期執行任務,
    • SingleThreadScheduledExecutor包含一個執行緒的型別
      • 適合單個后臺的周期執行任務,
  • (3)Future介面
    • 能夠表示異步計算的結果,通過FutureTask,
  • (4)Runnable介面和Callable介面
    • 就是任務的實作介面,

10.2 ThreadPoolExecutor詳解

  • corePool:核心執行緒池的大小
  • ·maximumPool:最大執行緒池的大小,
  • ·BlockingQueue:用來暫時保存任務的作業佇列,
  • ·RejectedExecutionHandler:飽和策略,

10.2.1 FixedThreadPool詳解

  • corePool和maximumPool都被設定為nThreads
  • 說明不能夠有多余的執行緒,
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

image-20211205165410522

  1. 如果當前執行緒少于core那么就創建執行緒執行任務
  2. 執行緒池完成預熱之后,把任務加入到LinkedBlockingQueue,
  3. 執行完1的任務之后,會反復去到佇列獲取任務,
  • 這個執行緒池不拒絕新任務

10.2.2 SingleThreadExecutor詳解

  • core和max執行緒數量都是1,
  • 與無界佇列LinkedBlockingQueue配合,
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>()));
    }

10.2.3 CachedThreadPool詳解

  • 這里沒有core但是max是被設定為無界的,說明可以不斷創建執行緒,
  • 使用的佇列是SynchronousQueue,也就是來一個任務必須要快速找到執行緒處理
  • 如果任務太多可能會導致記憶體不足,
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

image-20211205170051841

  1. 首先執行的是SynchronousQueue.offer(Runnable task)如果有空閑執行緒,執行緒立刻執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),
  2. 如果maximumPool為空,說明沒有空閑執行緒,那么就會創建一個新的執行緒執行任務
  3. 創建執行緒成功之后,執行緒開始執行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),也就是要在60s之內空閑執行緒要接收任務并且執行,如果60s之后,那么空閑執行緒就會結束,

image-20211205170424543

10.3 ScheduledThreadPoolExecutor詳解

  • 繼承了ThreadPoolExecutor,定期任務和延遲任務,

10.3.1 ScheduledThreadPoolExecutor的運行機制

  • ScheduledThreadPoolExecutor使用的是DelayQueue無界佇列,所以maximumPoolSize沒有意義,
  • scheduleAtFixedRate()和scheduleAtFixedDelay()會向DelayQueue添加一個實作了RunnableScheduledFutur的介面,
  • 然后就是執行緒獲取任務和執行,

image-20211205170818695

10.3.2 ScheduledThreadPoolExecutor的實作

ScheduledFutureTask任務包含三個成員變數

  • ·long型成員變數time,這個任務執行的具體時間
  • long型成員變數sequenceNumber,表示這個任務被添加到ScheduledThreadPoolExecutor的序號,
  • long型成員變數period表示任務執行的間隔周期,
  • DelayQueue封裝了一個PriorityQueue佇列,能夠對里面的任務進行排序,time小的放到前面,然后對比序號,

執行任務的四個步驟

  1. 執行緒從佇列獲取到期的任務
  2. 執行緒執行任務
  3. 執行緒修改任務的time變數為下次的執行時間,
  4. 重新把任務放回去,

image-20211205171207978

  • 這個就是take任務的代碼
    • 獲取Lock
    • 獲取周期任務
      • 如果PriorityQueue是空的,那么進入到Condition等待
      • 如果PriorityQueue的time時間比當前時間大,那么就到Condition等待time時間,
      • 獲取PriorityQueue的頭元素,如果不是空,那么喚醒Condition的等待執行緒,
    • 釋放Lock,
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); // 1
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await(); // 2.1
                } else {
                    long delay = first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay); // 2.2
                    } else {
                        E x = q.poll(); // 2.3.1
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); // 2.3.2
                        return x;
                    }
                }
            }
        } finally {
            lock.unlock(); // 3
        }
    }

image-20211205171530584

10.4 FutureTask詳解

10.4.1 FutureTask簡介

  • 實作了Future還實作了Runnable介面,可以交給執行緒池執行,
  • 三種狀態
    • 未啟動
    • 已經啟動
    • 已經完成,

image-20211205171916527

  • 當FutureTask是未啟動或者是已經啟動的時候,那么就會呼叫get方法導致呼叫執行緒的阻塞,
  • 如果處于已完成,那么就會回傳結果或者拋出例外,
  • 如果FutureTask處于未啟動狀態的時候,可以呼叫cancel導致任務不會被執行,
  • 如果是啟動的時候呼叫cancel(true),那么就會試圖中斷執行這個任務,
  • 如果是啟動的時候呼叫cancel(false),那么不會對正在執行的任務執行緒產生影響,
  • 如果是已經完成的任務,那么就會回傳false,

image-20211205174329434

10.4.2 FutureTask的使用

  • 回圈創建和執行任務,
  • 執行緒會去到taskCache里面獲取任務
  • 如果發現沒有,那么就創建任務,并且存入taskCache里面,
  • 然后獲取并執行任務,
private final ConcurrentMap<Object, Future<String>> taskCache =
            new ConcurrentHashMap<Object, Future<String>>();
    private String executionTask(final String taskName)
            throws ExecutionException, InterruptedException {
        while (true) {
            Future<String> future = taskCache.get(taskName); // 1.1,2.1
            if (future == null) {
                Callable<String> task = new Callable<String>() {
                    public String call() throws InterruptedException {
                        return taskName;
                    }
                }; // 1.2創建任務
                FutureTask<String> futureTask = new FutureTask<String>(task);
                future = taskCache.putIfAbsent(taskName, futureTask); // 1.3
                if (future == null) {
                    future = futureTask;
                    futureTask.run(); // 1.4執行任務
                }
            }
            try {
                return future.get(); // 1.5,2.2執行緒在此等待任務執行完成
            } catch (CancellationException e) {
                taskCache.remove(taskName, future);
            }
        }
    }

image-20211205174607104

10.4.3 FutureTask的實作

  • FutureTask基于AQS

image-20211205175027557

  • FutureTask的get呼叫AQS.acquireSharedInterruptibly(int arg)

    • 呼叫AQS.acquireSharedInterruptibly(int arg)也就是去爭搶鎖,獲取成功的條件是state=RAN或者是CANCELLED
    • 如果get成功立刻回傳,否則就要等待其它執行緒釋放release,
    • 其它執行緒release,喚醒當前執行緒,再次去獲取鎖
    • 最后回傳計算結果,
  • run程序

    • 執行建構式的任務
    • 原子方式更新同步狀態,AQS.compareAndSetState(int expect,int update)設定為state=RAN如果設定成功,代表計算結果的值是Callable.call()的回傳值,然后釋放鎖AQS.releaseShared(int arg),其實就是修改同步狀態,
    • AQS.releaseShared(int arg)之后會呼叫tryReleaseShared(arg)來release,然后喚醒第一個執行緒,
    • 呼叫FutureTask.done(),執行FutureTask.get()的時候如果task不是出于已經完成RAN或者是CANCELLED,那么執行緒就只能去到AQS的同步佇列等待,如果某個執行緒執行run或者cancel,那么就會喚醒下一個執行緒,

    image-20211205180030827

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/374745.html

標籤:java

上一篇:大二期末作孽(SpringBoot+Vue前后端分離博客社區(重構White Hole))

下一篇:Java淺拷貝深拷貝

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more