一,為什么要用Dubbo
1.為什么要用現成的框架呢?
如果我們自己去開發一個網路通信,需要考慮到
- 底層網路通信協議的處理
- 序列化和反序列化的處理作業
這些作業本身應該是通用的,應該是一個中間件服務,為整個公司提供遠程通信的服務,而不應該由業務開發人員來自己去實作,所以才有了這樣的 rpc 框架,使得我們呼叫遠程方法時就像呼叫本地方法那么簡單,不需要關心底層的通信邏輯,
2.大規模服務化對于服務治理的要求
當企業開始大規模的服務化以后,遠程通信帶來的弊端就越來越明顯了,
- 服務鏈路變長了,如何實作對服務鏈路的跟蹤和監控
- 服務的大規模集群使得服務之間需要依賴第三方注冊中心來解決服務的發現和服務的感知問題
- 服務通信之間的例外,需要有一種保護機制防止一個節點故障引發大規模的系統故障,所以要有容錯機制
- 服務大規模集群會是的客戶端需要引入負載均衡機制實作請求分發
dubbo 主要是一個分布式服務治理解決方案,那么什么是服務治理?服務治理主要是針對大規模服務化以后,服務之間的路由、負載均衡、容錯機制、服務降級這些問題的解決方案,而 Dubbo 實作的不僅僅是遠程服務通信,并且還解決了服務路由、負載、降級、容錯等功能,
二,Dubbo 的基本使用
1.dubbo-common
1)service
/**
* @author yhd
* @email yinhuidong1@xiaomi.com
* @description TODO
* @since 2021/4/2 0:32
*/
public interface LoginService {
String login(String username,String password);
}
2.dubbo-provider
1)pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo</artifactId>
<version>2.7.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>com.yhd</groupId>
<artifactId>dubbo-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
2)組態檔
server.port=8081
dubbo.application.name=dubbo-provider
dubbo.registry.address=zookeeper://localhost:2181
dubbo.registry.protocal=zookeeper
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880
3)主啟動類
@EnableDubbo
@ComponentScan("com.yhd")
@SpringBootApplication
public class DubboProviderApplication {
public static void main(String[] args) {
SpringApplication.run(DubboProviderApplication.class, args);
}
}
4)service
@DubboService
public class LoginServiceImpl implements LoginService {
@Override
public String login(String username, String password) {
return "login success!";
}
}
3.dubbo-consumer
1)Controller
@RestController
public class LoginController {
@DubboReference
private LoginService loginService;
@GetMapping("login/{aaa}/{bbb}")
public String login(@PathVariable("aaa")String aaa,@PathVariable("bbb") String bbb){
return loginService.login(aaa,bbb);
}
}
三,dubbo啟動原理
Dubbo 提供了幾種容器讓我們去啟動和發布服務
1.容器型別
Spring Container
自動加載 META-INF/spring 目錄下的所有 Spring 配置,
logback Container
自動裝配 logback 日志
Log4j Container
自動配置 log4j 的配置
Dubbo 提供了一個 Main.main 快速啟動相應的容器,默認情況下,只會啟動 spring 容器
2.原理分析
默認情況下,spring 容器,本質上,就是加在 spring ioc 容器,然后啟動一個 netty 服務實作服務的發布,所以并沒有特別多的黑科技,下面是spring 容器啟動的代碼
public void start() {
String configPath =ConfigUtils.getProperty( "dubbo.spring.config");
if (StringUtils.isEmpty(configPath)) {
configPath = "classpath*:META- - INF/spring/*.xml";
}
context = new
ClassPathXmlApplicationContext(configPath.split( "[,\ \\ \ s]+"),false);
context.refresh();
context.start();
}
四,Dubbo對注冊中心的支持
Dubbo 能夠支持的注冊中心有:consul、etcd、nacos、sofa、zookeeper、redis、multicast
1.Dubbo 集成 Zookeeper 的實作原理

2.dubbo 每次都要連 zookeeper ?
不是每次發起一個請求的時候,都需要訪問注冊中心,是通過快取實作,
其他注冊中心的實作,核心本質是一樣的,都是為了管理服務地址,
3.多注冊中心支持
Dubbo 中可以支持多注冊中心,有的時候,客戶端需要用呼叫的遠程服務不在同一個注冊中心上,那么客戶端就需要配置多個注冊中心來訪問,
五,Dubbo僅僅是一個RPC框架?
Dubbo 的核心功能,提供服務注冊和服務發現以及基于 Dubbo 協議的遠程通信,Dubbo 從另一個方面來看也可以認為是一個服務治理生態,
- Dubbo 可以支持市面上主流的注冊中心
- Dubbo 提供了 Container 的支持,默認提供了 3 種 container,
- Dubbo 對于 RPC 通信協議的支持,不僅僅是原生的 Dubbo 協議,它還圍繞著
rmi、hessian、http、webservice、thrift、rest
有了多協議的支持,使得其他 rpc 框架的應用程式可以快速的切入到 dubbo生態中, 同時,對于多協議的支持,使得不同應用場景的服務,可以選擇合適的協議來發布服務,并不一定要使用 dubbo 提供的長連接方式,
1.Dubbo 監控平臺安裝
Dubbo-Admin
- 修 改
dubbo-admin-server/src/main/resources/application.properties中的配置資訊 mvn clean package進行構建mvn clean package進行構建- 訪問
localhost:8080
2.Dubbo 的終端操作
Dubbo 里面提供了一種基于終端操作的方法來實作服務治理,
使用 telnet localhost 20880 連接到服務對應的埠,
1)常見命令
ls
ls: 顯示服務串列
ls -l: 顯示服務詳細資訊串列
ls XxxService: 顯示服務的方法串列
ls -l XxxService: 顯示服務的方法詳細資訊串列
ps
ps: 顯示服務埠串列
ps -l: 顯示服務地址串列
ps 20880: 顯示埠上的連接資訊
ps -l 20880: 顯示埠上的連接詳細資訊
cd
cd XxxService: 改變預設服務,當設定了預設服務,凡是需要輸入服務名作
為引數的命令,都可以省略服務引數
cd /: 取消預設服務
pwd
pwd: 顯示當前預設服務
count
count XxxService: 統計 1 次服務任意方法的呼叫情況
count XxxService 10: 統計 10 次服務任意方法的呼叫情況
count XxxService xxxMethod: 統計 1 次服務方法的呼叫情況
count XxxService xxxMethod 10: 統計 10 次服務方法的呼叫情況
六,負載均衡
1.負載均衡的背景
當服務端存在多個節點的集群時,zookeeper 上會維護不同集群節點,對于客戶端而言,他需要一種負載均衡機制來實作目標服務的請求負載,通過負載均衡,可以讓每個服務器節點獲得適合自己處理能力的負載,
Dubbo 里面默認就集成了負載均衡的演算法和實作,默認提供了 4 種負載均衡實作,
2.Dubbo 中負載均衡的應用
1)啟動兩臺一樣的服務
修改組態檔
dubbo.protocol.port=20881
2)代碼
@DubboService(loadbalance = "random")
3.Dubbo負載均衡演算法
1)RandomLoadBalance
權重隨機演算法,根據權重值進行隨機負載,
它的演算法思想很簡單,假設我們有一組服務器 servers = [A, B, C],他們對應的權重為weights = [5, 3, 2],權重總和為 10,現在把這些權重值平鋪在一維坐標值上,[0, 5) 區間屬于服務器 A,[5, 8) 區間屬于服務器 B,[8, 10) 區間屬于服務器 C,接下來通過亂數生成器生成一個范圍在 [0, 10) 之間的亂數,然后計算這個亂數會落到哪個區間上,比如數字 3 會落到服務器 A 對應的區間上,此時回傳服務器 A 即可,權重越大的機器,在坐標軸上對應的區間范圍就越大,因此亂數生成器生成的數字就會有更大的概率落到此區間內,只要亂數生成器產生的亂數分布性很好,在經過多次選擇后,每個服務器被選中的次數比例接近其權重比例,
2)LeastActiveLoadBalance
最少活躍呼叫數演算法,活躍呼叫數越小,表明該服務提供者效率越高,單位時間內可處理更多的請求這個是比較科學的負載均衡演算法,
每個服務提供者對應一個活躍數 active,初始情況下,所有服務提供者活躍數均為 0,每收到一個請求,活躍數加 1,完成請求后則將活躍數減 1,在服務運行一段時間后,性能好的服務提供者處理請求的速度更快,因此活躍數下降的也越快,此時這樣的服務提供者能夠優先獲取到新的服務請求,
3)ConsistentHashLoadBalance
hash 一致性演算法,相同引數的請求總是發到同一提供者,
當某一臺提供者掛時,原本發往該提供者的請求,基于虛擬節點,平攤到其它提供者,不會引起劇烈變動,
4)RoundRobinLoadBalance
加權輪詢演算法
所謂輪詢是指將請求輪流分配給每臺服務器,舉個例子,我們有三臺服務器 A、B、C,我們將第一個請求分配給服務器 A,第二個請求分配給服務器 B,第三個請求分配給服務器 C,第四個請求再次分配給服務器 A,這個程序就叫做輪詢,輪詢是一種無狀態負載均衡演算法,實作簡單,適用于每臺服務器性能相近的場景下,但現實情況下,我們并不能保證每臺服務器性能均相近,如果我們將等量的請求分配給性能較差的服務器,這顯然是不合理的,因此,這個時候我們需要對輪詢程序進行加權,以調控每臺服務器的負載,經過加權后,每臺服務器能夠得到的請求數比例,接近或等于他們的權重比,比如服務器 A、B、C 權重比為 5:2:1,那么在 8 次請求中,服務器 A 將收到其中的 5 次請求,服務器 B 會收到其中的 2 次請求,服務器 C 則收到其中的 1次請求,
5)一致性 hash 演算法原理
七,集群容錯
網路通信會有很多不確定因素,比如網路延遲、網路中斷、服務例外等,會造成當前這次請求出現失敗, 當服務通信出現這個問題時,需要采取一定的措施應對,而 dubbo 中提供了容錯機制來優雅處理這種錯誤,
在集群呼叫失敗時,Dubbo 提供了多種容錯方案,預設為 failover 重試,
@DubboService(loadbalance = "random",cluster = "failsafe")
1.Failover Cluster
失敗自動切換,當出現失敗,重試其它服務器,(預設),
通常用于讀操作,但重試會帶來更長延遲,
可通過 retries=“2” 來設定重試次數(不含第一次),
2.Failfast Cluster
快速失敗,只發起一次呼叫,失敗立即報錯,
通常用于非冪等性的寫操作,比如新增記錄,
3.Failsafe Cluster
失敗安全,出現例外時,直接忽略,
通常用于寫入審計日志等操作,
4.Failback Cluster
失敗自動恢復,后臺記錄失敗請求,定時重發,
通常用于訊息通知操作,
5.Forking Cluster
并行呼叫多個服務器,只要一個成功即回傳,
通常用于實時性要求較高的讀操作,但需要浪費更多服務資源,
可通過 forks=“2” 來設定最大并行數,
6.Broadcast Cluster
廣播呼叫所有提供者,逐個呼叫,任意一臺報錯則報錯,(2.1.0 開始支持)
通常用于通知所有提供者更新快取或日志等本地資源資訊,
在實際應用中 查詢陳述句容錯策略建議使用默認 Failover Cluster ,而增刪改 建議使用Failfast Cluster 或者 使用 Failover Cluster(retries=”0”) 策略 防止出現資料 重復添加等等其它問題!建議在設計介面時候把查詢介面方法單獨做一個介面提供查詢,
八,服務降級
1.降級的概念
當某個非關鍵服務出現錯誤時,可以通過降級功能來臨時屏蔽這個服務,降級可以有幾個層面的分類: 自動降級和人工降級; 按照功能可以分為:讀服務降級和寫服務降級;
- 對一些非核心服務進行人工降級,在大促之前通過降級開關關閉哪些推薦內容、評價等對主流程沒有影響的功能,
- 故障降級,比如呼叫的遠程服務掛了,網路故障、或者 RPC 服務回傳例外, 那么可以直接降級,降級的方案比如設定默認值、采用兜底資料(系統推薦的行為廣告掛了,可以提前準備靜態頁面做回傳)等等,
- 限流降級,在秒殺這種流量比較集中并且流量特別大的情況下,因為突發訪問量特別大可能會導致系統支撐不了,這個時候可以采用限流來限制訪問量,當達到閥值時,后續的請求被降級,比如進入排隊頁面,比如跳轉到錯誤頁(活動太火爆,稍后重試等),
那么,Dubbo 中如何實作服務降級呢?Dubbo 中提供了一個 mock 的配置,可以通過mock 來實作當服務提供方出現網路例外或者掛掉以后,客戶端不拋出例外,而是通過Mock 資料回傳自定義的資料,
2.Dubbo 實作服務降級
在 dubbo-client 端創建一個 mock 類,當出現服務降級時,會被呼叫
/**
* @author yhd
* @email yinhuidong1@xiaomi.com
* @description 服務降級兜底類
* @since 2021/4/2 15:28
*/
public class MockSayHelloService implements LoginService {
@Override
public String login(String username, String password) {
return "Sorry, 服務端發生例外,被降級啦!";
}
}
在消費方的主街上配置:
@DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
private LoginService loginService;
3.啟動時檢查
Dubbo 預設會在啟動時檢查依賴的服務是否可用,不可用時會拋出例外,阻止 Spring初始化完成,以便上線時,能及早發現問題,默認 check=“true”,
可以通過 check=“false” 關閉檢查,比如,測驗時,有些服務不關心,或者出現了回圈依賴,必須有一方先啟動,
registry、reference、consumer 都可以配置 check 這個屬性.
@DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
private LoginService loginService;
4.多版本支持
當一個介面實作,出現不兼容升級時,可以用版本號過渡,版本號不同的服務相互間不參考,
可以按照以下的步驟進行版本遷移:
- 在低壓力時間段,先升級一半提供者為新版本
- 再將所有消費者升級為新版本
- 然后將剩下的一半提供者升級為新版本
5.主機系結
1)默認的主機系結方式
- 通過
LocalHost.getLocalHost()獲取本機地址, - 如果是 127.*等
loopback(環路地址)地址,則掃描各網卡,獲取網卡 IP,- 如果是
springboot,修改配置:dubbo.protocol.host=””, - 如果注冊地址獲取不正確,可以通過在
dubbo.xml中加入主機地址的配置,
- 如果是
<dubbo:protocol host="205.182.23.201">
2)預設主機埠
dubbo: 20880
rmi: 1099
http: 80
hessian: 80
webservice: 80
memcached: 11211
redis: 6379
九,Dubbo 新的功能
1.動態配置規則
動態配置是 Dubbo2.7 版本引入的一個新的功能,簡單來說,就是把 dubbo.properties中的屬性進行集中式存盤,存盤在其他的服務器上,
那么如果需要用到集中式存盤,那么還需要一些配置中心的組件來支撐,目前 Dubbo 能支持的配置中心有:apollo、nacos、zookeeper,
從另外一個角度來看,我們之前用 zookeeper 實作服務注冊和發現,本質上就是使用 zookeeper 實作了配置中心,這個配置中心只是維護了服務注冊和服務感知的功能,在 2.7 版本中,dubbo 對配置中心做了延展,除了服務注冊之外,還可以把其他的資料存盤在 zookeeper 上,從而更好的進行維護,
1)在 dubboadmin 添加配置
應用名稱可以是 global,或者對應當前服務的應用名,如果是 global 表示全域配置,針對所有應用可見,
配置的內容,實際就是 dubbo.properties 中配置的基本資訊,只是同意存盤在了zookeeper 上,
2)本地的組態檔添加配置中心
在 application.properties 中添加配置中心的配置項,app-name對應的是上一步創建的配置項中的應用名.
dubbo.config-center.address= zookeeper://192.168.13.106 6 :2181
dubbo.config-center.app- - name= spring-boot-provider
#需要注意的是,存在于配置中心上的配置項,本地仍然需要配置一份,所以下面這些配置一定要加上,否則啟動不了,這樣做的目的是保證可靠性
dubbo.application.name= spring-boot-provider
dubbo.protocol.port= 20880
dubbo.protocol.name= dubbo
dubbo.registry.address= zookeeper://192.168.13.102:2181?backup=192.168.13.103:2181,192.168.13.104:2181
3)配置的優先級
引入配置中心后,配置的優先級就需要關注了,默認情況下,外部配置的優先級最高,也就是意味著配置中心上的配置會覆寫本地的配置,當然我們也可以調整優先級,
dubbo.config-center.highest-priority=false
4)配置中心的原理
默認所有的配置都存盤在/dubbo/config 節點,
namespace,用于不同配置的環境隔離,
config,Dubbo 約定的固定節點,不可更改,所有配置和服務治理規則都存盤在此節點下,
dubbo/application,分別用來隔離全域配置、應用級別配置:dubbo 是默認 group 值,
application 對應應用名,
dubbo.properties,此節點的 node value 存盤具體配置內容,

