1 前言
之前旁邊的小伙伴問我熱點資料相關問題,在給他粗略地講解一波redis資料傾斜的案例之后,自己也順道回顧了一些關于熱點資料處理的方法論,同時也想起去年所學習JD開源專案hotkey——專門用來解決熱點資料問題的框架,在這里結合兩者所關聯到的知識點,通過幾個小圖和部分粗略的講解,來讓大家了解相關方法論以及hotkey的原始碼決議,
2 Redis資料傾斜
2.1 定義與危害
先說說資料傾斜的定義,借用百度詞條的解釋:
對于集群系統,一般快取是分布式的,即不同節點負責一定范圍的快取資料,我們把快取資料分散度不夠,導致大量的快取資料集中到了一臺或者幾臺服務節點上,稱為資料傾斜,一般來說資料傾斜是由于負載均衡實施的效果不好引起的,
從上面的定義中可以得知,資料傾斜的原因一般是因為LB的效果不好,導致部分節點資料量非常集中,
那這又會有什么危害呢?
如果發生了資料傾斜,那么保存了大量資料,或者是保存了熱點資料的實體的處理壓力就會增大,速度變慢,甚至還可能會引起這個實體的記憶體資源耗盡,從而崩潰,這是我們在應用切片集群時要避免的,
2.2 資料傾斜的分類
2.2.1 資料量傾斜(寫入傾斜)
1.圖示

如圖,在某些情況下,實體上的資料分布不均衡,某個實體上的資料特別多,
2.bigkey導致傾斜
某個實體上正好保存了 bigkey,bigkey 的 value 值很大(String 型別),或者是 bigkey 保存了大量集合元素(集合型別),會導致這個實體的資料量增加,記憶體資源消耗也相應增加,
應對方法
- 在業務層生成資料時,要盡量避免把過多的資料保存在同一個鍵值對中,
- 如果 bigkey 正好是集合型別,還有一個方法,就是把 bigkey 拆分成很多個小的集合型別資料,分散保存在不同的實體上,
3.Slot分配不均導致傾斜
先簡單的介紹一下slot的概念,slot其實全名是Hash Slot(哈希槽),在Redis Cluster切片集群中一共有16384 個 Slot,這些哈希槽類似于資料磁區,每個鍵值對都會根據它的 key,被映射到一個哈希槽中,Redis Cluster 方案采用哈希槽來處理資料和實體之間的映射關系,
一張圖來解釋,資料、哈希槽、實體這三者的映射分布情況,


這里的CRC16(city)%16384可以簡單的理解為將key1根據CRC16演算法取hash值然后對slot個數取模,得到的就是slot位置為14484,他所對應的實體節點是第三個,
運維在構建切片集群時候,需要手動分配哈希槽,并且把16384 個槽都分配完,否則 Redis 集群無法正常作業,由于是手動分配,則可能會導致部分實體所分配的slot過多,導致資料傾斜,
應對方法
使用CLUSTER SLOTS 命令來查
看slot分配情況,使用CLUSTER SETSLOT,CLUSTER GETKEYSINSLOT,MIGRATE這三個命令來進行slot資料的遷移,具體內容不再這里細說,感興趣的同學可以自行學習一下,
4.Hash Tag導致傾斜
- Hash Tag 定義 :指當一個key包含 {} 的時候,就不對整個key做hash,而僅對 {} 包括的字串做hash,
- 假設hash演算法為sha1,對user:{user1}:ids和user:{user1}:tweets,其hash值都等同于sha1(user1),
- Hash Tag 優勢 :如果不同 key 的 Hash Tag 內容都是一樣的,那么,這些 key 對應的資料會被映射到同一個 Slot 中,同時會被分配到同一個實體上,
- Hash Tag 劣勢 :如果不合理使用,會導致大量的資料可能被集中到一個實體上發生資料傾斜,集群中的負載不均衡,
2.2.2 資料訪問傾斜(讀取傾斜-熱key問題)
一般來說資料訪問傾斜就是熱key問題導致的,如何處理redis熱key問題也是面試中常會問到的,所以了解相關概念及方法論也是不可或缺的一環,
1.圖示

如圖,雖然每個集群實體上的資料量相差不大,但是某個實體上的資料是熱點資料,被訪問得非常頻繁,
但是為啥會有熱點資料的產生呢?
2.產生熱key的原因及危害
1)用戶消費的資料遠大于生產的資料(熱賣商品、熱點新聞、熱點評論、明星直播),
在日常作業生活中一些突發的事件,例如:雙十一期間某些熱門商品的降價促銷,當這其中的某一件商品被數萬次點擊瀏覽或者購買時,會形成一個較大的需求量,這種情況下就會造成熱點問題,
同理,被大量刊發、瀏覽的熱點新聞、熱點評論、明星直播等,這些典型的讀多寫少的場景也會產生熱點問題,
2)請求分片集中,超過單 Server 的性能極限,
在服務端讀資料進行訪問時,往往會對資料進行分片切分,此程序中會在某一主機 Server 上對相應的 Key 進行訪問,當訪問超過 Server 極限時,就會導致熱點 Key 問題的產生,
如果熱點過于集中,熱點 Key 的快取過多,超過目前的快取容量時,就會導致快取分片服務被打垮現象的產生,當快取服務崩潰后,此時再有請求產生,會快取到后臺 DB 上,由于DB 本身性能較弱,在面臨大請求時很容易發生請求穿透現象,會進一步導致雪崩現象,嚴重影響設備的性能,
3.常用的熱key問題解決辦法:
解決方案一: 備份熱key
可以把熱點資料復制多份,在每一個資料副本的 key 中增加一個隨機后綴,讓它和其它副本資料不會被映射到同一個 Slot 中,
這里相當于把一份資料復制到其他實體上,這樣在訪問的時候也增加隨機前綴,將對一個實體的訪問壓力,均攤到其他實體上
例如:我們在放入快取時就將對應業務的快取key拆分成多個不同的key,如下圖所示,我們首先在更新快取的一側,將key拆成N份,比如一個key名字叫做”good_100”,那我們就可以把它拆成四份,“good_100_copy1”、“good_100_copy2”、“good_100_copy3”、“good_100_copy4”,每次更新和新增時都需要去改動這N個key,這一步就是拆key,


對于service端來講,我們就需要想辦法盡量將自己訪問的流量足夠的均勻,
如何給自己即將訪問的熱key上加入后綴?幾種辦法,根據本機的ip或mac地址做hash,之后的值與拆key的數量做取余,最終決定拼接成什么樣的key后綴,從而打到哪臺機器上;服務啟動時的一個亂數對拆key的數量做取余,
偽代碼如下:
const M = N * 2
//生成亂數
random = GenRandom(0, M)
//構造備份新key
bakHotKey = hotKey + “_” + random
data = https://www.cnblogs.com/Jcloud/p/redis.GET(bakHotKey)
if data =https://www.cnblogs.com/Jcloud/p/= NULL {
data = https://www.cnblogs.com/Jcloud/p/GetFromDB()
redis.SET(bakHotKey, expireTime + GenRandom(0,5))
}
解決方案二: 本地快取+動態計算自動發現熱點快取


該方案通過主動發現熱點并對其進行存盤來解決熱點 Key 的問題,首先 Client 也會訪問 SLB,并且通過 SLB 將各種請求分發至 Proxy 中,Proxy 會按斬訓于路由的方式將請求轉發至后端的 Redis 中,
在熱點 key 的解決上是采用在服務端增加快取的方式進行,具體來說就是在 Proxy 上增加本地快取,本地快取采用 LRU 演算法來快取熱點資料,后端節點增加熱點資料計算模塊來回傳熱點資料,
Proxy 架構的主要有以下優點:
- Proxy 本地快取熱點,讀能力可水平擴展
- DB 節點定時計算熱點資料集合
- DB 反饋 Proxy 熱點資料
- 對客戶端完全透明,不需做任何兼容
熱點資料的發現與存盤


對于熱點資料的發現,首先會在一個周期內對 Key 進行請求統計,在達到請求量級后會對熱點 Key 進行熱點定位,并將所有的熱點 Key 放入一個小的 LRU 鏈表內,在通過 Proxy 請求進行訪問時,若 Redis 發現待訪點是一個熱點,就會進入一個反饋階段,同時對該資料進行標記,
可以使用一個etcd或者zk集群來存盤反饋的熱點資料,然后本地所有節點監聽該熱點資料,進而加載到本地JVM快取中,
熱點資料的獲取


