Netty物件池 Recycler
對于為什么要使用物件池,肯定是提高性能啊!
物件的創建,回收,再創建;很耗損jvm性能的,所以就有了物件池的概念,Netty通過Recycler管理物件的創建與回收,而在物件回收時,也不是真正的把物件從堆記憶體中移除了,而是在記憶體中快取了,那具體什么時候回收呢?物件有一個WeakReference,GC回收就與弱參考有關了,具體請查看:強參考、軟參考、弱參考、虛參考他們之間的區別了,
一、物件池組件介紹
1、Recycler
Recycler是對所有組件的一個管理,通過原始碼可以看出,它的方法并不多,只對外暴露提供get()和recycle()方法,get()方法是從物件池中獲取“物件”,recycle()是把“物件”進行回收;Recycler還提供了公用“物件”DELAYED_RECYCLED,具體作用將在下面介紹邏輯時詳細解釋,
2、Stack
Stack是物件池的最直接的管理類,提供了物件創建、回收、異執行緒“物件”獲取、物件移除等,Stack持有指向上層Recycler的指標,
3、DefaultHandle
DefaultHandle是實際存盤快取“物件”的類,它有一個Object value屬性,就是用來保存實際“物件”,另外DefaultHandle還有指向上層Stack的指標,代表該DefaultHandle實際屬于哪個Stack,
4、WeakOrderQueue
WeakOrderQueue是一個單向佇列,它主要用來存盤共享資料(物件池中的物件),在Stack中未get到"物件"時,將從該佇列中檢其他執行緒創建的"物件",并把檢索到的所有“物件”復制到本Stack中;對異執行緒的“物件”回收,也會放到WeakOrderQueue中,
5、Head
WeakOrderQueue的內部類,每個Head都會有一個Link鏈表,下面會講解Link的作用,Head有一個availableSharedCapacity屬性,該屬性與Stack的availableSharedCapacity系結,可以有Head動態擴展分享容量的大小,所以Head的作用是有一個Link的指標和動態擴展容量,
6、Link
每個Link都有DefaultHandle<?>[] elements屬性,這是實際存盤“物件”資料的地方,而跨執行緒檢索“物件”,也就是從各個Link中檢索,如果查到了,就整體復制到Stack的elements中,Link繼承AtomicInteger,所以Link還擁有AtomicInteger的一切特性,
二、組件關系
單從上面各個組件的介紹,無法實際理解他們之間的關系;下面就根據下圖整體介紹一下:

(1)每個Recycler都有一個Stack物件,用來實際存盤本執行緒的“物件”資料,
(2)所有Recycler又有一個公共的FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED物件,該物件是靜態的,所以資料在各個執行緒中共享,每個Recycler創建“物件”之后,都會在DELAYED_RECYCLED中快取一份,從存盤型別可以看出,Stack作為Key,WeakOrderQueue作為Value,而Stack又與Recycler系結,
(3)Recycler還有一個FastThreadLocal<Stack> threadLocal屬性,用來存盤該Recycler關聯的Stack物件的,
(4)Stack中有DefaultHandle<?>[] elements屬性,實際“物件”資料存盤的地方,
(5)Stack有 volatile WeakOrderQueue head屬性,是多執行緒立即可見的,
(6)WeakOrderQueue中又Head head屬性
(7)Head有Link屬性,
(8)Link是單向鏈表,
通過描述和上面的結構圖,應該很清楚各個組件之間的關系了,
三、實作物件快取、獲取與檢索
該程序會設計到很多原始碼分析,一步一步理解之后詳細看下去,肯定能弄明白,不要著急,
1、下面是Recycler的實際使用,大家在學習是可以打斷點一點一點的追查下去,
import io.netty.util.Recycler;
public class RecyclerDemo {
// 創建記憶體池,實作記憶體池的newObject方法
public static final Recycler<Worker> recycler = new Recycler<Worker>() {
@Override
protected Worker newObject(Handle<Worker> handle) {
return new Worker(handle); // 此處傳入handle用來物件回收
}
};
public static void main(String[] args) {
Worker worker1 = recycler.get(); // 從物件池中獲取Worker物件,如果獲取不到,并且handle也為空,則會呼叫上面的newObject創建一個物件
worker1.setName("1111");
worker1.recycle(); // 物件進行回收,實際回收并不會把物件從記憶體中洗掉,而是又放回了物件池
Worker worker2 = recycler.get();
System.out.println(worker2.getName());
System.out.println(worker1 == worker2);
}
public static class Worker {
private String name;
private Recycler.Handle<Worker> handle;
public Worker(Recycler.Handle<Worker> handle) { this.handle = handle; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
// 實際實作物件回收的代碼
public void recycle() { handle.recycle(this); }
}
}
上面代碼輸出結果如下:說明兩次從物件池中獲取的物件是同一個,
1111
true
2、分析recycler.get() 物件如何創建的?
下面是Recycler的get方法原始碼
public final T get() {
// 保證該執行緒的最大容量不為0,否則就回傳一個空物件
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
// 從threadLocal獲取快取的Stack物件
Stack<T> stack = threadLocal.get();
// 從stack中彈出一個最新創建的“物件”
DefaultHandle<T> handle = stack.pop();
// 如果為空,則創建
if (handle == null) {
handle = stack.newHandle();
// 此處的newObject就是在Demo中重寫的newObject
handle.value = newObject(handle);
}
return (T) handle.value;
}
// 創建DefaultHandle物件,應對上面方法
DefaultHandle<T> newHandle() {
return new DefaultHandle<T>(this);
}
物件至此創建完成,
3、threadLocal屬性什么時候賦值的 stack物件什么時候存盤到threadLocal中的?
由此可見,在new Recycler的時候,就把threadLocal也一并創建了,又重寫了initialVale()和onRemoval()方法,
stack物件創建就是在initialVale()中,原理與Recycler創建Worker物件相同,
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
4、重點來了,stack.pop()
DefaultHandle<T> pop() {
int size = this.size; // @1 第一個重點
if (size == 0) {
if (!scavenge()) { // @2 第二個重點
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
(1)在@1標注的this.size表示當前執行緒(ThreadA)存盤“物件”的數量,如果數量為0,則從別的執行緒(ThreadB:此處只列舉一個執行緒)中檢索,
(2)@2標注的代碼就是檢索邏輯:scavenge(),scavenge()中只有scavengeSome()是有用的,
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
5、分析scavengeSome()方法
boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor; // @1 start
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) {
return false;
}
} else {
prev = this.prev;
} // @1 end
boolean success = false;
do {
if (cursor.transfer(this)) { // @2
success = true;
break;
}
WeakOrderQueue next = cursor.next; // @3
if (cursor.owner.get() == null) {
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
(1)@1start到@1end,拿到cursor的游標,從cursor處開始查找,
(2)根據WeakOrderQueue的單向鏈表特性,一個節點一個節點的往后查找,而實際操作查找的方法是transfer;而scavengeSome()就是個控制指標移動的方法,
6、分析transfer()
boolean transfer(Stack<?> dst) {
Link head = this.head.link; // @1
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) { // @2
if (head.next == null) {
return false;
}
this.head.link = head = head.next;
this.head.reclaimSpace(LINK_CAPACITY); // @2end
}
final int srcStart = head.readIndex; // @3
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
} // @3end
final int dstSize = dst.size; // @4
final int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
} // @4end
if (srcStart != srcEnd) { // @5
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
} // @5end
if (srcEnd == LINK_CAPACITY && head.next != null) { // @6
// Add capacity back as the Link is GCed.
this.head.reclaimSpace(LINK_CAPACITY);
this.head.link = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
(1) @1的作用是拿到要檢索的WeakOrderQueue中的Head指向的Link;WeakOrderQueue中有Head,Head中又Link,Link中又DefaultHandle陣列,該陣列是存盤“物件”的;
(2)@2的作用是判斷Link有沒有讀到尾部,如果已經讀到了尾部則把下一個Link拿出來;
(3)Link是繼承AtomicInteger,@3的作用就是拿到Link中的可讀的下標起始和終點值,
(4)@4是拿到資料最終拷貝目標地址,并保證目標地址的記憶體足夠拷貝資料;
(5)@5執行拷貝,并過濾中間已經被洗掉的物件,
四、物件回收 recycle()
回收方法如下:最終真正執行的是stack.push();
public final boolean recycle(T o, Handle<T> handle) {
if (handle == NOOP_HANDLE) {
return false;
}
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
h.recycle(o);
return true;
}
public void recycle(Object object) {
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
pushNow(item);
} else {
pushLater(item, currentThread);
}
通過上面最后一段代碼,如果執行回收的是本執行緒的物件,則執行pushNow();否則執行pushLater().
1、本執行緒內的物件回收: pushNow()
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
這段代碼簡單,就是把要回收的物件再放回到Stack的elements中,另外再加上一些空間的驗證,
2、異執行緒的物件回收: pushLater()
private void pushLater(DefaultHandle<?> item, Thread thread) {
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
return;
}
queue.add(item);
}
從DELAYED_RECYCLED中獲取該執行緒對應的WeakOrderQueue,如果為null,則創建一個WeakOrderQueue物件,不為空則之間添加到queue中,
至此整個物件池的決議完成,
純屬原著,一點一點分析的Netty物件池原始碼,如有轉載,請宣告!
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/266655.html
標籤:其他
上一篇:個人博客系統整體介紹
下一篇:信任即做加法也做減法