2.元資料中心
Dubbo2.7 的另外一個新的功能,就是增加了元資料的配置,
在 Dubbo2.7 之前,所有的配置資訊,比如服務介面名稱、重試次數、版本號、負載策略、容錯策略等等,所有引數都是基于 url 形式配置在 zookeeper 上的,這種方式會造成一些問題:
- url 內容過多,導致資料存盤空間增大
- url 需要涉及到網路傳輸,資料量過大會造成網路傳輸過慢
- 網路傳輸慢,會造成服務地址感知的延遲變大,影響服務的正常回應
服務提供者這邊的配置引數有 30 多個,有一半是不需要作為注冊中心進行存盤和傳輸地的,而消費者這邊可配置的引數有 25 個以上,只有個別是需要傳遞到注冊中心的,所以,在 Dubbo2.7 中對元資料進行了改造,簡單來說,就是把屬于服務治理的資料發布到注冊中心,其他的配置資料統一發布到元資料中心,這樣一來大大降低了注冊中心的負載,
1)元資料中心配置
元資料中心目前支持 redis 和 zookeeper,官方推薦是采用 redis,畢竟 redis 本身對于非結構化存盤的資料讀寫性能比較高,當然,也可以使用 zookeeper 來實作,將注冊中心地址、元資料中心地址等配置集中管理,可以做到統一環境、減少開發側感知,官網可查詢外部化配置,不過描述過于簡略,
dubbo.metadata-report.address= zookeeper://192.168.13.106:2181
dubbo.registry.simplified= true
#注冊到注冊中心的 URL 是否采用精簡模式的(與低版本兼容)
十,Dubbo 中的 SPI 機制
dubbo版本2.7.2,
1.Java SPI
SPI 是 JDK 內置的一種服務提供發現機制,目前市面上有很多框架都是用它來做服務的擴展發現,簡單來說,它是一種動態替換發現的機制,舉個簡單的例子,我們想在運行時動態給它添加實作,你只需要添加一個實作,然后把新的實作描述給 JDK 知道就行了,如 JDBC、日志框架都有用到,

1)實作SPI需要遵循的標準
- 需要在 classpath 下創建一個目錄,該目錄命名必須是:META-INF/service
- 在該目錄下創建一個 properties 檔案,該檔案需要滿足以下幾個條件
- 檔案名必須是擴展的介面的全路徑名稱
- 檔案內部描述的是該擴展介面的所有實作類
- 檔案的編碼格式是 UTF-8
- 通過 java.util.ServiceLoader 的加載機制來發現
2)SPI的實際應用
JDK 本身提供了資料訪問的 api,在 java.sql 這個包里面,
java.sql.Driver 的原始碼,Driver 并沒有實作,而是提供了一套標準的 api 介面,

通過 SPI 機制把 java.sql.Driver 和 mysql 的驅動做了集成,達到了各個資料庫廠商自己去實作資料庫連接,jdk 本身不關心你怎么實作,
門面模式?配接器模式?
3)SPI的缺點
- JDK 標準的 SPI 會一次性加載實體化擴展點的所有實作,什么意思呢?就是如果你在 META-INF/service 下的檔案里面加了 N個實作類,那么 JDK 啟動的時候都會一次性全部加載,那么如果有的擴展點實作初始化很耗時或者如果有些實作類并沒有用到,那么會很浪費資源
- 如果擴展點加載失敗,會導致呼叫方報錯,而且這個錯誤很難定位到是這個原因,
2.Dubbo 優化后的 SPI
1)基于 Dubbo SPI 的實作自己的擴展
Dubbo 的 SPI 擴展機制,有兩個規則
- 需要在 resource 目錄下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,并基于 SPI 介面去創建一個檔案,
- 檔案名稱和介面名稱保持一致,檔案內容和 SPI 有差異,內容是 KEY 對應 Value
Dubbo 針對的擴展點非常多,可以針對協議、攔截、集群、路由、負載均衡、序列化、容器… 幾乎里面用到的所有功能,都可以實作自己的擴展,這個是 dubbo 比較強大的一點,

2)擴展協議擴展點
- 創建如下結構,添加 META-INF.dubbo 檔案,類名和 Dubbo 提供的協議擴展點介面保持一致,