在熱點 Key 的處理上主要分為寫入跟讀取兩種形式,在資料寫入程序當 SLB 收到資料 K1 并將其通過某一個 Proxy 寫入一個 Redis,完成資料的寫入,
假若經過后端熱點模塊計算發現 K1 成為熱點 key 后, Proxy 會將該熱點進行快取,當下次客戶端再進行訪問 K1 時,可以不經 Redis,
最后由于 proxy 是可以水平擴充的,因此可以任意增強熱點資料的訪問能力,
最佳成熟方案: JD開源hotKey這是目前較為成熟的自動探測熱key、分布式一致性快取解決方案,原理就是在client端做洞察,然后上報對應hotkey,server端檢測到后,將對應hotkey下發到對應服務端做本地快取,并且能保證本地快取和遠程快取的一致性,
在這里咱們就不細談了,這篇文章的第三部分:JD開源hotkey原始碼決議里面會帶領大家了解其整體原理,
3 JD開源hotkey—自動探測熱key、分布式一致性快取解決方案
3.1 解決痛點
從上面可知,熱點key問題在并發量比較高的系統中(特別是做秒殺活動)出現的頻率會比較高,對系統帶來的危害也很大,
那么針對此,hotkey誕生的目的是什么?需要解決的痛點是什么?以及它的實作原理,
在這里參考專案上的一段話來概述:對任意突發性的無法預先感知的熱點資料,包括并不限于熱點資料(如突發大量請求同一個商品)、熱用戶(如惡意爬蟲刷子)、熱介面(突發海量請求同一個介面)等,進行毫秒級精準探測到,然后對這些熱資料、熱用戶等,推送到所有服務端JVM記憶體中,以大幅減輕對后端資料存盤層的沖擊,并可以由使用者決定如何分配、使用這些熱key(譬如對熱商品做本地快取、對熱用戶進行拒絕訪問、對熱介面進行熔斷或回傳默認值),這些熱資料在整個服務端集群內保持一致性,并且業務隔離,
核心功能:熱資料探測并推送至集群各個服務器
3.2 集成方式
集成方式在這里就不詳述了,感興趣的同學可以自行搜索,
3.3 原始碼決議
3.3.1 架構簡介
1.全景圖一覽


流程介紹:
- 客戶端通過參考hotkey的client包,在啟動的時候上報自己的資訊給worker,同時和worker之間建立長連接,定時拉取配置中心上面的規則資訊和worker集群資訊,
- 客戶端呼叫hotkey的ishot()的方法來首先匹配規則,然后統計是不是熱key,
- 通過定時任務把熱key資料上傳到worker節點,
- worker集群在收取到所有關于這個key的資料以后(因為通過hash來決定key 上傳到哪個worker的,所以同一個key只會在同一個worker節點上),在和定義的規則進行匹配后判斷是不是熱key,如果是則推送給客戶端,完成本地快取,
2.角色構成
這里直接借用作者的描述:
1)etcd集群etcd作為一個高性能的配置中心,可以以極小的資源占用,提供高效的監聽訂閱服務,主要用于存放規則配置,各worker的ip地址,以及探測出的熱key、手工添加的熱key等,
2)client端jar包就是在服務中添加的參考jar,引入后,就可以以便捷的方式去判斷某key是否熱key,同時,該jar完成了key上報、監聽etcd里的rule變化、worker資訊變化、熱key變化,對熱key進行本地caffeine快取等,
3) worker端集群worker端是一個獨立部署的Java程式,啟動后會連接etcd,并定期上報自己的ip資訊,供client端獲取地址并進行長連接,之后,主要就是對各個client發來的待測key進行累加計算,當達到etcd里設定的rule閾值后,將熱key推送到各個client,
4) dashboard控制臺控制臺是一個帶可視化界面的Java程式,也是連接到etcd,之后在控制臺設定各個APP的key規則,譬如2秒20次算熱,然后當worker探測出來熱key后,會將key發往etcd,dashboard也會監聽熱key資訊,進行入庫保存記錄,同時,dashboard也可以手工添加、洗掉熱key,供各個client端監聽,
3.hotkey工程結構


3.3.2 client端
主要從下面三個方面來決議原始碼:


1.客戶端啟動器
1)啟動方式
@PostConstruct
public void init() {
ClientStarter.Builder builder = new ClientStarter.Builder();
ClientStarter starter = builder.setAppName(appName).setEtcdServer(etcd).build();
starter.startPipeline();
}
appName:是這個應用的名稱,一般為${spring.application.name}的值,后續所有的配置都以此為開頭
etcd:是etcd集群的地址,用逗號分隔,配置中心,
還可以看到ClientStarter實作了建造者模式,使代碼更為簡介,
2)核心入口
com.jd.platform.hotkey.client.ClientStarter#startPipeline
/**
* 啟動監聽etcd
*/
public void startPipeline() {
JdLogger.info(getClass(), "etcdServer:" + etcdServer);
//設定caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設定etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);
PushSchedulerStarter.startCountPusher(10);
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
registEventBus();
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監聽都開啟
starter.start();
}
該方法主要有五個功能:


① 設定本地快取(caffeine)的最大值,并創建etcd實體
//設定caffeine的最大容量
Context.CAFFEINE_SIZE = caffeineSize;
//設定etcd地址
EtcdConfigFactory.buildConfigCenter(etcdServer);
caffeineSize是本地快取的最大值,在啟動的時候可以設定,不設定默認為200000,
etcdServer是上面說的etcd集群地址,
Context可以理解為一個配置類,里面就包含兩個欄位:
public class Context {
public static String APP_NAME;
public static int CAFFEINE_SIZE;
}
EtcdConfigFactory是ectd配置中心的工廠類
public class EtcdConfigFactory {
private static IConfigCenter configCenter;
private EtcdConfigFactory() {}
public static IConfigCenter configCenter() {
return configCenter;
}
public static void buildConfigCenter(String etcdServer) {
//連接多個時,逗號分隔
configCenter = JdEtcdBuilder.build(etcdServer);
}
}
通過其configCenter()方法獲取創建etcd實體物件,IConfigCenter介面封裝了etcd實體物件的行為(包括基本的crud、監控、續約等)


