索引
- 分析選擇器 - 一個讓人感動的演算法
- 分析 Dubbo 服務 - 中道崩殂...
- 分析同步配置 - Dubbo服務調不成功?!
- 分析請求型別決議
分析選擇器 - 一個讓人感動的演算法
首先分別啟動 soul-admin 、soul-bootstrap , 再啟動兩個相同的test服務組建集群 soul-test-http.
轉到后臺管理頁的 Divide 插件頁, 已經注冊了的選擇器 http 中, 可以看到兩個服務路徑的配置:

前置作業準備就緒, 在 DividePlugin 插件這打個斷點, 看看 selector 的此時內容:
SelectorData(id=1349650852775989248, pluginId=5, pluginName=divide, name=/http, matchMode=0, type=1, sort=1, enabled=true, loged=true, continued=true, handle=[{"upstreamHost":"localhost","protocol":"http://","upstreamUrl":"192.168.31.233:8187","weight":"99"},{"upstreamHost":"localhost","protocol":"http://","upstreamUrl":"192.168.31.233:8188","weight":"1"}], conditionList=[ConditionData(paramType=uri, operator=match, paramName=/, paramValue=/http/**)])
重點的是 handle 屬性, 存放兩個真實路徑資訊, 以及權重 weight .
在 DividePlugin 的 doExecute() 中, 沒有直接使用這個 Selector 的內容, 而是直接在快取查詢其id的對應資訊:
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
對于檢索到的 DivideUpstream 集合, 有包含權重、路徑等資訊, 并經過負載均衡工具類, 選擇出一個服務:
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
繼續追溯 LoadBalanceUtils 這部分:
public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
// 通過 algorithm, 確定回傳的LoadBalance的子類
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
// 呼叫LoadBalance的select(), 從服務集群中選擇一個節點
return loadBalance.select(upstreamList, ip);
}
這里就有疑問了, algorithm 是由上面 ruleHandle.getLoadBalance() 的資訊得來, 而 ruleHandle 應該是規則配置, 仔細看了后臺管理頁面, 選擇器中并沒有規則配置, 規則配置被劃分在更加細粒度的路徑上了.

到這基本明白選擇器的玩法了, 集群間的機器, 只有權重分數不同, 具體落實到不同路徑上, 可以分配不同的規則.
負載規則總共有三種, hash (哈希)、random (隨機)、roundRobin (輪詢). 到這又有疑問了, 后面兩個我大致能猜測到怎樣與權重相作用, 使得實際比率與權重分比相匹配, 但哈希怎么去分呢? 看看 HashLoadBalance 代碼吧:
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
final ConcurrentSkipListMap<Long, DivideUpstream> treeMap = new ConcurrentSkipListMap<>();
for (DivideUpstream address : upstreamList) {
// 每個節點*VIRTUAL_NODE_NUM(默認5), 使hash更加均勻
for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
long addressHash = hash("SOUL-" + address.getUpstreamUrl() + "-HASH-" + i);
treeMap.put(addressHash, address);
}
}
// 從當前ip得到一個hash值, 并比對treemap(有序), 找到大于此hash值的位置
long hash = hash(String.valueOf(ip));
SortedMap<Long, DivideUpstream> lastRing = treeMap.tailMap(hash);
// 只要服務節點不增減, 同一個ip得到的節點就可以保持不變
if (!lastRing.isEmpty()) {
return lastRing.get(lastRing.firstKey());
}
return treeMap.firstEntry().getValue();
}
并沒有使用到 weight 屬性, 可以看到, 當選用 hash 策略時, 權重就失去了其作用. 每個機器都有同等幾率, 在某個ip下被訪問到.
看下另一個負載均衡規則, RandomLoadBalance 是怎么實作權重的:
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
// 總個數
int length = upstreamList.size();
// 總權重
int totalWeight = 0;
// 權重是否都一樣
boolean sameWeight = true;
for (int i = 0; i < length; i++) {
int weight = upstreamList.get(i).getWeight();
// 累計總權重
totalWeight += weight;
if (sameWeight && i > 0
&& weight != upstreamList.get(i - 1).getWeight()) {
// 計算所有權重是否一樣
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// 如果權重不相同且權重大于0則按總權重數隨機
int offset = RANDOM.nextInt(totalWeight);
// 并確定隨機值落在哪個片斷上
for (DivideUpstream divideUpstream : upstreamList) {
offset -= divideUpstream.getWeight();
if (offset < 0) {
return divideUpstream;
}
}
}
// 如果權重相同或權重為0則均等隨機
return upstreamList.get(RANDOM.nextInt(length));
}
PS: 這些注釋是官方自帶的…
這里可以通過注釋很明白的得知, 當使用 random 規則時, 所有節點權重分累加并隨機得到數字, 看具體是落在那個節點的權重片段上; 如果分數0或者相同則很直接的隨機集群長度即可.
最后看下 `RoundRobinLoadBalance` 的實作:
public DivideUpstream doSelect(final List<DivideUpstream> upstreamList, final String ip) {
String key = upstreamList.get(0).getUpstreamUrl();
ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.get(key);
if (map == null) {
methodWeightMap.putIfAbsent(key, new ConcurrentHashMap<>(16));
map = methodWeightMap.get(key);
}
int totalWeight = 0;
long maxCurrent = Long.MIN_VALUE;
long now = System.currentTimeMillis();
DivideUpstream selectedInvoker = null;
WeightedRoundRobin selectedWRR = null;
for (DivideUpstream upstream : upstreamList) {
String rKey = upstream.getUpstreamUrl();
// 取出節點在快取中的資訊
WeightedRoundRobin weightedRoundRobin = map.get(rKey);
int weight = upstream.getWeight();
if (weightedRoundRobin == null) {
weightedRoundRobin = new WeightedRoundRobin();
weightedRoundRobin.setWeight(weight);
map.putIfAbsent(rKey, weightedRoundRobin);
}
if (weight != weightedRoundRobin.getWeight()) {
weightedRoundRobin.setWeight(weight);
}
// 這里是第一個關鍵: 快取中的分數增加當前節點權重分
long cur = weightedRoundRobin.increaseCurrent();
weightedRoundRobin.setLastUpdate(now);
// 選擇快取分值高的節點
if (cur > maxCurrent) {
maxCurrent = cur;
selectedInvoker = upstream;
selectedWRR = weightedRoundRobin;
}
totalWeight += weight;
}
if (!updateLock.get() && upstreamList.size() != map.size() && updateLock.compareAndSet(false, true)) {
try {
ConcurrentMap<String, WeightedRoundRobin> newMap = new ConcurrentHashMap<>(map);
newMap.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > recyclePeriod);
methodWeightMap.put(key, newMap);
} finally {
updateLock.set(false);
}
}
if (selectedInvoker != null) {
// 這里是第二個關鍵: 快取中的分數, 減少總節點權重分
selectedWRR.sel(totalWeight);
return selectedInvoker;
}
return upstreamList.get(0);
}
這個演算法有點復雜, 我解釋下核心計算權重的方面:
- 兩個分值分別為2、100的節點進入, 快取中保留它們各自, 分值從0開始
- 經過for回圈后, 兩個節點在快取中的分值會以自身為基數增加, 假設后面步驟不進行, 則快取第一次為2、100, 第二次為4、200, 依次類推.
- 關鍵的第三步, 選出節點快取中分值最高的, 進行"處罰"措施, 減少所有節點的累計分值, 即102.
根據這個演算法的步驟, 一直沒有被選中的節點, 作為"成長獎勵", 會持續以自身為基數自增; 而被選中的節點, 作為"懲罰", 會減少其他節點的權重分之和.
可以預見, 權重分小的節點, 要自增到很久之后, 才會等來自身被選中的一刻, 然而那一刻它被懲罰的力度會非常大, 導致它一朝回到解放前, 又要開始漫長的積蓄力量. 而權重分大的節點, 每次被選上的懲罰力度很小, 即使多次后分數太低沒被選上, 他的獎勵分數(自身)也特別高, 一次增加就遠遠超越其他節點.
(就仿佛看到了一個沒有天賦但一直努力的平凡人, 然而上天仿佛給他開了玩笑, 每到一定時間, 必然降下天罰, 更可悲的是, 千百次的努力也抵不過一次懲罰, 一切又是歸0… 周而往復, 這個平凡的人, 依舊還會用千百次追逐換一次短暫的露頭, 可惡, 我被一個演算法感動到了!)
分析 Dubbo 服務 - 中道崩殂…
啟動 soul-test-alibaba-dubbo-service 專案注冊Dubbo服務. 注意要在后臺管理的插件管理中, 將Dubbo插件開啟, 并運行 Zookeeper 服務.
啟動后老習慣, 看看列印的日志里能撈出什么資訊:
2021-01-17 00:08:06.702 INFO 2244 --- [pool-1-thread-1] a.d.AlibabaDubboServiceBeanPostProcessor : dubbo client register success :{} {"appName":"dubbo","contextPath":"/dubbo","path":"/dubbo/saveComplexBeanTestAndName","pathDesc":"","rpcType":"dubbo","serviceName":"org.dromara.soul.test.dubbo.api.service.DubboMultiParamService","methodName":"saveComplexBeanTestAndName","ruleName":"/dubbo/saveComplexBeanTestAndName","parameterTypes":"org.dromara.soul.test.dubbo.api.entity.ComplexBeanTest,java.lang.String","rpcExt":"{\"timeout\":10000}","enabled":true}
首先看到啟動程序中陸陸續續的打出這類日志, 表示注冊了相關Dubbo服務, 這里有資源名、類名方法名等資訊, 關鍵類 AlibabaDubboServiceBeanPostProcessor 中追溯下, 發現還是與http服務的注冊基本一樣, 老一套也恰恰說明, soul 將dubbo服務與http服務定義了同樣的元資料, 設計很好的體現. 之后可以看看元資料這方面的設計.
其他資訊就沒什么有價值的了, 都是Dubbo包打出來的.
分析同步配置 - Dubbo服務調不成功?!
在呼叫網關轉發 Dubbo 服務期間發生了意外, 沒有成功調取, 于是在插件祖宗類 SoulPlugin 上斷點, 看下都進了哪些插件. 發現除了 WebClientPlugin 和 WebClientResponsePlugin, 其他都走了一遍. Web的不走是正確的, 本身是Dubbo服務, 但是, 最關鍵的是沒有看到 DubboPlugin .
于是繼續追溯, 檢查 SoulWebHandler 中獲取到的 plugins 串列, 發現還是那9個, 并沒有Dubbo插件, 繼續探索!
發現 soul-bootstrap 在啟動時的配置 SoulConfiguration , 有獲取:
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("loader plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
嘗試重啟網關并斷點, 仍然只有9個插件, 疑惑十足. 這時想起, 分析原始碼第0期, 有在 soul-bootstrap 啟動日志中, 看到和 soul-admin 建立 WebSocket 的相關類 WebsocketSyncDataService, 當時還記了一筆日后要分析它的同步配置, 沒想到來的這么快…
跟蹤 WebsocketSyncDataService 的構造器, 找到監聽類 SoulWebsocketClient :
@Override
public void onMessage(final String result) {
handleResult(result);
}
private void handleResult(final String result) {
WebsocketData websocketData = GsonUtils.getInstance().fromJson(result, WebsocketData.class);
ConfigGroupEnum groupEnum = ConfigGroupEnum.acquireByName(websocketData.getGroupType());
String eventType = websocketData.getEventType();
String json = GsonUtils.getInstance().toJson(websocketData.getData());
websocketDataHandler.executor(groupEnum, json, eventType);
}
這個 handleResult() 方法處斷點, 并在 管理后臺 手動點下同步配置, 這里看到傳入更新插件的json:
{
"groupType": "PLUGIN",
"eventType": "REFRESH",
"data": [
{
"id": "1",
"name": "sign",
"role": 0,
"enabled": false
},
{
"id": "2",
"name": "waf",
"config": "{\"model\":\"black\"}",
"role": 0,
"enabled": false
},
{
"id": "3",
"name": "rewrite",
"role": 0,
"enabled": false
},
{
"id": "4",
"name": "rate_limiter",
"config": "{\"master\":\"mymaster\",\"mode\":\"Standalone\",\"url\":\"192.168.1.1:6379\",\"password\":\"abc\"}",
"role": 0,
"enabled": false
},
{
"id": "5",
"name": "divide",
"role": 0,
"enabled": true
},
{
"id": "6",
"name": "dubbo",
"config": "{\"register\":\"zookeeper://localhost:2181\"}",
"role": 0,
"enabled": true
},
{
"id": "7",
"name": "monitor",
"config": "{\"metricsName\":\"prometheus\",\"host\":\"localhost\",\"port\":\"9190\",\"async\":\"true\"}",
"role": 0,
"enabled": false
},
{
"id": "8",
"name": "springCloud",
"role": 0,
"enabled": false
},
{
"id": "9",
"name": "hystrix",
"role": 0,
"enabled": false
}
]
}
可以看到, soul-admin 傳來的配置資訊都是正確的, 繼續追溯 handleResult , 找到 WebsocketDataHandler 的抽象類 AbstractDataHandler:
public void handle(final String json, final String eventType) {
List<T> dataList = convert(json);
if (CollectionUtils.isNotEmpty(dataList)) {
DataEventTypeEnum eventTypeEnum = DataEventTypeEnum.acquireByName(eventType);
switch (eventTypeEnum) {
case REFRESH:
case MYSELF:
doRefresh(dataList);
break;
case UPDATE:
case CREATE:
doUpdate(dataList);
break;
case DELETE:
doDelete(dataList);
break;
default:
break;
}
}
}
這里就是網關的同步重繪機制核心處了, 根據debug找到 doRefresh , 找到插件相關的更新類 PluginDataHandler :
protected void doRefresh(final List<PluginData> dataList) {
// 類似清理快取, 先跳過不管, 關注更新方法
pluginDataSubscriber.refreshPluginDataSelf(dataList);
// 追溯這個方法
dataList.forEach(pluginDataSubscriber::onSubscribe);
}
追溯到 CommonPluginDataSubscriber 類:
public void onSubscribe(final PluginData pluginData) {
BaseDataCache.getInstance().cachePluginData(pluginData);
Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));
}
這里的第二句, 看的出從map匹配對應的handler實作, 不為空則執行. 查看 PluginDataHandler 的實作, 包含有 AlibabaDubboPluginDataHandler 這個看上去和 Dubbo相關的資料更新類, 但為什么插件沒出來呢?
debug到 handlerMap 的資料后, 終于發現問題所在! 這里的內容, 僅包含之前見到的幾個插件, 根本沒有 Dubbo插件的名稱, 看來問題就出在這里了.

追溯 handlerMap 的來源, 又回到一切配置的源頭 SoulConfiguration 類, 這里的 pluginDataSubscriber 方法, 注入了 handlerMap 的所有資訊:
@Bean
public PluginDataSubscriber pluginDataSubscriber(final ObjectProvider<List<PluginDataHandler>> pluginDataHandlerList) {
return new CommonPluginDataSubscriber(pluginDataHandlerList.getIfAvailable(Collections::emptyList));
}
這是spring bean , 自然是啟動時加載的資料, 那么這些可讀取的插件資訊, 就肯定來自與組態檔.
又是漫長的時間過去, 最終檢查到 soul-bootstrap 的 pom.xml 這里, 看到 alibaba-dubbo 這里的屏蔽狀態, 我立刻相通了…
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-plugin-alibaba-dubbo</artifactId>
<version>${project.version}</version>
</dependency>
解開真相, 找到 soul-spring-boot-starter-plugin-alibaba-dubbo 專案的 AlibabaDubboPluginConfiguration 類, 這里就有我苦苦找尋的 Dubbo插件資料更新處理類:
@Bean
public PluginDataHandler alibabaDubboPluginDataHandler() {
return new AlibabaDubboPluginDataHandler();
}
將 soul-bootstrap 網關的pom中alibaba-dubbo啟用并重啟專案, 它在 CommonPluginDataSubscriber 里靜靜躺著:

嘗試 http呼叫網關dubbo服務, 測驗通過! 太辛苦了!
分析請求型別決議
嘗試呼叫dubbo的時候又發生了個小插曲, 呼叫沒有走通在 DividePlugin 這里被吃掉了, 沒有走到 DubboPlugin 處. 折騰了一會后 (重啟等) 再次呼叫又好了, 在這里分析下原因:
首先 DividePlugin 插件在 DubboPlugin 插件前, 如果走入 DividePlugin 且沒匹配到規則, 會直接 return 掉, 在 AbstractSoulPlugin 的 execute() 這里:
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
if (pluginData != null && pluginData.getEnabled()) {
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
if (CollectionUtils.isEmpty(selectors)) {
return CheckUtils.checkSelector(pluginName, exchange, chain);
}
final SelectorData selectorData = matchSelector(exchange, selectors);
// 規則未匹配到, 進入這里
if (Objects.isNull(selectorData)) {
if (PluginEnum.WAF.getName().equals(pluginName)) {
return doExecute(exchange, chain, null, null);
}
return CheckUtils.checkSelector(pluginName, exchange, chain);
}
...
}
return chain.execute(exchange);
}
CheckUtils 類這里, 會列印規則未匹配的日志, 并return:
public static Mono<Void> checkRule(final String pluginName, final ServerWebExchange exchange, final SoulPluginChain chain) {
if (PluginEnum.DIVIDE.getName().equals(pluginName)
|| PluginEnum.DUBBO.getName().equals(pluginName)
|| PluginEnum.SPRING_CLOUD.getName().equals(pluginName)) {
LOGGER.error("can not match rule data :{}", pluginName);
Object error = SoulResultWarp.error(SoulResultEnum.RULE_NOT_FIND.getCode(), SoulResultEnum.RULE_NOT_FIND.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
return chain.execute(exchange);
}
所以, 如果關于 Dubbo 服務的呼叫, 只要走入 DividePlugin 肯定就出不去了, 正常的情況是怎樣呢? 會跳過 DividePlugin 直接下一個的, 具體看 SoulWebHandler 這:
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
// 如果是dubbo呼叫, 輪到DividePlugin時會跳過的
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
} else {
return plugin.execute(exchange, this);
}
} else {
return Mono.empty();
}
});
}
繼續追蹤 DividePlugin 的 skip() 方法:
public Boolean skip(final ServerWebExchange exchange) {
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
// 檢查呼叫型別是否是 http
return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}
這里得到的 RPCType 正常是 dubbo , 說明剛剛的呼叫時回傳的背景關系中 soulContext 資訊有誤.
查看下 rpcType 屬性被誰參考, 看到 DefaultSoulContextBuilder 類有 set() 過, debug追溯, 發現每次請求, 都會進入這里的 build() 去注入相關資訊給背景關系:
public SoulContext build(final ServerWebExchange exchange) {
final ServerHttpRequest request = exchange.getRequest();
String path = request.getURI().getPath();
// 這里是關鍵, 通過鏈接決議元資料, 得到rpcType型別
MetaData metaData = MetaDataCache.getInstance().obtain(path);
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
}
return transform(request, metaData);
}
private SoulContext transform(final ServerHttpRequest request, final MetaData metaData) {
final String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
final String sign = request.getHeaders().getFirst(Constants.SIGN);
final String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
SoulContext soulContext = new SoulContext();
String path = request.getURI().getPath();
soulContext.setPath(path);
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
if (RpcTypeEnum.SPRING_CLOUD.getName().equals(metaData.getRpcType())) {
setSoulContextByHttp(soulContext, path);
// 這里將元資料的rpctype注入到背景關系中
soulContext.setRpcType(metaData.getRpcType());
} else {
setSoulContextByDubbo(soulContext, metaData);
}
} else {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
}
soulContext.setAppKey(appKey);
soulContext.setSign(sign);
soulContext.setTimestamp(timestamp);
soulContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> soulContext.setHttpMethod(httpMethod.name()));
return soulContext;
}
繼續追蹤決議類 MetaDataCache:
public MetaData obtain(final String path) {
MetaData metaData = META_DATA_MAP.get(path);
if (Objects.isNull(metaData)) {
String key = META_DATA_MAP.keySet().stream().filter(k -> PathMatchUtils.match(k, path)).findFirst().orElse("");
return META_DATA_MAP.get(key);
}
return metaData;
}
其中的 PathMatchUtils 會去匹配并輸出型別, 不繼續跟蹤了, 明日嘗試下看還能否復現問題…
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/250251.html
標籤:其他