myProtocol=com.yhd.dubboprovider.diy.MyProtocol
- 創建 MyProtocol 協議類
- 可以實作自己的協議,我們為了模擬協議產生了作用,修改一個埠
public class MyProtocol implements Protocol {
@Override
public int getDefaultPort() {
return 8888;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return null;
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return null;
}
@Override
public void destroy() {
}
}
- 在呼叫處執行如下代碼
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
System.out.print(protocol.getDefaultPort)
- 輸出結果,可以看到運行結果,是執行的自定義的協議擴展點,
- 總結:總的來說,思路和 SPI 是差不多,都是基于約定的路徑下制定組態檔,目的,通過配置的方式輕松實作功能的擴展,
一定有一個地方通過讀取指定路徑下的所有檔案進行 load,然后講對應的結果保存到一個 map 中,key 對應為名稱,value 對應為實作類,那么這個實作,一定就在 ExtensionLoader 中了,
3.Dubbo 的擴展點原理實作
Dubbo SPI和JDK SPI配置的不同,在Dubbo SPI中可以通過鍵值對的方式進行配置,這樣就可以按需加載指定的實作類,
Dubbo SPI的相關邏輯都被封裝到ExtensionLoader類中,通過ExtensionLoader我們可以加載指定的實作類,一個擴展介面就對應一個ExtensionLoader物件,在這里我們把它稱為:擴展點加載器,
1)屬性
public class ExtensionLoader<T> {
//擴展點組態檔的路徑,可以從3個地方加載到擴展點組態檔
private static final String SERVICES_DIRECTORY = "META-INF/services/";
private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";
//擴展點加載器的集合
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
//擴展點實作的集合
private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
//擴展點名稱和實作的映射快取
private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
//拓展點實作類集合快取
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
//擴展點名稱和@Activate的映射快取
private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
//擴展點實作的快取
private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
}
ExtensionLoader會把不同的擴展點配置和實作都快取起來,同時,Dubbo在官網上也給了我們提醒:擴展點使用單一實體加載(請確保擴展實作的執行緒安全性),快取在 ExtensionLoader中,下面我們看幾個重點方法,
2)獲取擴展點加載器
我們首先通過ExtensionLoader.getExtensionLoader() 方法獲取一個 ExtensionLoader 實體,它就是擴展點加載器,然后再通過 ExtensionLoader 的 getExtension 方法獲取拓展類物件,這其中,getExtensionLoader 方法用于從快取中獲取與拓展類對應的 ExtensionLoader,若快取未命中,則創建一個新的實體,
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null)
throw new IllegalArgumentException("Extension type == null");
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type(" + type +
") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
比如你可以通過下面這樣,來獲取Protocol介面的ExtensionLoader實體:
ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
就可以拿到擴展點加載器的物件實體:
com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]
3)獲取擴展類物件
上一步我們已經拿到加載器,然后可以根據加載器實體,通過擴展點的名稱獲取擴展類物件,
public T getExtension(String name) {
//校驗擴展點名稱的合法性
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
// 獲取默認的拓展實作類
if ("true".equals(name)) {
return getDefaultExtension();
}
//用于持有目標物件
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
它先嘗試從快取中獲取,未命中則創建擴展物件,那么它的創建程序是怎樣的呢?
private T createExtension(String name) {
//從組態檔中獲取所有的擴展類,Map資料結構
//然后根據名稱獲取對應的擴展類
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
//通過反射創建實體,然后放入快取
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
//注入依賴
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
// 包裝為Wrapper實體
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
這里的重點有兩個,依賴注入和Wrapper包裝類,它們是Dubbo中IOC 與 AOP 的具體實作,
①依賴注入
向拓展物件中注入依賴,它會獲取類的所有方法,判斷方法是否以 set 開頭,且方法僅有一個引數,且方法訪問級別為 public,就通過反射設定屬性值,所以說,Dubbo中的IOC僅支持以setter方式注入,
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
②Wrapper
它會將當前 instance 作為引數傳給 Wrapper 的構造方法,并通過反射創建 Wrapper 實體, 然后向 Wrapper 實體中注入依賴,最后將 Wrapper 實體再次賦值給 instance 變數,說起來可能比較繞,我們直接看下它最后生成的物件就明白了,
我們以DubboProtocol為例,它包裝后的物件為:

綜上所述,如果我們獲取一個擴展類物件,最后拿到的就是這個Wrapper類的實體,
就像這樣:
ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol extension = extensionLoader.getExtension("dubbo");
System.out.println(extension);
輸出為:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper@4cdf35a9,
4)獲取所有的擴展類
在我們通過名稱獲取擴展類物件之前,首先需要根據組態檔決議出所有的擴展類,
它是一個擴展點名稱和擴展類的映射表Map<String, Class<?>>
首先,還是從快取中cachedClasses獲取,如果沒有就呼叫loadExtensionClasses從組態檔中加載,組態檔有三個路徑:
- META-INF/services/
- META-INF/dubbo/
- META-INF/dubbo/internal/
先嘗試從快取中獲取,
private Map<String, Class<?>> getExtensionClasses() {
//從快取中獲取
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
//加載擴展類
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}
如果沒有,就呼叫loadExtensionClasses從組態檔中讀取,
private Map<String, Class<?>> loadExtensionClasses() {
//獲取 SPI 注解,這里的 type 變數是在呼叫 getExtensionLoader 方法時傳入的
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation != null) {
String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("more than 1 default extension
name on extension " + type.getName()+ ": " + Arrays.toString(names));
}
//設定默認的擴展名稱,參考getDefaultExtension 方法
//如果名稱為true,就是呼叫默認擴贊類
if (names.length == 1) cachedDefaultName = names[0];
}
}
//加載指定路徑的組態檔
Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadDirectory(extensionClasses, DUBBO_DIRECTORY);
loadDirectory(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}
以Protocol介面為例,獲取到的實作類集合如下,我們就可以根據名稱加載具體的擴展類物件,
{
registry=class com.alibaba.dubbo.registry.integration.RegistryProtocol
injvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
thrift=class com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
mock=class com.alibaba.dubbo.rpc.support.MockProtocol
dubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
http=class com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
redis=class com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
rmi=class com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
}

4.自適應擴展機制
在Dubbo中,很多拓展都是通過 SPI 機制進行加載的,比如 Protocol、Cluster、LoadBalance 等,這些擴展并非在框架啟動階段就被加載,而是在擴展方法被呼叫的時候,根據URL物件引數進行加載,
那么,Dubbo就是通過自適應擴展機制來解決這個問題,
自適應拓展機制的實作邏輯是這樣的:
首先 Dubbo 會為拓展介面生成具有代理功能的代碼,然后通過 javassist 或 jdk 編譯這段代碼,得到 Class 類,最后再通過反射創建代理類,在代理類中,就可以通過URL物件的引數來確定到底呼叫哪個實作類,
1)Adaptive注解
在開始之前,我們有必要先看一下與自適應拓展息息相關的一個注解,即 Adaptive 注解,
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
從上面的代碼中可知,Adaptive 可注解在類或方法上,
- 標注在類上
Dubbo 不會為該類生成代理類, - 標注在方法上
Dubbo 則會為該方法生成代理邏輯,表示當前方法需要根據 引數URL 呼叫對應的擴展點實作,
2)獲取自適應拓展類
getAdaptiveExtension 方法是獲取自適應拓展的入口方法,
public T getAdaptiveExtension() {
// 從快取中獲取自適應拓展
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
//未命中快取,則創建自適應拓展,然后放入快取
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create
adaptive instance: " + t.toString(), t);
}
}
}
}
}
return (T) instance;
}
getAdaptiveExtension方法首先會檢查快取,快取未命中,則呼叫 createAdaptiveExtension方法創建自適應拓展,
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("
Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
這里的代碼較少,呼叫 getAdaptiveExtensionClass方法獲取自適應拓展 Class 物件,然后通過反射實體化,最后呼叫injectExtension方法向拓展實體中注入依賴,
獲取自適應擴展類程序如下:
private Class<?> getAdaptiveExtensionClass() {
//獲取當前介面的所有實作類
//如果某個實作類標注了@Adaptive,此時cachedAdaptiveClass不為空
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
//以上條件不成立,就創建自適應拓展類
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
在上面方法中,它會先獲取當前介面的所有實作類,如果某個實作類標注了@Adaptive,那么該類就被賦值給cachedAdaptiveClass變數并回傳,如果沒有,就呼叫createAdaptiveExtensionClass創建自適應拓展類,
private Class<?> createAdaptiveExtensionClass() {
//構建自適應拓展代碼
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
// 獲取編譯器實作類 這個Dubbo默認是采用javassist
Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
//編譯代碼,回傳類實體的物件
return compiler.compile(code, classLoader);
}
在生成自適應擴展類之前,Dubbo會檢查介面方法是否包含@Adaptive,如果方法上都沒有此注解,就要拋出例外,
if (!hasAdaptiveAnnotation){
throw new IllegalStateException(
"No adaptive method on extension " + type.getName() + ",
refuse to create the adaptive class!");
}
我們還是以Protocol介面為例,它的export()和refer()方法,都標注為@Adaptive,destroy和 getDefaultPort未標注 @Adaptive注解,Dubbo 不會為沒有標注 Adaptive 注解的方法生成代理邏輯,對于該種型別的方法,僅會生成一句拋出例外的代碼,
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
}
所以說當我們呼叫這兩個方法的時候,會先拿到URL物件中的協議名稱,再根據名稱找到具體的擴展點實作類,然后去呼叫,下面是生成自適應擴展類實體的源代碼:
package com.viewscenes.netsupervisor.adaptive;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
public class Protocol$Adaptive implements Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
}
public Exporter export(Invoker invoker)throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("Invoker argument getUrl() == null");
}
URL url = invoker.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
}
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.export(invoker);
}
public Invoker refer(Class clazz,URL ur)throws RpcException {
if (ur == null) {
throw new IllegalArgumentException("url == null");
}
URL url = ur;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");
}
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
return extension.refer(clazz, url);
}
}
綜上所述,當我們獲取某個介面的自適應擴展類,實際就是一個Adaptive類實體,
ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol adaptiveExtension = extensionLoader.getAdaptiveExtension();
System.out.println(adaptiveExtension);
輸出為:
com.alibaba.dubbo.rpc.Protocol$Adaptive@47f6473

5.自動激活擴展點機制
自動激活擴展點,有點類似springboot 用到的 conditional,根據條件進行自動激活,但是這里設計的初衷是,對于一個類會加載多個擴展點的實作,這個時候可以通過自動激活擴展點進行動態加載, 從而簡化配置我們的配置,
@Activate 提供了一些配置來允許我們配置加載條件,比如 group 過濾,比如 key 過濾,
我們可以看看 org.apache.dubbo.Filter 這個類,它有非常多的實作,比如說 CacheFilter,這個快取過濾器,配置資訊如下:
group 表示客戶端和和服務端都會加載,value 表示 url 中有 cache_key 的時候
@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {
通過下面這段代碼,演示關于 Filter 的自動激活擴展點的效果,沒有添加“注釋代碼”時,list 的結果是 10,添加之后 list激活擴展點的效果,沒有添加“注釋代碼”時,list 的結果是 10,添加之后 list,會自動把 cacheFilter 加載進來,
public static void main(String[] args) {
SpringApplication.run(DubboProviderApplication.class, args);
/*Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
System.out.println(protocol.getDefaultPort());*/
/*Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
//org.apache.dubbo.common.compiler.support.AdaptiveCompiler
System.out.println("compiler.getClass() = " + compiler.getClass());*/
ExtensionLoader<Filter> extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
URL url = new URL("", "", 0);
List<Filter> filters = extensionLoader.getActivateExtension(//url.addParameter("cache","cache"), "cache");
System.out.println("filters.size() = " + filters.size());
}
這個方法的底層邏輯其實就是先獲取到所有對應的激活擴展類,在拿到URL,根據 @Activate 獲取到對應的擴展類組合在一起回傳,
十一,Dubbo原理-框架設計

config 配置層:對外配置介面,以 ServiceConfig, ReferenceConfig 為中心,可以直接初始化配置類,也可以通過 spring 決議配置生成配置類
proxy 服務代理層:服務介面透明代理,生成服務的客戶端 Stub 和服務器端 Skeleton, 以 ServiceProxy 為中心,擴展介面為 ProxyFactory
registry 注冊中心層:封裝服務地址的注冊與發現,以服務 URL 為中心,擴展介面為 RegistryFactory, Registry, RegistryService
cluster 路由層:封裝多個提供者的路由及負載均衡,并橋接注冊中心,以 Invoker 為中心,擴展介面為 Cluster, Directory, Router, LoadBalance
monitor 監控層:RPC 呼叫次數和呼叫時間監控,以 Statistics 為中心,擴展介面為 MonitorFactory, Monitor, MonitorService
protocol 遠程呼叫層:封裝 RPC 呼叫,以 Invocation, Result 為中心,擴展介面為 Protocol, Invoker, Exporter
exchange 資訊交換層:封裝請求回應模式,同步轉異步,以 Request, Response 為中心,擴展介面為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 網路傳輸層:抽象 mina 和 netty 為統一介面,以 Message 為中心,擴展介面為 Channel, Transporter, Client, Server, Codec
serialize 資料序列化層:可復用的一些工具,擴展介面為 Serialization, ObjectInput, ObjectOutput, ThreadPool

十二,服務暴露
分析:如果需要完成服務發布預注冊,需要實作哪些事情?
- 決議組態檔或注解
- 服務注冊
- 啟動netty服務實作遠程監聽
1.dubbo對于spring的擴展
1)spring的標簽擴展
在 spring 中定義了兩個介面
- NamespaceHandler: 注冊一堆 BeanDefinitionParser,利用他們來進行決議
- BeanDefinitionParser:用于決議每個 element 的內容
Spring 默認會加載 jar 包下的 META-INF/spring.handlers 檔案尋找對應的 NamespaceHandler, Dubbo-config 模塊下的 dubbo-config-spring就含有這個檔案,
2)dubbo的接入實作
Dubbo 中 spring 擴展就是使用 spring 的自定義型別,所以同樣也有 NamespaceHandler、BeanDefinitionParser,而NamespaceHandler 是 DubboNamespaceHandler,
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我們想看 dubbo:service 的配置,就直接看 DubboBeanDefinitionParser(ServiceBean.class,true),
這個里面主要做了一件事,把不同的配置分別轉化成 spring 容器中的 bean 物件
| application | ApplicationConfig |
|---|---|
| registry | RegistryConfig |
| monitor | MonitorConfig |
| provider | ProviderConfig |
| consumer | ConsumerConfig |
涉及到服務發布和服務呼叫的兩個配置的決議,用的是 ServiceBean 和 referenceBean,并不是 config 結尾的,這兩個類稍微特殊些,當然他同時也繼承了 ServiceConfig 和 ReferenceConfig,
3)DubboBeanDefinitionParser
這里面是實作具體組態檔決議的入口,它重寫了 parse 方法,對 spring 的配置進行決議,我們關注一下 ServiceBean 的決議.實際就是決議 dubbo:service 這個標簽中對應的屬性,
else if (ServiceBean.class.equals(beanClass)) {
String className = resolveAttribute(element, "class", parserContext);
if (StringUtils.isNotEmpty(className)) {
RootBeanDefinition classDefinition = new RootBeanDefinition();
classDefinition.setBeanClass(ReflectUtils.forName(className));
classDefinition.setLazyInit(false);
parseProperties(element.getChildNodes(), classDefinition, parserContext);
beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
}
}
4)ServiceBean的實作
ServiceBean 這個類,分別實作了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,BeanNameAware, ApplicationEventPublisherAware,
①InitializingBean
介面為 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是繼承該介面的類,在初始化 bean 的時候會執行該方法,被重寫的方法為 afterPropertiesSet,
②DisposableBean
被重寫的方法為 destroy ,bean 被銷毀的時候,spring 容器會自動執行 destory 方法,比如釋放資源,
③ApplicationContextAware
實作了這個介面的 bean,當 spring 容器初始化的時候,會自動的將 ApplicationContext 注入進來,
④ApplicationListener
ApplicationEvent 事件監聽,spring 容器啟動后會發一個事件通知,被重寫的方法為: onApplicationEvent ,onApplicationEvent方法傳入的物件是 ContextRefreshedEvent,這個物件是當 Spring 的背景關系被重繪或者加載完畢的時候觸發的,因此服務就是在Spring 的背景關系重繪后進行匯出操作的,
⑤BeanNameAware
獲得自身初始化時,本身的 bean 的 id 屬性,被重寫的方法為 setBeanName,
⑥ApplicationEventPublisherAware
這個是一個異步事件發送器,被重寫的方法為 setApplicationEventPublisher ,簡單來說,在 spring 里面提供了類似于訊息佇列的異步事件解耦功能,(典型的觀察者模式的應用),
⑦spring 事件發送監聽由 3 個部分組成
- ApplicationEvent:表示事件本身,自定義事件需要繼承該類
- ApplicationEventPublisherAware:事件發送器,需要實作該介面
- ApplicationListener:事件監聽器介面
5)總結-dubbo啟動決議,加載組態檔