② 創建并啟動定時任務:PushSchedulerStarter
//開始定時推送
PushSchedulerStarter.startPusher(pushPeriod);//每0.5秒推送一次待測key
PushSchedulerStarter.startCountPusher(10);//每10秒推送一次數量統計,不可配置
pushPeriod是推送的間隔時間,可以再啟動的時候設定,最小為0.05s,推送越快,探測的越密集,會越快探測出來,但對client資源消耗相應增大
PushSchedulerStarter類
/**
* 每0.5秒推送一次待測key
*/
public static void startPusher(Long period) {
if (period == null || period <= 0) {
period = 500L;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
//熱key的收集器
IKeyCollector<HotKeyModel, HotKeyModel> collectHK = KeyHandlerFactory.getCollector();
//這里相當于每0.5秒,通過netty來給worker來推送收集到的熱key的資訊,主要是一些熱key的元資料資訊(熱key來源的app和key的型別和是否是洗掉事件,還有該熱key的上報次數)
//這里面還有就是該熱key在每次上報的時候都會生成一個全域的唯一id,還有該熱key每次上報的創建時間是在netty發送的時候來生成,同一批次的熱key時間是相同的
List<HotKeyModel> hotKeyModels = collectHK.lockAndGetResult();
if(CollectionUtil.isNotEmpty(hotKeyModels)){
//積攢了半秒的key集合,按照hash分發到不同的worker
KeyHandlerFactory.getPusher().send(Context.APP_NAME, hotKeyModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.MILLISECONDS);
}
/**
* 每10秒推送一次數量統計
*/
public static void startCountPusher(Integer period) {
if (period == null || period <= 0) {
period = 10;
}
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("hotkey-count-pusher-service-executor", true));
scheduledExecutorService.scheduleAtFixedRate(() -> {
IKeyCollector<KeyHotModel, KeyCountModel> collectHK = KeyHandlerFactory.getCounter();
List<KeyCountModel> keyCountModels = collectHK.lockAndGetResult();
if(CollectionUtil.isNotEmpty(keyCountModels)){
//積攢了10秒的數量,按照hash分發到不同的worker
KeyHandlerFactory.getPusher().sendCount(Context.APP_NAME, keyCountModels);
collectHK.finishOnce();
}
},0, period, TimeUnit.SECONDS);
}
從上面兩個方法可知,都是通過定時執行緒池來實作定時任務的,都是守護執行緒,
咱們重點關注一下KeyHandlerFactory類,它是client端設計的一個比較巧妙的地方,從類名上直譯為key處理工廠,具體的實體物件是DefaultKeyHandler:
public class DefaultKeyHandler {
//推送HotKeyMsg訊息到Netty的推送者
private IKeyPusher iKeyPusher = new NettyKeyPusher();
//待測key的收集器,這里面包含兩個map,key主要是熱key的名字,value主要是熱key的元資料資訊(比如:熱key來源的app和key的型別和是否是洗掉事件)
private IKeyCollector<HotKeyModel, HotKeyModel> iKeyCollector = new TurnKeyCollector();
//數量收集器,這里面包含兩個map,這里面key是相應的規則,HitCount里面是這個規則的總訪問次數和熱后訪問次數
private IKeyCollector<KeyHotModel, KeyCountModel> iKeyCounter = new TurnCountCollector();
public IKeyPusher keyPusher() {
return iKeyPusher;
}
public IKeyCollector<HotKeyModel, HotKeyModel> keyCollector() {
return iKeyCollector;
}
public IKeyCollector<KeyHotModel, KeyCountModel> keyCounter() {
return iKeyCounter;
}
}
這里面有三個成員物件,分別是封裝推送訊息到netty的NettyKeyPusher、待測key收集器TurnKeyCollector、數量收集器TurnCountCollector,其中后兩者都實作了介面IKeyCollector,能對hotkey的處理起到有效的聚合,充分體現了代碼的高內聚,
先來看看封裝推送訊息到netty的NettyKeyPusher:
/**
* 將msg推送到netty的pusher
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public class NettyKeyPusher implements IKeyPusher {
@Override
public void send(String appName, List<HotKeyModel> list) {
//積攢了半秒的key集合,按照hash分發到不同的worker
long now = System.currentTimeMillis();
Map<Channel, List<HotKeyModel>> map = new HashMap<>();
for(HotKeyModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getKey());
if (channel == null) {
continue;
}
List<HotKeyModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List<HotKeyModel> batch = map.get(channel);
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_NEW_KEY, Context.APP_NAME);
hotKeyMsg.setHotKeyModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
@Override
public void sendCount(String appName, List<KeyCountModel> list) {
//積攢了10秒的數量,按照hash分發到不同的worker
long now = System.currentTimeMillis();
Map<Channel, List<KeyCountModel>> map = new HashMap<>();
for(KeyCountModel model : list) {
model.setCreateTime(now);
Channel channel = WorkerInfoHolder.chooseChannel(model.getRuleKey());
if (channel == null) {
continue;
}
List<KeyCountModel> newList = map.computeIfAbsent(channel, k -> new ArrayList<>());
newList.add(model);
}
for (Channel channel : map.keySet()) {
try {
List<KeyCountModel> batch = map.get(channel);
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.REQUEST_HIT_COUNT, Context.APP_NAME);
hotKeyMsg.setKeyCountModels(batch);
channel.writeAndFlush(hotKeyMsg).sync();
} catch (Exception e) {
try {
InetSocketAddress insocket = (InetSocketAddress) channel.remoteAddress();
JdLogger.error(getClass(),"flush error " + insocket.getAddress().getHostAddress());
} catch (Exception ex) {
JdLogger.error(getClass(),"flush error");
}
}
}
}
}
send(String appName, List list)
主要是將TurnKeyCollector收集的待測key通過netty推送給worker,HotKeyModel物件主要是一些熱key的元資料資訊(熱key來源的app和key的型別和是否是洗掉事件,還有該熱key的上報次數)
sendCount(String appName, List list)
主要是將TurnCountCollector收集的規則所對應的key通過netty推送給worker,KeyCountModel物件主要是一些key所對應的規則資訊以及訪問次數等
WorkerInfoHolder.chooseChannel(model.getRuleKey())
根據hash演算法獲取key對應的服務器,分發到對應服務器相應的Channel 連接,所以服務端可以水平無限擴容,毫無壓力問題,
再來分析一下key收集器:TurnKeyCollector與TurnCountCollector:
實作IKeyCollector介面:
/**
* 對hotkey進行聚合
* @author wuweifeng wrote on 2020-01-06
* @version 1.0
*/
public interface IKeyCollector<T, V> {
/**
* 鎖定后的回傳值
*/
List<V> lockAndGetResult();
/**
* 輸入的引數
*/
void collect(T t);
void finishOnce();
}
lockAndGetResult()
主要是獲取回傳collect方法收集的資訊,并將本地暫存的資訊清空,方便下個統計周期積攢資料,
collect(T t)
顧名思義他是收集api呼叫的時候,將收集的到key資訊放到本地存盤,
finishOnce()
該方法目前實作都是空,無需關注,
待測key收集器:TurnKeyCollector
public class TurnKeyCollector implements IKeyCollector<HotKeyModel, HotKeyModel> {
//這map里面的key主要是熱key的名字,value主要是熱key的元資料資訊(比如:熱key來源的app和key的型別和是否是洗掉事件)
private ConcurrentHashMap<String, HotKeyModel> map0 = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, HotKeyModel> map1 = new ConcurrentHashMap<>();
private AtomicLong atomicLong = new AtomicLong(0);
@Override
public List<HotKeyModel> lockAndGetResult() {
//自增后,對應的map就會停止被寫入,等待被讀取
atomicLong.addAndGet(1);
List<HotKeyModel> list;
//可以觀察這里與collect方法里面的相同位置,會發現一個是操作map0一個是操作map1,這樣保證在讀map的時候,不會阻塞寫map,
//兩個map同時提供輪流提供讀寫能力,設計的很巧妙,值得學習
if (atomicLong.get() % 2 == 0) {
list = get(map1);
map1.clear();
} else {
list = get(map0);
map0.clear();
}
return list;
}
private List<HotKeyModel> get(ConcurrentHashMap<String, HotKeyModel> map) {
return CollectionUtil.list(false, map.values());
}
@Override
public void collect(HotKeyModel hotKeyModel) {
String key = hotKeyModel.getKey();
if (StrUtil.isEmpty(key)) {
return;
}
if (atomicLong.get() % 2 == 0) {
//不存在時回傳null并將key-value放入,已有相同key時,回傳該key對應的value,并且不覆寫
HotKeyModel model = map0.putIfAbsent(key, hotKeyModel);
if (model != null) {
//增加該hotMey上報的次數
model.add(hotKeyModel.getCount());
}
} else {
HotKeyModel model = map1.putIfAbsent(key, hotKeyModel);
if (model != null) {
model.add(hotKeyModel.getCount());
}
}
}
@Override
public void finishOnce() {}
}
可以看到該類中有兩個ConcurrentHashMap和一個AtomicLong,通過對AtomicLong來自增,然后對2取模,來分別控制兩個map的讀寫能力,保證每個map都能做讀寫,并且同一個map不能同時讀寫,這樣可以避免并發集合讀寫不阻塞,這一塊無鎖化的設計還是非常巧妙的,極大的提高了收集的吞吐量,
key數量收集器:TurnCountCollector
這里的設計與TurnKeyCollector大同小異,咱們就不細談了,值得一提的是它里面有個并行處理的機制,當收集的數量超過DATA_CONVERT_SWITCH_THRESHOLD=5000的閾值時,lockAndGetResult處理是使用java Stream并行流處理,提升處理的效率,
③ 開啟worker重連器
//開啟worker重連器
WorkerRetryConnector.retryConnectWorkers();
public class WorkerRetryConnector {
/**
* 定時去重連沒連上的workers
*/
public static void retryConnectWorkers() {
@SuppressWarnings("PMD.ThreadPoolCreationRule")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("worker-retry-connector-service-executor", true));
//開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(WorkerRetryConnector::reConnectWorkers, 30, 30, TimeUnit.SECONDS);
}
private static void reConnectWorkers() {
List<String> nonList = WorkerInfoHolder.getNonConnectedWorkers();
if (nonList.size() == 0) {
return;
}
JdLogger.info(WorkerRetryConnector.class, "trying to reConnect to these workers :" + nonList);
NettyClient.getInstance().connect(nonList);//這里會觸發netty連接方法channelActive
}
}
也是通過定時執行緒來執行,默認時間間隔是30s,不可設定,
通過WorkerInfoHolder來控制client的worker連接資訊,連接資訊是個List,用的CopyOnWriteArrayList,畢竟是一個讀多寫少的場景,類似與元資料資訊,
/**
* 保存worker的ip地址和Channel的映射關系,這是有序的,每次client發送訊息時,都會根據該map的size進行hash
* 如key-1就發送到workerHolder的第1個Channel去,key-2就發到第2個Channel去
*/
private static final List<Server> WORKER_HOLDER = new CopyOnWriteArrayList<>();
④ 注冊EventBus事件訂閱者
private void registEventBus() {
//netty連接器會關注WorkerInfoChangeEvent事件
EventBusCenter.register(new WorkerChangeSubscriber());
//熱key探測回呼關注熱key事件
EventBusCenter.register(new ReceiveNewKeySubscribe());
//Rule的變化的事件
EventBusCenter.register(new KeyRuleHolder());
}
使用guava的EventBus事件訊息總線,利用發布/訂閱者模式來對專案進行解耦,它可以利用很少的代碼,來實作多組件間通信,
基本原理圖如下:


監聽worker資訊變動:WorkerChangeSubscriber
/**
* 監聽worker資訊變動
*/
@Subscribe
public void connectAll(WorkerInfoChangeEvent event) {
List<String> addresses = event.getAddresses();
if (addresses == null) {
addresses = new ArrayList<>();
}
WorkerInfoHolder.mergeAndConnectNew(addresses);
}
/**
* 當client與worker的連接斷開后,洗掉
*/
@Subscribe
public void channelInactive(ChannelInactiveEvent inactiveEvent) {
//獲取斷線的channel
Channel channel = inactiveEvent.getChannel();
InetSocketAddress socketAddress = (InetSocketAddress) channel.remoteAddress();
String address = socketAddress.getHostName() + ":" + socketAddress.getPort();
JdLogger.warn(getClass(), "this channel is inactive : " + socketAddress + " trying to remove this connection");
WorkerInfoHolder.dealChannelInactive(address);
}


