場景描述:客戶端遠程異步呼叫ServiceA,ServiceA在處理客戶端請求的程序中需要遠程同步呼叫ServiceB,ServiceA從ServiceB的回應中取資料時,得到的是null,對就是這個坑。
使用DEBUG模式,分析Dubbo原始碼得到問題的起因。
分析程序如下:
客戶端和服務端通信,配置使用netty進行網路傳輸,通過NettyHandler進行具體的訊息收發操作,所以從此入手進行原始碼分析。
client到ServiceA的遠程方法異步呼叫,會在RpcContext(RpcContext是一個臨時狀態記錄器,當接收到RPC請求,或發起RPC請求時,RpcContext的狀態都會變化。比如:A調B,B再調C,則B機器上,在B調C之前,RpcContext記錄的是A調B的資訊,在B調C之后,RpcContext記錄的是B調C的資訊)的attachments(Map結構)屬性中添加async=true的鍵值對,同時也會在RpcInvocation的attachments(Map結構)中添加async=true的鍵值對。
經過一系列的Filter,程式運行到AbstractInvoker的invoke方法,注意該方法中的如下代碼段,
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
這里會把當前RpcContext中的attachments添加到呼叫ServiceB的RpcInvocation中,這時候async=true已經添加了,接著執行如下代碼段,
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)){
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
上面代碼判斷呼叫ServiceB的URL中是否含有async=true,如果有將設定async=true到RpcInvocation的attachments中,這時候是不包含的。
繼續跟蹤代碼,運行到DubboInvoker中,呼叫doInvoke方法,該方法中有如下的代碼段,boolean isAsync = RpcUtils.isAsync(getUrl(), invocation),這個isAsync方法具體宣告如下,
public static boolean isAsync(URL url, Invocation inv) {
boolean isAsync ;
//如果Java代碼中設定優先.
if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {
isAsync = true;
} else {
isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
}
return isAsync;
}
上面方法首先判斷RpcInvocation的attachments中async=true是否成立,如果成立則這是一次異步呼叫,否則判斷請求URL中async=true是否成立,如果成立則是一次異步呼叫,否則是一次同步呼叫,根據上面傳遞的引數,此時isAsync方法回傳的是true,ServiceA同步呼叫ServiceB變成了異步呼叫,繼續看下面的異步呼叫,代碼段如下,
else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
}
這里直接回傳了一個RpcResult物件,沒有資料內容,所以到這里,這個案子也就破了,ServiceA想從回應中取目標資料得到的當然是null。再延伸一下,如果ServiceB再同步呼叫ServiceC,這是可以正常同步呼叫的,因為ServiceA呼叫完ServiceB后,ConsumerContextFilter的invoke方法會清除attachements,所以ServiceB可以正常同步呼叫ServiceC了。
對于上面的問題,解決辦法有三個:
1.方法呼叫兩次,所以不推薦
ServiceA呼叫ServiceB的地方寫兩次一樣的呼叫,這個方法原理就像ServiceB呼叫ServiceC一樣,即清除attachements,這個方法最簡單,但是可能對不了解的人來說,這塊業務代碼寫重復了,會不小心洗掉掉,而且從寫代碼的角度來說,這個很雞肋,所以不推薦。
2.修改Dubbo原始碼
修改AbstractInvoker第137行,改成每次都對async進行實際賦值,
boolean isAsync = getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false);
invocation.setAttachment(Constants.ASYNC_KEY, String.valueOf(isAsync));
3.自定義Filter
實作com.alibaba.dubbo.rpc.Filter,在RpcContext中清除這個async,
@Activate(group = {Constants.PROVIDER})
public class AsyncFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext().getAttachments().remove(Constants.ASYNC_KEY);
return invoker.invoke(invocation);
}
}
同時在src/main/resources/META-INF/dubbo/下添加com.alibaba.dubbo.rpc.Filter檔案,內容檔案如下:
asyncFilter=com.abc.filter.AsyncFilter
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />
調度器 Dispatcher 調度策略
all:所有訊息都派發到執行緒池,包括請求,回應,連接事件,斷開事件,心跳等,默認。
direct:所有訊息都不派發到執行緒池,全部在 IO 執行緒上直接執行。
message:只有請求回應訊息派發到執行緒池,其它連接斷開事件,心跳等訊息,直接在 IO 執行緒上執行。
execution:只請求訊息派發到執行緒池,不含回應,回應和其它連接斷開事件,心跳等訊息,直接在 IO 執行緒上執行。
connection:在 IO 執行緒上,將連接斷開事件放入佇列,有序逐個執行,其它訊息派發到執行緒池。
執行緒池 ThreadPool
fixed:固定大小執行緒池,啟動時建立執行緒,不關閉,一直持有,默認,200。
cached:快取執行緒池,空閑一分鐘自動洗掉,需要時重建。
limited:可伸縮執行緒池,但池中的執行緒數只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的性能問題。
eager:優先創建Worker執行緒池。在任務數量大于corePoolSize但是小于maximumPoolSize時,優先創建Worker來處理任務。當任務數量大于maximumPoolSize時,將任務放入阻塞佇列中。阻塞佇列充滿時拋出RejectedExecutionException。(相比于cached:cached在任務數量超過maximumPoolSize時直接拋出例外而不是將任務放入阻塞佇列)
uj5u.com熱心網友回復:
親自測驗有效轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/41325.html
標籤:Web 開發