在dubbo的META-INF目錄下有一個 spring.handlers 組態檔,容器啟動就會加載這個組態檔,
這個組態檔里面注冊了一個 Bean 叫做 DubboNamespaceHandler ,這個 bean 就是dubbo 組態檔處理器,
在這個 bean里面有一個init(),這個方法里面會加載很多的 DubboBeanDefinitionParser ,DubboBeanDefinitionParser就是組態檔決議器,(除了 service 和 reference 對應的類叫 ServiceBean 和 ReferenceBean ,其他對應標簽對應的類都是xxxConfig),
DubboBeanDefinitionParser 的parse() 就是對標簽的決議,他會判斷這個標簽所屬的型別,然后根據標簽的配置進行屬性填充,最終將所有的bean注冊到配置中心,
2.ServiceBean 中服務暴露程序
在 ServiceBean 中,我們暫且只需要關注兩個方法,分別是:
- 在初始化 bean 的時候會執行該方法 afterPropertiesSet
- spring 容器啟動后會發一個事件通知 onApplicationEvent
1)afterPropertiesSet
這個方法里面,就是把 dubbo 中配置的 application 、 registry 、 service 、 protocol 等資訊,加載到對應的 config物體中,便于后續的使用,
@Override
public void afterPropertiesSet() throws Exception {
if (StringUtils.isEmpty(getPath())) {
if (StringUtils.isNotEmpty(getInterface())) {
setPath(getInterface());
}
}
}
2)onApplicationEvent
spring 容器啟動之后,會收到一個這樣的事件通知,這里面做了兩個事情
- 判斷服務是否已經發布過
- 如果沒有發布,則呼叫呼叫 export 進行服務發布的流程 入口
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
3)export
serviceBean 中,重寫了 export 方法,實作了 一個事件的發布,并且呼叫了 super.export() ,也就是會呼叫父類的 export 方法,
@Override
public void exported() {
super.exported();
// Publish ServiceBeanExportedEvent
publishExportEvent();
}
3.ServiceConfig配置類
所有的配置它都放在了一個 AbstractServiceConfig 的抽象類,自己實作了很多對于服務發布之前要做的操作邏輯,
1)export
public synchronized void export() {
// 當前的服務是否需要發布 , 通過配置實作: @Service(export = false)
if (!shouldExport()) {
return;
}
//2.7.5版本新增,多了一個Dubbo的引導類
if (bootstrap == null) {
//獲取引導類實體
bootstrap = DubboBootstrap.getInstance();
//呼叫引導類的初始化方法
bootstrap.initialize();
}
//檢查并更新配置
checkAndUpdateSubConfigs();
//初始化元資料中心 , 2.75 版本新增 ,用來存盤配置資訊,減輕配置中心壓力
serviceMetadata.setVersion(getVersion());
serviceMetadata.setGroup(getGroup());
serviceMetadata.setDefaultGroup(getGroup());
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
// 檢查是否需要延時發布,通過配置 @Service(delay = 1000) 實作,單位毫秒
if (shouldDelay()) {
// 這里的延時是通過定時器來實作
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
// 如果沒有配置 delay ,則直接呼叫 export 進行發布
doExport();
}
//發布一個事件 TODO
exported();
}
2)doExport
這里仍然還是在實作發布前的各種判斷
protected synchronized void doExport() {
if (unexported) {
throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
}
// 服務是否已經發布過了
if (exported) {
return;
}
exported = true;// 設定發布狀態
if (StringUtils.isEmpty(path)) {
path = interfaceName;//path 表示服務路徑,默認使用 interfaceName
}
doExportUrls();
}
3)doExportUrls
- 加載所有配置的注冊中心地址
- 遍歷所有配置的協議,protocols
- 針對每種協議發布一個對應協議的服務
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
// 加載所有配置的注冊中心的地址,組裝成一個 URL集合
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
for (ProtocolConfig protocolConfig : protocols) {
//group 跟 version 組成一個 pathKey(serviceName)
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// 服務注冊 TODO TODO TODO 重要的事情標記三遍
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
//發布指定協議的服務
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
4) doExportUrlsFor1Protocol
發布指定協議的服務,以 Dubbo 服務為例,
// export service 發布服務
//獲取當前要發布服務的IP和埠
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
//組裝URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// You can customize Configurator to append extra parameters
//通過 ConfiguratorFactory 去實作動態改變配置的功能 ,動態配置 TODO
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
//如果 scope!="none"則發布服務,默認 scope 為 null,
//如果 scope 不為 none,判斷是否為 local 或 remote,
//從而發布 Local 服務或 Remote 服務,默認兩個都會發布
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
//發布本地服務 injvm
//服務只是 injvm 的服務,提供一種消費者和提供者都在一個 jvm 內的呼叫方式,使用了 Injvm 協議,是一個偽協議,它不開啟埠,不發起遠程呼叫,只在 JVM 內直接關聯,(通過集合的方式保存了發布的服務資訊),但執行 Dubbo 的 Filter 鏈,簡單來說,就是你本地的 dubbo 服務呼叫,都依托于 dubbo 的標準來進行,這樣可以享受到 dubbo 的一些配置服務,
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//Invoker 是一個代理類,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起invoke 呼叫,它有可能是一個本地的實作,也可能是一個遠程的實作,也可能一個集群實作,
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//因為 2.7 引入了元資料,所以這里對 invoker 做了委托,把 invoker 交給DelegateProviderMetaDataInvoker 來處理,
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//發布代理
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
//添加到 exporters 集合
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
5)PROTOCOL.export
protocol.export,這個 protocol 是什么呢?找到定義處發現它是一個自適應擴展點,打開 Protocol 這個擴展點,又可以看到它是一個在方法層面上的自適應擴展,意味著它實作了對于 export 這個方法的適配,也就意味著這個 Protocol 是一個動態代理類,Protocol$Adaptive,
這個動態代理類,會根據 url 中配置的 protocol name 來實作對應協議的適配,
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
6)Protocol$Adaptive
在當前的場景中,protocol 會是呼叫誰呢?目前發布的 invoker(URL),實際上是一個 registry://協議,所以Protocol$Adaptive,會通過 getExtension(extName)得到一個 RegistryProtocol,
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
//獲取擴展類
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
//擴展類執行服務暴露方法
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
4.RegistryProtocol.export
很明顯,這個 RegistryProtocol 是用來實作服務注冊的,這里面會有很多處理邏輯,
- 實作對應協議的服務發布
- 實作服務注冊
- 訂閱服務重寫
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 這里獲得的是 zookeeper 注冊中心的 url: zookeeper://ip:port
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
// 這里是獲得服務提供者的 url, dubbo://ip:port...
URL providerUrl = getProviderUrl(originInvoker);
// 訂閱 override 資料,在 admin 控制臺可以針對服務進行治理,比如修改權重,修改路由機制等,當注冊中心有此服務的覆寫配置注冊進來時,推送訊息給提供者,重新暴露服務
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//========================================================================//
// 這里就交給了具體的協議去暴露服務,本質上這里就是啟動一個netty
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根據 invoker 中的 url 獲取 Registry 實體 : zookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 獲取要注冊到注冊中心的 URL: dubbo://ip:port
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
// 是否配置了注冊中心,如果是, 則需要注冊
if (register) {
register(registryUrl, registeredProviderUrl);
}
// 設定注冊中心的訂閱
registerStatedUrl(registryUrl, registeredProviderUrl, register);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
notifyExport(exporter);
// 保證每次 export 都回傳一個新的 exporter 實體
return new DestroyableExporter<>(exporter);
}
1)doLocalExport
先通過 doLocalExport 來暴露一個服務,本質上應該是啟動一個通信服務,主要的步驟是將本地 ip 和 20880 埠打開,進行監聽originInvoker: 應該是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService
key: 從 originInvoker 中獲得發布協議的 url: dubbo://ip:port/...
bounds: 一個 prviderUrl 服務 export 之后,快取到 bounds 中,所以一個 providerUrl 只會對應一個 exporter
//computeIfAbsent 就相當于, java8 的語法
if(bounds.get(key)==null){
bounds.put(key,s->{})
}
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
// 對原有的 invoker, 委托給了 InvokerDelegate
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
// 將 invoker 轉換為 exporter 并啟動 netty 服務 ----》DubboProtocol
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
InvokerDelegete: 是 RegistryProtocol 的一個靜態內部類,該類是一個 originInvoker 的委托類,該類存盤了 originInvoker,其父類 InvokerWrapper 還會存盤 providerUrl,InvokerWrapper 會呼叫 originInvoker 的 invoke 方法,也會銷毀 invoker,可以管理 invoker 的生命周期,
5.DubboProtocol.export
基于動態代理的適配,很自然的就過渡到了 DubboProtocol 這個協議類中,但是實際上是 DubboProtocol 嗎?
這里并不是獲得一個單純的 DubboProtocol 擴展點,而是會通過 Wrapper 對 Protocol 進行裝飾,裝飾器分別為:
QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol,
為什么會這樣?回頭看SPI機制,
1)Wrapper包裝
在 ExtensionLoader.loadClass 這個方法中,有一段這樣的判斷,如果當前這個類是一個 wrapper 包裝類,也就是這個 wrapper中有構造方法,引數是當前被加載的擴展點的型別,則把這個 wrapper 類加入到 cacheWrapperClass 快取中,
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
} else if (isWrapperClass(clazz)) {
//如果當前這個類是一個 wrapper 包裝類,也就是這個 wrapper中有構造方法 引數是當前被加載的擴展點的型別
//則把這個 wrapper 類加入到 cacheWrapperClass 快取中,
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
cacheName(clazz, n);
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
我們可以在 dubbo 的組態檔中找到三個 Wrapper
QosprotocolWrapper:如果當前配置了注冊中心,則會啟動一個 Qos ,與在線運維相關,
ProtocolFilterWrapper:對 invoker 進行 filter 的包裝,實作請求的過濾,
ProtocolListenerWrapper:用于服務 export 時候插入監聽機制,暫未實作,
接著,在 getExtension->createExtension 方法中,會對 cacheWrapperClass 集合進行判斷,如果集合不為空,則進行包裝
if (wrap) {
List<Class<?>> wrapperClassesList = new ArrayList<>();
if (cachedWrapperClasses != null) {
wrapperClassesList.addAll(cachedWrapperClasses);
wrapperClassesList.sort(WrapperComparator.COMPARATOR);
Collections.reverse(wrapperClassesList);
}
if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
for (Class<?> wrapperClass : wrapperClassesList) {
Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
if (wrapper == null
|| (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
}
}
2)ProtocolFilterWrapper
這個是一個過濾器的包裝,使用責任鏈模式,對 invoker 進行了包裝,
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (UrlUtils.isRegistry(invoker.getUrl())) {
return protocol.export(invoker);
}
// 構建責任鏈,基于激活擴展點
return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
//下面代碼省略
}
我們看如下檔案:/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter
默認提供了非常多的過濾器, 然后基于條件激活擴展點,來對 invoker 進行包裝,從而在實作遠程呼叫的時候,會經過這些filter 進行過濾,
3)DubboProtocol.export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 獲取服務標識,理解成服務坐標也行,由服務組名,服務名,服務版本號以及埠組成,比如
//${group}/copm.yhd.practice.dubbo.ISayHelloService:${version}:20880
String key = serviceKey(url);
// 創建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 將 <key, exporter> 鍵值對放入快取中
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
//啟動服務
openServer(url);
//優化序列化
optimizeSerialization(url);
return exporter;
}
4)openServer
去開啟一個服務,并且放入到快取中->在同一臺機器上(單網卡),同一個埠上僅允許啟動一個服務器實體,
private void openServer(URL url) {
// find server.
// 獲取 host:port ,并將其作為服務器實體的 key ,用于標識當前的服務器實體
String key = url.getAddress();
//client can export a service which's only for server to invoke
//client 也可以暴露一個只有 server 可以呼叫的服務
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//是否在 serverMap 中快取了
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//創建服務器實體
serverMap.put(key, createServer(url));
}
}
} else {
//服務器已創建,則根據 url 中的配置重置服務器
// server supports reset, use together with override
server.reset(url);
}
}
}
5)createServer
創建服務,開啟心跳檢測,默認使用 netty,組裝 url,
private ProtocolServer createServer(URL url) {
// 組裝 url ,在 url 中添加心跳時間、編解碼引數
url = URLBuilder.from(url)
// 當服務關閉以后,發送一個只讀的事件,默認是開啟狀態
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
// 啟動心跳配置
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通過 SPI 檢測是否存在 server 引數所代表的 Transporter 拓展,不存在則拋出例外
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
// 創建 ExchangeServer
ExchangeServer server;
try {
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
6)Exchangers.bind
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
// 獲取 Exchanger ,默認為 HeaderExchanger
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 呼叫 HeaderExchanger 的 bind 方法創建 ExchangeServer 實體
return getExchanger(url).bind(url, handler);
}
7)headerExchanger.bind
這里面包含多個邏輯
- new DecodeHandler(new HeaderExchangeHandler(handler))
- Transporters.bind
- new HeaderExchangeServer
目前只需要關心 transporters.bind 方法即可
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
8)Transporters.bind
public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數量大于 1 ,則創建 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應 Transporter 實體,并呼叫實體方法
return getTransporter().bind(url, handler);
}
9)getTransporter
getTransporter 是一個自適應擴展點,它針對 bind 方法添加了自適應注解,意味著,bing 方法的具體實作,會基于Transporter$Adaptive 方法進行適配,那么在這里面默認的通信協議是 netty,所以它會采用 netty4 的實作,也就是org.apache.dubbo.remoting.transport.netty4.NettyTransporter,
10)NettyTransporter.bind
public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
return new NettyServer(url, handler);
}
6.NettyServer
初始化一個 nettyserver,并且從 url 中獲得相應的 ip/ port,然后呼叫 doOpen();
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
}
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
//獲取IP
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
//獲取埠
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
bindIp = ANYHOST_VALUE;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
try {
// 呼叫模板方法 doOpen 啟動服務器
doOpen();
if (logger.isInfoEnabled()) {
logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
}
} catch (Throwable t) {
throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
+ " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
}
executor = executorRepository.createExecutorIfAbsent(url);
}
1)doOpen
開啟 netty 服務
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
workerGroup = NettyEventLoopFactory.eventLoopGroup(
getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
"NettyServerWorker");
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
bootstrap.group(bossGroup, workerGroup)
.channel(NettyEventLoopFactory.serverSocketChannelClass())
.option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
ch.pipeline().addLast("negotiation",
SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
}
ch.pipeline()
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
// bind
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
這里用到了一個 handler 來處理客戶端傳遞過來的請求NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
這個 handler 是一個鏈路,后續接收到的請求,會一層一層的處理,
7.Invoker是什么
從前面的分析來看,服務的發布分三個階段
- 創造一個 invoker
- 把經歷過一系列處理的 invoker(各種包裝),在 DubboProtocol 中保存到 exporterMap 中
- 把 dubbo 協議的 url 地址注冊到注冊中心上
Invoker 是 Dubbo 領域模型中非常重要的一個概念, 和 ExtensionLoader 的重要性是一樣的,回到 ServiceConfig 中 export 的代碼,這段代碼是還沒有分析過的,以這個作為入口來分析export 出去的 invoker 到底是啥東西,
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
1)ProxyFacotory.getInvoker
這個是一個代理工程,用來生成 invoker,從它的定義來看,它是一個自適應擴展點,看到這樣的擴展點,我們幾乎可以不假思索的想到它會存在一個動態配接器類,
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
2)ProxyFactory
這個方法的簡單解讀為: 它是一個 spi 擴展點,并且默認的擴展實作是 javassit, 這個介面中有三個方法,并且都是加了@Adaptive 的自適應擴展點,所以如果呼叫 getInvoker 方法,應該會回傳一個 ProxyFactory$Adaptiv,
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({PROXY_KEY})
<T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;
@Adaptive({PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
3)ProxyFactory$Adaptive
這個自適應擴展點,做了兩件事情
- 通過
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)獲取了一個指定名稱的擴展點- 在
dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory中,定義了javassis=JavassisProxyFactory
- 在
- 呼叫
JavassisProxyFactory的getInvoker方法
javassist 是一個動態類別庫,用來實作動態代理的,構建好了代理類之后,回傳一個 AbstractproxyInvoker,并且它實作了 doInvoke 方法,這個地方似乎看到了 dubbo 消費者呼叫過來的時候觸發的影子,因為 wrapper.invokeMethod 本質上就是觸發上面動態代理類的方法 invokeMethod,
總結一下 Invoke 本質上應該是一個代理,經過層層包裝最終進行了發布,當消費者發起請求的時候,會獲得這個invoker 進行呼叫,最終發布出去的 invoker, 也不是單純的一個代理,也是經過多層包裝,