監聽熱key回呼事件:ReceiveNewKeySubscribe
private ReceiveNewKeyListener receiveNewKeyListener = new DefaultNewKeyListener();
@Subscribe
public void newKeyComing(ReceiveNewKeyEvent event) {
HotKeyModel hotKeyModel = event.getModel();
if (hotKeyModel == null) {
return;
}
//收到新key推送
if (receiveNewKeyListener != null) {
receiveNewKeyListener.newKey(hotKeyModel);
}
}
該方法會收到新的熱key訂閱事件之后,會將其加入到KeyHandlerFactory的收集器里面處理,
核心處理邏輯:DefaultNewKeyListener#newKey:
@Override
public void newKey(HotKeyModel hotKeyModel) {
long now = System.currentTimeMillis();
//如果key到達時已經過去1秒了,記錄一下,手工洗掉key時,沒有CreateTime
if (hotKeyModel.getCreateTime() != 0 && Math.abs(now - hotKeyModel.getCreateTime()) > 1000) {
JdLogger.warn(getClass(), "the key comes too late : " + hotKeyModel.getKey() + " now " +
+now + " keyCreateAt " + hotKeyModel.getCreateTime());
}
if (hotKeyModel.isRemove()) {
//如果是洗掉事件,就直接洗掉
deleteKey(hotKeyModel.getKey());
return;
}
//已經是熱key了,又推過來同樣的熱key,做個日志記錄,并重繪一下
if (JdHotKeyStore.isHot(hotKeyModel.getKey())) {
JdLogger.warn(getClass(), "receive repeat hot key :" + hotKeyModel.getKey() + " at " + now);
}
addKey(hotKeyModel.getKey());
}
private void deleteKey(String key) {
CacheFactory.getNonNullCache(key).delete(key);
}
private void addKey(String key) {
ValueModel valueModel = ValueModel.defaultValue(key);
if (valueModel == null) {
//不符合任何規則
deleteKey(key);
return;
}
//如果原來該key已經存在了,那么value就被重置,過期時間也會被重置,如果原來不存在,就新增的熱key
JdHotKeyStore.setValueDirectly(key, valueModel);
}
- 如果該HotKeyModel里面是洗掉事件,則獲取RULE_CACHE_MAP里面該key超時時間對應的caffeine,然后從中洗掉該key快取,然后回傳(這里相當于洗掉了本地快取),
- 如果不是洗掉事件,則在RULE_CACHE_MAP對應的caffeine快取中添加該key的快取,
- 這里有個注意點,如果不為洗掉事件,呼叫addKey()方法在caffeine增加快取的時候,value是一個魔術值0x12fcf76,這個值只代表加了這個快取,但是這個快取在查詢的時候相當于為null,
監聽Rule的變化事件:KeyRuleHolder


可以看到里面有兩個成員屬性:RULE_CACHE_MAP,KEY_RULES
/**
* 保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)]
*/
private static final ConcurrentHashMap<Integer, LocalCache> RULE_CACHE_MAP = new ConcurrentHashMap<>();
/**
* 這里KEY_RULES是保存etcd里面該appName所對應的所有rule
*/
private static final List<KeyRule> KEY_RULES = new ArrayList<>();
ConcurrentHashMap RULE_CACHE_MAP:
- 保存超時時間和caffeine的映射,key是超時時間,value是caffeine[(String,Object)],
- 巧妙的設計:這里將key的過期時間作為分桶策略,這樣同一個過期時間的key就會在一個桶(caffeine)里面,這里面每一個caffeine都是client的本地快取,也就是說hotKey的本地快取的KV實際上是存盤在這里面的,
List KEY_RULES:
- 這里KEY_RULES是保存etcd里面該appName所對應的所有rule,
具體監聽KeyRuleInfoChangeEvent事件方法:
@Subscribe
public void ruleChange(KeyRuleInfoChangeEvent event) {
JdLogger.info(getClass(), "new rules info is :" + event.getKeyRules());
List<KeyRule> ruleList = event.getKeyRules();
if (ruleList == null) {
return;
}
putRules(ruleList);
}
核心處理邏輯:KeyRuleHolder#putRules:
/**
* 所有的規則,如果規則的超時時間變化了,會重建caffeine
*/
public static void putRules(List<KeyRule> keyRules) {
synchronized (KEY_RULES) {
//如果規則為空,清空規則表
if (CollectionUtil.isEmpty(keyRules)) {
KEY_RULES.clear();
RULE_CACHE_MAP.clear();
return;
}
KEY_RULES.clear();
KEY_RULES.addAll(keyRules);
Set<Integer> durationSet = keyRules.stream().map(KeyRule::getDuration).collect(Collectors.toSet());
for (Integer duration : RULE_CACHE_MAP.keySet()) {
//先清除掉那些在RULE_CACHE_MAP里存的,但是rule里已沒有的
if (!durationSet.contains(duration)) {
RULE_CACHE_MAP.remove(duration);
}
}
//遍歷所有的規則
for (KeyRule keyRule : keyRules) {
int duration = keyRule.getDuration();
//這里如果RULE_CACHE_MAP里面沒有超時時間為duration的value,則新建一個放入到RULE_CACHE_MAP里面
//比如RULE_CACHE_MAP本來就是空的,則在這里來構建RULE_CACHE_MAP的映射關系
//TODO 如果keyRules里面包含相同duration的keyRule,則也只會建一個key為duration,value為caffeine,其中caffeine是(string,object)
if (RULE_CACHE_MAP.get(duration) == null) {
LocalCache cache = CacheFactory.build(duration);
RULE_CACHE_MAP.put(duration, cache);
}
}
}
}
- 使用synchronized關鍵字來保證執行緒安全;
- 如果規則為空,清空規則表(RULE_CACHE_MAP、KEY_RULES);
- 使用傳遞進來的keyRules來覆寫KEY_RULES;
- 清除掉RULE_CACHE_MAP里面在keyRules沒有的映射關系;
- 遍歷所有的keyRules,如果RULE_CACHE_MAP里面沒有相關的超時時間key,則在里面賦值;
⑤ 啟動EtcdStarter(etcd連接管理器)
EtcdStarter starter = new EtcdStarter();
//與etcd相關的監聽都開啟
starter.start();
public void start() {
fetchWorkerInfo();
fetchRule();
startWatchRule();
//監聽熱key事件,只監聽手工添加、洗掉的key
startWatchHotKey();
}
fetchWorkerInfo()
從etcd里面拉取worker集群地址資訊allAddress,并更新WorkerInfoHolder里面的WORKER_HOLDER
/**
* 每隔30秒拉取worker資訊
*/
private void fetchWorkerInfo() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch worker info");
fetch();
}, 0, 30, TimeUnit.SECONDS);
}
- 使用定時執行緒池來執行,單執行緒,
- 定時從etcd里面獲取,地址/jd/workers/+$appName或default,時間間隔不可設定,默認30秒,這里面存盤的是worker地址的ip+port,
- 發布WorkerInfoChangeEvent事件,
- 備注:地址有$appName或default,在worker里面配置,如果把worker放到某個appName下,則該worker只會參與該app的計算,
fetchRule()
定時執行緒來執行,單執行緒,時間間隔不可設定,默認是5秒,當拉取規則配置和手動配置的hotKey成功后,該執行緒被終止(也就是說只會成功執行一次),執行失敗繼續執行
private void fetchRule() {
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
//開啟拉取etcd的worker資訊,如果拉取失敗,則定時繼續拉取
scheduledExecutorService.scheduleAtFixedRate(() -> {
JdLogger.info(getClass(), "trying to connect to etcd and fetch rule info");
boolean success = fetchRuleFromEtcd();
if (success) {
//拉取已存在的熱key
fetchExistHotKey();
//這里如果拉取規則和拉取手動配置的hotKey成功之后,則該定時執行執行緒停止
scheduledExecutorService.shutdown();
}
}, 0, 5, TimeUnit.SECONDS);
}
fetchRuleFromEtcd()
- 從etcd里面獲取該appName配置的rule規則,地址/jd/rules/+$appName,
- 如果查出來規則rules為空,會通過發布KeyRuleInfoChangeEvent事件來清空本地的rule配置快取和所有的規則key快取,
- 發布KeyRuleInfoChangeEvent事件,
fetchExistHotKey()
- 從etcd里面獲取該appName手動配置的熱key,地址/jd/hotkeys/+$appName,
- 發布ReceiveNewKeyEvent事件,并且內容HotKeyModel不是洗掉事件,
startWatchRule()
/**
* 異步監聽rule規則變化
*/
private void startWatchRule() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch rule change ----");
try {
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.rulePath + Context.APP_NAME);
//如果有新事件,即rule的變更,就重新拉取所有的資訊
while (watchIterator.hasNext()) {
//這句必須寫,next會讓他卡住,除非真的有新rule變更
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
JdLogger.info(getClass(), "rules info changed. begin to fetch new infos. rule change is " + eventList);
//全量拉取rule資訊
fetchRuleFromEtcd();
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
- 異步監聽rule規則變化,使用etcd監聽地址為/jd/rules/+$appName的節點變化,
- 使用執行緒池,單執行緒,異步監聽rule規則變化,如果有事件變化,則呼叫fetchRuleFromEtcd()方法,
startWatchHotKey()
異步開始監聽熱key變化資訊,使用etcd監聽地址前綴為/jd/hotkeys/+$appName
/**
* 異步開始監聽熱key變化資訊,該目錄里只有手工添加的key資訊
*/
private void startWatchHotKey() {
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
JdLogger.info(getClass(), "--- begin watch hotKey change ----");
IConfigCenter configCenter = EtcdConfigFactory.configCenter();
try {
KvClient.WatchIterator watchIterator = configCenter.watchPrefix(ConfigConstant.hotKeyPath + Context.APP_NAME);
//如果有新事件,即新key產生或洗掉
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
KeyValue keyValue = https://www.cnblogs.com/Jcloud/p/eventList.get(0).getKv();
Event.EventType eventType = eventList.get(0).getType();
try {
//從這個地方可以看出,etcd給的回傳是節點的全路徑,而我們需要的key要去掉前綴
String key = keyValue.getKey().toStringUtf8().replace(ConfigConstant.hotKeyPath + Context.APP_NAME + "/", "");
//如果是洗掉key,就立刻洗掉
if (Event.EventType.DELETE == eventType) {
HotKeyModel model = new HotKeyModel();
model.setRemove(true);
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
} else {
HotKeyModel model = new HotKeyModel();
model.setRemove(false);
String value = https://www.cnblogs.com/Jcloud/p/keyValue.getValue().toStringUtf8();
//新增熱key
JdLogger.info(getClass(), "etcd receive new key : " + key + " --value:" + value);
//如果這是一個洗掉指令,就什么也不干
//TODO 這里有個疑問,監聽到worker自動探測發出的惰性洗掉指令,這里之間跳過了,但是本地快取沒有更新吧?
//TODO 所以我猜測在客戶端使用判斷快取是否存在的api里面,應該會判斷相關快取的value值是否為"#[DELETE]#"洗掉標記
//解疑:這里確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
if (Constant.DEFAULT_DELETE_VALUE.equals(value)) {
continue;
}
//手工創建的value是時間戳
model.setCreateTime(Long.valueOf(keyValue.getValue().toStringUtf8()));
model.setKey(key);
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
} catch (Exception e) {
JdLogger.error(getClass(), "new key err :" + keyValue);
}
}
} catch (Exception e) {
JdLogger.error(getClass(), "watch err");
}
});
}
- 使用執行緒池,單執行緒,異步監聽熱key變化
- 使用etcd監聽前綴地址的當前節點以及子節點的所有變化值
- 洗掉節點動作
- 發布ReceiveNewKeyEvent事件,并且內容HotKeyModel是洗掉事件
- 新增or更新節點動作
- 事件變化的value值為洗掉標記#[DELETE]#
- 如果是洗掉標記的話,代表是worker自動探測或者client需要洗掉的指令,
- 如果是洗掉標記則什么也不做,直接跳過(這里從HotKeyPusher#push方法可以看到,做洗掉事件的操作時候,他會給/jd/hotkeys/+$appName的節點里面增加一個值為洗掉標記的節點,然后再洗掉相同路徑的節點,這樣就可以觸發上面的洗掉節點事件,所以這里判斷如果是洗掉標記直接跳過),
- 不為洗掉標記
- 發布ReceiveNewKeyEvent事件,事件內容HotKeyModel里面的createTime是kv對應的時間戳
疑問: 這里代碼注釋里面說只監聽手工添加或者洗掉的hotKey,難道說/jd/hotkeys/+$appName地址只是手工配置的地址嗎?
解疑: 這里確實只監聽手工配置的hotKey,etcd的/jd/hotkeys/+$appName該地址只是手動配置hotKey,worker自動探測的hotKey是直接通過netty通道來告知client的
2.API決議
1)流程圖示
① 查詢流程