十三,服務注冊
1.服務注冊的核心邏輯
服務發布這一條線分析完成之后,再來了解一下服務注冊的程序,
在看服務發布的原始碼的時候,在RegistryProtocol這個類中,看到了服務發布的流程,
從export方法中抽離出來的部分代碼,就是服務注冊的流程,
// 根據 invoker 中的 url 獲取 Registry 實體 : zookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 獲取要注冊到注冊中心的 URL: dubbo://ip:port
final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
// 是否配置了注冊中心,如果是, 則需要注冊
if (register) {
register(registryUrl, registeredProviderUrl);
}
2.getRegistry
- 把url轉化為對應配置的注冊中心的具體協議
- 根據具體協議,從registryFactory中獲得指定的注冊中心實作
這個registryFactory具體是怎么賦值的呢?
protected Registry getRegistry(final Invoker<?> originInvoker) {
//把url轉化為配置的具體協議,比如zookeeper://ip:port. 這樣后續獲得的注冊中心就會是基于zk的實作
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
在RegistryProtocol中存在一段這樣的代碼,很明顯這是通過依賴注入來實作的擴展點,
private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
this.registryFactory = registryFactory;
}
按照擴展點的加載規則,我們可以先看看/META-INF/dubbo/internal路徑下找到RegistryFactory的組態檔.這個factory有多個擴展點的實作,
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
接著,找到RegistryFactory的實作, 發現它里面有一個自適應的方法,根據url中protocol傳入的值進行適配,
@SPI("dubbo")
public interface RegistryFactory {
}
3.RegistryFactory$Adaptive
由于在前面的代碼中,url中的protocol已經改成了zookeeper,那么這個時候根據zookeeper獲得的spi擴展點應該是ZookeeperRegistryFactory,
import org.apache.dubbo.common.extension.ExtensionLoader;
public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {
public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
if (arg0 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg0;
String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );
if(extName == null) throw new IllegalStateException("Failed to getextension (org.apache.dubbo.registry.RegistryFactory) name from url (" +url.toString() + ") use keys([protocol])");
org.apache.dubbo.registry.RegistryFactory extension =(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
return extension.getRegistry(arg0);
}
}
4.ZookeeperRegistryFactory
這個方法中并沒有getRegistry方法,而是在父類AbstractRegistryFactory
- 從快取REGISTRIES中,根據key獲得對應的Registry
- 如果不存在,則創建Registry
@Override
public Registry getRegistry(URL url) {
if (destroyed.get()) {
return DEFAULT_NOP_REGISTRY;
}
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = createRegistryCacheKey(url);
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
//創建注冊中心
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
5.createRegistry
創建一個zookeeperRegistry,把url和zookeepertransporter作為引數傳入,
zookeeperTransporter 這個屬性也是基于依賴注入來賦值的,具體的流程就不再分析了,這個的值應該是CuratorZookeeperTransporter 表示具體使用什么框架來和zk產生連接,
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
6.ZookeeperRegistry
這個方法中使用了CuratorZookeeperTransport來實作zk的連接,
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//獲得group名稱
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
//產生一個zookeeper連接
zkClient = zookeeperTransporter.connect(url);
//添加zookeeper狀態變化事件
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
try {
ZookeeperRegistry.this.recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
} else if (state == StateListener.SESSION_LOST) {
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
}
});
}
7.register
獲取到了注冊中心之后,回到服務注冊的核心邏輯,開始呼叫register方法,去將dubbo://的協議地址注冊到zookeeper上
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}
這個方法會呼叫FailbackRegistry類中的register. 為什么呢?因為ZookeeperRegistry這個類中并沒有register這個方法,但是他的父類FailbackRegistry中存在register方法,而這個類又重寫了AbstractRegistry類中的register方法,所以我們可以直接定位大FailbackRegistry這個類中的register方法中
8.FailbackRegistry.register
@Override
public void register(URL url) {
if (!acceptable(url)) {
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// 呼叫子類實作真正的服務注冊,把url注冊到zk上
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// If the startup detection is opened, the Exception is thrown directly.
// 如果開啟了啟動時檢測,則直接拋出例外
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
} else {
}
// Record a failed registration request to a failed list, retry regularly
// 將失敗的注冊請求記錄到失敗串列,定時重試
addFailedRegistered(url);
}
}
9.ZookeeperRegistry.doRegister
最終呼叫curator的客戶端把服務地址注冊到zk,
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}

十四,服務消費
1.服務消費應該具備的邏輯
- 生成一個代理物件–實作網路通信的細節
- 建立通信連接–netty
- 從zk獲取目標地址–訂閱節點變化
- 實作負載均衡
- 實作集群容錯
- mock
- 序列化
2.服務消費的入口
ReferenceAnnotationBeanPostProcessor->ReferenceBeanInvocationHandler.init->ReferenceConfig.get() //獲得一個遠程代理類
1)ReferenceConfig.get()
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {//如果當前介面的遠程代理參考為空,則進行初始化
init();
}
return ref;
}
2)init
初始化的程序,和服務發布的程序類似,會有特別多的判斷以及引數的組裝. 我們只需要關注createProxy,創建代理類的方法,
public synchronized void init() {
if (initialized) {
return;
}
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
checkAndUpdateSubConfigs();
checkStubAndLocal(interfaceClass);
ConfigValidationUtils.checkMock(interfaceClass, this);
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, CONSUMER_SIDE);
ReferenceConfigBase.appendRuntimeParameters(map);
if (!ProtocolUtils.isGeneric(generic)) {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
}
}
map.put(INTERFACE_KEY, interfaceName);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ConsumerConfig
// appendParameters(map, consumer, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, consumer);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
Map<String, AsyncMethodInfo> attributes = null;
if (CollectionUtils.isNotEmpty(getMethods())) {
attributes = new HashMap<>();
for (MethodConfig methodConfig : getMethods()) {
AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
String retryKey = methodConfig.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(methodConfig.getName() + ".retries", "0");
}
}
AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
if (asyncMethodInfo != null) {
// consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
attributes.put(methodConfig.getName(), asyncMethodInfo);
}
}
}
String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
if (StringUtils.isEmpty(hostToRegistry)) {
hostToRegistry = NetUtils.getLocalHost();
} else if (isInvalidLocalHost(hostToRegistry)) {
throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
}
map.put(REGISTER_IP_KEY, hostToRegistry);
serviceMetadata.getAttachments().putAll(map);
//創建代理
ref = createProxy(map);
serviceMetadata.setTarget(ref);
serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
consumerModel.setProxyObject(ref);
consumerModel.init(attributes);
initialized = true;
checkInvokerAvailable();
// dispatch a ReferenceConfigInitializedEvent since 2.7.4
dispatch(new ReferenceConfigInitializedEvent(this, invoker));
}
3)createProxy
- 判斷是否為本地呼叫,如果是則使用injvm協議進行呼叫
- 判斷是否為點對點呼叫,如果是則把url保存到urls集合中,如果url為1,進入步驟4,如果urls>1,則執行5
- 如果是配置了注冊中心,遍歷注冊中心,把url添加到urls集合,url為1,進入步驟4,如果urls>1,則執行5
- 直連構建invoker
- 構建invokers集合,通過cluster合并多個invoker
- 最后呼叫 ProxyFactory 生成代理類
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {//判斷是否是在同一個jvm行程中呼叫
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {
urls.clear();
//url 如果不為空,說明是點對點通信
if (url != null && url.length() > 0) {
String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (StringUtils.isEmpty(url.getPath())) {
url = url.setPath(interfaceName);
}
// 檢測 url 協議是否為 registry,若是,表明用戶想使用指定的注冊中心
if (UrlUtils.isRegistry(url)) {
// 將 map 轉換為查詢字串,并作為 refer 引數的值添加到url 中
urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服務提供者的一些配置(這些配置來源于用戶配置的 url 屬性),比如執行緒池相關配置,并保留服務提供者的部分配置,比如版本,group,時間戳等,最后將合并后的配置設定為 url 查詢字串中,
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
checkRegistry(); //校驗注冊中心的配置以及是否有必要從配置中心組裝url
//這里的代碼實作和服務端類似,也是根據注冊中心配置進行決議得到URL
//這里的URL肯定也是:registry://ip:port/org.apache.dubbo.service.RegsitryService
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {//如果沒有配置注冊中心,則報錯
throw new IllegalStateException();
}
}
}
//如果值配置了一個注冊中心或者一個服務提供者,直接使用refprotocol.refer
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {//遍歷urls生成多個invoker
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { //如果registryUrl不為空,構建靜態directory
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
// 通過Cluster將多個invoker合并RegistryAwareClusterInvoker(StaticDirectory) ->FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
/**
* @since 2.7.0
* ServiceData Store
*/
String metadata = map.get(METADATA_KEY);
WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
if (metadataService != null) {
URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
metadataService.publishServiceDefinition(consumerURL);
}
// create service proxy
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
}
4)protocol.refer
這里通過指定的協議來呼叫refer生成一個invoker物件,invoker前面看過,它是一個代理物件,那么在當前的消費端而言,invoker主要用于執行遠程呼叫,
這個protocol,又是一個自適應擴展點,它得到的是一個Protocol$Adaptive.
根據當前的協議url,得到一個指定的擴展點,傳遞進來的引數中,協議地址為registry://,所以,我們可以直接定位到RegistryProtocol.refer代碼
5)RegistryProtocol.refer
- 組裝注冊中心協議的url
- 判斷是否配置了group,如果有,則cluster=getMergeableCluster(),構建invoker
- doRefer構建invoker
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
//根據配置的協議,生成注冊中心的url: zookeeper://
url = getRegistryUrl(url);
//獲取注冊中心
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 決議group引數,根據group決定cluster的型別
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
return doRefer(cluster, registry, type, url);
}
6)doRefer
- 構建一個RegistryDirectory
- 構建一個consumer://協議的地址注冊到注冊中心
- 訂閱zookeeper中節點的變化
- 呼叫cluster.join方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//RegistryDirectory初始化
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
//注冊consumer://協議的url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
//訂閱事件監聽
directory.subscribe(toSubscribeUrl(subscribeUrl));
//構建invoker
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
3.Cluster是什么
我們只關注一下Invoker這個代理類的創建程序,其他的暫且不關心.
Invoker<T> invoker = cluster.join(directory);
在方法refer中:
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
static Cluster getCluster(String name) {
return getCluster(name, true);
}
static Cluster getCluster(String name, boolean wrap) {
if (StringUtils.isEmpty(name)) {
name = Cluster.DEFAULT;
}
return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
}
1)Cluster$Adaptive
在動態適配的類中會基于extName,選擇一個合適的擴展點進行適配,由于默認情況下cluster:failover,所以getExtension(“failover”)理論上應該回傳FailOverCluster,但實際上,這里做了包裝 MockClusterWrapper(FailOverCluster)
public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster {
public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0) throws org.apache.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument ==null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argumentgetUrl() == null");
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("cluster", "failover");
if(extName == null)
throw new IllegalStateException("Failed to getextension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" +url.toString() + ") use keys([cluster])");
org.apache.dubbo.rpc.cluster.Cluster extension =(org.apache.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(org.apa
che.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
return extension.join(arg0);
}
}
2)cluster.join
再回到doRefer方法,下面這段代碼, 實際是呼叫MockClusterWrapper(FailOverCluster.join)
Invoker invoker = cluster.join(directory);
所以這里回傳的invoker,應該是MockClusterWrapper(FailOverCluster(directory))
接著回到ReferenceConfig.createProxy方法中的最后一行.
3)proxyFactory.getProxy
拿到invoker之后,會呼叫獲得一個動態代理類
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
而,這個PROXY_FACTORY 又是一個自適應擴展點,
/**
* A {@link ProxyFactory} implementation that will generate a reference service's proxy,the JavassistProxyFactory is
* its default implementation
*/
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
4)JavassistProxyFactory.getProxy
通過這個方法生成了一個動態代理類,并且對invoker再做了一層處理,InvokerInvocationHandler,意味著后續發起服務呼叫的時候,會由InvokerInvocationHandler來進行處理,
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
5)proxy.getProxy
在proxy.getProxy這個方法中會生成一個動態代理類,通過debug的形式可以看到動態代理類的原貌,在getProxy這個方法位置加一個斷點,
public static Proxy getProxy(Class<?>... ics) {
return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}
@Reference注入的一個物件實體本質上就是一個動態代理類,通過呼叫這個類中的方法,會觸發handler.invoke(), 而這個handler就是InvokerInvocationHandler,
4.網路連接的建立
接下來看 目標服務地址資訊以及網路通信的建立,
1)RegistryProtocol.doRefer
關注directory.subscribe這個方法,它是實作服務目標服務訂閱的,
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
directory.subscribe(toSubscribeUrl(subscribeUrl));
Invoker<T> invoker = cluster.join(directory);
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker;
}
RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}
2)RegistryDirectory.subscribe
訂閱注冊中心指定節點的變化,如果發生變化,則通知到RegistryDirectory,Directory其實和服務的注冊以及服務的發現有非常大的關聯,
public void subscribe(URL url) {
setConsumerUrl(url); //設定consumerUrl
//把當前RegistryDirectory作為listener,去監聽zk上節點的變化
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
//訂閱 -> 這里的registry是zookeeperRegsitry
registry.subscribe(url, this);
}
這里的registry 是ZookeeperRegistry ,會去監聽并獲取路徑下面的節點,監聽的路徑是:
/dubbo/org.apache.dubbo.demo.DemoService/providers、/dubbo/org.apache.dubbo.demo.DemoService/configurators、/dubbo/org.apache.dubbo.demo.DemoService/routers 節點下面的子節點變動,
3)FailbackRegistry.subscribe
listener為RegistryDirectory,后續要用到,移除失效的listener,呼叫doSubscribe進行訂閱,
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener);
removeFailedSubscribed(url, listener);
try {
// Sending a subscription request to the server side
doSubscribe(url, listener);
} catch (Exception e) {
Throwable t = e;
List<URL> urls = getCacheUrls(url);
if (CollectionUtils.isNotEmpty(urls)) {
notify(url, listener, urls);
logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
} else {
// If the startup detection is opened, the Exception is thrown directly.
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true);
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
} else {
logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
// Record a failed registration request to a failed list, retry regularly
addFailedSubscribed(url, listener);
}
}
4)ZookeeperRegistry.doSubscribe
這個方法是訂閱,邏輯實作比較多,可以分兩段來看,這里的實作把所有Service層發起的訂閱以及指定的Service層發起的訂閱分開處理,所有Service層類似于監控中心發起的訂閱,指定的Service層發起的訂閱可以看作是服務消費者的訂閱,我們只需要關心指定service層發起的訂閱即可,
public void doSubscribe(final URL url, final NotifyListener listener) {
try {
if (ANY_VALUE.equals(url.getServiceInterface())) {
String root = toRootPath();
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
for (String child : currentChilds) {
child = URL.decode(child);
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
zkClient.create(root, false);
List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
for (String service : services) {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
List<URL> urls = new ArrayList<>();
for (String path : toCategoriesPath(url)) {
// 如果之前該路徑沒有添加過listener,則創建一個map來放置listener
ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
// 如果沒有添加過對于子節點的listener,則創建,通知服務變化 回呼NotifyListener
ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
zkClient.create(path, false);
//添加path節點的當前節點及子節點監聽,并且獲取子節點資訊
//也就是dubbo://ip:port/...
List<String> children = zkClient.addChildListener(path, zkListener);
if (children != null) {
urls.addAll(toUrlsWithEmpty(url, path, children));
}
}
//呼叫notify進行通知,對已經可用的串列進行通知
notify(url, listener, urls);
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
5)FailbackRegistry.notify
呼叫FailbackRegistry.notify, 對引數進行判斷, 然后呼叫AbstractRegistry.notify方法,
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
try {
doNotify(url, listener, urls);
} catch (Exception t) {
// Record a failed registration request to a failed list, retry regularly
addFailedNotified(url, listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
}
6)AbstractRegistry.notify
這里面會針對每一個category,呼叫listener.notify進行通知,然后更新本地的快取檔案
protected void notify(URL url, NotifyListener listener, List<URL> urls) {
if (url == null) {
throw new IllegalArgumentException("notify url == null");
}
if (listener == null) {
throw new IllegalArgumentException("notify listener == null");
}
if ((CollectionUtils.isEmpty(urls))
&& !ANY_VALUE.equals(url.getServiceInterface())) {
logger.warn("Ignore empty notify urls for subscribe url " + url);
return;
}
if (logger.isInfoEnabled()) {
logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
}
// keep every provider's category.
Map<String, List<URL>> result = new HashMap<>();
for (URL u : urls) {
if (UrlUtils.isMatch(url, u)) {
String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
categoryList.add(u);
}
}
if (result.size() == 0) {
return;
}
Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
String category = entry.getKey();
List<URL> categoryList = entry.getValue();
categoryNotified.put(category, categoryList);
listener.notify(categoryList);
// We will update our cache file after each notification.
// When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
saveProperties(url);
}
}
消費端的listener是最開始傳遞過來的RegistryDirectory,所以這里會觸發RegistryDirectory.notify.
7)RegistryDirectory.notify
Invoker的網路連接以及后續的配置變更,都會呼叫這個notify方法
urls: zk的path資料,這里表示的是dubbo://
public synchronized void notify(List<URL> urls) {
//對url串列進行校驗、過濾,然后分成 config、router、provider 3個分組map
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));
List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
// 如果router 路由節點有變化,則從新將router 下的資料生成router
List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);
// 獲得provider URL,然后呼叫refreshOverrideAndInvoker進行重繪
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}
8)refreshOverrideAndInvoker
- 逐個呼叫注冊中心里面的配置,覆寫原來的url,組成最新的url 放入overrideDirectoryUrl 存盤
- 根據 provider urls,重新重繪Invoker
private void refreshOverrideAndInvoker(List<URL> urls) {
// mock zookeeper://xxx?mock=return null
overrideDirectoryUrl();
refreshInvoker(urls);
}
9)refreshInvoker
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
if (invokerUrls.size() == 1
//...
//如果是空協議,則直接回傳不允許訪問
} else {
this.forbidden = false; // Allow to access
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {//如果url為空
invokerUrls = new ArrayList<>();
}
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<>();
this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
}
if (invokerUrls.isEmpty()) {
return;
}
//根據provider url,生成新的invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
*
* 1. The protocol configured by the client is inconsistent with the protocol of the server.
* eg: consumer protocol = dubbo, provider only has other protocol services(rest).
* 2. The registration center is not robust and pushes illegal specification data.
*
*/
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
.toString()));
return;
}
List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
routerChain.setInvokers(newInvokers);
//如果服務配置了分組,則把分組下的provider包裝成StaticDirectory,組成一個invoker
//實際上就是按照group進行合并
this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
this.urlInvokerMap = newUrlInvokerMap;
try {
//舊的url 是否在新map里面存在,不存在,就是銷毀url對應的Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
10)toInvokers
這里用到了protocol.refer來構建了一個invoker.invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url,providerUrl);
構建完成之后,會保存在 Map<String, Invoker<T>> urlInvokerMap 這個集合中,
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
continue;
}
}
if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
" in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
URL url = mergeUrl(providerUrl);
String key = url.toFullString(); // The parameter urls are sorted
if (keys.contains(key)) { // Repeated url
continue;
}
keys.add(key);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
if (url.hasParameter(DISABLED_KEY)) {
enabled = !url.getParameter(DISABLED_KEY, false);
} else {
enabled = url.getParameter(ENABLED_KEY, true);
}
if (enabled) {
invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
newUrlInvokerMap.put(key, invoker);
}
} else {
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
5.protocol.refer
呼叫指定的協議來進行遠程參考,protocol是一個Protocol$Adaptive類,而真正的實作應該是:
ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)
前面的包裝程序,在服務發布的時候已經分析過了,我們直接進入DubboProtocol.protocolBindingRefer方法
1)DubboProtocol.protocolBindingRefer
- 優化序列化
- 構建DubboInvoker
在構建DubboInvoker時,會構建一個ExchangeClient,通過getClients(url)方法,這里基本可以猜到到是服務的通信建立,
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
2)getClients
這里面是獲得客戶端連接的方法
- 判斷是否為共享連接,默認是共享同一個連接進行通信
- 是否配置了多個連接通道 connections,默認只有一個
private ExchangeClient[] getClients(URL url) {
// whether to share connection
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
//如果沒有配置連接數,則默認為共享連接
if (connections == 0) {
useShareConnect = true;
/*
* The xml configuration should have a higher priority than properties.
*/
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
///
shareClients = getSharedClient(url, connections);
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
3)getSharedClient
獲得一個共享連接
private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
String key = url.getAddress();
List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
//檢查當前的key檢查連接是否已經創建過并且可用,如果是,則直接回傳并且增加連接的個數的統計
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
//如果連接已經關倍訓者連接沒有創建過
locks.putIfAbsent(key, new Object());
synchronized (locks.get(key)) {
clients = referenceClientMap.get(key);
// dubbo check
// 在創建連接之前,在做一次檢查,防止連接并發創建
if (checkClientCanUse(clients)) {
batchClientRefIncr(clients);
return clients;
}
// connectNum must be greater than or equal to 1
// 連接數必須大于等于1
connectNum = Math.max(connectNum, 1);
// If the clients is empty, then the first initialization is
//如果當前消費者還沒有和服務端產生連接,則初始化
if (CollectionUtils.isEmpty(clients)) {
clients = buildReferenceCountExchangeClientList(url, connectNum);
//創建clients之后,保存到map中
referenceClientMap.put(key, clients);
} else {//如果clients不為空,則從clients陣列中進行遍歷
for (int i = 0; i < clients.size(); i++) {
ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
// If there is a client in the list that is no longer available, create a new one to replace him.
// 如果在集合中存在一個連接但是這個連接處于closed狀態,則重新構建一個進行替換
if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
clients.set(i, buildReferenceCountExchangeClient(url));
continue;
}
//增加個數
referenceCountExchangeClient.incrementAndGetCount();
}
}
locks.remove(key);
return clients;
}
}
4)buildReferenceCountExchangeClientList
根據連接數配置,來構建指定個數的鏈接,默認為1
private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
ExchangeClient exchangeClient = initClient(url);
return new ReferenceCountExchangeClient(exchangeClient);
}
5)initClient
進入到初始化客戶端連接的方法了,猜測應該是根據url中配置的引數進行遠程通信的構建
private ExchangeClient initClient(URL url) {
// client type setting.
// 獲得連接型別
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
//添加默認序列化方式
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
//設定心跳時間
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
// 判斷str是否存在于擴展點中,如果不存在則直接報錯
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
// 是否需要延遲創建連接,注意,這里的requestHandler是一個配接器
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
6)Exchangers.connect
創建一個客戶端連接
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
7)HeaderExchange.connect
主要關注transporters.connect
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
8)NettyTransport.connect
使用netty構建了一個客戶端連接
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
return new NettyClient(url, handler);
}
6.梳理
RegistryProtocol.refer 程序中有一個關鍵步驟,即在監聽到服務提供者url時觸發RegistryDirectory.notify() 方法,
RegistryDirectory.notify() 方法呼叫 refreshInvoker() 方法將服務提供者urls轉換為對應的 遠程invoker ,最終呼叫到 DubboProtocol.refer() 方法生成對應的 DubboInvoker ,
DubboInvoker 的構造方法中有一項入參 ExchangeClient[] clients ,即對應本文要講的網路客戶端 Client ,DubboInvoker就是通過呼叫 client.request() 方法完成網路通信的請求發送和回應接收功能,
Client 的具體生成程序就是通過 DubboProtocol 的 initClient(URL url) 方法創建了一個HeaderExchangeClient ,