② 洗掉流程:


從上面的流程圖中,大家應該知道該熱點key在代碼中是如何扭轉的,這里再給大家講解一下核心API的原始碼決議,限于篇幅的原因,咱們不一個個貼相關原始碼了,只是單純的告訴你它的內部邏輯是怎么樣的,
2)核心類:JdHotKeyStore


JdHotKeyStore是封裝client呼叫的api核心類,包含上面10個公共方法,咱們重點決議其中6個重要方法:
① isHotKey(String key)
判斷是否在規則內,如果不在,回傳false
判斷是否是熱key,如果不是或者是且過期時間在2s內,則給TurnKeyCollector#collect收集
最后給TurnCountCollector#collect做統計收集
② get(String key)
從本地caffeine取值
如果取到的value是個魔術值,只代表加入到caffeine快取里面了,查詢的話為null
③ smartSet(String key, Object value)
判斷是否是熱key,這里不管它在不在規則內,如果是熱key,則給value賦值,如果不為熱key什么也不做
④ forceSet(String key, Object value)
強制給value賦值
如果該key不在規則配置內,則傳遞的value不生效,本地快取的賦值value會被變為null
⑤ getValue(String key, KeyType keyType)
獲取value,如果value不存在則呼叫HotKeyPusher#push方法發往netty
如果沒有為該key配置規則,就不用上報key,直接回傳null
如果取到的value是個魔術值,只代表加入到caffeine快取里面了,查詢的話為null
⑥ remove(String key)
洗掉某key(本地的caffeine快取),會通知整個集群洗掉(通過etcd來通知集群洗掉)
3)client上傳熱key入口呼叫類:HotKeyPusher
核心方法:
public static void push(String key, KeyType keyType, int count, boolean remove) {
if (count <= 0) {
count = 1;
}
if (keyType == null) {
keyType = KeyType.REDIS_KEY;
}
if (key == null) {
return;
}
//這里之所以用LongAdder是為了保證多執行緒計數的執行緒安全性,雖然這里是在方法內呼叫的,但是在TurnKeyCollector的兩個map里面,
//存盤了HotKeyModel的實體物件,這樣在多個執行緒同時修改count的計數屬性時,會存在執行緒安全計數不準確問題
LongAdder adderCnt = new LongAdder();
adderCnt.add(count);
HotKeyModel hotKeyModel = new HotKeyModel();
hotKeyModel.setAppName(Context.APP_NAME);
hotKeyModel.setKeyType(keyType);
hotKeyModel.setCount(adderCnt);
hotKeyModel.setRemove(remove);
hotKeyModel.setKey(key);
if (remove) {
//如果是洗掉key,就直接發到etcd去,不用做聚合,但是有點問題現在,這個洗掉只能刪手工添加的key,不能刪worker探測出來的
//因為各個client都在監聽手工添加的那個path,沒監聽自動探測的path,所以如果手工的那個path下,沒有該key,那么是洗掉不了的,
//刪不了,就達不到集群監聽洗掉事件的效果,怎么辦呢?可以通過新增的方式,新增一個熱key,然后洗掉它
//TODO 這里為啥不直接洗掉該節點,難道worker自動探測處理的hotKey不會往該節點增加新增事件嗎?
//釋疑:worker根據探測配置的規則,當判斷出某個key為hotKey后,確實不會往keyPath里面加入節點,他只是單純的往本地快取里面加入一個空值,代表是熱點key
EtcdConfigFactory.configCenter().putAndGrant(HotKeyPathTool.keyPath(hotKeyModel), Constant.DEFAULT_DELETE_VALUE, 1);
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyPath(hotKeyModel));//TODO 這里很巧妙待補充描述
//也刪worker探測的目錄
EtcdConfigFactory.configCenter().delete(HotKeyPathTool.keyRecordPath(hotKeyModel));
} else {
//如果key是規則內的要被探測的key,就積累等待傳送
if (KeyRuleHolder.isKeyInRule(key)) {
//積攢起來,等待每半秒發送一次
KeyHandlerFactory.getCollector().collect(hotKeyModel);
}
}
}
從上面的原始碼中可知:
- 這里之所以用LongAdder是為了保證多執行緒計數的執行緒安全性,雖然這里是在方法內呼叫的,但是在TurnKeyCollector的兩個map里面,存盤了HotKeyModel的實體物件,這樣在多個執行緒同時修改count的計數屬性時,會存在執行緒安全計數不準確問題,
- 如果是remove洗掉型別,在洗掉手動配置的熱key配置路徑的同時,還會洗掉dashboard展示熱key的配置路徑,
- 只有在規則配置的key,才會被積攢探測發送到worker內進行計算,
3.通訊機制(與worker互動)
1)NettyClient:netty連接器
public class NettyClient {
private static final NettyClient nettyClient = new NettyClient();
private Bootstrap bootstrap;
public static NettyClient getInstance() {
return nettyClient;
}
private NettyClient() {
if (bootstrap == null) {
bootstrap = initBootstrap();
}
}
private Bootstrap initBootstrap() {
//少執行緒
EventLoopGroup group = new NioEventLoopGroup(2);
Bootstrap bootstrap = new Bootstrap();
NettyClientHandler nettyClientHandler = new NettyClientHandler();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ByteBuf delimiter = Unpooled.copiedBuffer(Constant.DELIMITER.getBytes());
ch.pipeline()
.addLast(new DelimiterBasedFrameDecoder(Constant.MAX_LENGTH, delimiter))//這里就是定義TCP多個包之間的分隔符,為了更好的做拆包
.addLast(new MsgDecoder())
.addLast(new MsgEncoder())
//30秒沒訊息時,就發心跳包過去
.addLast(new IdleStateHandler(0, 0, 30))
.addLast(nettyClientHandler);
}
});
return bootstrap;
}
}
- 使用Reactor執行緒模型,只有2個作業執行緒,沒有單獨設定主執行緒
- 長連接,開啟TCP_NODELAY
- netty的分隔符”$()$”,類似TCP報文分段的標準,方便拆包
- Protobuf序列化與反序列化
- 30s沒有訊息發給對端的時候,發送一個心跳包判活
- 作業執行緒處理器NettyClientHandler
JDhotkey的tcp協議設計就是收發字串,每個tcp訊息包使用特殊字符$()$來分割
優點:這樣實作非常簡單,
獲得訊息包后進行json或者protobuf反序列化,
缺點:是需要,從位元組流-》反序列化成字串-》反序列化成訊息物件,兩層序列化損耗了一部分性能,
protobuf還好序列化很快,但是json序列化的速度只有幾十萬每秒,會損耗一部分性能,
2)NettyClientHandler:作業執行緒處理器
@ChannelHandler.Sharable
public class NettyClientHandler extends SimpleChannelInboundHandler<HotKeyMsg> {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
//這里表示如果讀寫都掛了
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
//向服務端發送訊息
ctx.writeAndFlush(new HotKeyMsg(MessageType.PING, Context.APP_NAME));
}
}
super.userEventTriggered(ctx, evt);
}
//在Channel注冊EventLoop、系結SocketAddress和連接ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的呼叫
//類似TCP三次握手成功之后觸發
@Override
public void channelActive(ChannelHandlerContext ctx) {
JdLogger.info(getClass(), "channelActive:" + ctx.name());
ctx.writeAndFlush(new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME));
}
//類似TCP四次揮手之后,等待2MSL時間之后觸發(大概180s),比如channel通道關倍訓觸發(channel.close())
//客戶端channel主動關閉連接時,會向服務端發送一個寫請求,然后服務端channel所在的selector會監聽到一個OP_READ事件,然后
//執行資料讀取操作,而讀取時發現客戶端channel已經關閉了,則讀取資料位元組個數回傳-1,然后執行close操作,關閉該channel對應的底層socket,
//并在pipeline中,從head開始,往下將InboundHandler,并觸發handler的channelInactive和channelUnregistered方法的執行,以及移除pipeline中的handlers一系列操作,
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
//斷線了,可能只是client和server斷了,但都和etcd沒斷,也可能是client自己斷網了,也可能是server斷了
//發布斷線事件,后續10秒后進行重連,根據etcd里的worker資訊來決定是否重連,如果etcd里沒了,就不重連,如果etcd里有,就重連
notifyWorkerChange(ctx.channel());
}
private void notifyWorkerChange(Channel channel) {
EventBusCenter.getInstance().post(new ChannelInactiveEvent(channel));
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, HotKeyMsg msg) {
if (MessageType.PONG == msg.getMessageType()) {
JdLogger.info(getClass(), "heart beat");
return;
}
if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {
JdLogger.info(getClass(), "receive new key : " + msg);
if (CollectionUtil.isEmpty(msg.getHotKeyModels())) {
return;
}
for (HotKeyModel model : msg.getHotKeyModels()) {
EventBusCenter.getInstance().post(new ReceiveNewKeyEvent(model));
}
}
}
}
userEventTriggered
- 收到對端發來的心跳包,回傳new HotKeyMsg(MessageType.PING, Context.APP_NAME)
channelActive
- 在Channel注冊EventLoop、系結SocketAddress和連接ChannelFuture的時候都有可能會觸發ChannelInboundHandler的channelActive方法的呼叫
- 類似TCP三次握手成功之后觸發,給對端發送new HotKeyMsg(MessageType.APP_NAME, Context.APP_NAME)
channelInactive
- 類似TCP四次揮手之后,等待2MSL時間之后觸發(大概180s),比如channel通道關倍訓觸發(channel.close())該方法,發布ChannelInactiveEvent事件,來10s后重連
channelRead0
- 接收PONG訊息型別時,打個日志回傳
- 接收RESPONSE_NEW_KEY訊息型別時,發布ReceiveNewKeyEvent事件
3.3.3 worker端
1.入口啟動加載:7個@PostConstruct
1)worker端對etcd相關的處理:EtcdStarter
① 第一個@PostConstruct:watchLog()
@PostConstruct
public void watchLog() {
AsyncPool.asyncDo(() -> {
try {
//取etcd的是否開啟日志配置,地址/jd/logOn
String loggerOn = configCenter.get(ConfigConstant.logToggle);
LOGGER_ON = "true".equals(loggerOn) || "1".equals(loggerOn);
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
//監聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.logToggle);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
KeyValue keyValue = https://www.cnblogs.com/Jcloud/p/eventList.get(0).getKv();
logger.info("log toggle changed : " + keyValue);
String value = https://www.cnblogs.com/Jcloud/p/keyValue.getValue().toStringUtf8();
LOGGER_ON = "true".equals(value) || "1".equals(value);
}
});
}
- 放到執行緒池里面異步執行
- 取etcd的是否開啟日志配置,地址/jd/logOn,默認true
- 監聽etcd地址/jd/logOn是否開啟日志配置,并實時更改開關
- 由于有etcd的監聽,所以會一直執行,而不是執行一次結束
② 第二個@PostConstruct:watch()
/**
* 啟動回呼監聽器,監聽rule變化
*/
@PostConstruct
public void watch() {
AsyncPool.asyncDo(() -> {
KvClient.WatchIterator watchIterator;
if (isForSingle()) {
watchIterator = configCenter.watch(ConfigConstant.rulePath + workerPath);
} else {
watchIterator = configCenter.watchPrefix(ConfigConstant.rulePath);
}
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
List<Event> eventList = watchUpdate.getEvents();
KeyValue keyValue = https://www.cnblogs.com/Jcloud/p/eventList.get(0).getKv();
logger.info("rule changed : " + keyValue);
try {
ruleChange(keyValue);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
/**
* rule發生變化時,更新快取的rule
*/
private synchronized void ruleChange(KeyValue keyValue) {
String appName = keyValue.getKey().toStringUtf8().replace(ConfigConstant.rulePath, "");
if (StrUtil.isEmpty(appName)) {
return;
}
String ruleJson = keyValue.getValue().toStringUtf8();
List<KeyRule> keyRules = FastJsonUtils.toList(ruleJson, KeyRule.class);
KeyRuleHolder.put(appName, keyRules);
}
通過etcd.workerPath配置,來判斷該worker是否為某個app單獨服務的,默認為”default”,如果是默認值,代表該worker參與在etcd上所有app client的計算,否則只為某個app來服務計算
使用etcd來監聽rule規則變化,如果是共享的worker,監聽地址前綴為”/jd/rules/“,如果為某個app獨享,監聽地址為”/jd/rules/“+$etcd.workerPath
如果規則變化,則修改對應app在本地存盤的rule快取,同時清理該app在本地存盤的KV快取
KeyRuleHolder:rule快取本地存盤
- Map> RULE_MAP,這個map是concurrentHashMap,map的kv分別是appName和對應的rule
- 相對于client的KeyRuleHolder的區別:worker是存盤所有app規則,每個app對應一個規則桶,所以用map
CaffeineCacheHolder:key快取本地存盤
- Map> CACHE_MAP,也是concurrentHashMap,map的kv分別是appName和對應的kv的caffeine
- 相對于client的caffeine,第一是worker沒有做快取介面比如LocalCache,第二是client的map的kv分別是超時時間、以及相同超時時間所對應key的快取桶
放到執行緒池里面異步執行,由于有etcd的監聽,所以會一直執行,而不是執行一次結束
③ 第三個@PostConstruct:watchWhiteList()
/**
* 啟動回呼監聽器,監聽白名單變化,只監聽自己所在的app,白名單key不參與熱key計算,直接忽略
*/
@PostConstruct
public void watchWhiteList() {
AsyncPool.asyncDo(() -> {
//從etcd配置中獲取所有白名單
fetchWhite();
KvClient.WatchIterator watchIterator = configCenter.watch(ConfigConstant.whiteListPath + workerPath);
while (watchIterator.hasNext()) {
WatchUpdate watchUpdate = watchIterator.next();
logger.info("whiteList changed ");
try {
fetchWhite();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
- 拉取并監聽etcd白名單key配置,地址為/jd/whiteList/+$etcd.workerPath
- 在白名單的key,不參與熱key計算,直接忽略
- 放到執行緒池里面異步執行,由于有etcd的監聽,所以會一直執行,而不是執行一次結束
④ 第四個@PostConstruct:makeSureSelfOn()
/**
* 每隔一會去check一下,自己還在不在etcd里
*/
@PostConstruct
public void makeSureSelfOn() {
//開啟上傳worker資訊
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
if (canUpload) {
uploadSelfInfo();
}
} catch (Exception e) {
//do nothing
}
}, 0, 5, TimeUnit.SECONDS);
}
- 在執行緒池里面異步執行,定時執行,時間間隔為5s
- 將本機woker的hostName,ip+port以kv的形式定時上報給etcd,地址為/jd/workers/+$etcd.workPath+”/“+$hostName,續期時間為8s
- 有一個canUpload的開關來控制worker是否向etcd來定時續期,如果這個開關關閉了,代表worker不向etcd來續期,這樣當上面地址的kv到期之后,etcd會洗掉該節點,這樣client回圈判斷worker資訊變化了
2)將熱key推送到dashboard供入庫:DashboardPusher
① 第五個@PostConstruct:uploadToDashboard()
@Component
public class DashboardPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();
@PostConstruct
public void uploadToDashboard() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
//要么key達到1千個,要么達到1秒,就匯總上報給etcd一次
List<HotKeyModel> tempModels = new ArrayList<>();
Queues.drain(hotKeyStoreQueue, tempModels, 1000, 1, TimeUnit.SECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
//將熱key推到dashboard
DashboardHolder.flushToDashboard(FastJsonUtils.convertObjectToJSON(tempModels));
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
- 當熱key的數量達到1000或者每隔1s,把熱key的資料通過與dashboard的netty通道來發送給dashboard,資料型別為REQUEST_HOT_KEY
- LinkedBlockingQueue hotKeyStoreQueue:worker計算的給dashboard熱key的集中營,所有給dashboard推送熱key存盤在里面
3)推送到各客戶端服務器:AppServerPusher
① 第六個@PostConstruct:batchPushToClient()
public class AppServerPusher implements IPusher {
/**
* 熱key集中營
*/
private static LinkedBlockingQueue<HotKeyModel> hotKeyStoreQueue = new LinkedBlockingQueue<>();
/**
* 和dashboard那邊的推送主要區別在于,給app推送每10ms一次,dashboard那邊1s一次
*/
@PostConstruct
public void batchPushToClient() {
AsyncPool.asyncDo(() -> {
while (true) {
try {
List<HotKeyModel> tempModels = new ArrayList<>();
//每10ms推送一次
Queues.drain(hotKeyStoreQueue, tempModels, 10, 10, TimeUnit.MILLISECONDS);
if (CollectionUtil.isEmpty(tempModels)) {
continue;
}
Map<String, List<HotKeyModel>> allAppHotKeyModels = new HashMap<>();
//拆分出每個app的熱key集合,按app分堆
for (HotKeyModel hotKeyModel : tempModels) {
List<HotKeyModel> oneAppModels = allAppHotKeyModels.computeIfAbsent(hotKeyModel.getAppName(), (key) -> new ArrayList<>());
oneAppModels.add(hotKeyModel);
}
//遍歷所有app,進行推送
for (AppInfo appInfo : ClientInfoHolder.apps) {
List<HotKeyModel> list = allAppHotKeyModels.get(appInfo.getAppName());
if (CollectionUtil.isEmpty(list)) {
continue;
}
HotKeyMsg hotKeyMsg = new HotKeyMsg(MessageType.RESPONSE_NEW_KEY);
hotKeyMsg.setHotKeyModels(list);
//整個app全部發送
appInfo.groupPush(hotKeyMsg);
}
//推送完,及時清理不使用記憶體
allAppHotKeyModels = null;
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
- 會按照key的appName來進行分組,然后通過對應app的channelGroup來推送
- 當熱key的數量達到10或者每隔10ms,把熱key的資料通過與app的netty通道來發送給app,資料型別為RESPONSE_NEW_KEY
- LinkedBlockingQueue hotKeyStoreQueue:worker計算的給client熱key的集中營,所有給client推送熱key存盤在里面
4)client實體節點處理:NodesServerStarter
① 第七個@PostConstruct:start()
public class NodesServerStarter {
@Value("${netty.port}")
private int port;
private Logger logger = LoggerFactory.getLogger(getClass());
@Resource
private IClientChangeListener iClientChangeListener;
@Resource
private List<INettyMsgFilter> messageFilters;
@PostConstruct
public void start() {
AsyncPool.asyncDo(() -> {
logger.info("netty server is starting");
NodesServer nodesServer = new NodesServer();
nodesServer.setClientChangeListener(iClientChangeListener);
nodesServer.setMessageFilters(messageFilters);
try {
nodesServer.startNettyServer(port);
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
- 執行緒池里面異步執行,啟動client端的nettyServer
- iClientChangeListener和messageFilters這兩個依賴最侄訓被傳遞到netty訊息處理器里面,iClientChangeListener會作為channel下線處理來洗掉ClientInfoHolder下線或者超時的通道,messageFilters會作為netty收到事件訊息的處理過濾器(責任鏈模式)
② 依賴的bean:IClientChangeListener iClientChangeListener
public interface IClientChangeListener {
/**
* 發現新連接
*/
void newClient(String appName, String channelId, ChannelHandlerContext ctx);
/**
* 客戶端掉線
*/
void loseClient(ChannelHandlerContext ctx);
}
對客戶端的管理,新來(newClient)(會觸發netty的連接方法channelActive)、斷線(loseClient)(會觸發netty的斷連方法channelInactive())的管理
client的連接資訊主要是在ClientInfoHolder里面
- List apps,這里面的AppInfo主要是appName和對應的channelGroup
- 對apps的add和remove主要是通過新來(newClient)、斷線(loseClient)
③ 依賴的bean:List messageFilters
/**
* 對netty來的訊息,進行過濾處理
* @author wuweifeng wrote on 2019-12-11
* @version 1.0
*/
public interface INettyMsgFilter {
boolean chain(HotKeyMsg message, ChannelHandlerContext ctx);
}
對client發給worker的netty訊息,進行過濾處理,共有四個實作類,也就是說底下四個過濾器都是收到client發送的netty訊息來做處理
④ 各個訊息處理的型別:MessageType
APP_NAME((byte) 1),
REQUEST_NEW_KEY((byte) 2),
RESPONSE_NEW_KEY((byte) 3),
REQUEST_HIT_COUNT((byte) 7), //命中率
REQUEST_HOT_KEY((byte) 8), //熱key,worker->dashboard
PING((byte) 4), PONG((byte) 5),
EMPTY((byte) 6);
順序1:HeartBeatFilter
- 當訊息型別為PING,則給對應的client示例回傳PONG
順序2:AppNameFilter
- 當訊息型別為APP_NAME,代表client與worker建立連接成功,然后呼叫iClientChangeListener的newClient方法增加apps元資料資訊
順序3:HotKeyFilter
- 處理接收訊息型別為REQUEST_NEW_KEY
- 先給HotKeyFilter.totalReceiveKeyCount原子類增1,該原子類代表worker實體接收到的key的總數
- publishMsg方法,將訊息通過自建的生產者消費者模型(KeyProducer,KeyConsumer),來把訊息給發到生產者中分發消費
- 接收到的訊息HotKeyMsg里面List
- 首先判斷HotKeyModel里面的key是否在白名單內,如果在則跳過,否則將HotKeyModel通過KeyProducer發送
順序4:KeyCounterFilter
- 處理接收型別為REQUEST_HIT_COUNT
- 這個過濾器是專門給dashboard來匯算key的,所以這個appName直接設定為該worker配置的appName
- 該過濾器的資料來源都是client的NettyKeyPusher#sendCount(String appName, List list),這里面的資料都是默認積攢10s的,這個10s是可以配置的,這一點在client里面有講
- 將構造的new KeyCountItem(appName, models.get(0).getCreateTime(), models)放到阻塞佇列LinkedBlockingQueue COUNTER_QUEUE中,然后讓CounterConsumer來消費處理,消費邏輯是單執行緒的
- CounterConsumer:熱key統計消費者
- 放在公共執行緒池中,來單執行緒執行
- 從阻塞佇列COUNTER_QUEUE里面取資料,然后將里面的key的統計資料發布到etcd的/jd/keyHitCount/+ appName + “/“ + IpUtils.getIp() + “-“ + System.currentTimeMillis()里面,該路徑是worker服務的client集群或者default,用來存放客戶端hotKey訪問次數和總訪問次數的path,然后讓dashboard來訂閱統計展示
2.三個定時任務:3個@Scheduled
1)定時任務1:EtcdStarter#pullRules()
/**
* 每隔1分鐘拉取一次,所有的app的rule
*/
@Scheduled(fixedRate = 60000)
public void pullRules() {
try {
if (isForSingle()) {
String value = https://www.cnblogs.com/Jcloud/p/configCenter.get(ConfigConstant.rulePath + workerPath);
if (!StrUtil.isEmpty(value)) {
List<KeyRule> keyRules = FastJsonUtils.toList(value, KeyRule.class);
KeyRuleHolder.put(workerPath, keyRules);
}
} else {
List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.rulePath);
for (KeyValue keyValue : keyValues) {
ruleChange(keyValue);
}
}
} catch (StatusRuntimeException ex) {
logger.error(ETCD_DOWN);
}
}
每隔1分鐘拉取一次etcd地址為/jd/rules/的規則變化,如果worker所服務的app或者default的rule有變化,則更新規則的快取,并清空該appName所對應的本地key快取
2)定時任務2:EtcdStarter#uploadClientCount()
/**
* 每隔10秒上傳一下client的數量到etcd中
*/
@Scheduled(fixedRate = 10000)
public void uploadClientCount() {
try {
String ip = IpUtils.getIp();
for (AppInfo appInfo : ClientInfoHolder.apps) {
String appName = appInfo.getAppName();
int count = appInfo.size();
//即便是full gc也不能超過3秒,因為這里給的過期時間是13s,由于該定時任務每隔10s執行一次,如果full gc或者說上報給etcd的時間超過3s,
//則在dashboard查詢不到client的數量
configCenter.putAndGrant(ConfigConstant.clientCountPath + appName + "/" + ip, count + "", 13);
}
configCenter.putAndGrant(ConfigConstant.caffeineSizePath + ip, FastJsonUtils.convertObjectToJSON(CaffeineCacheHolder.getSize()), 13);
//上報每秒QPS(接收key數量、處理key數量)
String totalCount = FastJsonUtils.convertObjectToJSON(new TotalCount(HotKeyFilter.totalReceiveKeyCount.get(), totalDealCount.longValue()));
configCenter.putAndGrant(ConfigConstant.totalReceiveKeyCount + ip, totalCount, 13);
logger.info(totalCount + " expireCount:" + expireTotalCount + " offerCount:" + totalOfferCount);
//如果是穩定一直有key發送的應用,建議開啟該監控,以避免可能發生的網路故障
if (openMonitor) {
checkReceiveKeyCount();
}
// configCenter.putAndGrant(ConfigConstant.bufferPoolPath + ip, MemoryTool.getBufferPool() + "", 10);
} catch (Exception ex) {
logger.error(ETCD_DOWN);
}
}
- 每個10s將worker計算存盤的client資訊上報給etcd,來方便dashboard來查詢展示,比如/jd/count/對應client數量,/jd/caffeineSize/對應caffeine快取的大小,/jd/totalKeyCount/對應該worker接收的key總量和處理的key總量
- 可以從代碼中看到,上面所有etcd的節點租期時間都是13s,而該定時任務是每10s執行一次,意味著如果full gc或者說上報給etcd的時間超過3s,則在dashboard查詢不到client的相關匯算資訊
- 長時間不收到key,判斷網路狀態不好,斷開worker給etcd地址為/jd/workers/+$workerPath節點的續租,因為client會回圈判斷該地址的節點是否變化,使得client重新連接worker或者斷開失聯的worker
3)定時任務3:EtcdStarter#fetchDashboardIp()
/**
* 每隔30秒去獲取一下dashboard的地址
*/
@Scheduled(fixedRate = 30000)
public void fetchDashboardIp() {
try {
//獲取DashboardIp
List<KeyValue> keyValues = configCenter.getPrefix(ConfigConstant.dashboardPath);
//是空,給個警告
if (CollectionUtil.isEmpty(keyValues)) {
logger.warn("very important warn !!! Dashboard ip is null!!!");
return;
}
String dashboardIp = keyValues.get(0).getValue().toStringUtf8();
NettyClient.getInstance().connect(dashboardIp);
} catch (Exception e) {
e.printStackTrace();
}
}
每隔30s拉取一次etcd前綴為/jd/dashboard/的dashboard連接ip的值,并且判斷DashboardHolder.hasConnected里面是否為未連接狀態,如果是則重新連接worker與dashboard的netty通道
3.自建的生產者消費者模型(KeyProducer,KeyConsumer)
一般生產者消費者模型包含三大元素:生產者、消費者、訊息存盤佇列
這里訊息存盤佇列是DispatcherConfig里面的QUEUE,使用LinkedBlockingQueue,默認大小為200W
1)KeyProducer
@Component
public class KeyProducer {
public void push(HotKeyModel model, long now) {
if (model == null || model.getKey() == null) {
return;
}
//5秒前的過時訊息就不處理了
if (now - model.getCreateTime() > InitConstant.timeOut) {
expireTotalCount.increment();
return;
}
try {
QUEUE.put(model);
totalOfferCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
判斷接收到的HotKeyModel是否超出”netty.timeOut”配置的時間,如果是將expireTotalCount紀錄過期總數給自增,然后回傳
2)KeyConsumer
public class KeyConsumer {
private IKeyListener iKeyListener;
public void setKeyListener(IKeyListener iKeyListener) {
this.iKeyListener = iKeyListener;
}
public void beginConsume() {
while (true) {
try {
//從這里可以看出,這里的生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要佇列來做緩沖
HotKeyModel model = QUEUE.take();
if (model.isRemove()) {
iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
} else {
iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
}
//處理完畢,將數量加1
totalDealCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
@Override
public void removeKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key,appName+keyType+key
String key = buildKey(hotKeyModel);
hotCache.invalidate(key);
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//推送所有client洗掉
hotKeyModel.setCreateTime(SystemClock.now());
logger.info(DELETE_KEY_EVENT + hotKeyModel.getKey());
for (IPusher pusher : iPushers) {
//這里可以看到,洗掉熱key的netty訊息只給client端發了過去,沒有給dashboard發過去(DashboardPusher里面的remove是個空方法)
pusher.remove(hotKeyModel);
}
}
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key
String key = buildKey(hotKeyModel);
//判斷是不是剛熱不久
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey,
//畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網路開銷
Object o = hotCache.getIfPresent(key);
if (o != null) {
return;
}
//********** watch here ************//
//該方法會被InitConstant.threadCount個執行緒同時呼叫,存在多執行緒問題
//下面的那句addCount是加了鎖的,代表給Key累加數量時是原子性的,不會發生多加、少加的情況,到了設定的閾值一定會hot
//譬如閾值是2,如果多個執行緒累加,在沒hot前,hot的狀態肯定是對的,譬如thread1 加1,thread2加1,那么thread2會hot回傳true,開啟推送
//但是極端情況下,譬如閾值是10,當前是9,thread1走到這里時,加1,回傳true,thread2也走到這里,加1,此時是11,回傳true,問題來了
//該key會走下面的else兩次,也就是2次推送,
//所以出現問題的原因是hotCache.getIfPresent(key)這一句在并發情況下,沒return掉,放了兩個key+1到addCount這一步時,會有問題
//測驗代碼在TestBlockQueue類,直接運行可以看到會同時hot
//那么該問題用解決嗎,NO,不需要解決,1 首先要發生的條件極其苛刻,很難觸發,以京東這樣高的并發量,線上我也沒見過觸發連續2次推送同一個key的
//2 即便觸發了,后果也是可以接受的,2次推送而已,毫無影響,客戶端無感知,但是如果非要解決,就要對slidingWindow實體加鎖了,必然有一些開銷
//所以只要保證key數量不多計算就可以,少計算了沒事,因為熱key必然頻率高,漏計幾次沒事,但非熱key,多計算了,被干成了熱key就不對了
SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);//從這里可知,每個app的每個key都會對應一個滑動視窗
//看看hot沒
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
if (!hot) {
//如果沒hot,重新put,cache會自動重繪過期時間
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).put(key, slidingWindow);
} else {
//這里之所以放入的value為1,是因為hotCache是用來專門存盤剛生成的hotKey
//hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey,
//畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網路開銷
hotCache.put(key, 1);
//刪掉該key
//這個key從實際上是專門針對slidingWindow的key,他的組合邏輯是appName+keyType+key,而不是給client和dashboard推送的hotKey
CaffeineCacheHolder.getCache(hotKeyModel.getAppName()).invalidate(key);
//開啟推送
hotKeyModel.setCreateTime(SystemClock.now());
//當開關打開時,列印日志,大促時關閉日志,就不列印了
if (EtcdStarter.LOGGER_ON) {
logger.info(NEW_KEY_EVENT + hotKeyModel.getKey());
}
//分別推送到各client和etcd
for (IPusher pusher : iPushers) {
pusher.push(hotKeyModel);
}
}
}
“thread.count”配置即為消費者個數,多個消費者共同消費一個QUEUE佇列
生產者消費者模型,本質上還是拉模式,之所以不使用EventBus,是因為需要佇列來做緩沖
根據HotKeyModel里面是否是洗掉訊息型別
- 洗掉訊息型別
- 根據HotKeyModel里面的appName+keyType+key的名字,來構建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
- 洗掉hotCache里面newkey的快取,放入的快取kv分別是newKey和1,hotCache作用是用來存盤該生成的熱key,hotCache對應的caffeine有效期為5s,也就是說該key會保存5s,在5s內不重復處理相同的hotKey,畢竟hotKey都是瞬時流量,可以避免在這5s內重復推送給client和dashboard,避免無效的網路開銷
- 洗掉CaffeineCacheHolder里面對應appName的caffeine里面的newKey,這里面存盤的是slidingWindow滑動視窗
- 推送給該HotKeyModel對應的所有client實體,用來讓client洗掉該HotKeyModel
- 非洗掉訊息型別
- 根據HotKeyModel里面的appName+keyType+key的名字,來構建caffeine里面的newkey,該newkey在caffeine里面主要是用來與slidingWindow滑動時間窗對應
- 通過hotCache來判斷該newkey是否剛熱不久,如果是則回傳
- 根據滑動時間視窗來計算判斷該key是否為hotKey(這里可以學習一下滑動時間視窗的設計),并回傳或者生成該newKey對應的滑動視窗
- 如果沒有達到熱key的標準
- 通過CaffeineCacheHolder重新put,cache會自動重繪過期時間
- 如果達到了熱key標準
- 向hotCache里面增加newkey對應的快取,value為1表示剛為熱key,
- 洗掉CaffeineCacheHolder里面對應newkey的滑動視窗快取,
- 向該hotKeyModel對應的app的client推送netty訊息,表示新產生hotKey,使得client本地快取,但是推送的netty訊息只代表為熱key,client本地快取不會存盤key對應的value值,需要呼叫JdHotKeyStore里面的api來給本地快取的value賦值
- 向dashboard推送hotKeyModel,表示新產生hotKey
3)計算熱key滑動視窗的設計
限于篇幅的原因,這里就不細談了,直接貼出專案作者對其寫的說明文章:Java簡單實作滑動視窗
3.3.4 dashboard端
這個沒啥可說的了,就是連接etcd、mysql,增刪改查,不過京東的前端框架很方便,直接回傳list就可以成串列,
4 總結
文章第二部分為大家講解了redis資料傾斜的原因以及應對方案,并對熱點問題進行了深入,從發現熱key到解決熱key的兩個關鍵問題的總結,
文章第三部分是熱key問題解決方案——JD開源hotkey的原始碼決議,分別從client端、worker端、dashboard端來進行全方位講解,包括其設計、使用及相關原理,
希望通過這篇文章,能夠使大家不僅學習到相關方法論,也能明白其方法論具體的落地方案,一起學習,一起成長,
作者:李鵬
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/509172.html
標籤:其他