7.客戶端生成Proxy
1)JavassistProxyFactory.getProxy
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
這個invoker實際上是:MockClusterWrapper(FailoverCluster(directory))然后通過InvokerInvocationHandler做了一層包裝變成了InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory))),
2)proxy.getProxy
這個方法里面,會生成一個動態代理的方法,我們通過debug可以看到動態位元組碼的拼接程序,它代理了當前這個介面的方法 sayHello , 并且方法里面是使用handler.invoke進行呼叫的,
而handler又是這樣一個實作:InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))
public java.lang.String sayHello(java.lang.String arg0){
Object[] args = new Object[1];
args[0] = ($w)$1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String)ret;
}
8.消費端呼叫的程序
1)nvokerInvocationHandler.invoke
這個方法主要判斷當前呼叫的遠程方法,如果是tostring、hashcode、equals,就直接回傳否則,呼叫invoker.invoke,進入到 MockClusterWrapper.invoke 方法,
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 0) {
if ("toString".equals(methodName)) {
return invoker.toString();
} else if ("$destroy".equals(methodName)) {
invoker.destroy();
return null;
} else if ("hashCode".equals(methodName)) {
return invoker.hashCode();
}
} else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
return invoker.equals(args[0]);
}
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
}
return invoker.invoke(rpcInvocation).recreate();
}
2)MockClusterInvoker.invoke
- 是否客戶端強制配置了mock呼叫,那么在這種場景中主要可以用來解決服務端還沒開發好的時候直接使用本地資料進行測驗
- 是否出現了例外,如果出現例外則使用配置好的Mock類來實作服務的降級
public Result invoke(Invocation invocation) throws RpcException {
Result result = null;
//從url中獲得MOCK_KEY對應的value
String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
//如果沒有配置mock,則直接傳遞給下個invoker呼叫
if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {//如果強制為本地呼叫,則執行mockInvoke
if (logger.isWarnEnabled()) {
logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
//fix:#4585
if(result.getException() != null && result.getException() instanceof RpcException){
RpcException rpcException= (RpcException)result.getException();
if(rpcException.isBiz()){
throw rpcException;
}else {
result = doMockInvoke(invocation, rpcException);
}
}
} catch (RpcException e) {
if (e.isBiz()) {
throw e;
}
//如果遠程呼叫出現例外,則使用Mock進行處理
if (logger.isWarnEnabled()) {
logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
return result;
}
3)AbstractClusterInvoker.invoke
public Result invoke(final Invocation invocation) throws RpcException {
checkWhetherDestroyed();
// binding attachments into invocation.
//Dubbo中,可以通過 RpcContext 上的 setAttachment 和 getAttachment 在服務消費方和提供方之間進行引數的隱式傳遞
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
}
//通過list獲得invoker串列,這個串列基本可以猜測到是從directory里面獲得的、但是這里面還實作了服務路由的邏輯,
//簡單來說就是先拿到invoker串列,然后通過router進行服務路由,篩選出符合路由規則的服務提供者
List<Invoker<T>> invokers = list(invocation);
//初始化負載均衡機制
LoadBalance loadbalance = initLoadBalance(invokers, invocation);
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
return doInvoke(invocation, invokers, loadbalance);
}
4)initLoadBalance
初始化負載均衡
protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
if (CollectionUtils.isNotEmpty(invokers)) {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
} else {
return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
}
}
5)FailoverClusterInvoker.doInvoke
failover是失敗重試,所以這里面應該會實作容錯的邏輯
- 獲得重試的次數,并且進行回圈
- 獲得目標服務,并且記錄當前已經呼叫過的目標服務防止下次繼續將請求發送過去
- 如果執行成功,則回傳結果
- 如果出現例外,判斷是否為業務例外,如果是則拋出,否則,進行下一次重試

- 這里的 Invoker 是 Provider 的一個可呼叫 Service 的抽象, Invoker 封裝了 Provider 地址及 Service 介面資訊
- Directory 代表多個 Invoker ,可以把它看成
List<Invoker>,但與 List 不同的是,它的值可能是動態變化的,比如注冊中心推送變更 - Cluster 將 Directory 中的多個 Invoker 偽裝成一個 Invoker ,對上層透明,偽裝程序包含了容錯邏輯,呼叫失敗后,重試另一個
- Router 負責從多個 Invoker 中按路由規則選出子集,比如讀寫分離,應用隔離等
- LoadBalance 負責從多個 Invoker 中選出具體的一個用于本次呼叫,選的程序包含了負載均衡演算法,呼叫失敗后,需要重選
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers, invocation);
String methodName = RpcUtils.getMethodName(invocation);
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
//通過負載均衡獲得目標invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
invoked.add(invoker);//記錄已經呼叫過的服務,下次呼叫會進行過濾
RpcContext.getContext().setInvokers((List) invoked);
try {
//服務呼叫成功,直接回傳結果
Result result = invoker.invoke(invocation);
if (le != null && logger.isWarnEnabled()) {
}
return result;
} catch (RpcException e) {
if (e.isBiz()) { // 如果是業務例外,直接拋出不進行重試
throw e;
}
le = e;//記錄例外資訊,進行下一次回圈
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode());
}
9.負載均衡
1)select
所有負載均衡實作類均繼承自 AbstractLoadBalance,該類實作了 LoadBalance 介面,并封裝了一些公共的邏輯,所以在分析負載均衡實作之前,先來看一下 AbstractLoadBalance 的邏輯,首先來看一下負載均衡的入口方法 select,如下:
protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl()
.getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);
//ignore overloaded method
if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
stickyInvoker = null;
}
//ignore concurrency problem
if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
if (availablecheck && stickyInvoker.isAvailable()) {
return stickyInvoker;
}
}
//呼叫 doSelect 方法進行負載均衡,該方法為抽象方法,由子類實作
Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) {
stickyInvoker = invoker;
}
return invoker;
}
2)doSelect
private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
if (CollectionUtils.isEmpty(invokers)) {
return null;
}
//如果就一個,就不負載均衡了,直接回傳
if (invokers.size() == 1) {
return invokers.get(0);
}
//負載均衡邏輯
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
//If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect.
if ((selected != null && selected.contains(invoker))
|| (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
try {
Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
if (rInvoker != null) {
invoker = rInvoker;
} else {
int index = invokers.indexOf(invoker);
try {
invoker = invokers.get((index + 1) % invokers.size());
} catch (Exception e) {
}
} catch (Throwable t) {
}
}
return invoker;
}
3)select
@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {
@Adaptive("loadbalance")
<T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;
}
這里應該選擇的RandomLoadBalance,默認情況下,
4)RandomLoadBalance.doSelect
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
// Number of invokers
int length = invokers.size();
// Every invoker has the same weight?
boolean sameWeight = true;
// the weight of every invokers
int[] weights = new int[length];
// the first invoker's weight
int firstWeight = getWeight(invokers.get(0), invocation);
weights[0] = firstWeight;
// The sum of weights
int totalWeight = firstWeight;
for (int i = 1; i < length; i++) {
int weight = getWeight(invokers.get(i), invocation);
// save for later use
weights[i] = weight;
// Sum
totalWeight += weight;
if (sameWeight && weight != firstWeight) {
sameWeight = false;
}
}
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= weights[i];
if (offset < 0) {
return invokers.get(i);
}
}
}
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}
10.DubboInvoker
1)AbstractInvoker.invoke
這里面也是對Invocation的attachments進行處理,把attachment加入到Invocation中,
這里的attachment,實際上是目標服務的介面資訊以及版本資訊,
2)DubboInvoker.doInvoker
這里面看到一個很熟悉的東西,就是ExchangeClient,這個是客戶端和服務端之間的連接,
然后如果當前方法有回傳值,也就是isOneway=false,則執行else邏輯,然后通過異步的形式進行通信,
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(PATH_KEY, getUrl().getPath());
inv.setAttachment(VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = calculateTimeout(invocation, methodName);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
ExecutorService executor = getCallbackExecutor(getUrl(), inv);
CompletableFuture<AppResponse> appResponseFuture =
currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(appResponseFuture);
AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
result.setExecutor(executor);
return result;
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
3)currentClient.request
它實際是一個ReferenceCountExchangeClient(HeaderExchangeClient()),
所以它的呼叫鏈路是:
ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)
最終,把構建好的 RpcInvocation,組裝到一個Request物件中進行傳遞
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
channel.send的呼叫鏈路:
AbstractPeer.send ->AbstractClient.send->NettyChannel.send
通過NioSocketChannel把訊息發送出去
ChannelFuture future = channel.writeAndFlush(message);
十五,服務端接收訊息的處理流程
客戶端把訊息發送出去之后,服務端會收到訊息,然后把執行的結果回傳到客戶端
1.服務端接收到訊息
服務端這邊接收訊息的處理鏈路,也比較復雜,我們回到NettServer中創建io的程序.
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
.childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws
Exception {
// FIXME: should we use getTimeout()?
int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
NettyCodecAdapter adapter = new
NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new
LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", adapter.getDecoder())
.addLast("encoder", adapter.getEncoder())
.addLast("server-idle-handler", new
IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
.addLast("handler", nettyServerHandler);
}
});
handler配置的是nettyServerHandler
server-idle-handler 表示心跳處理的機制
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);
Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、統一對日志錯誤進行處理、統一對請求進行計數、控制Handler執行與否,
2.handler.channelRead()
服務端收到讀的請求是,會進入這個方法,
接著通過handler.received來處理msg,這個handle的鏈路很長,比較復雜,我們需要逐步剖析
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url,handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}
handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler-HeaderExchangeHandler->最后進入這個方法->DubboProtocol$requestHandler(receive)
MultiMessageHandler: 復合訊息處理
HeartbeatHandler:心跳訊息處理,接收心跳并發送心跳回應
AllChannelHandler:業務執行緒轉化處理器,把接收到的訊息封裝成ChannelEventRunnable可執行任務給執行緒池處理
DecodeHandler:業務解碼處理器
3.HeaderExchangeHandler.received
互動層請求回應處理,有三種處理方式
- handlerRequest,雙向請求
- handler.received 單向請求
- handleResponse 回應訊息
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel =HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
4.ExchangeHandler.reply
接著進入到ExchangeHandler.reply這個方法中
- 把message轉化為Invocation
- 呼叫getInvoker獲得一個Invoker物件
- 然后通過
Result result = invoker.invoke(inv);進行呼叫
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
if (!(message instanceof Invocation)) {
throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
} else {
Invocation inv = (Invocation)message;
Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("_isCallBackServiceInvoke"))) {
String methodsStr = (String)invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr != null && methodsStr.contains(",")) {
String[] methods = methodsStr.split(",");
String[] var8 = methods;
int var9 = methods.length;
for(int var10 = 0; var10 < var9; ++var10) {
String method = var8[var10];
if (inv.getMethodName().equals(method)) {
hasMethod = true;
break;
}
}
} else {
hasMethod = inv.getMethodName().equals(methodsStr);
}
if (!hasMethod) {
DubboProtocol.this.logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
}
5.getInvoker
這里面是獲得一個invoker的實作DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
這段代碼非常熟悉,exporterMap不就是我們之前在分析服務發布的程序中,保存的Invoker嗎?而key,就是對應的interface:port ,
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = (String)inv.getObjectAttachments().get("path");
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("dubbo.stub.event"));
if (isStubServiceInvoke) {
port = channel.getRemoteAddress().getPort();
}
isCallBackServiceInvoke = this.isClientSide(channel) && !isStubServiceInvoke;
if (isCallBackServiceInvoke) {
path = path + "." + inv.getObjectAttachments().get("callback.service.instid");
inv.getObjectAttachments().put("_isCallBackServiceInvoke", Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, (String)inv.getObjectAttachments().get("version"), (String)inv.getObjectAttachments().get("group"));
DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
if (exporter == null) {
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
} else {
return exporter.getInvoker();
}
}
6.exporterMap
Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
在服務發布時,實際上是把invoker包裝成了DubboExpoter,然后放入到exporterMap中,
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
this.exporterMap.put(key, exporter);
Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
Boolean isCallbackservice = url.getParameter("is_callback_service", false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
if ((stubServiceMethods == null || stubServiceMethods.length() == 0) && this.logger.isWarnEnabled()) {
this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
}
}
this.openServer(url);
this.optimizeSerialization(url);
return exporter;
}
7.invoker.invoke(inv)
接著呼叫invoker.invoke
此時的invoker是一個什么呢?
invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker)))
最后一定會進入到這個代碼里面
8.AbstractProxyInvoker
在AbstractProxyInvoker里面,doInvoker本質上呼叫的是wrapper.invokeMethod()
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);
}
};
而Wrapper是一個動態代理類,它的定義是這樣的, 最終呼叫w.sayHello()方法進行處理
到此為止,服務端的處理程序就分析完了,

十六,性能調優相關引數
1.常用的性能調優引數
| 引數名 | 作用范圍 | 說明 | 默認值 | 備注 |
|---|---|---|---|---|
| threads | provider | 業務處理執行緒池大小 | 200 | |
| iothreads | provider | io執行緒池大小 | cpu個數+1 | |
| queues | provider | 執行緒池佇列大小,當執行緒池滿時,排隊等待執行的佇列大小,建議不要設定,當執行緒池時應立即失敗,重試其他服務提供機器,而不是排隊,除非有特殊需求, | 0 | |
| connections | consumer | 對每個提供者的最大連接數,rmi,http,hession等短連接協議表示限制連接數,Dubbo等長連接表示簡歷的長連接個數 | 0 | Dubbo協議默認共享一個長連接 |
| actives | consumer | 每個服務消費者每個方法的對大并發呼叫數 | 0 | 0表示不限制 |
| accepts | provider | 服務提供方最大可接收連接數 | 0 | 0表示不限制 |
| executes | provider | 服務提供者每個服務每個方法最大可并行執行請求數 | 0 | 0表示不限制 |
2.各個引數的作用

- 當consumer發起一個請求時,首先經過active limit(引數actives)進行方法級別的限制,其實作方式為CHM中存放計數器(AtomicInteger),請求時加1,請求完成(包括例外)減1,如果超過actives則等待有其他請求完成后重試或者超時后失敗;
- 從多個連接(connections)中選擇一個連接發送資料,對于默認的netty實作來說,由于可以復用連接,默認一個連接就可以,不過如果你在壓測,且只有一個consumer,一個provider,此時適當的加大connections確實能夠增強網路傳輸能力,但線上業務由于有多個consumer多個provider,因此不建議增加connections引數;
- 連接到達provider時(如dubbo的初次連接),首先會判斷總連接數是否超限(acceps),超過限制連接將被拒絕;
- 連接成功后,具體的請求交給io thread處理,io threads雖然是處理資料的讀寫,但io部分為異步,更多的消耗的是cpu,因此iothreads默認cpu個數+1是比較合理的設定,不建議調整此引數;
- 資料讀取并反序列化以后,交給業務執行緒池處理,默認情況下執行緒池為fixed,且排隊佇列為0(queues),這種情況下,最大并發等于業務執行緒池大小(threads),如果希望有請求的堆積能力,可以調整queues引數,如果希望快速失敗由其他節點處理(官方推薦方式),則不修改queues,只調整threads;
- execute limit(引數executes)是方法級別的并發限制,原理與actives類似,只是少了等待的程序,即受限后立即失敗
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/274714.html
標籤:java
上一篇:設計模式-代理模式(靜態代理、動態代理、cglib代理)
下一篇:JVM面經
